This commit is contained in:
2025-08-02 02:54:21 +08:00
parent 934dce2776
commit 66845d3fe0
74 changed files with 8686 additions and 212 deletions

View File

@@ -119,36 +119,80 @@ func (bus *MemoryEventBus) Stop(ctx context.Context) error {
// Publish 发布事件(同步)
func (bus *MemoryEventBus) Publish(ctx context.Context, event interfaces.Event) error {
bus.logger.Info("📤 开始发布事件",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
zap.String("aggregate_id", event.GetAggregateID()),
)
bus.mutex.RLock()
handlers := bus.subscribers[event.GetType()]
bus.mutex.RUnlock()
if len(handlers) == 0 {
bus.logger.Debug("No handlers for event type", zap.String("type", event.GetType()))
bus.logger.Warn("⚠️ 没有找到事件处理器",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
)
return nil
}
for _, handler := range handlers {
bus.logger.Info("📋 找到事件处理器",
zap.String("event_type", event.GetType()),
zap.Int("handler_count", len(handlers)),
)
for i, handler := range handlers {
bus.logger.Info("🔄 处理事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("handler_index", i),
zap.Bool("is_async", handler.IsAsync()),
)
if handler.IsAsync() {
// 异步处理
select {
case bus.eventQueue <- eventTask{event: event, handler: handler, retries: 0}:
bus.logger.Info("✅ 事件已加入异步队列",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("queue_length", len(bus.eventQueue)),
)
default:
bus.logger.Warn("Event queue is full, dropping event",
zap.String("type", event.GetType()),
zap.String("handler", handler.GetName()))
bus.logger.Error("❌ 事件队列已满,丢弃事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("queue_length", len(bus.eventQueue)),
zap.Int("queue_capacity", cap(bus.eventQueue)),
)
}
} else {
// 同步处理
bus.logger.Info("⚡ 开始同步处理事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
)
if err := bus.handleEventWithRetry(ctx, event, handler); err != nil {
bus.logger.Error("Failed to handle event synchronously",
zap.String("type", event.GetType()),
zap.String("handler", handler.GetName()),
zap.Error(err))
bus.logger.Error("❌ 同步处理事件失败",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Error(err),
)
} else {
bus.logger.Info("✅ 同步处理事件成功",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
)
}
}
}
bus.logger.Info("✅ 事件发布完成",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
)
return nil
}
@@ -221,14 +265,19 @@ func (bus *MemoryEventBus) GetSubscribers(eventType string) []interfaces.EventHa
// worker 工作协程
func (bus *MemoryEventBus) worker(id int) {
bus.logger.Debug("Event worker started", zap.Int("worker_id", id))
bus.logger.Info("👷 事件工作协程启动", zap.Int("worker_id", id))
for {
select {
case task := <-bus.eventQueue:
bus.logger.Info("📥 工作协程接收到事件任务",
zap.Int("worker_id", id),
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
)
bus.processEventTask(task)
case <-bus.stopCh:
bus.logger.Debug("Event worker stopped", zap.Int("worker_id", id))
bus.logger.Info("🛑 事件工作协程停止", zap.Int("worker_id", id))
return
}
}
@@ -238,8 +287,20 @@ func (bus *MemoryEventBus) worker(id int) {
func (bus *MemoryEventBus) processEventTask(task eventTask) {
ctx := context.Background()
bus.logger.Info("🔧 开始处理事件任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
err := bus.handleEventWithRetry(ctx, task.event, task.handler)
if err != nil {
bus.logger.Error("❌ 事件任务处理失败",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Error(err),
)
retryConfig := task.handler.GetRetryConfig()
if task.retries < retryConfig.MaxRetries {
@@ -251,26 +312,46 @@ func (bus *MemoryEventBus) processEventTask(task eventTask) {
delay = retryConfig.MaxDelay
}
bus.logger.Info("🔄 准备重试事件任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
zap.Int("max_retries", retryConfig.MaxRetries),
zap.Duration("delay", delay),
)
go func() {
time.Sleep(delay)
task.retries++
select {
case bus.eventQueue <- task:
bus.logger.Info("✅ 事件任务重新加入队列",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
default:
bus.logger.Error("Failed to requeue event for retry",
zap.String("type", task.event.GetType()),
zap.String("handler", task.handler.GetName()),
zap.Int("retries", task.retries))
bus.logger.Error("❌ 事件队列已满,无法重新加入任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
}
}()
} else {
bus.logger.Error("Event processing failed after max retries",
zap.String("type", task.event.GetType()),
zap.String("handler", task.handler.GetName()),
bus.logger.Error("💥 事件处理失败,已达到最大重试次数",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
zap.Error(err))
zap.Error(err),
)
}
} else {
bus.logger.Info("✅ 事件任务处理成功",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
)
}
}