390 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			390 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|  | package resilience | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"context" | ||
|  | 	"errors" | ||
|  | 	"sync" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"go.uber.org/zap" | ||
|  | ) | ||
|  | 
 | ||
|  | // CircuitState 熔断器状态 | ||
|  | type CircuitState int | ||
|  | 
 | ||
|  | const ( | ||
|  | 	// StateClosed 关闭状态(正常) | ||
|  | 	StateClosed CircuitState = iota | ||
|  | 	// StateOpen 开启状态(熔断) | ||
|  | 	StateOpen | ||
|  | 	// StateHalfOpen 半开状态(测试) | ||
|  | 	StateHalfOpen | ||
|  | ) | ||
|  | 
 | ||
|  | func (s CircuitState) String() string { | ||
|  | 	switch s { | ||
|  | 	case StateClosed: | ||
|  | 		return "CLOSED" | ||
|  | 	case StateOpen: | ||
|  | 		return "OPEN" | ||
|  | 	case StateHalfOpen: | ||
|  | 		return "HALF_OPEN" | ||
|  | 	default: | ||
|  | 		return "UNKNOWN" | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // CircuitBreakerConfig 熔断器配置 | ||
|  | type CircuitBreakerConfig struct { | ||
|  | 	// 故障阈值 | ||
|  | 	FailureThreshold int | ||
|  | 	// 重置超时时间 | ||
|  | 	ResetTimeout time.Duration | ||
|  | 	// 检测窗口大小 | ||
|  | 	WindowSize int | ||
|  | 	// 半开状态允许的请求数 | ||
|  | 	HalfOpenMaxRequests int | ||
|  | 	// 成功阈值(半开->关闭) | ||
|  | 	SuccessThreshold int | ||
|  | } | ||
|  | 
 | ||
|  | // DefaultCircuitBreakerConfig 默认熔断器配置 | ||
|  | func DefaultCircuitBreakerConfig() CircuitBreakerConfig { | ||
|  | 	return CircuitBreakerConfig{ | ||
|  | 		FailureThreshold:    5, | ||
|  | 		ResetTimeout:        60 * time.Second, | ||
|  | 		WindowSize:          10, | ||
|  | 		HalfOpenMaxRequests: 3, | ||
|  | 		SuccessThreshold:    2, | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // CircuitBreaker 熔断器 | ||
|  | type CircuitBreaker struct { | ||
|  | 	config CircuitBreakerConfig | ||
|  | 	logger *zap.Logger | ||
|  | 	mutex  sync.RWMutex | ||
|  | 
 | ||
|  | 	// 状态 | ||
|  | 	state CircuitState | ||
|  | 
 | ||
|  | 	// 计数器 | ||
|  | 	failures            int | ||
|  | 	successes           int | ||
|  | 	requests            int | ||
|  | 	consecutiveFailures int | ||
|  | 
 | ||
|  | 	// 时间记录 | ||
|  | 	lastFailTime    time.Time | ||
|  | 	lastStateChange time.Time | ||
|  | 
 | ||
|  | 	// 统计窗口 | ||
|  | 	window      []bool // true=success, false=failure | ||
|  | 	windowIndex int | ||
|  | 	windowFull  bool | ||
|  | 
 | ||
|  | 	// 事件回调 | ||
|  | 	onStateChange func(from, to CircuitState) | ||
|  | } | ||
|  | 
 | ||
|  | // NewCircuitBreaker 创建熔断器 | ||
|  | func NewCircuitBreaker(config CircuitBreakerConfig, logger *zap.Logger) *CircuitBreaker { | ||
|  | 	cb := &CircuitBreaker{ | ||
|  | 		config:          config, | ||
|  | 		logger:          logger, | ||
|  | 		state:           StateClosed, | ||
|  | 		window:          make([]bool, config.WindowSize), | ||
|  | 		lastStateChange: time.Now(), | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return cb | ||
|  | } | ||
|  | 
 | ||
|  | // Execute 执行函数,如果熔断器开启则快速失败 | ||
|  | func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error { | ||
|  | 	// 检查是否允许执行 | ||
|  | 	if !cb.allowRequest() { | ||
|  | 		return ErrCircuitBreakerOpen | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 执行函数 | ||
|  | 	start := time.Now() | ||
|  | 	err := fn() | ||
|  | 	duration := time.Since(start) | ||
|  | 
 | ||
|  | 	// 记录结果 | ||
|  | 	cb.recordResult(err == nil, duration) | ||
|  | 
 | ||
|  | 	return err | ||
|  | } | ||
|  | 
 | ||
|  | // allowRequest 检查是否允许请求 | ||
|  | func (cb *CircuitBreaker) allowRequest() bool { | ||
|  | 	cb.mutex.Lock() | ||
|  | 	defer cb.mutex.Unlock() | ||
|  | 
 | ||
|  | 	now := time.Now() | ||
|  | 
 | ||
|  | 	switch cb.state { | ||
|  | 	case StateClosed: | ||
|  | 		return true | ||
|  | 
 | ||
|  | 	case StateOpen: | ||
|  | 		// 检查是否到了重置时间 | ||
|  | 		if now.Sub(cb.lastStateChange) > cb.config.ResetTimeout { | ||
|  | 			cb.setState(StateHalfOpen) | ||
|  | 			return true | ||
|  | 		} | ||
|  | 		return false | ||
|  | 
 | ||
|  | 	case StateHalfOpen: | ||
|  | 		// 半开状态下限制请求数 | ||
|  | 		return cb.requests < cb.config.HalfOpenMaxRequests | ||
|  | 
 | ||
|  | 	default: | ||
|  | 		return false | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // recordResult 记录执行结果 | ||
|  | func (cb *CircuitBreaker) recordResult(success bool, duration time.Duration) { | ||
|  | 	cb.mutex.Lock() | ||
|  | 	defer cb.mutex.Unlock() | ||
|  | 
 | ||
|  | 	cb.requests++ | ||
|  | 
 | ||
|  | 	// 更新滑动窗口 | ||
|  | 	cb.updateWindow(success) | ||
|  | 
 | ||
|  | 	if success { | ||
|  | 		cb.successes++ | ||
|  | 		cb.consecutiveFailures = 0 | ||
|  | 		cb.onSuccess() | ||
|  | 	} else { | ||
|  | 		cb.failures++ | ||
|  | 		cb.consecutiveFailures++ | ||
|  | 		cb.lastFailTime = time.Now() | ||
|  | 		cb.onFailure() | ||
|  | 	} | ||
|  | 
 | ||
|  | 	cb.logger.Debug("Circuit breaker recorded result", | ||
|  | 		zap.Bool("success", success), | ||
|  | 		zap.Duration("duration", duration), | ||
|  | 		zap.String("state", cb.state.String()), | ||
|  | 		zap.Int("failures", cb.failures), | ||
|  | 		zap.Int("successes", cb.successes)) | ||
|  | } | ||
|  | 
 | ||
|  | // updateWindow 更新滑动窗口 | ||
|  | func (cb *CircuitBreaker) updateWindow(success bool) { | ||
|  | 	cb.window[cb.windowIndex] = success | ||
|  | 	cb.windowIndex = (cb.windowIndex + 1) % cb.config.WindowSize | ||
|  | 
 | ||
|  | 	if cb.windowIndex == 0 { | ||
|  | 		cb.windowFull = true | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // onSuccess 成功时的处理 | ||
|  | func (cb *CircuitBreaker) onSuccess() { | ||
|  | 	if cb.state == StateHalfOpen { | ||
|  | 		// 半开状态下,如果成功次数达到阈值,则关闭熔断器 | ||
|  | 		if cb.successes >= cb.config.SuccessThreshold { | ||
|  | 			cb.setState(StateClosed) | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // onFailure 失败时的处理 | ||
|  | func (cb *CircuitBreaker) onFailure() { | ||
|  | 	if cb.state == StateClosed { | ||
|  | 		// 关闭状态下,检查是否需要开启熔断器 | ||
|  | 		if cb.shouldTrip() { | ||
|  | 			cb.setState(StateOpen) | ||
|  | 		} | ||
|  | 	} else if cb.state == StateHalfOpen { | ||
|  | 		// 半开状态下,如果失败则立即开启熔断器 | ||
|  | 		cb.setState(StateOpen) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // shouldTrip 检查是否应该触发熔断 | ||
|  | func (cb *CircuitBreaker) shouldTrip() bool { | ||
|  | 	// 基于连续失败次数 | ||
|  | 	if cb.consecutiveFailures >= cb.config.FailureThreshold { | ||
|  | 		return true | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 基于滑动窗口的失败率 | ||
|  | 	if cb.windowFull { | ||
|  | 		failures := 0 | ||
|  | 		for _, success := range cb.window { | ||
|  | 			if !success { | ||
|  | 				failures++ | ||
|  | 			} | ||
|  | 		} | ||
|  | 
 | ||
|  | 		failureRate := float64(failures) / float64(cb.config.WindowSize) | ||
|  | 		return failureRate >= 0.5 // 50%失败率 | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return false | ||
|  | } | ||
|  | 
 | ||
|  | // setState 设置状态 | ||
|  | func (cb *CircuitBreaker) setState(newState CircuitState) { | ||
|  | 	if cb.state == newState { | ||
|  | 		return | ||
|  | 	} | ||
|  | 
 | ||
|  | 	oldState := cb.state | ||
|  | 	cb.state = newState | ||
|  | 	cb.lastStateChange = time.Now() | ||
|  | 
 | ||
|  | 	// 重置计数器 | ||
|  | 	if newState == StateClosed { | ||
|  | 		cb.requests = 0 | ||
|  | 		cb.failures = 0 | ||
|  | 		cb.successes = 0 | ||
|  | 		cb.consecutiveFailures = 0 | ||
|  | 	} else if newState == StateHalfOpen { | ||
|  | 		cb.requests = 0 | ||
|  | 		cb.successes = 0 | ||
|  | 	} | ||
|  | 
 | ||
|  | 	cb.logger.Info("Circuit breaker state changed", | ||
|  | 		zap.String("from", oldState.String()), | ||
|  | 		zap.String("to", newState.String()), | ||
|  | 		zap.Int("failures", cb.failures), | ||
|  | 		zap.Int("successes", cb.successes)) | ||
|  | 
 | ||
|  | 	// 触发状态变更回调 | ||
|  | 	if cb.onStateChange != nil { | ||
|  | 		cb.onStateChange(oldState, newState) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // GetState 获取当前状态 | ||
|  | func (cb *CircuitBreaker) GetState() CircuitState { | ||
|  | 	cb.mutex.RLock() | ||
|  | 	defer cb.mutex.RUnlock() | ||
|  | 	return cb.state | ||
|  | } | ||
|  | 
 | ||
|  | // GetStats 获取统计信息 | ||
|  | func (cb *CircuitBreaker) GetStats() CircuitBreakerStats { | ||
|  | 	cb.mutex.RLock() | ||
|  | 	defer cb.mutex.RUnlock() | ||
|  | 
 | ||
|  | 	return CircuitBreakerStats{ | ||
|  | 		State:               cb.state.String(), | ||
|  | 		Failures:            cb.failures, | ||
|  | 		Successes:           cb.successes, | ||
|  | 		Requests:            cb.requests, | ||
|  | 		ConsecutiveFailures: cb.consecutiveFailures, | ||
|  | 		LastFailTime:        cb.lastFailTime, | ||
|  | 		LastStateChange:     cb.lastStateChange, | ||
|  | 		FailureThreshold:    cb.config.FailureThreshold, | ||
|  | 		ResetTimeout:        cb.config.ResetTimeout, | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // Reset 重置熔断器 | ||
|  | func (cb *CircuitBreaker) Reset() { | ||
|  | 	cb.mutex.Lock() | ||
|  | 	defer cb.mutex.Unlock() | ||
|  | 
 | ||
|  | 	cb.setState(StateClosed) | ||
|  | 	cb.window = make([]bool, cb.config.WindowSize) | ||
|  | 	cb.windowIndex = 0 | ||
|  | 	cb.windowFull = false | ||
|  | 
 | ||
|  | 	cb.logger.Info("Circuit breaker reset") | ||
|  | } | ||
|  | 
 | ||
|  | // SetStateChangeCallback 设置状态变更回调 | ||
|  | func (cb *CircuitBreaker) SetStateChangeCallback(callback func(from, to CircuitState)) { | ||
|  | 	cb.mutex.Lock() | ||
|  | 	defer cb.mutex.Unlock() | ||
|  | 	cb.onStateChange = callback | ||
|  | } | ||
|  | 
 | ||
|  | // CircuitBreakerStats 熔断器统计信息 | ||
|  | type CircuitBreakerStats struct { | ||
|  | 	State               string        `json:"state"` | ||
|  | 	Failures            int           `json:"failures"` | ||
|  | 	Successes           int           `json:"successes"` | ||
|  | 	Requests            int           `json:"requests"` | ||
|  | 	ConsecutiveFailures int           `json:"consecutive_failures"` | ||
|  | 	LastFailTime        time.Time     `json:"last_fail_time"` | ||
|  | 	LastStateChange     time.Time     `json:"last_state_change"` | ||
|  | 	FailureThreshold    int           `json:"failure_threshold"` | ||
|  | 	ResetTimeout        time.Duration `json:"reset_timeout"` | ||
|  | } | ||
|  | 
 | ||
|  | // 预定义错误 | ||
|  | var ( | ||
|  | 	ErrCircuitBreakerOpen = errors.New("circuit breaker is open") | ||
|  | ) | ||
|  | 
 | ||
|  | // Wrapper 熔断器包装器 | ||
|  | type Wrapper struct { | ||
|  | 	breakers map[string]*CircuitBreaker | ||
|  | 	logger   *zap.Logger | ||
|  | 	mutex    sync.RWMutex | ||
|  | } | ||
|  | 
 | ||
|  | // NewWrapper 创建熔断器包装器 | ||
|  | func NewWrapper(logger *zap.Logger) *Wrapper { | ||
|  | 	return &Wrapper{ | ||
|  | 		breakers: make(map[string]*CircuitBreaker), | ||
|  | 		logger:   logger, | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // GetOrCreate 获取或创建熔断器 | ||
|  | func (w *Wrapper) GetOrCreate(name string, config CircuitBreakerConfig) *CircuitBreaker { | ||
|  | 	w.mutex.Lock() | ||
|  | 	defer w.mutex.Unlock() | ||
|  | 
 | ||
|  | 	if cb, exists := w.breakers[name]; exists { | ||
|  | 		return cb | ||
|  | 	} | ||
|  | 
 | ||
|  | 	cb := NewCircuitBreaker(config, w.logger.Named(name)) | ||
|  | 	w.breakers[name] = cb | ||
|  | 
 | ||
|  | 	w.logger.Info("Created circuit breaker", zap.String("name", name)) | ||
|  | 	return cb | ||
|  | } | ||
|  | 
 | ||
|  | // Execute 执行带熔断器的函数 | ||
|  | func (w *Wrapper) Execute(ctx context.Context, name string, fn func() error) error { | ||
|  | 	cb := w.GetOrCreate(name, DefaultCircuitBreakerConfig()) | ||
|  | 	return cb.Execute(ctx, fn) | ||
|  | } | ||
|  | 
 | ||
|  | // GetStats 获取所有熔断器统计 | ||
|  | func (w *Wrapper) GetStats() map[string]CircuitBreakerStats { | ||
|  | 	w.mutex.RLock() | ||
|  | 	defer w.mutex.RUnlock() | ||
|  | 
 | ||
|  | 	stats := make(map[string]CircuitBreakerStats) | ||
|  | 	for name, cb := range w.breakers { | ||
|  | 		stats[name] = cb.GetStats() | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return stats | ||
|  | } | ||
|  | 
 | ||
|  | // ResetAll 重置所有熔断器 | ||
|  | func (w *Wrapper) ResetAll() { | ||
|  | 	w.mutex.RLock() | ||
|  | 	defer w.mutex.RUnlock() | ||
|  | 
 | ||
|  | 	for name, cb := range w.breakers { | ||
|  | 		cb.Reset() | ||
|  | 		w.logger.Info("Reset circuit breaker", zap.String("name", name)) | ||
|  | 	} | ||
|  | } |