This commit is contained in:
2025-09-03 13:51:52 +08:00
parent c579e53ad1
commit c1f127e9b1
12 changed files with 645 additions and 23 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"tyapi-server/internal/domains/article/repositories"
"github.com/hibiken/asynq"
"go.uber.org/zap"
@@ -16,18 +17,21 @@ type ArticlePublisher interface {
// ArticleTaskHandler 文章任务处理器
type ArticleTaskHandler struct {
publisher ArticlePublisher
logger *zap.Logger
publisher ArticlePublisher
scheduledTaskRepo repositories.ScheduledTaskRepository
logger *zap.Logger
}
// NewArticleTaskHandler 创建文章任务处理器
func NewArticleTaskHandler(
publisher ArticlePublisher,
scheduledTaskRepo repositories.ScheduledTaskRepository,
logger *zap.Logger,
) *ArticleTaskHandler {
return &ArticleTaskHandler{
publisher: publisher,
logger: logger,
publisher: publisher,
scheduledTaskRepo: scheduledTaskRepo,
logger: logger,
}
}
@@ -45,14 +49,49 @@ func (h *ArticleTaskHandler) HandleArticlePublish(ctx context.Context, t *asynq.
return fmt.Errorf("任务载荷中缺少文章ID")
}
// 获取任务状态记录
task, err := h.scheduledTaskRepo.GetByTaskID(ctx, t.ResultWriter().TaskID())
if err != nil {
h.logger.Error("获取任务状态记录失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err))
// 继续执行,不阻断任务
} else {
// 检查任务是否已取消
if task.IsCancelled() {
h.logger.Info("任务已取消,跳过执行", zap.String("task_id", t.ResultWriter().TaskID()))
return nil
}
// 标记任务为正在执行
task.MarkAsRunning()
if err := h.scheduledTaskRepo.Update(ctx, task); err != nil {
h.logger.Warn("更新任务状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err))
}
}
// 执行文章发布
if err := h.publisher.PublishArticleByID(ctx, articleID); err != nil {
// 更新任务状态为失败
if task.ID != "" {
task.MarkAsFailed(err.Error())
if updateErr := h.scheduledTaskRepo.Update(ctx, task); updateErr != nil {
h.logger.Warn("更新任务失败状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(updateErr))
}
}
h.logger.Error("定时发布文章失败",
zap.String("article_id", articleID),
zap.Error(err))
return fmt.Errorf("定时发布文章失败: %w", err)
}
// 更新任务状态为已完成
if task.ID != "" {
task.MarkAsCompleted()
if err := h.scheduledTaskRepo.Update(ctx, task); err != nil {
h.logger.Warn("更新任务完成状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err))
}
}
h.logger.Info("定时发布文章成功", zap.String("article_id", articleID))
return nil
}

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"time"
"tyapi-server/internal/domains/article/entities"
"tyapi-server/internal/domains/article/repositories"
"github.com/hibiken/asynq"
"go.uber.org/zap"
@@ -12,16 +14,18 @@ import (
// AsynqClient Asynq 客户端
type AsynqClient struct {
client *asynq.Client
logger *zap.Logger
client *asynq.Client
logger *zap.Logger
scheduledTaskRepo repositories.ScheduledTaskRepository
}
// NewAsynqClient 创建 Asynq 客户端
func NewAsynqClient(redisAddr string, logger *zap.Logger) *AsynqClient {
func NewAsynqClient(redisAddr string, scheduledTaskRepo repositories.ScheduledTaskRepository, logger *zap.Logger) *AsynqClient {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
return &AsynqClient{
client: client,
logger: logger,
client: client,
logger: logger,
scheduledTaskRepo: scheduledTaskRepo,
}
}
@@ -66,6 +70,20 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
return "", fmt.Errorf("调度任务失败: %w", err)
}
// 创建任务状态记录
scheduledTask := entities.ScheduledTask{
TaskID: info.ID,
TaskType: TaskTypeArticlePublish,
ArticleID: articleID,
Status: entities.TaskStatusPending,
ScheduledAt: publishTime,
}
if _, err := c.scheduledTaskRepo.Create(ctx, scheduledTask); err != nil {
c.logger.Error("创建任务状态记录失败", zap.String("task_id", info.ID), zap.Error(err))
// 不返回错误因为Asynq任务已经创建成功
}
c.logger.Info("定时发布任务调度成功",
zap.String("article_id", articleID),
zap.Time("publish_time", publishTime),
@@ -76,16 +94,17 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri
// CancelScheduledTask 取消已调度的任务
func (c *AsynqClient) CancelScheduledTask(ctx context.Context, taskID string) error {
// 注意Asynq不直接支持取消已调度的任务
// 这里我们记录日志,实际取消需要在数据库中标记
c.logger.Info("请求取消定时任务",
c.logger.Info("标记定时任务为已取消",
zap.String("task_id", taskID))
// 在实际应用中,你可能需要:
// 1. 在数据库中标记任务为已取消
// 2. 在任务执行时检查取消状态
// 3. 或者使用Redis的TTL机制
// 标记数据库中的任务状态为已取消
if err := c.scheduledTaskRepo.MarkAsCancelled(ctx, taskID); err != nil {
c.logger.Warn("标记任务状态为已取消失败", zap.String("task_id", taskID), zap.Error(err))
// 不返回错误因为Asynq任务可能已经执行完成
}
// Asynq不支持直接取消任务我们通过数据库状态来标记
// 任务执行时会检查文章状态,如果已取消则跳过执行
return nil
}