This commit is contained in:
2026-04-29 11:38:59 +08:00
parent e96e3f9583
commit 7d363f4e8a
30 changed files with 1135 additions and 215 deletions

View File

@@ -0,0 +1,49 @@
package queue
import (
"context"
"encoding/json"
"fmt"
"time"
"bdrp-server/app/main/api/internal/service"
"bdrp-server/app/main/api/internal/svc"
"bdrp-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type AgentMembershipExpireHandleHandler struct {
svcCtx *svc.ServiceContext
}
func NewAgentMembershipExpireHandleHandler(svcCtx *svc.ServiceContext) *AgentMembershipExpireHandleHandler {
return &AgentMembershipExpireHandleHandler{
svcCtx: svcCtx,
}
}
func (l *AgentMembershipExpireHandleHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload types.MsgAgentMembershipExpireHandlePayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
logx.Errorf("解析会员到期处理任务payload失败: %v", err)
return err
}
if payload.AgentID <= 0 {
return fmt.Errorf("无效代理ID: %d", payload.AgentID)
}
now := time.Now()
downgraded, err := service.DowngradeExpiredMembership(ctx, l.svcCtx.AgentModel, payload.AgentID, now)
if err != nil {
logx.Errorf("会员到期处理失败代理ID: %d, 错误: %v", payload.AgentID, err)
return err
}
if downgraded {
logx.Infof("会员到期处理成功代理ID: %d 已降级为普通代理", payload.AgentID)
} else {
logx.Infof("会员到期处理跳过代理ID: %d 当前无需降级", payload.AgentID)
}
return nil
}

View File

@@ -0,0 +1,68 @@
package queue
import (
"context"
"time"
"bdrp-server/app/main/api/internal/service"
"bdrp-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type AgentMembershipExpireScanHandler struct {
svcCtx *svc.ServiceContext
}
func NewAgentMembershipExpireScanHandler(svcCtx *svc.ServiceContext) *AgentMembershipExpireScanHandler {
return &AgentMembershipExpireScanHandler{
svcCtx: svcCtx,
}
}
func (l *AgentMembershipExpireScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
now := time.Now()
dayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
dayEnd := dayStart.Add(24*time.Hour - time.Second)
// 1. 扫描当天将到期会员,并为每个会员安排准点处理任务
todayAgents, err := service.ListTodayWillExpireAgents(ctx, l.svcCtx.AgentModel, dayStart, dayEnd)
if err != nil {
logx.Errorf("扫描当天到期会员失败: %v", err)
return err
}
for _, agent := range todayAgents {
processAt := agent.MembershipExpiryTime.Time
if processAt.Before(now) {
processAt = now
}
if l.svcCtx.AsynqService != nil {
if sendErr := l.svcCtx.AsynqService.SendAgentMembershipExpireHandleTask(agent.Id, processAt); sendErr != nil {
logx.Errorf("安排会员到期处理任务失败代理ID: %d, 错误: %v", agent.Id, sendErr)
}
}
}
// 2. 补偿处理:扫描已过期但尚未降级的会员并立即降级
expiredAgents, err := service.ListExpiredUnprocessedAgents(ctx, l.svcCtx.AgentModel, now)
if err != nil {
logx.Errorf("扫描已过期未处理会员失败: %v", err)
return err
}
downgradedCount := 0
for _, agent := range expiredAgents {
downgraded, degradeErr := service.DowngradeExpiredMembership(ctx, l.svcCtx.AgentModel, agent.Id, now)
if degradeErr != nil {
logx.Errorf("补偿降级失败代理ID: %d, 错误: %v", agent.Id, degradeErr)
continue
}
if downgraded {
downgradedCount++
}
}
logx.Infof("会员到期扫描完成,当天到期会员: %d, 补偿降级数量: %d", len(todayAgents), downgradedCount)
return nil
}

View File

@@ -10,7 +10,6 @@ import (
"path"
"regexp"
"strings"
paylogic "bdrp-server/app/main/api/internal/logic/pay"
"bdrp-server/app/main/api/internal/service"
"bdrp-server/app/main/api/internal/svc"
"bdrp-server/app/main/api/internal/types"
@@ -283,8 +282,8 @@ func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
// 使用公共函数按本次退款金额处理佣金和钱包扣除
_ = paylogic.HandleCommissionAndWalletDeduction(ctx, l.svcCtx, nil, order, order.Amount)
// 使用 AgentService 中的共用退款扣款逻辑
l.svcCtx.AgentService.HandleOrderRefundDeduction(ctx, nil, order, order.Amount)
return asynq.SkipRetry
} else {

View File

@@ -1,12 +1,14 @@
package queue
import (
"context"
"fmt"
"bdrp-server/app/main/api/internal/svc"
"bdrp-server/app/main/api/internal/types"
"context"
"fmt"
"time"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type CronJob struct {
@@ -14,6 +16,8 @@ type CronJob struct {
svcCtx *svc.ServiceContext
}
const AgentMembershipExpireScanTaskTime = "5 0 * * *"
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
return &CronJob{
ctx: ctx,
@@ -31,6 +35,29 @@ func (l *CronJob) Register() *asynq.ServeMux {
if err != nil {
panic(fmt.Sprintf("定时任务注册失败:%v", err))
}
// 注册会员到期扫描任务(每天凌晨执行)
expireScanTask := asynq.NewTask(types.MsgAgentMembershipExpireScan, nil, nil)
_, err = scheduler.Register(AgentMembershipExpireScanTaskTime, expireScanTask)
if err != nil {
panic(fmt.Sprintf("会员到期扫描任务注册失败:%v", err))
}
// 启动补偿:服务启动后立即触发一次扫描任务(按日期去重)
client := asynq.NewClient(redisClientOpt)
startupTaskID := fmt.Sprintf("agent_membership_expire_scan_startup_%s", time.Now().Format("20060102"))
_, enqueueErr := client.Enqueue(
asynq.NewTask(types.MsgAgentMembershipExpireScan, nil),
asynq.MaxRetry(1),
asynq.TaskID(startupTaskID),
)
if enqueueErr != nil {
logx.Errorf("启动补偿扫描任务入队失败: %v", enqueueErr)
}
if closeErr := client.Close(); closeErr != nil {
logx.Errorf("关闭启动补偿任务客户端失败: %v", closeErr)
}
scheduler.Start()
fmt.Println("定时任务启动!!!")
@@ -38,6 +65,8 @@ func (l *CronJob) Register() *asynq.ServeMux {
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
mux.Handle(types.MsgUnfreezeCommission, NewUnfreezeCommissionHandler(l.svcCtx))
mux.Handle(types.MsgAgentMembershipExpireScan, NewAgentMembershipExpireScanHandler(l.svcCtx))
mux.Handle(types.MsgAgentMembershipExpireHandle, NewAgentMembershipExpireHandleHandler(l.svcCtx))
return mux
}