Files
tyapi-server/internal/shared/saga/saga.go
2025-07-20 20:53:26 +08:00

731 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package saga
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"tyapi-server/internal/shared/interfaces"
)
// SagaStatus enumerates the lifecycle states of a saga.
type SagaStatus int

// Saga lifecycle states, numbered from zero in declaration order.
const (
	StatusPending      SagaStatus = iota // pending
	StatusRunning                        // running
	StatusCompleted                      // completed
	StatusFailed                         // failed
	StatusCompensating                   // compensation in progress
	StatusCompensated                    // compensated
	StatusAborted                        // aborted
)

// String returns the upper-case name of the status, or "UNKNOWN" for any
// value outside the declared range.
func (s SagaStatus) String() string {
	labels := [...]string{
		StatusPending:      "PENDING",
		StatusRunning:      "RUNNING",
		StatusCompleted:    "COMPLETED",
		StatusFailed:       "FAILED",
		StatusCompensating: "COMPENSATING",
		StatusCompensated:  "COMPENSATED",
		StatusAborted:      "ABORTED",
	}
	if s < StatusPending || s > StatusAborted {
		return "UNKNOWN"
	}
	return labels[s]
}
// StepStatus enumerates the states of a single saga step.
type StepStatus int

// Step states, numbered from zero in declaration order.
const (
	StepPending     StepStatus = iota // waiting to run
	StepRunning                       // running
	StepCompleted                     // completed
	StepFailed                        // failed
	StepCompensated                   // compensated
	StepSkipped                       // skipped
)

// String returns the upper-case name of the step status, or "UNKNOWN" for any
// value outside the declared range.
func (s StepStatus) String() string {
	labels := [...]string{
		StepPending:     "PENDING",
		StepRunning:     "RUNNING",
		StepCompleted:   "COMPLETED",
		StepFailed:      "FAILED",
		StepCompensated: "COMPENSATED",
		StepSkipped:     "SKIPPED",
	}
	if s < StepPending || s > StepSkipped {
		return "UNKNOWN"
	}
	return labels[s]
}
// SagaStep describes one step of a saga: a forward action, an optional
// compensating action, and the bookkeeping recorded while the step runs.
type SagaStep struct {
// Name identifies the step in log output and progress reports.
Name string
// Action is the forward operation. It receives a context bounded by Timeout
// and the data passed to Saga.Execute.
Action func(ctx context.Context, data interface{}) error
// Compensate undoes Action. It may be nil, in which case compensation logs
// a warning for this step and moves on.
Compensate func(ctx context.Context, data interface{}) error
// Status is the current state of the step.
Status StepStatus
// Error holds the last Action error once every attempt has failed.
Error error
// StartTime is set when execution of the step begins.
StartTime time.Time
// EndTime is set when the step completes or finally fails.
EndTime time.Time
// RetryCount is the zero-based index of the most recent failed attempt.
RetryCount int
// MaxRetries is the number of retries after the first attempt, i.e. the
// action is attempted up to MaxRetries+1 times.
MaxRetries int
// Timeout bounds the step: one deadline is shared by all attempts of
// Action, and a separate deadline of the same length is used for Compensate.
Timeout time.Duration
}
// SagaConfig holds the settings shared by the sagas created from it.
type SagaConfig struct {
// DefaultTimeout is the timeout given to steps added through Saga.AddStep.
DefaultTimeout time.Duration
// DefaultMaxRetries is the retry count given to steps added through
// Saga.AddStep.
DefaultMaxRetries int
// Parallel states whether steps should run in parallel. Only serial
// execution is currently supported; nothing in this file reads the flag.
Parallel bool
// EventBus is the event publisher. When it is nil, publishEvent returns
// without doing anything.
EventBus interfaces.EventBus
}
// DefaultSagaConfig returns the stock configuration: a 30-second step
// timeout, three retries per step, serial execution and no event bus.
func DefaultSagaConfig() SagaConfig {
	var cfg SagaConfig
	cfg.DefaultTimeout = 30 * time.Second
	cfg.DefaultMaxRetries = 3
	cfg.Parallel = false
	return cfg
}
// Saga is a distributed transaction made of ordered steps, each of which can
// be compensated if a later step fails.
type Saga struct {
// ID is the identifier of the saga; SagaManager uses it as the map key.
ID string
// Name is a human-readable name used in logs and events.
Name string
// Steps are the steps in execution order.
Steps []*SagaStep
// Status is the current lifecycle state; read it through GetStatus.
Status SagaStatus
// Data is the payload passed to Execute; it is handed to every
// compensating action.
Data interface{}
// StartTime is set when Execute begins.
StartTime time.Time
// EndTime is set by Execute when the saga finishes.
EndTime time.Time
// Error is available for recording a saga-level failure.
Error error
// Config is the configuration the saga was created with.
Config SagaConfig
// logger receives the saga's log output.
logger *zap.Logger
// mutex guards the mutable state of the saga (status, step list, current
// step index).
mutex sync.RWMutex
// currentStep is the index of the step being executed; -1 before the
// first step has started.
currentStep int
// result is not used by the code visible in this file.
result interface{}
}
// NewSaga builds a saga in StatusPending with an empty step list. The current
// step index starts at -1, meaning that no step has started yet.
func NewSaga(id, name string, config SagaConfig, logger *zap.Logger) *Saga {
	saga := new(Saga)
	saga.ID = id
	saga.Name = name
	saga.Steps = make([]*SagaStep, 0)
	saga.Status = StatusPending
	saga.Config = config
	saga.logger = logger
	saga.currentStep = -1
	return saga
}
// AddStep appends a step that uses the saga's default retry count and timeout.
// compensate may be nil. The saga itself is returned so calls can be chained.
func (s *Saga) AddStep(name string, action, compensate func(ctx context.Context, data interface{}) error) *Saga {
	newStep := new(SagaStep)
	newStep.Name = name
	newStep.Action = action
	newStep.Compensate = compensate
	newStep.Status = StepPending
	newStep.MaxRetries = s.Config.DefaultMaxRetries
	newStep.Timeout = s.Config.DefaultTimeout

	// Only the append needs the lock; logging happens after it is released.
	func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.Steps = append(s.Steps, newStep)
	}()

	s.logger.Debug("Added step to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name))
	return s
}
// AddStepWithConfig appends a step with an explicit retry count and timeout
// instead of the saga defaults. compensate may be nil. The saga itself is
// returned so calls can be chained.
func (s *Saga) AddStepWithConfig(name string, action, compensate func(ctx context.Context, data interface{}) error, maxRetries int, timeout time.Duration) *Saga {
	newStep := new(SagaStep)
	newStep.Name = name
	newStep.Action = action
	newStep.Compensate = compensate
	newStep.Status = StepPending
	newStep.MaxRetries = maxRetries
	newStep.Timeout = timeout

	// Only the append needs the lock; logging happens after it is released.
	func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.Steps = append(s.Steps, newStep)
	}()

	s.logger.Debug("Added step with config to saga",
		zap.String("saga_id", s.ID),
		zap.String("step_name", name),
		zap.Int("max_retries", maxRetries),
		zap.Duration("timeout", timeout))
	return s
}
// Execute runs the saga's steps in order, passing data to every step.
//
// The saga must be in StatusPending; otherwise an error is returned and
// nothing runs. When a step fails, the steps before it are compensated in
// reverse order:
//   - if compensation succeeds, the status becomes StatusCompensated and the
//     step error is returned wrapped;
//   - if compensation fails, the status becomes StatusAborted and the
//     compensation error is returned wrapped, with the step error included in
//     the message so the root cause is not lost.
//
// On every terminal path EndTime is recorded, and Error holds the error
// returned by Execute (nil on success). Both are written under the saga mutex.
func (s *Saga) Execute(ctx context.Context, data interface{}) error {
	s.mutex.Lock()
	if s.Status != StatusPending {
		s.mutex.Unlock()
		return fmt.Errorf("saga %s is not in pending status", s.ID)
	}
	s.Status = StatusRunning
	s.Data = data
	s.StartTime = time.Now()
	// Capture the step list while holding the lock so the iteration below
	// does not read the slice header concurrently with AddStep.
	steps := s.Steps
	s.mutex.Unlock()

	s.logger.Info("Starting saga execution",
		zap.String("saga_id", s.ID),
		zap.String("saga_name", s.Name),
		zap.Int("total_steps", len(steps)))

	// Publish the saga-started event.
	s.publishEvent(ctx, "saga.started")

	// finish records the terminal state in one critical section and reports
	// how long the saga ran.
	finish := func(status SagaStatus, err error) time.Duration {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.Status = status
		s.Error = err
		s.EndTime = time.Now()
		return s.EndTime.Sub(s.StartTime)
	}

	// Run every step in order.
	for i, step := range steps {
		s.mutex.Lock()
		s.currentStep = i
		s.mutex.Unlock()

		err := s.executeStep(ctx, step, data)
		if err == nil {
			continue
		}

		s.logger.Error("Step execution failed",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name),
			zap.Error(err))

		// Roll back the steps that ran before the failing one.
		if compensateErr := s.compensate(ctx, i-1); compensateErr != nil {
			s.logger.Error("Compensation failed",
				zap.String("saga_id", s.ID),
				zap.Error(compensateErr))
			abortErr := fmt.Errorf("saga execution failed and compensation failed: %w (step error: %v)", compensateErr, err)
			finish(StatusAborted, abortErr)
			s.publishEvent(ctx, "saga.aborted")
			return abortErr
		}

		execErr := fmt.Errorf("saga execution failed: %w", err)
		finish(StatusCompensated, execErr)
		s.publishEvent(ctx, "saga.compensated")
		return execErr
	}

	// Every step finished successfully.
	elapsed := finish(StatusCompleted, nil)
	s.logger.Info("Saga completed successfully",
		zap.String("saga_id", s.ID),
		zap.Duration("duration", elapsed))
	s.publishEvent(ctx, "saga.completed")
	return nil
}
// executeStep runs a single step, retrying its action on failure.
//
// The action is attempted up to MaxRetries+1 times; a negative MaxRetries is
// treated as zero so the action always runs at least once. All attempts share
// one context that expires after step.Timeout. Between attempts the function
// waits (attempt+1)*100ms; if the context ends during that wait, retrying
// stops immediately. The step's status, timestamps, retry counter and error
// are written under the saga mutex, which GetProgress and GetStepStatus hold
// while reading them.
//
// It returns nil on success; otherwise the error of the last attempt, or an
// error saying that the step has no action.
func (s *Saga) executeStep(ctx context.Context, step *SagaStep, data interface{}) error {
	startedAt := time.Now()
	s.mutex.Lock()
	step.Status = StepRunning
	step.StartTime = startedAt
	s.mutex.Unlock()

	s.logger.Debug("Executing step",
		zap.String("saga_id", s.ID),
		zap.String("step_name", step.Name))

	// markFailed records the terminal failure state and hands the error back.
	markFailed := func(err error) error {
		s.mutex.Lock()
		step.Status = StepFailed
		step.Error = err
		step.EndTime = time.Now()
		s.mutex.Unlock()
		return err
	}

	// Fail cleanly instead of panicking on a nil function call.
	if step.Action == nil {
		return markFailed(fmt.Errorf("step %s has no action", step.Name))
	}

	// One deadline covers all attempts of this step.
	stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
	defer cancel()

	maxRetries := step.MaxRetries
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
attempts:
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			s.logger.Debug("Retrying step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Int("attempt", attempt))
		}

		err := step.Action(stepCtx, data)
		if err == nil {
			finishedAt := time.Now()
			s.mutex.Lock()
			step.Status = StepCompleted
			step.EndTime = finishedAt
			s.mutex.Unlock()
			s.logger.Debug("Step completed successfully",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Duration("duration", finishedAt.Sub(startedAt)))
			return nil
		}

		lastErr = err
		s.mutex.Lock()
		step.RetryCount = attempt
		s.mutex.Unlock()

		if attempt == maxRetries {
			break
		}

		// Linear back-off before the next attempt. A plain `break` inside a
		// select only leaves the select, so the labelled break is required to
		// actually stop retrying once the context is done.
		backoff := time.NewTimer(time.Duration(attempt+1) * 100 * time.Millisecond)
		select {
		case <-stepCtx.Done():
			backoff.Stop()
			break attempts
		case <-backoff.C:
		}
	}

	// Every attempt failed, or the context ended while waiting to retry.
	return markFailed(lastErr)
}
// compensate rolls back steps fromStep, fromStep-1, ..., 0 in that order.
//
// The saga status is set to StatusCompensating first. A step that is not in
// StepCompleted is marked StepSkipped; a completed step without a Compensate
// function is left as is and a warning is logged. Each compensating action
// gets its own context bounded by the step's Timeout and receives the saga's
// Data. The first compensation error stops the rollback and is returned;
// earlier steps are then left uncompensated.
//
// Step status is read and written under the saga mutex, which GetProgress and
// GetStepStatus hold while reading it.
//
// NOTE(review): the compensation contexts derive from ctx, so if ctx is
// already cancelled the compensating actions receive a cancelled context —
// confirm that this is intended.
func (s *Saga) compensate(ctx context.Context, fromStep int) error {
	s.setStatus(StatusCompensating)
	s.logger.Info("Starting compensation",
		zap.String("saga_id", s.ID),
		zap.Int("from_step", fromStep))

	// Walk the steps in reverse order.
	for i := fromStep; i >= 0; i-- {
		s.mutex.Lock()
		step := s.Steps[i]
		completed := step.Status == StepCompleted
		if !completed {
			// Only completed steps have something to undo.
			step.Status = StepSkipped
		}
		s.mutex.Unlock()
		if !completed {
			continue
		}

		if step.Compensate == nil {
			s.logger.Warn("No compensation function for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name))
			continue
		}

		s.logger.Debug("Compensating step",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))

		// Bound the compensating action by the step's timeout. The mutex is
		// not held while user code runs.
		compensateCtx, cancel := context.WithTimeout(ctx, step.Timeout)
		err := step.Compensate(compensateCtx, s.Data)
		cancel()
		if err != nil {
			s.logger.Error("Compensation failed for step",
				zap.String("saga_id", s.ID),
				zap.String("step_name", step.Name),
				zap.Error(err))
			return err
		}

		s.mutex.Lock()
		step.Status = StepCompensated
		s.mutex.Unlock()
		s.logger.Debug("Step compensated successfully",
			zap.String("saga_id", s.ID),
			zap.String("step_name", step.Name))
	}

	s.logger.Info("Compensation completed",
		zap.String("saga_id", s.ID))
	return nil
}
// setStatus stores the new saga status while holding the write lock.
func (s *Saga) setStatus(status SagaStatus) {
	s.mutex.Lock()
	s.Status = status
	s.mutex.Unlock()
}
// GetStatus returns the current saga status, read under the read lock.
func (s *Saga) GetStatus() SagaStatus {
	s.mutex.RLock()
	current := s.Status
	s.mutex.RUnlock()
	return current
}
// GetProgress returns a snapshot of the saga's progress.
//
// CompletedSteps counts the steps currently in StepCompleted, and
// PercentComplete is that count relative to the total (0 when there are no
// steps). CurrentStep is 1-based and is 0 before any step has started.
//
// Duration is 0 for a saga that has not started, EndTime-StartTime once an
// end time has been recorded, and the time elapsed since StartTime otherwise.
func (s *Saga) GetProgress() SagaProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	total := len(s.Steps)
	completed := 0
	for _, step := range s.Steps {
		if step.Status == StepCompleted {
			completed++
		}
	}

	var percentage float64
	if total > 0 {
		percentage = float64(completed) / float64(total) * 100
	}

	// time.Since on a zero StartTime would report a meaningless, enormous
	// duration, and a finished saga should not keep "running".
	var elapsed time.Duration
	switch {
	case s.StartTime.IsZero():
		// Not started yet: leave the duration at zero.
	case !s.EndTime.IsZero():
		elapsed = s.EndTime.Sub(s.StartTime)
	default:
		elapsed = time.Since(s.StartTime)
	}

	return SagaProgress{
		SagaID:          s.ID,
		Status:          s.Status.String(),
		TotalSteps:      total,
		CompletedSteps:  completed,
		CurrentStep:     s.currentStep + 1,
		PercentComplete: percentage,
		StartTime:       s.StartTime,
		Duration:        elapsed,
	}
}
// GetStepStatus returns one StepProgress per step, in step order.
//
// Duration is 0 for a step that has not started, the time elapsed so far for
// a step that has started but has no end time yet, and EndTime-StartTime for
// a finished step. Error is the step error's message, or empty when the step
// has no error.
func (s *Saga) GetStepStatus() []StepProgress {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	progress := make([]StepProgress, len(s.Steps))
	for i, step := range s.Steps {
		// Subtracting from a zero EndTime would yield a huge negative
		// duration for a step that is still running.
		var elapsed time.Duration
		switch {
		case step.StartTime.IsZero():
			// Not started: leave the duration at zero.
		case step.EndTime.IsZero():
			elapsed = time.Since(step.StartTime)
		default:
			elapsed = step.EndTime.Sub(step.StartTime)
		}

		progress[i] = StepProgress{
			Name:       step.Name,
			Status:     step.Status.String(),
			RetryCount: step.RetryCount,
			StartTime:  step.StartTime,
			EndTime:    step.EndTime,
			Duration:   elapsed,
		}
		if step.Error != nil {
			progress[i].Error = step.Error.Error()
		}
	}
	return progress
}
// publishEvent assembles a SagaEvent describing the saga's current state.
// Nothing happens when no event bus is configured. The event is currently
// only built and then discarded; nothing is sent to the bus.
func (s *Saga) publishEvent(ctx context.Context, eventType string) {
	bus := s.Config.EventBus
	if bus == nil {
		return
	}

	evt := new(SagaEvent)
	evt.SagaID = s.ID
	evt.SagaName = s.Name
	evt.EventType = eventType
	evt.Status = s.Status.String()
	evt.Timestamp = time.Now()
	evt.Data = s.Data

	// The Event interface should be implemented here; handling is simplified
	// for now.
	_ = evt
}
// SagaProgress is the JSON-serialisable progress snapshot returned by
// Saga.GetProgress.
type SagaProgress struct {
// SagaID is the ID of the saga.
SagaID string `json:"saga_id"`
// Status is the string form of the saga status.
Status string `json:"status"`
// TotalSteps is the number of steps in the saga.
TotalSteps int `json:"total_steps"`
// CompletedSteps is the number of steps in the completed state.
CompletedSteps int `json:"completed_steps"`
// CurrentStep is the 1-based position of the current step; 0 before any
// step has started.
CurrentStep int `json:"current_step"`
// PercentComplete is CompletedSteps relative to TotalSteps, in percent.
PercentComplete float64 `json:"percent_complete"`
// StartTime is when the saga started executing.
StartTime time.Time `json:"start_time"`
// Duration is the elapsed time reported for the saga.
Duration time.Duration `json:"duration"`
}
// StepProgress is the JSON-serialisable per-step snapshot returned by
// Saga.GetStepStatus.
type StepProgress struct {
// Name is the step name.
Name string `json:"name"`
// Status is the string form of the step status.
Status string `json:"status"`
// RetryCount is the step's recorded retry counter.
RetryCount int `json:"retry_count"`
// StartTime is when the step started.
StartTime time.Time `json:"start_time"`
// EndTime is when the step ended.
EndTime time.Time `json:"end_time"`
// Duration is the time reported for the step.
Duration time.Duration `json:"duration"`
// Error is the step's error message; omitted from JSON when empty.
Error string `json:"error,omitempty"`
}
// SagaEvent is the payload built by Saga.publishEvent for saga lifecycle
// notifications.
type SagaEvent struct {
// SagaID is the ID of the saga the event is about.
SagaID string `json:"saga_id"`
// SagaName is the name of that saga.
SagaName string `json:"saga_name"`
// EventType is the event name, e.g. "saga.started" or "saga.completed".
EventType string `json:"event_type"`
// Status is the string form of the saga status when the event was built.
Status string `json:"status"`
// Timestamp is when the event was built.
Timestamp time.Time `json:"timestamp"`
// Data is the saga's data payload; omitted from JSON when nil.
Data interface{} `json:"data,omitempty"`
}
// SagaManager keeps an in-memory registry of sagas keyed by ID.
type SagaManager struct {
// sagas maps saga ID to saga.
sagas map[string]*Saga
// logger receives the manager's log output; sagas get a child logger
// named "saga".
logger *zap.Logger
// mutex guards sagas.
mutex sync.RWMutex
// config is the configuration given to every saga the manager creates.
config SagaConfig
}
// NewSagaManager returns a manager with an empty registry that creates sagas
// using config and logs through logger.
func NewSagaManager(config SagaConfig, logger *zap.Logger) *SagaManager {
	manager := new(SagaManager)
	manager.sagas = make(map[string]*Saga)
	manager.logger = logger
	manager.config = config
	return manager
}
// CreateSaga creates a saga with the manager's configuration and a child
// logger named "saga", registers it under id and returns it. An existing
// entry with the same id is replaced.
func (sm *SagaManager) CreateSaga(id, name string) *Saga {
	childLogger := sm.logger.Named("saga")
	created := NewSaga(id, name, sm.config, childLogger)

	// Only the registry write needs the lock.
	func() {
		sm.mutex.Lock()
		defer sm.mutex.Unlock()
		sm.sagas[id] = created
	}()

	sm.logger.Info("Created saga",
		zap.String("saga_id", id),
		zap.String("saga_name", name))
	return created
}
// GetSaga looks up a saga by id. The boolean reports whether it was found.
func (sm *SagaManager) GetSaga(id string) (*Saga, bool) {
	sm.mutex.RLock()
	found, ok := sm.sagas[id]
	sm.mutex.RUnlock()
	return found, ok
}
// ListSagas returns every registered saga in a new slice. The order follows
// map iteration and is therefore unspecified.
func (sm *SagaManager) ListSagas() []*Saga {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	all := make([]*Saga, len(sm.sagas))
	next := 0
	for _, registered := range sm.sagas {
		all[next] = registered
		next++
	}
	return all
}
// GetSagaProgress returns the progress snapshot of the saga registered under
// id. The boolean is false, and the snapshot zero-valued, when no such saga
// exists.
func (sm *SagaManager) GetSagaProgress(id string) (SagaProgress, bool) {
	if target, ok := sm.GetSaga(id); ok {
		return target.GetProgress(), true
	}
	return SagaProgress{}, false
}
// RemoveSaga deletes the saga registered under id from the manager. Deleting
// an id that is not registered is a no-op; the debug message is logged either
// way, while the write lock is still held.
func (sm *SagaManager) RemoveSaga(id string) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
delete(sm.sagas, id)
sm.logger.Debug("Removed saga", zap.String("saga_id", id))
}
// GetStats returns registry statistics: "total_sagas" holds the number of
// registered sagas and "status_count" a map from status name to the number of
// sagas in that status.
func (sm *SagaManager) GetStats() map[string]interface{} {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()

	perStatus := map[string]int{}
	for _, registered := range sm.sagas {
		perStatus[registered.GetStatus().String()]++
	}

	stats := make(map[string]interface{}, 2)
	stats["total_sagas"] = len(sm.sagas)
	stats["status_count"] = perStatus
	return stats
}
// Service interface implementation.

// Name returns the service name, "saga-manager".
func (sm *SagaManager) Name() string {
return "saga-manager"
}
// Initialize initialises the service. It only logs a message and always
// returns nil; ctx is not used.
func (sm *SagaManager) Initialize(ctx context.Context) error {
sm.logger.Info("Saga manager service initialized")
return nil
}
// HealthCheck reports the health of the service. It performs no checks and
// always returns nil; ctx is not used.
func (sm *SagaManager) HealthCheck(ctx context.Context) error {
return nil
}
// Shutdown shuts the service down. It only logs a message and always returns
// nil; registered sagas are left untouched and ctx is not used.
func (sm *SagaManager) Shutdown(ctx context.Context) error {
sm.logger.Info("Saga manager service shutdown")
return nil
}
// ==================== Saga builders ====================

// StepBuilder collects the settings of a saga step before Build turns them
// into a SagaStep.
type StepBuilder struct {
// name is the step name.
name string
// action is the forward operation of the step.
action func(ctx context.Context, data interface{}) error
// compensate is the compensating operation of the step.
compensate func(ctx context.Context, data interface{}) error
// timeout is the step timeout.
timeout time.Duration
// maxRetries is the step's maximum retry count.
maxRetries int
}
// Step starts a step builder for a step called name, preset with a 30-second
// timeout and three retries.
func Step(name string) *StepBuilder {
	builder := new(StepBuilder)
	builder.name = name
	builder.timeout = 30 * time.Second
	builder.maxRetries = 3
	return builder
}
// Action sets the forward operation of the step and returns the builder for
// chaining.
func (sb *StepBuilder) Action(action func(ctx context.Context, data interface{}) error) *StepBuilder {
sb.action = action
return sb
}
// Compensate sets the compensating operation of the step and returns the
// builder for chaining.
func (sb *StepBuilder) Compensate(compensate func(ctx context.Context, data interface{}) error) *StepBuilder {
sb.compensate = compensate
return sb
}
// Timeout sets the step timeout, replacing the builder's current value, and
// returns the builder for chaining.
func (sb *StepBuilder) Timeout(timeout time.Duration) *StepBuilder {
sb.timeout = timeout
return sb
}
// MaxRetries sets the maximum retry count, replacing the builder's current
// value, and returns the builder for chaining.
func (sb *StepBuilder) MaxRetries(maxRetries int) *StepBuilder {
sb.maxRetries = maxRetries
return sb
}
// Build turns the collected settings into a new SagaStep in StepPending. The
// action and compensation are copied as they are, including nil values.
func (sb *StepBuilder) Build() *SagaStep {
	built := new(SagaStep)
	built.Name = sb.name
	built.Action = sb.action
	built.Compensate = sb.compensate
	built.Status = StepPending
	built.MaxRetries = sb.maxRetries
	built.Timeout = sb.timeout
	return built
}
// SagaBuilder offers a fluent way to assemble and run a saga that is
// registered with a SagaManager.
type SagaBuilder struct {
// manager is the manager the saga was created through.
manager *SagaManager
// saga is the saga being assembled.
saga *Saga
// steps keeps the SagaStep values passed to AddStep, in the order given.
steps []*SagaStep
}
// NewSagaBuilder creates a saga through manager, which also registers it
// under id, and returns a builder wrapping that saga with no steps yet.
func NewSagaBuilder(manager *SagaManager, id, name string) *SagaBuilder {
	created := manager.CreateSaga(id, name)

	builder := new(SagaBuilder)
	builder.manager = manager
	builder.saga = created
	builder.steps = make([]*SagaStep, 0)
	return builder
}
// AddStep records step in the builder and adds a step with the same name,
// action, compensation, retry count and timeout to the underlying saga. The
// saga builds its own SagaStep from those values, so the step passed in is
// not the one that gets executed. Returns the builder for chaining.
func (sb *SagaBuilder) AddStep(step *SagaStep) *SagaBuilder {
sb.steps = append(sb.steps, step)
sb.saga.AddStepWithConfig(step.Name, step.Action, step.Compensate, step.MaxRetries, step.Timeout)
return sb
}
// AddSteps adds several steps in the order given, one AddStep call per step,
// and returns the builder for chaining.
func (sb *SagaBuilder) AddSteps(steps ...*SagaStep) *SagaBuilder {
	for i := 0; i < len(steps); i++ {
		sb.AddStep(steps[i])
	}
	return sb
}
// Execute runs the underlying saga with the given context and data and
// returns the saga's result unchanged.
func (sb *SagaBuilder) Execute(ctx context.Context, data interface{}) error {
return sb.saga.Execute(ctx, data)
}
// GetSaga returns the saga instance wrapped by the builder.
func (sb *SagaBuilder) GetSaga() *Saga {
return sb.saga
}
// Convenience functions.

// CreateSaga creates a saga named name through manager and returns a builder
// for it. The saga ID has the form "<name>_<timestamp>".
//
// The timestamp has nanosecond resolution. With second resolution, two sagas
// of the same name created within one second got the same ID, and the later
// one silently replaced the earlier one in the manager's registry. Uniqueness
// is still best-effort: callers that need a guaranteed-unique ID should use
// NewSagaBuilder with their own ID.
func CreateSaga(manager *SagaManager, name string) *SagaBuilder {
	id := fmt.Sprintf("%s_%d", name, time.Now().UnixNano())
	return NewSagaBuilder(manager, id, name)
}
// ExecuteSaga creates a saga named name through manager, adds steps to it,
// logs the start and runs it with data. The saga runs under
// context.Background(), so it cannot be cancelled by the caller. The saga's
// result is returned unchanged.
func ExecuteSaga(manager *SagaManager, name string, steps []*SagaStep, data interface{}, logger *zap.Logger) error {
	builder := CreateSaga(manager, name).AddSteps(steps...)

	stepCount := len(steps)
	logger.Info("开始执行Saga",
		zap.String("saga_name", name),
		zap.Int("steps_count", stepCount))

	return builder.Execute(context.Background(), data)
}