tianyuan-api-server/go-zero服务实现详解.md
2025-07-13 20:37:12 +08:00

19 KiB
Raw Permalink Blame History

go-zero 服务实现详解

🔥 核心服务实现

1. Gateway API 服务 (HTTP 入口)

API 定义 (gateway.api)

syntax = "v1"

// Descriptive metadata for the gateway API.
info (
    title:   "天远API网关"
    desc:    "统一API入口"
    version: "v1.0"
)

// LoginReq is the body of the login request.
type LoginReq {
    Username string `json:"username"`
    Password string `json:"password"`
}

// LoginResp is returned by the login route: a token plus the user's profile.
type LoginResp {
    Token    string   `json:"token"`
    UserInfo UserInfo `json:"userInfo"`
}

// UserInfo is the user profile embedded in LoginResp.
type UserInfo {
    UserId       int64  `json:"userId"`
    Username     string `json:"username"`
    EnterpriseId int64  `json:"enterpriseId"`
}

// DataQueryReq is the request body shared by all data query routes
// (the core business of the platform).
type DataQueryReq {
    QueryType  string                 `json:"queryType"` // risk/credit/company/data
    Parameters map[string]interface{} `json:"parameters"`
}

// DataQueryResp is the response body shared by all data query routes.
type DataQueryResp {
    Success          bool        `json:"success"`
    Data             interface{} `json:"data"`
    TransactionId    string      `json:"transactionId"`
    RemainingBalance float64     `json:"remainingBalance"`
}

// Login is the route that issues the token, so it must be reachable without
// one: this group deliberately has no `jwt` setting.
@server (
    group: auth
)
service gateway-api {
    @handler LoginHandler
    post /api/v1/auth/login (LoginReq) returns (LoginResp)
}

// Logout acts on an already authenticated session, so it keeps JWT
// verification.
@server (
    jwt:   Auth
    group: auth
)
service gateway-api {
    @handler LogoutHandler
    post /api/v1/auth/logout returns ()
}

// Data query routes: JWT protected and wrapped by the rate-limit and audit
// middleware.
//
// The middleware names must equal the ServiceContext field names, because the
// generated routes reference them as serverCtx.<Name>. The ServiceContext in
// this document exposes RateLimit and AuditMiddleware.
@server (
    jwt:        Auth
    group:      data
    middleware: RateLimit,AuditMiddleware
)
service gateway-api {
    @handler RiskAssessmentHandler
    post /api/v1/data/risk-assessment (DataQueryReq) returns (DataQueryResp)

    @handler CreditCheckHandler
    post /api/v1/data/credit-check (DataQueryReq) returns (DataQueryResp)

    @handler CompanyInfoHandler
    post /api/v1/data/company-info (DataQueryReq) returns (DataQueryResp)

    @handler DataQueryHandler
    post /api/v1/data/query (DataQueryReq) returns (DataQueryResp)
}

核心 Logic 实现 (处理复杂调用链)

// api/gateway/internal/logic/data/risklogic.go

// RiskLogic serves the risk-assessment route of the gateway by delegating to
// the data RPC service.
type RiskLogic struct {
    logx.Logger
    ctx    context.Context
    svcCtx *svc.ServiceContext
}

// RiskAssessment forwards the query to the data RPC service with the query
// type fixed to "risk-assessment" and maps the RPC reply onto the HTTP
// response type. An RPC error is logged and returned unchanged.
func (l *RiskLogic) RiskAssessment(req *types.DataQueryReq) (resp *types.DataQueryResp, err error) {
    // The user ID is read from the request context (per the original note it
    // originates from the parsed JWT).
    userId := ctxdata.GetUidFromCtx(l.ctx)

    // The HTTP request carries arbitrary JSON values, while the proto field is
    // map<string, string>, so every value is converted to its string form.
    // String values pass through unchanged.
    // NOTE(review): nested objects/arrays are rendered in Go's default format,
    // not JSON — confirm whether callers ever send non-scalar parameters.
    params := make(map[string]string, len(req.Parameters))
    for k, v := range req.Parameters {
        params[k] = fmt.Sprint(v)
    }

    // Delegate the business processing to the data domain RPC.
    // NOTE(review): httpx.GetClientIP is called with a context here; confirm
    // this helper exists in the project, since the client address normally
    // comes from the *http.Request.
    dataResp, err := l.svcCtx.DataRpc.ProcessDataRequest(l.ctx, &data.ProcessDataRequestReq{
        UserId:     userId,
        QueryType:  "risk-assessment",
        Parameters: params,
        ClientIp:   httpx.GetClientIP(l.ctx),
    })
    if err != nil {
        // Log through the embedded logger rather than the package-level one.
        l.Errorf("调用数据域RPC失败: %v", err)
        return nil, err
    }

    // NOTE(review): dataResp.Data is a JSON document carried as a string, so
    // it is serialized to the client as a JSON string rather than an object —
    // confirm this is the intended wire format.
    return &types.DataQueryResp{
        Success:          dataResp.Success,
        Data:             dataResp.Data,
        TransactionId:    dataResp.TransactionId,
        RemainingBalance: dataResp.RemainingBalance,
    }, nil
}

服务上下文 (包含所有 RPC 客户端)

// api/gateway/internal/svc/servicecontext.go

// ServiceContext holds the dependencies shared by the gateway handlers: the
// loaded configuration, one client per downstream RPC service, and the
// middleware referenced by the routes.
type ServiceContext struct {
    Config config.Config

    // Clients for the downstream RPC services.
    UserRpc     user.User
    DataRpc     data.Data
    SecurityRpc security.Security
    BillingRpc  billing.Billing
    ProductRpc  product.Product
    AuditRpc    audit.Audit

    // Middleware.
    RateLimit       rest.Middleware
    AuditMiddleware rest.Middleware
}

// NewServiceContext builds a ServiceContext from c. The RPC clients are
// created with zrpc.MustNewClient, one per client configuration in c, followed
// by the two middleware values.
func NewServiceContext(c config.Config) *ServiceContext {
    svcCtx := new(ServiceContext)
    svcCtx.Config = c

    // RPC clients, created in the same order as the struct fields.
    svcCtx.UserRpc = user.NewUser(zrpc.MustNewClient(c.UserRpc))
    svcCtx.DataRpc = data.NewData(zrpc.MustNewClient(c.DataRpc))
    svcCtx.SecurityRpc = security.NewSecurity(zrpc.MustNewClient(c.SecurityRpc))
    svcCtx.BillingRpc = billing.NewBilling(zrpc.MustNewClient(c.BillingRpc))
    svcCtx.ProductRpc = product.NewProduct(zrpc.MustNewClient(c.ProductRpc))
    svcCtx.AuditRpc = audit.NewAudit(zrpc.MustNewClient(c.AuditRpc))

    // Middleware.
    svcCtx.RateLimit = ratelimit.NewRateLimit(c.RateLimit)
    svcCtx.AuditMiddleware = auditrpc.NewAuditMiddleware(c.Audit)

    return svcCtx
}

2. Data RPC 服务 (核心协调者)

Proto 定义 (data.proto)

syntax = "proto3";

package data;

option go_package = "./pb";

// ProcessDataRequestReq is the input of Data.ProcessDataRequest: who is
// asking, which kind of query to run, and its parameters.
message ProcessDataRequestReq {
  int64 user_id = 1;
  string query_type = 2;  // risk-assessment/credit-check/company-info/data-query
  map<string, string> parameters = 3;
  string client_ip = 4;
}

// ProcessDataRequestResp is the result of Data.ProcessDataRequest.
message ProcessDataRequestResp {
  bool success = 1;
  string data = 2;  // business data, JSON encoded
  string transaction_id = 3;
  double remaining_balance = 4;
  string error_message = 5;
}

// Data exposes the single RPC that runs a data query end to end.
service Data {
  rpc ProcessDataRequest(ProcessDataRequestReq) returns (ProcessDataRequestResp);
}

核心协调逻辑 (你的复杂业务流程)

// rpc/data/internal/logic/orchestrator/dataorchestratorlogic.go

// DataOrchestratorLogic coordinates the downstream RPC services needed to
// serve one data query.
type DataOrchestratorLogic struct {
    ctx    context.Context
    svcCtx *svc.ServiceContext
    logx.Logger
}

// ProcessDataRequest runs the full query pipeline for one request. Steps run
// sequentially and the first failure aborts the request with a wrapped error:
//
//  1. Security: load the user, check the client IP against the enterprise
//     whitelist, decrypt the enterprise secret key.
//  2. Entitlement: check product access, then check the balance.
//  3. Business: dispatch on in.QueryType to the matching process* method.
//  4. Billing and audit: charge the user, then record an audit entry in the
//     background.
//
// On success the response carries the JSON-encoded business result, the
// transaction ID and the remaining balance reported by the charge call.
func (l *DataOrchestratorLogic) ProcessDataRequest(in *pb.ProcessDataRequestReq) (*pb.ProcessDataRequestResp, error) {
    // Upper bound for the background audit call so that a stalled audit
    // service cannot pin goroutines forever.
    const auditTimeout = 5 * time.Second

    startTime := time.Now()

    // === Step 1: security checks ===

    // 1.1 Load the user together with the enterprise information.
    userResp, err := l.svcCtx.UserRpc.GetUserInfo(l.ctx, &user.GetUserInfoReq{
        UserId: in.UserId,
    })
    if err != nil {
        return nil, fmt.Errorf("获取用户信息失败: %w", err)
    }

    // 1.2 IP whitelist check.
    // NOTE(review): only the error is inspected and the reply is discarded —
    // confirm that a rejected IP is reported as an error, not as a flag in the
    // response.
    _, err = l.svcCtx.SecurityRpc.CheckWhitelist(l.ctx, &security.CheckWhitelistReq{
        EnterpriseId: userResp.EnterpriseId,
        ClientIp:     in.ClientIp,
    })
    if err != nil {
        return nil, fmt.Errorf("IP白名单验证失败: %w", err)
    }

    // 1.3 Decrypt the enterprise secret key.
    decryptResp, err := l.svcCtx.SecurityRpc.DecryptSecret(l.ctx, &security.DecryptSecretReq{
        EnterpriseId: userResp.EnterpriseId,
        EncryptedKey: userResp.EncryptedSecretKey,
    })
    if err != nil {
        return nil, fmt.Errorf("密钥解密失败: %w", err)
    }

    // === Step 2: entitlement and product checks ===

    // 2.1 Product access check.
    productResp, err := l.svcCtx.ProductRpc.CheckProductAccess(l.ctx, &product.CheckProductAccessReq{
        UserId:    in.UserId,
        QueryType: in.QueryType,
        SecretKey: decryptResp.SecretKey,
    })
    if err != nil {
        return nil, fmt.Errorf("产品权限检查失败: %w", err)
    }

    // 2.2 Balance check. The message says "balance check failed" rather than
    // "insufficient balance" because this branch is also reached on transport
    // or server errors; the wrapped cause carries the actual reason.
    balanceResp, err := l.svcCtx.BillingRpc.CheckBalance(l.ctx, &billing.CheckBalanceReq{
        UserId:      in.UserId,
        ProductCode: productResp.ProductCode,
        QueryType:   in.QueryType,
    })
    if err != nil {
        return nil, fmt.Errorf("余额检查失败: %w", err)
    }

    // === Step 3: business logic ===

    var businessResult *BusinessResult
    switch in.QueryType {
    case "risk-assessment":
        businessResult, err = l.processRiskAssessment(in.Parameters)
    case "credit-check":
        businessResult, err = l.processCreditCheck(in.Parameters)
    case "company-info":
        businessResult, err = l.processCompanyInfo(in.Parameters)
    case "data-query":
        businessResult, err = l.processDataQuery(in.Parameters)
    default:
        return nil, errors.New("不支持的查询类型")
    }

    if err != nil {
        return nil, fmt.Errorf("业务处理失败: %w", err)
    }

    // === Step 4: billing and audit ===

    // 4.1 Charge the amount reported by the balance check.
    chargeResp, err := l.svcCtx.BillingRpc.Charge(l.ctx, &billing.ChargeReq{
        UserId:          in.UserId,
        ProductCode:     productResp.ProductCode,
        Amount:          balanceResp.RequiredAmount,
        TransactionType: in.QueryType,
        RequestId:       generateRequestId(),
    })
    if err != nil {
        return nil, fmt.Errorf("扣费失败: %w", err)
    }

    // 4.2 Record the audit entry in the background. The elapsed time is taken
    // here so it reflects the request itself, not goroutine scheduling delay.
    elapsedMs := time.Since(startTime).Milliseconds()
    go func() {
        // The request context may be cancelled as soon as the reply is sent,
        // so the audit call gets its own context, bounded by auditTimeout.
        auditCtx, cancel := context.WithTimeout(context.Background(), auditTimeout)
        defer cancel()

        _, auditErr := l.svcCtx.AuditRpc.RecordAPICall(auditCtx, &audit.RecordAPICallReq{
            UserId:        in.UserId,
            EnterpriseId:  userResp.EnterpriseId,
            QueryType:     in.QueryType,
            ClientIp:      in.ClientIp,
            TransactionId: chargeResp.TransactionId,
            ResponseTime:  elapsedMs,
            Status:        "success",
        })
        if auditErr != nil {
            // Auditing is best effort: a failure must not fail the request,
            // but it must not disappear silently either.
            l.Errorf("记录审计日志失败: %v", auditErr)
        }
    }()

    return &pb.ProcessDataRequestResp{
        Success:          true,
        Data:             businessResult.ToJSON(),
        TransactionId:    chargeResp.TransactionId,
        RemainingBalance: chargeResp.RemainingBalance,
    }, nil
}

// processRiskAssessment implements the former FLXG flow (risk assessment).
//
// It queries the West data source, which is mandatory: its error aborts the
// call. It then queries the Baidu risk API, which is optional: a failure there
// is logged and the result is built from the West data alone. The Source field
// reports which providers actually contributed.
func (l *DataOrchestratorLogic) processRiskAssessment(params map[string]string) (*BusinessResult, error) {
    // Mandatory provider.
    westData, err := l.svcCtx.WestDexClient.QueryRiskData(params)
    if err != nil {
        return nil, err
    }

    // Optional provider: degrade to West-only data when it fails.
    source := "west+baidu"
    baiduData, err := l.svcCtx.BaiduClient.RiskAssessment(params)
    if err != nil {
        l.Errorf("百度API调用失败: %v", err)
        // Do not claim Baidu as a source when its call failed.
        source = "west"
    }

    // Merge the provider data.
    // NOTE(review): on the degraded path baiduData holds whatever the failed
    // call returned — confirm mergeRiskData tolerates that value.
    return &BusinessResult{
        Code:   "FLXG001",
        Data:   mergeRiskData(westData, baiduData),
        Source: source,
    }, nil
}

// processCreditCheck implements the former JRZQ flow (credit query). It
// forwards params to the credit client and wraps the reply in a BusinessResult
// with code "JRZQ001"; a client error is returned unchanged.
func (l *DataOrchestratorLogic) processCreditCheck(params map[string]string) (*BusinessResult, error) {
    report, queryErr := l.svcCtx.CreditClient.QueryCredit(params)
    if queryErr != nil {
        return nil, queryErr
    }

    result := new(BusinessResult)
    result.Code = "JRZQ001"
    result.Data = report
    result.Source = "credit_bureau"
    return result, nil
}

3. 数据库操作 (go-zero Model)

用户模型

// rpc/user/internal/model/usermodel.go

type (
    // User is one row of the users table; the db tags name the columns.
    User struct {
        Id           int64     `db:"id"`
        Username     string    `db:"username"`
        Password     string    `db:"password"`
        Email        string    `db:"email"`
        EnterpriseId int64     `db:"enterprise_id"`
        Status       int64     `db:"status"`
        CreatedAt    time.Time `db:"created_at"`
        UpdatedAt    time.Time `db:"updated_at"`
    }

    // UserModel is the data-access contract for users.
    UserModel interface {
        Insert(ctx context.Context, data *User) (sql.Result, error)
        FindOne(ctx context.Context, id int64) (*User, error)
        FindOneByUsername(ctx context.Context, username string) (*User, error)
        Update(ctx context.Context, data *User) error
        Delete(ctx context.Context, id int64) error
    }

    // defaultUserModel is the UserModel implementation backed by a SQL
    // connection.
    defaultUserModel struct {
        conn  sqlx.SqlConn
        table string
    }
)

// NewUserModel returns a UserModel that runs its queries on conn against the
// `users` table.
func NewUserModel(conn sqlx.SqlConn) UserModel {
    m := new(defaultUserModel)
    m.conn = conn
    m.table = "`users`"
    return m
}

// FindOneByUsername loads the user whose username column equals username.
// It returns ErrNotFound when the query reports sqlc.ErrNotFound and passes
// any other error through unchanged.
func (m *defaultUserModel) FindOneByUsername(ctx context.Context, username string) (*User, error) {
    var row User
    // The username is bound as a query argument, never formatted into the SQL.
    stmt := fmt.Sprintf("select %s from %s where `username` = ? limit 1", userRows, m.table)

    if err := m.conn.QueryRowCtx(ctx, &row, stmt, username); err != nil {
        if err == sqlc.ErrNotFound {
            return nil, ErrNotFound
        }
        return nil, err
    }
    return &row, nil
}

缓存处理

// rpc/user/internal/logic/user/getuserinfologic.go

// GetUserInfo returns the profile of in.UserId together with the name and
// encrypted secret key of the user's enterprise.
//
// Lookups are cache-aside: a cached JSON entry under "user:info:<id>" is
// returned when it exists and decodes; otherwise the user and enterprise rows
// are loaded, and the assembled response is cached for 300 seconds. Cache
// problems are logged and never fail the request; database errors are
// returned unchanged.
func (l *GetUserInfoLogic) GetUserInfo(in *pb.GetUserInfoReq) (*pb.GetUserInfoResp, error) {
    // Cache entry lifetime in seconds (5 minutes).
    const cacheTTLSeconds = 300

    logger := logx.WithContext(l.ctx)

    // 1. Try the cache first.
    cacheKey := fmt.Sprintf("user:info:%d", in.UserId)
    cached, err := l.svcCtx.RedisClient.Get(cacheKey)
    switch {
    case err != nil:
        // A cache outage only costs a database round trip.
        logger.Errorf("读取用户缓存失败 key=%s: %v", cacheKey, err)
    case cached != "":
        var userInfo pb.GetUserInfoResp
        if decodeErr := json.Unmarshal([]byte(cached), &userInfo); decodeErr == nil {
            return &userInfo, nil
        } else {
            // A corrupt entry is ignored here and overwritten below.
            logger.Errorf("解析用户缓存失败 key=%s: %v", cacheKey, decodeErr)
        }
    }

    // 2. Fall back to the database.
    user, err := l.svcCtx.UserModel.FindOne(l.ctx, in.UserId)
    if err != nil {
        return nil, err
    }

    enterprise, err := l.svcCtx.EnterpriseModel.FindOne(l.ctx, user.EnterpriseId)
    if err != nil {
        return nil, err
    }

    resp := &pb.GetUserInfoResp{
        UserId:             user.Id,
        Username:           user.Username,
        Email:              user.Email,
        EnterpriseId:       user.EnterpriseId,
        EnterpriseName:     enterprise.Name,
        EncryptedSecretKey: enterprise.EncryptedSecretKey,
    }

    // 3. Populate the cache, best effort.
    respJson, err := json.Marshal(resp)
    if err != nil {
        logger.Errorf("序列化用户缓存失败 key=%s: %v", cacheKey, err)
        return resp, nil
    }
    if err := l.svcCtx.RedisClient.Setex(cacheKey, string(respJson), cacheTTLSeconds); err != nil {
        logger.Errorf("写入用户缓存失败 key=%s: %v", cacheKey, err)
    }

    return resp, nil
}

🚀 部署配置

Docker 部署

1. 服务 Dockerfile

# rpc/user/Dockerfile
# Build stage. The build context must be the repository root: the build command
# below refers to rpc/user/user.go relative to it.
FROM golang:1.19-alpine AS builder

WORKDIR /build

# Copy only the module files first so the dependency download layer is reused
# until go.mod or go.sum changes.
COPY go.mod go.sum ./
RUN go mod download

COPY . .
# CGO is disabled so the binary is statically linked and does not depend on
# the C library of the runtime image.
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o user rpc/user/user.go

# Runtime stage: only the binary, its config file and CA certificates.
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /app
COPY --from=builder /build/user .
COPY --from=builder /build/rpc/user/etc/user.yaml ./etc/

EXPOSE 8001
CMD ["./user", "-f", "etc/user.yaml"]

2. Docker Compose (开发环境)

# deploy/docker/docker-compose.dev.yml
version: "3.8"

services:
    # Infrastructure
    mysql:
        image: mysql:8.0
        environment:
            MYSQL_ROOT_PASSWORD: root123
            MYSQL_DATABASE: tianyuan
        ports:
            - "3306:3306"
        volumes:
            - mysql_data:/var/lib/mysql
            - ./init.sql:/docker-entrypoint-initdb.d/init.sql

    redis:
        image: redis:7-alpine
        ports:
            - "6379:6379"

    etcd:
        image: quay.io/coreos/etcd:v3.5.0
        environment:
            - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
            - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
        ports:
            - "2379:2379"

    kafka:
        # Pinned to a release line that still supports ZooKeeper mode; with
        # `latest` the image can move to a KRaft-only major version and stop
        # working with the zookeeper service below.
        image: confluentinc/cp-kafka:7.4.0
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
            # Single broker: the internal offsets topic cannot be replicated
            # to more than one node.
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        ports:
            - "9092:9092"
        depends_on:
            - zookeeper

    zookeeper:
        # Kept on the same release as the kafka image.
        image: confluentinc/cp-zookeeper:7.4.0
        environment:
            ZOOKEEPER_CLIENT_PORT: 2181

    # Microservices
    user-rpc:
        build:
            context: ../../
            dockerfile: rpc/user/Dockerfile
        ports:
            - "8001:8001"
        environment:
            - DB_HOST=mysql
            - REDIS_HOST=redis
            - ETCD_HOSTS=etcd:2379
        depends_on:
            - mysql
            - redis
            - etcd

    data-rpc:
        build:
            context: ../../
            dockerfile: rpc/data/Dockerfile
        ports:
            - "8002:8002"
        environment:
            - DB_HOST=mysql
            - REDIS_HOST=redis
            - ETCD_HOSTS=etcd:2379
            - USER_RPC=user-rpc:8001
        depends_on:
            - user-rpc

    gateway-api:
        build:
            context: ../../
            dockerfile: api/gateway/Dockerfile
        ports:
            - "8000:8000"
        # NOTE(review): security-rpc and billing-rpc are referenced here but
        # are not defined as services in this file — add them (or remove the
        # references) before relying on this stack.
        environment:
            - USER_RPC=user-rpc:8001
            - DATA_RPC=data-rpc:8002
            - SECURITY_RPC=security-rpc:8003
            - BILLING_RPC=billing-rpc:8004
        depends_on:
            - user-rpc
            - data-rpc

volumes:
    mysql_data:

3. Kubernetes 部署

# deploy/k8s/user-rpc.yaml
# Deployment: three replicas of the user RPC service.
apiVersion: apps/v1
kind: Deployment
metadata:
    name: user-rpc
spec:
    replicas: 3
    selector:
        matchLabels:
            app: user-rpc
    template:
        metadata:
            labels:
                app: user-rpc
        spec:
            containers:
                - name: user-rpc
                  image: tianyuan/user-rpc:latest
                  ports:
                      - containerPort: 8001
                  env:
                      - name: DB_HOST
                        value: "mysql-svc"
                      - name: REDIS_HOST
                        value: "redis-svc"
                      - name: ETCD_HOSTS
                        value: "etcd-svc:2379"
                  resources:
                      requests:
                          memory: "128Mi"
                          cpu: "100m"
                      limits:
                          memory: "512Mi"
                          cpu: "500m"
                  # Port 8001 is the RPC port. Nothing in this setup shows an
                  # HTTP /health or /ready handler on it, so an httpGet probe
                  # would keep failing and restart the pod; a TCP check only
                  # requires the port to accept connections.
                  livenessProbe:
                      tcpSocket:
                          port: 8001
                      initialDelaySeconds: 30
                      periodSeconds: 10
                  readinessProbe:
                      tcpSocket:
                          port: 8001
                      initialDelaySeconds: 5
                      periodSeconds: 5

---
# Service: cluster-internal endpoint for the user-rpc pods.
apiVersion: v1
kind: Service
metadata:
    name: user-rpc-svc
spec:
    selector:
        app: user-rpc
    ports:
        - port: 8001
          targetPort: 8001
    type: ClusterIP

4. Makefile (统一构建部署)

# Makefile
# Every target here is a command, not a file, so all of them are declared
# phony; otherwise a file or directory with the same name would make the
# target appear up to date and silently skip it.
.PHONY: build-all gen-api gen-rpc start-dev stop-dev migrate-up migrate-down \
	deploy-k8s test-all lint clean

# Build all services
build-all:
	@echo "构建所有微服务..."
	cd api/gateway && go build -o ../../bin/gateway gateway.go
	cd rpc/user && go build -o ../../bin/user-rpc user.go
	cd rpc/data && go build -o ../../bin/data-rpc data.go
	cd rpc/security && go build -o ../../bin/security-rpc security.go
	cd rpc/billing && go build -o ../../bin/billing-rpc billing.go

# Code generation
gen-api:
	cd api/gateway && goctl api go -api gateway.api -dir .

gen-rpc:
	cd rpc/user && goctl rpc protoc user.proto --go_out=. --go-grpc_out=. --zrpc_out=.

# Development environment
start-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml up -d

stop-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml down

# Database migrations
migrate-up:
	cd tools/migrate && go run migrate.go up

migrate-down:
	cd tools/migrate && go run migrate.go down

# Kubernetes deployment
deploy-k8s:
	kubectl apply -f deploy/k8s/namespace.yaml
	kubectl apply -f deploy/k8s/configmap.yaml
	kubectl apply -f deploy/k8s/mysql.yaml
	kubectl apply -f deploy/k8s/redis.yaml
	kubectl apply -f deploy/k8s/user-rpc.yaml
	kubectl apply -f deploy/k8s/data-rpc.yaml
	kubectl apply -f deploy/k8s/gateway-api.yaml

# Tests
test-all:
	go test ./api/gateway/...
	go test ./rpc/user/...
	go test ./rpc/data/...

# Static analysis
lint:
	golangci-lint run ./...

# Cleanup
# WARNING: `docker system prune` is host-wide: it removes unused Docker
# resources of every project on this machine, not only this one.
clean:
	rm -rf bin/
	docker system prune -f

🔄 CI/CD 配置

# .github/workflows/deploy.yml
name: Deploy Microservices

on:
    push:
        branches: [main]

jobs:
    build-and-deploy:
        runs-on: ubuntu-latest

        steps:
            - uses: actions/checkout@v3

            - name: Set up Go
              uses: actions/setup-go@v3
              with:
                  # Quoted so YAML keeps it a string; an unquoted 1.20 would
                  # be read as the number 1.2.
                  go-version: "1.19"

            - name: Build services
              run: make build-all

            - name: Run tests
              run: make test-all

            # The service Dockerfiles expect the repository root as build
            # context (they build paths such as rpc/user/user.go), so the
            # Dockerfile is selected with -f and the context is ".".
            - name: Build Docker images
              run: |
                  docker build -f api/gateway/Dockerfile -t tianyuan/gateway:${{ github.sha }} .
                  docker build -f rpc/user/Dockerfile -t tianyuan/user-rpc:${{ github.sha }} .
                  docker build -f rpc/data/Dockerfile -t tianyuan/data-rpc:${{ github.sha }} .

            # Secrets are quoted so that values containing spaces or shell
            # metacharacters reach docker login intact.
            - name: Push to registry
              run: |
                  echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
                  docker push tianyuan/gateway:${{ github.sha }}
                  docker push tianyuan/user-rpc:${{ github.sha }}
                  docker push tianyuan/data-rpc:${{ github.sha }}

            # NOTE(review): the sed call rewrites every occurrence of "latest"
            # in the manifests, not only image tags.
            - name: Deploy to K8s
              run: |
                  echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
                  export KUBECONFIG=kubeconfig
                  sed -i 's|latest|${{ github.sha }}|g' deploy/k8s/*.yaml
                  kubectl apply -f deploy/k8s/

这个架构设计完全基于 go-zero 框架,在保留你原有业务逻辑的同时,提供了清晰的服务边界和强大的扩展能力。每个服务都可以独立开发、测试、部署和扩容。