qnc-server-tob/app/main/api/internal/service/asynqService.go
2025-06-09 12:34:52 +08:00

102 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// asynq_service.go
package service
import (
"encoding/json"
"qnc-server/app/main/api/internal/config"
"qnc-server/app/main/api/internal/types"
"time"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type AsynqService struct {
client *asynq.Client
config config.Config
}
// NewAsynqService 创建并初始化 Asynq 客户端
func NewAsynqService(c config.Config) *AsynqService {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: c.CacheRedis[0].Host,
Password: c.CacheRedis[0].Pass,
})
return &AsynqService{client: client, config: c}
}
// Close 关闭 Asynq 客户端
func (s *AsynqService) Close() error {
return s.client.Close()
}
func (s *AsynqService) SendQueryTask(orderID int64) error {
// 准备任务的 payload
payload := types.MsgPaySuccessQueryPayload{
OrderID: orderID,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
logx.Errorf("发送异步任务失败 (无法编码 payload): %v, 订单号: %d", err, orderID)
return err // 直接返回错误,避免继续执行
}
options := []asynq.Option{
asynq.MaxRetry(5), // 设置最大重试次数
}
// 创建任务
task := asynq.NewTask(types.MsgPaySuccessQuery, payloadBytes, options...)
// 将任务加入队列并获取任务信息
info, err := s.client.Enqueue(task)
if err != nil {
logx.Errorf("发送异步任务失败 (加入队列失败): %+v, 订单号: %d", err, orderID)
return err
}
// 记录成功日志,带上任务 ID 和队列信息
logx.Infof("发送异步任务成功任务ID: %s, 队列: %s, 订单号: %d", info.ID, info.Queue, orderID)
return nil
}
// SendUnfreezeAgentBalanceTask 发送解冻代理金额的延迟任务
func (s *AsynqService) SendUnfreezeAgentBalanceTask(agentID, orderID int64, amount float64) error {
// 准备任务的 payload
payload := types.UnfreezeAgentBalancePayload{
AgentID: agentID,
OrderID: orderID,
Amount: amount,
FreezeTime: time.Now().Unix(),
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
logx.Errorf("发送解冻代理金额任务失败 (无法编码 payload): %v, 代理ID: %d, 订单号: %d", err, agentID, orderID)
return err
}
// 设置任务选项
options := []asynq.Option{
asynq.MaxRetry(3), // 最大重试3次
asynq.ProcessIn(24 * time.Hour), // 24小时后执行
asynq.Timeout(5 * time.Minute), // 任务超时时间
asynq.Retention(24 * time.Hour), // 任务结果保留时间
asynq.Queue("agent_balance"), // 使用专门的队列
}
// 创建任务
task := asynq.NewTask(types.MsgDelayedTask, payloadBytes, options...)
// 将任务加入队列
info, err := s.client.Enqueue(task)
if err != nil {
logx.Errorf("发送解冻代理金额任务失败 (加入队列失败): %v, 代理ID: %d, 订单号: %d", err, agentID, orderID)
return err
}
logx.Infof("发送解冻代理金额任务成功任务ID: %s, 队列: %s, 代理ID: %d, 订单号: %d, 金额: %.2f",
info.ID, info.Queue, agentID, orderID, amount)
return nil
}