731 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			731 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package saga
 | ||
| 
 | ||
| import (
 | ||
| 	"context"
 | ||
| 	"fmt"
 | ||
| 	"sync"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"go.uber.org/zap"
 | ||
| 
 | ||
| 	"tyapi-server/internal/shared/interfaces"
 | ||
| )
 | ||
| 
 | ||
| // SagaStatus Saga状态
 | ||
| type SagaStatus int
 | ||
| 
 | ||
| const (
 | ||
| 	// StatusPending 等待中
 | ||
| 	StatusPending SagaStatus = iota
 | ||
| 	// StatusRunning 执行中
 | ||
| 	StatusRunning
 | ||
| 	// StatusCompleted 已完成
 | ||
| 	StatusCompleted
 | ||
| 	// StatusFailed 失败
 | ||
| 	StatusFailed
 | ||
| 	// StatusCompensating 补偿中
 | ||
| 	StatusCompensating
 | ||
| 	// StatusCompensated 已补偿
 | ||
| 	StatusCompensated
 | ||
| 	// StatusAborted 已中止
 | ||
| 	StatusAborted
 | ||
| )
 | ||
| 
 | ||
| func (s SagaStatus) String() string {
 | ||
| 	switch s {
 | ||
| 	case StatusPending:
 | ||
| 		return "PENDING"
 | ||
| 	case StatusRunning:
 | ||
| 		return "RUNNING"
 | ||
| 	case StatusCompleted:
 | ||
| 		return "COMPLETED"
 | ||
| 	case StatusFailed:
 | ||
| 		return "FAILED"
 | ||
| 	case StatusCompensating:
 | ||
| 		return "COMPENSATING"
 | ||
| 	case StatusCompensated:
 | ||
| 		return "COMPENSATED"
 | ||
| 	case StatusAborted:
 | ||
| 		return "ABORTED"
 | ||
| 	default:
 | ||
| 		return "UNKNOWN"
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // StepStatus 步骤状态
 | ||
| type StepStatus int
 | ||
| 
 | ||
| const (
 | ||
| 	// StepPending 等待执行
 | ||
| 	StepPending StepStatus = iota
 | ||
| 	// StepRunning 执行中
 | ||
| 	StepRunning
 | ||
| 	// StepCompleted 完成
 | ||
| 	StepCompleted
 | ||
| 	// StepFailed 失败
 | ||
| 	StepFailed
 | ||
| 	// StepCompensated 已补偿
 | ||
| 	StepCompensated
 | ||
| 	// StepSkipped 跳过
 | ||
| 	StepSkipped
 | ||
| )
 | ||
| 
 | ||
| func (s StepStatus) String() string {
 | ||
| 	switch s {
 | ||
| 	case StepPending:
 | ||
| 		return "PENDING"
 | ||
| 	case StepRunning:
 | ||
| 		return "RUNNING"
 | ||
| 	case StepCompleted:
 | ||
| 		return "COMPLETED"
 | ||
| 	case StepFailed:
 | ||
| 		return "FAILED"
 | ||
| 	case StepCompensated:
 | ||
| 		return "COMPENSATED"
 | ||
| 	case StepSkipped:
 | ||
| 		return "SKIPPED"
 | ||
| 	default:
 | ||
| 		return "UNKNOWN"
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // SagaStep Saga步骤
 | ||
| type SagaStep struct {
 | ||
| 	Name       string
 | ||
| 	Action     func(ctx context.Context, data interface{}) error
 | ||
| 	Compensate func(ctx context.Context, data interface{}) error
 | ||
| 	Status     StepStatus
 | ||
| 	Error      error
 | ||
| 	StartTime  time.Time
 | ||
| 	EndTime    time.Time
 | ||
| 	RetryCount int
 | ||
| 	MaxRetries int
 | ||
| 	Timeout    time.Duration
 | ||
| }
 | ||
| 
 | ||
| // SagaConfig Saga配置
 | ||
| type SagaConfig struct {
 | ||
| 	// 默认超时时间
 | ||
| 	DefaultTimeout time.Duration
 | ||
| 	// 默认重试次数
 | ||
| 	DefaultMaxRetries int
 | ||
| 	// 是否并行执行(当前只支持串行)
 | ||
| 	Parallel bool
 | ||
| 	// 事件发布器
 | ||
| 	EventBus interfaces.EventBus
 | ||
| }
 | ||
| 
 | ||
| // DefaultSagaConfig 默认Saga配置
 | ||
| func DefaultSagaConfig() SagaConfig {
 | ||
| 	return SagaConfig{
 | ||
| 		DefaultTimeout:    30 * time.Second,
 | ||
| 		DefaultMaxRetries: 3,
 | ||
| 		Parallel:          false,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // Saga 分布式事务
 | ||
| type Saga struct {
 | ||
| 	ID          string
 | ||
| 	Name        string
 | ||
| 	Steps       []*SagaStep
 | ||
| 	Status      SagaStatus
 | ||
| 	Data        interface{}
 | ||
| 	StartTime   time.Time
 | ||
| 	EndTime     time.Time
 | ||
| 	Error       error
 | ||
| 	Config      SagaConfig
 | ||
| 	logger      *zap.Logger
 | ||
| 	mutex       sync.RWMutex
 | ||
| 	currentStep int
 | ||
| 	result      interface{}
 | ||
| }
 | ||
| 
 | ||
| // NewSaga 创建新的Saga
 | ||
| func NewSaga(id, name string, config SagaConfig, logger *zap.Logger) *Saga {
 | ||
| 	return &Saga{
 | ||
| 		ID:          id,
 | ||
| 		Name:        name,
 | ||
| 		Steps:       make([]*SagaStep, 0),
 | ||
| 		Status:      StatusPending,
 | ||
| 		Config:      config,
 | ||
| 		logger:      logger,
 | ||
| 		currentStep: -1,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // AddStep 添加步骤
 | ||
| func (s *Saga) AddStep(name string, action, compensate func(ctx context.Context, data interface{}) error) *Saga {
 | ||
| 	step := &SagaStep{
 | ||
| 		Name:       name,
 | ||
| 		Action:     action,
 | ||
| 		Compensate: compensate,
 | ||
| 		Status:     StepPending,
 | ||
| 		MaxRetries: s.Config.DefaultMaxRetries,
 | ||
| 		Timeout:    s.Config.DefaultTimeout,
 | ||
| 	}
 | ||
| 
 | ||
| 	s.mutex.Lock()
 | ||
| 	s.Steps = append(s.Steps, step)
 | ||
| 	s.mutex.Unlock()
 | ||
| 
 | ||
| 	s.logger.Debug("Added step to saga",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.String("step_name", name))
 | ||
| 
 | ||
| 	return s
 | ||
| }
 | ||
| 
 | ||
| // AddStepWithConfig 添加带配置的步骤
 | ||
| func (s *Saga) AddStepWithConfig(name string, action, compensate func(ctx context.Context, data interface{}) error, maxRetries int, timeout time.Duration) *Saga {
 | ||
| 	step := &SagaStep{
 | ||
| 		Name:       name,
 | ||
| 		Action:     action,
 | ||
| 		Compensate: compensate,
 | ||
| 		Status:     StepPending,
 | ||
| 		MaxRetries: maxRetries,
 | ||
| 		Timeout:    timeout,
 | ||
| 	}
 | ||
| 
 | ||
| 	s.mutex.Lock()
 | ||
| 	s.Steps = append(s.Steps, step)
 | ||
| 	s.mutex.Unlock()
 | ||
| 
 | ||
| 	s.logger.Debug("Added step with config to saga",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.String("step_name", name),
 | ||
| 		zap.Int("max_retries", maxRetries),
 | ||
| 		zap.Duration("timeout", timeout))
 | ||
| 
 | ||
| 	return s
 | ||
| }
 | ||
| 
 | ||
| // Execute 执行Saga
 | ||
| func (s *Saga) Execute(ctx context.Context, data interface{}) error {
 | ||
| 	s.mutex.Lock()
 | ||
| 	if s.Status != StatusPending {
 | ||
| 		s.mutex.Unlock()
 | ||
| 		return fmt.Errorf("saga %s is not in pending status", s.ID)
 | ||
| 	}
 | ||
| 
 | ||
| 	s.Status = StatusRunning
 | ||
| 	s.Data = data
 | ||
| 	s.StartTime = time.Now()
 | ||
| 	s.mutex.Unlock()
 | ||
| 
 | ||
| 	s.logger.Info("Starting saga execution",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.String("saga_name", s.Name),
 | ||
| 		zap.Int("total_steps", len(s.Steps)))
 | ||
| 
 | ||
| 	// 发布Saga开始事件
 | ||
| 	s.publishEvent(ctx, "saga.started")
 | ||
| 
 | ||
| 	// 执行所有步骤
 | ||
| 	for i, step := range s.Steps {
 | ||
| 		s.mutex.Lock()
 | ||
| 		s.currentStep = i
 | ||
| 		s.mutex.Unlock()
 | ||
| 
 | ||
| 		if err := s.executeStep(ctx, step, data); err != nil {
 | ||
| 			s.logger.Error("Step execution failed",
 | ||
| 				zap.String("saga_id", s.ID),
 | ||
| 				zap.String("step_name", step.Name),
 | ||
| 				zap.Error(err))
 | ||
| 
 | ||
| 			// 执行补偿
 | ||
| 			if compensateErr := s.compensate(ctx, i-1); compensateErr != nil {
 | ||
| 				s.logger.Error("Compensation failed",
 | ||
| 					zap.String("saga_id", s.ID),
 | ||
| 					zap.Error(compensateErr))
 | ||
| 
 | ||
| 				s.setStatus(StatusAborted)
 | ||
| 				s.publishEvent(ctx, "saga.aborted")
 | ||
| 				return fmt.Errorf("saga execution failed and compensation failed: %w", compensateErr)
 | ||
| 			}
 | ||
| 
 | ||
| 			s.setStatus(StatusCompensated)
 | ||
| 			s.publishEvent(ctx, "saga.compensated")
 | ||
| 			return fmt.Errorf("saga execution failed: %w", err)
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	// 所有步骤成功完成
 | ||
| 	s.setStatus(StatusCompleted)
 | ||
| 	s.EndTime = time.Now()
 | ||
| 
 | ||
| 	s.logger.Info("Saga completed successfully",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.Duration("duration", s.EndTime.Sub(s.StartTime)))
 | ||
| 
 | ||
| 	s.publishEvent(ctx, "saga.completed")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // executeStep 执行单个步骤
 | ||
| func (s *Saga) executeStep(ctx context.Context, step *SagaStep, data interface{}) error {
 | ||
| 	step.Status = StepRunning
 | ||
| 	step.StartTime = time.Now()
 | ||
| 
 | ||
| 	s.logger.Debug("Executing step",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.String("step_name", step.Name))
 | ||
| 
 | ||
| 	// 设置超时上下文
 | ||
| 	stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
 | ||
| 	defer cancel()
 | ||
| 
 | ||
| 	// 重试逻辑
 | ||
| 	var lastErr error
 | ||
| 	for attempt := 0; attempt <= step.MaxRetries; attempt++ {
 | ||
| 		if attempt > 0 {
 | ||
| 			s.logger.Debug("Retrying step",
 | ||
| 				zap.String("saga_id", s.ID),
 | ||
| 				zap.String("step_name", step.Name),
 | ||
| 				zap.Int("attempt", attempt))
 | ||
| 		}
 | ||
| 
 | ||
| 		err := step.Action(stepCtx, data)
 | ||
| 		if err == nil {
 | ||
| 			step.Status = StepCompleted
 | ||
| 			step.EndTime = time.Now()
 | ||
| 
 | ||
| 			s.logger.Debug("Step completed successfully",
 | ||
| 				zap.String("saga_id", s.ID),
 | ||
| 				zap.String("step_name", step.Name),
 | ||
| 				zap.Duration("duration", step.EndTime.Sub(step.StartTime)))
 | ||
| 
 | ||
| 			return nil
 | ||
| 		}
 | ||
| 
 | ||
| 		lastErr = err
 | ||
| 		step.RetryCount = attempt
 | ||
| 
 | ||
| 		// 检查是否应该重试
 | ||
| 		if attempt < step.MaxRetries {
 | ||
| 			select {
 | ||
| 			case <-stepCtx.Done():
 | ||
| 				// 上下文被取消,停止重试
 | ||
| 				break
 | ||
| 			case <-time.After(time.Duration(attempt+1) * 100 * time.Millisecond):
 | ||
| 				// 等待一段时间后重试
 | ||
| 			}
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	// 所有重试都失败了
 | ||
| 	step.Status = StepFailed
 | ||
| 	step.Error = lastErr
 | ||
| 	step.EndTime = time.Now()
 | ||
| 
 | ||
| 	return lastErr
 | ||
| }
 | ||
| 
 | ||
| // compensate 执行补偿
 | ||
| func (s *Saga) compensate(ctx context.Context, fromStep int) error {
 | ||
| 	s.setStatus(StatusCompensating)
 | ||
| 
 | ||
| 	s.logger.Info("Starting compensation",
 | ||
| 		zap.String("saga_id", s.ID),
 | ||
| 		zap.Int("from_step", fromStep))
 | ||
| 
 | ||
| 	// 逆序执行补偿
 | ||
| 	for i := fromStep; i >= 0; i-- {
 | ||
| 		step := s.Steps[i]
 | ||
| 
 | ||
| 		// 只补偿已完成的步骤
 | ||
| 		if step.Status != StepCompleted {
 | ||
| 			step.Status = StepSkipped
 | ||
| 			continue
 | ||
| 		}
 | ||
| 
 | ||
| 		if step.Compensate == nil {
 | ||
| 			s.logger.Warn("No compensation function for step",
 | ||
| 				zap.String("saga_id", s.ID),
 | ||
| 				zap.String("step_name", step.Name))
 | ||
| 			continue
 | ||
| 		}
 | ||
| 
 | ||
| 		s.logger.Debug("Compensating step",
 | ||
| 			zap.String("saga_id", s.ID),
 | ||
| 			zap.String("step_name", step.Name))
 | ||
| 
 | ||
| 		// 设置超时上下文
 | ||
| 		compensateCtx, cancel := context.WithTimeout(ctx, step.Timeout)
 | ||
| 
 | ||
| 		err := step.Compensate(compensateCtx, s.Data)
 | ||
| 		cancel()
 | ||
| 
 | ||
| 		if err != nil {
 | ||
| 			s.logger.Error("Compensation failed for step",
 | ||
| 				zap.String("saga_id", s.ID),
 | ||
| 				zap.String("step_name", step.Name),
 | ||
| 				zap.Error(err))
 | ||
| 			return err
 | ||
| 		}
 | ||
| 
 | ||
| 		step.Status = StepCompensated
 | ||
| 
 | ||
| 		s.logger.Debug("Step compensated successfully",
 | ||
| 			zap.String("saga_id", s.ID),
 | ||
| 			zap.String("step_name", step.Name))
 | ||
| 	}
 | ||
| 
 | ||
| 	s.logger.Info("Compensation completed",
 | ||
| 		zap.String("saga_id", s.ID))
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // setStatus 设置状态
 | ||
| func (s *Saga) setStatus(status SagaStatus) {
 | ||
| 	s.mutex.Lock()
 | ||
| 	defer s.mutex.Unlock()
 | ||
| 	s.Status = status
 | ||
| }
 | ||
| 
 | ||
| // GetStatus 获取状态
 | ||
| func (s *Saga) GetStatus() SagaStatus {
 | ||
| 	s.mutex.RLock()
 | ||
| 	defer s.mutex.RUnlock()
 | ||
| 	return s.Status
 | ||
| }
 | ||
| 
 | ||
| // GetProgress 获取进度
 | ||
| func (s *Saga) GetProgress() SagaProgress {
 | ||
| 	s.mutex.RLock()
 | ||
| 	defer s.mutex.RUnlock()
 | ||
| 
 | ||
| 	completed := 0
 | ||
| 	for _, step := range s.Steps {
 | ||
| 		if step.Status == StepCompleted {
 | ||
| 			completed++
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	var percentage float64
 | ||
| 	if len(s.Steps) > 0 {
 | ||
| 		percentage = float64(completed) / float64(len(s.Steps)) * 100
 | ||
| 	}
 | ||
| 
 | ||
| 	return SagaProgress{
 | ||
| 		SagaID:          s.ID,
 | ||
| 		Status:          s.Status.String(),
 | ||
| 		TotalSteps:      len(s.Steps),
 | ||
| 		CompletedSteps:  completed,
 | ||
| 		CurrentStep:     s.currentStep + 1,
 | ||
| 		PercentComplete: percentage,
 | ||
| 		StartTime:       s.StartTime,
 | ||
| 		Duration:        time.Since(s.StartTime),
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // GetStepStatus 获取所有步骤状态
 | ||
| func (s *Saga) GetStepStatus() []StepProgress {
 | ||
| 	s.mutex.RLock()
 | ||
| 	defer s.mutex.RUnlock()
 | ||
| 
 | ||
| 	progress := make([]StepProgress, len(s.Steps))
 | ||
| 	for i, step := range s.Steps {
 | ||
| 		progress[i] = StepProgress{
 | ||
| 			Name:       step.Name,
 | ||
| 			Status:     step.Status.String(),
 | ||
| 			RetryCount: step.RetryCount,
 | ||
| 			StartTime:  step.StartTime,
 | ||
| 			EndTime:    step.EndTime,
 | ||
| 			Duration:   step.EndTime.Sub(step.StartTime),
 | ||
| 			Error:      "",
 | ||
| 		}
 | ||
| 
 | ||
| 		if step.Error != nil {
 | ||
| 			progress[i].Error = step.Error.Error()
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	return progress
 | ||
| }
 | ||
| 
 | ||
| // publishEvent 发布事件
 | ||
| func (s *Saga) publishEvent(ctx context.Context, eventType string) {
 | ||
| 	if s.Config.EventBus == nil {
 | ||
| 		return
 | ||
| 	}
 | ||
| 
 | ||
| 	event := &SagaEvent{
 | ||
| 		SagaID:    s.ID,
 | ||
| 		SagaName:  s.Name,
 | ||
| 		EventType: eventType,
 | ||
| 		Status:    s.Status.String(),
 | ||
| 		Timestamp: time.Now(),
 | ||
| 		Data:      s.Data,
 | ||
| 	}
 | ||
| 
 | ||
| 	// 这里应该实现Event接口,简化处理
 | ||
| 	_ = event
 | ||
| }
 | ||
| 
 | ||
| // SagaProgress Saga进度
 | ||
| type SagaProgress struct {
 | ||
| 	SagaID          string        `json:"saga_id"`
 | ||
| 	Status          string        `json:"status"`
 | ||
| 	TotalSteps      int           `json:"total_steps"`
 | ||
| 	CompletedSteps  int           `json:"completed_steps"`
 | ||
| 	CurrentStep     int           `json:"current_step"`
 | ||
| 	PercentComplete float64       `json:"percent_complete"`
 | ||
| 	StartTime       time.Time     `json:"start_time"`
 | ||
| 	Duration        time.Duration `json:"duration"`
 | ||
| }
 | ||
| 
 | ||
| // StepProgress 步骤进度
 | ||
| type StepProgress struct {
 | ||
| 	Name       string        `json:"name"`
 | ||
| 	Status     string        `json:"status"`
 | ||
| 	RetryCount int           `json:"retry_count"`
 | ||
| 	StartTime  time.Time     `json:"start_time"`
 | ||
| 	EndTime    time.Time     `json:"end_time"`
 | ||
| 	Duration   time.Duration `json:"duration"`
 | ||
| 	Error      string        `json:"error,omitempty"`
 | ||
| }
 | ||
| 
 | ||
| // SagaEvent Saga事件
 | ||
| type SagaEvent struct {
 | ||
| 	SagaID    string      `json:"saga_id"`
 | ||
| 	SagaName  string      `json:"saga_name"`
 | ||
| 	EventType string      `json:"event_type"`
 | ||
| 	Status    string      `json:"status"`
 | ||
| 	Timestamp time.Time   `json:"timestamp"`
 | ||
| 	Data      interface{} `json:"data,omitempty"`
 | ||
| }
 | ||
| 
 | ||
| // SagaManager Saga管理器
 | ||
| type SagaManager struct {
 | ||
| 	sagas  map[string]*Saga
 | ||
| 	logger *zap.Logger
 | ||
| 	mutex  sync.RWMutex
 | ||
| 	config SagaConfig
 | ||
| }
 | ||
| 
 | ||
| // NewSagaManager 创建Saga管理器
 | ||
| func NewSagaManager(config SagaConfig, logger *zap.Logger) *SagaManager {
 | ||
| 	return &SagaManager{
 | ||
| 		sagas:  make(map[string]*Saga),
 | ||
| 		logger: logger,
 | ||
| 		config: config,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // CreateSaga 创建Saga
 | ||
| func (sm *SagaManager) CreateSaga(id, name string) *Saga {
 | ||
| 	saga := NewSaga(id, name, sm.config, sm.logger.Named("saga"))
 | ||
| 
 | ||
| 	sm.mutex.Lock()
 | ||
| 	sm.sagas[id] = saga
 | ||
| 	sm.mutex.Unlock()
 | ||
| 
 | ||
| 	sm.logger.Info("Created saga",
 | ||
| 		zap.String("saga_id", id),
 | ||
| 		zap.String("saga_name", name))
 | ||
| 
 | ||
| 	return saga
 | ||
| }
 | ||
| 
 | ||
| // GetSaga 获取Saga
 | ||
| func (sm *SagaManager) GetSaga(id string) (*Saga, bool) {
 | ||
| 	sm.mutex.RLock()
 | ||
| 	defer sm.mutex.RUnlock()
 | ||
| 
 | ||
| 	saga, exists := sm.sagas[id]
 | ||
| 	return saga, exists
 | ||
| }
 | ||
| 
 | ||
| // ListSagas 列出所有Saga
 | ||
| func (sm *SagaManager) ListSagas() []*Saga {
 | ||
| 	sm.mutex.RLock()
 | ||
| 	defer sm.mutex.RUnlock()
 | ||
| 
 | ||
| 	sagas := make([]*Saga, 0, len(sm.sagas))
 | ||
| 	for _, saga := range sm.sagas {
 | ||
| 		sagas = append(sagas, saga)
 | ||
| 	}
 | ||
| 
 | ||
| 	return sagas
 | ||
| }
 | ||
| 
 | ||
| // GetSagaProgress 获取Saga进度
 | ||
| func (sm *SagaManager) GetSagaProgress(id string) (SagaProgress, bool) {
 | ||
| 	saga, exists := sm.GetSaga(id)
 | ||
| 	if !exists {
 | ||
| 		return SagaProgress{}, false
 | ||
| 	}
 | ||
| 
 | ||
| 	return saga.GetProgress(), true
 | ||
| }
 | ||
| 
 | ||
| // RemoveSaga 移除Saga
 | ||
| func (sm *SagaManager) RemoveSaga(id string) {
 | ||
| 	sm.mutex.Lock()
 | ||
| 	defer sm.mutex.Unlock()
 | ||
| 
 | ||
| 	delete(sm.sagas, id)
 | ||
| 	sm.logger.Debug("Removed saga", zap.String("saga_id", id))
 | ||
| }
 | ||
| 
 | ||
| // GetStats 获取统计信息
 | ||
| func (sm *SagaManager) GetStats() map[string]interface{} {
 | ||
| 	sm.mutex.RLock()
 | ||
| 	defer sm.mutex.RUnlock()
 | ||
| 
 | ||
| 	statusCount := make(map[string]int)
 | ||
| 	for _, saga := range sm.sagas {
 | ||
| 		status := saga.GetStatus().String()
 | ||
| 		statusCount[status]++
 | ||
| 	}
 | ||
| 
 | ||
| 	return map[string]interface{}{
 | ||
| 		"total_sagas":  len(sm.sagas),
 | ||
| 		"status_count": statusCount,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // 实现Service接口
 | ||
| 
 | ||
| // Name 返回服务名称
 | ||
| func (sm *SagaManager) Name() string {
 | ||
| 	return "saga-manager"
 | ||
| }
 | ||
| 
 | ||
| // Initialize 初始化服务
 | ||
| func (sm *SagaManager) Initialize(ctx context.Context) error {
 | ||
| 	sm.logger.Info("Saga manager service initialized")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // HealthCheck 健康检查
 | ||
| func (sm *SagaManager) HealthCheck(ctx context.Context) error {
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // Shutdown 关闭服务
 | ||
| func (sm *SagaManager) Shutdown(ctx context.Context) error {
 | ||
| 	sm.logger.Info("Saga manager service shutdown")
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // ==================== Saga构建器 ====================
 | ||
| 
 | ||
| // StepBuilder Saga步骤构建器
 | ||
| type StepBuilder struct {
 | ||
| 	name       string
 | ||
| 	action     func(ctx context.Context, data interface{}) error
 | ||
| 	compensate func(ctx context.Context, data interface{}) error
 | ||
| 	timeout    time.Duration
 | ||
| 	maxRetries int
 | ||
| }
 | ||
| 
 | ||
| // Step 创建步骤构建器
 | ||
| func Step(name string) *StepBuilder {
 | ||
| 	return &StepBuilder{
 | ||
| 		name:       name,
 | ||
| 		timeout:    30 * time.Second,
 | ||
| 		maxRetries: 3,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // Action 设置正向操作
 | ||
| func (sb *StepBuilder) Action(action func(ctx context.Context, data interface{}) error) *StepBuilder {
 | ||
| 	sb.action = action
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // Compensate 设置补偿操作
 | ||
| func (sb *StepBuilder) Compensate(compensate func(ctx context.Context, data interface{}) error) *StepBuilder {
 | ||
| 	sb.compensate = compensate
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // Timeout 设置超时时间
 | ||
| func (sb *StepBuilder) Timeout(timeout time.Duration) *StepBuilder {
 | ||
| 	sb.timeout = timeout
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // MaxRetries 设置最大重试次数
 | ||
| func (sb *StepBuilder) MaxRetries(maxRetries int) *StepBuilder {
 | ||
| 	sb.maxRetries = maxRetries
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // Build 构建Saga步骤
 | ||
| func (sb *StepBuilder) Build() *SagaStep {
 | ||
| 	return &SagaStep{
 | ||
| 		Name:       sb.name,
 | ||
| 		Action:     sb.action,
 | ||
| 		Compensate: sb.compensate,
 | ||
| 		Status:     StepPending,
 | ||
| 		MaxRetries: sb.maxRetries,
 | ||
| 		Timeout:    sb.timeout,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // SagaBuilder Saga构建器
 | ||
| type SagaBuilder struct {
 | ||
| 	manager *SagaManager
 | ||
| 	saga    *Saga
 | ||
| 	steps   []*SagaStep
 | ||
| }
 | ||
| 
 | ||
| // NewSagaBuilder 创建Saga构建器
 | ||
| func NewSagaBuilder(manager *SagaManager, id, name string) *SagaBuilder {
 | ||
| 	saga := manager.CreateSaga(id, name)
 | ||
| 	return &SagaBuilder{
 | ||
| 		manager: manager,
 | ||
| 		saga:    saga,
 | ||
| 		steps:   make([]*SagaStep, 0),
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // AddStep 添加步骤
 | ||
| func (sb *SagaBuilder) AddStep(step *SagaStep) *SagaBuilder {
 | ||
| 	sb.steps = append(sb.steps, step)
 | ||
| 	sb.saga.AddStepWithConfig(step.Name, step.Action, step.Compensate, step.MaxRetries, step.Timeout)
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // AddSteps 批量添加步骤
 | ||
| func (sb *SagaBuilder) AddSteps(steps ...*SagaStep) *SagaBuilder {
 | ||
| 	for _, step := range steps {
 | ||
| 		sb.AddStep(step)
 | ||
| 	}
 | ||
| 	return sb
 | ||
| }
 | ||
| 
 | ||
| // Execute 执行Saga
 | ||
| func (sb *SagaBuilder) Execute(ctx context.Context, data interface{}) error {
 | ||
| 	return sb.saga.Execute(ctx, data)
 | ||
| }
 | ||
| 
 | ||
| // GetSaga 获取Saga实例
 | ||
| func (sb *SagaBuilder) GetSaga() *Saga {
 | ||
| 	return sb.saga
 | ||
| }
 | ||
| 
 | ||
| // 便捷函数
 | ||
| 
 | ||
| // CreateSaga 快速创建Saga
 | ||
| func CreateSaga(manager *SagaManager, name string) *SagaBuilder {
 | ||
| 	id := fmt.Sprintf("%s_%d", name, time.Now().Unix())
 | ||
| 	return NewSagaBuilder(manager, id, name)
 | ||
| }
 | ||
| 
 | ||
| // ExecuteSaga 快速执行Saga
 | ||
| func ExecuteSaga(manager *SagaManager, name string, steps []*SagaStep, data interface{}, logger *zap.Logger) error {
 | ||
| 	saga := CreateSaga(manager, name)
 | ||
| 	saga.AddSteps(steps...)
 | ||
| 	
 | ||
| 	logger.Info("开始执行Saga",
 | ||
| 		zap.String("saga_name", name),
 | ||
| 		zap.Int("steps_count", len(steps)))
 | ||
| 	
 | ||
| 	return saga.Execute(context.Background(), data)
 | ||
| }
 |