// Package saga implements a sequential saga executor: steps run in order and,
// when one fails, the compensations of the already-completed steps run in
// reverse order. SagaManager keeps an in-memory registry of sagas.
package saga

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"

	"tyapi-server/internal/shared/interfaces"
)

// SagaStatus is the lifecycle state of a Saga.
type SagaStatus int

const (
	// StatusPending means the saga was created and Execute has not started.
	StatusPending SagaStatus = iota
	// StatusRunning means Execute is running the forward steps.
	StatusRunning
	// StatusCompleted means every step finished successfully.
	StatusCompleted
	// StatusFailed marks a failed saga. Execute never sets it: a failed saga
	// ends as StatusCompensated or StatusAborted.
	StatusFailed
	// StatusCompensating means compensations are running after a step failed.
	StatusCompensating
	// StatusCompensated means a step failed and every compensation succeeded.
	StatusCompensated
	// StatusAborted means a step failed and a compensation also failed, so
	// earlier steps may not have been fully undone.
	StatusAborted
)

// String returns the upper-case name of the status, or "UNKNOWN".
func (s SagaStatus) String() string {
	switch s {
	case StatusPending:
		return "PENDING"
	case StatusRunning:
		return "RUNNING"
	case StatusCompleted:
		return "COMPLETED"
	case StatusFailed:
		return "FAILED"
	case StatusCompensating:
		return "COMPENSATING"
	case StatusCompensated:
		return "COMPENSATED"
	case StatusAborted:
		return "ABORTED"
	default:
		return "UNKNOWN"
	}
}

// StepStatus is the state of a single SagaStep.
type StepStatus int

const (
	// StepPending means the step has not started.
	StepPending StepStatus = iota
	// StepRunning means the step's action is executing (possibly retrying).
	StepRunning
	// StepCompleted means the action succeeded.
	StepCompleted
	// StepFailed means the action did not succeed within its retry budget.
	StepFailed
	// StepCompensated means the step's compensation ran successfully.
	StepCompensated
	// StepSkipped means compensation skipped the step because it had not
	// completed.
	StepSkipped
)

// String returns the upper-case name of the step status, or "UNKNOWN".
func (s StepStatus) String() string {
	switch s {
	case StepPending:
		return "PENDING"
	case StepRunning:
		return "RUNNING"
	case StepCompleted:
		return "COMPLETED"
	case StepFailed:
		return "FAILED"
	case StepCompensated:
		return "COMPENSATED"
	case StepSkipped:
		return "SKIPPED"
	default:
		return "UNKNOWN"
	}
}

// SagaStep is one forward action of a saga plus its optional compensation.
type SagaStep struct {
	// Name identifies the step in logs and progress reports.
	Name string

	// Action performs the forward work and receives the data passed to
	// Saga.Execute.
	Action func(ctx context.Context, data interface{}) error

	// Compensate undoes a completed Action and receives the same data. A nil
	// Compensate is logged and skipped during compensation.
	Compensate func(ctx context.Context, data interface{}) error

	// Runtime state, written by the executor under the saga's lock; read it
	// through Saga.GetStepStatus while the saga may be running. RetryCount is
	// the index of the latest attempt (0 means no retry happened).
	Status     StepStatus
	Error      error
	StartTime  time.Time
	EndTime    time.Time
	RetryCount int

	// MaxRetries is the number of extra attempts after a failed first try.
	// Negative values are treated as 0.
	MaxRetries int

	// Timeout is one deadline covering all attempts of Action (including the
	// waits between them), and separately bounds Compensate. A non-positive
	// value disables the deadline.
	Timeout time.Duration
}

// SagaConfig holds saga-wide settings and the defaults used by AddStep.
type SagaConfig struct {
	// DefaultTimeout is the step timeout applied by AddStep.
	DefaultTimeout time.Duration

	// DefaultMaxRetries is the retry count applied by AddStep.
	DefaultMaxRetries int

	// Parallel requests parallel execution. Not implemented: Execute always
	// runs steps sequentially and does not read this field.
	Parallel bool

	// EventBus enables lifecycle events when non-nil.
	// NOTE(review): publishEvent currently builds the event but does not
	// publish it.
	EventBus interfaces.EventBus
}

// DefaultSagaConfig returns a config with a 30s step timeout, 3 retries,
// sequential execution and no event bus.
func DefaultSagaConfig() SagaConfig {
	return SagaConfig{
		DefaultTimeout:    30 * time.Second,
		DefaultMaxRetries: 3,
		Parallel:          false,
	}
}

// Saga is a named, ordered list of steps that Execute runs at most once.
//
// The exported state fields are updated under an internal lock while the saga
// runs; concurrent observers should use GetStatus, GetProgress and
// GetStepStatus rather than reading the fields directly.
type Saga struct {
	ID        string
	Name      string
	Steps     []*SagaStep
	Status    SagaStatus
	Data      interface{}
	StartTime time.Time
	EndTime   time.Time
	Error     error
	Config    SagaConfig

	logger      *zap.Logger
	mutex       sync.RWMutex
	currentStep int
	result      interface{}
}

// NewSaga creates a pending saga with no steps. A nil logger is replaced by a
// no-op logger so that logging calls cannot panic.
func NewSaga(id, name string, config SagaConfig, logger *zap.Logger) *Saga {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &Saga{
		ID:          id,
		Name:        name,
		Steps:       make([]*SagaStep, 0),
		Status:      StatusPending,
		Config:      config,
		logger:      logger,
		currentStep: -1,
	}
}

// AddStep appends a step that uses the config's DefaultMaxRetries and
// DefaultTimeout, and returns the saga for chaining. Steps appended after
// Execute has started are not run by that execution.
func (s *Saga) AddStep(name string, action, compensate func(ctx context.Context, data interface{}) error) *Saga {
	s.appendStep(&SagaStep{
		Name:       name,
		Action:     action,
		Compensate: compensate,
		Status:     StepPending,
		MaxRetries: s.Config.DefaultMaxRetries,
		Timeout:    s.Config.DefaultTimeout,
	})
	s.logger.Debug("Added step to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name))
	return s
}

// AddStepWithConfig appends a step with an explicit retry count and timeout,
// and returns the saga for chaining.
func (s *Saga) AddStepWithConfig(name string, action, compensate func(ctx context.Context, data interface{}) error, maxRetries int, timeout time.Duration) *Saga {
	s.appendStep(&SagaStep{
		Name:       name,
		Action:     action,
		Compensate: compensate,
		Status:     StepPending,
		MaxRetries: maxRetries,
		Timeout:    timeout,
	})
	s.logger.Debug("Added step with config to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name),
		zap.Int("max_retries", maxRetries),
		zap.Duration("timeout", timeout))
	return s
}

// appendStep adds step to s.Steps under the saga's lock.
func (s *Saga) appendStep(step *SagaStep) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.Steps = append(s.Steps, step)
}

// Execute runs every step in order with data as their argument.
//
// It returns an error immediately if the saga is not StatusPending. If a step
// fails, the completed steps before it are compensated in reverse order and
// the saga ends as StatusCompensated (compensations succeeded) or
// StatusAborted (a compensation failed); the returned error wraps the step
// error or the compensation error respectively. On success the saga ends as
// StatusCompleted and nil is returned. EndTime and Error are recorded on
// every terminal path.
func (s *Saga) Execute(ctx context.Context, data interface{}) error {
	s.mutex.Lock()
	if s.Status != StatusPending {
		s.mutex.Unlock()
		return fmt.Errorf("saga %s is not in pending status", s.ID)
	}
	s.Status = StatusRunning
	s.Data = data
	s.StartTime = time.Now()
	// Snapshot the step list: steps appended later are not run, and reading
	// the slice header here avoids racing with concurrent AddStep calls.
	steps := s.Steps
	s.mutex.Unlock()

	s.logger.Info("Starting saga execution",
		zap.String("saga_id", s.ID),
		zap.String("saga_name", s.Name),
		zap.Int("total_steps", len(steps)))

	s.publishEvent(ctx, "saga.started")

	for i, step := range steps {
		s.mutex.Lock()
		s.currentStep = i
		s.mutex.Unlock()

		err := s.executeStep(ctx, step, data)
		if err == nil {
			continue
		}

		s.logger.Error("Step execution failed",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name),
			zap.Error(err))

		// Only steps before the failed one can have completed.
		if compensateErr := s.compensate(ctx, i-1); compensateErr != nil {
			s.logger.Error("Compensation failed",
				zap.String("saga_id", s.ID),
				zap.Error(compensateErr))
			abortErr := fmt.Errorf("saga execution failed and compensation failed: %w", compensateErr)
			s.finishSaga(StatusAborted, abortErr)
			s.publishEvent(ctx, "saga.aborted")
			return abortErr
		}

		execErr := fmt.Errorf("saga execution failed: %w", err)
		s.finishSaga(StatusCompensated, execErr)
		s.publishEvent(ctx, "saga.compensated")
		return execErr
	}

	s.finishSaga(StatusCompleted, nil)
	// Only this goroutine writes StartTime/EndTime, so reading them here is safe.
	s.logger.Info("Saga completed successfully",
		zap.String("saga_id", s.ID),
		zap.Duration("duration", s.EndTime.Sub(s.StartTime)))

	s.publishEvent(ctx, "saga.completed")
	return nil
}

// executeStep runs step.Action with retries and records the outcome on step.
//
// The first attempt is followed by up to step.MaxRetries retries (negative is
// treated as 0) with a linear backoff of (attempt+1)*100ms. One deadline of
// step.Timeout covers all attempts and backoff waits; once that deadline
// passes or ctx is cancelled, no further retries are made. It returns nil on
// success, otherwise the error from the last attempt.
func (s *Saga) executeStep(ctx context.Context, step *SagaStep, data interface{}) error {
	s.mutex.Lock()
	step.Status = StepRunning
	step.StartTime = time.Now()
	s.mutex.Unlock()

	s.logger.Debug("Executing step",
		zap.String("saga_id", s.ID),
		zap.String("step_name", step.Name))

	if step.Action == nil {
		err := fmt.Errorf("step %s has no action", step.Name)
		s.finishStep(step, StepFailed, err)
		return err
	}

	stepCtx, cancel := contextWithStepTimeout(ctx, step.Timeout)
	defer cancel()

	// A negative retry count must still allow one attempt; otherwise the step
	// would be reported as failed while returning a nil error.
	maxRetries := step.MaxRetries
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			s.logger.Debug("Retrying step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Int("attempt", attempt))
		}

		s.mutex.Lock()
		step.RetryCount = attempt
		s.mutex.Unlock()

		err := step.Action(stepCtx, data)
		if err == nil {
			s.finishStep(step, StepCompleted, nil)
			s.logger.Debug("Step completed successfully",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Duration("duration", step.EndTime.Sub(step.StartTime)))
			return nil
		}
		lastErr = err

		if attempt == maxRetries {
			break
		}
		// Stop retrying as soon as the step deadline passes or ctx is
		// cancelled; a `break` inside a select would only leave the select.
		if !waitRetryBackoff(stepCtx, time.Duration(attempt+1)*100*time.Millisecond) {
			s.logger.Debug("Step context done, giving up retries",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Int("attempt", attempt))
			break
		}
	}

	s.finishStep(step, StepFailed, lastErr)
	return lastErr
}

// contextWithStepTimeout derives a context bounded by timeout, or merely
// cancellable when timeout is not positive (a zero timeout would otherwise
// expire immediately).
func contextWithStepTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout <= 0 {
		return context.WithCancel(ctx)
	}
	return context.WithTimeout(ctx, timeout)
}

// waitRetryBackoff sleeps for d and reports true, or returns false early if
// ctx is done first.
func waitRetryBackoff(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// finishStep records a step's terminal status, error and end time under the
// saga's lock.
func (s *Saga) finishStep(step *SagaStep, status StepStatus, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	step.Status = status
	step.Error = err
	step.EndTime = time.Now()
}

// setStepStatus updates a step's status under the saga's lock.
func (s *Saga) setStepStatus(step *SagaStep, status StepStatus) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	step.Status = status
}

// compensate runs Compensate for every completed step from index fromStep
// down to 0. Steps that are not StepCompleted are marked StepSkipped; steps
// without a Compensate function are logged and left unchanged. It stops at,
// and returns, the first compensation error.
//
// NOTE(review): compensations run under the caller's ctx. If the forward
// failure was caused by ctx being cancelled, every compensation starts with
// an already-cancelled context; consider a detached context
// (context.WithoutCancel, Go 1.21+) if that matters.
func (s *Saga) compensate(ctx context.Context, fromStep int) error {
	s.setStatus(StatusCompensating)
	s.logger.Info("Starting compensation",
		zap.String("saga_id", s.ID),
		zap.Int("from_step", fromStep))

	s.mutex.RLock()
	steps := s.Steps
	data := s.Data
	s.mutex.RUnlock()

	if fromStep >= len(steps) {
		fromStep = len(steps) - 1
	}

	// Undo in reverse order of execution.
	for i := fromStep; i >= 0; i-- {
		step := steps[i]

		if step.Status != StepCompleted {
			s.setStepStatus(step, StepSkipped)
			continue
		}
		if step.Compensate == nil {
			s.logger.Warn("No compensation function for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name))
			continue
		}

		s.logger.Debug("Compensating step",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))

		compensateCtx, cancel := contextWithStepTimeout(ctx, step.Timeout)
		err := step.Compensate(compensateCtx, data)
		cancel()
		if err != nil {
			s.logger.Error("Compensation failed for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Error(err))
			return err
		}

		s.setStepStatus(step, StepCompensated)
		s.logger.Debug("Step compensated successfully",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))
	}

	s.logger.Info("Compensation completed", zap.String("saga_id", s.ID))
	return nil
}

// setStatus sets the saga status under the lock.
func (s *Saga) setStatus(status SagaStatus) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.Status = status
}

// finishSaga records the saga's terminal status, error and end time under the
// lock.
func (s *Saga) finishSaga(status SagaStatus, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.Status = status
	s.Error = err
	s.EndTime = time.Now()
}

// GetStatus returns the current saga status; safe for concurrent use.
func (s *Saga) GetStatus() SagaStatus {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.Status
}

// GetProgress returns a snapshot of the saga's progress; safe for concurrent
// use. CompletedSteps counts steps currently in StepCompleted, so it drops
// back after compensation. CurrentStep is 1-based (0 before Execute starts).
// Duration is 0 before start, grows while running, and is frozen at
// EndTime-StartTime once the saga has finished.
func (s *Saga) GetProgress() SagaProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	total := len(s.Steps)
	completed := 0
	for _, step := range s.Steps {
		if step.Status == StepCompleted {
			completed++
		}
	}

	var percentage float64
	if total > 0 {
		percentage = float64(completed) / float64(total) * 100
	}

	return SagaProgress{
		SagaID:          s.ID,
		Status:          s.Status.String(),
		TotalSteps:      total,
		CompletedSteps:  completed,
		CurrentStep:     s.currentStep + 1,
		PercentComplete: percentage,
		StartTime:       s.StartTime,
		Duration:        durationBetween(s.StartTime, s.EndTime),
	}
}

// GetStepStatus returns a snapshot of every step's progress; safe for
// concurrent use. A running step reports the time elapsed so far.
func (s *Saga) GetStepStatus() []StepProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	progress := make([]StepProgress, len(s.Steps))
	for i, step := range s.Steps {
		p := StepProgress{
			Name:       step.Name,
			Status:     step.Status.String(),
			RetryCount: step.RetryCount,
			StartTime:  step.StartTime,
			EndTime:    step.EndTime,
			Duration:   durationBetween(step.StartTime, step.EndTime),
		}
		if step.Error != nil {
			p.Error = step.Error.Error()
		}
		progress[i] = p
	}
	return progress
}

// durationBetween returns 0 when start is unset, the time since start when
// end is unset (still running), and end-start otherwise.
func durationBetween(start, end time.Time) time.Duration {
	switch {
	case start.IsZero():
		return 0
	case end.IsZero():
		return time.Since(start)
	default:
		return end.Sub(start)
	}
}

// publishEvent builds a SagaEvent for eventType when an EventBus is
// configured.
//
// NOTE(review): the event is not actually published; doing so needs an
// implementation of the interfaces.Event type, which is not visible here.
func (s *Saga) publishEvent(ctx context.Context, eventType string) {
	if s.Config.EventBus == nil {
		return
	}

	s.mutex.RLock()
	event := &SagaEvent{
		SagaID:    s.ID,
		SagaName:  s.Name,
		EventType: eventType,
		Status:    s.Status.String(),
		Timestamp: time.Now(),
		Data:      s.Data,
	}
	s.mutex.RUnlock()

	// Intentionally discarded until an Event implementation exists.
	_ = event
}

// SagaProgress is a JSON-serializable snapshot of a saga's progress.
type SagaProgress struct {
	SagaID          string        `json:"saga_id"`
	Status          string        `json:"status"`
	TotalSteps      int           `json:"total_steps"`
	CompletedSteps  int           `json:"completed_steps"`
	CurrentStep     int           `json:"current_step"`
	PercentComplete float64       `json:"percent_complete"`
	StartTime       time.Time     `json:"start_time"`
	Duration        time.Duration `json:"duration"`
}

// StepProgress is a JSON-serializable snapshot of one step.
type StepProgress struct {
	Name       string        `json:"name"`
	Status     string        `json:"status"`
	RetryCount int           `json:"retry_count"`
	StartTime  time.Time     `json:"start_time"`
	EndTime    time.Time     `json:"end_time"`
	Duration   time.Duration `json:"duration"`
	Error      string        `json:"error,omitempty"`
}

// SagaEvent describes a saga lifecycle transition.
type SagaEvent struct {
	SagaID    string      `json:"saga_id"`
	SagaName  string      `json:"saga_name"`
	EventType string      `json:"event_type"`
	Status    string      `json:"status"`
	Timestamp time.Time   `json:"timestamp"`
	Data      interface{} `json:"data,omitempty"`
}

// SagaManager is an in-memory, mutex-guarded registry of sagas that are all
// created with the same SagaConfig.
type SagaManager struct {
	sagas  map[string]*Saga
	logger *zap.Logger
	mutex  sync.RWMutex
	config SagaConfig
}

// NewSagaManager creates an empty manager. A nil logger is replaced by a
// no-op logger.
func NewSagaManager(config SagaConfig, logger *zap.Logger) *SagaManager {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &SagaManager{
		sagas:  make(map[string]*Saga),
		logger: logger,
		config: config,
	}
}

// CreateSaga creates a saga with the manager's config and registers it under
// id. An existing saga with the same id is replaced.
func (sm *SagaManager) CreateSaga(id, name string) *Saga {
	saga := NewSaga(id, name, sm.config, sm.logger.Named("saga"))

	sm.mutex.Lock()
	sm.sagas[id] = saga
	sm.mutex.Unlock()

	sm.logger.Info("Created saga",
		zap.String("saga_id", id),
		zap.String("saga_name", name))
	return saga
}

// GetSaga looks up a saga by id.
func (sm *SagaManager) GetSaga(id string) (*Saga, bool) {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	saga, exists := sm.sagas[id]
	return saga, exists
}

// ListSagas returns all registered sagas in unspecified order.
func (sm *SagaManager) ListSagas() []*Saga {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	sagas := make([]*Saga, 0, len(sm.sagas))
	for _, saga := range sm.sagas {
		sagas = append(sagas, saga)
	}
	return sagas
}

// GetSagaProgress returns the progress of the saga with the given id, and
// false if no such saga is registered.
func (sm *SagaManager) GetSagaProgress(id string) (SagaProgress, bool) {
	saga, exists := sm.GetSaga(id)
	if !exists {
		return SagaProgress{}, false
	}
	return saga.GetProgress(), true
}

// RemoveSaga unregisters the saga with the given id; it does not stop a
// running saga.
func (sm *SagaManager) RemoveSaga(id string) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	delete(sm.sagas, id)
	sm.logger.Debug("Removed saga", zap.String("saga_id", id))
}

// GetStats returns the total number of sagas and a count per status name.
func (sm *SagaManager) GetStats() map[string]interface{} {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	statusCount := make(map[string]int)
	for _, saga := range sm.sagas {
		statusCount[saga.GetStatus().String()]++
	}

	return map[string]interface{}{
		"total_sagas":  len(sm.sagas),
		"status_count": statusCount,
	}
}

// The methods below let SagaManager satisfy the project's service interface
// (presumably a Name/Initialize/HealthCheck/Shutdown contract defined
// elsewhere — confirm against the interface declaration).

// Name returns the service name.
func (sm *SagaManager) Name() string {
	return "saga-manager"
}

// Initialize logs start-up; the manager needs no other initialization.
func (sm *SagaManager) Initialize(ctx context.Context) error {
	sm.logger.Info("Saga manager service initialized")
	return nil
}

// HealthCheck always reports healthy.
func (sm *SagaManager) HealthCheck(ctx context.Context) error {
	return nil
}

// Shutdown logs shutdown. It does not wait for or cancel running sagas.
func (sm *SagaManager) Shutdown(ctx context.Context) error {
	sm.logger.Info("Saga manager service shutdown")
	return nil
}