package task import ( "context" "encoding/json" "fmt" "time" "tyapi-server/internal/domains/article/entities" "tyapi-server/internal/domains/article/repositories" "github.com/hibiken/asynq" "go.uber.org/zap" ) // AsynqClient Asynq 客户端 type AsynqClient struct { client *asynq.Client logger *zap.Logger scheduledTaskRepo repositories.ScheduledTaskRepository } // NewAsynqClient 创建 Asynq 客户端 func NewAsynqClient(redisAddr string, scheduledTaskRepo repositories.ScheduledTaskRepository, logger *zap.Logger) *AsynqClient { client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) return &AsynqClient{ client: client, logger: logger, scheduledTaskRepo: scheduledTaskRepo, } } // Close 关闭客户端 func (c *AsynqClient) Close() error { return c.client.Close() } // ScheduleArticlePublish 调度文章定时发布任务 func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID string, publishTime time.Time) (string, error) { payload := map[string]interface{}{ "article_id": articleID, } payloadBytes, err := json.Marshal(payload) if err != nil { c.logger.Error("序列化任务载荷失败", zap.Error(err)) return "", fmt.Errorf("创建任务失败: %w", err) } task := asynq.NewTask(TaskTypeArticlePublish, payloadBytes) // 计算延迟时间 delay := publishTime.Sub(time.Now()) if delay <= 0 { return "", fmt.Errorf("定时发布时间不能早于当前时间") } // 设置任务选项 opts := []asynq.Option{ asynq.ProcessIn(delay), asynq.MaxRetry(3), asynq.Timeout(5 * time.Minute), } info, err := c.client.Enqueue(task, opts...) if err != nil { c.logger.Error("调度定时发布任务失败", zap.String("article_id", articleID), zap.Time("publish_time", publishTime), zap.Error(err)) return "", fmt.Errorf("调度任务失败: %w", err) } // 创建任务状态记录 scheduledTask := entities.ScheduledTask{ TaskID: info.ID, TaskType: TaskTypeArticlePublish, ArticleID: articleID, Status: entities.TaskStatusPending, ScheduledAt: publishTime, } if _, err := c.scheduledTaskRepo.Create(ctx, scheduledTask); err != nil { c.logger.Error("创建任务状态记录失败", zap.String("task_id", info.ID), zap.Error(err)) // 不返回错误,因为Asynq任务已经创建成功 } c.logger.Info("定时发布任务调度成功", zap.String("article_id", articleID), zap.Time("publish_time", publishTime), zap.String("task_id", info.ID)) return info.ID, nil } // CancelScheduledTask 取消已调度的任务 func (c *AsynqClient) CancelScheduledTask(ctx context.Context, taskID string) error { c.logger.Info("标记定时任务为已取消", zap.String("task_id", taskID)) // 标记数据库中的任务状态为已取消 if err := c.scheduledTaskRepo.MarkAsCancelled(ctx, taskID); err != nil { c.logger.Warn("标记任务状态为已取消失败", zap.String("task_id", taskID), zap.Error(err)) // 不返回错误,因为Asynq任务可能已经执行完成 } // Asynq不支持直接取消任务,我们通过数据库状态来标记 // 任务执行时会检查文章状态,如果已取消则跳过执行 return nil } // RescheduleArticlePublish 重新调度文章定时发布任务 func (c *AsynqClient) RescheduleArticlePublish(ctx context.Context, articleID string, oldTaskID string, newPublishTime time.Time) (string, error) { // 1. 取消旧任务(标记为已取消) if err := c.CancelScheduledTask(ctx, oldTaskID); err != nil { c.logger.Warn("取消旧任务失败", zap.String("old_task_id", oldTaskID), zap.Error(err)) } // 2. 创建新任务 newTaskID, err := c.ScheduleArticlePublish(ctx, articleID, newPublishTime) if err != nil { return "", fmt.Errorf("重新调度任务失败: %w", err) } c.logger.Info("重新调度定时发布任务成功", zap.String("article_id", articleID), zap.String("old_task_id", oldTaskID), zap.String("new_task_id", newTaskID), zap.Time("new_publish_time", newPublishTime)) return newTaskID, nil }