qnc-server/app/user/cmd/api/internal/service/asynqService.go
2025-03-21 15:47:11 +08:00

133 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// asynq_service.go
package service
import (
"encoding/json"
"qnc-server/app/user/cmd/api/internal/config"
"qnc-server/app/user/cmd/api/internal/types"
"time"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type AsynqService struct {
client *asynq.Client
config config.Config
}
// NewAsynqService 创建并初始化 Asynq 客户端
func NewAsynqService(c config.Config) *AsynqService {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: c.CacheRedis[0].Host,
Password: c.CacheRedis[0].Pass,
})
return &AsynqService{client: client, config: c}
}
// Close 关闭 Asynq 客户端
func (s *AsynqService) Close() error {
return s.client.Close()
}
func (s *AsynqService) SendQueryTask(orderID int64) error {
// 准备任务的 payload
payload := types.MsgPaySuccessQueryPayload{
OrderID: orderID,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
logx.Errorf("发送异步任务失败 (无法编码 payload): %v, 订单号: %d", err, orderID)
return err // 直接返回错误,避免继续执行
}
options := []asynq.Option{
asynq.MaxRetry(5), // 设置最大重试次数
}
// 创建任务
task := asynq.NewTask(types.MsgPaySuccessQuery, payloadBytes, options...)
// 将任务加入队列并获取任务信息
info, err := s.client.Enqueue(task)
if err != nil {
logx.Errorf("发送异步任务失败 (加入队列失败): %+v, 订单号: %d", err, orderID)
return err
}
// 记录成功日志,带上任务 ID 和队列信息
logx.Infof("发送异步任务成功任务ID: %s, 队列: %s, 订单号: %d", info.ID, info.Queue, orderID)
return nil
}
// SendCarMaintenanceQueryTaskWithBackoff 发送带有指数退避重试策略的车辆维保记录查询的异步任务
func (s *AsynqService) SendCarMaintenanceQueryTaskWithBackoff(orderID string, retryCount int, queryID int64) error {
// 准备任务的 payload
payload := types.MsgCarMaintenanceQueryPayload{
OrderID: orderID,
RetryCount: retryCount + 1, // 增加重试次数
QueryID: queryID, // 关联的query表记录ID
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
logx.Errorf("发送重试车辆维保记录查询任务失败 (无法编码 payload): %v, 订单号: %s", err, orderID)
return err
}
// 确定延迟时间
var delay time.Duration
if retryCount == 0 {
// 第一次执行,立即进行
delay = 0
} else {
// 非首次执行,使用指数退避策略
// 基础延迟为3秒每次重试后延迟时间翻倍最长不超过1小时
baseDelay := 3 * time.Second
maxDelay := 1 * time.Hour
delay = baseDelay
for i := 1; i < retryCount; i++ {
delay = delay * 2
if delay > maxDelay {
delay = maxDelay
break
}
}
}
// 保存延迟时间到payload
payload.DelayDuration = int64(delay)
payloadBytes, _ = json.Marshal(payload)
options := []asynq.Option{
asynq.MaxRetry(0), // 使用我们自己的重试逻辑不使用asynq的自动重试
}
// 如果有延迟,添加延迟选项
if delay > 0 {
options = append(options, asynq.ProcessIn(delay))
}
// 创建任务
task := asynq.NewTask(types.MsgCarMaintenanceQuery, payloadBytes, options...)
// 将任务加入队列并获取任务信息
info, err := s.client.Enqueue(task)
if err != nil {
logx.Errorf("发送重试车辆维保记录查询任务失败 (加入队列失败): %+v, 订单号: %s", err, orderID)
return err
}
// 记录成功日志,带上任务 ID 和队列信息
if delay == 0 {
logx.Infof("发送车辆维保记录查询任务成功立即执行任务ID: %s, 队列: %s, 订单号: %s, 查询ID: %d",
info.ID, info.Queue, orderID, queryID)
} else {
logx.Infof("发送重试车辆维保记录查询任务成功任务ID: %s, 队列: %s, 延迟: %v, 重试次数: %d, 订单号: %s, 查询ID: %d",
info.ID, info.Queue, delay, retryCount+1, orderID, queryID)
}
return nil
}