// Listing metadata: 468 lines, 12 KiB, Go.

package resilience

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"go.uber.org/zap"
)

// RetryConfig describes a retry policy: how often an operation may run, how
// long to pause between runs, and which errors are worth another try.
type RetryConfig struct {
	// MaxAttempts is the total number of times the operation may be invoked,
	// the first call included.
	MaxAttempts int

	// InitialDelay is the base delay the backoff functions start from.
	InitialDelay time.Duration

	// MaxDelay is the cap the backoff functions apply to the computed delay.
	MaxDelay time.Duration

	// BackoffMultiplier is the growth factor applied per attempt by the
	// exponential backoff functions.
	BackoffMultiplier float64

	// JitterFactor is the fraction of the computed delay that
	// ExponentialBackoffWithJitter may add or subtract at random.
	// Values <= 0 disable jitter.
	JitterFactor float64

	// RetryCondition decides whether a failed attempt should be retried.
	RetryCondition func(error) bool

	// DelayFunc computes the pause before the next attempt. It receives the
	// zero-based index of the attempt that just failed and the policy itself.
	DelayFunc func(attempt int, config RetryConfig) time.Duration
}
| // DefaultRetryConfig 默认重试配置
 | ||
| func DefaultRetryConfig() RetryConfig {
 | ||
| 	return RetryConfig{
 | ||
| 		MaxAttempts:       3,
 | ||
| 		InitialDelay:      100 * time.Millisecond,
 | ||
| 		MaxDelay:          5 * time.Second,
 | ||
| 		BackoffMultiplier: 2.0,
 | ||
| 		JitterFactor:      0.1,
 | ||
| 		RetryCondition:    DefaultRetryCondition,
 | ||
| 		DelayFunc:         ExponentialBackoffWithJitter,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
// RetryableError is implemented by errors that can state for themselves
// whether the failed operation is worth retrying.
type RetryableError interface {
	error
	// IsRetryable reports whether the operation that produced this error
	// should be attempted again.
	IsRetryable() bool
}

// DefaultRetryCondition is the retry predicate used when a RetryConfig does
// not supply its own.
//
// It returns false for a nil error. If err, or any error it wraps, implements
// RetryableError, that error's IsRetryable verdict is returned; the chain is
// searched with errors.As so that wrapping an error (for example with
// fmt.Errorf and %w) does not hide its verdict. Every other error is
// considered retryable.
func DefaultRetryCondition(err error) bool {
	if err == nil {
		return false
	}

	var retryable RetryableError
	if errors.As(err, &retryable) {
		return retryable.IsRetryable()
	}

	// Errors that carry no opinion are retried by default.
	return true
}
// IsRetryableHTTPError reports whether an HTTP response status is worth
// retrying: 429 (Too Many Requests) and every code from 500 upwards qualify.
// No upper bound is applied to the status code.
func IsRetryableHTTPError(statusCode int) bool {
	switch {
	case statusCode == 429:
		// Rate limited; a later attempt may be accepted.
		return true
	case statusCode >= 500:
		// Server-side failures are usually transient.
		return true
	default:
		return false
	}
}
| // DelayFunction 延迟函数类型
 | ||
| type DelayFunction func(attempt int, config RetryConfig) time.Duration
 | ||
| 
 | ||
| // FixedDelay 固定延迟
 | ||
| func FixedDelay(attempt int, config RetryConfig) time.Duration {
 | ||
| 	return config.InitialDelay
 | ||
| }
 | ||
| 
 | ||
| // LinearBackoff 线性退避
 | ||
| func LinearBackoff(attempt int, config RetryConfig) time.Duration {
 | ||
| 	delay := time.Duration(attempt) * config.InitialDelay
 | ||
| 	if delay > config.MaxDelay {
 | ||
| 		delay = config.MaxDelay
 | ||
| 	}
 | ||
| 	return delay
 | ||
| }
 | ||
| 
 | ||
| // ExponentialBackoff 指数退避
 | ||
| func ExponentialBackoff(attempt int, config RetryConfig) time.Duration {
 | ||
| 	delay := config.InitialDelay
 | ||
| 	for i := 0; i < attempt; i++ {
 | ||
| 		delay = time.Duration(float64(delay) * config.BackoffMultiplier)
 | ||
| 	}
 | ||
| 
 | ||
| 	if delay > config.MaxDelay {
 | ||
| 		delay = config.MaxDelay
 | ||
| 	}
 | ||
| 
 | ||
| 	return delay
 | ||
| }
 | ||
| 
 | ||
| // ExponentialBackoffWithJitter 带抖动的指数退避
 | ||
| func ExponentialBackoffWithJitter(attempt int, config RetryConfig) time.Duration {
 | ||
| 	delay := ExponentialBackoff(attempt, config)
 | ||
| 
 | ||
| 	// 添加抖动
 | ||
| 	jitter := config.JitterFactor
 | ||
| 	if jitter > 0 {
 | ||
| 		jitterRange := float64(delay) * jitter
 | ||
| 		jitterOffset := (rand.Float64() - 0.5) * 2 * jitterRange
 | ||
| 		delay = time.Duration(float64(delay) + jitterOffset)
 | ||
| 	}
 | ||
| 
 | ||
| 	if delay < 0 {
 | ||
| 		delay = config.InitialDelay
 | ||
| 	}
 | ||
| 
 | ||
| 	return delay
 | ||
| }
 | ||
| 
 | ||
// RetryStats is the set of counters a Retryer accumulates across all of its
// executions. It is a plain value and is serialised to JSON with the keys
// given in the field tags.
type RetryStats struct {
	// TotalAttempts counts every invocation of an operation.
	TotalAttempts int `json:"total_attempts"`

	// Successes counts invocations that returned a nil error.
	Successes int `json:"successes"`

	// Failures counts invocations that returned an error. This is per
	// attempt, not per operation.
	Failures int `json:"failures"`

	// TotalRetries counts retry attempts.
	// NOTE(review): the exact definition is set by Retryer.ExecuteWithResult;
	// confirm there before relying on it.
	TotalRetries int `json:"total_retries"`

	// AverageAttempts is an average derived from the counters above.
	// NOTE(review): the formula lives in Retryer.ExecuteWithResult; confirm
	// there before relying on it.
	AverageAttempts float64 `json:"average_attempts"`

	// TotalDelay is the sum of the backoff delays scheduled between attempts.
	TotalDelay time.Duration `json:"total_delay"`

	// LastError is the message of the error that ended the most recent
	// failed operation; it is omitted from JSON when empty.
	LastError string `json:"last_error,omitempty"`
}
| // Retryer 重试器
 | ||
| type Retryer struct {
 | ||
| 	config RetryConfig
 | ||
| 	logger *zap.Logger
 | ||
| 	stats  RetryStats
 | ||
| }
 | ||
| 
 | ||
| // NewRetryer 创建重试器
 | ||
| func NewRetryer(config RetryConfig, logger *zap.Logger) *Retryer {
 | ||
| 	if config.DelayFunc == nil {
 | ||
| 		config.DelayFunc = ExponentialBackoffWithJitter
 | ||
| 	}
 | ||
| 	if config.RetryCondition == nil {
 | ||
| 		config.RetryCondition = DefaultRetryCondition
 | ||
| 	}
 | ||
| 
 | ||
| 	return &Retryer{
 | ||
| 		config: config,
 | ||
| 		logger: logger,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // Execute 执行带重试的函数
 | ||
| func (r *Retryer) Execute(ctx context.Context, operation func() error) error {
 | ||
| 	return r.ExecuteWithResult(ctx, func() (interface{}, error) {
 | ||
| 		return nil, operation()
 | ||
| 	})
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithResult 执行带重试和返回值的函数
 | ||
| func (r *Retryer) ExecuteWithResult(ctx context.Context, operation func() (interface{}, error)) error {
 | ||
| 	var lastErr error
 | ||
| 	startTime := time.Now()
 | ||
| 
 | ||
| 	for attempt := 0; attempt < r.config.MaxAttempts; attempt++ {
 | ||
| 		// 检查上下文是否被取消
 | ||
| 		select {
 | ||
| 		case <-ctx.Done():
 | ||
| 			return ctx.Err()
 | ||
| 		default:
 | ||
| 		}
 | ||
| 
 | ||
| 		// 执行操作
 | ||
| 		attemptStart := time.Now()
 | ||
| 		_, err := operation()
 | ||
| 		attemptDuration := time.Since(attemptStart)
 | ||
| 
 | ||
| 		// 更新统计
 | ||
| 		r.stats.TotalAttempts++
 | ||
| 		if err == nil {
 | ||
| 			r.stats.Successes++
 | ||
| 			r.logger.Debug("Operation succeeded",
 | ||
| 				zap.Int("attempt", attempt+1),
 | ||
| 				zap.Duration("duration", attemptDuration))
 | ||
| 			return nil
 | ||
| 		}
 | ||
| 
 | ||
| 		lastErr = err
 | ||
| 		r.stats.Failures++
 | ||
| 		if attempt > 0 {
 | ||
| 			r.stats.TotalRetries++
 | ||
| 		}
 | ||
| 
 | ||
| 		// 检查是否应该重试
 | ||
| 		if !r.config.RetryCondition(err) {
 | ||
| 			r.logger.Debug("Error is not retryable",
 | ||
| 				zap.Error(err),
 | ||
| 				zap.Int("attempt", attempt+1))
 | ||
| 			break
 | ||
| 		}
 | ||
| 
 | ||
| 		// 如果这是最后一次尝试,不需要延迟
 | ||
| 		if attempt == r.config.MaxAttempts-1 {
 | ||
| 			r.logger.Debug("Reached max attempts",
 | ||
| 				zap.Error(err),
 | ||
| 				zap.Int("max_attempts", r.config.MaxAttempts))
 | ||
| 			break
 | ||
| 		}
 | ||
| 
 | ||
| 		// 计算延迟
 | ||
| 		delay := r.config.DelayFunc(attempt, r.config)
 | ||
| 		r.stats.TotalDelay += delay
 | ||
| 
 | ||
| 		r.logger.Debug("Operation failed, retrying",
 | ||
| 			zap.Error(err),
 | ||
| 			zap.Int("attempt", attempt+1),
 | ||
| 			zap.Duration("delay", delay),
 | ||
| 			zap.Duration("attempt_duration", attemptDuration))
 | ||
| 
 | ||
| 		// 等待重试
 | ||
| 		select {
 | ||
| 		case <-ctx.Done():
 | ||
| 			return ctx.Err()
 | ||
| 		case <-time.After(delay):
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	// 更新最终统计
 | ||
| 	totalDuration := time.Since(startTime)
 | ||
| 	if r.stats.TotalAttempts > 0 {
 | ||
| 		r.stats.AverageAttempts = float64(r.stats.TotalRetries) / float64(r.stats.Successes+r.stats.Failures)
 | ||
| 	}
 | ||
| 	if lastErr != nil {
 | ||
| 		r.stats.LastError = lastErr.Error()
 | ||
| 	}
 | ||
| 
 | ||
| 	r.logger.Warn("Operation failed after all retries",
 | ||
| 		zap.Error(lastErr),
 | ||
| 		zap.Int("total_attempts", r.stats.TotalAttempts),
 | ||
| 		zap.Duration("total_duration", totalDuration))
 | ||
| 
 | ||
| 	return fmt.Errorf("operation failed after %d attempts: %w", r.config.MaxAttempts, lastErr)
 | ||
| }
 | ||
| 
 | ||
| // GetStats 获取重试统计
 | ||
| func (r *Retryer) GetStats() RetryStats {
 | ||
| 	return r.stats
 | ||
| }
 | ||
| 
 | ||
| // Reset 重置统计
 | ||
| func (r *Retryer) Reset() {
 | ||
| 	r.stats = RetryStats{}
 | ||
| 	r.logger.Debug("Retry stats reset")
 | ||
| }
 | ||
| 
 | ||
| // Retry 简单重试函数
 | ||
| func Retry(ctx context.Context, config RetryConfig, operation func() error) error {
 | ||
| 	retryer := NewRetryer(config, zap.NewNop())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // RetryWithResult 带返回值的重试函数
 | ||
| func RetryWithResult[T any](ctx context.Context, config RetryConfig, operation func() (T, error)) (T, error) {
 | ||
| 	var result T
 | ||
| 	var finalErr error
 | ||
| 
 | ||
| 	retryer := NewRetryer(config, zap.NewNop())
 | ||
| 	err := retryer.ExecuteWithResult(ctx, func() (interface{}, error) {
 | ||
| 		r, e := operation()
 | ||
| 		result = r
 | ||
| 		return r, e
 | ||
| 	})
 | ||
| 
 | ||
| 	if err != nil {
 | ||
| 		finalErr = err
 | ||
| 	}
 | ||
| 
 | ||
| 	return result, finalErr
 | ||
| }
 | ||
| 
 | ||
// Predefined retry configurations.

| // QuickRetry 快速重试(适用于轻量级操作)
 | ||
| func QuickRetry() RetryConfig {
 | ||
| 	return RetryConfig{
 | ||
| 		MaxAttempts:       3,
 | ||
| 		InitialDelay:      50 * time.Millisecond,
 | ||
| 		MaxDelay:          500 * time.Millisecond,
 | ||
| 		BackoffMultiplier: 2.0,
 | ||
| 		JitterFactor:      0.1,
 | ||
| 		RetryCondition:    DefaultRetryCondition,
 | ||
| 		DelayFunc:         ExponentialBackoffWithJitter,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // StandardRetry 标准重试(适用于一般操作)
 | ||
| func StandardRetry() RetryConfig {
 | ||
| 	return DefaultRetryConfig()
 | ||
| }
 | ||
| 
 | ||
| // PatientRetry 耐心重试(适用于重要操作)
 | ||
| func PatientRetry() RetryConfig {
 | ||
| 	return RetryConfig{
 | ||
| 		MaxAttempts:       5,
 | ||
| 		InitialDelay:      200 * time.Millisecond,
 | ||
| 		MaxDelay:          10 * time.Second,
 | ||
| 		BackoffMultiplier: 2.0,
 | ||
| 		JitterFactor:      0.2,
 | ||
| 		RetryCondition:    DefaultRetryCondition,
 | ||
| 		DelayFunc:         ExponentialBackoffWithJitter,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // DatabaseRetry 数据库重试配置
 | ||
| func DatabaseRetry() RetryConfig {
 | ||
| 	return RetryConfig{
 | ||
| 		MaxAttempts:       3,
 | ||
| 		InitialDelay:      100 * time.Millisecond,
 | ||
| 		MaxDelay:          2 * time.Second,
 | ||
| 		BackoffMultiplier: 1.5,
 | ||
| 		JitterFactor:      0.1,
 | ||
| 		RetryCondition: func(err error) bool {
 | ||
| 			// 这里可以根据具体的数据库错误类型判断
 | ||
| 			// 例如:连接超时、临时网络错误等
 | ||
| 			return DefaultRetryCondition(err)
 | ||
| 		},
 | ||
| 		DelayFunc: ExponentialBackoffWithJitter,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // HTTPRetry HTTP重试配置
 | ||
| func HTTPRetry() RetryConfig {
 | ||
| 	return RetryConfig{
 | ||
| 		MaxAttempts:       3,
 | ||
| 		InitialDelay:      200 * time.Millisecond,
 | ||
| 		MaxDelay:          5 * time.Second,
 | ||
| 		BackoffMultiplier: 2.0,
 | ||
| 		JitterFactor:      0.15,
 | ||
| 		RetryCondition: func(err error) bool {
 | ||
| 			// HTTP相关的重试条件
 | ||
| 			return DefaultRetryCondition(err)
 | ||
| 		},
 | ||
| 		DelayFunc: ExponentialBackoffWithJitter,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // RetryManager 重试管理器
 | ||
| type RetryManager struct {
 | ||
| 	retryers map[string]*Retryer
 | ||
| 	logger   *zap.Logger
 | ||
| 	mutex    sync.RWMutex
 | ||
| }
 | ||
| 
 | ||
| // NewRetryManager 创建重试管理器
 | ||
| func NewRetryManager(logger *zap.Logger) *RetryManager {
 | ||
| 	return &RetryManager{
 | ||
| 		retryers: make(map[string]*Retryer),
 | ||
| 		logger:   logger,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // GetOrCreate 获取或创建重试器
 | ||
| func (rm *RetryManager) GetOrCreate(name string, config RetryConfig) *Retryer {
 | ||
| 	rm.mutex.Lock()
 | ||
| 	defer rm.mutex.Unlock()
 | ||
| 
 | ||
| 	if retryer, exists := rm.retryers[name]; exists {
 | ||
| 		return retryer
 | ||
| 	}
 | ||
| 
 | ||
| 	retryer := NewRetryer(config, rm.logger.Named(name))
 | ||
| 	rm.retryers[name] = retryer
 | ||
| 
 | ||
| 	rm.logger.Info("Created retryer", zap.String("name", name))
 | ||
| 	return retryer
 | ||
| }
 | ||
| 
 | ||
| // Execute 执行带重试的操作
 | ||
| func (rm *RetryManager) Execute(ctx context.Context, name string, operation func() error) error {
 | ||
| 	retryer := rm.GetOrCreate(name, DefaultRetryConfig())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // GetStats 获取所有重试器统计
 | ||
| func (rm *RetryManager) GetStats() map[string]RetryStats {
 | ||
| 	rm.mutex.RLock()
 | ||
| 	defer rm.mutex.RUnlock()
 | ||
| 
 | ||
| 	stats := make(map[string]RetryStats)
 | ||
| 	for name, retryer := range rm.retryers {
 | ||
| 		stats[name] = retryer.GetStats()
 | ||
| 	}
 | ||
| 
 | ||
| 	return stats
 | ||
| }
 | ||
| 
 | ||
| // ResetAll 重置所有重试器统计
 | ||
| func (rm *RetryManager) ResetAll() {
 | ||
| 	rm.mutex.RLock()
 | ||
| 	defer rm.mutex.RUnlock()
 | ||
| 
 | ||
| 	for name, retryer := range rm.retryers {
 | ||
| 		retryer.Reset()
 | ||
| 		rm.logger.Info("Reset retryer stats", zap.String("name", name))
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // RetryerWrapper 重试器包装器
 | ||
| type RetryerWrapper struct {
 | ||
| 	manager *RetryManager
 | ||
| 	logger  *zap.Logger
 | ||
| }
 | ||
| 
 | ||
| // NewRetryerWrapper 创建重试器包装器
 | ||
| func NewRetryerWrapper(logger *zap.Logger) *RetryerWrapper {
 | ||
| 	return &RetryerWrapper{
 | ||
| 		manager: NewRetryManager(logger),
 | ||
| 		logger:  logger,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithQuickRetry 执行快速重试
 | ||
| func (rw *RetryerWrapper) ExecuteWithQuickRetry(ctx context.Context, name string, operation func() error) error {
 | ||
| 	retryer := rw.manager.GetOrCreate(name+".quick", QuickRetry())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithStandardRetry 执行标准重试
 | ||
| func (rw *RetryerWrapper) ExecuteWithStandardRetry(ctx context.Context, name string, operation func() error) error {
 | ||
| 	retryer := rw.manager.GetOrCreate(name+".standard", StandardRetry())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithDatabaseRetry 执行数据库重试
 | ||
| func (rw *RetryerWrapper) ExecuteWithDatabaseRetry(ctx context.Context, name string, operation func() error) error {
 | ||
| 	retryer := rw.manager.GetOrCreate(name+".database", DatabaseRetry())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithHTTPRetry 执行HTTP重试
 | ||
| func (rw *RetryerWrapper) ExecuteWithHTTPRetry(ctx context.Context, name string, operation func() error) error {
 | ||
| 	retryer := rw.manager.GetOrCreate(name+".http", HTTPRetry())
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // ExecuteWithCustomRetry 执行自定义重试
 | ||
| func (rw *RetryerWrapper) ExecuteWithCustomRetry(ctx context.Context, name string, config RetryConfig, operation func() error) error {
 | ||
| 	retryer := rw.manager.GetOrCreate(name+".custom", config)
 | ||
| 	return retryer.Execute(ctx, operation)
 | ||
| }
 | ||
| 
 | ||
| // GetManager 获取重试管理器
 | ||
| func (rw *RetryerWrapper) GetManager() *RetryManager {
 | ||
| 	return rw.manager
 | ||
| }
 | ||
| 
 | ||
| // GetAllStats 获取所有统计信息
 | ||
| func (rw *RetryerWrapper) GetAllStats() map[string]RetryStats {
 | ||
| 	return rw.manager.GetStats()
 | ||
| }
 | ||
| 
 | ||
| // ResetAllStats 重置所有统计信息
 | ||
| func (rw *RetryerWrapper) ResetAllStats() {
 | ||
| 	rw.manager.ResetAll()
 | ||
| }
 |