package resilience import ( "context" "errors" "sync" "time" "go.uber.org/zap" ) // CircuitState 熔断器状态 type CircuitState int const ( // StateClosed 关闭状态(正常) StateClosed CircuitState = iota // StateOpen 开启状态(熔断) StateOpen // StateHalfOpen 半开状态(测试) StateHalfOpen ) func (s CircuitState) String() string { switch s { case StateClosed: return "CLOSED" case StateOpen: return "OPEN" case StateHalfOpen: return "HALF_OPEN" default: return "UNKNOWN" } } // CircuitBreakerConfig 熔断器配置 type CircuitBreakerConfig struct { // 故障阈值 FailureThreshold int // 重置超时时间 ResetTimeout time.Duration // 检测窗口大小 WindowSize int // 半开状态允许的请求数 HalfOpenMaxRequests int // 成功阈值(半开->关闭) SuccessThreshold int } // DefaultCircuitBreakerConfig 默认熔断器配置 func DefaultCircuitBreakerConfig() CircuitBreakerConfig { return CircuitBreakerConfig{ FailureThreshold: 5, ResetTimeout: 60 * time.Second, WindowSize: 10, HalfOpenMaxRequests: 3, SuccessThreshold: 2, } } // CircuitBreaker 熔断器 type CircuitBreaker struct { config CircuitBreakerConfig logger *zap.Logger mutex sync.RWMutex // 状态 state CircuitState // 计数器 failures int successes int requests int consecutiveFailures int // 时间记录 lastFailTime time.Time lastStateChange time.Time // 统计窗口 window []bool // true=success, false=failure windowIndex int windowFull bool // 事件回调 onStateChange func(from, to CircuitState) } // NewCircuitBreaker 创建熔断器 func NewCircuitBreaker(config CircuitBreakerConfig, logger *zap.Logger) *CircuitBreaker { cb := &CircuitBreaker{ config: config, logger: logger, state: StateClosed, window: make([]bool, config.WindowSize), lastStateChange: time.Now(), } return cb } // Execute 执行函数,如果熔断器开启则快速失败 func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error { // 检查是否允许执行 if !cb.allowRequest() { return ErrCircuitBreakerOpen } // 执行函数 start := time.Now() err := fn() duration := time.Since(start) // 记录结果 cb.recordResult(err == nil, duration) return err } // allowRequest 检查是否允许请求 func (cb *CircuitBreaker) allowRequest() bool { cb.mutex.Lock() defer cb.mutex.Unlock() now := time.Now() switch cb.state { case StateClosed: return true case StateOpen: // 检查是否到了重置时间 if now.Sub(cb.lastStateChange) > cb.config.ResetTimeout { cb.setState(StateHalfOpen) return true } return false case StateHalfOpen: // 半开状态下限制请求数 return cb.requests < cb.config.HalfOpenMaxRequests default: return false } } // recordResult 记录执行结果 func (cb *CircuitBreaker) recordResult(success bool, duration time.Duration) { cb.mutex.Lock() defer cb.mutex.Unlock() cb.requests++ // 更新滑动窗口 cb.updateWindow(success) if success { cb.successes++ cb.consecutiveFailures = 0 cb.onSuccess() } else { cb.failures++ cb.consecutiveFailures++ cb.lastFailTime = time.Now() cb.onFailure() } cb.logger.Debug("Circuit breaker recorded result", zap.Bool("success", success), zap.Duration("duration", duration), zap.String("state", cb.state.String()), zap.Int("failures", cb.failures), zap.Int("successes", cb.successes)) } // updateWindow 更新滑动窗口 func (cb *CircuitBreaker) updateWindow(success bool) { cb.window[cb.windowIndex] = success cb.windowIndex = (cb.windowIndex + 1) % cb.config.WindowSize if cb.windowIndex == 0 { cb.windowFull = true } } // onSuccess 成功时的处理 func (cb *CircuitBreaker) onSuccess() { if cb.state == StateHalfOpen { // 半开状态下,如果成功次数达到阈值,则关闭熔断器 if cb.successes >= cb.config.SuccessThreshold { cb.setState(StateClosed) } } } // onFailure 失败时的处理 func (cb *CircuitBreaker) onFailure() { if cb.state == StateClosed { // 关闭状态下,检查是否需要开启熔断器 if cb.shouldTrip() { cb.setState(StateOpen) } } else if cb.state == StateHalfOpen { // 半开状态下,如果失败则立即开启熔断器 cb.setState(StateOpen) } } // shouldTrip 检查是否应该触发熔断 func (cb *CircuitBreaker) shouldTrip() bool { // 基于连续失败次数 if cb.consecutiveFailures >= cb.config.FailureThreshold { return true } // 基于滑动窗口的失败率 if cb.windowFull { failures := 0 for _, success := range cb.window { if !success { failures++ } } failureRate := float64(failures) / float64(cb.config.WindowSize) return failureRate >= 0.5 // 50%失败率 } return false } // setState 设置状态 func (cb *CircuitBreaker) setState(newState CircuitState) { if cb.state == newState { return } oldState := cb.state cb.state = newState cb.lastStateChange = time.Now() // 重置计数器 if newState == StateClosed { cb.requests = 0 cb.failures = 0 cb.successes = 0 cb.consecutiveFailures = 0 } else if newState == StateHalfOpen { cb.requests = 0 cb.successes = 0 } cb.logger.Info("Circuit breaker state changed", zap.String("from", oldState.String()), zap.String("to", newState.String()), zap.Int("failures", cb.failures), zap.Int("successes", cb.successes)) // 触发状态变更回调 if cb.onStateChange != nil { cb.onStateChange(oldState, newState) } } // GetState 获取当前状态 func (cb *CircuitBreaker) GetState() CircuitState { cb.mutex.RLock() defer cb.mutex.RUnlock() return cb.state } // GetStats 获取统计信息 func (cb *CircuitBreaker) GetStats() CircuitBreakerStats { cb.mutex.RLock() defer cb.mutex.RUnlock() return CircuitBreakerStats{ State: cb.state.String(), Failures: cb.failures, Successes: cb.successes, Requests: cb.requests, ConsecutiveFailures: cb.consecutiveFailures, LastFailTime: cb.lastFailTime, LastStateChange: cb.lastStateChange, FailureThreshold: cb.config.FailureThreshold, ResetTimeout: cb.config.ResetTimeout, } } // Reset 重置熔断器 func (cb *CircuitBreaker) Reset() { cb.mutex.Lock() defer cb.mutex.Unlock() cb.setState(StateClosed) cb.window = make([]bool, cb.config.WindowSize) cb.windowIndex = 0 cb.windowFull = false cb.logger.Info("Circuit breaker reset") } // SetStateChangeCallback 设置状态变更回调 func (cb *CircuitBreaker) SetStateChangeCallback(callback func(from, to CircuitState)) { cb.mutex.Lock() defer cb.mutex.Unlock() cb.onStateChange = callback } // CircuitBreakerStats 熔断器统计信息 type CircuitBreakerStats struct { State string `json:"state"` Failures int `json:"failures"` Successes int `json:"successes"` Requests int `json:"requests"` ConsecutiveFailures int `json:"consecutive_failures"` LastFailTime time.Time `json:"last_fail_time"` LastStateChange time.Time `json:"last_state_change"` FailureThreshold int `json:"failure_threshold"` ResetTimeout time.Duration `json:"reset_timeout"` } // 预定义错误 var ( ErrCircuitBreakerOpen = errors.New("circuit breaker is open") ) // Wrapper 熔断器包装器 type Wrapper struct { breakers map[string]*CircuitBreaker logger *zap.Logger mutex sync.RWMutex } // NewWrapper 创建熔断器包装器 func NewWrapper(logger *zap.Logger) *Wrapper { return &Wrapper{ breakers: make(map[string]*CircuitBreaker), logger: logger, } } // GetOrCreate 获取或创建熔断器 func (w *Wrapper) GetOrCreate(name string, config CircuitBreakerConfig) *CircuitBreaker { w.mutex.Lock() defer w.mutex.Unlock() if cb, exists := w.breakers[name]; exists { return cb } cb := NewCircuitBreaker(config, w.logger.Named(name)) w.breakers[name] = cb w.logger.Info("Created circuit breaker", zap.String("name", name)) return cb } // Execute 执行带熔断器的函数 func (w *Wrapper) Execute(ctx context.Context, name string, fn func() error) error { cb := w.GetOrCreate(name, DefaultCircuitBreakerConfig()) return cb.Execute(ctx, fn) } // GetStats 获取所有熔断器统计 func (w *Wrapper) GetStats() map[string]CircuitBreakerStats { w.mutex.RLock() defer w.mutex.RUnlock() stats := make(map[string]CircuitBreakerStats) for name, cb := range w.breakers { stats[name] = cb.GetStats() } return stats } // ResetAll 重置所有熔断器 func (w *Wrapper) ResetAll() { w.mutex.RLock() defer w.mutex.RUnlock() for name, cb := range w.breakers { cb.Reset() w.logger.Info("Reset circuit breaker", zap.String("name", name)) } }