2025-09-01 18:29:59 +08:00
|
|
|
|
package task
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/hibiken/asynq"
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// AsynqClient Asynq 客户端
|
|
|
|
|
|
type AsynqClient struct {
|
|
|
|
|
|
client *asynq.Client
|
|
|
|
|
|
logger *zap.Logger
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewAsynqClient 创建 Asynq 客户端
|
|
|
|
|
|
func NewAsynqClient(redisAddr string, logger *zap.Logger) *AsynqClient {
|
|
|
|
|
|
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
|
|
|
|
|
|
return &AsynqClient{
|
|
|
|
|
|
client: client,
|
|
|
|
|
|
logger: logger,
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 关闭客户端
|
|
|
|
|
|
func (c *AsynqClient) Close() error {
|
|
|
|
|
|
return c.client.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ScheduleArticlePublish 调度文章定时发布任务
|
2025-09-02 16:37:28 +08:00
|
|
|
|
func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID string, publishTime time.Time) (string, error) {
|
2025-09-01 18:29:59 +08:00
|
|
|
|
payload := map[string]interface{}{
|
|
|
|
|
|
"article_id": articleID,
|
|
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
payloadBytes, err := json.Marshal(payload)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
c.logger.Error("序列化任务载荷失败", zap.Error(err))
|
2025-09-02 16:37:28 +08:00
|
|
|
|
return "", fmt.Errorf("创建任务失败: %w", err)
|
2025-09-01 18:29:59 +08:00
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
task := asynq.NewTask(TaskTypeArticlePublish, payloadBytes)
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
// 计算延迟时间
|
|
|
|
|
|
delay := publishTime.Sub(time.Now())
|
|
|
|
|
|
if delay <= 0 {
|
2025-09-02 16:37:28 +08:00
|
|
|
|
return "", fmt.Errorf("定时发布时间不能早于当前时间")
|
2025-09-01 18:29:59 +08:00
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
// 设置任务选项
|
|
|
|
|
|
opts := []asynq.Option{
|
|
|
|
|
|
asynq.ProcessIn(delay),
|
|
|
|
|
|
asynq.MaxRetry(3),
|
|
|
|
|
|
asynq.Timeout(5 * time.Minute),
|
|
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
info, err := c.client.Enqueue(task, opts...)
|
|
|
|
|
|
if err != nil {
|
2025-09-02 20:46:10 +08:00
|
|
|
|
c.logger.Error("调度定时发布任务失败",
|
2025-09-01 18:29:59 +08:00
|
|
|
|
zap.String("article_id", articleID),
|
|
|
|
|
|
zap.Time("publish_time", publishTime),
|
|
|
|
|
|
zap.Error(err))
|
2025-09-02 16:37:28 +08:00
|
|
|
|
return "", fmt.Errorf("调度任务失败: %w", err)
|
2025-09-01 18:29:59 +08:00
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
c.logger.Info("定时发布任务调度成功",
|
|
|
|
|
|
zap.String("article_id", articleID),
|
|
|
|
|
|
zap.Time("publish_time", publishTime),
|
|
|
|
|
|
zap.String("task_id", info.ID))
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-02 16:37:28 +08:00
|
|
|
|
return info.ID, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CancelScheduledTask 取消已调度的任务
|
|
|
|
|
|
func (c *AsynqClient) CancelScheduledTask(ctx context.Context, taskID string) error {
|
|
|
|
|
|
// 注意:Asynq不直接支持取消已调度的任务
|
|
|
|
|
|
// 这里我们记录日志,实际取消需要在数据库中标记
|
|
|
|
|
|
c.logger.Info("请求取消定时任务",
|
|
|
|
|
|
zap.String("task_id", taskID))
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-02 16:37:28 +08:00
|
|
|
|
// 在实际应用中,你可能需要:
|
|
|
|
|
|
// 1. 在数据库中标记任务为已取消
|
|
|
|
|
|
// 2. 在任务执行时检查取消状态
|
|
|
|
|
|
// 3. 或者使用Redis的TTL机制
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-01 18:29:59 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2025-09-02 16:37:28 +08:00
|
|
|
|
|
|
|
|
|
|
// RescheduleArticlePublish 重新调度文章定时发布任务
|
|
|
|
|
|
func (c *AsynqClient) RescheduleArticlePublish(ctx context.Context, articleID string, oldTaskID string, newPublishTime time.Time) (string, error) {
|
|
|
|
|
|
// 1. 取消旧任务(标记为已取消)
|
|
|
|
|
|
if err := c.CancelScheduledTask(ctx, oldTaskID); err != nil {
|
2025-09-02 20:46:10 +08:00
|
|
|
|
c.logger.Warn("取消旧任务失败",
|
2025-09-02 16:37:28 +08:00
|
|
|
|
zap.String("old_task_id", oldTaskID),
|
|
|
|
|
|
zap.Error(err))
|
|
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-02 16:37:28 +08:00
|
|
|
|
// 2. 创建新任务
|
|
|
|
|
|
newTaskID, err := c.ScheduleArticlePublish(ctx, articleID, newPublishTime)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return "", fmt.Errorf("重新调度任务失败: %w", err)
|
|
|
|
|
|
}
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-02 16:37:28 +08:00
|
|
|
|
c.logger.Info("重新调度定时发布任务成功",
|
|
|
|
|
|
zap.String("article_id", articleID),
|
|
|
|
|
|
zap.String("old_task_id", oldTaskID),
|
|
|
|
|
|
zap.String("new_task_id", newTaskID),
|
|
|
|
|
|
zap.Time("new_publish_time", newPublishTime))
|
2025-09-02 20:46:10 +08:00
|
|
|
|
|
2025-09-02 16:37:28 +08:00
|
|
|
|
return newTaskID, nil
|
|
|
|
|
|
}
|