v1.0
This commit is contained in:
63
app/main/api/internal/queue/agentProcess.go
Normal file
63
app/main/api/internal/queue/agentProcess.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/api/internal/types"
|
||||
"ycc-server/app/main/model"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type AgentProcessHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewAgentProcessHandler(svcCtx *svc.ServiceContext) *AgentProcessHandler {
|
||||
return &AgentProcessHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AgentProcessHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
var payload types.MsgAgentProcessPayload
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
return fmt.Errorf("解析代理处理任务负载失败: %w", err)
|
||||
}
|
||||
|
||||
// 获取订单信息
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("代理处理任务失败,订单不存在: orderID=%d", payload.OrderID)
|
||||
return asynq.SkipRetry // 订单不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询订单失败: orderID=%d, err=%w", payload.OrderID, err)
|
||||
}
|
||||
|
||||
// 检查订单状态
|
||||
if order.Status != "paid" {
|
||||
logx.Infof("代理处理任务跳过,订单未支付: orderID=%d, status=%s", payload.OrderID, order.Status)
|
||||
return nil // 订单未支付,不处理,不重试
|
||||
}
|
||||
|
||||
// 调用代理处理服务
|
||||
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
|
||||
if err != nil {
|
||||
// 记录错误日志,但不阻塞报告流程
|
||||
logx.Errorf("代理处理失败,订单ID: %d, 错误: %v", payload.OrderID, err)
|
||||
// 返回错误以触发重试机制
|
||||
return fmt.Errorf("代理处理失败: orderID=%d, err=%w", payload.OrderID, err)
|
||||
}
|
||||
|
||||
// 注意:解冻任务现在通过定时任务扫描处理,不再需要发送延迟任务
|
||||
// 定时任务每5分钟扫描一次待解冻的任务,更加可靠
|
||||
logx.Infof("代理处理成功,订单ID: %d,冻结任务(如有)将由定时任务自动处理", payload.OrderID)
|
||||
|
||||
logx.Infof("代理处理成功,订单ID: %d", payload.OrderID)
|
||||
return nil
|
||||
}
|
||||
@@ -2,13 +2,13 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/common/globalkey"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/common/globalkey"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
@@ -48,11 +48,16 @@ func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string)
|
||||
}
|
||||
|
||||
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
// 添加超时控制:最多运行1小时
|
||||
taskCtx, cancel := context.WithTimeout(ctx, 1*time.Hour)
|
||||
defer cancel()
|
||||
|
||||
startTime := time.Now()
|
||||
now := time.Now()
|
||||
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
|
||||
|
||||
// 1. 检查是否启用清理
|
||||
enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
|
||||
enableCleanup, err := l.getConfigValue(taskCtx, "enable_cleanup")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -62,29 +67,53 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
}
|
||||
|
||||
// 2. 获取保留天数
|
||||
retentionDaysStr, err := l.getConfigValue(ctx, "retention_days")
|
||||
retentionDaysStr, err := l.getConfigValue(taskCtx, "retention_days")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retentionDays, err := strconv.Atoi(retentionDaysStr)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("保留天数配置无效: %v", err)
|
||||
}
|
||||
if retentionDays < 0 {
|
||||
return fmt.Errorf("保留天数不能为负数: %d", retentionDays)
|
||||
}
|
||||
|
||||
// 3. 获取批次大小
|
||||
batchSizeStr, err := l.getConfigValue(ctx, "batch_size")
|
||||
batchSizeStr, err := l.getConfigValue(taskCtx, "batch_size")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
batchSize, err := strconv.Atoi(batchSizeStr)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("批次大小配置无效: %v", err)
|
||||
}
|
||||
if batchSize <= 0 || batchSize > 10000 {
|
||||
return fmt.Errorf("批次大小必须在1-10000之间: %d", batchSize)
|
||||
}
|
||||
|
||||
// 计算清理截止时间
|
||||
cleanupBefore := now.AddDate(0, 0, -retentionDays)
|
||||
|
||||
// 创建清理日志记录
|
||||
// 先快速检查是否有数据需要清理(避免创建无用的日志记录)
|
||||
checkBuilder := l.svcCtx.QueryModel.SelectBuilder().
|
||||
Where("create_time < ?", cleanupBefore).
|
||||
Where("del_state = ?", globalkey.DelStateNo).
|
||||
Limit(1) // 只查询1条,用于判断是否有数据
|
||||
|
||||
checkQueries, checkErr := l.svcCtx.QueryModel.FindAll(taskCtx, checkBuilder, "")
|
||||
if checkErr != nil {
|
||||
logx.Errorf("检查是否有数据需要清理失败: %v", checkErr)
|
||||
return checkErr
|
||||
}
|
||||
|
||||
// 如果没有数据需要清理,直接返回,不创建日志记录
|
||||
if len(checkQueries) == 0 {
|
||||
logx.Infof("%s - 没有需要清理的数据(清理截止时间: %s)", now.Format("2006-01-02 15:04:05"), cleanupBefore.Format("2006-01-02 15:04:05"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建清理日志记录(只创建一次)
|
||||
cleanupLog := &model.QueryCleanupLog{
|
||||
CleanupTime: now,
|
||||
CleanupBefore: cleanupBefore,
|
||||
@@ -92,52 +121,75 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
Remark: sql.NullString{String: "定时清理数据", Valid: true},
|
||||
}
|
||||
|
||||
// 使用事务处理清理操作和日志记录
|
||||
err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
|
||||
// 分批处理
|
||||
for {
|
||||
// 1. 查询一批要删除的记录
|
||||
// 先创建清理日志记录
|
||||
var cleanupLogId int64
|
||||
err = l.svcCtx.QueryCleanupLogModel.Trans(taskCtx, func(logCtx context.Context, logSession sqlx.Session) error {
|
||||
cleanupLogInsertResult, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog)
|
||||
if insertErr != nil {
|
||||
return insertErr
|
||||
}
|
||||
cleanupLogId, insertErr = cleanupLogInsertResult.LastInsertId()
|
||||
return insertErr
|
||||
})
|
||||
if err != nil {
|
||||
logx.Errorf("创建清理日志记录失败: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("创建清理日志记录成功,日志ID: %d", cleanupLogId)
|
||||
|
||||
// 分批处理,每个批次使用独立事务
|
||||
batchCount := 0
|
||||
lastProcessedId := int64(0)
|
||||
|
||||
for {
|
||||
// 检查是否被取消(优雅关闭支持)
|
||||
select {
|
||||
case <-taskCtx.Done():
|
||||
logx.Infof("清理任务被取消,已处理 %d 批次,共删除 %d 条记录", batchCount, cleanupLog.AffectedRows)
|
||||
// 更新清理日志状态
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, fmt.Errorf("任务被取消"))
|
||||
return taskCtx.Err()
|
||||
default:
|
||||
// 继续处理
|
||||
}
|
||||
|
||||
// 每个批次使用独立事务
|
||||
var batchQueries []*model.Query
|
||||
batchErr := l.svcCtx.QueryModel.Trans(taskCtx, func(batchCtx context.Context, batchSession sqlx.Session) error {
|
||||
// 1. 查询一批要删除的记录(添加排序确保一致性)
|
||||
builder := l.svcCtx.QueryModel.SelectBuilder().
|
||||
Where("create_time < ?", cleanupBefore).
|
||||
Where("del_state = ?", globalkey.DelStateNo).
|
||||
OrderBy("id ASC"). // 添加排序,确保处理顺序一致
|
||||
Limit(uint64(batchSize))
|
||||
|
||||
queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
|
||||
if err != nil {
|
||||
cleanupLog.Status = 2
|
||||
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
||||
return err
|
||||
// 如果已处理过,从上次处理的ID之后继续
|
||||
if lastProcessedId > 0 {
|
||||
builder = builder.Where("id > ?", lastProcessedId)
|
||||
}
|
||||
|
||||
if len(queries) == 0 {
|
||||
break // 没有更多数据需要清理
|
||||
var queryErr error
|
||||
batchQueries, queryErr = l.svcCtx.QueryModel.FindAll(batchCtx, builder, "")
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
|
||||
if len(batchQueries) == 0 {
|
||||
// 没有更多数据需要清理,标记为完成
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 执行清理
|
||||
for _, query := range queries {
|
||||
err = l.svcCtx.QueryModel.DeleteSoft(ctx, session, query)
|
||||
if err != nil {
|
||||
cleanupLog.Status = 2
|
||||
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
||||
return err
|
||||
for _, query := range batchQueries {
|
||||
deleteErr := l.svcCtx.QueryModel.DeleteSoft(batchCtx, batchSession, query)
|
||||
if deleteErr != nil {
|
||||
return deleteErr
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 更新影响行数
|
||||
cleanupLog.AffectedRows += int64(len(queries))
|
||||
|
||||
// 4. 保存清理日志(每批次都记录)
|
||||
cleanupLogInsertResult, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cleanupLogId, err := cleanupLogInsertResult.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 5. 保存清理明细
|
||||
for _, query := range queries {
|
||||
// 3. 保存清理明细
|
||||
for _, query := range batchQueries {
|
||||
detail := &model.QueryCleanupDetail{
|
||||
CleanupLogId: cleanupLogId,
|
||||
QueryId: query.Id,
|
||||
@@ -147,21 +199,76 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
QueryState: query.QueryState,
|
||||
CreateTimeOld: query.CreateTime,
|
||||
}
|
||||
_, err = l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail)
|
||||
if err != nil {
|
||||
return err
|
||||
_, insertErr := l.svcCtx.QueryCleanupDetailModel.Insert(batchCtx, batchSession, detail)
|
||||
if insertErr != nil {
|
||||
return insertErr
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 记录最后处理的ID(用于下次查询)
|
||||
lastProcessedId = batchQueries[len(batchQueries)-1].Id
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if batchErr != nil {
|
||||
// 批次失败,更新清理日志状态
|
||||
logx.Errorf("批次处理失败(批次 %d): %v", batchCount+1, batchErr)
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, batchErr)
|
||||
return batchErr
|
||||
}
|
||||
|
||||
return nil
|
||||
// 如果查询结果为空,说明没有更多数据
|
||||
if len(batchQueries) == 0 {
|
||||
logx.Infof("所有数据已处理完成")
|
||||
break
|
||||
}
|
||||
|
||||
// 更新影响行数(在事务外更新,避免重复计算)
|
||||
actualBatchSize := int64(len(batchQueries))
|
||||
cleanupLog.AffectedRows += actualBatchSize
|
||||
batchCount++
|
||||
logx.Infof("批次 %d 处理完成,本批次删除 %d 条记录,累计删除 %d 条记录", batchCount, actualBatchSize, cleanupLog.AffectedRows)
|
||||
|
||||
// 如果本批次查询到的数据少于批次大小,说明已经处理完所有数据
|
||||
if actualBatchSize < int64(batchSize) {
|
||||
logx.Infof("所有数据已处理完成(本批次数据量少于批次大小)")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 更新清理日志状态为成功
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, nil)
|
||||
|
||||
duration := time.Since(startTime)
|
||||
logx.Infof("%s - 查询数据清理完成,共处理 %d 批次,删除 %d 条记录,耗时 %v",
|
||||
now.Format("2006-01-02 15:04:05"), batchCount, cleanupLog.AffectedRows, duration)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateCleanupLogStatus 更新清理日志状态
|
||||
func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId int64, cleanupLog *model.QueryCleanupLog, err error) {
|
||||
err = l.svcCtx.QueryCleanupLogModel.Trans(ctx, func(updateCtx context.Context, updateSession sqlx.Session) error {
|
||||
// 查询当前日志记录
|
||||
currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, logId)
|
||||
if findErr != nil {
|
||||
return findErr
|
||||
}
|
||||
|
||||
// 更新状态和影响行数
|
||||
currentLog.AffectedRows = cleanupLog.AffectedRows
|
||||
if err != nil {
|
||||
currentLog.Status = 2 // 失败
|
||||
currentLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
||||
} else {
|
||||
currentLog.Status = 1 // 成功
|
||||
}
|
||||
|
||||
_, updateErr := l.svcCtx.QueryCleanupLogModel.Update(updateCtx, updateSession, currentLog)
|
||||
return updateErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("%s - 清理查询数据失败: %v", now.Format("2006-01-02 15:04:05"), err)
|
||||
return err
|
||||
logx.Errorf("更新清理日志状态失败: %v", err)
|
||||
}
|
||||
|
||||
logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", now.Format("2006-01-02 15:04:05"), cleanupLog.AffectedRows)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,17 +2,17 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/api/internal/types"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/pkg/lzkit/crypto"
|
||||
"ycc-server/pkg/lzkit/lzUtils"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/api/internal/types"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/pkg/lzkit/crypto"
|
||||
"ycc-server/pkg/lzkit/lzUtils"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
@@ -40,7 +40,9 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
|
||||
// 订单不存在,记录详细日志并跳过重试
|
||||
logx.Errorf("支付成功通知任务失败:订单不存在,订单ID: %d, 错误: %v", payload.OrderID, err)
|
||||
return asynq.SkipRetry // 订单不存在时跳过重试,避免重复失败
|
||||
}
|
||||
env := os.Getenv("ENV")
|
||||
if order.Status != "paid" && env != "development" {
|
||||
@@ -139,17 +141,39 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
}
|
||||
}
|
||||
|
||||
// 调用API请求服务
|
||||
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
|
||||
if err != nil {
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
// 加密返回响应
|
||||
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
|
||||
if aesEncryptErr != nil {
|
||||
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
|
||||
return l.handleError(ctx, err, order, query)
|
||||
// 检查是否为空报告模式(开发环境)
|
||||
isEmptyReportMode := env == "development" && order.PaymentPlatform == "test"
|
||||
|
||||
var encryptData string
|
||||
if isEmptyReportMode {
|
||||
// 空报告模式:生成空的报告数据,跳过API调用
|
||||
logx.Infof("空报告模式:订单 %s (ID: %d) 跳过API调用,生成空报告", order.OrderNo, order.Id)
|
||||
|
||||
// 生成空报告数据结构(根据实际报告格式生成)
|
||||
emptyReportData := []byte(`[]`) // 空数组,表示没有数据
|
||||
|
||||
// 加密空报告数据
|
||||
encryptedEmptyData, aesEncryptErr := crypto.AesEncrypt(emptyReportData, key)
|
||||
if aesEncryptErr != nil {
|
||||
err = fmt.Errorf("加密空报告数据失败: %v", aesEncryptErr)
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
encryptData = encryptedEmptyData
|
||||
} else {
|
||||
// 正常模式:调用API请求服务
|
||||
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
|
||||
if err != nil {
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
// 加密返回响应
|
||||
encryptedResponse, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
|
||||
if aesEncryptErr != nil {
|
||||
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
encryptData = encryptedResponse
|
||||
}
|
||||
|
||||
query.QueryData = lzUtils.StringToNullString(encryptData)
|
||||
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||||
if updateErr != nil {
|
||||
@@ -164,9 +188,10 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
return l.handleError(ctx, updateQueryErr, order, query)
|
||||
}
|
||||
|
||||
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
|
||||
if err != nil {
|
||||
return l.handleError(ctx, err, order, query)
|
||||
// 报告生成成功后,发送代理处理异步任务(不阻塞报告流程)
|
||||
if asyncErr := l.svcCtx.AsynqService.SendAgentProcessTask(order.Id); asyncErr != nil {
|
||||
// 代理处理任务发送失败,只记录日志,不影响报告流程
|
||||
logx.Errorf("发送代理处理任务失败,订单ID: %d, 错误: %v", order.Id, asyncErr)
|
||||
}
|
||||
|
||||
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
|
||||
|
||||
@@ -2,9 +2,9 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/api/internal/types"
|
||||
"fmt"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
@@ -24,17 +24,30 @@ func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
|
||||
func (l *CronJob) Register() *asynq.ServeMux {
|
||||
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
|
||||
scheduler := asynq.NewScheduler(redisClientOpt, nil)
|
||||
|
||||
// 注册数据清理定时任务(每天凌晨3点)
|
||||
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
|
||||
_, err := scheduler.Register(TASKTIME, task)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("定时任务注册失败:%v", err))
|
||||
}
|
||||
|
||||
// 注册解冻佣金扫描定时任务(每2小时执行一次)
|
||||
unfreezeScanTask := asynq.NewTask(types.MsgUnfreezeCommissionScan, nil, nil)
|
||||
_, err = scheduler.Register("0 */2 * * *", unfreezeScanTask) // 每2小时执行一次(每小时的第0分钟)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("解冻佣金扫描定时任务注册失败:%v", err))
|
||||
}
|
||||
|
||||
scheduler.Start()
|
||||
fmt.Println("定时任务启动!!!")
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgAgentProcess, NewAgentProcessHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgUnfreezeCommission, NewUnfreezeCommissionHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgUnfreezeCommissionScan, NewUnfreezeCommissionScanHandler(l.svcCtx))
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
94
app/main/api/internal/queue/unfreezeCommission.go
Normal file
94
app/main/api/internal/queue/unfreezeCommission.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/pkg/lzkit/lzUtils"
|
||||
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
"ycc-server/app/main/api/internal/types"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
pkgerrors "github.com/pkg/errors"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
type UnfreezeCommissionHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
|
||||
return &UnfreezeCommissionHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
var payload types.MsgUnfreezeCommissionPayload
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
return fmt.Errorf("解析解冻任务负载失败: %w", err)
|
||||
}
|
||||
|
||||
// 1. 查询冻结任务
|
||||
freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%d", payload.FreezeTaskId)
|
||||
return asynq.SkipRetry // 任务不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询冻结任务失败: freezeTaskId=%d, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
|
||||
// 2. 检查任务状态
|
||||
if freezeTask.Status != 1 {
|
||||
logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%d, status=%d", payload.FreezeTaskId, freezeTask.Status)
|
||||
return nil // 任务已处理,不重试
|
||||
}
|
||||
|
||||
// 3. 检查解冻时间是否已到
|
||||
if time.Now().Before(freezeTask.UnfreezeTime) {
|
||||
logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%d, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
|
||||
// 重新发送延迟任务
|
||||
if err := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); err != nil {
|
||||
logx.Errorf("重新发送解冻任务失败: freezeTaskId=%d, err=%v", payload.FreezeTaskId, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 4. 使用事务处理解冻
|
||||
err = l.svcCtx.AgentFreezeTaskModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
|
||||
// 4.1 更新冻结任务状态
|
||||
freezeTask.Status = 2 // 已解冻
|
||||
freezeTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(time.Now())
|
||||
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, freezeTask); updateErr != nil {
|
||||
return pkgerrors.Wrapf(updateErr, "更新冻结任务状态失败")
|
||||
}
|
||||
|
||||
// 4.2 更新钱包(解冻余额)
|
||||
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, freezeTask.AgentId)
|
||||
if walletErr != nil {
|
||||
return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %d", freezeTask.AgentId)
|
||||
}
|
||||
|
||||
wallet.FrozenBalance -= freezeTask.FreezeAmount
|
||||
wallet.Balance += freezeTask.FreezeAmount
|
||||
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
|
||||
return pkgerrors.Wrapf(updateWalletErr, "更新钱包失败")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("解冻任务处理失败: freezeTaskId=%d, err=%v", payload.FreezeTaskId, err)
|
||||
return fmt.Errorf("解冻任务处理失败: freezeTaskId=%d, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
|
||||
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
|
||||
return nil
|
||||
}
|
||||
233
app/main/api/internal/queue/unfreezeCommissionScan.go
Normal file
233
app/main/api/internal/queue/unfreezeCommissionScan.go
Normal file
@@ -0,0 +1,233 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/pkg/lzkit/lzUtils"
|
||||
|
||||
"ycc-server/app/main/api/internal/svc"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
// UnfreezeCommissionScanHandler 定时扫描解冻任务处理器
|
||||
type UnfreezeCommissionScanHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewUnfreezeCommissionScanHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionScanHandler {
|
||||
return &UnfreezeCommissionScanHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessTask 定时扫描需要解冻的任务
|
||||
func (l *UnfreezeCommissionScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
scanStartTime := time.Now()
|
||||
now := time.Now()
|
||||
logx.Infof("开始扫描需要解冻的佣金任务,当前时间: %v", now)
|
||||
|
||||
// 1. 查询所有待解冻且解冻时间已到的任务
|
||||
// 使用索引 idx_status 和 idx_unfreeze_time 优化查询
|
||||
// 不限制查询数量,找到所有需要解冻的任务
|
||||
builder := l.svcCtx.AgentFreezeTaskModel.SelectBuilder().
|
||||
Where("status = ? AND unfreeze_time <= ? AND del_state = ?", 1, now, 0). // 1=待解冻,0=未删除
|
||||
OrderBy("unfreeze_time ASC") // 按解冻时间升序,优先处理最早的任务
|
||||
|
||||
freezeTasks, err := l.svcCtx.AgentFreezeTaskModel.FindAll(ctx, builder, "")
|
||||
if err != nil {
|
||||
logx.Errorf("查询待解冻任务失败: %v", err)
|
||||
return errors.Wrapf(err, "查询待解冻任务失败")
|
||||
}
|
||||
|
||||
// 如果没有需要解冻的任务,直接返回(不创建任何记录,只记录日志)
|
||||
if len(freezeTasks) == 0 {
|
||||
scanDuration := time.Since(scanStartTime)
|
||||
logx.Infof("没有需要解冻的任务,扫描耗时: %v", scanDuration)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 批次大小限制:如果任务量过大,分批处理
|
||||
const maxBatchSize = 1000
|
||||
originalCount := len(freezeTasks)
|
||||
if len(freezeTasks) > maxBatchSize {
|
||||
logx.Errorf("任务数量过多(%d),本次只处理前%d个,剩余将在下次扫描处理", len(freezeTasks), maxBatchSize)
|
||||
freezeTasks = freezeTasks[:maxBatchSize]
|
||||
}
|
||||
|
||||
logx.Infof("找到 %d 个需要解冻的任务(原始数量: %d),开始处理(最多同时处理2个)", len(freezeTasks), originalCount)
|
||||
|
||||
// 3. 并发控制:使用信号量限制最多同时处理2个任务
|
||||
const maxConcurrency = 2 // 最多同时处理2个任务
|
||||
const taskTimeout = 30 * time.Second // 每个任务30秒超时
|
||||
semaphore := make(chan struct{}, maxConcurrency) // 信号量通道
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex // 保护计数器的互斥锁
|
||||
successCount := 0
|
||||
failCount := 0
|
||||
skipCount := 0 // 跳过的任务数(已处理、时间未到等)
|
||||
|
||||
// 4. 并发处理所有任务,但最多同时处理2个
|
||||
for _, freezeTask := range freezeTasks {
|
||||
// 检查是否被取消(优雅关闭支持)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logx.Infof("扫描任务被取消,已处理: 成功=%d, 失败=%d, 跳过=%d", successCount, failCount, skipCount)
|
||||
return ctx.Err()
|
||||
default:
|
||||
// 继续处理
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{} // 获取信号量,如果已满2个则阻塞
|
||||
|
||||
go func(task *model.AgentFreezeTask) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }() // 释放信号量
|
||||
|
||||
taskStartTime := time.Now()
|
||||
|
||||
// 为每个任务设置超时控制
|
||||
taskCtx, cancel := context.WithTimeout(ctx, taskTimeout)
|
||||
defer cancel()
|
||||
|
||||
// 使用事务处理每个任务,确保原子性
|
||||
err := l.svcCtx.AgentFreezeTaskModel.Trans(taskCtx, func(transCtx context.Context, session sqlx.Session) error {
|
||||
// 4.1 重新查询任务(使用乐观锁,确保并发安全)
|
||||
currentTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(transCtx, task.Id)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Infof("冻结任务不存在,可能已被处理: freezeTaskId=%d", task.Id)
|
||||
return nil // 任务不存在,跳过
|
||||
}
|
||||
return errors.Wrapf(err, "查询冻结任务失败, freezeTaskId: %d", task.Id)
|
||||
}
|
||||
|
||||
// 4.2 幂等性增强:检查是否已经解冻过(通过 actual_unfreeze_time)
|
||||
if currentTask.ActualUnfreezeTime.Valid {
|
||||
logx.Infof("任务已解冻,跳过: freezeTaskId=%d, actualUnfreezeTime=%v", task.Id, currentTask.ActualUnfreezeTime.Time)
|
||||
return nil // 已解冻,跳过
|
||||
}
|
||||
|
||||
// 4.3 检查任务状态(双重检查,防止并发处理)
|
||||
if currentTask.Status != 1 {
|
||||
logx.Infof("冻结任务状态已变更,跳过处理: freezeTaskId=%d, status=%d", task.Id, currentTask.Status)
|
||||
return nil // 状态已变更,跳过
|
||||
}
|
||||
|
||||
// 4.4 再次检查解冻时间(防止时间判断误差)
|
||||
nowTime := time.Now()
|
||||
if nowTime.Before(currentTask.UnfreezeTime) {
|
||||
logx.Infof("冻结任务解冻时间未到,跳过处理: freezeTaskId=%d, unfreezeTime=%v", task.Id, currentTask.UnfreezeTime)
|
||||
return nil // 时间未到,跳过
|
||||
}
|
||||
|
||||
// 4.5 计算延迟时间(便于监控)
|
||||
delay := nowTime.Sub(currentTask.UnfreezeTime)
|
||||
if delay > 1*time.Hour {
|
||||
logx.Errorf("解冻任务延迟处理: freezeTaskId=%d, 延迟=%v, unfreezeTime=%v", task.Id, delay, currentTask.UnfreezeTime)
|
||||
}
|
||||
|
||||
// 4.6 更新冻结任务状态
|
||||
currentTask.Status = 2 // 已解冻
|
||||
currentTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(nowTime)
|
||||
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, currentTask); updateErr != nil {
|
||||
return errors.Wrapf(updateErr, "更新冻结任务状态失败, freezeTaskId: %d", task.Id)
|
||||
}
|
||||
|
||||
// 4.7 更新钱包(解冻余额)
|
||||
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, currentTask.AgentId)
|
||||
if walletErr != nil {
|
||||
return errors.Wrapf(walletErr, "查询钱包失败, agentId: %d", currentTask.AgentId)
|
||||
}
|
||||
|
||||
// 检查冻结余额是否足够(防止数据异常)
|
||||
if wallet.FrozenBalance < currentTask.FreezeAmount {
|
||||
logx.Errorf("钱包冻结余额不足,数据异常: freezeTaskId=%d, agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
|
||||
task.Id, currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
|
||||
return errors.Errorf("钱包冻结余额不足: agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
|
||||
currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
|
||||
}
|
||||
|
||||
wallet.FrozenBalance -= currentTask.FreezeAmount
|
||||
wallet.Balance += currentTask.FreezeAmount
|
||||
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
|
||||
return errors.Wrapf(updateWalletErr, "更新钱包失败, agentId: %d", currentTask.AgentId)
|
||||
}
|
||||
|
||||
// 更详细的日志(包含更多上下文信息)
|
||||
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f, orderPrice=%.2f, freezeTime=%v, unfreezeTime=%v, delay=%v",
|
||||
task.Id, currentTask.AgentId, currentTask.FreezeAmount, currentTask.OrderPrice,
|
||||
currentTask.FreezeTime, currentTask.UnfreezeTime, delay)
|
||||
return nil
|
||||
})
|
||||
|
||||
// 记录处理时间
|
||||
taskDuration := time.Since(taskStartTime)
|
||||
if taskDuration > 5*time.Second {
|
||||
logx.Errorf("解冻任务处理耗时较长: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
|
||||
}
|
||||
|
||||
// 更新计数器(需要加锁保护)
|
||||
mu.Lock()
|
||||
if err != nil {
|
||||
// 错误分类处理
|
||||
if isTemporaryError(err) {
|
||||
// 临时错误(如超时、网络问题),记录但继续处理其他任务
|
||||
failCount++
|
||||
logx.Errorf("解冻任务临时失败,将在下次扫描重试: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
|
||||
} else {
|
||||
// 永久错误(如数据异常),记录详细日志
|
||||
failCount++
|
||||
logx.Errorf("解冻任务永久失败: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
|
||||
}
|
||||
} else {
|
||||
successCount++
|
||||
logx.Infof("解冻任务处理完成: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
}(freezeTask)
|
||||
}
|
||||
|
||||
// 5. 等待所有任务完成
|
||||
wg.Wait()
|
||||
|
||||
// 6. 记录扫描统计信息
|
||||
scanDuration := time.Since(scanStartTime)
|
||||
logx.Infof("解冻任务扫描完成: 成功=%d, 失败=%d, 跳过=%d, 总计=%d, 扫描耗时=%v",
|
||||
successCount, failCount, skipCount, len(freezeTasks), scanDuration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isTemporaryError 判断是否为临时错误(可以重试的错误)
|
||||
func isTemporaryError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
errStr := err.Error()
|
||||
// 超时错误
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||||
return true
|
||||
}
|
||||
// 网络相关错误
|
||||
errStrLower := strings.ToLower(errStr)
|
||||
if strings.Contains(errStrLower, "timeout") || strings.Contains(errStrLower, "connection") || strings.Contains(errStrLower, "network") {
|
||||
return true
|
||||
}
|
||||
// 数据库连接错误
|
||||
if strings.Contains(errStrLower, "connection pool") || strings.Contains(errStrLower, "too many connections") {
|
||||
return true
|
||||
}
|
||||
|
||||
// 其他错误视为永久错误(如数据异常、业务逻辑错误等)
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user