tianyuan-api-server/go-zero服务实现详解.md
2025-07-13 20:37:12 +08:00

745 lines
19 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# go-zero 服务实现详解
## 🔥 核心服务实现
### 1. Gateway API 服务 (HTTP 入口)
#### API 定义 (gateway.api)
```go
syntax = "v1"

// Metadata for the gateway API definition.
info (
	title:   "天远API网关"
	desc:    "统一API入口"
	version: "v1.0"
)
// LoginReq is the login request body.
type LoginReq {
	Username string `json:"username"`
	Password string `json:"password"`
}

// LoginResp is returned by the login route.
type LoginResp {
	Token    string   `json:"token"`
	UserInfo UserInfo `json:"userInfo"`
}

// UserInfo is the user summary embedded in LoginResp.
type UserInfo {
	UserId       int64  `json:"userId"`
	Username     string `json:"username"`
	EnterpriseId int64  `json:"enterpriseId"`
}

// DataQueryReq is the request body shared by all data query routes
// (the core business path).
type DataQueryReq {
	QueryType  string                 `json:"queryType"` // risk/credit/company/data
	Parameters map[string]interface{} `json:"parameters"`
}

// DataQueryResp is the response body shared by all data query routes.
type DataQueryResp {
	Success          bool        `json:"success"`
	Data             interface{} `json:"data"`
	TransactionId    string      `json:"transactionId"`
	RemainingBalance float64     `json:"remainingBalance"`
}
// Login must be reachable without a token (it is the route that issues
// one), so it is declared in a server block that has no `jwt` option.
@server (
	group: auth
)
service gateway-api {
	@handler LoginHandler
	post /api/v1/auth/login (LoginReq) returns (LoginResp)
}

// Logout acts on an authenticated session and stays behind JWT auth.
@server (
	jwt:   Auth
	group: auth
)
service gateway-api {
	@handler LogoutHandler
	post /api/v1/auth/logout returns ()
}
// Data query routes: JWT-protected and wrapped by the rate-limit and audit
// middlewares. Middleware names are referenced verbatim as ServiceContext
// fields by the generated route code, so they have to match the field names
// declared in servicecontext.go (RateLimit, AuditMiddleware).
@server (
	jwt:        Auth
	group:      data
	middleware: RateLimit,AuditMiddleware
)
service gateway-api {
	@handler RiskAssessmentHandler
	post /api/v1/data/risk-assessment (DataQueryReq) returns (DataQueryResp)

	@handler CreditCheckHandler
	post /api/v1/data/credit-check (DataQueryReq) returns (DataQueryResp)

	@handler CompanyInfoHandler
	post /api/v1/data/company-info (DataQueryReq) returns (DataQueryResp)

	@handler DataQueryHandler
	post /api/v1/data/query (DataQueryReq) returns (DataQueryResp)
}
```
#### 核心 Logic 实现 (处理复杂调用链)
```go
// api/gateway/internal/logic/data/risklogic.go
// RiskLogic holds the per-request state used by the risk-assessment route:
// an embedded logger, the request context and the shared service context.
type RiskLogic struct {
	logx.Logger // embedded, so logging methods are callable on the logic value

	ctx    context.Context     // request-scoped context
	svcCtx *svc.ServiceContext // shared dependencies (RPC clients, config)
}
// RiskAssessment forwards a risk-assessment query to the data RPC service and
// copies the RPC reply into the HTTP response type.
//
// The caller's user ID is taken from the request context through
// ctxdata.GetUidFromCtx. When the RPC call fails the error is logged and
// returned unchanged together with a nil response.
func (l *RiskLogic) RiskAssessment(req *types.DataQueryReq) (resp *types.DataQueryResp, err error) {
	// User identity comes from the request context (populated from the JWT).
	userId := ctxdata.GetUidFromCtx(l.ctx)

	// Delegate the whole business pipeline to the data-domain RPC.
	//
	// NOTE(review): gateway.api declares Parameters as map[string]interface{}
	// while data.proto declares `map<string, string> parameters`; confirm the
	// two types agree (or convert here) before relying on this assignment.
	// NOTE(review): confirm httpx.GetClientIP exists and accepts a context in
	// the go-zero version in use — TODO verify.
	dataResp, err := l.svcCtx.DataRpc.ProcessDataRequest(l.ctx, &data.ProcessDataRequestReq{
		UserId:     userId,
		QueryType:  "risk-assessment",
		Parameters: req.Parameters,
		ClientIp:   httpx.GetClientIP(l.ctx),
	})
	if err != nil {
		// Log through the embedded logger rather than the package-level logx
		// functions so the entry is tied to this logic instance.
		// Assumes the constructor initialises Logger — TODO confirm.
		l.Errorf("调用数据域RPC失败: %v", err)
		return nil, err
	}

	return &types.DataQueryResp{
		Success:          dataResp.Success,
		Data:             dataResp.Data,
		TransactionId:    dataResp.TransactionId,
		RemainingBalance: dataResp.RemainingBalance,
	}, nil
}
```
#### 服务上下文 (包含所有 RPC 客户端)
```go
// api/gateway/internal/svc/servicecontext.go
// ServiceContext bundles everything the gateway handlers depend on: the
// loaded configuration, one client per downstream RPC service, and the
// middlewares referenced by the route definitions.
type ServiceContext struct {
	Config config.Config

	// RPC clients, one per downstream service.
	UserRpc     user.User
	DataRpc     data.Data
	SecurityRpc security.Security
	BillingRpc  billing.Billing
	ProductRpc  product.Product
	AuditRpc    audit.Audit

	// HTTP middlewares.
	RateLimit       rest.Middleware
	AuditMiddleware rest.Middleware
}
// NewServiceContext builds the gateway's ServiceContext from c. Every RPC
// client is created with zrpc.MustNewClient, and both middlewares are
// constructed from their respective config sections.
func NewServiceContext(c config.Config) *ServiceContext {
	svcCtx := &ServiceContext{Config: c}

	// RPC clients (created in the same order as the struct fields).
	svcCtx.UserRpc = user.NewUser(zrpc.MustNewClient(c.UserRpc))
	svcCtx.DataRpc = data.NewData(zrpc.MustNewClient(c.DataRpc))
	svcCtx.SecurityRpc = security.NewSecurity(zrpc.MustNewClient(c.SecurityRpc))
	svcCtx.BillingRpc = billing.NewBilling(zrpc.MustNewClient(c.BillingRpc))
	svcCtx.ProductRpc = product.NewProduct(zrpc.MustNewClient(c.ProductRpc))
	svcCtx.AuditRpc = audit.NewAudit(zrpc.MustNewClient(c.AuditRpc))

	// Middlewares.
	svcCtx.RateLimit = ratelimit.NewRateLimit(c.RateLimit)
	svcCtx.AuditMiddleware = auditrpc.NewAuditMiddleware(c.Audit)

	return svcCtx
}
```
### 2. Data RPC 服务 (核心协调者)
#### Proto 定义 (data.proto)
```protobuf
// File header: proto3 syntax, proto package "data", go_package option "./pb".
syntax = "proto3";
package data;
option go_package = "./pb";
// ProcessDataRequestReq is the input of Data.ProcessDataRequest.
message ProcessDataRequestReq {
    int64 user_id = 1;
    // One of: risk-assessment / credit-check / company-info / data-query.
    string query_type = 2;
    map<string, string> parameters = 3;
    string client_ip = 4;
}
// ProcessDataRequestResp is the output of Data.ProcessDataRequest.
message ProcessDataRequestResp {
    bool success = 1;
    // Business payload encoded as a JSON string.
    string data = 2;
    string transaction_id = 3;
    double remaining_balance = 4;
    string error_message = 5;
}
// Data exposes the single entry point used by the gateway for data queries.
service Data {
    rpc ProcessDataRequest (ProcessDataRequestReq) returns (ProcessDataRequestResp);
}
```
#### 核心协调逻辑 (你的复杂业务流程)
```go
// rpc/data/internal/logic/orchestrator/dataorchestratorlogic.go
// DataOrchestratorLogic holds the per-request state for the data
// orchestration pipeline: the request context, the shared service context
// and an embedded logger.
type DataOrchestratorLogic struct {
	ctx    context.Context     // request-scoped context passed to downstream RPCs
	svcCtx *svc.ServiceContext // RPC clients and external data-source clients

	logx.Logger // embedded, so logging methods are callable on the logic value
}
// ProcessDataRequest runs the complete pipeline for one data query:
//
//  1. security: load the user, check the IP whitelist, decrypt the secret key
//  2. entitlement: check product access and balance
//  3. business: dispatch on in.QueryType to the matching processor
//  4. billing and audit: charge the user, then record an audit entry
//
// Any failing step aborts the pipeline and returns a nil response with an
// error. The audit entry is written asynchronously and never affects the
// returned result.
func (l *DataOrchestratorLogic) ProcessDataRequest(in *pb.ProcessDataRequestReq) (*pb.ProcessDataRequestResp, error) {
	// Upper bound for the detached audit RPC so the goroutine cannot hang.
	const auditTimeout = 5 * time.Second

	startTime := time.Now()

	// === Step 1: security checks ===
	// 1.1 Load the user's enterprise information.
	userResp, err := l.svcCtx.UserRpc.GetUserInfo(l.ctx, &user.GetUserInfoReq{
		UserId: in.UserId,
	})
	if err != nil {
		return nil, fmt.Errorf("获取用户信息失败: %w", err)
	}

	// 1.2 IP whitelist check; only the error is inspected.
	_, err = l.svcCtx.SecurityRpc.CheckWhitelist(l.ctx, &security.CheckWhitelistReq{
		EnterpriseId: userResp.EnterpriseId,
		ClientIp:     in.ClientIp,
	})
	if err != nil {
		return nil, fmt.Errorf("IP白名单验证失败: %w", err)
	}

	// 1.3 Decrypt the enterprise secret key.
	decryptResp, err := l.svcCtx.SecurityRpc.DecryptSecret(l.ctx, &security.DecryptSecretReq{
		EnterpriseId: userResp.EnterpriseId,
		EncryptedKey: userResp.EncryptedSecretKey,
	})
	if err != nil {
		return nil, fmt.Errorf("密钥解密失败: %w", err)
	}

	// === Step 2: entitlement checks ===
	// 2.1 Product access.
	productResp, err := l.svcCtx.ProductRpc.CheckProductAccess(l.ctx, &product.CheckProductAccessReq{
		UserId:    in.UserId,
		QueryType: in.QueryType,
		SecretKey: decryptResp.SecretKey,
	})
	if err != nil {
		return nil, fmt.Errorf("产品权限检查失败: %w", err)
	}

	// 2.2 Balance.
	// NOTE(review): every CheckBalance error is reported as "余额不足"; confirm
	// that transport failures should carry the same message.
	balanceResp, err := l.svcCtx.BillingRpc.CheckBalance(l.ctx, &billing.CheckBalanceReq{
		UserId:      in.UserId,
		ProductCode: productResp.ProductCode,
		QueryType:   in.QueryType,
	})
	if err != nil {
		return nil, fmt.Errorf("余额不足: %w", err)
	}

	// === Step 3: business logic ===
	var businessResult *BusinessResult
	switch in.QueryType {
	case "risk-assessment":
		businessResult, err = l.processRiskAssessment(in.Parameters)
	case "credit-check":
		businessResult, err = l.processCreditCheck(in.Parameters)
	case "company-info":
		businessResult, err = l.processCompanyInfo(in.Parameters)
	case "data-query":
		businessResult, err = l.processDataQuery(in.Parameters)
	default:
		return nil, errors.New("不支持的查询类型")
	}
	if err != nil {
		return nil, fmt.Errorf("业务处理失败: %w", err)
	}

	// === Step 4: billing and audit ===
	// 4.1 Charge the amount reported by the balance check.
	chargeResp, err := l.svcCtx.BillingRpc.Charge(l.ctx, &billing.ChargeReq{
		UserId:          in.UserId,
		ProductCode:     productResp.ProductCode,
		Amount:          balanceResp.RequiredAmount,
		TransactionType: in.QueryType,
		RequestId:       generateRequestId(),
	})
	if err != nil {
		return nil, fmt.Errorf("扣费失败: %w", err)
	}

	// 4.2 Record the audit entry asynchronously. The request is built here,
	// before the goroutine starts, so ResponseTime measures the pipeline and
	// not the goroutine's scheduling delay.
	auditReq := &audit.RecordAPICallReq{
		UserId:        in.UserId,
		EnterpriseId:  userResp.EnterpriseId,
		QueryType:     in.QueryType,
		ClientIp:      in.ClientIp,
		TransactionId: chargeResp.TransactionId,
		ResponseTime:  time.Since(startTime).Milliseconds(),
		Status:        "success",
	}
	go func() {
		// Detached from l.ctx on purpose (the request may finish first), but
		// bounded by a timeout so a stuck audit service cannot leak goroutines.
		auditCtx, cancel := context.WithTimeout(context.Background(), auditTimeout)
		defer cancel()

		if _, auditErr := l.svcCtx.AuditRpc.RecordAPICall(auditCtx, auditReq); auditErr != nil {
			// Best effort: the caller has already been charged and answered.
			l.Errorf("记录审计日志失败: %v", auditErr)
		}
	}()

	return &pb.ProcessDataRequestResp{
		Success:          true,
		Data:             businessResult.ToJSON(),
		TransactionId:    chargeResp.TransactionId,
		RemainingBalance: chargeResp.RemainingBalance,
	}, nil
}
// processRiskAssessment implements the former FLXG flow (risk assessment).
//
// It queries the West data source and the Baidu risk API with params and
// merges both results. A West failure aborts the call. A Baidu failure is
// logged and the call degrades to West-only data, in which case Source is
// reported as "west" instead of "west+baidu".
func (l *DataOrchestratorLogic) processRiskAssessment(params map[string]string) (*BusinessResult, error) {
	// West data source is mandatory.
	westData, err := l.svcCtx.WestDexClient.QueryRiskData(params)
	if err != nil {
		return nil, err
	}

	// Baidu risk API is optional: on failure fall back to West data only and
	// label the result accordingly.
	source := "west+baidu"
	baiduData, err := l.svcCtx.BaiduClient.RiskAssessment(params)
	if err != nil {
		l.Errorf("百度API调用失败: %v", err)
		source = "west"
		// NOTE(review): baiduData is still passed to mergeRiskData below;
		// confirm mergeRiskData tolerates the value returned alongside an error.
	}

	// Merge both sources into the business result.
	return &BusinessResult{
		Code:   "FLXG001",
		Data:   mergeRiskData(westData, baiduData),
		Source: source,
	}, nil
}
// processCreditCheck implements the former JRZQ flow (credit check): it
// queries the credit client with params and wraps the reply in a
// BusinessResult. A query error is returned unchanged with a nil result.
func (l *DataOrchestratorLogic) processCreditCheck(params map[string]string) (*BusinessResult, error) {
	report, queryErr := l.svcCtx.CreditClient.QueryCredit(params)
	if queryErr != nil {
		return nil, queryErr
	}

	result := &BusinessResult{
		Code:   "JRZQ001",
		Source: "credit_bureau",
	}
	result.Data = report
	return result, nil
}
```
### 3. 数据库操作 (go-zero Model)
#### 用户模型
```go
// rpc/user/internal/model/usermodel.go
// User maps one row of the users table; the `db` tags name the columns.
type User struct {
	Id           int64     `db:"id"`
	Username     string    `db:"username"`
	Password     string    `db:"password"`
	Email        string    `db:"email"`
	EnterpriseId int64     `db:"enterprise_id"`
	Status       int64     `db:"status"`
	CreatedAt    time.Time `db:"created_at"`
	UpdatedAt    time.Time `db:"updated_at"`
}
// UserModel is the data-access contract for User rows.
type UserModel interface {
	// Insert stores data as a new row.
	Insert(ctx context.Context, data *User) (sql.Result, error)
	// FindOne looks a user up by id.
	FindOne(ctx context.Context, id int64) (*User, error)
	// FindOneByUsername looks a user up by username.
	FindOneByUsername(ctx context.Context, username string) (*User, error)
	// Update persists data.
	Update(ctx context.Context, data *User) error
	// Delete removes the row with the given id.
	Delete(ctx context.Context, id int64) error
}
// defaultUserModel is the UserModel value returned by NewUserModel.
type defaultUserModel struct {
	conn  sqlx.SqlConn // connection used to run queries
	table string       // table name as written into SQL statements
}
// NewUserModel returns a UserModel that runs its queries on conn against
// the `users` table.
func NewUserModel(conn sqlx.SqlConn) UserModel {
	m := new(defaultUserModel)
	m.conn = conn
	m.table = "`users`"
	return m
}
// FindOneByUsername selects at most one user whose username column equals
// username. It returns ErrNotFound when the query reports sqlc.ErrNotFound
// and passes any other query error through unchanged.
func (m *defaultUserModel) FindOneByUsername(ctx context.Context, username string) (*User, error) {
	var row User
	stmt := fmt.Sprintf("select %s from %s where `username` = ? limit 1", userRows, m.table)

	if err := m.conn.QueryRowCtx(ctx, &row, stmt, username); err != nil {
		// Translate the driver-level sentinel into this package's sentinel.
		if err == sqlc.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}
	return &row, nil
}
```
#### 缓存处理
```go
// rpc/user/internal/logic/user/getuserinfologic.go
// GetUserInfo returns the user identified by in.UserId together with the
// name and encrypted secret key of the user's enterprise.
//
// It is a read-through cache: a valid JSON entry under "user:info:<id>" is
// returned directly; otherwise the user and enterprise rows are loaded from
// the models and the response is cached for 300 seconds. Model errors are
// returned unchanged. Cache failures never fail the request; they are only
// logged.
func (l *GetUserInfoLogic) GetUserInfo(in *pb.GetUserInfoReq) (*pb.GetUserInfoResp, error) {
	// 1. Cache lookup. A read error, an empty value or an undecodable value
	// all fall through to the database path.
	cacheKey := fmt.Sprintf("user:info:%d", in.UserId)
	cached, err := l.svcCtx.RedisClient.Get(cacheKey)
	if err == nil && cached != "" {
		var userInfo pb.GetUserInfoResp
		if json.Unmarshal([]byte(cached), &userInfo) == nil {
			return &userInfo, nil
		}
	}

	// 2. Database lookup.
	u, err := l.svcCtx.UserModel.FindOne(l.ctx, in.UserId)
	if err != nil {
		return nil, err
	}
	enterprise, err := l.svcCtx.EnterpriseModel.FindOne(l.ctx, u.EnterpriseId)
	if err != nil {
		return nil, err
	}

	resp := &pb.GetUserInfoResp{
		UserId:             u.Id,
		Username:           u.Username,
		Email:              u.Email,
		EnterpriseId:       u.EnterpriseId,
		EnterpriseName:     enterprise.Name,
		EncryptedSecretKey: enterprise.EncryptedSecretKey,
	}

	// 3. Best-effort cache write (expires after 5 minutes). Failures are
	// logged instead of being dropped silently, and a failed marshal no
	// longer writes an empty value into the cache.
	// Assumes GetUserInfoLogic embeds logx.Logger like the other logic types
	// in this document, and that Setex returns an error — TODO confirm.
	respJson, err := json.Marshal(resp)
	if err != nil {
		l.Errorf("序列化用户信息缓存失败: %v", err)
		return resp, nil
	}
	if err := l.svcCtx.RedisClient.Setex(cacheKey, string(respJson), 300); err != nil {
		l.Errorf("写入用户信息缓存失败: %v", err)
	}
	return resp, nil
}
```
## 🚀 部署配置
### Docker 部署
#### 1. 服务 Dockerfile
```dockerfile
# rpc/user/Dockerfile
# Build context must be the repository root: the build stage compiles
# rpc/user/user.go and copies rpc/user/etc/user.yaml by repo-relative path.

# ---- build stage ----
FROM golang:1.19-alpine AS builder
WORKDIR /build

# Copy the module manifests first so the dependency download is cached in its
# own layer and is not re-run on every source change.
COPY go.mod go.sum ./
RUN go mod download

COPY . .
# CGO is disabled so the binary is statically linked and does not depend on
# the C library of the runtime image.
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o user rpc/user/user.go

# ---- runtime stage ----
# Pinned instead of `latest` so rebuilds are reproducible.
FROM alpine:3.18
RUN apk --no-cache add ca-certificates
WORKDIR /app
COPY --from=builder /build/user .
COPY --from=builder /build/rpc/user/etc/user.yaml ./etc/
EXPOSE 8001
CMD ["./user", "-f", "etc/user.yaml"]
```
#### 2. Docker Compose (开发环境)
```yaml
# deploy/docker/docker-compose.dev.yml
# Development stack: infrastructure containers plus the microservices built
# from the repository root (build context ../../).
version: "3.8"

services:
  # ---- Infrastructure ----
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root123 # development only
      MYSQL_DATABASE: tianyuan
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  etcd:
    image: quay.io/coreos/etcd:v3.5.0
    environment:
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
    ports:
      - "2379:2379"

  kafka:
    # Pinned to a ZooKeeper-capable release: this stack runs Kafka in
    # ZooKeeper mode, which `latest` is not guaranteed to keep supporting.
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # Single broker: the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  # ---- Microservices ----
  user-rpc:
    build:
      context: ../../
      dockerfile: rpc/user/Dockerfile
    ports:
      - "8001:8001"
    environment:
      - DB_HOST=mysql
      - REDIS_HOST=redis
      - ETCD_HOSTS=etcd:2379
    depends_on:
      - mysql
      - redis
      - etcd

  data-rpc:
    build:
      context: ../../
      dockerfile: rpc/data/Dockerfile
    ports:
      - "8002:8002"
    environment:
      - DB_HOST=mysql
      - REDIS_HOST=redis
      - ETCD_HOSTS=etcd:2379
      - USER_RPC=user-rpc:8001
    depends_on:
      - user-rpc

  gateway-api:
    build:
      context: ../../
      dockerfile: api/gateway/Dockerfile
    ports:
      - "8000:8000"
    environment:
      - USER_RPC=user-rpc:8001
      - DATA_RPC=data-rpc:8002
      # NOTE(review): security-rpc and billing-rpc are referenced here but no
      # such services are defined in this file — add them or drop these.
      - SECURITY_RPC=security-rpc:8003
      - BILLING_RPC=billing-rpc:8004
    depends_on:
      - user-rpc
      - data-rpc

volumes:
  mysql_data:
#### 3. Kubernetes 部署
```yaml
# deploy/k8s/user-rpc.yaml
# Deployment for the user RPC service (3 replicas, container port 8001).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-rpc
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-rpc
  template:
    metadata:
      labels:
        app: user-rpc
    spec:
      containers:
        - name: user-rpc
          image: tianyuan/user-rpc:latest
          ports:
            - containerPort: 8001
          env:
            - name: DB_HOST
              value: "mysql-svc"
            - name: REDIS_HOST
              value: "redis-svc"
            - name: ETCD_HOSTS
              value: "etcd-svc:2379"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Port 8001 is the RPC listener, not an HTTP server, so HTTP GET
          # probes against /health or /ready would never succeed there.
          # A TCP check verifies that the listener is accepting connections.
          livenessProbe:
            tcpSocket:
              port: 8001
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: 8001
            initialDelaySeconds: 5
            periodSeconds: 5
---
# ClusterIP Service exposing the user-rpc pods on port 8001.
apiVersion: v1
kind: Service
metadata:
  name: user-rpc-svc
spec:
  selector:
    app: user-rpc
  ports:
    - port: 8001
      targetPort: 8001
  type: ClusterIP
```
#### 4. Makefile (统一构建部署)
```makefile
# Makefile
# Every target below is a command, not a file, so all of them are declared
# phony; otherwise a file or directory named e.g. `clean` would disable it.
.PHONY: build-all gen-api gen-rpc start-dev stop-dev migrate-up migrate-down \
	deploy-k8s test-all lint clean

# Build all services into ./bin
build-all:
	@echo "构建所有微服务..."
	cd api/gateway && go build -o ../../bin/gateway gateway.go
	cd rpc/user && go build -o ../../bin/user-rpc user.go
	cd rpc/data && go build -o ../../bin/data-rpc data.go
	cd rpc/security && go build -o ../../bin/security-rpc security.go
	cd rpc/billing && go build -o ../../bin/billing-rpc billing.go

# Code generation
gen-api:
	cd api/gateway && goctl api go -api gateway.api -dir .

gen-rpc:
	cd rpc/user && goctl rpc protoc user.proto --go_out=. --go-grpc_out=. --zrpc_out=.

# Development environment
start-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml up -d

stop-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml down

# Database migrations
migrate-up:
	cd tools/migrate && go run migrate.go up

migrate-down:
	cd tools/migrate && go run migrate.go down

# Kubernetes deployment
deploy-k8s:
	kubectl apply -f deploy/k8s/namespace.yaml
	kubectl apply -f deploy/k8s/configmap.yaml
	kubectl apply -f deploy/k8s/mysql.yaml
	kubectl apply -f deploy/k8s/redis.yaml
	kubectl apply -f deploy/k8s/user-rpc.yaml
	kubectl apply -f deploy/k8s/data-rpc.yaml
	kubectl apply -f deploy/k8s/gateway-api.yaml

# Tests
test-all:
	go test ./api/gateway/...
	go test ./rpc/user/...
	go test ./rpc/data/...

# Static analysis
lint:
	golangci-lint run ./...

# Cleanup
# NOTE: `docker system prune -f` removes unused Docker resources for the whole
# host, not only the ones created by this project.
clean:
	rm -rf bin/
	docker system prune -f
```
## 🔄 CI/CD 配置
```yaml
# .github/workflows/deploy.yml
# Builds, tests, containerises and deploys the services on every push to main.
name: Deploy Microservices

on:
  push:
    branches: [main]

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v3
        with:
          # Quoted so YAML keeps it a string (an unquoted 1.20 would become 1.2).
          go-version: "1.19"

      - name: Build services
        run: make build-all

      - name: Run tests
        run: make test-all

      - name: Build Docker images
        # The Dockerfiles reference repo-relative paths (e.g. rpc/user/user.go),
        # so the build context has to be the repository root, with the
        # Dockerfile selected through -f.
        run: |
          docker build -f api/gateway/Dockerfile -t tianyuan/gateway:${{ github.sha }} .
          docker build -f rpc/user/Dockerfile -t tianyuan/user-rpc:${{ github.sha }} .
          docker build -f rpc/data/Dockerfile -t tianyuan/data-rpc:${{ github.sha }} .

      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker push tianyuan/gateway:${{ github.sha }}
          docker push tianyuan/user-rpc:${{ github.sha }}
          docker push tianyuan/data-rpc:${{ github.sha }}

      - name: Deploy to K8s
        run: |
          echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
          export KUBECONFIG=kubeconfig
          # Only rewrite image tags (":latest"), not every occurrence of the
          # word "latest" in the manifests.
          sed -i 's|:latest|:${{ github.sha }}|g' deploy/k8s/*.yaml
          kubectl apply -f deploy/k8s/
```
这个架构设计完全基于 go-zero 框架,保持了你原有业务逻辑的同时,提供了清晰的服务边界和强大的扩展能力。每个服务都可以独立开发、测试、部署和扩容。