19 KiB
19 KiB
go-zero 服务实现详解
🔥 核心服务实现
1. Gateway API 服务 (HTTP 入口)
API 定义 (gateway.api)
// gateway.api — HTTP contract of the unified API gateway.
syntax = "v1"

info(
	title: "天远API网关"
	desc: "统一API入口"
	version: "v1.0"
)

type (
	// LoginReq carries the credentials posted to the login route.
	LoginReq {
		Username string `json:"username"`
		Password string `json:"password"`
	}

	// LoginResp returns a token together with the caller's profile.
	LoginResp {
		Token    string   `json:"token"`
		UserInfo UserInfo `json:"userInfo"`
	}

	// UserInfo is the user profile embedded in LoginResp.
	UserInfo {
		UserId       int64  `json:"userId"`
		Username     string `json:"username"`
		EnterpriseId int64  `json:"enterpriseId"`
	}

	// DataQueryReq is the shared request body of every data-query route
	// (the core business entry point).
	DataQueryReq {
		// Query category. NOTE(review): the data RPC in this document switches on
		// risk-assessment / credit-check / company-info / data-query — confirm
		// which spelling clients are expected to send here.
		QueryType  string                 `json:"queryType"`
		Parameters map[string]interface{} `json:"parameters"`
	}

	// DataQueryResp is the shared response body of every data-query route.
	DataQueryResp {
		Success          bool        `json:"success"`
		Data             interface{} `json:"data"`
		TransactionId    string      `json:"transactionId"`
		RemainingBalance float64     `json:"remainingBalance"`
	}
)
// Login has to be reachable without a token — its response is what carries
// the token — so it lives in its own @server block with no jwt setting.
@server(
	group: auth
)
service gateway-api {
	@handler LoginHandler
	post /api/v1/auth/login (LoginReq) returns (LoginResp)
}

// Logout acts on an authenticated caller, so it stays behind jwt.
@server(
	jwt: Auth
	group: auth
)
service gateway-api {
	@handler LogoutHandler
	post /api/v1/auth/logout returns ()
}
// Data-query routes: all require a JWT and pass through the rate-limit and
// audit middlewares.
//
// goctl uses each middleware name as the field name it reads from the
// ServiceContext. The ServiceContext shown in this document declares the
// fields RateLimit and AuditMiddleware, so the second entry is spelled
// AuditMiddleware to match that field.
@server(
	jwt: Auth
	group: data
	middleware: RateLimit,AuditMiddleware
)
service gateway-api {
	@handler RiskAssessmentHandler
	post /api/v1/data/risk-assessment (DataQueryReq) returns (DataQueryResp)

	@handler CreditCheckHandler
	post /api/v1/data/credit-check (DataQueryReq) returns (DataQueryResp)

	@handler CompanyInfoHandler
	post /api/v1/data/company-info (DataQueryReq) returns (DataQueryResp)

	@handler DataQueryHandler
	post /api/v1/data/query (DataQueryReq) returns (DataQueryResp)
}
核心 Logic 实现 (处理复杂调用链)
// api/gateway/internal/logic/data/risklogic.go
// RiskLogic serves the gateway's risk-assessment route. It embeds a
// logx.Logger and keeps the request context plus the shared service context
// that holds the RPC clients.
type RiskLogic struct {
	logx.Logger

	ctx    context.Context
	svcCtx *svc.ServiceContext
}
// RiskAssessment forwards a risk-assessment query to the data RPC and maps
// the RPC reply onto the gateway response.
//
// The gateway request declares Parameters as map[string]interface{}, while
// the data RPC message declares map<string, string>; the values are therefore
// flattened to strings before the call (see stringifyParameters).
//
// It returns the RPC error unchanged when the call fails, and the encoding
// error when a parameter value cannot be JSON-encoded.
func (l *RiskLogic) RiskAssessment(req *types.DataQueryReq) (resp *types.DataQueryResp, err error) {
	// Caller identity, taken from the request context (populated from the JWT).
	userId := ctxdata.GetUidFromCtx(l.ctx)

	params, err := stringifyParameters(req.Parameters)
	if err != nil {
		return nil, err
	}

	// Delegate the whole business pipeline to the data-domain RPC.
	dataResp, err := l.svcCtx.DataRpc.ProcessDataRequest(l.ctx, &data.ProcessDataRequestReq{
		UserId:     userId,
		QueryType:  "risk-assessment",
		Parameters: params,
		// NOTE(review): confirm that httpx exposes GetClientIP and that it
		// accepts a context rather than the *http.Request.
		ClientIp: httpx.GetClientIP(l.ctx),
	})
	if err != nil {
		logx.Errorf("调用数据域RPC失败: %v", err)
		return nil, err
	}

	// NOTE(review): dataResp.Data is a JSON string in the proto definition;
	// assigning it to an interface{} field will emit it as a quoted string —
	// confirm that is the intended wire format.
	return &types.DataQueryResp{
		Success:          dataResp.Success,
		Data:             dataResp.Data,
		TransactionId:    dataResp.TransactionId,
		RemainingBalance: dataResp.RemainingBalance,
	}, nil
}

// stringifyParameters converts loosely typed request parameters into the
// string map the data RPC expects. String values are copied as-is; every
// other value is replaced by its JSON encoding. A nil input yields an empty
// map. Requires encoding/json in this file's imports.
func stringifyParameters(in map[string]interface{}) (map[string]string, error) {
	out := make(map[string]string, len(in))
	for key, value := range in {
		if s, ok := value.(string); ok {
			out[key] = s
			continue
		}
		encoded, err := json.Marshal(value)
		if err != nil {
			return nil, err
		}
		out[key] = string(encoded)
	}
	return out, nil
}
服务上下文 (包含所有 RPC 客户端)
// api/gateway/internal/svc/servicecontext.go
// ServiceContext bundles everything the gateway handlers share: the loaded
// configuration, one client per downstream RPC service, and the middlewares
// referenced by the routes.
type ServiceContext struct {
	Config config.Config

	// Downstream RPC clients.
	UserRpc     user.User
	DataRpc     data.Data
	SecurityRpc security.Security
	BillingRpc  billing.Billing
	ProductRpc  product.Product
	AuditRpc    audit.Audit

	// HTTP middlewares.
	RateLimit       rest.Middleware
	AuditMiddleware rest.Middleware
}
// NewServiceContext builds the gateway's ServiceContext from c: it creates one
// zrpc client per downstream service and then the two middlewares.
//
// NOTE(review): by the Must naming convention zrpc.MustNewClient presumably
// aborts when a client cannot be created — confirm against the zrpc docs.
func NewServiceContext(c config.Config) *ServiceContext {
	svcCtx := &ServiceContext{Config: c}

	// RPC clients, created in the same order as the struct declares them.
	svcCtx.UserRpc = user.NewUser(zrpc.MustNewClient(c.UserRpc))
	svcCtx.DataRpc = data.NewData(zrpc.MustNewClient(c.DataRpc))
	svcCtx.SecurityRpc = security.NewSecurity(zrpc.MustNewClient(c.SecurityRpc))
	svcCtx.BillingRpc = billing.NewBilling(zrpc.MustNewClient(c.BillingRpc))
	svcCtx.ProductRpc = product.NewProduct(zrpc.MustNewClient(c.ProductRpc))
	svcCtx.AuditRpc = audit.NewAudit(zrpc.MustNewClient(c.AuditRpc))

	// Middlewares.
	svcCtx.RateLimit = ratelimit.NewRateLimit(c.RateLimit)
	svcCtx.AuditMiddleware = auditrpc.NewAuditMiddleware(c.Audit)

	return svcCtx
}
2. Data RPC 服务 (核心协调者)
Proto 定义 (data.proto)
syntax = "proto3";

package data;

option go_package = "./pb";

// ProcessDataRequestReq describes one data query issued on behalf of a user.
message ProcessDataRequestReq {
  int64 user_id = 1;
  // One of: risk-assessment / credit-check / company-info / data-query.
  string query_type = 2;
  map<string, string> parameters = 3;
  string client_ip = 4;
}

// ProcessDataRequestResp is the outcome of one data query.
message ProcessDataRequestResp {
  bool success = 1;
  // Business payload, encoded as a JSON string.
  string data = 2;
  string transaction_id = 3;
  double remaining_balance = 4;
  string error_message = 5;
}

// Data is the data-domain service.
service Data {
  rpc ProcessDataRequest(ProcessDataRequestReq) returns (ProcessDataRequestResp);
}
核心协调逻辑 (你的复杂业务流程)
// rpc/data/internal/logic/orchestrator/dataorchestratorlogic.go
// DataOrchestratorLogic coordinates the data-domain request pipeline. It keeps
// the request context and the service context that exposes the downstream
// clients, and embeds a logx.Logger.
type DataOrchestratorLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext

	logx.Logger
}
// ProcessDataRequest runs the complete pipeline for one data query:
//
//  1. security: load the user, check the client IP against the enterprise
//     whitelist, decrypt the enterprise secret;
//  2. entitlement: check product access and balance;
//  3. business: dispatch on in.QueryType to the matching processor;
//  4. billing and audit: charge the call, then record an audit entry in the
//     background.
//
// Any failing step aborts the request with a wrapped error. An unsupported
// QueryType is rejected with an error. The audit write is best-effort: it
// runs detached with its own deadline, and a failure is logged but does not
// affect the response.
func (l *DataOrchestratorLogic) ProcessDataRequest(in *pb.ProcessDataRequestReq) (*pb.ProcessDataRequestResp, error) {
	// Upper bound for the detached audit write, so a stalled audit service
	// cannot keep goroutines alive indefinitely.
	const auditTimeout = 5 * time.Second

	startTime := time.Now()

	// === Step 1: security checks ===
	// 1.1 Load the user together with its enterprise data.
	userResp, err := l.svcCtx.UserRpc.GetUserInfo(l.ctx, &user.GetUserInfoReq{
		UserId: in.UserId,
	})
	if err != nil {
		return nil, fmt.Errorf("获取用户信息失败: %w", err)
	}

	// 1.2 IP whitelist check; only the error is inspected.
	_, err = l.svcCtx.SecurityRpc.CheckWhitelist(l.ctx, &security.CheckWhitelistReq{
		EnterpriseId: userResp.EnterpriseId,
		ClientIp:     in.ClientIp,
	})
	if err != nil {
		return nil, fmt.Errorf("IP白名单验证失败: %w", err)
	}

	// 1.3 Decrypt the enterprise secret key.
	decryptResp, err := l.svcCtx.SecurityRpc.DecryptSecret(l.ctx, &security.DecryptSecretReq{
		EnterpriseId: userResp.EnterpriseId,
		EncryptedKey: userResp.EncryptedSecretKey,
	})
	if err != nil {
		return nil, fmt.Errorf("密钥解密失败: %w", err)
	}

	// === Step 2: entitlement and product checks ===
	// 2.1 Product access.
	productResp, err := l.svcCtx.ProductRpc.CheckProductAccess(l.ctx, &product.CheckProductAccessReq{
		UserId:    in.UserId,
		QueryType: in.QueryType,
		SecretKey: decryptResp.SecretKey,
	})
	if err != nil {
		return nil, fmt.Errorf("产品权限检查失败: %w", err)
	}

	// 2.2 Balance.
	// NOTE(review): every CheckBalance failure is reported as "insufficient
	// balance", transport errors included — confirm this wording is intended.
	balanceResp, err := l.svcCtx.BillingRpc.CheckBalance(l.ctx, &billing.CheckBalanceReq{
		UserId:      in.UserId,
		ProductCode: productResp.ProductCode,
		QueryType:   in.QueryType,
	})
	if err != nil {
		return nil, fmt.Errorf("余额不足: %w", err)
	}

	// === Step 3: business logic ===
	var businessResult *BusinessResult
	switch in.QueryType {
	case "risk-assessment":
		businessResult, err = l.processRiskAssessment(in.Parameters)
	case "credit-check":
		businessResult, err = l.processCreditCheck(in.Parameters)
	case "company-info":
		businessResult, err = l.processCompanyInfo(in.Parameters)
	case "data-query":
		businessResult, err = l.processDataQuery(in.Parameters)
	default:
		return nil, errors.New("不支持的查询类型")
	}
	if err != nil {
		return nil, fmt.Errorf("业务处理失败: %w", err)
	}

	// === Step 4: billing and audit ===
	// 4.1 Charge the call.
	chargeResp, err := l.svcCtx.BillingRpc.Charge(l.ctx, &billing.ChargeReq{
		UserId:          in.UserId,
		ProductCode:     productResp.ProductCode,
		Amount:          balanceResp.RequiredAmount,
		TransactionType: in.QueryType,
		RequestId:       generateRequestId(),
	})
	if err != nil {
		return nil, fmt.Errorf("扣费失败: %w", err)
	}

	// 4.2 Record the audit entry asynchronously. The record is assembled here,
	// before the goroutine starts, so the elapsed time excludes goroutine
	// scheduling and the goroutine never touches the request message.
	auditReq := &audit.RecordAPICallReq{
		UserId:        in.UserId,
		EnterpriseId:  userResp.EnterpriseId,
		QueryType:     in.QueryType,
		ClientIp:      in.ClientIp,
		TransactionId: chargeResp.TransactionId,
		ResponseTime:  time.Since(startTime).Milliseconds(),
		Status:        "success",
	}
	go func() {
		// Detached from l.ctx, as before, but bounded by auditTimeout.
		auditCtx, cancel := context.WithTimeout(context.Background(), auditTimeout)
		defer cancel()

		if _, err := l.svcCtx.AuditRpc.RecordAPICall(auditCtx, auditReq); err != nil {
			l.Errorf("记录审计日志失败: %v", err)
		}
	}()

	return &pb.ProcessDataRequestResp{
		Success:          true,
		Data:             businessResult.ToJSON(),
		TransactionId:    chargeResp.TransactionId,
		RemainingBalance: chargeResp.RemainingBalance,
	}, nil
}
// processRiskAssessment implements the former FLXG flow (risk assessment).
//
// It queries the West data source first; a failure there aborts the call.
// It then queries the Baidu risk-control API. A Baidu failure is logged and
// tolerated: the result is built from the West data, and Source is set to
// "west" so the label no longer claims a Baidu contribution.
func (l *DataOrchestratorLogic) processRiskAssessment(params map[string]string) (*BusinessResult, error) {
	// West data source — mandatory.
	westData, err := l.svcCtx.WestDexClient.QueryRiskData(params)
	if err != nil {
		return nil, err
	}

	// Baidu risk-control API — optional, degrade on failure.
	baiduData, baiduErr := l.svcCtx.BaiduClient.RiskAssessment(params)
	if baiduErr != nil {
		logx.Errorf("百度API调用失败: %v", baiduErr)
	}

	// Merge the two data sets.
	// NOTE(review): after a Baidu failure baiduData is whatever the client
	// returned alongside the error — confirm mergeRiskData tolerates it.
	result := &BusinessResult{
		Code:   "FLXG001",
		Data:   mergeRiskData(westData, baiduData),
		Source: "west+baidu",
	}
	if baiduErr != nil {
		// NOTE(review): "west" is a new label for the degraded path — confirm
		// consumers of Source accept it.
		result.Source = "west"
	}
	return result, nil
}
// processCreditCheck implements the former JRZQ flow (credit inquiry): it
// queries the credit API with params and wraps the answer in a BusinessResult
// tagged JRZQ001 / credit_bureau. A query error is returned unchanged.
func (l *DataOrchestratorLogic) processCreditCheck(params map[string]string) (*BusinessResult, error) {
	report, queryErr := l.svcCtx.CreditClient.QueryCredit(params)
	if queryErr != nil {
		return nil, queryErr
	}

	result := new(BusinessResult)
	result.Code = "JRZQ001"
	result.Data = report
	result.Source = "credit_bureau"
	return result, nil
}
3. 数据库操作 (go-zero Model)
用户模型
// rpc/user/internal/model/usermodel.go
// User is one row of the users table; each field is bound to its column
// through the db tag.
type User struct {
	Id           int64     `db:"id"`
	Username     string    `db:"username"`
	Password     string    `db:"password"` // presumably a password hash — TODO confirm
	Email        string    `db:"email"`
	EnterpriseId int64     `db:"enterprise_id"`
	Status       int64     `db:"status"`
	CreatedAt    time.Time `db:"created_at"`
	UpdatedAt    time.Time `db:"updated_at"`
}
// UserModel is the persistence contract for User rows. Of its methods only
// FindOneByUsername is implemented in this document.
type UserModel interface {
	// Insert stores data as a new row.
	Insert(ctx context.Context, data *User) (sql.Result, error)

	// FindOne looks a user up by primary key.
	FindOne(ctx context.Context, id int64) (*User, error)

	// FindOneByUsername looks a user up by username and reports ErrNotFound
	// when no row matches.
	FindOneByUsername(ctx context.Context, username string) (*User, error)

	// Update writes data back to its row.
	Update(ctx context.Context, data *User) error

	// Delete removes the row with the given primary key.
	Delete(ctx context.Context, id int64) error
}
// defaultUserModel is the sqlx-backed UserModel implementation. It holds the
// SQL connection and the (already quoted) table name used to build queries.
type defaultUserModel struct {
	conn  sqlx.SqlConn
	table string
}
// NewUserModel returns a UserModel that runs its queries on conn against the
// `users` table.
func NewUserModel(conn sqlx.SqlConn) UserModel {
	model := new(defaultUserModel)
	model.conn = conn
	model.table = "`users`"
	return model
}
// FindOneByUsername fetches the single user whose username column equals
// username. The username is passed as a bound query argument. It returns
// ErrNotFound when the query reports sqlc.ErrNotFound and passes every other
// error through unchanged.
func (m *defaultUserModel) FindOneByUsername(ctx context.Context, username string) (*User, error) {
	var row User
	stmt := fmt.Sprintf("select %s from %s where `username` = ? limit 1", userRows, m.table)

	if err := m.conn.QueryRowCtx(ctx, &row, stmt, username); err != nil {
		if err == sqlc.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}
	return &row, nil
}
缓存处理
// rpc/user/internal/logic/user/getuserinfologic.go
// GetUserInfo returns the profile of in.UserId together with the name and
// encrypted secret key of the user's enterprise.
//
// It reads through a Redis cache keyed "user:info:<id>": a non-empty entry
// that decodes cleanly is returned directly; otherwise the user and the
// enterprise are loaded from the database and the response is cached for
// 300 seconds. Caching is best-effort: a response that cannot be encoded is
// simply not cached, and a failed cache write does not fail the request.
// Database errors are returned unchanged.
//
// NOTE(review): the cached payload includes EncryptedSecretKey — confirm that
// storing it in Redis is acceptable.
func (l *GetUserInfoLogic) GetUserInfo(in *pb.GetUserInfoReq) (*pb.GetUserInfoResp, error) {
	// 1. Try the cache first. Read errors, empty values and undecodable
	// entries all fall through to the database.
	cacheKey := fmt.Sprintf("user:info:%d", in.UserId)
	if cached, err := l.svcCtx.RedisClient.Get(cacheKey); err == nil && cached != "" {
		var userInfo pb.GetUserInfoResp
		if json.Unmarshal([]byte(cached), &userInfo) == nil {
			return &userInfo, nil
		}
	}

	// 2. Load from the database.
	u, err := l.svcCtx.UserModel.FindOne(l.ctx, in.UserId)
	if err != nil {
		return nil, err
	}
	enterprise, err := l.svcCtx.EnterpriseModel.FindOne(l.ctx, u.EnterpriseId)
	if err != nil {
		return nil, err
	}

	resp := &pb.GetUserInfoResp{
		UserId:             u.Id,
		Username:           u.Username,
		Email:              u.Email,
		EnterpriseId:       u.EnterpriseId,
		EnterpriseName:     enterprise.Name,
		EncryptedSecretKey: enterprise.EncryptedSecretKey,
	}

	// 3. Populate the cache (5-minute expiry), but only with a payload that
	// encoded successfully.
	if payload, err := json.Marshal(resp); err == nil {
		// The write result is deliberately not checked: the response is
		// already complete and the next call will simply miss the cache.
		l.svcCtx.RedisClient.Setex(cacheKey, string(payload), 300)
	}

	return resp, nil
}
🚀 部署配置
Docker 部署
1. 服务 Dockerfile
# rpc/user/Dockerfile
# ---- build stage ----
FROM golang:1.19-alpine AS builder
WORKDIR /build

# Resolve modules before copying the sources, so this layer is reused until
# go.mod or go.sum changes instead of being rebuilt on every source edit.
COPY go.mod go.sum ./
RUN go mod download

COPY . .
# CGO is disabled so the binary is statically linked and does not depend on
# the C library of the runtime image.
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o user rpc/user/user.go

# ---- runtime stage ----
# Pinned instead of "latest" so rebuilds are reproducible.
FROM alpine:3.18
RUN apk --no-cache add ca-certificates
WORKDIR /app
COPY --from=builder /build/user .
COPY --from=builder /build/rpc/user/etc/user.yaml ./etc/
EXPOSE 8001
CMD ["./user", "-f", "etc/user.yaml"]
2. Docker Compose (开发环境)
# deploy/docker/docker-compose.dev.yml
version: "3.8"

services:
  # ---------- infrastructure ----------
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root123
      MYSQL_DATABASE: tianyuan
    ports:
      - "3306:3306"
    volumes:
      - mysql_data:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  etcd:
    image: quay.io/coreos/etcd:v3.5.0
    environment:
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
    ports:
      - "2379:2379"

  kafka:
    # Pinned to a 7.x release: this setup relies on ZooKeeper, which newer
    # Confluent images no longer support, so "latest" is not safe here.
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # Single broker: the internal offsets topic cannot use the default
      # replication factor of 3.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  # ---------- microservices ----------
  user-rpc:
    build:
      context: ../../
      dockerfile: rpc/user/Dockerfile
    ports:
      - "8001:8001"
    environment:
      - DB_HOST=mysql
      - REDIS_HOST=redis
      - ETCD_HOSTS=etcd:2379
    depends_on:
      - mysql
      - redis
      - etcd

  data-rpc:
    build:
      context: ../../
      dockerfile: rpc/data/Dockerfile
    ports:
      - "8002:8002"
    environment:
      - DB_HOST=mysql
      - REDIS_HOST=redis
      - ETCD_HOSTS=etcd:2379
      - USER_RPC=user-rpc:8001
    depends_on:
      - user-rpc

  gateway-api:
    build:
      context: ../../
      dockerfile: api/gateway/Dockerfile
    ports:
      - "8000:8000"
    environment:
      - USER_RPC=user-rpc:8001
      - DATA_RPC=data-rpc:8002
      # NOTE(review): security-rpc and billing-rpc are referenced here but are
      # not defined as services in this file — add them or drop the variables.
      - SECURITY_RPC=security-rpc:8003
      - BILLING_RPC=billing-rpc:8004
    depends_on:
      - user-rpc
      - data-rpc

volumes:
  mysql_data:
3. Kubernetes 部署
# deploy/k8s/user-rpc.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-rpc
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-rpc
  template:
    metadata:
      labels:
        app: user-rpc
    spec:
      containers:
        - name: user-rpc
          image: tianyuan/user-rpc:latest
          ports:
            - containerPort: 8001
          env:
            - name: DB_HOST
              value: "mysql-svc"
            - name: REDIS_HOST
              value: "redis-svc"
            - name: ETCD_HOSTS
              value: "etcd-svc:2379"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Port 8001 is the RPC listener of this service, so the probes check
          # that the port accepts connections instead of issuing HTTP GETs
          # against it. NOTE(review): switch back to httpGet only if the
          # service exposes a separate HTTP health endpoint.
          livenessProbe:
            tcpSocket:
              port: 8001
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: 8001
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Cluster-internal Service that routes port 8001 to the user-rpc pods.
apiVersion: v1
kind: Service
metadata:
  name: user-rpc-svc
spec:
  type: ClusterIP
  selector:
    app: user-rpc
  ports:
    - port: 8001
      targetPort: 8001
4. Makefile (统一构建部署)
# Makefile
# Every target below is a command, not a file, so all of them are declared
# phony; otherwise a file or directory with the same name would silently
# disable the target.
.PHONY: build-all gen-api gen-rpc start-dev stop-dev migrate-up migrate-down \
	deploy-k8s test-all lint clean

# Build all services into bin/.
build-all:
	@echo "构建所有微服务..."
	cd api/gateway && go build -o ../../bin/gateway gateway.go
	cd rpc/user && go build -o ../../bin/user-rpc user.go
	cd rpc/data && go build -o ../../bin/data-rpc data.go
	cd rpc/security && go build -o ../../bin/security-rpc security.go
	cd rpc/billing && go build -o ../../bin/billing-rpc billing.go

# Code generation.
gen-api:
	cd api/gateway && goctl api go -api gateway.api -dir .

gen-rpc:
	cd rpc/user && goctl rpc protoc user.proto --go_out=. --go-grpc_out=. --zrpc_out=.

# Development environment.
start-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml up -d

stop-dev:
	docker-compose -f deploy/docker/docker-compose.dev.yml down

# Database migrations.
migrate-up:
	cd tools/migrate && go run migrate.go up

migrate-down:
	cd tools/migrate && go run migrate.go down

# Kubernetes deployment.
deploy-k8s:
	kubectl apply -f deploy/k8s/namespace.yaml
	kubectl apply -f deploy/k8s/configmap.yaml
	kubectl apply -f deploy/k8s/mysql.yaml
	kubectl apply -f deploy/k8s/redis.yaml
	kubectl apply -f deploy/k8s/user-rpc.yaml
	kubectl apply -f deploy/k8s/data-rpc.yaml
	kubectl apply -f deploy/k8s/gateway-api.yaml

# Tests.
test-all:
	go test ./api/gateway/...
	go test ./rpc/user/...
	go test ./rpc/data/...

# Static checks.
lint:
	golangci-lint run ./...

# Cleanup. Note that the prune affects the whole local Docker daemon, not
# only this project.
clean:
	rm -rf bin/
	docker system prune -f
🔄 CI/CD 配置
# .github/workflows/deploy.yml
name: Deploy Microservices

on:
  push:
    branches: [main]

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v3
        with:
          # Quoted so YAML keeps the version a string (an unquoted 1.20 would
          # be read as the number 1.2).
          go-version: "1.19"

      - name: Build services
        run: make build-all

      - name: Run tests
        run: make test-all

      # The service Dockerfiles copy the whole repository and build paths
      # such as rpc/user/user.go, so the build context must be the repository
      # root; the Dockerfile is selected with -f.
      - name: Build Docker images
        run: |
          docker build -f api/gateway/Dockerfile -t tianyuan/gateway:${{ github.sha }} .
          docker build -f rpc/user/Dockerfile -t tianyuan/user-rpc:${{ github.sha }} .
          docker build -f rpc/data/Dockerfile -t tianyuan/data-rpc:${{ github.sha }} .

      # Credentials are passed through the environment rather than being
      # interpolated into the script text.
      - name: Push to registry
        env:
          DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
          DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
        run: |
          echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
          docker push tianyuan/gateway:${{ github.sha }}
          docker push tianyuan/user-rpc:${{ github.sha }}
          docker push tianyuan/data-rpc:${{ github.sha }}

      - name: Deploy to K8s
        run: |
          echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
          export KUBECONFIG=kubeconfig
          # Retag only this project's images; a bare s|latest|...| would also
          # rewrite any other occurrence of the word in the manifests.
          sed -i 's|\(image: tianyuan/[^:]*\):latest|\1:${{ github.sha }}|' deploy/k8s/*.yaml
          kubectl apply -f deploy/k8s/
这个架构设计完全基于 go-zero 框架,保持了你原有业务逻辑的同时,提供了清晰的服务边界和强大的扩展能力。每个服务都可以独立开发、测试、部署和扩容。