Files
ycc-proxy-server/app/main/api/internal/queue/unfreezeCommission.go
2025-12-09 18:55:28 +08:00

95 lines
3.5 KiB
Go

package queue
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"ycc-server/app/main/model"
"ycc-server/pkg/lzkit/lzUtils"
"ycc-server/app/main/api/internal/svc"
"ycc-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
pkgerrors "github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// UnfreezeCommissionHandler is the queue handler that processes
// "unfreeze commission" tasks. It keeps a reference to the shared service
// context, through which it reaches the models and the asynq service it needs.
type UnfreezeCommissionHandler struct {
	svcCtx *svc.ServiceContext
}
// NewUnfreezeCommissionHandler returns a handler bound to the given service
// context. The context is stored as-is; no validation is performed here.
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
	handler := new(UnfreezeCommissionHandler)
	handler.svcCtx = svcCtx
	return handler
}
// ProcessTask handles one "unfreeze commission" task delivered by asynq.
//
// Flow:
//  1. Decode the payload and load the freeze task it refers to.
//  2. Skip the task if it is no longer in the pending state.
//  3. If the unfreeze time has not been reached yet, re-schedule the task for
//     that time and finish.
//  4. Otherwise, inside one transaction, mark the freeze task as unfrozen and
//     move the frozen amount from the wallet's frozen balance to its balance.
//
// Return value (as interpreted by asynq):
//   - nil: the task is done (processed, already handled, or re-scheduled).
//   - an error wrapping asynq.SkipRetry: the task can never succeed (malformed
//     payload, freeze task not found) and must not be retried.
//   - any other error: the task should be retried.
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
	// Freeze-task status values used by this handler. Only a task in the
	// pending state is processed; every other value is treated as already
	// handled.
	const (
		freezeStatusPending  = 1
		freezeStatusUnfrozen = 2
	)

	var payload types.MsgUnfreezeCommissionPayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		// A payload that cannot be decoded will never decode on a later
		// attempt, so tell asynq not to retry it.
		return fmt.Errorf("解析解冻任务负载失败: %v: %w", err, asynq.SkipRetry)
	}

	// 1. Load the freeze task.
	freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
	if err != nil {
		if errors.Is(err, model.ErrNotFound) {
			logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%s", payload.FreezeTaskId)
			return asynq.SkipRetry // the record does not exist; retrying cannot help
		}
		return fmt.Errorf("查询冻结任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
	}

	// 2. Only pending tasks are processed.
	if freezeTask.Status != freezeStatusPending {
		logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%s, status=%d", payload.FreezeTaskId, freezeTask.Status)
		return nil // already handled; nothing to retry
	}

	// 3. Fired too early: schedule the task again for the real unfreeze time.
	now := time.Now()
	if now.Before(freezeTask.UnfreezeTime) {
		logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%s, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
		if err := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); err != nil {
			logx.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
			// Returning nil here would make asynq mark this task as done while
			// no replacement is queued, so the commission would stay frozen
			// forever. Surface the error so asynq retries this task instead.
			return fmt.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
		}
		return nil
	}

	// 4. Release the funds atomically: both updates commit or neither does.
	err = l.svcCtx.AgentFreezeTaskModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
		// 4.1 Mark the freeze task as unfrozen and record when it happened.
		freezeTask.Status = freezeStatusUnfrozen
		freezeTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(now)
		if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, freezeTask); updateErr != nil {
			return pkgerrors.Wrapf(updateErr, "更新冻结任务状态失败")
		}

		// 4.2 Move the amount from the frozen balance to the available balance.
		wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, freezeTask.AgentId)
		if walletErr != nil {
			return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %s", freezeTask.AgentId)
		}
		wallet.FrozenBalance -= freezeTask.FreezeAmount
		wallet.Balance += freezeTask.FreezeAmount
		if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
			return pkgerrors.Wrapf(updateWalletErr, "更新钱包失败")
		}
		return nil
	})
	if err != nil {
		logx.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
		return fmt.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
	}

	logx.Infof("解冻任务处理成功: freezeTaskId=%s, agentId=%s, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
	return nil
}