package queue import ( "context" "encoding/json" "errors" "fmt" "time" "xingfucha-server/app/main/api/internal/svc" "xingfucha-server/app/main/api/internal/types" "xingfucha-server/app/main/model" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" ) // 定义佣金状态常量 const ( CommissionStatusReleased = 0 // 已发放 CommissionStatusFrozen = 1 // 冻结佣金 ) type UnfreezeCommissionHandler struct { svcCtx *svc.ServiceContext } func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler { return &UnfreezeCommissionHandler{ svcCtx: svcCtx, } } func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { now := time.Now() logx.Infof("%s - 开始执行佣金解冻任务", now.Format("2006-01-02 15:04:05")) // 解析任务payload,获取佣金ID var payload types.MsgUnfreezeCommissionPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { logx.Errorf("解析佣金解冻任务payload失败: %v", err) return err } commissionID := payload.CommissionID if commissionID <= 0 { logx.Errorf("无效的佣金ID: %d", commissionID) return fmt.Errorf("无效的佣金ID: %d", commissionID) } // 根据佣金ID查询特定佣金记录 commission, err := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID) if err != nil { logx.Errorf("查询佣金记录ID %d 失败: %v", commissionID, err) return err } // 检查佣金状态是否为冻结状态 if commission.Status != CommissionStatusFrozen { logx.Infof("佣金记录ID %d 状态不是冻结状态,当前状态: %d,无需处理", commissionID, commission.Status) return nil } // 使用事务处理解冻操作 err = l.svcCtx.AgentCommissionModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error { // 获取代理钱包记录 agentWallet, err := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId) if err != nil { logx.Errorf("查询代理ID %d 的钱包记录失败: %v", commission.AgentId, err) return err } // 计算当前佣金在发生退款后的“净佣金金额” commissionAmount := commission.Amount - commission.RefundedAmount if commissionAmount <= 0 { logx.Infof("佣金记录ID %d 已被全部退款或无可解冻金额,跳过解冻", commissionID) return nil } // 更新钱包余额:增加净佣金金额到 balance,减少相应的 frozen_balance agentWallet.Balance += commissionAmount agentWallet.FrozenBalance -= commissionAmount agentWallet.UpdateTime = now // 更新钱包数据库(使用 UpdateWithVersion 保持乐观锁) updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, agentWallet) if updateWalletErr != nil { // 如果是版本冲突错误,重新查询最新的数据后重试 if errors.Is(updateWalletErr, model.ErrNoRowsUpdate) { logx.Infof("代理ID %d 的钱包版本冲突,重新查询最新数据重试", commission.AgentId) latestWallet, findErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId) if findErr != nil { logx.Errorf("重新查询代理ID %d 的钱包记录失败: %v", commission.AgentId, findErr) return findErr } // 重新累加金额 latestWallet.Balance += commissionAmount latestWallet.FrozenBalance -= commissionAmount latestWallet.UpdateTime = now retryUpdateErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, latestWallet) if retryUpdateErr != nil { logx.Errorf("重试更新代理ID %d 的钱包记录失败: %v", commission.AgentId, retryUpdateErr) return retryUpdateErr } logx.Infof("重试成功,已更新代理ID %d 的钱包记录", commission.AgentId) } else { logx.Errorf("更新代理ID %d 的钱包记录失败: %v", commission.AgentId, updateWalletErr) return updateWalletErr } } // 钱包更新成功后,再更新佣金状态为已发放 commission.Status = CommissionStatusReleased commission.UpdateTime = now // 更新佣金数据库(使用 UpdateWithVersion 保持乐观锁) err = l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, commission) if err != nil { // 如果是版本冲突错误,重新查询最新的数据后重试 if errors.Is(err, model.ErrNoRowsUpdate) { logx.Infof("佣金记录ID %d 版本冲突,重新查询最新数据重试", commissionID) latestCommission, findErr := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID) if findErr != nil { logx.Errorf("重新查询佣金记录ID %d 失败: %v", commissionID, findErr) return findErr } // 检查状态是否已被其他操作修改 if latestCommission.Status != CommissionStatusFrozen { logx.Errorf("佣金记录ID %d 的状态已被其他操作修改,当前状态: %d", commissionID, latestCommission.Status) return fmt.Errorf("佣金记录状态已被修改") } // 重新更新状态 latestCommission.Status = CommissionStatusReleased latestCommission.UpdateTime = now retryUpdateErr := l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, latestCommission) if retryUpdateErr != nil { logx.Errorf("重试更新佣金记录ID %d 失败: %v", commissionID, retryUpdateErr) return retryUpdateErr } logx.Infof("重试成功,已更新佣金记录ID %d", commissionID) } else { logx.Errorf("更新佣金记录ID %d 失败: %v", commissionID, err) return err } } logx.Infof("成功解冻佣金记录ID %d,代理ID %d,佣金金额 %.2f,已将佣金金额从冻结余额转移到可用余额", commissionID, commission.AgentId, commissionAmount) return nil }) if err != nil { logx.Errorf("%s - 佣金解冻任务失败: %v", now.Format("2006-01-02 15:04:05"), err) return err } logx.Infof("%s - 佣金解冻任务完成,佣金ID: %d", now.Format("2006-01-02 15:04:05"), commissionID) return nil }