Files
tyapi-server/internal/shared/tracing/redis_wrapper.go

408 lines
10 KiB
Go
Raw Normal View History

2025-07-02 16:17:59 +08:00
package tracing
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"tyapi-server/internal/shared/interfaces"
)
// TracedRedisCache is a Redis-backed cache whose operations are wrapped in
// tracing spans and logged.
type TracedRedisCache struct {
	// client is the Redis client every command is issued on.
	client redis.UniversalClient
	// tracer starts the cache spans and sets their attributes and status.
	tracer *Tracer
	// logger receives lifecycle messages, slow-operation warnings and errors.
	logger *zap.Logger
	// prefix, when non-empty, is joined to every key as "prefix:key".
	prefix string
	// config holds the tracing options used by the methods of this type.
	config RedisTracingConfig
}
// RedisTracingConfig holds the options for Redis tracing.
type RedisTracingConfig struct {
	// IncludeKeys gates whether the key is attached to spans as "redis.key".
	// IncludeValues is not read by the code shown in this file.
	IncludeKeys, IncludeValues bool

	// MaxKeyLength is the byte limit sanitizeKey applies to keys.
	// MaxValueLength is not read by the code shown in this file.
	MaxKeyLength, MaxValueLength int

	// SlowThreshold is the duration above which Get and Set flag an
	// operation as slow.
	SlowThreshold time.Duration

	// SanitizeValues is not read by the code shown in this file.
	SanitizeValues bool
}
// DefaultRedisTracingConfig returns the default Redis tracing options:
// keys included, values excluded, a 100-byte key limit, a 1000-byte value
// limit, a 50ms slow threshold and value sanitizing enabled.
func DefaultRedisTracingConfig() RedisTracingConfig {
	var cfg RedisTracingConfig

	cfg.IncludeKeys = true
	// Keeping values out is recommended in production to protect sensitive data.
	cfg.IncludeValues = false
	cfg.MaxKeyLength = 100
	cfg.MaxValueLength = 1000
	cfg.SlowThreshold = 50 * time.Millisecond
	cfg.SanitizeValues = true

	return cfg
}
// NewTracedRedisCache builds a traced Redis cache around client.
//
// tracer and logger are used for spans and log output, prefix is the key
// namespace (may be empty), and the tracing options are taken from
// DefaultRedisTracingConfig.
func NewTracedRedisCache(client redis.UniversalClient, tracer *Tracer, logger *zap.Logger, prefix string) interfaces.CacheService {
	cache := new(TracedRedisCache)
	cache.client = client
	cache.tracer = tracer
	cache.logger = logger
	cache.prefix = prefix
	cache.config = DefaultRedisTracingConfig()
	return cache
}
// Name returns the service name, "redis-cache".
func (c *TracedRedisCache) Name() string {
	const serviceName = "redis-cache"
	return serviceName
}
// Initialize logs that the cache service has been initialized. It performs no
// other work and always returns nil; ctx is unused.
func (c *TracedRedisCache) Initialize(ctx context.Context) error {
	const initialized = "Redis缓存服务已初始化"
	c.logger.Info(initialized)
	return nil
}
// HealthCheck sends a PING through the client and returns the error from
// that command, or nil when it succeeds.
func (c *TracedRedisCache) HealthCheck(ctx context.Context) error {
	if _, pingErr := c.client.Ping(ctx).Result(); pingErr != nil {
		return pingErr
	}
	return nil
}
// Shutdown logs the shutdown message and then closes the Redis client,
// returning the error from Close. ctx is unused.
func (c *TracedRedisCache) Shutdown(ctx context.Context) error {
	c.logger.Info("Redis缓存服务已关闭")

	if closeErr := c.client.Close(); closeErr != nil {
		return closeErr
	}
	return nil
}
// Get looks up key (with the configured prefix applied) and decodes the
// stored value into dest.
//
// The call runs inside a cache span that records the operation, the Redis
// round-trip time in milliseconds, whether the lookup was a hit or a miss and
// the size of the returned value. A round trip longer than
// config.SlowThreshold is flagged on the span and logged as a warning.
//
// It returns interfaces.ErrCacheMiss when Redis reports that the key does not
// exist, the Redis error when the command fails, and the error from
// deserialize when the stored value cannot be decoded into dest.
func (c *TracedRedisCache) Get(ctx context.Context, key string, dest interface{}) error {
	ctx, span := c.tracer.StartCacheSpan(ctx, "get", key)
	defer span.End()

	c.addBaseAttributes(span, "get", key)

	startTime := time.Now()
	fullKey := c.buildKey(key)
	result, err := c.client.Get(ctx, fullKey).Result()
	duration := time.Since(startTime)

	c.tracer.AddSpanAttributes(span,
		attribute.Int64("redis.duration_ms", duration.Milliseconds()),
	)

	if duration > c.config.SlowThreshold {
		c.tracer.AddSpanAttributes(span,
			attribute.Bool("redis.slow_operation", true),
		)
		c.logger.Warn("Redis慢操作检测",
			zap.String("operation", "get"),
			zap.String("key", c.sanitizeKey(key)),
			zap.Duration("duration", duration),
			zap.String("trace_id", c.tracer.GetTraceID(ctx)),
		)
	}

	switch {
	case errors.Is(err, redis.Nil):
		// A missing key is an expected outcome, not a failure. errors.Is also
		// matches a redis.Nil that has been wrapped on its way here.
		c.tracer.AddSpanAttributes(span,
			attribute.Bool("redis.hit", false),
			attribute.String("redis.result", "miss"),
		)
		c.tracer.SetSpanSuccess(span)
		return interfaces.ErrCacheMiss

	case err != nil:
		c.tracer.SetSpanError(span, err)
		c.logger.Error("Redis GET操作失败",
			zap.String("key", c.sanitizeKey(key)),
			zap.Error(err),
			zap.String("trace_id", c.tracer.GetTraceID(ctx)),
		)
		return err
	}

	c.tracer.AddSpanAttributes(span,
		attribute.Bool("redis.hit", true),
		attribute.String("redis.result", "hit"),
		attribute.Int("redis.value_size", len(result)),
	)

	if err := c.deserialize(result, dest); err != nil {
		c.tracer.SetSpanError(span, err)
		return err
	}

	c.tracer.SetSpanSuccess(span)
	return nil
}
// Set serializes value and stores it under key (with the configured prefix
// applied).
//
// Only the first element of ttl is consulted, and only a time.Duration is
// accepted as an expiration. Any other non-nil value is ignored and reported
// with a warning, because the key is then written with a zero expiration and
// would otherwise silently never expire.
//
// The call runs inside a cache span that records the operation, the TTL in
// whole seconds, the Redis round-trip time in milliseconds and the size of the
// serialized value. A round trip longer than config.SlowThreshold is flagged
// on the span and logged as a warning.
//
// It returns the error from serialize or from the Redis command.
func (c *TracedRedisCache) Set(ctx context.Context, key string, value interface{}, ttl ...interface{}) error {
	ctx, span := c.tracer.StartCacheSpan(ctx, "set", key)
	defer span.End()

	c.addBaseAttributes(span, "set", key)

	var expiration time.Duration
	if len(ttl) > 0 {
		if d, ok := ttl[0].(time.Duration); ok {
			expiration = d
			c.tracer.AddSpanAttributes(span,
				attribute.Int64("redis.ttl_seconds", int64(expiration.Seconds())),
			)
		} else if ttl[0] != nil {
			c.logger.Warn("Redis SET忽略了不支持的TTL类型,键将不设置过期时间",
				zap.String("key", c.sanitizeKey(key)),
				zap.String("ttl_type", fmt.Sprintf("%T", ttl[0])),
				zap.String("trace_id", c.tracer.GetTraceID(ctx)),
			)
		}
	}

	serialized, err := c.serialize(value)
	if err != nil {
		c.tracer.SetSpanError(span, err)
		return err
	}

	fullKey := c.buildKey(key)

	// Start the clock after serialization so that redis.duration_ms and the
	// slow-operation check cover the Redis round trip only.
	startTime := time.Now()
	err = c.client.Set(ctx, fullKey, serialized, expiration).Err()
	duration := time.Since(startTime)

	c.tracer.AddSpanAttributes(span,
		attribute.Int64("redis.duration_ms", duration.Milliseconds()),
		attribute.Int("redis.value_size", len(serialized)),
	)

	if duration > c.config.SlowThreshold {
		c.tracer.AddSpanAttributes(span,
			attribute.Bool("redis.slow_operation", true),
		)
		c.logger.Warn("Redis慢操作检测",
			zap.String("operation", "set"),
			zap.String("key", c.sanitizeKey(key)),
			zap.Duration("duration", duration),
			zap.String("trace_id", c.tracer.GetTraceID(ctx)),
		)
	}

	if err != nil {
		c.tracer.SetSpanError(span, err)
		c.logger.Error("Redis SET操作失败",
			zap.String("key", c.sanitizeKey(key)),
			zap.Error(err),
			zap.String("trace_id", c.tracer.GetTraceID(ctx)),
		)
		return err
	}

	c.tracer.SetSpanSuccess(span)
	return nil
}
// Delete removes the given keys (each with the configured prefix applied) in
// a single DEL command.
//
// Calling it with no keys is a no-op that returns nil. Otherwise the call
// runs inside a cache span that records the operation, the number of keys
// requested, the Redis round-trip time in milliseconds and, on success, how
// many keys Redis reported as deleted.
//
// It returns the Redis error when the command fails.
func (c *TracedRedisCache) Delete(ctx context.Context, keys ...string) error {
	// DEL needs at least one key; Redis rejects the bare command, so an empty
	// call is treated as "nothing to do" instead of a logged failure.
	if len(keys) == 0 {
		return nil
	}

	ctx, span := c.tracer.StartCacheSpan(ctx, "delete", strings.Join(keys, ","))
	defer span.End()

	c.tracer.AddSpanAttributes(span,
		attribute.String("redis.operation", "delete"),
		// Same db.system tag the single-key operations set via addBaseAttributes.
		attribute.String("db.system", "redis"),
		attribute.Int("redis.key_count", len(keys)),
	)

	startTime := time.Now()

	fullKeys := make([]string, len(keys))
	for i, key := range keys {
		fullKeys[i] = c.buildKey(key)
	}

	deleted, err := c.client.Del(ctx, fullKeys...).Result()
	duration := time.Since(startTime)

	c.tracer.AddSpanAttributes(span,
		attribute.Int64("redis.duration_ms", duration.Milliseconds()),
	)

	if err != nil {
		c.tracer.SetSpanError(span, err)
		c.logger.Error("Redis DELETE操作失败",
			zap.Strings("keys", c.sanitizeKeys(keys)),
			zap.Error(err),
			zap.String("trace_id", c.tracer.GetTraceID(ctx)),
		)
		return err
	}

	// The count is only meaningful when the command succeeded.
	c.tracer.AddSpanAttributes(span,
		attribute.Int64("redis.deleted_count", deleted),
	)
	c.tracer.SetSpanSuccess(span)
	return nil
}
// Exists reports whether key (with the configured prefix applied) is present
// in Redis.
//
// The call runs inside a cache span that records the operation, the Redis
// round-trip time in milliseconds and, on success, the result as
// "redis.exists".
//
// It returns false together with the Redis error when the command fails.
func (c *TracedRedisCache) Exists(ctx context.Context, key string) (bool, error) {
	ctx, span := c.tracer.StartCacheSpan(ctx, "exists", key)
	defer span.End()

	c.addBaseAttributes(span, "exists", key)

	startTime := time.Now()
	fullKey := c.buildKey(key)
	count, err := c.client.Exists(ctx, fullKey).Result()
	duration := time.Since(startTime)

	c.tracer.AddSpanAttributes(span,
		attribute.Int64("redis.duration_ms", duration.Milliseconds()),
	)

	if err != nil {
		// count carries no information when the command failed, so no
		// "redis.exists" attribute is recorded on this path.
		c.tracer.SetSpanError(span, err)
		return false, err
	}

	found := count > 0
	c.tracer.AddSpanAttributes(span,
		attribute.Bool("redis.exists", found),
	)
	c.tracer.SetSpanSuccess(span)
	return found, nil
}
// GetMultiple fetches several keys (basic implementation).
//
// Keys are read one at a time through Get; a real deployment could use MGET
// instead. A key whose Get returns any error, miss or failure alike, is left
// out of the result. The returned error is always nil.
func (c *TracedRedisCache) GetMultiple(ctx context.Context, keys []string) (map[string]interface{}, error) {
	found := make(map[string]interface{}, len(keys))

	for i := range keys {
		var item interface{}
		if c.Get(ctx, keys[i], &item) != nil {
			continue
		}
		found[keys[i]] = item
	}

	return found, nil
}
// SetMultiple stores several entries (basic implementation).
//
// Entries are written one at a time through Set, each with the same ttl
// arguments; a real deployment could use a pipeline instead. The first error
// from Set stops the loop and is returned, and entries already written stay
// written.
func (c *TracedRedisCache) SetMultiple(ctx context.Context, data map[string]interface{}, ttl ...interface{}) error {
	for name := range data {
		setErr := c.Set(ctx, name, data[name], ttl...)
		if setErr != nil {
			return setErr
		}
	}
	return nil
}
// DeletePattern is meant to delete keys matching pattern. It is a
// placeholder: no Redis command is issued and an error is always returned.
func (c *TracedRedisCache) DeletePattern(ctx context.Context, pattern string) error {
	// Pattern-based deletion still has to be written.
	notImplemented := fmt.Errorf("DeletePattern not implemented")
	return notImplemented
}
// Keys is meant to list the keys matching pattern. It is a placeholder: no
// Redis command is issued and it always returns a nil slice and an error.
func (c *TracedRedisCache) Keys(ctx context.Context, pattern string) ([]string, error) {
	// Key matching still has to be written.
	var matched []string
	return matched, fmt.Errorf("Keys not implemented")
}
// Stats is meant to report cache statistics. It is a placeholder: it always
// returns an empty interfaces.CacheStats and an error.
func (c *TracedRedisCache) Stats(ctx context.Context) (interfaces.CacheStats, error) {
	notImplemented := fmt.Errorf("Stats not implemented")
	return interfaces.CacheStats{}, notImplemented
}
// Helper methods
// addBaseAttributes tags span with the attributes shared by the single-key
// operations: "redis.operation", "db.system" and, when config.IncludeKeys is
// set, "redis.key".
//
// The key is passed through sanitizeKey, which shortens over-long keys, and
// the result is always recorded. Re-checking the shortened key against
// MaxKeyLength would reject exactly the keys that were just shortened,
// because sanitizeKey appends "..." to them.
func (c *TracedRedisCache) addBaseAttributes(span trace.Span, operation, key string) {
	c.tracer.AddSpanAttributes(span,
		attribute.String("redis.operation", operation),
		attribute.String("db.system", "redis"),
	)

	if !c.config.IncludeKeys {
		return
	}

	c.tracer.AddSpanAttributes(span,
		attribute.String("redis.key", c.sanitizeKey(key)),
	)
}
// buildKey returns the full Redis key for key: "prefix:key" when a prefix is
// configured, key unchanged otherwise.
func (c *TracedRedisCache) buildKey(key string) string {
	namespace := c.prefix
	if namespace != "" {
		return fmt.Sprintf("%s:%s", namespace, key)
	}
	return key
}
// sanitizeKey shortens key for use in logs and span attributes.
//
// A key of at most config.MaxKeyLength bytes is returned unchanged. A longer
// key is cut at the last rune boundary that is not past the limit and "..."
// is appended, so the cut never splits a multi-byte UTF-8 character. A
// negative MaxKeyLength is treated as zero.
func (c *TracedRedisCache) sanitizeKey(key string) string {
	limit := c.config.MaxKeyLength
	if limit < 0 {
		limit = 0
	}
	if len(key) <= limit {
		return key
	}

	// Ranging over a string yields the byte offset at which each rune starts;
	// keep the largest such offset that does not exceed the limit.
	cut := 0
	for i := range key {
		if i > limit {
			break
		}
		cut = i
	}
	return key[:cut] + "..."
}
// sanitizeKeys applies sanitizeKey to every element of keys and returns the
// results in the same order as a new slice.
func (c *TracedRedisCache) sanitizeKeys(keys []string) []string {
	cleaned := make([]string, 0, len(keys))
	for _, k := range keys {
		cleaned = append(cleaned, c.sanitizeKey(k))
	}
	return cleaned
}
// serialize encodes value as JSON and returns the encoding as a string.
//
// It returns an error when value cannot be represented in JSON (for example
// a channel or a function).
func (c *TracedRedisCache) serialize(value interface{}) (string, error) {
	encoded, err := json.Marshal(value)
	if err != nil {
		return "", fmt.Errorf("serialize cache value: %w", err)
	}
	return string(encoded), nil
}

// deserialize decodes the JSON text in data into dest, which must be a
// non-nil pointer.
//
// It returns an error when data is not valid JSON or does not fit the type
// dest points to.
func (c *TracedRedisCache) deserialize(data string, dest interface{}) error {
	if err := json.Unmarshal([]byte(data), dest); err != nil {
		return fmt.Errorf("deserialize cache value: %w", err)
	}
	return nil
}