Files
tyapi-server/internal/shared/events/event_bus.go
2025-08-02 02:54:21 +08:00

395 lines
10 KiB
Go

package events
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"tyapi-server/internal/shared/interfaces"
)
// MemoryEventBus 内存事件总线实现
type MemoryEventBus struct {
subscribers map[string][]interfaces.EventHandler
mutex sync.RWMutex
logger *zap.Logger
running bool
stopCh chan struct{}
eventQueue chan eventTask
workerCount int
}
// eventTask 事件任务
type eventTask struct {
event interfaces.Event
handler interfaces.EventHandler
retries int
}
// NewMemoryEventBus 创建内存事件总线
func NewMemoryEventBus(logger *zap.Logger, workerCount int) *MemoryEventBus {
if workerCount <= 0 {
workerCount = 5 // 默认5个工作协程
}
return &MemoryEventBus{
subscribers: make(map[string][]interfaces.EventHandler),
logger: logger,
eventQueue: make(chan eventTask, 1000), // 缓冲1000个事件
workerCount: workerCount,
stopCh: make(chan struct{}),
}
}
// Name 返回服务名称
func (bus *MemoryEventBus) Name() string {
return "memory-event-bus"
}
// Initialize 初始化服务
func (bus *MemoryEventBus) Initialize(ctx context.Context) error {
bus.logger.Info("Memory event bus service initialized")
return nil
}
// HealthCheck 健康检查
func (bus *MemoryEventBus) HealthCheck(ctx context.Context) error {
if !bus.running {
return fmt.Errorf("event bus is not running")
}
return nil
}
// Shutdown 关闭服务
func (bus *MemoryEventBus) Shutdown(ctx context.Context) error {
bus.Stop(ctx)
return nil
}
// Start 启动事件总线
func (bus *MemoryEventBus) Start(ctx context.Context) error {
bus.mutex.Lock()
defer bus.mutex.Unlock()
if bus.running {
return nil
}
bus.running = true
// 启动工作协程
for i := 0; i < bus.workerCount; i++ {
go bus.worker(i)
}
bus.logger.Info("Event bus started", zap.Int("workers", bus.workerCount))
return nil
}
// Stop 停止事件总线
func (bus *MemoryEventBus) Stop(ctx context.Context) error {
bus.mutex.Lock()
defer bus.mutex.Unlock()
if !bus.running {
return nil
}
bus.running = false
close(bus.stopCh)
// 等待所有工作协程结束或超时
done := make(chan struct{})
go func() {
time.Sleep(5 * time.Second) // 给工作协程5秒时间结束
close(done)
}()
select {
case <-done:
case <-ctx.Done():
}
bus.logger.Info("Event bus stopped")
return nil
}
// Publish 发布事件(同步)
func (bus *MemoryEventBus) Publish(ctx context.Context, event interfaces.Event) error {
bus.logger.Info("📤 开始发布事件",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
zap.String("aggregate_id", event.GetAggregateID()),
)
bus.mutex.RLock()
handlers := bus.subscribers[event.GetType()]
bus.mutex.RUnlock()
if len(handlers) == 0 {
bus.logger.Warn("⚠️ 没有找到事件处理器",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
)
return nil
}
bus.logger.Info("📋 找到事件处理器",
zap.String("event_type", event.GetType()),
zap.Int("handler_count", len(handlers)),
)
for i, handler := range handlers {
bus.logger.Info("🔄 处理事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("handler_index", i),
zap.Bool("is_async", handler.IsAsync()),
)
if handler.IsAsync() {
// 异步处理
select {
case bus.eventQueue <- eventTask{event: event, handler: handler, retries: 0}:
bus.logger.Info("✅ 事件已加入异步队列",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("queue_length", len(bus.eventQueue)),
)
default:
bus.logger.Error("❌ 事件队列已满,丢弃事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Int("queue_length", len(bus.eventQueue)),
zap.Int("queue_capacity", cap(bus.eventQueue)),
)
}
} else {
// 同步处理
bus.logger.Info("⚡ 开始同步处理事件",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
)
if err := bus.handleEventWithRetry(ctx, event, handler); err != nil {
bus.logger.Error("❌ 同步处理事件失败",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
zap.Error(err),
)
} else {
bus.logger.Info("✅ 同步处理事件成功",
zap.String("event_type", event.GetType()),
zap.String("handler_name", handler.GetName()),
)
}
}
}
bus.logger.Info("✅ 事件发布完成",
zap.String("event_type", event.GetType()),
zap.String("event_id", event.GetID()),
)
return nil
}
// PublishBatch 批量发布事件
func (bus *MemoryEventBus) PublishBatch(ctx context.Context, events []interfaces.Event) error {
for _, event := range events {
if err := bus.Publish(ctx, event); err != nil {
return err
}
}
return nil
}
// Subscribe 订阅事件
func (bus *MemoryEventBus) Subscribe(eventType string, handler interfaces.EventHandler) error {
bus.mutex.Lock()
defer bus.mutex.Unlock()
handlers := bus.subscribers[eventType]
// 检查是否已经订阅
for _, h := range handlers {
if h.GetName() == handler.GetName() {
return fmt.Errorf("handler %s already subscribed to event type %s", handler.GetName(), eventType)
}
}
bus.subscribers[eventType] = append(handlers, handler)
bus.logger.Info("Handler subscribed to event",
zap.String("handler", handler.GetName()),
zap.String("event_type", eventType))
return nil
}
// Unsubscribe 取消订阅
func (bus *MemoryEventBus) Unsubscribe(eventType string, handler interfaces.EventHandler) error {
bus.mutex.Lock()
defer bus.mutex.Unlock()
handlers := bus.subscribers[eventType]
for i, h := range handlers {
if h.GetName() == handler.GetName() {
// 删除处理器
bus.subscribers[eventType] = append(handlers[:i], handlers[i+1:]...)
bus.logger.Info("Handler unsubscribed from event",
zap.String("handler", handler.GetName()),
zap.String("event_type", eventType))
return nil
}
}
return fmt.Errorf("handler %s not found for event type %s", handler.GetName(), eventType)
}
// GetSubscribers 获取订阅者
func (bus *MemoryEventBus) GetSubscribers(eventType string) []interfaces.EventHandler {
bus.mutex.RLock()
defer bus.mutex.RUnlock()
handlers := bus.subscribers[eventType]
result := make([]interfaces.EventHandler, len(handlers))
copy(result, handlers)
return result
}
// worker 工作协程
func (bus *MemoryEventBus) worker(id int) {
bus.logger.Info("👷 事件工作协程启动", zap.Int("worker_id", id))
for {
select {
case task := <-bus.eventQueue:
bus.logger.Info("📥 工作协程接收到事件任务",
zap.Int("worker_id", id),
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
)
bus.processEventTask(task)
case <-bus.stopCh:
bus.logger.Info("🛑 事件工作协程停止", zap.Int("worker_id", id))
return
}
}
}
// processEventTask 处理事件任务
func (bus *MemoryEventBus) processEventTask(task eventTask) {
ctx := context.Background()
bus.logger.Info("🔧 开始处理事件任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
err := bus.handleEventWithRetry(ctx, task.event, task.handler)
if err != nil {
bus.logger.Error("❌ 事件任务处理失败",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Error(err),
)
retryConfig := task.handler.GetRetryConfig()
if task.retries < retryConfig.MaxRetries {
// 重试
delay := time.Duration(float64(retryConfig.RetryDelay) *
(1 + retryConfig.BackoffFactor*float64(task.retries)))
if delay > retryConfig.MaxDelay {
delay = retryConfig.MaxDelay
}
bus.logger.Info("🔄 准备重试事件任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
zap.Int("max_retries", retryConfig.MaxRetries),
zap.Duration("delay", delay),
)
go func() {
time.Sleep(delay)
task.retries++
select {
case bus.eventQueue <- task:
bus.logger.Info("✅ 事件任务重新加入队列",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
default:
bus.logger.Error("❌ 事件队列已满,无法重新加入任务",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
)
}
}()
} else {
bus.logger.Error("💥 事件处理失败,已达到最大重试次数",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
zap.Int("retries", task.retries),
zap.Error(err),
)
}
} else {
bus.logger.Info("✅ 事件任务处理成功",
zap.String("event_type", task.event.GetType()),
zap.String("handler_name", task.handler.GetName()),
)
}
}
// handleEventWithRetry 处理事件并支持重试
func (bus *MemoryEventBus) handleEventWithRetry(ctx context.Context, event interfaces.Event, handler interfaces.EventHandler) error {
start := time.Now()
defer func() {
duration := time.Since(start)
bus.logger.Debug("Event handled",
zap.String("type", event.GetType()),
zap.String("handler", handler.GetName()),
zap.Duration("duration", duration))
}()
return handler.Handle(ctx, event)
}
// GetStats 获取事件总线统计信息
func (bus *MemoryEventBus) GetStats() map[string]interface{} {
bus.mutex.RLock()
defer bus.mutex.RUnlock()
stats := map[string]interface{}{
"running": bus.running,
"worker_count": bus.workerCount,
"queue_length": len(bus.eventQueue),
"queue_capacity": cap(bus.eventQueue),
"event_types": len(bus.subscribers),
}
// 各事件类型的订阅者数量
eventTypes := make(map[string]int)
for eventType, handlers := range bus.subscribers {
eventTypes[eventType] = len(handlers)
}
stats["subscribers"] = eventTypes
return stats
}