18 KiB
18 KiB
微服务网关管理详解
🚪 什么是 API 网关?
API 网关就像一个智能门卫,站在所有微服务的前面:
客户端请求  →  API网关   →  路由到具体的微服务
    ↑            ↑              ↑
 用户App       统一入口        用户服务
 管理后台        ↓            支付服务
 第三方系统     认证授权        产品服务
               限流控制        数据服务
               日志记录
🎯 网关的核心功能
1. 请求路由 (Request Routing)
// Route is an example routing rule: it names a path pattern, an HTTP method,
// the target service, and the middlewares to apply. Every field carries a YAML
// tag so the rule can be expressed in a YAML document.
type Route struct {
// Path is the request path pattern; the sample routes use a trailing "/*".
Path string `yaml:"path"`
// Method is the HTTP method; the sample routes use "*".
Method string `yaml:"method"`
// Service is the name of the target service, e.g. "user-service".
Service string `yaml:"service"`
// Middlewares lists middleware names by string, e.g. "auth".
Middlewares []string `yaml:"middlewares"`
// Headers is a string-to-string map; how it is applied is not shown in this
// excerpt (presumably headers to set on the proxied request — TODO confirm).
Headers map[string]string `yaml:"headers"`
}
// routes is the sample routing table: one wildcard rule per backend service,
// each matching every HTTP method ("*") and naming the middlewares to apply.
var routes = []Route{
	// User endpoints: authenticated and rate limited.
	{Path: "/api/v1/users/*", Method: "*", Service: "user-service", Middlewares: []string{"auth", "rate-limit"}},
	// Payment endpoints: authenticated and encrypted.
	{Path: "/api/v1/payments/*", Method: "*", Service: "payment-service", Middlewares: []string{"auth", "encrypt"}},
	// Product endpoints: authenticated and cached.
	{Path: "/api/v1/products/*", Method: "*", Service: "product-service", Middlewares: []string{"auth", "cache"}},
}
2. 负载均衡 (Load Balancing)
// LoadBalancer picks the service instance that should handle the next request.
type LoadBalancer interface {
// Next returns the selected instance. The round-robin implementation in this
// file returns nil when it has no instances, so callers should check for nil.
Next() *ServiceInstance
}
// RoundRobinBalancer hands out instances in order, wrapping around at the end
// of the list.
type RoundRobinBalancer struct {
// instances is the list Next cycles through.
instances []*ServiceInstance
// current is the index of the instance Next will return on its next call.
current int
// mutex is held by Next while it reads and advances current.
mutex sync.Mutex
}
// Next returns the instance at the current position and advances the position
// by one, wrapping to the start after the last instance. It returns nil when
// the balancer has no instances. The whole operation runs under rb.mutex.
func (rb *RoundRobinBalancer) Next() *ServiceInstance {
	rb.mutex.Lock()
	defer rb.mutex.Unlock()

	size := len(rb.instances)
	if size == 0 {
		return nil
	}

	picked := rb.instances[rb.current]
	rb.current = (rb.current + 1) % size
	return picked
}
// WeightedRoundRobinBalancer holds the state for weighted round-robin
// selection. No selection method is defined for it in this excerpt.
type WeightedRoundRobinBalancer struct {
	// instances are the candidates together with their weights.
	instances []*WeightedInstance
	// total and current are bookkeeping integers; presumably the sum of all
	// weights and a cursor respectively — TODO confirm against the selection
	// algorithm, which is not shown here.
	total, current int
}
// WeightedInstance pairs a service instance with its load-balancing weight.
type WeightedInstance struct {
	// Instance is the underlying service instance.
	Instance *ServiceInstance
	// Weight is the configured weight. Current is a second integer kept next to
	// it; presumably the running weight of a smooth weighted round-robin —
	// TODO confirm, the algorithm is not part of this excerpt.
	Weight, Current int
}
3. 服务发现 (Service Discovery)
// ServiceDiscovery is the service-registry abstraction used by the gateway.
type ServiceDiscovery interface {
// Register adds a service instance to the registry.
Register(service *ServiceInstance) error
// Deregister removes the instance identified by serviceID.
Deregister(serviceID string) error
// Discover returns the instances known for serviceName.
Discover(serviceName string) ([]*ServiceInstance, error)
// Watch returns a receive-only channel of instance lists for serviceName;
// presumably one list per membership change — TODO confirm with an implementation.
Watch(serviceName string) <-chan []*ServiceInstance
}
// ConsulDiscovery is the Consul-backed service discovery implementation.
// Only its Discover method is shown in this excerpt.
type ConsulDiscovery struct {
// client is the Consul API client used for health queries.
client *consul.Client
}
// Discover queries Consul's health endpoint for serviceName and returns one
// ServiceInstance per entry. The query passes passingOnly=true, so only
// instances whose health checks pass are returned. The result is a nil slice
// when Consul reports no entries.
//
// A Consul service may be registered without an explicit address, in which case
// the entry's Service.Address is empty and the service is reachable at the
// address of the node hosting it. That fallback is applied here so callers
// never receive an instance with an empty Address.
func (cd *ConsulDiscovery) Discover(serviceName string) ([]*ServiceInstance, error) {
	entries, _, err := cd.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}

	var instances []*ServiceInstance
	for _, entry := range entries {
		addr := entry.Service.Address
		if addr == "" && entry.Node != nil {
			// No service-level address registered: use the node's address.
			addr = entry.Node.Address
		}
		instances = append(instances, &ServiceInstance{
			ID:      entry.Service.ID,
			Name:    entry.Service.Service,
			Address: addr,
			Port:    entry.Service.Port,
			Meta:    entry.Service.Meta,
		})
	}
	return instances, nil
}
🛡️ 网关中间件系统
1. 认证中间件
// AuthMiddleware returns a gin handler that authenticates requests with a JWT
// validated against jwtSecret.
//
// A request without a token, or with a token that fails validation, is aborted
// with 401 and a JSON error body. Otherwise the claims' user ID, username and
// roles are stored in the gin context under "user_id", "username" and "roles"
// before the next handler runs.
func AuthMiddleware(jwtSecret string) gin.HandlerFunc {
	return func(c *gin.Context) {
		// Step 1: locate the token.
		token := extractToken(c)
		if token == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "缺少认证令牌"})
			return
		}

		// Step 2: validate it.
		claims, err := validateJWT(token, jwtSecret)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "无效的认证令牌"})
			return
		}

		// Step 3: expose the caller's identity to downstream handlers.
		c.Set("user_id", claims.UserID)
		c.Set("username", claims.Username)
		c.Set("roles", claims.Roles)

		c.Next()
	}
}
// extractToken returns the bearer token of the request. It prefers an
// Authorization header that starts with "Bearer " (exact case) and otherwise
// falls back to the "token" query parameter, which yields "" when absent.
//
// NOTE(review): tokens passed in the query string tend to end up in access
// logs and traces; confirm the fallback is really wanted.
func extractToken(c *gin.Context) string {
	const bearerPrefix = "Bearer "

	if header := c.GetHeader("Authorization"); strings.HasPrefix(header, bearerPrefix) {
		return header[len(bearerPrefix):]
	}
	return c.Query("token")
}
2. 限流中间件
// RateLimiter is a Redis-backed request rate limiter exposed as gin middleware.
type RateLimiter struct {
// redis is the client used to store per-client request records.
redis *redis.Client
// window is the length of the rate-limiting window.
window time.Duration
// limit is the request count threshold compared against the window's record count.
limit int
}
// NewRateLimiter builds a RateLimiter that stores its records through the given
// Redis client and uses the given window length and request limit.
func NewRateLimiter(redis *redis.Client, window time.Duration, limit int) *RateLimiter {
	rl := new(RateLimiter)
	rl.redis = redis
	rl.window = window
	rl.limit = limit
	return rl
}
// Middleware returns a gin handler that enforces the limiter per client.
//
// The client is identified by getClientID. If the limit check itself fails the
// request is aborted with 500; if the client is over the limit it is aborted
// with 429 and a body carrying "retry_after" (the window length in seconds).
// Otherwise the next handler runs.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		allowed, err := rl.isAllowed(getClientID(c))

		switch {
		case err != nil:
			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "限流检查失败"})
		case !allowed:
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error":       "请求过于频繁,请稍后再试",
				"retry_after": int(rl.window.Seconds()),
			})
		default:
			c.Next()
		}
	}
}
// isAllowed records the current request for clientID and reports whether the
// client was still under the limit before this request was added.
//
// It keeps one Redis sorted set per client ("rate_limit:<clientID>"), scored by
// Unix seconds. In a single pipeline it removes entries scored at or before the
// window start, counts the remaining entries, adds the current request, and
// refreshes the key's TTL to the window length. The count is taken before the
// add, so the request is allowed when count < limit.
//
// Each request is stored under a nanosecond-resolution member. Sorted-set
// members are unique, so a second-resolution member would collapse every
// request made within the same second into one entry and the limiter would
// undercount. The score stays in whole seconds because a float64 cannot
// represent nanosecond Unix timestamps exactly.
//
// Requests that end up rejected are still recorded, exactly as before.
func (rl *RateLimiter) isAllowed(clientID string) (bool, error) {
	ctx := context.Background()
	key := fmt.Sprintf("rate_limit:%s", clientID)

	ts := time.Now()
	now := ts.Unix()
	windowStart := now - int64(rl.window.Seconds())

	pipe := rl.redis.Pipeline()
	// Drop records that have left the window.
	pipe.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", windowStart))
	// Count what is left in the window (before adding this request).
	countCmd := pipe.ZCard(ctx, key)
	// Record this request under a member that is unique per request.
	pipe.ZAdd(ctx, key, &redis.Z{
		Score:  float64(now),
		Member: fmt.Sprintf("%d", ts.UnixNano()),
	})
	// Let idle keys expire on their own.
	pipe.Expire(ctx, key, rl.window)

	if _, err := pipe.Exec(ctx); err != nil {
		return false, err
	}
	return countCmd.Val() < int64(rl.limit), nil
}
// getClientID derives the rate-limiting identity of a request. It returns, in
// order of preference, "user:<id>" from the "user_id" context value,
// "api:<key>" from the X-API-Key header, or "ip:<address>" from the client IP.
func getClientID(c *gin.Context) string {
	userID := c.GetString("user_id")
	apiKey := c.GetHeader("X-API-Key")

	switch {
	case userID != "":
		return "user:" + userID
	case apiKey != "":
		return "api:" + apiKey
	default:
		return "ip:" + c.ClientIP()
	}
}
3. 缓存中间件
// CacheMiddleware caches GET responses in Redis and is exposed as gin middleware.
type CacheMiddleware struct {
// redis is the client used to read and write cached responses.
redis *redis.Client
// ttl is the expiry passed to Redis when a response is stored.
ttl time.Duration
}
// NewCacheMiddleware builds a CacheMiddleware that stores responses through the
// given Redis client with the given time-to-live.
func NewCacheMiddleware(redis *redis.Client, ttl time.Duration) *CacheMiddleware {
	cm := new(CacheMiddleware)
	cm.redis = redis
	cm.ttl = ttl
	return cm
}
// Middleware returns a gin handler that serves GET responses from Redis when a
// cached copy exists and otherwise records the downstream response for reuse.
//
// Non-GET requests pass straight through. On a cache hit the stored headers,
// status, content type and body are replayed and the chain is aborted. On a
// miss (or an undecodable entry) the response writer is wrapped so the body can
// be captured, and a 2xx response is stored under the request's cache key with
// the configured TTL.
//
// Changes from the earlier version:
//   - Response headers are now actually captured; previously the map was
//     created empty and never filled, so cache hits lost every header.
//     Set-Cookie is deliberately left out so one client's cookies are never
//     replayed to another. Only the first value of a multi-valued header is
//     kept, because CachedResponse.Headers holds one string per name.
//   - A handler that writes a body without calling WriteHeader leaves the
//     wrapper's status at 0; the status tracked by the underlying gin writer is
//     used in that case so such responses are cached too.
//   - Redis calls use the request context, so they stop when the request is
//     cancelled.
func (cm *CacheMiddleware) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Only GET requests are cached.
		if c.Request.Method != http.MethodGet {
			c.Next()
			return
		}

		ctx := c.Request.Context()
		cacheKey := generateCacheKey(c)

		// Try the cache first.
		if cached, err := cm.redis.Get(ctx, cacheKey).Result(); err == nil {
			var response CachedResponse
			if json.Unmarshal([]byte(cached), &response) == nil {
				for key, value := range response.Headers {
					c.Header(key, value)
				}
				c.Data(response.Status, response.ContentType, response.Body)
				c.Abort()
				return
			}
		}

		// Wrap the writer so the downstream response can be captured.
		crw := &cachedResponseWriter{
			ResponseWriter: c.Writer,
			body:           &bytes.Buffer{},
			headers:        make(map[string]string),
		}
		c.Writer = crw

		c.Next()

		status := crw.status
		if status == 0 {
			// WriteHeader was never called on the wrapper; ask the wrapped
			// gin writer which status it actually used.
			status = crw.Status()
		}
		if status < 200 || status >= 300 {
			return
		}

		for name, values := range crw.Header() {
			if name == "Set-Cookie" || len(values) == 0 {
				continue
			}
			crw.headers[name] = values[0]
		}

		response := CachedResponse{
			Status:      status,
			Headers:     crw.headers,
			ContentType: crw.Header().Get("Content-Type"),
			Body:        crw.body.Bytes(),
		}
		if data, err := json.Marshal(response); err == nil {
			// Best effort: the response has already been sent, so a failed
			// cache write must not affect the request.
			cm.redis.Set(ctx, cacheKey, data, cm.ttl)
		}
	}
}
// CachedResponse is the JSON document stored in Redis for one cached response.
type CachedResponse struct {
// Status is the HTTP status code replayed on a cache hit.
Status int `json:"status"`
// Headers are replayed one by one via c.Header on a cache hit.
Headers map[string]string `json:"headers"`
// ContentType is the value of the Content-Type response header at capture time.
ContentType string `json:"content_type"`
// Body is the captured response body; encoding/json stores a []byte as base64.
Body []byte `json:"body"`
}
// cachedResponseWriter wraps a gin.ResponseWriter so the cache middleware can
// see what the downstream handlers wrote.
type cachedResponseWriter struct {
// ResponseWriter is the wrapped writer; every call is still forwarded to it.
gin.ResponseWriter
// body accumulates a copy of every chunk passed to Write.
body *bytes.Buffer
// headers is the map the cache middleware stores as CachedResponse.Headers.
headers map[string]string
// status is the code passed to WriteHeader; it stays 0 until WriteHeader is called.
status int
}
// Write copies data into the capture buffer and then forwards it to the wrapped
// writer, returning the wrapped writer's result.
func (crw *cachedResponseWriter) Write(data []byte) (int, error) {
	// bytes.Buffer.Write always returns a nil error, so its result is not checked.
	_, _ = crw.body.Write(data)

	written, err := crw.ResponseWriter.Write(data)
	return written, err
}
// WriteHeader forwards the status code to the wrapped writer and remembers it
// so the cache middleware can decide whether the response is cacheable.
func (crw *cachedResponseWriter) WriteHeader(status int) {
	crw.ResponseWriter.WriteHeader(status)
	crw.status = status
}
🔧 网关配置管理
1. 动态配置
# gateway.yaml
gateway:
  port: 8080
  timeout: 30s
services:
  user-service:
    discovery: consul
    load_balancer: round_robin
    health_check:
      enabled: true
      interval: 10s
      timeout: 3s
      path: /health
    circuit_breaker:
      enabled: true
      failure_threshold: 5
      timeout: 60s
  payment-service:
    discovery: consul
    load_balancer: weighted_round_robin
    weights:
      - instance: payment-service-1
        weight: 3
      - instance: payment-service-2
        weight: 1
middlewares:
  auth:
    jwt_secret: ${JWT_SECRET}
    excluded_paths:
      - /api/v1/auth/login
      - /api/v1/auth/register
      - /health
  rate_limit:
    default:
      window: 60s
      limit: 100
    user_limits:
      premium_user:
        window: 60s
        limit: 1000
      basic_user:
        window: 60s
        limit: 100
  cache:
    ttl: 300s
    excluded_paths:
      - /api/v1/users/me
      - /api/v1/payments/*
routes:
  - path: /api/v1/users/*
    service: user-service
    middlewares: [auth, rate_limit]
  - path: /api/v1/payments/*
    service: payment-service
    middlewares: [auth, rate_limit]
  - path: /api/v1/products/*
    service: product-service
    middlewares: [auth, rate_limit, cache]
2. 配置热更新
// ConfigManager holds the current gateway configuration and notifies
// subscribers when a new one is loaded.
type ConfigManager struct {
// config is the most recently loaded configuration; nil until the first load.
config *GatewayConfig
// mutex is taken (write lock) by LoadConfig and Watch.
mutex sync.RWMutex
// watchers are the subscriber channels handed out by Watch.
watchers []chan *GatewayConfig
}
// NewConfigManager returns a ConfigManager with no configuration loaded and an
// empty (non-nil) subscriber list.
func NewConfigManager() *ConfigManager {
	cm := &ConfigManager{}
	cm.watchers = []chan *GatewayConfig{}
	return cm
}
// LoadConfig reads the YAML file at path, makes it the current configuration
// and publishes it to every subscriber created by Watch.
//
// It returns the read or parse error unchanged; on error the current
// configuration and the subscribers are left untouched. The manager's lock is
// held for the whole call, so concurrent loads are applied in lock order.
//
// Publishing never blocks. Subscriber channels have a single buffer slot; when
// that slot still holds an older configuration the stale value is discarded and
// replaced, so a slow subscriber always ends up with the newest configuration.
// Previously the new value was dropped in that situation, which left the
// subscriber with the old one.
func (cm *ConfigManager) LoadConfig(path string) error {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	data, err := ioutil.ReadFile(path)
	if err != nil {
		return err
	}

	var config GatewayConfig
	if err := yaml.Unmarshal(data, &config); err != nil {
		return err
	}
	cm.config = &config

	for _, watcher := range cm.watchers {
		select {
		case watcher <- &config:
			continue
		default:
		}

		// The slot is occupied by an unread, older configuration: drop it...
		select {
		case <-watcher:
		default:
		}
		// ...and try once more. If the subscriber raced us and the slot filled
		// up again we give up rather than block.
		select {
		case watcher <- &config:
		default:
		}
	}
	return nil
}
// Watch registers a new subscriber and returns its receive-only channel. The
// channel has a buffer of one; if a configuration is already loaded it is
// placed in the channel before Watch returns.
func (cm *ConfigManager) Watch() <-chan *GatewayConfig {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	ch := make(chan *GatewayConfig, 1)
	cm.watchers = append(cm.watchers, ch)

	// Seed the subscriber with the current configuration, if any.
	if current := cm.config; current != nil {
		ch <- current
	}
	return ch
}
// WatchFile watches path with fsnotify and reloads the configuration whenever
// the file is written. It blocks until the watcher's channels are closed, so it
// is meant to run in its own goroutine.
//
// Failing to create the watcher or to add path terminates the process via
// log.Fatal, as before.
//
// Changes from the earlier version:
//   - A closed Events or Errors channel now ends the loop. Without the check, a
//     closed channel delivers zero values forever and the loop spins.
//   - A failed reload is logged instead of being silently discarded; the
//     previously loaded configuration stays in effect.
func (cm *ConfigManager) WatchFile(path string) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	if err := watcher.Add(path); err != nil {
		log.Fatal(err)
	}

	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			if event.Op&fsnotify.Write != fsnotify.Write {
				continue
			}
			log.Println("配置文件已修改,重新加载...")
			// Give the writer a moment to finish; avoids reacting to each of
			// several write events emitted for a single save.
			time.Sleep(100 * time.Millisecond)
			if err := cm.LoadConfig(path); err != nil {
				log.Println("配置文件重新加载失败:", err)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Println("配置文件监听错误:", err)
		}
	}
}
📊 网关监控和可观测性
1. 指标收集
// Prometheus collectors used by MetricsMiddleware.
// NOTE(review): nothing in this excerpt registers these collectors with a
// Prometheus registry — confirm that happens elsewhere.
var (
// requestsTotal counts requests, labelled by service, method, path and status.
requestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gateway_requests_total",
Help: "Total number of requests",
},
[]string{"service", "method", "path", "status"},
)
// requestDuration observes request latency in seconds using the default
// buckets, labelled by service, method and path.
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gateway_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"service", "method", "path"},
)
// activeConnections is incremented when a request enters MetricsMiddleware
// and decremented when it leaves, i.e. it tracks in-flight requests.
activeConnections = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gateway_active_connections",
Help: "Number of active connections",
},
)
)
// MetricsMiddleware returns a gin handler that records Prometheus metrics for
// every request: the in-flight gauge while the request is being handled, and
// after the chain has run, the request counter and the duration histogram.
//
// The "service" label is read from the "target_service" context value, which is
// empty unless another handler set it.
//
// NOTE(review): the "path" label is the raw request path. If paths embed IDs
// this produces one time series per distinct path; consider whether a route
// template would be a better label.
func MetricsMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		begin := time.Now()

		activeConnections.Inc()
		defer activeConnections.Dec()

		c.Next()

		elapsed := time.Since(begin).Seconds()
		code := strconv.Itoa(c.Writer.Status())
		service := c.GetString("target_service")
		method, path := c.Request.Method, c.Request.URL.Path

		requestsTotal.WithLabelValues(service, method, path, code).Inc()
		requestDuration.WithLabelValues(service, method, path).Observe(elapsed)
	}
}
2. 链路追踪
// TracingMiddleware returns a gin handler that wraps each request in an
// OpenTelemetry span named "<METHOD> <path>".
//
// Before the chain runs, the span receives the request method, URL, user agent
// and client IP, and the span's context is attached to the request. After the
// chain has run, the response status and size are added, and a status of 400 or
// above marks the span as an error.
func TracingMiddleware(tracer trace.Tracer) gin.HandlerFunc {
	return func(c *gin.Context) {
		req := c.Request

		ctx, span := tracer.Start(req.Context(), req.Method+" "+req.URL.Path)
		defer span.End()

		// Request-side attributes.
		span.SetAttributes(
			attribute.String("http.method", req.Method),
			attribute.String("http.url", req.URL.String()),
			attribute.String("http.user_agent", req.UserAgent()),
			attribute.String("client.ip", c.ClientIP()),
		)

		// Make the span visible to downstream handlers.
		c.Request = req.WithContext(ctx)

		c.Next()

		// Response-side attributes.
		status := c.Writer.Status()
		span.SetAttributes(
			attribute.Int("http.status_code", status),
			attribute.Int("http.response_size", c.Writer.Size()),
		)
		if status >= 400 {
			span.SetStatus(codes.Error, fmt.Sprintf("HTTP %d", status))
		}
	}
}
🏗️ 网关架构设计
1. 高可用部署
# docker-compose.yml
version: "3.8"
services:
  gateway-1:
    image: your-gateway:latest
    ports:
      - "8080:8080"
    environment:
      - GATEWAY_ID=gateway-1
      - CONSUL_ADDRESS=consul:8500
    depends_on:
      - consul
      - redis
  gateway-2:
    image: your-gateway:latest
    ports:
      - "8081:8080"
    environment:
      - GATEWAY_ID=gateway-2
      - CONSUL_ADDRESS=consul:8500
    depends_on:
      - consul
      - redis
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - gateway-1
      - gateway-2
  consul:
    image: consul:latest
    ports:
      - "8500:8500"
    command: consul agent -dev -client=0.0.0.0
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
2. Nginx 负载均衡配置
# nginx.conf
# Send "Connection: upgrade" upstream only when the client actually asked for a
# protocol upgrade (e.g. WebSocket). For ordinary requests the Connection header
# is cleared, which is required for the upstream keepalive pool below to reuse
# connections; sending "upgrade" unconditionally would defeat it.
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      '';
}

upstream gateway {
    server gateway-1:8080 weight=1 max_fails=3 fail_timeout=30s;
    server gateway-2:8080 weight=1 max_fails=3 fail_timeout=30s;
    # Idle upstream connections kept open per worker process.
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    location / {
        proxy_pass http://gateway;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_cache_bypass $http_upgrade;

        # Failover: retry the request on the next upstream server on these
        # errors, plus the timeouts for talking to the upstream.
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        proxy_connect_timeout 5s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Health check endpoint (not written to the access log).
    location /health {
        access_log off;
        proxy_pass http://gateway;
    }
}
🚀 实施建议
1. 渐进式迁移
阶段1: 搭建基础网关
├── 基本的路由功能
├── 健康检查
└── 监控指标
阶段2: 添加安全功能
├── JWT 认证
├── API 限流
└── 请求日志
阶段3: 性能优化
├── 响应缓存
├── 连接池
└── 负载均衡
阶段4: 高级功能
├── 熔断器
├── 灰度发布
└── API 版本管理
2. 网关选型建议
自研网关 (推荐给你):
// 基于 gin + consul + redis 的轻量级网关
// 优势: 完全控制、定制化强、集成容易
// 适合: 中小型项目、快速迭代
开源网关:
- Kong: 功能丰富,插件生态好
- Traefik: 配置简单,支持多种后端
- Envoy: 性能极高,配置复杂
📋 总结
网关管理的核心要点:
- 统一入口 - 所有外部请求通过网关
- 服务路由 - 将请求路由到正确的后端服务
- 安全认证 - 统一的身份验证和授权
- 流量控制 - 限流、熔断、负载均衡
- 可观测性 - 监控、日志、链路追踪
- 高可用 - 多实例部署、健康检查
对于你的项目,建议先实现基础的路由和认证功能,然后逐步添加高级特性。