package queue import ( "context" "encoding/json" "fmt" "qnc-server/app/main/api/internal/svc" "qnc-server/app/main/api/internal/types" "qnc-server/app/main/model" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" ) type DelayedTaskHandler struct { svcCtx *svc.ServiceContext } func NewDelayedTaskHandler(svcCtx *svc.ServiceContext) *DelayedTaskHandler { return &DelayedTaskHandler{ svcCtx: svcCtx, } } func (h *DelayedTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { var payload types.MsgDelayedTaskPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("解析任务载荷失败: %v", err) } // 解析具体的任务数据 var unfreezePayload types.UnfreezeAgentBalancePayload if err := json.Unmarshal([]byte(payload.Data), &unfreezePayload); err != nil { return fmt.Errorf("解析解冻代理金额任务数据失败: %v", err) } // 处理解冻代理金额的逻辑 err := h.handleUnfreezeAgentBalance(ctx, unfreezePayload) if err != nil { logx.Errorf("处理解冻代理金额任务失败: %v, 代理ID: %d, 订单号: %d", err, unfreezePayload.AgentID, unfreezePayload.OrderID) return err } logx.Infof("解冻代理金额任务处理成功: 代理ID: %d, 订单号: %d, 金额: %.2f", unfreezePayload.AgentID, unfreezePayload.OrderID, unfreezePayload.Amount) return nil } // handleUnfreezeAgentBalance 处理解冻代理金额的具体业务逻辑 func (h *DelayedTaskHandler) handleUnfreezeAgentBalance(ctx context.Context, payload types.UnfreezeAgentBalancePayload) error { // 开启事务处理解冻逻辑 err := h.svcCtx.AgentService.AgentWalletModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error { // 1. 查询代理钱包 agentWallet, err := h.svcCtx.AgentService.AgentWalletModel.FindOneByAgentId(transCtx, payload.AgentID) if err != nil { return fmt.Errorf("查询代理钱包失败: %v", err) } // 2. 验证冻结金额 if agentWallet.FrozenBalance < payload.Amount { return fmt.Errorf("冻结余额不足: 当前冻结余额 %.2f, 需要解冻金额 %.2f", agentWallet.FrozenBalance, payload.Amount) } // 3. 查询订单状况 order, err := h.svcCtx.OrderModel.FindOne(transCtx, payload.OrderID) if err != nil { return fmt.Errorf("查询订单失败: %v", err) } if order.Status != model.OrderStatusPaid { return fmt.Errorf("订单状态不正确: 当前状态 %s, 期望状态 %s", order.Status, model.OrderStatusPaid) } // 5. 更新钱包余额 agentWallet.FrozenBalance -= payload.Amount agentWallet.Balance += payload.Amount // 6. 保存更新 if err := h.svcCtx.AgentService.AgentWalletModel.UpdateWithVersion(transCtx, session, agentWallet); err != nil { return fmt.Errorf("更新钱包失败: %v", err) } return nil }) if err != nil { logx.Errorf("解冻代理金额失败: %v, 代理ID: %d, 订单号: %d", err, payload.AgentID, payload.OrderID) return err } logx.Infof("解冻代理金额成功: 代理ID: %d, 订单号: %d, 金额: %.2f", payload.AgentID, payload.OrderID, payload.Amount) return nil }