package events

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"

	"tyapi-server/internal/shared/interfaces"
)

const (
	// memoryEventBusDefaultWorkers is the worker count used when the caller
	// passes a non-positive value to NewMemoryEventBus.
	memoryEventBusDefaultWorkers = 5

	// memoryEventBusQueueSize is the buffer size of the async task queue.
	memoryEventBusQueueSize = 1000

	// memoryEventBusStopGrace is the longest Stop waits for workers to exit.
	memoryEventBusStopGrace = 5 * time.Second
)

// MemoryEventBus is an in-memory event bus. Synchronous handlers run inline in
// Publish; asynchronous handlers are queued on a buffered channel and executed
// by a fixed number of worker goroutines.
type MemoryEventBus struct {
	// subscribers maps an event type to its handlers. The slices stored here
	// are never modified in place (Subscribe/Unsubscribe replace them), so a
	// slice obtained under the lock may be iterated after the lock is released.
	subscribers map[string][]interfaces.EventHandler
	// mutex guards subscribers, running, stopCh and workers.
	mutex  sync.RWMutex
	logger *zap.Logger

	running bool
	// stopCh is closed by Stop to tell the current workers to exit. Start
	// installs a fresh channel on every run.
	stopCh chan struct{}
	// workers tracks the worker goroutines of the current run so Stop can
	// wait for them.
	workers *sync.WaitGroup

	eventQueue  chan eventTask
	workerCount int
}

// eventTask is one (event, handler) pair queued for asynchronous processing.
type eventTask struct {
	event   interfaces.Event
	handler interfaces.EventHandler
	// retries is the number of re-queues this task has already been through.
	retries int
}

// NewMemoryEventBus creates an event bus with the given number of workers.
// A non-positive workerCount falls back to the default of 5. The bus does not
// process asynchronous events until Start is called.
func NewMemoryEventBus(logger *zap.Logger, workerCount int) *MemoryEventBus {
	if workerCount <= 0 {
		workerCount = memoryEventBusDefaultWorkers
	}

	return &MemoryEventBus{
		subscribers: make(map[string][]interfaces.EventHandler),
		logger:      logger,
		eventQueue:  make(chan eventTask, memoryEventBusQueueSize),
		workerCount: workerCount,
		stopCh:      make(chan struct{}),
	}
}

// Name returns the service name.
func (bus *MemoryEventBus) Name() string {
	return "memory-event-bus"
}

// Initialize logs that the service was initialized. It does not start workers.
func (bus *MemoryEventBus) Initialize(ctx context.Context) error {
	bus.logger.Info("Memory event bus service initialized")
	return nil
}

// HealthCheck returns an error when the bus is not running.
func (bus *MemoryEventBus) HealthCheck(ctx context.Context) error {
	// running is written under the mutex by Start/Stop, so it must be read
	// under the mutex too.
	bus.mutex.RLock()
	running := bus.running
	bus.mutex.RUnlock()

	if !running {
		return fmt.Errorf("event bus is not running")
	}
	return nil
}

// Shutdown stops the bus and returns whatever Stop returns.
func (bus *MemoryEventBus) Shutdown(ctx context.Context) error {
	return bus.Stop(ctx)
}

// Start launches the worker goroutines. Calling Start on a running bus is a
// no-op. A stopped bus may be started again.
func (bus *MemoryEventBus) Start(ctx context.Context) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	if bus.running {
		return nil
	}

	// Each run gets its own stop channel and WaitGroup: the previous channel
	// (if any) was closed by Stop, so reusing it would make the new workers
	// exit immediately and the next Stop panic on a double close.
	stopCh := make(chan struct{})
	workers := &sync.WaitGroup{}
	bus.stopCh = stopCh
	bus.workers = workers
	bus.running = true

	workers.Add(bus.workerCount)
	for i := 0; i < bus.workerCount; i++ {
		go func(id int) {
			defer workers.Done()
			bus.worker(id, stopCh)
		}(i)
	}

	bus.logger.Info("Event bus started", zap.Int("workers", bus.workerCount))
	return nil
}

// Stop signals the workers to exit and waits until they have all returned,
// the 5-second grace period elapses, or ctx is done, whichever comes first.
// It always returns nil. Calling Stop on a bus that is not running is a no-op.
// Tasks still sitting in the queue are left there.
func (bus *MemoryEventBus) Stop(ctx context.Context) error {
	bus.mutex.Lock()
	if !bus.running {
		bus.mutex.Unlock()
		return nil
	}
	bus.running = false
	close(bus.stopCh)
	workers := bus.workers
	// Release the lock before waiting so Publish, Subscribe and GetStats are
	// not blocked for the duration of the shutdown.
	bus.mutex.Unlock()

	done := make(chan struct{})
	go func() {
		workers.Wait()
		close(done)
	}()

	grace := time.NewTimer(memoryEventBusStopGrace)
	defer grace.Stop()

	select {
	case <-done:
	case <-grace.C:
		bus.logger.Warn("Event bus workers did not stop within the grace period",
			zap.Duration("grace_period", memoryEventBusStopGrace))
	case <-ctx.Done():
		bus.logger.Warn("Event bus stop interrupted by context", zap.Error(ctx.Err()))
	}

	bus.logger.Info("Event bus stopped")
	return nil
}

// Publish delivers event to every handler subscribed to its type. Handlers
// reporting IsAsync are queued for the workers; the others run inline with
// ctx. Handler failures and a full queue are logged, not returned: Publish
// always returns nil.
func (bus *MemoryEventBus) Publish(ctx context.Context, event interfaces.Event) error {
	eventType := event.GetType()
	eventID := event.GetID()

	bus.logger.Info("📤 开始发布事件",
		zap.String("event_type", eventType),
		zap.String("event_id", eventID),
		zap.String("aggregate_id", event.GetAggregateID()),
	)

	// Safe to iterate after unlocking because subscriber slices are replaced,
	// never modified in place.
	bus.mutex.RLock()
	handlers := bus.subscribers[eventType]
	bus.mutex.RUnlock()

	if len(handlers) == 0 {
		bus.logger.Warn("⚠️ 没有找到事件处理器",
			zap.String("event_type", eventType),
			zap.String("event_id", eventID),
		)
		return nil
	}

	bus.logger.Info("📋 找到事件处理器",
		zap.String("event_type", eventType),
		zap.Int("handler_count", len(handlers)),
	)

	for i, handler := range handlers {
		async := handler.IsAsync()
		bus.logger.Info("🔄 处理事件",
			zap.String("event_type", eventType),
			zap.String("handler_name", handler.GetName()),
			zap.Int("handler_index", i),
			zap.Bool("is_async", async),
		)

		if async {
			bus.enqueueAsync(event, handler)
		} else {
			bus.dispatchSync(ctx, event, handler)
		}
	}

	bus.logger.Info("✅ 事件发布完成",
		zap.String("event_type", eventType),
		zap.String("event_id", eventID),
	)

	return nil
}

// enqueueAsync puts the (event, handler) pair on the worker queue without
// blocking. When the queue is full the event is dropped for this handler and
// an error is logged.
func (bus *MemoryEventBus) enqueueAsync(event interfaces.Event, handler interfaces.EventHandler) {
	select {
	case bus.eventQueue <- eventTask{event: event, handler: handler, retries: 0}:
		bus.logger.Info("✅ 事件已加入异步队列",
			zap.String("event_type", event.GetType()),
			zap.String("handler_name", handler.GetName()),
			zap.Int("queue_length", len(bus.eventQueue)),
		)
	default:
		bus.logger.Error("❌ 事件队列已满,丢弃事件",
			zap.String("event_type", event.GetType()),
			zap.String("handler_name", handler.GetName()),
			zap.Int("queue_length", len(bus.eventQueue)),
			zap.Int("queue_capacity", cap(bus.eventQueue)),
		)
	}
}

// dispatchSync runs handler inline and logs the outcome.
func (bus *MemoryEventBus) dispatchSync(ctx context.Context, event interfaces.Event, handler interfaces.EventHandler) {
	bus.logger.Info("⚡ 开始同步处理事件",
		zap.String("event_type", event.GetType()),
		zap.String("handler_name", handler.GetName()),
	)

	if err := bus.handleEventWithRetry(ctx, event, handler); err != nil {
		bus.logger.Error("❌ 同步处理事件失败",
			zap.String("event_type", event.GetType()),
			zap.String("handler_name", handler.GetName()),
			zap.Error(err),
		)
		return
	}

	bus.logger.Info("✅ 同步处理事件成功",
		zap.String("event_type", event.GetType()),
		zap.String("handler_name", handler.GetName()),
	)
}

// PublishBatch publishes the events in order and stops at the first error
// returned by Publish.
func (bus *MemoryEventBus) PublishBatch(ctx context.Context, events []interfaces.Event) error {
	for _, event := range events {
		if err := bus.Publish(ctx, event); err != nil {
			return err
		}
	}
	return nil
}

// Subscribe registers handler for eventType. It returns an error when a
// handler with the same name is already subscribed to that event type.
func (bus *MemoryEventBus) Subscribe(eventType string, handler interfaces.EventHandler) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	handlers := bus.subscribers[eventType]

	// Reject duplicates, identified by handler name.
	for _, h := range handlers {
		if h.GetName() == handler.GetName() {
			return fmt.Errorf("handler %s already subscribed to event type %s", handler.GetName(), eventType)
		}
	}

	// Copy-on-write: a concurrent Publish may still be iterating the old slice.
	updated := make([]interfaces.EventHandler, len(handlers)+1)
	copy(updated, handlers)
	updated[len(handlers)] = handler
	bus.subscribers[eventType] = updated

	bus.logger.Info("Handler subscribed to event",
		zap.String("handler", handler.GetName()),
		zap.String("event_type", eventType))

	return nil
}

// Unsubscribe removes the handler with the same name as handler from
// eventType. It returns an error when no such handler is subscribed.
func (bus *MemoryEventBus) Unsubscribe(eventType string, handler interfaces.EventHandler) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	handlers := bus.subscribers[eventType]

	for i, h := range handlers {
		if h.GetName() != handler.GetName() {
			continue
		}

		// Build a new slice instead of shifting elements in place: the old
		// backing array may still be read by a Publish that took its
		// snapshot before this call.
		updated := make([]interfaces.EventHandler, 0, len(handlers)-1)
		updated = append(updated, handlers[:i]...)
		updated = append(updated, handlers[i+1:]...)
		bus.subscribers[eventType] = updated

		bus.logger.Info("Handler unsubscribed from event",
			zap.String("handler", handler.GetName()),
			zap.String("event_type", eventType))

		return nil
	}

	return fmt.Errorf("handler %s not found for event type %s", handler.GetName(), eventType)
}

// GetSubscribers returns a copy of the handlers subscribed to eventType.
func (bus *MemoryEventBus) GetSubscribers(eventType string) []interfaces.EventHandler {
	bus.mutex.RLock()
	defer bus.mutex.RUnlock()

	handlers := bus.subscribers[eventType]
	result := make([]interfaces.EventHandler, len(handlers))
	copy(result, handlers)

	return result
}

// worker pulls tasks from the queue and processes them until stopCh is
// closed. stopCh is passed in rather than read from the struct so that a
// worker keeps listening to the channel of the run it belongs to.
func (bus *MemoryEventBus) worker(id int, stopCh <-chan struct{}) {
	bus.logger.Info("👷 事件工作协程启动", zap.Int("worker_id", id))

	for {
		select {
		case task := <-bus.eventQueue:
			bus.logger.Info("📥 工作协程接收到事件任务",
				zap.Int("worker_id", id),
				zap.String("event_type", task.event.GetType()),
				zap.String("handler_name", task.handler.GetName()),
			)
			bus.processEventTask(task)
		case <-stopCh:
			bus.logger.Info("🛑 事件工作协程停止", zap.Int("worker_id", id))
			return
		}
	}
}

// processEventTask runs one queued task with a background context. On failure
// it schedules a delayed re-queue while the task has retries left according to
// the handler's retry config; otherwise it logs the final failure.
func (bus *MemoryEventBus) processEventTask(task eventTask) {
	ctx := context.Background()

	bus.logger.Info("🔧 开始处理事件任务",
		zap.String("event_type", task.event.GetType()),
		zap.String("handler_name", task.handler.GetName()),
		zap.Int("retries", task.retries),
	)

	err := bus.handleEventWithRetry(ctx, task.event, task.handler)
	if err == nil {
		bus.logger.Info("✅ 事件任务处理成功",
			zap.String("event_type", task.event.GetType()),
			zap.String("handler_name", task.handler.GetName()),
		)
		return
	}

	bus.logger.Error("❌ 事件任务处理失败",
		zap.String("event_type", task.event.GetType()),
		zap.String("handler_name", task.handler.GetName()),
		zap.Error(err),
	)

	retryConfig := task.handler.GetRetryConfig()
	if task.retries >= retryConfig.MaxRetries {
		bus.logger.Error("💥 事件处理失败,已达到最大重试次数",
			zap.String("event_type", task.event.GetType()),
			zap.String("handler_name", task.handler.GetName()),
			zap.Int("retries", task.retries),
			zap.Error(err),
		)
		return
	}

	// Linear backoff: RetryDelay * (1 + BackoffFactor * retries), capped at
	// MaxDelay.
	delay := time.Duration(float64(retryConfig.RetryDelay) * (1 + retryConfig.BackoffFactor*float64(task.retries)))
	if delay > retryConfig.MaxDelay {
		delay = retryConfig.MaxDelay
	}

	bus.logger.Info("🔄 准备重试事件任务",
		zap.String("event_type", task.event.GetType()),
		zap.String("handler_name", task.handler.GetName()),
		zap.Int("retries", task.retries),
		zap.Int("max_retries", retryConfig.MaxRetries),
		zap.Duration("delay", delay),
	)

	task.retries++
	go bus.requeueAfter(task, delay)
}

// requeueAfter waits for delay and then puts task back on the queue without
// blocking. If the bus is stopped while waiting, the retry is dropped so the
// goroutine does not outlive the shutdown.
func (bus *MemoryEventBus) requeueAfter(task eventTask, delay time.Duration) {
	bus.mutex.RLock()
	stopCh := bus.stopCh
	bus.mutex.RUnlock()

	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-stopCh:
		bus.logger.Warn("Event bus stopped, dropping pending retry",
			zap.String("event_type", task.event.GetType()),
			zap.String("handler_name", task.handler.GetName()),
			zap.Int("retries", task.retries),
		)
		return
	}

	select {
	case bus.eventQueue <- task:
		bus.logger.Info("✅ 事件任务重新加入队列",
			zap.String("event_type", task.event.GetType()),
			zap.String("handler_name", task.handler.GetName()),
			zap.Int("retries", task.retries),
		)
	default:
		bus.logger.Error("❌ 事件队列已满,无法重新加入任务",
			zap.String("event_type", task.event.GetType()),
			zap.String("handler_name", task.handler.GetName()),
			zap.Int("retries", task.retries),
		)
	}
}

// handleEventWithRetry invokes the handler once and logs how long it took.
// Despite the name it does not retry by itself; retries of asynchronous tasks
// are scheduled by processEventTask.
func (bus *MemoryEventBus) handleEventWithRetry(ctx context.Context, event interfaces.Event, handler interfaces.EventHandler) error {
	start := time.Now()

	// Deferred so the duration is logged even when Handle panics.
	defer func() {
		bus.logger.Debug("Event handled",
			zap.String("type", event.GetType()),
			zap.String("handler", handler.GetName()),
			zap.Duration("duration", time.Since(start)))
	}()

	return handler.Handle(ctx, event)
}

// GetStats returns a snapshot of the bus state: running flag, worker count,
// queue length and capacity, number of known event types, and the subscriber
// count per event type under the "subscribers" key.
func (bus *MemoryEventBus) GetStats() map[string]interface{} {
	bus.mutex.RLock()
	defer bus.mutex.RUnlock()

	// Subscriber count per event type.
	perType := make(map[string]int, len(bus.subscribers))
	for eventType, handlers := range bus.subscribers {
		perType[eventType] = len(handlers)
	}

	return map[string]interface{}{
		"running":        bus.running,
		"worker_count":   bus.workerCount,
		"queue_length":   len(bus.eventQueue),
		"queue_capacity": cap(bus.eventQueue),
		"event_types":    len(bus.subscribers),
		"subscribers":    perType,
	}
}