132 lines
4.5 KiB
Go
132 lines
4.5 KiB
Go
package asynq
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/hibiken/asynq"
|
||
"go.uber.org/zap"
|
||
|
||
"tyapi-server/internal/infrastructure/task/entities"
|
||
"tyapi-server/internal/infrastructure/task/interfaces"
|
||
"tyapi-server/internal/infrastructure/task/types"
|
||
)
|
||
|
||
// AsynqArticleTaskQueue Asynq文章任务队列实现
|
||
type AsynqArticleTaskQueue struct {
|
||
client *asynq.Client
|
||
logger *zap.Logger
|
||
}
|
||
|
||
// NewAsynqArticleTaskQueue 创建Asynq文章任务队列
|
||
func NewAsynqArticleTaskQueue(redisAddr string, logger *zap.Logger) interfaces.ArticleTaskQueue {
|
||
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
|
||
return &AsynqArticleTaskQueue{
|
||
client: client,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// Enqueue 入队任务
|
||
func (q *AsynqArticleTaskQueue) Enqueue(ctx context.Context, taskType types.TaskType, payload types.TaskPayload) error {
|
||
payloadData, err := payload.ToJSON()
|
||
if err != nil {
|
||
q.logger.Error("序列化任务载荷失败", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
task := asynq.NewTask(string(taskType), payloadData)
|
||
_, err = q.client.EnqueueContext(ctx, task)
|
||
if err != nil {
|
||
q.logger.Error("入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
q.logger.Info("任务入队成功", zap.String("task_type", string(taskType)))
|
||
return nil
|
||
}
|
||
|
||
// EnqueueDelayed 延时入队任务
|
||
func (q *AsynqArticleTaskQueue) EnqueueDelayed(ctx context.Context, taskType types.TaskType, payload types.TaskPayload, delay time.Duration) error {
|
||
payloadData, err := payload.ToJSON()
|
||
if err != nil {
|
||
q.logger.Error("序列化任务载荷失败", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
task := asynq.NewTask(string(taskType), payloadData)
|
||
_, err = q.client.EnqueueContext(ctx, task, asynq.ProcessIn(delay))
|
||
if err != nil {
|
||
q.logger.Error("延时入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
q.logger.Info("延时任务入队成功", zap.String("task_type", string(taskType)), zap.Duration("delay", delay))
|
||
return nil
|
||
}
|
||
|
||
// EnqueueAt 指定时间入队任务
|
||
func (q *AsynqArticleTaskQueue) EnqueueAt(ctx context.Context, taskType types.TaskType, payload types.TaskPayload, scheduledAt time.Time) error {
|
||
payloadData, err := payload.ToJSON()
|
||
if err != nil {
|
||
q.logger.Error("序列化任务载荷失败", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
task := asynq.NewTask(string(taskType), payloadData)
|
||
_, err = q.client.EnqueueContext(ctx, task, asynq.ProcessAt(scheduledAt))
|
||
if err != nil {
|
||
q.logger.Error("定时入队任务失败", zap.String("task_type", string(taskType)), zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
q.logger.Info("定时任务入队成功", zap.String("task_type", string(taskType)), zap.Time("scheduled_at", scheduledAt))
|
||
return nil
|
||
}
|
||
|
||
// Cancel 取消任务
|
||
func (q *AsynqArticleTaskQueue) Cancel(ctx context.Context, taskID string) error {
|
||
// Asynq本身不支持直接取消任务,但我们可以通过以下方式实现:
|
||
// 1. 在数据库中标记任务为已取消
|
||
// 2. 任务执行时检查状态,如果已取消则跳过执行
|
||
|
||
q.logger.Info("标记任务为已取消", zap.String("task_id", taskID))
|
||
|
||
// 这里应该更新数据库中的任务状态为cancelled
|
||
// 由于我们没有直接访问repository,暂时只记录日志
|
||
// 实际实现中应该调用AsyncTaskRepository.UpdateStatus
|
||
|
||
return nil
|
||
}
|
||
|
||
// ModifySchedule 修改任务调度时间
|
||
func (q *AsynqArticleTaskQueue) ModifySchedule(ctx context.Context, taskID string, newScheduledAt time.Time) error {
|
||
// Asynq本身不支持修改调度时间,但我们可以通过以下方式实现:
|
||
// 1. 取消旧任务
|
||
// 2. 创建新任务
|
||
|
||
q.logger.Info("修改任务调度时间",
|
||
zap.String("task_id", taskID),
|
||
zap.Time("new_scheduled_at", newScheduledAt))
|
||
|
||
// 这里应该:
|
||
// 1. 调用Cancel取消旧任务
|
||
// 2. 根据任务类型重新创建任务
|
||
// 由于没有直接访问repository,暂时只记录日志
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetTaskStatus 获取任务状态
|
||
func (q *AsynqArticleTaskQueue) GetTaskStatus(ctx context.Context, taskID string) (*entities.AsyncTask, error) {
|
||
// Asynq本身不提供任务状态查询,这里返回错误提示
|
||
return nil, fmt.Errorf("Asynq不提供任务状态查询,请使用数据库状态管理")
|
||
}
|
||
|
||
// ListTasks 列出任务
|
||
func (q *AsynqArticleTaskQueue) ListTasks(ctx context.Context, taskType types.TaskType, status entities.TaskStatus, limit int) ([]*entities.AsyncTask, error) {
|
||
// Asynq本身不提供任务列表查询,这里返回错误提示
|
||
return nil, fmt.Errorf("Asynq不提供任务列表查询,请使用数据库状态管理")
|
||
}
|