package queue

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"ycc-server/app/main/api/internal/svc"
	"ycc-server/app/main/api/internal/types"
	"ycc-server/app/main/model"
	"ycc-server/pkg/lzkit/lzUtils"

	"github.com/hibiken/asynq"
	pkgerrors "github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/stores/sqlx"
)

// UnfreezeCommissionHandler is the asynq task handler that releases a frozen
// commission amount back into an agent's wallet balance.
type UnfreezeCommissionHandler struct {
	svcCtx *svc.ServiceContext
}

// NewUnfreezeCommissionHandler returns a handler bound to the given service
// context.
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
	return &UnfreezeCommissionHandler{
		svcCtx: svcCtx,
	}
}

// ProcessTask handles a single unfreeze-commission task.
//
// The task payload is decoded into types.MsgUnfreezeCommissionPayload and the
// freeze task it references is loaded. The outcome is then one of:
//   - asynq.SkipRetry when the freeze task does not exist;
//   - nil, with no changes, when the freeze task status is not pending;
//   - a re-scheduled task (via AsynqService.SendUnfreezeTask) and nil when the
//     unfreeze time has not been reached yet;
//   - otherwise one transaction that marks the freeze task as unfrozen and
//     moves FreezeAmount from the wallet's FrozenBalance to its Balance.
//
// Every other failure is returned as a wrapped error so that the queue can
// retry the task.
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
	const (
		// statusPending is the only status in which a freeze task is still
		// processed; any other value is treated as "already handled".
		statusPending = 1
		// statusUnfrozen is written once the amount has been released.
		statusUnfrozen = 2
	)

	var payload types.MsgUnfreezeCommissionPayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("解析解冻任务负载失败: %w", err)
	}

	// 1. Load the freeze task.
	freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
	if err != nil {
		if errors.Is(err, model.ErrNotFound) {
			logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%s", payload.FreezeTaskId)
			// The record does not exist, so retrying cannot succeed.
			return asynq.SkipRetry
		}
		return fmt.Errorf("查询冻结任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
	}

	// 2. Skip tasks that are no longer pending; nothing to do and no retry.
	if freezeTask.Status != statusPending {
		logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%s, status=%d", payload.FreezeTaskId, freezeTask.Status)
		return nil
	}

	// 3. If the task fired before its unfreeze time, schedule it again for the
	// stored unfreeze time instead of unfreezing now.
	if time.Now().Before(freezeTask.UnfreezeTime) {
		logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%s, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
		if sendErr := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); sendErr != nil {
			logx.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, sendErr)
			// Returning nil here would mark this task as done while no
			// follow-up task exists, leaving the amount frozen. Surface the
			// error so the queue retries this task instead.
			return fmt.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, sendErr)
		}
		return nil
	}

	// 4. Release the frozen amount inside a single transaction.
	err = l.svcCtx.AgentFreezeTaskModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
		// 4.1 Mark the freeze task as unfrozen and record when it happened.
		freezeTask.Status = statusUnfrozen
		freezeTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(time.Now())
		if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, freezeTask); updateErr != nil {
			return pkgerrors.Wrapf(updateErr, "更新冻结任务状态失败")
		}

		// 4.2 Move the amount from the frozen balance to the spendable balance.
		// NOTE(review): the wallet is read without the transaction session;
		// presumably UpdateWithVersion guards against a stale read — confirm.
		wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, freezeTask.AgentId)
		if walletErr != nil {
			return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %s", freezeTask.AgentId)
		}

		wallet.FrozenBalance -= freezeTask.FreezeAmount
		wallet.Balance += freezeTask.FreezeAmount
		if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
			return pkgerrors.Wrapf(updateWalletErr, "更新钱包失败")
		}

		return nil
	})
	if err != nil {
		logx.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
		return fmt.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
	}

	logx.Infof("解冻任务处理成功: freezeTaskId=%s, agentId=%s, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
	return nil
}