This commit is contained in:
2025-12-06 13:56:00 +08:00
parent 05b6623e75
commit 89367fb2ee
22 changed files with 2186 additions and 74 deletions

View File

@@ -29,6 +29,7 @@ func NewAsynqWorker(
redisAddr string,
logger *zap.Logger,
articleApplicationService article.ArticleApplicationService,
announcementApplicationService article.AnnouncementApplicationService,
apiApplicationService api.ApiApplicationService,
walletService finance_services.WalletAggregateService,
subscriptionService *product_services.ProductSubscriptionService,
@@ -39,15 +40,16 @@ func NewAsynqWorker(
asynq.Config{
Concurrency: 6, // 降低总并发数
Queues: map[string]int{
"default": 2, // 2个goroutine
"api": 3, // 3个goroutine (扣款任务)
"article": 1, // 1个goroutine
"default": 2, // 2个goroutine
"api": 3, // 3个goroutine (扣款任务)
"article": 1, // 1个goroutine
"announcement": 1, // 1个goroutine
},
},
)
// 创建任务处理器
articleHandler := handlers.NewArticleTaskHandler(logger, articleApplicationService, asyncTaskRepo)
articleHandler := handlers.NewArticleTaskHandler(logger, articleApplicationService, announcementApplicationService, asyncTaskRepo)
apiHandler := handlers.NewApiTaskHandler(logger, apiApplicationService, walletService, subscriptionService, asyncTaskRepo)
// 创建ServeMux
@@ -105,6 +107,9 @@ func (w *AsynqWorker) registerAllHandlers() {
w.mux.HandleFunc(string(types.TaskTypeArticleCancel), w.articleHandler.HandleArticleCancel)
w.mux.HandleFunc(string(types.TaskTypeArticleModify), w.articleHandler.HandleArticleModify)
// 注册公告任务处理器
w.mux.HandleFunc(string(types.TaskTypeAnnouncementPublish), w.articleHandler.HandleAnnouncementPublish)
// 注册API任务处理器
w.mux.HandleFunc(string(types.TaskTypeApiCall), w.apiHandler.HandleApiCall)
w.mux.HandleFunc(string(types.TaskTypeApiLog), w.apiHandler.HandleApiLog)
@@ -116,6 +121,7 @@ func (w *AsynqWorker) registerAllHandlers() {
zap.String("article_publish", string(types.TaskTypeArticlePublish)),
zap.String("article_cancel", string(types.TaskTypeArticleCancel)),
zap.String("article_modify", string(types.TaskTypeArticleModify)),
zap.String("announcement_publish", string(types.TaskTypeAnnouncementPublish)),
zap.String("api_call", string(types.TaskTypeApiCall)),
zap.String("api_log", string(types.TaskTypeApiLog)),
)

View File

@@ -17,10 +17,10 @@ import (
// TaskManagerImpl 任务管理器实现
type TaskManagerImpl struct {
asynqClient *asynq.Client
asyncTaskRepo repositories.AsyncTaskRepository
logger *zap.Logger
config *interfaces.TaskManagerConfig
asynqClient *asynq.Client
asyncTaskRepo repositories.AsyncTaskRepository
logger *zap.Logger
config *interfaces.TaskManagerConfig
}
// NewTaskManager 创建任务管理器
@@ -42,7 +42,7 @@ func NewTaskManager(
func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entities.AsyncTask) error {
// 1. 保存任务到数据库GORM会自动生成UUID
if err := tm.asyncTaskRepo.Create(ctx, task); err != nil {
tm.logger.Error("保存任务到数据库失败",
tm.logger.Error("保存任务到数据库失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("保存任务失败: %w", err)
@@ -50,7 +50,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit
// 2. 更新payload中的task_id
if err := tm.updatePayloadTaskID(task); err != nil {
tm.logger.Error("更新payload中的任务ID失败",
tm.logger.Error("更新payload中的任务ID失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("更新payload中的任务ID失败: %w", err)
@@ -58,7 +58,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit
// 3. 更新数据库中的payload
if err := tm.asyncTaskRepo.Update(ctx, task); err != nil {
tm.logger.Error("更新任务payload失败",
tm.logger.Error("更新任务payload失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("更新任务payload失败: %w", err)
@@ -71,7 +71,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit
return fmt.Errorf("任务入队失败: %w", err)
}
tm.logger.Info("任务创建并入队成功",
tm.logger.Info("任务创建并入队成功",
zap.String("task_id", task.ID),
zap.String("task_type", task.Type))
@@ -86,7 +86,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task
// 2. 保存任务到数据库GORM会自动生成UUID
if err := tm.asyncTaskRepo.Create(ctx, task); err != nil {
tm.logger.Error("保存延时任务到数据库失败",
tm.logger.Error("保存延时任务到数据库失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("保存延时任务失败: %w", err)
@@ -94,7 +94,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task
// 3. 更新payload中的task_id
if err := tm.updatePayloadTaskID(task); err != nil {
tm.logger.Error("更新payload中的任务ID失败",
tm.logger.Error("更新payload中的任务ID失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("更新payload中的任务ID失败: %w", err)
@@ -102,7 +102,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task
// 4. 更新数据库中的payload
if err := tm.asyncTaskRepo.Update(ctx, task); err != nil {
tm.logger.Error("更新任务payload失败",
tm.logger.Error("更新任务payload失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("更新任务payload失败: %w", err)
@@ -115,7 +115,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task
return fmt.Errorf("延时任务入队失败: %w", err)
}
tm.logger.Info("延时任务创建并入队成功",
tm.logger.Info("延时任务创建并入队成功",
zap.String("task_id", task.ID),
zap.String("task_type", task.Type),
zap.Duration("delay", delay))
@@ -131,13 +131,13 @@ func (tm *TaskManagerImpl) CancelTask(ctx context.Context, taskID string) error
}
if err := tm.asyncTaskRepo.UpdateStatus(ctx, task.ID, entities.TaskStatusCancelled); err != nil {
tm.logger.Error("更新任务状态为取消失败",
tm.logger.Error("更新任务状态为取消失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("更新任务状态失败: %w", err)
}
tm.logger.Info("任务已标记为取消",
tm.logger.Info("任务已标记为取消",
zap.String("task_id", task.ID),
zap.String("task_type", task.Type))
@@ -152,14 +152,14 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string
return err
}
tm.logger.Info("找到要更新的任务",
tm.logger.Info("找到要更新的任务",
zap.String("task_id", task.ID),
zap.String("current_status", string(task.Status)),
zap.Time("current_scheduled_at", *task.ScheduledAt))
// 2. 取消旧任务
if err := tm.asyncTaskRepo.UpdateStatus(ctx, task.ID, entities.TaskStatusCancelled); err != nil {
tm.logger.Error("取消旧任务失败",
tm.logger.Error("取消旧任务失败",
zap.String("task_id", task.ID),
zap.Error(err))
return fmt.Errorf("取消旧任务失败: %w", err)
@@ -173,7 +173,7 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string
return err
}
tm.logger.Info("新任务已创建",
tm.logger.Info("新任务已创建",
zap.String("new_task_id", newTask.ID),
zap.Time("new_scheduled_at", newScheduledAt))
@@ -189,7 +189,7 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string
return fmt.Errorf("重新入队任务失败: %w", err)
}
tm.logger.Info("任务调度时间更新成功",
tm.logger.Info("任务调度时间更新成功",
zap.String("old_task_id", task.ID),
zap.String("new_task_id", newTask.ID),
zap.Time("new_scheduled_at", newScheduledAt))
@@ -237,7 +237,7 @@ func (tm *TaskManagerImpl) RetryTask(ctx context.Context, taskID string) error {
return fmt.Errorf("重试任务入队失败: %w", err)
}
tm.logger.Info("任务重试成功",
tm.logger.Info("任务重试成功",
zap.String("task_id", taskID),
zap.Int("retry_count", task.RetryCount))
@@ -248,7 +248,7 @@ func (tm *TaskManagerImpl) RetryTask(ctx context.Context, taskID string) error {
func (tm *TaskManagerImpl) CleanupExpiredTasks(ctx context.Context, olderThan time.Time) error {
// 这里可以实现清理逻辑,比如删除超过一定时间的已完成任务
tm.logger.Info("开始清理过期任务", zap.Time("older_than", olderThan))
// TODO: 实现清理逻辑
return nil
}
@@ -274,8 +274,7 @@ func (tm *TaskManagerImpl) updatePayloadTaskID(task *entities.AsyncTask) error {
return nil
}
// findTask 查找任务支持taskID和articleID双重查找
// findTask 查找任务支持taskID、articleID和announcementID三重查找
func (tm *TaskManagerImpl) findTask(ctx context.Context, taskID string) (*entities.AsyncTask, error) {
// 先尝试通过任务ID查找
task, err := tm.asyncTaskRepo.GetByID(ctx, taskID)
@@ -285,21 +284,34 @@ func (tm *TaskManagerImpl) findTask(ctx context.Context, taskID string) (*entiti
// 如果通过任务ID找不到尝试通过文章ID查找
tm.logger.Info("通过任务ID查找失败尝试通过文章ID查找", zap.String("task_id", taskID))
tasks, err := tm.asyncTaskRepo.GetByArticleID(ctx, taskID)
if err != nil || len(tasks) == 0 {
tm.logger.Error("通过文章ID也找不到任务",
if err == nil && len(tasks) > 0 {
// 使用找到的第一个任务
task = tasks[0]
tm.logger.Info("通过文章ID找到任务",
zap.String("article_id", taskID),
zap.String("task_id", task.ID))
return task, nil
}
// 如果通过文章ID也找不到尝试通过公告ID查找
tm.logger.Info("通过文章ID查找失败尝试通过公告ID查找", zap.String("task_id", taskID))
announcementTasks, err := tm.asyncTaskRepo.GetByAnnouncementID(ctx, taskID)
if err != nil || len(announcementTasks) == 0 {
tm.logger.Error("通过公告ID也找不到任务",
zap.String("announcement_id", taskID),
zap.Error(err))
return nil, fmt.Errorf("获取任务信息失败: %w", err)
}
// 使用找到的第一个任务
task = tasks[0]
tm.logger.Info("通过文章ID找到任务",
zap.String("article_id", taskID),
task = announcementTasks[0]
tm.logger.Info("通过公告ID找到任务",
zap.String("announcement_id", taskID),
zap.String("task_id", task.ID))
return task, nil
}
@@ -317,7 +329,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask *
// 保存到数据库GORM会自动生成UUID
if err := tm.asyncTaskRepo.Create(ctx, newTask); err != nil {
tm.logger.Error("创建新任务失败",
tm.logger.Error("创建新任务失败",
zap.String("new_task_id", newTask.ID),
zap.Error(err))
return nil, fmt.Errorf("创建新任务失败: %w", err)
@@ -325,7 +337,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask *
// 更新payload中的task_id
if err := tm.updatePayloadTaskID(newTask); err != nil {
tm.logger.Error("更新payload中的任务ID失败",
tm.logger.Error("更新payload中的任务ID失败",
zap.String("new_task_id", newTask.ID),
zap.Error(err))
return nil, fmt.Errorf("更新payload中的任务ID失败: %w", err)
@@ -333,7 +345,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask *
// 更新数据库中的payload
if err := tm.asyncTaskRepo.Update(ctx, newTask); err != nil {
tm.logger.Error("更新新任务payload失败",
tm.logger.Error("更新新任务payload失败",
zap.String("new_task_id", newTask.ID),
zap.Error(err))
return nil, fmt.Errorf("更新新任务payload失败: %w", err)
@@ -346,23 +358,24 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask *
func (tm *TaskManagerImpl) enqueueTaskWithDelay(ctx context.Context, task *entities.AsyncTask, delay time.Duration) error {
queueName := tm.getQueueName(task.Type)
asynqTask := asynq.NewTask(task.Type, []byte(task.Payload))
var err error
if delay > 0 {
_, err = tm.asynqClient.EnqueueContext(ctx, asynqTask,
_, err = tm.asynqClient.EnqueueContext(ctx, asynqTask,
asynq.Queue(queueName),
asynq.ProcessIn(delay))
} else {
_, err = tm.asynqClient.EnqueueContext(ctx, asynqTask, asynq.Queue(queueName))
}
return err
}
// getQueueName 根据任务类型获取队列名称
func (tm *TaskManagerImpl) getQueueName(taskType string) string {
switch taskType {
case string(types.TaskTypeArticlePublish), string(types.TaskTypeArticleCancel), string(types.TaskTypeArticleModify):
case string(types.TaskTypeArticlePublish), string(types.TaskTypeArticleCancel), string(types.TaskTypeArticleModify),
string(types.TaskTypeAnnouncementPublish):
return "article"
case string(types.TaskTypeApiCall), string(types.TaskTypeApiLog), string(types.TaskTypeDeduction), string(types.TaskTypeUsageStats):
return "api"