tianyuan-api-server/网关管理详解.md
2025-07-13 20:37:12 +08:00

18 KiB

微服务网关管理详解

🚪 什么是 API 网关?

API 网关就像一个智能门卫,站在所有微服务的前面:

客户端请求 → API网关 → 路由到具体的微服务
    ↑           ↑              ↑
  用户App    统一入口        用户服务
  管理后台      ↓            支付服务
  第三方系统   认证授权         产品服务
             限流控制         数据服务
             日志记录

🎯 网关的核心功能

1. 请求路由 (Request Routing)

// Route is one gateway routing rule, loadable from YAML via the struct tags.
//
// Path is the request path pattern and Method the HTTP method ("*" in the
// examples below); how "*" is matched is up to the router, which is not shown
// here. Service names the backend service the request is sent to,
// Middlewares lists middleware names for the route, and Headers holds
// per-route header key/value pairs (how they are applied is not shown here).
type Route struct {
    Path        string            `yaml:"path"`
    Method      string            `yaml:"method"`
    Service     string            `yaml:"service"`
    Middlewares []string          `yaml:"middlewares"`
    Headers     map[string]string `yaml:"headers"`
}

// routes is an example routing table.
//
// Middleware names are spelled exactly like the keys under `middlewares:` in
// gateway.yaml (e.g. rate_limit). A name-keyed middleware registry then
// resolves routes from Go code and from the config file the same way.
var routes = []Route{
    {
        Path:        "/api/v1/users/*",
        Method:      "*",
        Service:     "user-service",
        Middlewares: []string{"auth", "rate_limit"},
    },
    {
        Path:        "/api/v1/payments/*",
        Method:      "*",
        Service:     "payment-service",
        Middlewares: []string{"auth", "encrypt"},
    },
    {
        Path:        "/api/v1/products/*",
        Method:      "*",
        Service:     "product-service",
        Middlewares: []string{"auth", "cache"},
    },
}

2. 负载均衡 (Load Balancing)

// LoadBalancer picks the backend instance that should receive the next request.
type LoadBalancer interface {
    // Next returns the chosen instance; the implementations in this document
    // return nil when no instance is available.
    Next() *ServiceInstance
}

// RoundRobinBalancer hands out instances in a fixed cyclic order.
// Next holds mutex for its whole body.
type RoundRobinBalancer struct {
    instances []*ServiceInstance
    current   int // index of the instance Next will hand out next
    mutex     sync.Mutex
}

// Next returns the instances one after another, wrapping around at the end,
// or nil when there are no instances.
func (rb *RoundRobinBalancer) Next() *ServiceInstance {
    rb.mutex.Lock()
    defer rb.mutex.Unlock()

    n := len(rb.instances)
    if n == 0 {
        return nil
    }

    // Reduce modulo n before indexing. If instances is ever replaced by a
    // shorter slice, a stale current would otherwise index past the end and panic.
    idx := rb.current % n
    rb.current = (idx + 1) % n
    return rb.instances[idx]
}

// WeightedRoundRobinBalancer hands out instances in proportion to their
// Weight using smooth weighted round-robin. On every pick, each instance's
// Weight is added to its Current. The instance with the largest Current wins,
// and the total weight is then subtracted from the winner. With weights 3 and
// 1 this yields a, a, b, a, repeating: heavier instances win more often
// without being chosen in long bursts.
//
// total records the sum of positive weights seen by the last Next call.
// current is not used by Next; it is kept so existing keyed literals still compile.
type WeightedRoundRobinBalancer struct {
    instances []*WeightedInstance
    total     int
    current   int
    mutex     sync.Mutex
}

// WeightedInstance pairs a backend instance with its static Weight and the
// running Current score that WeightedRoundRobinBalancer.Next updates.
type WeightedInstance struct {
    Instance *ServiceInstance
    Weight   int
    Current  int
}

// Next returns the next instance by smooth weighted round-robin. It returns
// nil when no instance has a positive Weight. It implements LoadBalancer.
func (wb *WeightedRoundRobinBalancer) Next() *ServiceInstance {
    wb.mutex.Lock()
    defer wb.mutex.Unlock()

    var best *WeightedInstance
    total := 0
    for _, wi := range wb.instances {
        if wi == nil || wi.Weight <= 0 {
            continue // non-positive weights would break the selection invariant
        }
        wi.Current += wi.Weight
        total += wi.Weight
        if best == nil || wi.Current > best.Current {
            best = wi
        }
    }
    if best == nil {
        return nil
    }

    best.Current -= total
    wb.total = total
    return best.Instance
}

3. 服务发现 (Service Discovery)

// ServiceDiscovery abstracts a service registry.
//
// The read side is Discover and Watch; the write side is Register and Deregister.
type ServiceDiscovery interface {
    // Discover returns the instances currently known for serviceName.
    Discover(serviceName string) ([]*ServiceInstance, error)
    // Watch returns a channel that delivers instance lists for serviceName;
    // when and how often it sends is up to the implementation.
    Watch(serviceName string) <-chan []*ServiceInstance

    // Register adds service to the registry.
    Register(service *ServiceInstance) error
    // Deregister removes the instance identified by serviceID.
    Deregister(serviceID string) error
}

// ConsulDiscovery is a Consul-backed service discovery implementation.
// Only its Discover method appears here. Register, Deregister and Watch are
// also required for it to satisfy ServiceDiscovery.
type ConsulDiscovery struct {
    client *consul.Client // Consul API client; how it is constructed is not shown here
}

// Discover returns the instances of serviceName that Consul reports as
// healthy. passingOnly is true, so Consul filters out instances whose checks
// are not passing.
//
// Consul leaves Service.Address empty when a service was registered without
// an explicit address; clients are expected to use the node's address in that
// case, so this falls back to Node.Address. Consul errors are wrapped with the
// service name (errors.Is/As still see the original error).
func (cd *ConsulDiscovery) Discover(serviceName string) ([]*ServiceInstance, error) {
    entries, _, err := cd.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, fmt.Errorf("discovering service %q: %w", serviceName, err)
    }

    var instances []*ServiceInstance
    for _, entry := range entries {
        svc := entry.Service

        address := svc.Address
        if address == "" && entry.Node != nil {
            address = entry.Node.Address
        }

        instances = append(instances, &ServiceInstance{
            ID:      svc.ID,
            Name:    svc.Service,
            Address: address,
            Port:    svc.Port,
            Meta:    svc.Meta,
        })
    }

    return instances, nil
}

🛡️ 网关中间件系统

1. 认证中间件

// AuthMiddleware returns a gin handler that requires a valid JWT.
//
// The token comes from extractToken and is checked by validateJWT with
// jwtSecret. A missing or invalid token aborts the chain with 401. Otherwise
// the user_id, username and roles claims are stored on the gin context for
// later handlers, and the chain continues.
func AuthMiddleware(jwtSecret string) gin.HandlerFunc {
    reject := func(c *gin.Context, msg string) {
        c.JSON(http.StatusUnauthorized, gin.H{"error": msg})
        c.Abort()
    }

    return func(c *gin.Context) {
        token := extractToken(c)
        if token == "" {
            reject(c, "缺少认证令牌")
            return
        }

        claims, err := validateJWT(token, jwtSecret)
        if err != nil {
            reject(c, "无效的认证令牌")
            return
        }

        // Expose the caller's identity to downstream handlers.
        c.Set("user_id", claims.UserID)
        c.Set("username", claims.Username)
        c.Set("roles", claims.Roles)

        c.Next()
    }
}

func extractToken(c *gin.Context) string {
    // 从 Header 获取
    authHeader := c.GetHeader("Authorization")
    if authHeader != "" && strings.HasPrefix(authHeader, "Bearer ") {
        return strings.TrimPrefix(authHeader, "Bearer ")
    }

    // 从 Query 参数获取
    return c.Query("token")
}

2. 限流中间件

// RateLimiter keeps per-client request logs in Redis. It limits each client
// to limit requests per window; the check itself is done by isAllowed.
type RateLimiter struct {
    redis  *redis.Client
    window time.Duration
    limit  int
}

// NewRateLimiter builds a RateLimiter that stores its data in client.
func NewRateLimiter(client *redis.Client, window time.Duration, limit int) *RateLimiter {
    rl := new(RateLimiter)
    rl.redis = client
    rl.window = window
    rl.limit = limit
    return rl
}

// Middleware returns a gin handler that applies rl to every request.
//
// Clients are identified by getClientID. When the limit is exceeded the chain
// is aborted with 429 Too Many Requests. The response carries a standard
// Retry-After header (RFC 6585 / RFC 9110) in addition to the existing
// "retry_after" JSON field; both hold the window length in seconds. If the
// Redis check itself fails, the request is rejected with 500.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        clientID := getClientID(c)

        allowed, err := rl.isAllowed(clientID)
        if err != nil {
            // Fail closed: a Redis outage blocks traffic. Call c.Next() here
            // instead if availability matters more than strict limiting.
            c.JSON(http.StatusInternalServerError, gin.H{"error": "限流检查失败"})
            c.Abort()
            return
        }

        if !allowed {
            retryAfter := int(rl.window.Seconds())
            c.Header("Retry-After", strconv.Itoa(retryAfter))
            c.JSON(http.StatusTooManyRequests, gin.H{
                "error":       "请求过于频繁,请稍后再试",
                "retry_after": retryAfter,
            })
            c.Abort()
            return
        }

        c.Next()
    }
}

// isAllowed records one request for clientID and reports whether the client
// was still below rl.limit before this request. It uses a Redis sorted set
// as a sliding-window log.
//
// Each request is a sorted-set member scored by its timestamp in microseconds.
// One MULTI/EXEC transaction:
//  1. removes entries older than the window,
//  2. counts the remaining entries,
//  3. adds the current request,
//  4. resets the key's TTL to the window length.
//
// The request is allowed when the count from step 2 is below rl.limit.
// Rejected requests are recorded too, so a client that keeps retrying while
// blocked stays blocked until it pauses for a full window.
//
// Members must be unique per request. Using the Unix second as the member
// makes all requests within the same second overwrite each other. At most
// one entry per second is then counted, so e.g. limit 100 with a 60 s window
// can never be reached.
func (rl *RateLimiter) isAllowed(clientID string) (bool, error) {
    ctx := context.Background()
    key := fmt.Sprintf("rate_limit:%s", clientID)

    now := time.Now()
    // Microsecond scores (~1.7e15) are still exactly representable in the
    // float64 score; nanoseconds would not be.
    nowMicros := now.UnixNano() / int64(time.Microsecond)
    cutoff := nowMicros - rl.window.Microseconds()
    // The nanosecond timestamp makes member collisions between requests of
    // the same client very unlikely.
    member := strconv.FormatInt(now.UnixNano(), 10)

    pipe := rl.redis.TxPipeline()
    pipe.ZRemRangeByScore(ctx, key, "-inf", strconv.FormatInt(cutoff, 10))
    countCmd := pipe.ZCard(ctx, key)
    pipe.ZAdd(ctx, key, &redis.Z{
        Score:  float64(nowMicros),
        Member: member,
    })
    pipe.Expire(ctx, key, rl.window)

    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }

    return countCmd.Val() < int64(rl.limit), nil
}

// getClientID returns the key under which a request is rate limited.
// Precedence: the authenticated user ("user:<id>"), then the X-API-Key
// header ("api:<key>"), then the client IP ("ip:<addr>").
//
// NOTE(review): c.GetString returns "" when "user_id" holds a non-string
// value. AuthMiddleware stores claims.UserID, whose type is not visible here.
// Confirm it is a string, or users fall through to the API-key/IP buckets.
//
// NOTE(review): X-API-Key is not validated here, so a client can obtain a
// fresh bucket by sending a different header value on each request. Only
// key on it once the API key has been verified.
func getClientID(c *gin.Context) string {
    userID, apiKey := c.GetString("user_id"), c.GetHeader("X-API-Key")

    switch {
    case userID != "":
        return "user:" + userID
    case apiKey != "":
        return "api:" + apiKey
    default:
        return "ip:" + c.ClientIP()
    }
}

3. 缓存中间件

// CacheMiddleware stores successful GET responses in Redis for ttl.
type CacheMiddleware struct {
    redis *redis.Client
    ttl   time.Duration
}

// NewCacheMiddleware returns a CacheMiddleware that keeps entries in client
// and lets them expire after ttl.
func NewCacheMiddleware(client *redis.Client, ttl time.Duration) *CacheMiddleware {
    cm := new(CacheMiddleware)
    cm.redis, cm.ttl = client, ttl
    return cm
}

// Middleware returns a gin handler that answers GET requests from Redis when
// a cached copy exists. Otherwise it runs the chain and caches 2xx responses
// for cm.ttl. Non-GET requests pass straight through. Cache keys come from
// generateCacheKey, which is not shown here.
//
// NOTE(review): unless generateCacheKey includes the caller's identity,
// responses of authenticated endpoints are shared between users. Configured
// excluded paths are not consulted by this handler.
func (cm *CacheMiddleware) Middleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        if c.Request.Method != http.MethodGet {
            c.Next()
            return
        }

        cacheKey := generateCacheKey(c)

        // Cache hit: replay the stored response and stop the chain. Any Redis
        // error (including a miss) or an undecodable entry falls through to the
        // real handler.
        if cached, err := cm.redis.Get(c.Request.Context(), cacheKey).Result(); err == nil {
            var response CachedResponse
            if json.Unmarshal([]byte(cached), &response) == nil {
                for key, value := range response.Headers {
                    c.Header(key, value)
                }
                c.Data(response.Status, response.ContentType, response.Body)
                c.Abort()
                return
            }
        }

        // Wrap the writer so the body is captured while it is sent.
        crw := &cachedResponseWriter{
            ResponseWriter: c.Writer,
            body:           &bytes.Buffer{},
            headers:        make(map[string]string),
        }
        c.Writer = crw

        c.Next()

        // Take the status from gin's writer: crw.status stays 0 when a handler
        // writes a body without an explicit WriteHeader, while gin tracks the
        // implicit 200.
        status := crw.Status()
        if status < 200 || status >= 300 {
            return
        }

        // Snapshot response headers so they are replayed on a hit. Set-Cookie
        // is skipped so one client's cookies are never served to another.
        // Only the first value of each header fits in CachedResponse.Headers.
        for key, values := range crw.Header() {
            if len(values) == 0 || http.CanonicalHeaderKey(key) == "Set-Cookie" {
                continue
            }
            crw.headers[key] = values[0]
        }

        response := CachedResponse{
            Status:      status,
            Headers:     crw.headers,
            ContentType: crw.Header().Get("Content-Type"),
            Body:        crw.body.Bytes(),
        }

        // Best effort: a failed marshal or Redis write only means no caching.
        if data, err := json.Marshal(response); err == nil {
            cm.redis.Set(context.Background(), cacheKey, data, cm.ttl)
        }
    }
}

// CachedResponse is the JSON document stored in Redis for one cached response.
type CachedResponse struct {
    Status      int               `json:"status"`       // HTTP status code to replay
    Headers     map[string]string `json:"headers"`      // response headers, one value per name
    ContentType string            `json:"content_type"` // Content-Type passed to c.Data on replay
    Body        []byte            `json:"body"`         // raw body; encoding/json stores []byte as base64
}

// cachedResponseWriter wraps gin's ResponseWriter and copies everything sent
// to the client into body, so the response can be stored after the handler
// chain has run.
//
// The wrapper never writes headers itself. status records the code passed
// to WriteHeader on this wrapper, or 0 if WriteHeader was never called here.
type cachedResponseWriter struct {
    gin.ResponseWriter
    body    *bytes.Buffer
    headers map[string]string
    status  int
}

// Write forwards data to the client and captures the bytes actually written.
func (crw *cachedResponseWriter) Write(data []byte) (int, error) {
    n, err := crw.ResponseWriter.Write(data)
    crw.body.Write(data[:n])
    return n, err
}

// WriteString must be overridden as well. Otherwise the WriteString method
// promoted from the embedded gin.ResponseWriter is used (for example by
// io.WriteString), and the bytes bypass the capture buffer, so a truncated
// body would be cached.
func (crw *cachedResponseWriter) WriteString(s string) (int, error) {
    n, err := crw.ResponseWriter.WriteString(s)
    crw.body.WriteString(s[:n])
    return n, err
}

// WriteHeader records the status code and forwards it.
func (crw *cachedResponseWriter) WriteHeader(status int) {
    crw.status = status
    crw.ResponseWriter.WriteHeader(status)
}

🔧 网关配置管理

1. 动态配置

# gateway.yaml
gateway:
    port: 8080
    timeout: 30s

services:
    user-service:
        discovery: consul
        load_balancer: round_robin
        health_check:
            enabled: true
            interval: 10s
            timeout: 3s
            path: /health
        circuit_breaker:
            enabled: true
            failure_threshold: 5
            timeout: 60s

    payment-service:
        discovery: consul
        load_balancer: weighted_round_robin
        weights:
            - instance: payment-service-1
              weight: 3
            - instance: payment-service-2
              weight: 1

    # Referenced by the /api/v1/products/* route below, so it needs an entry too.
    product-service:
        discovery: consul
        load_balancer: round_robin

middlewares:
    auth:
        # yaml.Unmarshal does not expand ${...}. Expand environment variables
        # before parsing (e.g. os.ExpandEnv); otherwise this literal text
        # becomes the JWT signing secret.
        jwt_secret: ${JWT_SECRET}
        excluded_paths:
            - /api/v1/auth/login
            - /api/v1/auth/register
            - /health

    rate_limit:
        default:
            window: 60s
            limit: 100
        user_limits:
            premium_user:
                window: 60s
                limit: 1000
            basic_user:
                window: 60s
                limit: 100

    cache:
        ttl: 300s
        excluded_paths:
            - /api/v1/users/me
            - /api/v1/payments/*

routes:
    - path: /api/v1/users/*
      service: user-service
      middlewares: [auth, rate_limit]

    - path: /api/v1/payments/*
      service: payment-service
      middlewares: [auth, rate_limit]

    - path: /api/v1/products/*
      service: product-service
      middlewares: [auth, rate_limit, cache]

2. 配置热更新

// ConfigManager holds the current gateway configuration and the channels of
// all subscribers returned by Watch; mutex guards both.
type ConfigManager struct {
    config   *GatewayConfig
    mutex    sync.RWMutex
    watchers []chan *GatewayConfig
}

// NewConfigManager returns a manager with no configuration loaded and an
// empty (non-nil) subscriber list.
func NewConfigManager() *ConfigManager {
    cm := &ConfigManager{}
    cm.watchers = []chan *GatewayConfig{}
    return cm
}

// LoadConfig reads and parses the YAML file at path, installs it as the
// current configuration and pushes it to every channel returned by Watch.
//
// The file is read and parsed before the lock is taken, so slow I/O never
// blocks readers of the current configuration. On any error the previous
// configuration stays in effect.
//
// Each watcher channel has a one-slot buffer (see Watch). If a watcher has
// not yet consumed an earlier configuration, that stale value is discarded
// and replaced by the new one. Keeping the old value and dropping the new
// one would leave a slow watcher on an outdated configuration.
//
// NOTE(review): yaml.Unmarshal does not expand ${VAR} placeholders such as
// ${JWT_SECRET}; expand environment variables before unmarshalling if the
// config relies on them.
func (cm *ConfigManager) LoadConfig(path string) error {
    data, err := ioutil.ReadFile(path)
    if err != nil {
        return fmt.Errorf("reading config %q: %w", path, err)
    }

    var config GatewayConfig
    if err := yaml.Unmarshal(data, &config); err != nil {
        return fmt.Errorf("parsing config %q: %w", path, err)
    }

    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    cm.config = &config

    for _, watcher := range cm.watchers {
        // Discard an unread, outdated config (if any) to make room.
        select {
        case <-watcher:
        default:
        }
        // Sends only happen while cm.mutex is held, so the drained slot is
        // free; the default case is only a safeguard against blocking.
        select {
        case watcher <- &config:
        default:
        }
    }

    return nil
}

// Watch registers a new subscriber and returns its channel.
//
// The channel has a buffer of one. If a configuration is already loaded, it
// is placed in the buffer right away, so the caller gets the current value
// without waiting for the next reload.
func (cm *ConfigManager) Watch() <-chan *GatewayConfig {
    ch := make(chan *GatewayConfig, 1)

    cm.mutex.Lock()
    defer cm.mutex.Unlock()

    if current := cm.config; current != nil {
        ch <- current
    }
    cm.watchers = append(cm.watchers, ch)

    return ch
}

// WatchFile watches path with fsnotify and reloads the configuration via
// LoadConfig whenever the file changes. It blocks until the watcher's
// channels are closed, so callers typically run it in its own goroutine.
// Setup failures terminate the process via log.Fatal.
//
// Behaviour:
//   - Write and Create trigger a reload.
//   - Remove and Rename also trigger a reload, and the path is watched
//     again. Editors and config-map updates often replace the file instead of
//     writing it in place, and the watch on the old file is then gone.
//   - Reload errors are logged; the previous configuration stays active.
//   - The loop returns when the Events or Errors channel is closed, instead
//     of spinning on zero values.
//
// NOTE(review): if the file does not exist again when it is re-added,
// watching stops. Watching the parent directory, as the fsnotify docs
// recommend for single files, avoids that.
func (cm *ConfigManager) WatchFile(path string) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }
    defer watcher.Close()

    if err := watcher.Add(path); err != nil {
        log.Fatal(err)
    }

    for {
        select {
        case event, ok := <-watcher.Events:
            if !ok {
                return
            }

            replaced := event.Op&(fsnotify.Remove|fsnotify.Rename) != 0
            if !replaced && event.Op&(fsnotify.Write|fsnotify.Create) == 0 {
                continue // e.g. Chmod: content unchanged
            }

            log.Println("配置文件已修改,重新加载...")
            // Give the writer a moment to finish before the file is read.
            time.Sleep(100 * time.Millisecond)

            if replaced {
                if err := watcher.Add(path); err != nil {
                    log.Println("配置文件监听错误:", err)
                    continue
                }
            }

            if err := cm.LoadConfig(path); err != nil {
                log.Println("配置文件加载失败:", err)
            }

        case err, ok := <-watcher.Errors:
            if !ok {
                return
            }
            log.Println("配置文件监听错误:", err)
        }
    }
}

📊 网关监控和可观测性

1. 指标收集

// Prometheus metrics exported by the gateway.
//
// NOTE(review): these collectors are created with prometheus.New* and are
// only exported once registered (e.g. prometheus.MustRegister). No
// registration is visible here; confirm it happens elsewhere.
var (
    // requestsTotal counts requests, labelled by target service, HTTP method,
    // path and status code.
    // NOTE(review): keep the "path" label bounded. Use the route template
    // rather than the raw URL path, or IDs in paths create one series per value.
    requestsTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gateway_requests_total",
            Help: "Total number of requests",
        },
        []string{"service", "method", "path", "status"},
    )

    // requestDuration observes request latency in seconds with the default buckets.
    requestDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "gateway_request_duration_seconds",
            Help:    "Request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"service", "method", "path"},
    )

    // activeConnections is a gauge that MetricsMiddleware increments when a
    // request starts and decrements when it finishes. It therefore counts
    // in-flight requests rather than TCP connections, despite its name.
    activeConnections = prometheus.NewGauge(
        prometheus.GaugeOpts{
            Name: "gateway_active_connections",
            Help: "Number of active connections",
        },
    )
)

// MetricsMiddleware records Prometheus metrics for every request. It tracks
// the in-flight gauge while the request runs. Afterwards it records a request
// counter and a latency histogram, labelled by target service, method, route
// and status.
//
// The "path" label is the matched route template (c.FullPath(), e.g.
// "/api/v1/users/*path") instead of the raw URL path. Raw paths embed IDs and
// would create an unbounded number of time series. Requests that matched no
// route share the label "unmatched".
func MetricsMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        start := time.Now()

        activeConnections.Inc()
        defer activeConnections.Dec()

        c.Next()

        duration := time.Since(start).Seconds()
        status := strconv.Itoa(c.Writer.Status())
        service := c.GetString("target_service")
        method := c.Request.Method

        route := c.FullPath()
        if route == "" {
            route = "unmatched"
        }

        requestsTotal.WithLabelValues(service, method, route, status).Inc()
        requestDuration.WithLabelValues(service, method, route).Observe(duration)
    }
}

2. 链路追踪

// TracingMiddleware starts an OpenTelemetry server span for each request and
// installs it in the request context for downstream handlers. It records
// request and response attributes.
//
//   - The span is named "METHOD route" using the matched route template
//     (c.FullPath()), or just "METHOD" when no route matched. Raw URL paths
//     embed IDs and would make span names unbounded.
//   - "http.url" is recorded without the query string and user info, because
//     query parameters can carry credentials (e.g. ?token=...).
//   - Following the OpenTelemetry HTTP semantic conventions for server spans,
//     only 5xx responses set the span status to Error. A 4xx is a client
//     error, not a server failure.
//
// NOTE(review): incoming trace context (e.g. a traceparent header) is not
// extracted, so every request starts a new trace. Add an OpenTelemetry
// propagator if callers propagate context.
func TracingMiddleware(tracer trace.Tracer) gin.HandlerFunc {
    return func(c *gin.Context) {
        req := c.Request

        spanName := req.Method
        if route := c.FullPath(); route != "" {
            spanName += " " + route
        }

        ctx, span := tracer.Start(req.Context(), spanName, trace.WithSpanKind(trace.SpanKindServer))
        defer span.End()

        safeURL := *req.URL
        safeURL.RawQuery = ""
        safeURL.ForceQuery = false
        safeURL.User = nil

        span.SetAttributes(
            attribute.String("http.method", req.Method),
            attribute.String("http.url", safeURL.String()),
            attribute.String("http.user_agent", req.UserAgent()),
            attribute.String("client.ip", c.ClientIP()),
        )

        // Downstream handlers see the span through the request context.
        c.Request = req.WithContext(ctx)

        c.Next()

        status := c.Writer.Status()
        span.SetAttributes(
            attribute.Int("http.status_code", status),
            attribute.Int("http.response_size", c.Writer.Size()),
        )

        if status >= 500 {
            span.SetStatus(codes.Error, fmt.Sprintf("HTTP %d", status))
        }
    }
}

🏗️ 网关架构设计

1. 高可用部署

# docker-compose.yml
version: "3.8"
services:
    gateway-1:
        image: your-gateway:latest
        ports:
            - "8080:8080"
        environment:
            - GATEWAY_ID=gateway-1
            - CONSUL_ADDRESS=consul:8500
        depends_on:
            - consul
            - redis

    gateway-2:
        image: your-gateway:latest
        ports:
            - "8081:8080"
        environment:
            - GATEWAY_ID=gateway-2
            - CONSUL_ADDRESS=consul:8500
        depends_on:
            - consul
            - redis

    nginx:
        image: nginx:alpine
        ports:
            - "80:80"
        volumes:
            # This replaces the image's main config file, so nginx.conf must be
            # a complete configuration (including events {} and http {}).
            - ./nginx.conf:/etc/nginx/nginx.conf
        depends_on:
            - gateway-1
            - gateway-2

    consul:
        # The Docker Hub "consul" image is deprecated and receives no new
        # releases or latest tag; HashiCorp publishes it as hashicorp/consul.
        image: hashicorp/consul:latest
        ports:
            - "8500:8500"
        command: consul agent -dev -client=0.0.0.0

    redis:
        image: redis:alpine
        ports:
            - "6379:6379"

2. Nginx 负载均衡配置

# nginx.conf
# This file is mounted as the main /etc/nginx/nginx.conf, so it must declare
# the events and http contexts itself: upstream, map and server are only
# valid inside http {}, and nginx refuses to start without events {}.
events {}

http {
    # nginx's upstream keepalive requires the Connection header to be cleared.
    # Send "upgrade" only for actual WebSocket upgrade requests, and an empty
    # value otherwise, so the keepalive pool below is usable.
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      '';
    }

    upstream gateway {
        server gateway-1:8080 weight=1 max_fails=3 fail_timeout=30s;
        server gateway-2:8080 weight=1 max_fails=3 fail_timeout=30s;
        keepalive 32;
    }

    server {
        listen 80;
        server_name api.example.com;

        location / {
            proxy_pass http://gateway;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_cache_bypass $http_upgrade;

            # Passive failover: on these errors, retry the request on the next
            # gateway instance. This is not an active health check.
            proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
            proxy_connect_timeout 5s;
            proxy_send_timeout 60s;
            proxy_read_timeout 60s;
        }

        # Health-check endpoint, proxied to the gateways without access logging.
        location /health {
            access_log off;
            proxy_pass http://gateway;
        }
    }
}

🚀 实施建议

1. 渐进式迁移

阶段1: 搭建基础网关
├── 基本的路由功能
├── 健康检查
└── 监控指标

阶段2: 添加安全功能
├── JWT 认证
├── API 限流
└── 请求日志

阶段3: 性能优化
├── 响应缓存
├── 连接池
└── 负载均衡

阶段4: 高级功能
├── 熔断器
├── 灰度发布
└── API 版本管理

2. 网关选型建议

自研网关(推荐本项目采用):

// 基于 gin + consul + redis 的轻量级网关
// 优势: 完全控制、定制化强、集成容易
// 适合: 中小型项目、快速迭代

开源网关:

  • Kong: 功能丰富,插件生态好
  • Traefik: 配置简单,支持多种后端
  • Envoy: 性能极高,配置复杂

📋 总结

网关管理的核心要点:

  1. 统一入口 - 所有外部请求通过网关
  2. 服务路由 - 将请求路由到正确的后端服务
  3. 安全认证 - 统一的身份验证和授权
  4. 流量控制 - 限流、熔断、负载均衡
  5. 可观测性 - 监控、日志、链路追踪
  6. 高可用 - 多实例部署、健康检查

对于你的项目,建议先实现基础的路由和认证功能,然后逐步添加高级特性。