fix add
This commit is contained in:
@@ -2,9 +2,9 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"tydata-server/app/main/api/internal/svc"
|
||||
"tydata-server/app/main/api/internal/types"
|
||||
"fmt"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
@@ -24,17 +24,28 @@ func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
|
||||
func (l *CronJob) Register() *asynq.ServeMux {
|
||||
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
|
||||
scheduler := asynq.NewScheduler(redisClientOpt, nil)
|
||||
|
||||
// 注册清理查询数据任务
|
||||
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
|
||||
_, err := scheduler.Register(TASKTIME, task)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("定时任务注册失败:%v", err))
|
||||
}
|
||||
|
||||
// 注释掉原来的佣金解冻定时任务,因为现在改为按需处理单个佣金记录
|
||||
// unfreezeTask := asynq.NewTask(types.MsgUnfreezeCommission, nil, nil)
|
||||
// _, err = scheduler.Register("0 4 * * *", unfreezeTask)
|
||||
// if err != nil {
|
||||
// panic(fmt.Sprintf("佣金解冻任务注册失败:%v", err))
|
||||
// }
|
||||
|
||||
scheduler.Start()
|
||||
fmt.Println("定时任务启动!!!")
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgUnfreezeCommission, NewUnfreezeCommissionHandler(l.svcCtx))
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
119
app/main/api/internal/queue/unfreezeCommission.go
Normal file
119
app/main/api/internal/queue/unfreezeCommission.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"tydata-server/app/main/api/internal/svc"
|
||||
"tydata-server/app/main/api/internal/types"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
// 定义佣金状态常量
|
||||
const (
|
||||
CommissionStatusReleased = 0 // 已发放
|
||||
CommissionStatusFrozen = 1 // 冻结佣金
|
||||
)
|
||||
|
||||
// UNFREEZE_COMMISSION_DELAY_DAYS 定义延迟解冻天数
|
||||
const UNFREEZE_COMMISSION_DELAY_DAYS = 3 // 三天后解冻
|
||||
|
||||
type UnfreezeCommissionHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
|
||||
return &UnfreezeCommissionHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
now := time.Now()
|
||||
logx.Infof("%s - 开始执行佣金解冻任务", now.Format("2006-01-02 15:04:05"))
|
||||
|
||||
// 解析任务payload,获取佣金ID
|
||||
var payload types.MsgUnfreezeCommissionPayload
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
logx.Errorf("解析佣金解冻任务payload失败: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
commissionID := payload.CommissionID
|
||||
if commissionID <= 0 {
|
||||
logx.Errorf("无效的佣金ID: %d", commissionID)
|
||||
return fmt.Errorf("无效的佣金ID: %d", commissionID)
|
||||
}
|
||||
|
||||
// 根据佣金ID查询特定佣金记录
|
||||
commission, err := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID)
|
||||
if err != nil {
|
||||
logx.Errorf("查询佣金记录ID %d 失败: %v", commissionID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 检查佣金状态是否为冻结状态
|
||||
if commission.Status != CommissionStatusFrozen {
|
||||
logx.Infof("佣金记录ID %d 状态不是冻结状态,当前状态: %d,无需处理", commissionID, commission.Status)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 检查是否已到达解冻时间(3天后)
|
||||
unfreezeTime := commission.UpdateTime.AddDate(0, 0, UNFREEZE_COMMISSION_DELAY_DAYS)
|
||||
if now.Before(unfreezeTime) {
|
||||
logx.Infof("佣金记录ID %d 尚未到达解冻时间,解冻时间: %s", commissionID, unfreezeTime.Format("2006-01-02 15:04:05"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// 使用事务处理解冻操作
|
||||
err = l.svcCtx.AgentCommissionModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
|
||||
// 更新佣金状态为已发放
|
||||
commission.Status = CommissionStatusReleased
|
||||
commission.UpdateTime = now
|
||||
|
||||
// 更新佣金数据库
|
||||
_, err := l.svcCtx.AgentCommissionModel.Update(ctx, session, commission)
|
||||
if err != nil {
|
||||
logx.Errorf("更新佣金记录ID %d 失败: %v", commissionID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 获取代理钱包记录
|
||||
agentWallet, err := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId)
|
||||
if err != nil {
|
||||
logx.Errorf("查询代理ID %d 的钱包记录失败: %v", commission.AgentId, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新钱包余额:增加佣金金额到balance,减少相应的frozen_balance
|
||||
// 注意:这里应该转移的是当前佣金记录的金额,而不是全部冻结余额
|
||||
commissionAmount := commission.Amount
|
||||
agentWallet.Balance += commissionAmount
|
||||
agentWallet.FrozenBalance -= commissionAmount
|
||||
agentWallet.UpdateTime = now
|
||||
|
||||
// 更新钱包数据库
|
||||
_, err = l.svcCtx.AgentWalletModel.Update(ctx, session, agentWallet)
|
||||
if err != nil {
|
||||
logx.Errorf("更新代理ID %d 的钱包记录失败: %v", commission.AgentId, err)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("成功解冻佣金记录ID %d,代理ID %d,佣金金额 %.2f,已将佣金金额从冻结余额转移到可用余额",
|
||||
commissionID, commission.AgentId, commissionAmount)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("%s - 佣金解冻任务失败: %v", now.Format("2006-01-02 15:04:05"), err)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("%s - 佣金解冻任务完成,佣金ID: %d", now.Format("2006-01-02 15:04:05"), commissionID)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user