Files
tyapi-server/internal/infrastructure/task/implementations/asynq/asynq_article_task_queue.go
2025-09-12 01:15:09 +08:00

132 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package asynq
import (
"context"
"fmt"
"time"
"github.com/hibiken/asynq"
"go.uber.org/zap"
"tyapi-server/internal/infrastructure/task/entities"
"tyapi-server/internal/infrastructure/task/interfaces"
"tyapi-server/internal/infrastructure/task/types"
)
// AsynqArticleTaskQueue Asynq文章任务队列实现
type AsynqArticleTaskQueue struct {
client *asynq.Client
logger *zap.Logger
}
// NewAsynqArticleTaskQueue 创建Asynq文章任务队列
func NewAsynqArticleTaskQueue(redisAddr string, logger *zap.Logger) interfaces.ArticleTaskQueue {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
return &AsynqArticleTaskQueue{
client: client,
logger: logger,
}
}
// Enqueue 入队任务
func (q *AsynqArticleTaskQueue) Enqueue(ctx context.Context, taskType types.TaskType, payload types.TaskPayload) error {
payloadData, err := payload.ToJSON()
if err != nil {
q.logger.Error("序列化任务载荷失败", zap.Error(err))
return err
}
task := asynq.NewTask(string(taskType), payloadData)
_, err = q.client.EnqueueContext(ctx, task)
if err != nil {
q.logger.Error("入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
return err
}
q.logger.Info("任务入队成功", zap.String("task_type", string(taskType)))
return nil
}
// EnqueueDelayed 延时入队任务
func (q *AsynqArticleTaskQueue) EnqueueDelayed(ctx context.Context, taskType types.TaskType, payload types.TaskPayload, delay time.Duration) error {
payloadData, err := payload.ToJSON()
if err != nil {
q.logger.Error("序列化任务载荷失败", zap.Error(err))
return err
}
task := asynq.NewTask(string(taskType), payloadData)
_, err = q.client.EnqueueContext(ctx, task, asynq.ProcessIn(delay))
if err != nil {
q.logger.Error("延时入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
return err
}
q.logger.Info("延时任务入队成功", zap.String("task_type", string(taskType)), zap.Duration("delay", delay))
return nil
}
// EnqueueAt 指定时间入队任务
func (q *AsynqArticleTaskQueue) EnqueueAt(ctx context.Context, taskType types.TaskType, payload types.TaskPayload, scheduledAt time.Time) error {
payloadData, err := payload.ToJSON()
if err != nil {
q.logger.Error("序列化任务载荷失败", zap.Error(err))
return err
}
task := asynq.NewTask(string(taskType), payloadData)
_, err = q.client.EnqueueContext(ctx, task, asynq.ProcessAt(scheduledAt))
if err != nil {
q.logger.Error("定时入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
return err
}
q.logger.Info("定时任务入队成功", zap.String("task_type", string(taskType)), zap.Time("scheduled_at", scheduledAt))
return nil
}
// Cancel 取消任务
func (q *AsynqArticleTaskQueue) Cancel(ctx context.Context, taskID string) error {
// Asynq本身不支持直接取消任务但我们可以通过以下方式实现
// 1. 在数据库中标记任务为已取消
// 2. 任务执行时检查状态,如果已取消则跳过执行
q.logger.Info("标记任务为已取消", zap.String("task_id", taskID))
// 这里应该更新数据库中的任务状态为cancelled
// 由于我们没有直接访问repository暂时只记录日志
// 实际实现中应该调用AsyncTaskRepository.UpdateStatus
return nil
}
// ModifySchedule 修改任务调度时间
func (q *AsynqArticleTaskQueue) ModifySchedule(ctx context.Context, taskID string, newScheduledAt time.Time) error {
// Asynq本身不支持修改调度时间但我们可以通过以下方式实现
// 1. 取消旧任务
// 2. 创建新任务
q.logger.Info("修改任务调度时间",
zap.String("task_id", taskID),
zap.Time("new_scheduled_at", newScheduledAt))
// 这里应该:
// 1. 调用Cancel取消旧任务
// 2. 根据任务类型重新创建任务
// 由于没有直接访问repository暂时只记录日志
return nil
}
// GetTaskStatus 获取任务状态
func (q *AsynqArticleTaskQueue) GetTaskStatus(ctx context.Context, taskID string) (*entities.AsyncTask, error) {
// Asynq本身不提供任务状态查询这里返回错误提示
return nil, fmt.Errorf("Asynq不提供任务状态查询请使用数据库状态管理")
}
// ListTasks 列出任务
func (q *AsynqArticleTaskQueue) ListTasks(ctx context.Context, taskType types.TaskType, status entities.TaskStatus, limit int) ([]*entities.AsyncTask, error) {
// Asynq本身不提供任务列表查询这里返回错误提示
return nil, fmt.Errorf("Asynq不提供任务列表查询请使用数据库状态管理")
}