add timed

This commit is contained in:
2025-09-02 16:37:28 +08:00
parent 2f3817c8f0
commit c7c4ab7a19
28 changed files with 478 additions and 2373 deletions

View File

@@ -31,7 +31,7 @@ func (c *AsynqClient) Close() error {
}
// ScheduleArticlePublish 调度文章定时发布任务
func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID string, publishTime time.Time) error {
func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID string, publishTime time.Time) (string, error) {
payload := map[string]interface{}{
"article_id": articleID,
}
@@ -39,7 +39,7 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
payloadBytes, err := json.Marshal(payload)
if err != nil {
c.logger.Error("序列化任务载荷失败", zap.Error(err))
return fmt.Errorf("创建任务失败: %w", err)
return "", fmt.Errorf("创建任务失败: %w", err)
}
task := asynq.NewTask(TaskTypeArticlePublish, payloadBytes)
@@ -47,7 +47,7 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
// 计算延迟时间
delay := publishTime.Sub(time.Now())
if delay <= 0 {
return fmt.Errorf("定时发布时间不能早于当前时间")
return "", fmt.Errorf("定时发布时间不能早于当前时间")
}
// 设置任务选项
@@ -63,7 +63,7 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
zap.String("article_id", articleID),
zap.Time("publish_time", publishTime),
zap.Error(err))
return fmt.Errorf("调度任务失败: %w", err)
return "", fmt.Errorf("调度任务失败: %w", err)
}
c.logger.Info("定时发布任务调度成功",
@@ -71,5 +71,44 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
zap.Time("publish_time", publishTime),
zap.String("task_id", info.ID))
return info.ID, nil
}
// CancelScheduledTask 取消已调度的任务
func (c *AsynqClient) CancelScheduledTask(ctx context.Context, taskID string) error {
// 注意Asynq不直接支持取消已调度的任务
// 这里我们记录日志,实际取消需要在数据库中标记
c.logger.Info("请求取消定时任务",
zap.String("task_id", taskID))
// 在实际应用中,你可能需要:
// 1. 在数据库中标记任务为已取消
// 2. 在任务执行时检查取消状态
// 3. 或者使用Redis的TTL机制
return nil
}
// RescheduleArticlePublish 重新调度文章定时发布任务
func (c *AsynqClient) RescheduleArticlePublish(ctx context.Context, articleID string, oldTaskID string, newPublishTime time.Time) (string, error) {
// 1. 取消旧任务(标记为已取消)
if err := c.CancelScheduledTask(ctx, oldTaskID); err != nil {
c.logger.Warn("取消旧任务失败",
zap.String("old_task_id", oldTaskID),
zap.Error(err))
}
// 2. 创建新任务
newTaskID, err := c.ScheduleArticlePublish(ctx, articleID, newPublishTime)
if err != nil {
return "", fmt.Errorf("重新调度任务失败: %w", err)
}
c.logger.Info("重新调度定时发布任务成功",
zap.String("article_id", articleID),
zap.String("old_task_id", oldTaskID),
zap.String("new_task_id", newTaskID),
zap.Time("new_publish_time", newPublishTime))
return newTaskID, nil
}