Files
tyapi-server/internal/shared/saga/saga.go

731 lines
16 KiB
Go
Raw Normal View History

2025-07-02 16:17:59 +08:00
package saga
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"tyapi-server/internal/shared/interfaces"
)
// SagaStatus is the lifecycle state of a Saga.
type SagaStatus int

// Saga lifecycle states; the numeric values follow declaration order from 0.
const (
	// StatusPending means the saga is waiting to be executed.
	StatusPending SagaStatus = iota
	// StatusRunning means the saga is executing its steps.
	StatusRunning
	// StatusCompleted means every step finished successfully.
	StatusCompleted
	// StatusFailed means the saga failed.
	StatusFailed
	// StatusCompensating means compensation is in progress.
	StatusCompensating
	// StatusCompensated means compensation finished successfully.
	StatusCompensated
	// StatusAborted means the saga was aborted.
	StatusAborted
)

// String returns the upper-case name of the status, or "UNKNOWN" for any
// value outside the declared set.
func (s SagaStatus) String() string {
	labels := [...]string{
		StatusPending:      "PENDING",
		StatusRunning:      "RUNNING",
		StatusCompleted:    "COMPLETED",
		StatusFailed:       "FAILED",
		StatusCompensating: "COMPENSATING",
		StatusCompensated:  "COMPENSATED",
		StatusAborted:      "ABORTED",
	}
	if s < 0 || int(s) >= len(labels) {
		return "UNKNOWN"
	}
	return labels[s]
}
// StepStatus is the state of a single saga step.
type StepStatus int

// Step states; the numeric values follow declaration order from 0.
const (
	// StepPending means the step is waiting to run.
	StepPending StepStatus = iota
	// StepRunning means the step is executing.
	StepRunning
	// StepCompleted means the step finished successfully.
	StepCompleted
	// StepFailed means the step failed.
	StepFailed
	// StepCompensated means the step has been compensated.
	StepCompensated
	// StepSkipped means the step was skipped.
	StepSkipped
)

// String returns the upper-case name of the step status, or "UNKNOWN" for
// any value outside the declared set.
func (s StepStatus) String() string {
	labels := [...]string{
		StepPending:     "PENDING",
		StepRunning:     "RUNNING",
		StepCompleted:   "COMPLETED",
		StepFailed:      "FAILED",
		StepCompensated: "COMPENSATED",
		StepSkipped:     "SKIPPED",
	}
	if s < 0 || int(s) >= len(labels) {
		return "UNKNOWN"
	}
	return labels[s]
}
// SagaStep is one unit of work in a saga: a forward action, an optional
// compensation, and the bookkeeping recorded while the step runs.
type SagaStep struct {
// Name identifies the step in log output and progress reports.
Name string
// Action is the forward operation; it receives the data passed to Execute.
Action func(ctx context.Context, data interface{}) error
// Compensate undoes Action. When it is nil the step is not compensated
// (compensate only logs a warning for it).
Compensate func(ctx context.Context, data interface{}) error
// Status is the current state of the step.
Status StepStatus
// Error holds the last error returned by Action once the step has failed.
Error error
// StartTime is recorded when executeStep begins running the step.
StartTime time.Time
// EndTime is recorded when executeStep finishes, successfully or not.
EndTime time.Time
// RetryCount is updated by executeStep while it retries the action.
RetryCount int
// MaxRetries is the number of retries allowed after the first attempt.
MaxRetries int
// Timeout bounds the step's execution (all attempts share one deadline)
// and, separately, its compensation.
Timeout time.Duration
}
// SagaConfig holds the settings shared by the steps of a saga.
type SagaConfig struct {
// DefaultTimeout is the timeout given to steps added through AddStep.
DefaultTimeout time.Duration
// DefaultMaxRetries is the retry limit given to steps added through AddStep.
DefaultMaxRetries int
// Parallel requests parallel execution (only serial execution is currently
// supported; nothing in this file reads the flag).
Parallel bool
// EventBus is the event publisher; when it is nil publishEvent does nothing.
EventBus interfaces.EventBus
}
// DefaultSagaConfig returns the default configuration: a 30-second step
// timeout, three retries per step, serial execution and no event bus.
func DefaultSagaConfig() SagaConfig {
	var cfg SagaConfig
	cfg.DefaultTimeout = 30 * time.Second
	cfg.DefaultMaxRetries = 3
	cfg.Parallel = false
	return cfg
}
// Saga is a distributed transaction made of ordered steps that are executed
// in sequence and compensated in reverse order when one of them fails.
type Saga struct {
// ID identifies the saga; it is attached to every log entry.
ID string
// Name is the human-readable saga name.
Name string
// Steps are the steps in execution order.
Steps []*SagaStep
// Status is the current lifecycle state; guarded by mutex in
// setStatus/GetStatus.
Status SagaStatus
// Data is the payload given to Execute and handed to compensations.
Data interface{}
// StartTime is set when Execute starts.
StartTime time.Time
// EndTime is set by Execute when the saga finishes.
EndTime time.Time
// Error is the saga-level error slot.
Error error
// Config carries the defaults applied to steps and the optional event bus.
Config SagaConfig
// logger receives the saga's log output.
logger *zap.Logger
// mutex guards the mutable state read by the Get* accessors.
mutex sync.RWMutex
// currentStep is the zero-based index of the step being executed; -1
// before any step has started.
currentStep int
// result is not used by any code visible in this file.
result interface{}
}
// NewSaga builds a saga in StatusPending with no steps. currentStep starts at
// -1 so that progress reports show step 0 until execution begins.
func NewSaga(id, name string, config SagaConfig, logger *zap.Logger) *Saga {
	saga := new(Saga)
	saga.ID = id
	saga.Name = name
	saga.Config = config
	saga.logger = logger
	saga.Steps = make([]*SagaStep, 0)
	saga.Status = StatusPending
	saga.currentStep = -1
	return saga
}
// AddStep appends a step that uses the saga's default retry limit and
// timeout, and returns the saga so that calls can be chained.
func (s *Saga) AddStep(name string, action, compensate func(ctx context.Context, data interface{}) error) *Saga {
	added := &SagaStep{Name: name, Status: StepPending}
	added.Action, added.Compensate = action, compensate
	added.MaxRetries = s.Config.DefaultMaxRetries
	added.Timeout = s.Config.DefaultTimeout

	// Only the slice append needs the lock.
	func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.Steps = append(s.Steps, added)
	}()

	s.logger.Debug("Added step to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name))
	return s
}
// AddStepWithConfig appends a step with an explicit retry limit and timeout
// instead of the saga defaults, and returns the saga for chaining.
func (s *Saga) AddStepWithConfig(name string, action, compensate func(ctx context.Context, data interface{}) error, maxRetries int, timeout time.Duration) *Saga {
	added := &SagaStep{Name: name, Status: StepPending}
	added.Action, added.Compensate = action, compensate
	added.MaxRetries, added.Timeout = maxRetries, timeout

	// Only the slice append needs the lock.
	func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.Steps = append(s.Steps, added)
	}()

	s.logger.Debug("Added step with config to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name),
		zap.Int("max_retries", maxRetries),
		zap.Duration("timeout", timeout))
	return s
}
// Execute runs the saga's steps in order, passing data to every step.
//
// The saga must be in StatusPending; otherwise an error is returned and
// nothing runs. When a step fails, the steps before it are compensated in
// reverse order:
//   - if compensation succeeds the saga ends in StatusCompensated and the
//     failing step's error is returned (wrapped);
//   - if compensation fails the saga ends in StatusAborted and the
//     compensation error is returned (wrapped).
//
// On every terminal path EndTime is recorded, and on failure Error holds the
// error of the step that failed.
func (s *Saga) Execute(ctx context.Context, data interface{}) error {
	started := time.Now()

	s.mutex.Lock()
	if s.Status != StatusPending {
		s.mutex.Unlock()
		return fmt.Errorf("saga %s is not in pending status", s.ID)
	}
	s.Status = StatusRunning
	s.Data = data
	s.StartTime = started
	// Take the slice header under the lock so a concurrent AddStep cannot
	// race with the loop below; steps appended later are not executed.
	steps := s.Steps
	s.mutex.Unlock()

	// finish records the terminal state in a single critical section and
	// returns the recorded end time.
	finish := func(status SagaStatus, cause error) time.Time {
		now := time.Now()
		s.mutex.Lock()
		s.Status = status
		s.Error = cause
		s.EndTime = now
		s.mutex.Unlock()
		return now
	}

	s.logger.Info("Starting saga execution",
		zap.String("saga_id", s.ID),
		zap.String("saga_name", s.Name),
		zap.Int("total_steps", len(steps)))

	// Publish the saga-started event.
	s.publishEvent(ctx, "saga.started")

	for i, step := range steps {
		s.mutex.Lock()
		s.currentStep = i
		s.mutex.Unlock()

		err := s.executeStep(ctx, step, data)
		if err == nil {
			continue
		}

		s.logger.Error("Step execution failed",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name),
			zap.Error(err))

		// Undo the steps that completed before the failing one; the failing
		// step itself is not compensated.
		if compensateErr := s.compensate(ctx, i-1); compensateErr != nil {
			s.logger.Error("Compensation failed",
				zap.String("saga_id", s.ID),
				zap.Error(compensateErr))
			finish(StatusAborted, err)
			s.publishEvent(ctx, "saga.aborted")
			return fmt.Errorf("saga execution failed and compensation failed: %w", compensateErr)
		}

		finish(StatusCompensated, err)
		s.publishEvent(ctx, "saga.compensated")
		return fmt.Errorf("saga execution failed: %w", err)
	}

	// Every step completed successfully.
	finished := finish(StatusCompleted, nil)
	s.logger.Info("Saga completed successfully",
		zap.String("saga_id", s.ID),
		zap.Duration("duration", finished.Sub(started)))
	s.publishEvent(ctx, "saga.completed")
	return nil
}
// executeStep runs a single step's action, retrying on failure.
//
// The action is attempted once plus up to step.MaxRetries retries (a negative
// MaxRetries is treated as zero, so the action always runs at least once).
// All attempts share one deadline derived from step.Timeout; a non-positive
// Timeout means no additional deadline beyond ctx. Between attempts the
// function waits (attempt+1)*100ms, and it stops retrying as soon as the
// step context is done.
//
// It returns nil when an attempt succeeds, otherwise the last error returned
// by the action (or an error if the step has no action). Step bookkeeping is
// written under the saga mutex because the progress accessors read it under
// the same lock.
func (s *Saga) executeStep(ctx context.Context, step *SagaStep, data interface{}) error {
	started := time.Now()
	s.mutex.Lock()
	step.Status = StepRunning
	step.StartTime = started
	s.mutex.Unlock()

	s.logger.Debug("Executing step",
		zap.String("saga_id", s.ID),
		zap.String("step_name", step.Name))

	// fail records the failure on the step and hands the error back.
	fail := func(err error) error {
		s.mutex.Lock()
		step.Status = StepFailed
		step.Error = err
		step.EndTime = time.Now()
		s.mutex.Unlock()
		return err
	}

	if step.Action == nil {
		// Calling a nil func would panic; report it as a failed step instead.
		return fail(fmt.Errorf("step %s has no action", step.Name))
	}

	// A zero or negative timeout would produce an already-expired context,
	// so only derive a deadline when one is actually configured.
	stepCtx := ctx
	if step.Timeout > 0 {
		var cancel context.CancelFunc
		stepCtx, cancel = context.WithTimeout(ctx, step.Timeout)
		defer cancel()
	}

	maxRetries := step.MaxRetries
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
attempts:
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			// Count the retry when it starts so a retry that succeeds is
			// still reflected in RetryCount.
			s.mutex.Lock()
			step.RetryCount = attempt
			s.mutex.Unlock()

			s.logger.Debug("Retrying step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Int("attempt", attempt))
		}

		err := step.Action(stepCtx, data)
		if err == nil {
			finished := time.Now()
			s.mutex.Lock()
			step.Status = StepCompleted
			step.EndTime = finished
			s.mutex.Unlock()

			s.logger.Debug("Step completed successfully",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Duration("duration", finished.Sub(started)))
			return nil
		}
		lastErr = err

		if attempt == maxRetries {
			break
		}

		// Linear backoff before the next attempt.
		backoff := time.NewTimer(time.Duration(attempt+1) * 100 * time.Millisecond)
		select {
		case <-stepCtx.Done():
			backoff.Stop()
			// A bare break would only leave the select; the label is what
			// actually stops the retry loop.
			break attempts
		case <-backoff.C:
		}
	}

	// No attempt succeeded.
	return fail(lastErr)
}
// compensate undoes completed steps in reverse order, starting at index
// fromStep and walking down to index 0.
//
// The saga status is set to StatusCompensating first. Steps that are not in
// StepCompleted are marked StepSkipped; completed steps without a Compensate
// function are left as they are (a warning is logged). Each compensation
// runs with its own deadline derived from step.Timeout; a non-positive
// Timeout means no additional deadline beyond ctx. The first compensation
// error stops the walk and is returned; the remaining steps are not
// compensated.
//
// Step status is written under the saga mutex because the progress accessors
// read it under the same lock.
//
// NOTE(review): ctx is the context Execute was called with. If it is already
// cancelled when a step fails, every compensation receives a cancelled
// context — consider whether compensation should be detached from it.
func (s *Saga) compensate(ctx context.Context, fromStep int) error {
	s.setStatus(StatusCompensating)
	s.logger.Info("Starting compensation",
		zap.String("saga_id", s.ID),
		zap.Int("from_step", fromStep))

	s.mutex.RLock()
	steps, data := s.Steps, s.Data
	s.mutex.RUnlock()

	// mark updates one step's status under the lock.
	mark := func(step *SagaStep, status StepStatus) {
		s.mutex.Lock()
		step.Status = status
		s.mutex.Unlock()
	}

	// Walk the steps in reverse order.
	for i := fromStep; i >= 0; i-- {
		step := steps[i]

		// Only completed steps have anything to undo.
		if step.Status != StepCompleted {
			mark(step, StepSkipped)
			continue
		}
		if step.Compensate == nil {
			s.logger.Warn("No compensation function for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name))
			continue
		}

		s.logger.Debug("Compensating step",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))

		err := func() error {
			// A zero or negative timeout would produce an already-expired
			// context, so only derive a deadline when one is configured.
			if step.Timeout <= 0 {
				return step.Compensate(ctx, data)
			}
			compensateCtx, cancel := context.WithTimeout(ctx, step.Timeout)
			defer cancel()
			return step.Compensate(compensateCtx, data)
		}()
		if err != nil {
			s.logger.Error("Compensation failed for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Error(err))
			return err
		}

		mark(step, StepCompensated)
		s.logger.Debug("Step compensated successfully",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))
	}

	s.logger.Info("Compensation completed",
		zap.String("saga_id", s.ID))
	return nil
}
// setStatus replaces the saga status while holding the write lock.
func (s *Saga) setStatus(status SagaStatus) {
	s.mutex.Lock()
	s.Status = status
	s.mutex.Unlock()
}
// GetStatus returns the saga status, read under the read lock.
func (s *Saga) GetStatus() SagaStatus {
	s.mutex.RLock()
	current := s.Status
	s.mutex.RUnlock()
	return current
}
// GetProgress returns a snapshot of the saga's progress.
//
// CompletedSteps counts steps currently in StepCompleted and PercentComplete
// is that count over the total (0 when there are no steps). CurrentStep is
// the one-based index of the step most recently started, or 0 before any
// step has run. Duration is zero before the saga has started, the elapsed
// time while no end time has been recorded, and EndTime-StartTime once the
// saga has an end time.
func (s *Saga) GetProgress() SagaProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	total := len(s.Steps)
	completed := 0
	for _, step := range s.Steps {
		if step.Status == StepCompleted {
			completed++
		}
	}

	var percentage float64
	if total > 0 {
		percentage = float64(completed) / float64(total) * 100
	}

	var elapsed time.Duration
	switch {
	case s.StartTime.IsZero():
		// Not started: time.Since on the zero time would report centuries.
	case !s.EndTime.IsZero():
		// Finished: the duration must stop growing.
		elapsed = s.EndTime.Sub(s.StartTime)
	default:
		elapsed = time.Since(s.StartTime)
	}

	return SagaProgress{
		SagaID:          s.ID,
		Status:          s.Status.String(),
		TotalSteps:      total,
		CompletedSteps:  completed,
		CurrentStep:     s.currentStep + 1,
		PercentComplete: percentage,
		StartTime:       s.StartTime,
		Duration:        elapsed,
	}
}
// GetStepStatus returns one StepProgress per step, in step order.
//
// Duration is zero for a step that has not started, the time elapsed so far
// for a step that has started but has no end time yet, and EndTime-StartTime
// for a finished step. Error is the step error's message, or empty when the
// step has no error.
func (s *Saga) GetStepStatus() []StepProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	progress := make([]StepProgress, len(s.Steps))
	for i, step := range s.Steps {
		var elapsed time.Duration
		switch {
		case step.StartTime.IsZero():
			// Not started yet.
		case step.EndTime.IsZero():
			// Still running: subtracting from the zero end time would
			// produce a huge negative duration.
			elapsed = time.Since(step.StartTime)
		default:
			elapsed = step.EndTime.Sub(step.StartTime)
		}

		entry := StepProgress{
			Name:       step.Name,
			Status:     step.Status.String(),
			RetryCount: step.RetryCount,
			StartTime:  step.StartTime,
			EndTime:    step.EndTime,
			Duration:   elapsed,
		}
		if step.Error != nil {
			entry.Error = step.Error.Error()
		}
		progress[i] = entry
	}
	return progress
}
// publishEvent assembles a SagaEvent describing the saga's current state for
// the given event type. It does nothing when no event bus is configured.
//
// The event is currently built and then discarded: handing it to the bus
// still requires SagaEvent to implement the bus's Event interface.
func (s *Saga) publishEvent(ctx context.Context, eventType string) {
	if s.Config.EventBus != nil {
		// Publishing is not wired up yet; the event is intentionally dropped.
		_ = &SagaEvent{
			SagaID:    s.ID,
			SagaName:  s.Name,
			EventType: eventType,
			Status:    s.Status.String(),
			Timestamp: time.Now(),
			Data:      s.Data,
		}
	}
}
// SagaProgress is a JSON-serializable snapshot of a saga's progress, as
// produced by Saga.GetProgress.
type SagaProgress struct {
// SagaID is the saga's ID.
SagaID string `json:"saga_id"`
// Status is the saga status rendered with SagaStatus.String.
Status string `json:"status"`
// TotalSteps is the number of steps in the saga.
TotalSteps int `json:"total_steps"`
// CompletedSteps is the number of steps in StepCompleted.
CompletedSteps int `json:"completed_steps"`
// CurrentStep is the one-based index of the step most recently started
// (0 before any step has run).
CurrentStep int `json:"current_step"`
// PercentComplete is CompletedSteps over TotalSteps, times 100.
PercentComplete float64 `json:"percent_complete"`
// StartTime is the saga's start time.
StartTime time.Time `json:"start_time"`
// Duration is the saga's running time as computed by GetProgress.
Duration time.Duration `json:"duration"`
}
// StepProgress is a JSON-serializable snapshot of a single step, as produced
// by Saga.GetStepStatus.
type StepProgress struct {
// Name is the step name.
Name string `json:"name"`
// Status is the step status rendered with StepStatus.String.
Status string `json:"status"`
// RetryCount is copied from the step's RetryCount.
RetryCount int `json:"retry_count"`
// StartTime is copied from the step's StartTime.
StartTime time.Time `json:"start_time"`
// EndTime is copied from the step's EndTime.
EndTime time.Time `json:"end_time"`
// Duration is the step's running time as computed by GetStepStatus.
Duration time.Duration `json:"duration"`
// Error is the step error's message; omitted from JSON when empty.
Error string `json:"error,omitempty"`
}
// SagaEvent describes a saga lifecycle event, as assembled by
// Saga.publishEvent.
type SagaEvent struct {
// SagaID is the ID of the saga the event belongs to.
SagaID string `json:"saga_id"`
// SagaName is the saga's name.
SagaName string `json:"saga_name"`
// EventType is the event name, e.g. "saga.started" or "saga.completed".
EventType string `json:"event_type"`
// Status is the saga status at the time the event was built.
Status string `json:"status"`
// Timestamp is when the event was built.
Timestamp time.Time `json:"timestamp"`
// Data is the saga's payload; omitted from JSON when empty.
Data interface{} `json:"data,omitempty"`
}
// SagaManager is an in-memory registry of sagas keyed by ID.
type SagaManager struct {
// sagas maps saga ID to saga; guarded by mutex.
sagas map[string]*Saga
// logger receives the manager's log output; each created saga gets a child
// logger named "saga".
logger *zap.Logger
// mutex guards sagas.
mutex sync.RWMutex
// config is handed to every saga the manager creates.
config SagaConfig
}
// NewSagaManager returns a manager with an empty registry that creates sagas
// with the given configuration and logs through the given logger.
func NewSagaManager(config SagaConfig, logger *zap.Logger) *SagaManager {
	manager := new(SagaManager)
	manager.sagas = make(map[string]*Saga)
	manager.logger = logger
	manager.config = config
	return manager
}
// CreateSaga builds a saga with the manager's configuration and a child
// logger named "saga", registers it under id and returns it. A saga already
// registered under the same id is replaced.
func (sm *SagaManager) CreateSaga(id, name string) *Saga {
	created := NewSaga(id, name, sm.config, sm.logger.Named("saga"))

	// Only the map write needs the lock.
	func() {
		sm.mutex.Lock()
		defer sm.mutex.Unlock()
		sm.sagas[id] = created
	}()

	sm.logger.Info("Created saga",
		zap.String("saga_id", id),
		zap.String("saga_name", name))
	return created
}
// GetSaga looks up a saga by id; the boolean reports whether it was found.
func (sm *SagaManager) GetSaga(id string) (*Saga, bool) {
	sm.mutex.RLock()
	found, ok := sm.sagas[id]
	sm.mutex.RUnlock()
	return found, ok
}
// ListSagas returns every registered saga in a new slice. The order follows
// map iteration and is therefore unspecified.
func (sm *SagaManager) ListSagas() []*Saga {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	all := make([]*Saga, len(sm.sagas))
	next := 0
	for _, saga := range sm.sagas {
		all[next] = saga
		next++
	}
	return all
}
// GetSagaProgress returns the progress snapshot of the saga registered under
// id. The boolean is false, and the snapshot zero, when no such saga exists.
func (sm *SagaManager) GetSagaProgress(id string) (SagaProgress, bool) {
	if saga, ok := sm.GetSaga(id); ok {
		return saga.GetProgress(), true
	}
	return SagaProgress{}, false
}
// RemoveSaga deletes the saga registered under id from the registry and logs
// the removal. Deleting an id that is not registered is a no-op for the map;
// the debug entry is logged either way.
func (sm *SagaManager) RemoveSaga(id string) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
delete(sm.sagas, id)
// Logged while the write lock is still held (released by the defer above).
sm.logger.Debug("Removed saga", zap.String("saga_id", id))
}
// GetStats returns registry statistics: "total_sagas" is the number of
// registered sagas and "status_count" maps each status name to the number of
// sagas currently in that status.
func (sm *SagaManager) GetStats() map[string]interface{} {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	counts := make(map[string]int)
	for _, saga := range sm.sagas {
		counts[saga.GetStatus().String()]++
	}

	stats := make(map[string]interface{}, 2)
	stats["total_sagas"] = len(sm.sagas)
	stats["status_count"] = counts
	return stats
}
// The methods below implement the Service interface (per the original
// author's note; the interface itself is declared outside this file).
// Name returns the service name, "saga-manager".
func (sm *SagaManager) Name() string {
return "saga-manager"
}
// Initialize is the service start-up hook. It only logs that the manager has
// been initialized, ignores ctx and always returns nil.
func (sm *SagaManager) Initialize(ctx context.Context) error {
sm.logger.Info("Saga manager service initialized")
return nil
}
// HealthCheck is the service health probe. It performs no checks, ignores
// ctx and always returns nil.
func (sm *SagaManager) HealthCheck(ctx context.Context) error {
return nil
}
// Shutdown is the service shutdown hook. It only logs the shutdown, ignores
// ctx and always returns nil; registered sagas are left untouched.
func (sm *SagaManager) Shutdown(ctx context.Context) error {
sm.logger.Info("Saga manager service shutdown")
return nil
}
2025-07-20 20:53:26 +08:00
// ==================== Saga builders ====================
// StepBuilder collects the settings of a saga step through chained setters;
// Build turns them into a SagaStep.
type StepBuilder struct {
// name is the step name.
name string
// action is the forward operation.
action func(ctx context.Context, data interface{}) error
// compensate is the compensating operation; may be left nil.
compensate func(ctx context.Context, data interface{}) error
// timeout is the step timeout.
timeout time.Duration
// maxRetries is the retry limit.
maxRetries int
}
// Step starts a StepBuilder for a step with the given name, preset with a
// 30-second timeout and three retries.
func Step(name string) *StepBuilder {
	builder := new(StepBuilder)
	builder.name = name
	builder.timeout = 30 * time.Second
	builder.maxRetries = 3
	return builder
}
// Action sets the step's forward operation and returns the builder for
// chaining.
func (sb *StepBuilder) Action(action func(ctx context.Context, data interface{}) error) *StepBuilder {
sb.action = action
return sb
}
// Compensate sets the step's compensating operation and returns the builder
// for chaining.
func (sb *StepBuilder) Compensate(compensate func(ctx context.Context, data interface{}) error) *StepBuilder {
sb.compensate = compensate
return sb
}
// Timeout overrides the step timeout and returns the builder for chaining.
func (sb *StepBuilder) Timeout(timeout time.Duration) *StepBuilder {
sb.timeout = timeout
return sb
}
// MaxRetries overrides the step's retry limit and returns the builder for
// chaining.
func (sb *StepBuilder) MaxRetries(maxRetries int) *StepBuilder {
sb.maxRetries = maxRetries
return sb
}
// Build creates a new SagaStep in StepPending from the builder's current
// settings. Each call returns a fresh step.
func (sb *StepBuilder) Build() *SagaStep {
	step := new(SagaStep)
	step.Name = sb.name
	step.Action = sb.action
	step.Compensate = sb.compensate
	step.Status = StepPending
	step.MaxRetries = sb.maxRetries
	step.Timeout = sb.timeout
	return step
}
// SagaBuilder assembles a saga registered with a SagaManager through chained
// calls.
type SagaBuilder struct {
// manager is the manager the saga was created in.
manager *SagaManager
// saga is the saga being assembled.
saga *Saga
// steps keeps the steps as they were passed to AddStep; the saga itself
// holds separate SagaStep values created by AddStepWithConfig.
steps []*SagaStep
}
// NewSagaBuilder creates a saga with the given id and name in the manager
// and returns a builder wrapping it.
func NewSagaBuilder(manager *SagaManager, id, name string) *SagaBuilder {
	builder := &SagaBuilder{
		manager: manager,
		steps:   make([]*SagaStep, 0),
	}
	builder.saga = manager.CreateSaga(id, name)
	return builder
}
// AddStep records the step on the builder and adds an equivalent step to the
// saga using the step's own retry limit and timeout. The saga works on its
// own SagaStep created from these settings, not on the value passed in.
func (sb *SagaBuilder) AddStep(step *SagaStep) *SagaBuilder {
	sb.steps = append(sb.steps, step)

	name, maxRetries, timeout := step.Name, step.MaxRetries, step.Timeout
	sb.saga.AddStepWithConfig(name, step.Action, step.Compensate, maxRetries, timeout)
	return sb
}
// AddSteps adds the given steps one by one, in order, and returns the
// builder for chaining.
func (sb *SagaBuilder) AddSteps(steps ...*SagaStep) *SagaBuilder {
	for i := range steps {
		sb.AddStep(steps[i])
	}
	return sb
}
// Execute runs the assembled saga with the given data and returns whatever
// Saga.Execute returns.
func (sb *SagaBuilder) Execute(ctx context.Context, data interface{}) error {
return sb.saga.Execute(ctx, data)
}
// GetSaga returns the saga being assembled by this builder.
func (sb *SagaBuilder) GetSaga() *Saga {
return sb.saga
}
// Convenience functions.

// CreateSaga creates a saga named name in the manager under a generated ID of
// the form "<name>_<unix nanoseconds>" and returns a builder for it.
//
// The ID uses nanosecond resolution: with second resolution, two sagas of
// the same name created within the same second received the same ID and the
// later one replaced the earlier one in the manager's registry. Callers that
// need a strict uniqueness guarantee should use NewSagaBuilder with their
// own ID.
func CreateSaga(manager *SagaManager, name string) *SagaBuilder {
	id := fmt.Sprintf("%s_%d", name, time.Now().UnixNano())
	return NewSagaBuilder(manager, id, name)
}
// ExecuteSaga creates a saga named name in the manager, adds the given steps
// and runs it with data under context.Background(). The start of the run is
// logged through logger; the result of the saga's Execute is returned.
func ExecuteSaga(manager *SagaManager, name string, steps []*SagaStep, data interface{}, logger *zap.Logger) error {
	builder := CreateSaga(manager, name).AddSteps(steps...)

	logger.Info("开始执行Saga",
		zap.String("saga_name", name),
		zap.Int("steps_count", len(steps)))

	return builder.Execute(context.Background(), data)
}