468 lines
12 KiB
Go
468 lines
12 KiB
Go
|
|
package resilience
|
|||
|
|
|
|||
|
|
import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"go.uber.org/zap"
)
|
|||
|
|
|
|||
|
|
// RetryConfig bundles every knob of a retry policy: how many times to try,
// how long to wait between tries, and which errors are worth retrying.
type RetryConfig struct {
	// MaxAttempts bounds the attempt loop in Retryer.ExecuteWithResult.
	MaxAttempts int

	// InitialDelay is the base wait used by the delay functions.
	InitialDelay time.Duration
	// MaxDelay is the cap applied by LinearBackoff and ExponentialBackoff.
	MaxDelay time.Duration

	// BackoffMultiplier is the per-step growth factor of ExponentialBackoff.
	BackoffMultiplier float64
	// JitterFactor is the fraction of the delay that
	// ExponentialBackoffWithJitter may add or subtract at random.
	JitterFactor float64

	// RetryCondition decides whether a given error should be retried.
	RetryCondition func(error) bool
	// DelayFunc computes the wait before the next attempt; it receives the
	// zero-based index of the attempt that just failed.
	DelayFunc func(attempt int, config RetryConfig) time.Duration
}
|
|||
|
|
|
|||
|
|
// DefaultRetryConfig 默认重试配置
|
|||
|
|
func DefaultRetryConfig() RetryConfig {
|
|||
|
|
return RetryConfig{
|
|||
|
|
MaxAttempts: 3,
|
|||
|
|
InitialDelay: 100 * time.Millisecond,
|
|||
|
|
MaxDelay: 5 * time.Second,
|
|||
|
|
BackoffMultiplier: 2.0,
|
|||
|
|
JitterFactor: 0.1,
|
|||
|
|
RetryCondition: DefaultRetryCondition,
|
|||
|
|
DelayFunc: ExponentialBackoffWithJitter,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RetryableError is an error that can state for itself whether the failed
// operation is worth retrying. DefaultRetryCondition consults it.
type RetryableError interface {
	error

	// IsRetryable reports whether retrying may succeed.
	IsRetryable() bool
}
|
|||
|
|
|
|||
|
|
// DefaultRetryCondition 默认重试条件
|
|||
|
|
func DefaultRetryCondition(err error) bool {
|
|||
|
|
if err == nil {
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 检查是否实现了RetryableError接口
|
|||
|
|
if retryable, ok := err.(RetryableError); ok {
|
|||
|
|
return retryable.IsRetryable()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 默认所有错误都重试
|
|||
|
|
return true
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// IsRetryableHTTPError reports whether an HTTP status code is worth retrying:
// 429 (Too Many Requests) and every code from 500 upward.
func IsRetryableHTTPError(statusCode int) bool {
	switch {
	case statusCode == 429:
		// Rate limited: the server is asking the client to come back later.
		return true
	case statusCode >= 500:
		// Server-side failure.
		return true
	default:
		return false
	}
}
|
|||
|
|
|
|||
|
|
// DelayFunction 延迟函数类型
|
|||
|
|
type DelayFunction func(attempt int, config RetryConfig) time.Duration
|
|||
|
|
|
|||
|
|
// FixedDelay 固定延迟
|
|||
|
|
func FixedDelay(attempt int, config RetryConfig) time.Duration {
|
|||
|
|
return config.InitialDelay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// LinearBackoff 线性退避
|
|||
|
|
func LinearBackoff(attempt int, config RetryConfig) time.Duration {
|
|||
|
|
delay := time.Duration(attempt) * config.InitialDelay
|
|||
|
|
if delay > config.MaxDelay {
|
|||
|
|
delay = config.MaxDelay
|
|||
|
|
}
|
|||
|
|
return delay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExponentialBackoff 指数退避
|
|||
|
|
func ExponentialBackoff(attempt int, config RetryConfig) time.Duration {
|
|||
|
|
delay := config.InitialDelay
|
|||
|
|
for i := 0; i < attempt; i++ {
|
|||
|
|
delay = time.Duration(float64(delay) * config.BackoffMultiplier)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if delay > config.MaxDelay {
|
|||
|
|
delay = config.MaxDelay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return delay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExponentialBackoffWithJitter 带抖动的指数退避
|
|||
|
|
func ExponentialBackoffWithJitter(attempt int, config RetryConfig) time.Duration {
|
|||
|
|
delay := ExponentialBackoff(attempt, config)
|
|||
|
|
|
|||
|
|
// 添加抖动
|
|||
|
|
jitter := config.JitterFactor
|
|||
|
|
if jitter > 0 {
|
|||
|
|
jitterRange := float64(delay) * jitter
|
|||
|
|
jitterOffset := (rand.Float64() - 0.5) * 2 * jitterRange
|
|||
|
|
delay = time.Duration(float64(delay) + jitterOffset)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if delay < 0 {
|
|||
|
|
delay = config.InitialDelay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return delay
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RetryStats is the set of counters a Retryer accumulates across calls.
// The struct tags define its JSON field names.
type RetryStats struct {
	// Attempt-level counters, bumped once per call of the operation.
	TotalAttempts int `json:"total_attempts"`
	Successes     int `json:"successes"`
	Failures      int `json:"failures"`
	TotalRetries  int `json:"total_retries"`

	// Derived and cumulative values.
	AverageAttempts float64       `json:"average_attempts"`
	TotalDelay      time.Duration `json:"total_delay"`

	// LastError holds the text of the most recent terminal error, if any.
	LastError string `json:"last_error,omitempty"`
}
|
|||
|
|
|
|||
|
|
// Retryer 重试器
|
|||
|
|
type Retryer struct {
|
|||
|
|
config RetryConfig
|
|||
|
|
logger *zap.Logger
|
|||
|
|
stats RetryStats
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewRetryer 创建重试器
|
|||
|
|
func NewRetryer(config RetryConfig, logger *zap.Logger) *Retryer {
|
|||
|
|
if config.DelayFunc == nil {
|
|||
|
|
config.DelayFunc = ExponentialBackoffWithJitter
|
|||
|
|
}
|
|||
|
|
if config.RetryCondition == nil {
|
|||
|
|
config.RetryCondition = DefaultRetryCondition
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return &Retryer{
|
|||
|
|
config: config,
|
|||
|
|
logger: logger,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Execute 执行带重试的函数
|
|||
|
|
func (r *Retryer) Execute(ctx context.Context, operation func() error) error {
|
|||
|
|
return r.ExecuteWithResult(ctx, func() (interface{}, error) {
|
|||
|
|
return nil, operation()
|
|||
|
|
})
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithResult 执行带重试和返回值的函数
|
|||
|
|
func (r *Retryer) ExecuteWithResult(ctx context.Context, operation func() (interface{}, error)) error {
|
|||
|
|
var lastErr error
|
|||
|
|
startTime := time.Now()
|
|||
|
|
|
|||
|
|
for attempt := 0; attempt < r.config.MaxAttempts; attempt++ {
|
|||
|
|
// 检查上下文是否被取消
|
|||
|
|
select {
|
|||
|
|
case <-ctx.Done():
|
|||
|
|
return ctx.Err()
|
|||
|
|
default:
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行操作
|
|||
|
|
attemptStart := time.Now()
|
|||
|
|
_, err := operation()
|
|||
|
|
attemptDuration := time.Since(attemptStart)
|
|||
|
|
|
|||
|
|
// 更新统计
|
|||
|
|
r.stats.TotalAttempts++
|
|||
|
|
if err == nil {
|
|||
|
|
r.stats.Successes++
|
|||
|
|
r.logger.Debug("Operation succeeded",
|
|||
|
|
zap.Int("attempt", attempt+1),
|
|||
|
|
zap.Duration("duration", attemptDuration))
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
lastErr = err
|
|||
|
|
r.stats.Failures++
|
|||
|
|
if attempt > 0 {
|
|||
|
|
r.stats.TotalRetries++
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 检查是否应该重试
|
|||
|
|
if !r.config.RetryCondition(err) {
|
|||
|
|
r.logger.Debug("Error is not retryable",
|
|||
|
|
zap.Error(err),
|
|||
|
|
zap.Int("attempt", attempt+1))
|
|||
|
|
break
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 如果这是最后一次尝试,不需要延迟
|
|||
|
|
if attempt == r.config.MaxAttempts-1 {
|
|||
|
|
r.logger.Debug("Reached max attempts",
|
|||
|
|
zap.Error(err),
|
|||
|
|
zap.Int("max_attempts", r.config.MaxAttempts))
|
|||
|
|
break
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 计算延迟
|
|||
|
|
delay := r.config.DelayFunc(attempt, r.config)
|
|||
|
|
r.stats.TotalDelay += delay
|
|||
|
|
|
|||
|
|
r.logger.Debug("Operation failed, retrying",
|
|||
|
|
zap.Error(err),
|
|||
|
|
zap.Int("attempt", attempt+1),
|
|||
|
|
zap.Duration("delay", delay),
|
|||
|
|
zap.Duration("attempt_duration", attemptDuration))
|
|||
|
|
|
|||
|
|
// 等待重试
|
|||
|
|
select {
|
|||
|
|
case <-ctx.Done():
|
|||
|
|
return ctx.Err()
|
|||
|
|
case <-time.After(delay):
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 更新最终统计
|
|||
|
|
totalDuration := time.Since(startTime)
|
|||
|
|
if r.stats.TotalAttempts > 0 {
|
|||
|
|
r.stats.AverageAttempts = float64(r.stats.TotalRetries) / float64(r.stats.Successes+r.stats.Failures)
|
|||
|
|
}
|
|||
|
|
if lastErr != nil {
|
|||
|
|
r.stats.LastError = lastErr.Error()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
r.logger.Warn("Operation failed after all retries",
|
|||
|
|
zap.Error(lastErr),
|
|||
|
|
zap.Int("total_attempts", r.stats.TotalAttempts),
|
|||
|
|
zap.Duration("total_duration", totalDuration))
|
|||
|
|
|
|||
|
|
return fmt.Errorf("operation failed after %d attempts: %w", r.config.MaxAttempts, lastErr)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetStats 获取重试统计
|
|||
|
|
func (r *Retryer) GetStats() RetryStats {
|
|||
|
|
return r.stats
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Reset 重置统计
|
|||
|
|
func (r *Retryer) Reset() {
|
|||
|
|
r.stats = RetryStats{}
|
|||
|
|
r.logger.Debug("Retry stats reset")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Retry 简单重试函数
|
|||
|
|
func Retry(ctx context.Context, config RetryConfig, operation func() error) error {
|
|||
|
|
retryer := NewRetryer(config, zap.NewNop())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RetryWithResult 带返回值的重试函数
|
|||
|
|
func RetryWithResult[T any](ctx context.Context, config RetryConfig, operation func() (T, error)) (T, error) {
|
|||
|
|
var result T
|
|||
|
|
var finalErr error
|
|||
|
|
|
|||
|
|
retryer := NewRetryer(config, zap.NewNop())
|
|||
|
|
err := retryer.ExecuteWithResult(ctx, func() (interface{}, error) {
|
|||
|
|
r, e := operation()
|
|||
|
|
result = r
|
|||
|
|
return r, e
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
finalErr = err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return result, finalErr
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Predefined retry configurations.
|
|||
|
|
|
|||
|
|
// QuickRetry 快速重试(适用于轻量级操作)
|
|||
|
|
func QuickRetry() RetryConfig {
|
|||
|
|
return RetryConfig{
|
|||
|
|
MaxAttempts: 3,
|
|||
|
|
InitialDelay: 50 * time.Millisecond,
|
|||
|
|
MaxDelay: 500 * time.Millisecond,
|
|||
|
|
BackoffMultiplier: 2.0,
|
|||
|
|
JitterFactor: 0.1,
|
|||
|
|
RetryCondition: DefaultRetryCondition,
|
|||
|
|
DelayFunc: ExponentialBackoffWithJitter,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// StandardRetry 标准重试(适用于一般操作)
|
|||
|
|
func StandardRetry() RetryConfig {
|
|||
|
|
return DefaultRetryConfig()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// PatientRetry 耐心重试(适用于重要操作)
|
|||
|
|
func PatientRetry() RetryConfig {
|
|||
|
|
return RetryConfig{
|
|||
|
|
MaxAttempts: 5,
|
|||
|
|
InitialDelay: 200 * time.Millisecond,
|
|||
|
|
MaxDelay: 10 * time.Second,
|
|||
|
|
BackoffMultiplier: 2.0,
|
|||
|
|
JitterFactor: 0.2,
|
|||
|
|
RetryCondition: DefaultRetryCondition,
|
|||
|
|
DelayFunc: ExponentialBackoffWithJitter,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// DatabaseRetry 数据库重试配置
|
|||
|
|
func DatabaseRetry() RetryConfig {
|
|||
|
|
return RetryConfig{
|
|||
|
|
MaxAttempts: 3,
|
|||
|
|
InitialDelay: 100 * time.Millisecond,
|
|||
|
|
MaxDelay: 2 * time.Second,
|
|||
|
|
BackoffMultiplier: 1.5,
|
|||
|
|
JitterFactor: 0.1,
|
|||
|
|
RetryCondition: func(err error) bool {
|
|||
|
|
// 这里可以根据具体的数据库错误类型判断
|
|||
|
|
// 例如:连接超时、临时网络错误等
|
|||
|
|
return DefaultRetryCondition(err)
|
|||
|
|
},
|
|||
|
|
DelayFunc: ExponentialBackoffWithJitter,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// HTTPRetry HTTP重试配置
|
|||
|
|
func HTTPRetry() RetryConfig {
|
|||
|
|
return RetryConfig{
|
|||
|
|
MaxAttempts: 3,
|
|||
|
|
InitialDelay: 200 * time.Millisecond,
|
|||
|
|
MaxDelay: 5 * time.Second,
|
|||
|
|
BackoffMultiplier: 2.0,
|
|||
|
|
JitterFactor: 0.15,
|
|||
|
|
RetryCondition: func(err error) bool {
|
|||
|
|
// HTTP相关的重试条件
|
|||
|
|
return DefaultRetryCondition(err)
|
|||
|
|
},
|
|||
|
|
DelayFunc: ExponentialBackoffWithJitter,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RetryManager 重试管理器
|
|||
|
|
type RetryManager struct {
|
|||
|
|
retryers map[string]*Retryer
|
|||
|
|
logger *zap.Logger
|
|||
|
|
mutex sync.RWMutex
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewRetryManager 创建重试管理器
|
|||
|
|
func NewRetryManager(logger *zap.Logger) *RetryManager {
|
|||
|
|
return &RetryManager{
|
|||
|
|
retryers: make(map[string]*Retryer),
|
|||
|
|
logger: logger,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetOrCreate 获取或创建重试器
|
|||
|
|
func (rm *RetryManager) GetOrCreate(name string, config RetryConfig) *Retryer {
|
|||
|
|
rm.mutex.Lock()
|
|||
|
|
defer rm.mutex.Unlock()
|
|||
|
|
|
|||
|
|
if retryer, exists := rm.retryers[name]; exists {
|
|||
|
|
return retryer
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
retryer := NewRetryer(config, rm.logger.Named(name))
|
|||
|
|
rm.retryers[name] = retryer
|
|||
|
|
|
|||
|
|
rm.logger.Info("Created retryer", zap.String("name", name))
|
|||
|
|
return retryer
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Execute 执行带重试的操作
|
|||
|
|
func (rm *RetryManager) Execute(ctx context.Context, name string, operation func() error) error {
|
|||
|
|
retryer := rm.GetOrCreate(name, DefaultRetryConfig())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetStats 获取所有重试器统计
|
|||
|
|
func (rm *RetryManager) GetStats() map[string]RetryStats {
|
|||
|
|
rm.mutex.RLock()
|
|||
|
|
defer rm.mutex.RUnlock()
|
|||
|
|
|
|||
|
|
stats := make(map[string]RetryStats)
|
|||
|
|
for name, retryer := range rm.retryers {
|
|||
|
|
stats[name] = retryer.GetStats()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return stats
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ResetAll 重置所有重试器统计
|
|||
|
|
func (rm *RetryManager) ResetAll() {
|
|||
|
|
rm.mutex.RLock()
|
|||
|
|
defer rm.mutex.RUnlock()
|
|||
|
|
|
|||
|
|
for name, retryer := range rm.retryers {
|
|||
|
|
retryer.Reset()
|
|||
|
|
rm.logger.Info("Reset retryer stats", zap.String("name", name))
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// RetryerWrapper 重试器包装器
|
|||
|
|
type RetryerWrapper struct {
|
|||
|
|
manager *RetryManager
|
|||
|
|
logger *zap.Logger
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewRetryerWrapper 创建重试器包装器
|
|||
|
|
func NewRetryerWrapper(logger *zap.Logger) *RetryerWrapper {
|
|||
|
|
return &RetryerWrapper{
|
|||
|
|
manager: NewRetryManager(logger),
|
|||
|
|
logger: logger,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithQuickRetry 执行快速重试
|
|||
|
|
func (rw *RetryerWrapper) ExecuteWithQuickRetry(ctx context.Context, name string, operation func() error) error {
|
|||
|
|
retryer := rw.manager.GetOrCreate(name+".quick", QuickRetry())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithStandardRetry 执行标准重试
|
|||
|
|
func (rw *RetryerWrapper) ExecuteWithStandardRetry(ctx context.Context, name string, operation func() error) error {
|
|||
|
|
retryer := rw.manager.GetOrCreate(name+".standard", StandardRetry())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithDatabaseRetry 执行数据库重试
|
|||
|
|
func (rw *RetryerWrapper) ExecuteWithDatabaseRetry(ctx context.Context, name string, operation func() error) error {
|
|||
|
|
retryer := rw.manager.GetOrCreate(name+".database", DatabaseRetry())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithHTTPRetry 执行HTTP重试
|
|||
|
|
func (rw *RetryerWrapper) ExecuteWithHTTPRetry(ctx context.Context, name string, operation func() error) error {
|
|||
|
|
retryer := rw.manager.GetOrCreate(name+".http", HTTPRetry())
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ExecuteWithCustomRetry 执行自定义重试
|
|||
|
|
func (rw *RetryerWrapper) ExecuteWithCustomRetry(ctx context.Context, name string, config RetryConfig, operation func() error) error {
|
|||
|
|
retryer := rw.manager.GetOrCreate(name+".custom", config)
|
|||
|
|
return retryer.Execute(ctx, operation)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetManager 获取重试管理器
|
|||
|
|
func (rw *RetryerWrapper) GetManager() *RetryManager {
|
|||
|
|
return rw.manager
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// GetAllStats 获取所有统计信息
|
|||
|
|
func (rw *RetryerWrapper) GetAllStats() map[string]RetryStats {
|
|||
|
|
return rw.manager.GetStats()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ResetAllStats 重置所有统计信息
|
|||
|
|
func (rw *RetryerWrapper) ResetAllStats() {
|
|||
|
|
rw.manager.ResetAll()
|
|||
|
|
}
|