后台面板
This commit is contained in:
@@ -2,7 +2,7 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"qnc-server/app/user/cmd/api/internal/svc"
|
||||
"qnc-server/app/main/api/internal/svc"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
|
||||
99
app/main/api/internal/queue/delayed_task_handler.go
Normal file
99
app/main/api/internal/queue/delayed_task_handler.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"qnc-server/app/main/api/internal/svc"
|
||||
"qnc-server/app/main/api/internal/types"
|
||||
"qnc-server/app/main/model"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
type DelayedTaskHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewDelayedTaskHandler(svcCtx *svc.ServiceContext) *DelayedTaskHandler {
|
||||
return &DelayedTaskHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DelayedTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
var payload types.MsgDelayedTaskPayload
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
return fmt.Errorf("解析任务载荷失败: %v", err)
|
||||
}
|
||||
|
||||
// 解析具体的任务数据
|
||||
var unfreezePayload types.UnfreezeAgentBalancePayload
|
||||
if err := json.Unmarshal([]byte(payload.Data), &unfreezePayload); err != nil {
|
||||
return fmt.Errorf("解析解冻代理金额任务数据失败: %v", err)
|
||||
}
|
||||
|
||||
// 处理解冻代理金额的逻辑
|
||||
err := h.handleUnfreezeAgentBalance(ctx, unfreezePayload)
|
||||
if err != nil {
|
||||
logx.Errorf("处理解冻代理金额任务失败: %v, 代理ID: %d, 订单号: %d",
|
||||
err, unfreezePayload.AgentID, unfreezePayload.OrderID)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("解冻代理金额任务处理成功: 代理ID: %d, 订单号: %d, 金额: %.2f",
|
||||
unfreezePayload.AgentID, unfreezePayload.OrderID, unfreezePayload.Amount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleUnfreezeAgentBalance 处理解冻代理金额的具体业务逻辑
|
||||
func (h *DelayedTaskHandler) handleUnfreezeAgentBalance(ctx context.Context, payload types.UnfreezeAgentBalancePayload) error {
|
||||
// 开启事务处理解冻逻辑
|
||||
err := h.svcCtx.AgentService.AgentWalletModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
|
||||
// 1. 查询代理钱包
|
||||
agentWallet, err := h.svcCtx.AgentService.AgentWalletModel.FindOneByAgentId(transCtx, payload.AgentID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询代理钱包失败: %v", err)
|
||||
}
|
||||
|
||||
// 2. 验证冻结金额
|
||||
if agentWallet.FrozenBalance < payload.Amount {
|
||||
return fmt.Errorf("冻结余额不足: 当前冻结余额 %.2f, 需要解冻金额 %.2f",
|
||||
agentWallet.FrozenBalance, payload.Amount)
|
||||
}
|
||||
|
||||
// 3. 查询订单状况
|
||||
order, err := h.svcCtx.OrderModel.FindOne(transCtx, payload.OrderID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询订单失败: %v", err)
|
||||
}
|
||||
|
||||
if order.Status != model.OrderStatusPaid {
|
||||
return fmt.Errorf("订单状态不正确: 当前状态 %s, 期望状态 %s",
|
||||
order.Status, model.OrderStatusPaid)
|
||||
}
|
||||
|
||||
// 5. 更新钱包余额
|
||||
agentWallet.FrozenBalance -= payload.Amount
|
||||
agentWallet.Balance += payload.Amount
|
||||
|
||||
// 6. 保存更新
|
||||
if err := h.svcCtx.AgentService.AgentWalletModel.UpdateWithVersion(transCtx, session, agentWallet); err != nil {
|
||||
return fmt.Errorf("更新钱包失败: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("解冻代理金额失败: %v, 代理ID: %d, 订单号: %d",
|
||||
err, payload.AgentID, payload.OrderID)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("解冻代理金额成功: 代理ID: %d, 订单号: %d, 金额: %.2f",
|
||||
payload.AgentID, payload.OrderID, payload.Amount)
|
||||
return nil
|
||||
}
|
||||
@@ -7,9 +7,9 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"qnc-server/app/user/cmd/api/internal/svc"
|
||||
"qnc-server/app/user/cmd/api/internal/types"
|
||||
"qnc-server/app/user/model"
|
||||
"qnc-server/app/main/api/internal/svc"
|
||||
"qnc-server/app/main/api/internal/types"
|
||||
"qnc-server/app/main/model"
|
||||
"qnc-server/pkg/lzkit/crypto"
|
||||
"qnc-server/pkg/lzkit/lzUtils"
|
||||
"time"
|
||||
|
||||
@@ -3,8 +3,8 @@ package queue
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"qnc-server/app/user/cmd/api/internal/svc"
|
||||
"qnc-server/app/user/cmd/api/internal/types"
|
||||
"qnc-server/app/main/api/internal/svc"
|
||||
"qnc-server/app/main/api/internal/types"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
@@ -35,6 +35,7 @@ func (l *CronJob) Register() *asynq.ServeMux {
|
||||
mux := asynq.NewServeMux()
|
||||
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgDelayedTask, NewDelayedTaskHandler(l.svcCtx))
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user