Files
tyapi-server/internal/shared/events/event_bus.go

314 lines
7.5 KiB
Go

package events
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"tyapi-server/internal/shared/interfaces"
)
// MemoryEventBus is an in-memory event bus. Handlers are registered per event
// type; events for asynchronous handlers are handed to worker goroutines
// through a buffered queue.
type MemoryEventBus struct {
// subscribers maps an event type to the handlers registered for it.
subscribers map[string][]interfaces.EventHandler
// mutex protects subscribers; Start and Stop also hold it while changing running.
mutex sync.RWMutex
logger *zap.Logger
// running is set by Start and cleared by Stop.
running bool
// stopCh is closed by Stop to tell the workers to exit.
stopCh chan struct{}
// eventQueue carries pending (event, handler) pairs to the workers.
eventQueue chan eventTask
// workerCount is the number of worker goroutines launched by Start.
workerCount int
}
// eventTask is one unit of queued work: a single event paired with the handler
// that should process it.
type eventTask struct {
event interfaces.Event
handler interfaces.EventHandler
// retries counts how many times this task has already been requeued after a failure.
retries int
}
// NewMemoryEventBus builds an in-memory event bus that logs through logger.
// A workerCount of zero or less is replaced by the default of 5 workers. The
// queue is buffered for 1000 tasks. No goroutines are started here; that is
// done by Start.
func NewMemoryEventBus(logger *zap.Logger, workerCount int) *MemoryEventBus {
	const (
		defaultWorkers = 5
		queueCapacity  = 1000
	)
	if workerCount < 1 {
		workerCount = defaultWorkers
	}

	bus := new(MemoryEventBus)
	bus.logger = logger
	bus.workerCount = workerCount
	bus.subscribers = make(map[string][]interfaces.EventHandler)
	bus.eventQueue = make(chan eventTask, queueCapacity)
	bus.stopCh = make(chan struct{})
	return bus
}
// Name returns the fixed service name of the event bus.
func (bus *MemoryEventBus) Name() string {
	const serviceName = "memory-event-bus"
	return serviceName
}
// Initialize only logs that the service has been initialized; it performs no
// setup work, does not use ctx, and always returns nil.
func (bus *MemoryEventBus) Initialize(ctx context.Context) error {
	log := bus.logger
	log.Info("Memory event bus service initialized")
	return nil
}
// HealthCheck returns an error when the bus is not running and nil otherwise.
//
// running is written by Start and Stop while they hold bus.mutex, so it is read
// under the read lock here; an unsynchronised read would be a data race with
// those writers. ctx is not used.
func (bus *MemoryEventBus) HealthCheck(ctx context.Context) error {
	bus.mutex.RLock()
	running := bus.running
	bus.mutex.RUnlock()

	if !running {
		return fmt.Errorf("event bus is not running")
	}
	return nil
}
// Shutdown stops the event bus by delegating to Stop and returns whatever Stop
// returns, so that a failure to stop is not silently discarded.
func (bus *MemoryEventBus) Shutdown(ctx context.Context) error {
	return bus.Stop(ctx)
}
// Start launches workerCount worker goroutines and marks the bus as running.
// Calling Start on a bus that is already running is a no-op. ctx is not used
// and the function always returns nil.
//
// Stop signals the workers by closing stopCh. A closed channel stays closed, so
// without replacing it a Start that follows a Stop would spawn workers that
// exit at once, and the next Stop would panic by closing the channel a second
// time. The channel is therefore replaced when it is found closed.
func (bus *MemoryEventBus) Start(ctx context.Context) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	if bus.running {
		return nil
	}

	// Non-blocking probe: nothing ever sends on stopCh, so a receive can only
	// succeed after the channel has been closed by a previous Stop.
	select {
	case <-bus.stopCh:
		bus.stopCh = make(chan struct{})
	default:
	}

	bus.running = true

	// Spawn the worker goroutines; each receives its index as an id for logging.
	for i := 0; i < bus.workerCount; i++ {
		go bus.worker(i)
	}

	bus.logger.Info("Event bus started", zap.Int("workers", bus.workerCount))
	return nil
}
// Stop marks the bus as not running, closes stopCh to signal the workers, and
// then waits for a grace period of five seconds or until ctx is done, whichever
// comes first. Calling Stop on a bus that is not running is a no-op. It always
// returns nil.
//
// The grace period is a fixed wait: this function has no handle on worker
// completion, so it does not return early when the workers have finished.
//
// The mutex is released before the wait so that Publish, Subscribe and the
// other methods are not blocked for the whole grace period, and a timer is
// used instead of a sleeping goroutine so nothing outlives the call when ctx
// ends first.
func (bus *MemoryEventBus) Stop(ctx context.Context) error {
	bus.mutex.Lock()
	if !bus.running {
		bus.mutex.Unlock()
		return nil
	}
	bus.running = false
	close(bus.stopCh)
	bus.mutex.Unlock()

	// Give the workers up to five seconds to wind down, unless ctx ends sooner.
	grace := time.NewTimer(5 * time.Second)
	defer grace.Stop()

	select {
	case <-grace.C:
	case <-ctx.Done():
	}

	bus.logger.Info("Event bus stopped")
	return nil
}
// Publish delivers event to every handler subscribed to its type.
//
// Handlers that report IsAsync are enqueued for the workers without blocking;
// if the queue is full the task is dropped and a warning is logged. All other
// handlers are invoked inline on the caller's goroutine with ctx, and a failure
// is logged rather than returned. Publish therefore always returns nil.
//
// NOTE(review): the handler slice is read after the read lock is released, so
// this is only safe while writers never modify a stored slice in place.
func (bus *MemoryEventBus) Publish(ctx context.Context, event interfaces.Event) error {
	bus.mutex.RLock()
	registered := bus.subscribers[event.GetType()]
	bus.mutex.RUnlock()

	if len(registered) == 0 {
		bus.logger.Debug("No handlers for event type", zap.String("type", event.GetType()))
		return nil
	}

	for _, h := range registered {
		if !h.IsAsync() {
			// Synchronous handler: run it right here.
			err := bus.handleEventWithRetry(ctx, event, h)
			if err != nil {
				bus.logger.Error("Failed to handle event synchronously",
					zap.String("type", event.GetType()),
					zap.String("handler", h.GetName()),
					zap.Error(err))
			}
			continue
		}

		// Asynchronous handler: hand the work to the queue, never blocking the publisher.
		task := eventTask{event: event, handler: h, retries: 0}
		select {
		case bus.eventQueue <- task:
		default:
			bus.logger.Warn("Event queue is full, dropping event",
				zap.String("type", event.GetType()),
				zap.String("handler", h.GetName()))
		}
	}

	return nil
}
// PublishBatch publishes the given events one after another, in slice order,
// and stops at the first error returned by Publish, returning that error.
func (bus *MemoryEventBus) PublishBatch(ctx context.Context, events []interfaces.Event) error {
	for i := 0; i < len(events); i++ {
		err := bus.Publish(ctx, events[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// Subscribe registers handler for eventType. Handlers are identified by
// GetName: if a handler with the same name is already registered for that
// event type, nothing is changed and an error is returned.
func (bus *MemoryEventBus) Subscribe(eventType string, handler interfaces.EventHandler) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	existing := bus.subscribers[eventType]

	// Reject a second registration under the same handler name.
	for idx := range existing {
		if existing[idx].GetName() != handler.GetName() {
			continue
		}
		return fmt.Errorf("handler %s already subscribed to event type %s", handler.GetName(), eventType)
	}

	bus.subscribers[eventType] = append(existing, handler)

	bus.logger.Info("Handler subscribed to event",
		zap.String("handler", handler.GetName()),
		zap.String("event_type", eventType))
	return nil
}
// Unsubscribe removes the handler whose GetName matches handler's from the
// handlers registered for eventType. It returns an error when no such handler
// is registered.
//
// The remaining handlers are copied into a fresh slice instead of being shifted
// inside the existing backing array. Publish keeps using the slice it read
// after it has released the lock, so an in-place removal would rewrite elements
// under a concurrent iteration and could make it skip or repeat a handler.
func (bus *MemoryEventBus) Unsubscribe(eventType string, handler interfaces.EventHandler) error {
	bus.mutex.Lock()
	defer bus.mutex.Unlock()

	handlers := bus.subscribers[eventType]
	for i, h := range handlers {
		if h.GetName() != handler.GetName() {
			continue
		}

		// Copy-on-write removal: the old backing array is left untouched.
		remaining := make([]interfaces.EventHandler, 0, len(handlers)-1)
		remaining = append(remaining, handlers[:i]...)
		remaining = append(remaining, handlers[i+1:]...)
		bus.subscribers[eventType] = remaining

		bus.logger.Info("Handler unsubscribed from event",
			zap.String("handler", handler.GetName()),
			zap.String("event_type", eventType))
		return nil
	}

	return fmt.Errorf("handler %s not found for event type %s", handler.GetName(), eventType)
}
// GetSubscribers returns a copy of the handlers registered for eventType. The
// result is a new slice, so the caller may modify it without affecting the bus;
// it is empty (not nil) when nothing is registered.
func (bus *MemoryEventBus) GetSubscribers(eventType string) []interfaces.EventHandler {
	bus.mutex.RLock()
	defer bus.mutex.RUnlock()

	registered := bus.subscribers[eventType]
	snapshot := make([]interfaces.EventHandler, 0, len(registered))
	return append(snapshot, registered...)
}
// worker is the loop run by each worker goroutine. It processes tasks taken
// from the event queue until the stop channel is closed, then returns. id is
// used only to tag the start and stop log lines.
func (bus *MemoryEventBus) worker(id int) {
	workerID := zap.Int("worker_id", id)
	bus.logger.Debug("Event worker started", workerID)

	// Read the channel fields once; the loop below only uses these locals.
	queue, stop := bus.eventQueue, bus.stopCh

	for {
		select {
		case <-stop:
			bus.logger.Debug("Event worker stopped", workerID)
			return
		case task := <-queue:
			bus.processEventTask(task)
		}
	}
}
// processEventTask runs one queued task with a background context and, when the
// handler fails, either schedules a retry or logs the final failure.
//
// A retry is scheduled while task.retries is below the handler's MaxRetries.
// The wait before requeueing grows linearly with the retry count:
// RetryDelay * (1 + BackoffFactor*retries), capped at MaxDelay. The wait and
// the requeue happen on a separate goroutine so the worker is free for other
// tasks; if the queue is full at that moment the retry is dropped and logged.
func (bus *MemoryEventBus) processEventTask(task eventTask) {
	err := bus.handleEventWithRetry(context.Background(), task.event, task.handler)
	if err == nil {
		return
	}

	cfg := task.handler.GetRetryConfig()

	// Retry budget exhausted: report the last error and give up.
	if task.retries >= cfg.MaxRetries {
		bus.logger.Error("Event processing failed after max retries",
			zap.String("type", task.event.GetType()),
			zap.String("handler", task.handler.GetName()),
			zap.Int("retries", task.retries),
			zap.Error(err))
		return
	}

	wait := time.Duration(float64(cfg.RetryDelay) *
		(1 + cfg.BackoffFactor*float64(task.retries)))
	if wait > cfg.MaxDelay {
		wait = cfg.MaxDelay
	}

	go func() {
		time.Sleep(wait)
		task.retries++

		// Non-blocking requeue: never stall on a full queue.
		select {
		case bus.eventQueue <- task:
		default:
			bus.logger.Error("Failed to requeue event for retry",
				zap.String("type", task.event.GetType()),
				zap.String("handler", task.handler.GetName()),
				zap.Int("retries", task.retries))
		}
	}()
}
// handleEventWithRetry invokes handler.Handle once for event and returns its
// error. When the call finishes, the elapsed time is written to the debug log.
//
// Despite the name, no retry happens here: this function makes a single
// attempt.
func (bus *MemoryEventBus) handleEventWithRetry(ctx context.Context, event interfaces.Event, handler interfaces.EventHandler) error {
	// The start time is fixed when the defer statement is evaluated; the log
	// line itself is produced on the way out of the function.
	defer func(begin time.Time) {
		elapsed := time.Since(begin)
		bus.logger.Debug("Event handled",
			zap.String("type", event.GetType()),
			zap.String("handler", handler.GetName()),
			zap.Duration("duration", elapsed))
	}(time.Now())

	return handler.Handle(ctx, event)
}
// GetStats returns a snapshot of the bus state taken under the read lock. The
// map holds the keys "running", "worker_count", "queue_length",
// "queue_capacity", "event_types" (number of event types with an entry) and
// "subscribers" (a map from event type to its handler count).
func (bus *MemoryEventBus) GetStats() map[string]interface{} {
	bus.mutex.RLock()
	defer bus.mutex.RUnlock()

	// Handler count per event type.
	perType := make(map[string]int)
	for name := range bus.subscribers {
		perType[name] = len(bus.subscribers[name])
	}

	return map[string]interface{}{
		"running":        bus.running,
		"worker_count":   bus.workerCount,
		"queue_length":   len(bus.eventQueue),
		"queue_capacity": cap(bus.eventQueue),
		"event_types":    len(bus.subscribers),
		"subscribers":    perType,
	}
}