390 lines
8.7 KiB
Go
390 lines
8.7 KiB
Go
package resilience
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// CircuitState identifies the phase a circuit breaker is currently in.
// Its values are the State* constants declared in this file.
type CircuitState int
|
|
|
|
const (
|
|
// StateClosed 关闭状态(正常)
|
|
StateClosed CircuitState = iota
|
|
// StateOpen 开启状态(熔断)
|
|
StateOpen
|
|
// StateHalfOpen 半开状态(测试)
|
|
StateHalfOpen
|
|
)
|
|
|
|
func (s CircuitState) String() string {
|
|
switch s {
|
|
case StateClosed:
|
|
return "CLOSED"
|
|
case StateOpen:
|
|
return "OPEN"
|
|
case StateHalfOpen:
|
|
return "HALF_OPEN"
|
|
default:
|
|
return "UNKNOWN"
|
|
}
|
|
}
|
|
|
|
// CircuitBreakerConfig holds the tunable thresholds of a CircuitBreaker.
type CircuitBreakerConfig struct {
	// FailureThreshold is the number of consecutive failures that trips a
	// closed breaker.
	FailureThreshold int

	// ResetTimeout is how long the breaker stays open before a trial call
	// is let through again.
	ResetTimeout time.Duration

	// WindowSize is the number of most recent results kept for the
	// failure-rate check.
	WindowSize int

	// HalfOpenMaxRequests is the limit the recorded request count is
	// compared against while the breaker is half-open.
	HalfOpenMaxRequests int

	// SuccessThreshold is the number of successes needed in the half-open
	// state to close the breaker again.
	SuccessThreshold int
}
|
|
|
|
// DefaultCircuitBreakerConfig 默认熔断器配置
|
|
func DefaultCircuitBreakerConfig() CircuitBreakerConfig {
|
|
return CircuitBreakerConfig{
|
|
FailureThreshold: 5,
|
|
ResetTimeout: 60 * time.Second,
|
|
WindowSize: 10,
|
|
HalfOpenMaxRequests: 3,
|
|
SuccessThreshold: 2,
|
|
}
|
|
}
|
|
|
|
// CircuitBreaker 熔断器
|
|
type CircuitBreaker struct {
|
|
config CircuitBreakerConfig
|
|
logger *zap.Logger
|
|
mutex sync.RWMutex
|
|
|
|
// 状态
|
|
state CircuitState
|
|
|
|
// 计数器
|
|
failures int
|
|
successes int
|
|
requests int
|
|
consecutiveFailures int
|
|
|
|
// 时间记录
|
|
lastFailTime time.Time
|
|
lastStateChange time.Time
|
|
|
|
// 统计窗口
|
|
window []bool // true=success, false=failure
|
|
windowIndex int
|
|
windowFull bool
|
|
|
|
// 事件回调
|
|
onStateChange func(from, to CircuitState)
|
|
}
|
|
|
|
// NewCircuitBreaker 创建熔断器
|
|
func NewCircuitBreaker(config CircuitBreakerConfig, logger *zap.Logger) *CircuitBreaker {
|
|
cb := &CircuitBreaker{
|
|
config: config,
|
|
logger: logger,
|
|
state: StateClosed,
|
|
window: make([]bool, config.WindowSize),
|
|
lastStateChange: time.Now(),
|
|
}
|
|
|
|
return cb
|
|
}
|
|
|
|
// Execute 执行函数,如果熔断器开启则快速失败
|
|
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
|
|
// 检查是否允许执行
|
|
if !cb.allowRequest() {
|
|
return ErrCircuitBreakerOpen
|
|
}
|
|
|
|
// 执行函数
|
|
start := time.Now()
|
|
err := fn()
|
|
duration := time.Since(start)
|
|
|
|
// 记录结果
|
|
cb.recordResult(err == nil, duration)
|
|
|
|
return err
|
|
}
|
|
|
|
// allowRequest 检查是否允许请求
|
|
func (cb *CircuitBreaker) allowRequest() bool {
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
|
|
now := time.Now()
|
|
|
|
switch cb.state {
|
|
case StateClosed:
|
|
return true
|
|
|
|
case StateOpen:
|
|
// 检查是否到了重置时间
|
|
if now.Sub(cb.lastStateChange) > cb.config.ResetTimeout {
|
|
cb.setState(StateHalfOpen)
|
|
return true
|
|
}
|
|
return false
|
|
|
|
case StateHalfOpen:
|
|
// 半开状态下限制请求数
|
|
return cb.requests < cb.config.HalfOpenMaxRequests
|
|
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// recordResult 记录执行结果
|
|
func (cb *CircuitBreaker) recordResult(success bool, duration time.Duration) {
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
|
|
cb.requests++
|
|
|
|
// 更新滑动窗口
|
|
cb.updateWindow(success)
|
|
|
|
if success {
|
|
cb.successes++
|
|
cb.consecutiveFailures = 0
|
|
cb.onSuccess()
|
|
} else {
|
|
cb.failures++
|
|
cb.consecutiveFailures++
|
|
cb.lastFailTime = time.Now()
|
|
cb.onFailure()
|
|
}
|
|
|
|
cb.logger.Debug("Circuit breaker recorded result",
|
|
zap.Bool("success", success),
|
|
zap.Duration("duration", duration),
|
|
zap.String("state", cb.state.String()),
|
|
zap.Int("failures", cb.failures),
|
|
zap.Int("successes", cb.successes))
|
|
}
|
|
|
|
// updateWindow 更新滑动窗口
|
|
func (cb *CircuitBreaker) updateWindow(success bool) {
|
|
cb.window[cb.windowIndex] = success
|
|
cb.windowIndex = (cb.windowIndex + 1) % cb.config.WindowSize
|
|
|
|
if cb.windowIndex == 0 {
|
|
cb.windowFull = true
|
|
}
|
|
}
|
|
|
|
// onSuccess 成功时的处理
|
|
func (cb *CircuitBreaker) onSuccess() {
|
|
if cb.state == StateHalfOpen {
|
|
// 半开状态下,如果成功次数达到阈值,则关闭熔断器
|
|
if cb.successes >= cb.config.SuccessThreshold {
|
|
cb.setState(StateClosed)
|
|
}
|
|
}
|
|
}
|
|
|
|
// onFailure 失败时的处理
|
|
func (cb *CircuitBreaker) onFailure() {
|
|
if cb.state == StateClosed {
|
|
// 关闭状态下,检查是否需要开启熔断器
|
|
if cb.shouldTrip() {
|
|
cb.setState(StateOpen)
|
|
}
|
|
} else if cb.state == StateHalfOpen {
|
|
// 半开状态下,如果失败则立即开启熔断器
|
|
cb.setState(StateOpen)
|
|
}
|
|
}
|
|
|
|
// shouldTrip 检查是否应该触发熔断
|
|
func (cb *CircuitBreaker) shouldTrip() bool {
|
|
// 基于连续失败次数
|
|
if cb.consecutiveFailures >= cb.config.FailureThreshold {
|
|
return true
|
|
}
|
|
|
|
// 基于滑动窗口的失败率
|
|
if cb.windowFull {
|
|
failures := 0
|
|
for _, success := range cb.window {
|
|
if !success {
|
|
failures++
|
|
}
|
|
}
|
|
|
|
failureRate := float64(failures) / float64(cb.config.WindowSize)
|
|
return failureRate >= 0.5 // 50%失败率
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// setState 设置状态
|
|
func (cb *CircuitBreaker) setState(newState CircuitState) {
|
|
if cb.state == newState {
|
|
return
|
|
}
|
|
|
|
oldState := cb.state
|
|
cb.state = newState
|
|
cb.lastStateChange = time.Now()
|
|
|
|
// 重置计数器
|
|
if newState == StateClosed {
|
|
cb.requests = 0
|
|
cb.failures = 0
|
|
cb.successes = 0
|
|
cb.consecutiveFailures = 0
|
|
} else if newState == StateHalfOpen {
|
|
cb.requests = 0
|
|
cb.successes = 0
|
|
}
|
|
|
|
cb.logger.Info("Circuit breaker state changed",
|
|
zap.String("from", oldState.String()),
|
|
zap.String("to", newState.String()),
|
|
zap.Int("failures", cb.failures),
|
|
zap.Int("successes", cb.successes))
|
|
|
|
// 触发状态变更回调
|
|
if cb.onStateChange != nil {
|
|
cb.onStateChange(oldState, newState)
|
|
}
|
|
}
|
|
|
|
// GetState 获取当前状态
|
|
func (cb *CircuitBreaker) GetState() CircuitState {
|
|
cb.mutex.RLock()
|
|
defer cb.mutex.RUnlock()
|
|
return cb.state
|
|
}
|
|
|
|
// GetStats 获取统计信息
|
|
func (cb *CircuitBreaker) GetStats() CircuitBreakerStats {
|
|
cb.mutex.RLock()
|
|
defer cb.mutex.RUnlock()
|
|
|
|
return CircuitBreakerStats{
|
|
State: cb.state.String(),
|
|
Failures: cb.failures,
|
|
Successes: cb.successes,
|
|
Requests: cb.requests,
|
|
ConsecutiveFailures: cb.consecutiveFailures,
|
|
LastFailTime: cb.lastFailTime,
|
|
LastStateChange: cb.lastStateChange,
|
|
FailureThreshold: cb.config.FailureThreshold,
|
|
ResetTimeout: cb.config.ResetTimeout,
|
|
}
|
|
}
|
|
|
|
// Reset 重置熔断器
|
|
func (cb *CircuitBreaker) Reset() {
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
|
|
cb.setState(StateClosed)
|
|
cb.window = make([]bool, cb.config.WindowSize)
|
|
cb.windowIndex = 0
|
|
cb.windowFull = false
|
|
|
|
cb.logger.Info("Circuit breaker reset")
|
|
}
|
|
|
|
// SetStateChangeCallback 设置状态变更回调
|
|
func (cb *CircuitBreaker) SetStateChangeCallback(callback func(from, to CircuitState)) {
|
|
cb.mutex.Lock()
|
|
defer cb.mutex.Unlock()
|
|
cb.onStateChange = callback
|
|
}
|
|
|
|
// CircuitBreakerStats is the JSON-serialisable snapshot returned by
// CircuitBreaker.GetStats.
type CircuitBreakerStats struct {
	// State is the textual form of the breaker state.
	State string `json:"state"`

	// Outcome counters as held by the breaker at snapshot time.
	Failures            int `json:"failures"`
	Successes           int `json:"successes"`
	Requests            int `json:"requests"`
	ConsecutiveFailures int `json:"consecutive_failures"`

	// Time of the last recorded failure and of the last state change.
	LastFailTime    time.Time `json:"last_fail_time"`
	LastStateChange time.Time `json:"last_state_change"`

	// Configured limits, copied from the breaker's configuration.
	FailureThreshold int           `json:"failure_threshold"`
	ResetTimeout     time.Duration `json:"reset_timeout"`
}
|
|
|
|
// ErrCircuitBreakerOpen is returned by Execute when the breaker rejects a
// call instead of running it.
var ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
|
|
|
|
// Wrapper 熔断器包装器
|
|
type Wrapper struct {
|
|
breakers map[string]*CircuitBreaker
|
|
logger *zap.Logger
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewWrapper 创建熔断器包装器
|
|
func NewWrapper(logger *zap.Logger) *Wrapper {
|
|
return &Wrapper{
|
|
breakers: make(map[string]*CircuitBreaker),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// GetOrCreate 获取或创建熔断器
|
|
func (w *Wrapper) GetOrCreate(name string, config CircuitBreakerConfig) *CircuitBreaker {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
if cb, exists := w.breakers[name]; exists {
|
|
return cb
|
|
}
|
|
|
|
cb := NewCircuitBreaker(config, w.logger.Named(name))
|
|
w.breakers[name] = cb
|
|
|
|
w.logger.Info("Created circuit breaker", zap.String("name", name))
|
|
return cb
|
|
}
|
|
|
|
// Execute 执行带熔断器的函数
|
|
func (w *Wrapper) Execute(ctx context.Context, name string, fn func() error) error {
|
|
cb := w.GetOrCreate(name, DefaultCircuitBreakerConfig())
|
|
return cb.Execute(ctx, fn)
|
|
}
|
|
|
|
// GetStats 获取所有熔断器统计
|
|
func (w *Wrapper) GetStats() map[string]CircuitBreakerStats {
|
|
w.mutex.RLock()
|
|
defer w.mutex.RUnlock()
|
|
|
|
stats := make(map[string]CircuitBreakerStats)
|
|
for name, cb := range w.breakers {
|
|
stats[name] = cb.GetStats()
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// ResetAll 重置所有熔断器
|
|
func (w *Wrapper) ResetAll() {
|
|
w.mutex.RLock()
|
|
defer w.mutex.RUnlock()
|
|
|
|
for name, cb := range w.breakers {
|
|
cb.Reset()
|
|
w.logger.Info("Reset circuit breaker", zap.String("name", name))
|
|
}
|
|
}
|