v1.1
This commit is contained in:
@@ -30,34 +30,34 @@ func (l *AgentProcessHandler) ProcessTask(ctx context.Context, t *asynq.Task) er
|
||||
}
|
||||
|
||||
// 获取订单信息
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("代理处理任务失败,订单不存在: orderID=%d", payload.OrderID)
|
||||
return asynq.SkipRetry // 订单不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询订单失败: orderID=%d, err=%w", payload.OrderID, err)
|
||||
}
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("代理处理任务失败,订单不存在: orderID=%s", payload.OrderID)
|
||||
return asynq.SkipRetry // 订单不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询订单失败: orderID=%s, err=%w", payload.OrderID, err)
|
||||
}
|
||||
|
||||
// 检查订单状态
|
||||
if order.Status != "paid" {
|
||||
logx.Infof("代理处理任务跳过,订单未支付: orderID=%d, status=%s", payload.OrderID, order.Status)
|
||||
return nil // 订单未支付,不处理,不重试
|
||||
}
|
||||
if order.Status != "paid" {
|
||||
logx.Infof("代理处理任务跳过,订单未支付: orderID=%s, status=%s", payload.OrderID, order.Status)
|
||||
return nil // 订单未支付,不处理,不重试
|
||||
}
|
||||
|
||||
// 调用代理处理服务
|
||||
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
|
||||
if err != nil {
|
||||
// 记录错误日志,但不阻塞报告流程
|
||||
logx.Errorf("代理处理失败,订单ID: %d, 错误: %v", payload.OrderID, err)
|
||||
// 返回错误以触发重试机制
|
||||
return fmt.Errorf("代理处理失败: orderID=%d, err=%w", payload.OrderID, err)
|
||||
}
|
||||
if err != nil {
|
||||
// 记录错误日志,但不阻塞报告流程
|
||||
logx.Errorf("代理处理失败,订单ID: %s, 错误: %v", payload.OrderID, err)
|
||||
// 返回错误以触发重试机制
|
||||
return fmt.Errorf("代理处理失败: orderID=%s, err=%w", payload.OrderID, err)
|
||||
}
|
||||
|
||||
// 注意:解冻任务现在通过定时任务扫描处理,不再需要发送延迟任务
|
||||
// 定时任务每5分钟扫描一次待解冻的任务,更加可靠
|
||||
logx.Infof("代理处理成功,订单ID: %d,冻结任务(如有)将由定时任务自动处理", payload.OrderID)
|
||||
logx.Infof("代理处理成功,订单ID: %s,冻结任务(如有)将由定时任务自动处理", payload.OrderID)
|
||||
|
||||
logx.Infof("代理处理成功,订单ID: %d", payload.OrderID)
|
||||
return nil
|
||||
logx.Infof("代理处理成功,订单ID: %s", payload.OrderID)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"ycc-server/app/main/model"
|
||||
"ycc-server/common/globalkey"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
@@ -115,6 +116,7 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
|
||||
// 创建清理日志记录(只创建一次)
|
||||
cleanupLog := &model.QueryCleanupLog{
|
||||
Id: uuid.New().String(),
|
||||
CleanupTime: now,
|
||||
CleanupBefore: cleanupBefore,
|
||||
Status: 1,
|
||||
@@ -122,13 +124,11 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
}
|
||||
|
||||
// 先创建清理日志记录
|
||||
var cleanupLogId int64
|
||||
err = l.svcCtx.QueryCleanupLogModel.Trans(taskCtx, func(logCtx context.Context, logSession sqlx.Session) error {
|
||||
cleanupLogInsertResult, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog)
|
||||
_, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog)
|
||||
if insertErr != nil {
|
||||
return insertErr
|
||||
}
|
||||
cleanupLogId, insertErr = cleanupLogInsertResult.LastInsertId()
|
||||
return insertErr
|
||||
})
|
||||
if err != nil {
|
||||
@@ -136,11 +136,11 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("创建清理日志记录成功,日志ID: %d", cleanupLogId)
|
||||
logx.Infof("创建清理日志记录成功,日志ID: %s", cleanupLog.Id)
|
||||
|
||||
// 分批处理,每个批次使用独立事务
|
||||
batchCount := 0
|
||||
lastProcessedId := int64(0)
|
||||
lastProcessedId := ""
|
||||
|
||||
for {
|
||||
// 检查是否被取消(优雅关闭支持)
|
||||
@@ -148,7 +148,7 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
case <-taskCtx.Done():
|
||||
logx.Infof("清理任务被取消,已处理 %d 批次,共删除 %d 条记录", batchCount, cleanupLog.AffectedRows)
|
||||
// 更新清理日志状态
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, fmt.Errorf("任务被取消"))
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, fmt.Errorf("任务被取消"))
|
||||
return taskCtx.Err()
|
||||
default:
|
||||
// 继续处理
|
||||
@@ -165,7 +165,7 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
Limit(uint64(batchSize))
|
||||
|
||||
// 如果已处理过,从上次处理的ID之后继续
|
||||
if lastProcessedId > 0 {
|
||||
if lastProcessedId != "" {
|
||||
builder = builder.Where("id > ?", lastProcessedId)
|
||||
}
|
||||
|
||||
@@ -191,7 +191,8 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
// 3. 保存清理明细
|
||||
for _, query := range batchQueries {
|
||||
detail := &model.QueryCleanupDetail{
|
||||
CleanupLogId: cleanupLogId,
|
||||
Id: uuid.New().String(),
|
||||
CleanupLogId: cleanupLog.Id,
|
||||
QueryId: query.Id,
|
||||
OrderId: query.OrderId,
|
||||
UserId: query.UserId,
|
||||
@@ -214,7 +215,7 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
if batchErr != nil {
|
||||
// 批次失败,更新清理日志状态
|
||||
logx.Errorf("批次处理失败(批次 %d): %v", batchCount+1, batchErr)
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, batchErr)
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, batchErr)
|
||||
return batchErr
|
||||
}
|
||||
|
||||
@@ -238,7 +239,7 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
}
|
||||
|
||||
// 更新清理日志状态为成功
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, nil)
|
||||
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, nil)
|
||||
|
||||
duration := time.Since(startTime)
|
||||
logx.Infof("%s - 查询数据清理完成,共处理 %d 批次,删除 %d 条记录,耗时 %v",
|
||||
@@ -247,10 +248,10 @@ func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
||||
}
|
||||
|
||||
// updateCleanupLogStatus 更新清理日志状态
|
||||
func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId int64, cleanupLog *model.QueryCleanupLog, err error) {
|
||||
func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId string, cleanupLog *model.QueryCleanupLog, err error) {
|
||||
err = l.svcCtx.QueryCleanupLogModel.Trans(ctx, func(updateCtx context.Context, updateSession sqlx.Session) error {
|
||||
// 查询当前日志记录
|
||||
currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, logId)
|
||||
currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, cleanupLog.Id)
|
||||
if findErr != nil {
|
||||
return findErr
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"ycc-server/pkg/lzkit/crypto"
|
||||
"ycc-server/pkg/lzkit/lzUtils"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
@@ -28,9 +29,7 @@ func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotif
|
||||
}
|
||||
}
|
||||
|
||||
var payload struct {
|
||||
OrderID int64 `json:"order_id"`
|
||||
}
|
||||
var payload types.MsgPaySuccessQueryPayload
|
||||
|
||||
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
// 从任务的负载中解码数据
|
||||
@@ -41,18 +40,18 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
// 订单不存在,记录详细日志并跳过重试
|
||||
logx.Errorf("支付成功通知任务失败:订单不存在,订单ID: %d, 错误: %v", payload.OrderID, err)
|
||||
logx.Errorf("支付成功通知任务失败:订单不存在,订单ID: %s, 错误: %v", payload.OrderID, err)
|
||||
return asynq.SkipRetry // 订单不存在时跳过重试,避免重复失败
|
||||
}
|
||||
env := os.Getenv("ENV")
|
||||
if order.Status != "paid" && env != "development" {
|
||||
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
|
||||
err = fmt.Errorf("无效的订单: %s", payload.OrderID)
|
||||
logx.Errorf("处理任务失败,原因: %v", err)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
|
||||
return fmt.Errorf("找不到相关产品: orderID: %s, productID: %s", payload.OrderID, order.ProductId)
|
||||
}
|
||||
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
|
||||
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
|
||||
@@ -75,22 +74,20 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
}
|
||||
|
||||
query := &model.Query{
|
||||
Id: uuid.NewString(),
|
||||
OrderId: order.Id,
|
||||
UserId: order.UserId,
|
||||
ProductId: product.Id,
|
||||
QueryParams: data.Params,
|
||||
QueryState: "pending",
|
||||
}
|
||||
result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
|
||||
_, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
|
||||
if insertQueryErr != nil {
|
||||
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
|
||||
}
|
||||
|
||||
// 获取插入后的ID
|
||||
queryId, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取插入的查询ID失败: %+v", err)
|
||||
}
|
||||
// 插入后使用预生成的查询ID
|
||||
queryId := query.Id
|
||||
|
||||
// 从数据库中查询完整的查询记录
|
||||
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
|
||||
@@ -147,7 +144,7 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
var encryptData string
|
||||
if isEmptyReportMode {
|
||||
// 空报告模式:生成空的报告数据,跳过API调用
|
||||
logx.Infof("空报告模式:订单 %s (ID: %d) 跳过API调用,生成空报告", order.OrderNo, order.Id)
|
||||
logx.Infof("空报告模式:订单 %s (ID: %s) 跳过API调用,生成空报告", order.OrderNo, order.Id)
|
||||
|
||||
// 生成空报告数据结构(根据实际报告格式生成)
|
||||
emptyReportData := []byte(`[]`) // 空数组,表示没有数据
|
||||
@@ -191,12 +188,12 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
|
||||
// 报告生成成功后,发送代理处理异步任务(不阻塞报告流程)
|
||||
if asyncErr := l.svcCtx.AsynqService.SendAgentProcessTask(order.Id); asyncErr != nil {
|
||||
// 代理处理任务发送失败,只记录日志,不影响报告流程
|
||||
logx.Errorf("发送代理处理任务失败,订单ID: %d, 错误: %v", order.Id, asyncErr)
|
||||
logx.Errorf("发送代理处理任务失败,订单ID: %s, 错误: %v", order.Id, asyncErr)
|
||||
}
|
||||
|
||||
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
|
||||
if delErr != nil {
|
||||
logx.Errorf("删除Redis缓存失败,但任务已成功处理,订单ID: %d, 错误: %v", order.Id, delErr)
|
||||
logx.Errorf("删除Redis缓存失败,但任务已成功处理,订单ID: %s, 错误: %v", order.Id, delErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -209,7 +206,7 @@ func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error
|
||||
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
|
||||
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
|
||||
if delErr != nil {
|
||||
logx.Errorf("删除Redis缓存失败,订单ID: %d, 错误: %v", order.Id, delErr)
|
||||
logx.Errorf("删除Redis缓存失败,订单ID: %s, 错误: %v", order.Id, delErr)
|
||||
}
|
||||
|
||||
if order.Status == "paid" && query.QueryState == "pending" {
|
||||
@@ -235,12 +232,12 @@ func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
if refund.IsSuccess() {
|
||||
logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
|
||||
logx.Errorf("支付宝退款成功, orderID: %s", order.Id)
|
||||
// 更新订单状态为退款
|
||||
order.Status = "refunded"
|
||||
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
|
||||
if updateOrderErr != nil {
|
||||
logx.Errorf("更新订单状态失败,订单ID: %d, 错误: %v", order.Id, updateOrderErr)
|
||||
logx.Errorf("更新订单状态失败,订单ID: %s, 错误: %v", order.Id, updateOrderErr)
|
||||
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
|
||||
}
|
||||
return asynq.SkipRetry
|
||||
|
||||
@@ -35,30 +35,30 @@ func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Ta
|
||||
}
|
||||
|
||||
// 1. 查询冻结任务
|
||||
freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%d", payload.FreezeTaskId)
|
||||
return asynq.SkipRetry // 任务不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询冻结任务失败: freezeTaskId=%d, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
|
||||
if err != nil {
|
||||
if errors.Is(err, model.ErrNotFound) {
|
||||
logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%s", payload.FreezeTaskId)
|
||||
return asynq.SkipRetry // 任务不存在,跳过重试
|
||||
}
|
||||
return fmt.Errorf("查询冻结任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
|
||||
// 2. 检查任务状态
|
||||
if freezeTask.Status != 1 {
|
||||
logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%d, status=%d", payload.FreezeTaskId, freezeTask.Status)
|
||||
return nil // 任务已处理,不重试
|
||||
}
|
||||
if freezeTask.Status != 1 {
|
||||
logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%s, status=%d", payload.FreezeTaskId, freezeTask.Status)
|
||||
return nil // 任务已处理,不重试
|
||||
}
|
||||
|
||||
// 3. 检查解冻时间是否已到
|
||||
if time.Now().Before(freezeTask.UnfreezeTime) {
|
||||
logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%d, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
|
||||
// 重新发送延迟任务
|
||||
if err := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); err != nil {
|
||||
logx.Errorf("重新发送解冻任务失败: freezeTaskId=%d, err=%v", payload.FreezeTaskId, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if time.Now().Before(freezeTask.UnfreezeTime) {
|
||||
logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%s, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
|
||||
// 重新发送延迟任务
|
||||
if err := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); err != nil {
|
||||
logx.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 4. 使用事务处理解冻
|
||||
err = l.svcCtx.AgentFreezeTaskModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
|
||||
@@ -71,9 +71,9 @@ func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Ta
|
||||
|
||||
// 4.2 更新钱包(解冻余额)
|
||||
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, freezeTask.AgentId)
|
||||
if walletErr != nil {
|
||||
return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %d", freezeTask.AgentId)
|
||||
}
|
||||
if walletErr != nil {
|
||||
return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %s", freezeTask.AgentId)
|
||||
}
|
||||
|
||||
wallet.FrozenBalance -= freezeTask.FreezeAmount
|
||||
wallet.Balance += freezeTask.FreezeAmount
|
||||
@@ -84,11 +84,11 @@ func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Ta
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("解冻任务处理失败: freezeTaskId=%d, err=%v", payload.FreezeTaskId, err)
|
||||
return fmt.Errorf("解冻任务处理失败: freezeTaskId=%d, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
if err != nil {
|
||||
logx.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
|
||||
return fmt.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
|
||||
}
|
||||
|
||||
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
|
||||
return nil
|
||||
logx.Infof("解冻任务处理成功: freezeTaskId=%s, agentId=%s, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user