From ba2624e310332bdfb6ed3cc87eea43d581be433a Mon Sep 17 00:00:00 2001 From: 18278715334 <18278715334@163.com> Date: Sun, 4 Jan 2026 11:24:48 +0800 Subject: [PATCH] try fix --- app/main/api/internal/queue/routes.go | 8 --- .../api/internal/queue/unfreezeCommission.go | 60 ++++++++++++++++--- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/app/main/api/internal/queue/routes.go b/app/main/api/internal/queue/routes.go index 7bc31c1..f604b51 100644 --- a/app/main/api/internal/queue/routes.go +++ b/app/main/api/internal/queue/routes.go @@ -31,14 +31,6 @@ func (l *CronJob) Register() *asynq.ServeMux { if err != nil { panic(fmt.Sprintf("定时任务注册失败:%v", err)) } - - // 注释掉原来的佣金解冻定时任务,因为现在改为按需处理单个佣金记录 - // unfreezeTask := asynq.NewTask(types.MsgUnfreezeCommission, nil, nil) - // _, err = scheduler.Register("0 4 * * *", unfreezeTask) - // if err != nil { - // panic(fmt.Sprintf("佣金解冻任务注册失败:%v", err)) - // } - scheduler.Start() fmt.Println("定时任务启动!!!") diff --git a/app/main/api/internal/queue/unfreezeCommission.go b/app/main/api/internal/queue/unfreezeCommission.go index 6fd562d..67ddb49 100644 --- a/app/main/api/internal/queue/unfreezeCommission.go +++ b/app/main/api/internal/queue/unfreezeCommission.go @@ -3,11 +3,13 @@ package queue import ( "context" "encoding/json" + "errors" "fmt" "time" "tydata-server/app/main/api/internal/svc" "tydata-server/app/main/api/internal/types" + "tydata-server/app/main/model" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/logx" @@ -76,11 +78,31 @@ func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Ta agentWallet.FrozenBalance -= commissionAmount agentWallet.UpdateTime = now - // 更新钱包数据库 - _, err = l.svcCtx.AgentWalletModel.Update(ctx, session, agentWallet) - if err != nil { - logx.Errorf("更新代理ID %d 的钱包记录失败: %v", commission.AgentId, err) - return err + // 更新钱包数据库(使用 UpdateWithVersion 保持乐观锁) + updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, agentWallet) + if updateWalletErr != nil { + // 如果是版本冲突错误,重新查询最新的数据后重试 + if errors.Is(updateWalletErr, model.ErrNoRowsUpdate) { + logx.Infof("代理ID %d 的钱包版本冲突,重新查询最新数据重试", commission.AgentId) + latestWallet, findErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId) + if findErr != nil { + logx.Errorf("重新查询代理ID %d 的钱包记录失败: %v", commission.AgentId, findErr) + return findErr + } + // 重新累加金额 + latestWallet.Balance += commissionAmount + latestWallet.FrozenBalance -= commissionAmount + latestWallet.UpdateTime = now + retryUpdateErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, latestWallet) + if retryUpdateErr != nil { + logx.Errorf("重试更新代理ID %d 的钱包记录失败: %v", commission.AgentId, retryUpdateErr) + return retryUpdateErr + } + logx.Infof("重试成功,已更新代理ID %d 的钱包记录", commission.AgentId) + } else { + logx.Errorf("更新代理ID %d 的钱包记录失败: %v", commission.AgentId, updateWalletErr) + return updateWalletErr + } } // 钱包更新成功后,再更新佣金状态为已发放 @@ -90,8 +112,32 @@ func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Ta // 更新佣金数据库(使用 UpdateWithVersion 保持乐观锁) err = l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, commission) if err != nil { - logx.Errorf("更新佣金记录ID %d 失败: %v", commissionID, err) - return err + // 如果是版本冲突错误,重新查询最新的数据后重试 + if errors.Is(err, model.ErrNoRowsUpdate) { + logx.Infof("佣金记录ID %d 版本冲突,重新查询最新数据重试", commissionID) + latestCommission, findErr := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID) + if findErr != nil { + logx.Errorf("重新查询佣金记录ID %d 失败: %v", commissionID, findErr) + return findErr + } + // 检查状态是否已被其他操作修改 + if latestCommission.Status != CommissionStatusFrozen { + logx.Errorf("佣金记录ID %d 的状态已被其他操作修改,当前状态: %d", commissionID, latestCommission.Status) + return fmt.Errorf("佣金记录状态已被修改") + } + // 重新更新状态 + latestCommission.Status = CommissionStatusReleased + latestCommission.UpdateTime = now + retryUpdateErr := l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, latestCommission) + if retryUpdateErr != nil { + logx.Errorf("重试更新佣金记录ID %d 失败: %v", commissionID, retryUpdateErr) + return retryUpdateErr + } + logx.Infof("重试成功,已更新佣金记录ID %d", commissionID) + } else { + logx.Errorf("更新佣金记录ID %d 失败: %v", commissionID, err) + return err + } } logx.Infof("成功解冻佣金记录ID %d,代理ID %d,佣金金额 %.2f,已将佣金金额从冻结余额转移到可用余额",