234 lines
9.0 KiB
Go
234 lines
9.0 KiB
Go
package queue
|
||
|
||
import (
|
||
"context"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
"ycc-server/app/main/model"
|
||
"ycc-server/pkg/lzkit/lzUtils"
|
||
|
||
"ycc-server/app/main/api/internal/svc"
|
||
|
||
"github.com/hibiken/asynq"
|
||
"github.com/pkg/errors"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||
)
|
||
|
||
// UnfreezeCommissionScanHandler 定时扫描解冻任务处理器
|
||
type UnfreezeCommissionScanHandler struct {
|
||
svcCtx *svc.ServiceContext
|
||
}
|
||
|
||
func NewUnfreezeCommissionScanHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionScanHandler {
|
||
return &UnfreezeCommissionScanHandler{
|
||
svcCtx: svcCtx,
|
||
}
|
||
}
|
||
|
||
// ProcessTask 定时扫描需要解冻的任务
|
||
func (l *UnfreezeCommissionScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||
scanStartTime := time.Now()
|
||
now := time.Now()
|
||
logx.Infof("开始扫描需要解冻的佣金任务,当前时间: %v", now)
|
||
|
||
// 1. 查询所有待解冻且解冻时间已到的任务
|
||
// 使用索引 idx_status 和 idx_unfreeze_time 优化查询
|
||
// 不限制查询数量,找到所有需要解冻的任务
|
||
builder := l.svcCtx.AgentFreezeTaskModel.SelectBuilder().
|
||
Where("status = ? AND unfreeze_time <= ? AND del_state = ?", 1, now, 0). // 1=待解冻,0=未删除
|
||
OrderBy("unfreeze_time ASC") // 按解冻时间升序,优先处理最早的任务
|
||
|
||
freezeTasks, err := l.svcCtx.AgentFreezeTaskModel.FindAll(ctx, builder, "")
|
||
if err != nil {
|
||
logx.Errorf("查询待解冻任务失败: %v", err)
|
||
return errors.Wrapf(err, "查询待解冻任务失败")
|
||
}
|
||
|
||
// 如果没有需要解冻的任务,直接返回(不创建任何记录,只记录日志)
|
||
if len(freezeTasks) == 0 {
|
||
scanDuration := time.Since(scanStartTime)
|
||
logx.Infof("没有需要解冻的任务,扫描耗时: %v", scanDuration)
|
||
return nil
|
||
}
|
||
|
||
// 2. 批次大小限制:如果任务量过大,分批处理
|
||
const maxBatchSize = 1000
|
||
originalCount := len(freezeTasks)
|
||
if len(freezeTasks) > maxBatchSize {
|
||
logx.Errorf("任务数量过多(%d),本次只处理前%d个,剩余将在下次扫描处理", len(freezeTasks), maxBatchSize)
|
||
freezeTasks = freezeTasks[:maxBatchSize]
|
||
}
|
||
|
||
logx.Infof("找到 %d 个需要解冻的任务(原始数量: %d),开始处理(最多同时处理2个)", len(freezeTasks), originalCount)
|
||
|
||
// 3. 并发控制:使用信号量限制最多同时处理2个任务
|
||
const maxConcurrency = 2 // 最多同时处理2个任务
|
||
const taskTimeout = 30 * time.Second // 每个任务30秒超时
|
||
semaphore := make(chan struct{}, maxConcurrency) // 信号量通道
|
||
var wg sync.WaitGroup
|
||
var mu sync.Mutex // 保护计数器的互斥锁
|
||
successCount := 0
|
||
failCount := 0
|
||
skipCount := 0 // 跳过的任务数(已处理、时间未到等)
|
||
|
||
// 4. 并发处理所有任务,但最多同时处理2个
|
||
for _, freezeTask := range freezeTasks {
|
||
// 检查是否被取消(优雅关闭支持)
|
||
select {
|
||
case <-ctx.Done():
|
||
logx.Infof("扫描任务被取消,已处理: 成功=%d, 失败=%d, 跳过=%d", successCount, failCount, skipCount)
|
||
return ctx.Err()
|
||
default:
|
||
// 继续处理
|
||
}
|
||
|
||
wg.Add(1)
|
||
semaphore <- struct{}{} // 获取信号量,如果已满2个则阻塞
|
||
|
||
go func(task *model.AgentFreezeTask) {
|
||
defer wg.Done()
|
||
defer func() { <-semaphore }() // 释放信号量
|
||
|
||
taskStartTime := time.Now()
|
||
|
||
// 为每个任务设置超时控制
|
||
taskCtx, cancel := context.WithTimeout(ctx, taskTimeout)
|
||
defer cancel()
|
||
|
||
// 使用事务处理每个任务,确保原子性
|
||
err := l.svcCtx.AgentFreezeTaskModel.Trans(taskCtx, func(transCtx context.Context, session sqlx.Session) error {
|
||
// 4.1 重新查询任务(使用乐观锁,确保并发安全)
|
||
currentTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(transCtx, task.Id)
|
||
if err != nil {
|
||
if errors.Is(err, model.ErrNotFound) {
|
||
logx.Infof("冻结任务不存在,可能已被处理: freezeTaskId=%d", task.Id)
|
||
return nil // 任务不存在,跳过
|
||
}
|
||
return errors.Wrapf(err, "查询冻结任务失败, freezeTaskId: %d", task.Id)
|
||
}
|
||
|
||
// 4.2 幂等性增强:检查是否已经解冻过(通过 actual_unfreeze_time)
|
||
if currentTask.ActualUnfreezeTime.Valid {
|
||
logx.Infof("任务已解冻,跳过: freezeTaskId=%d, actualUnfreezeTime=%v", task.Id, currentTask.ActualUnfreezeTime.Time)
|
||
return nil // 已解冻,跳过
|
||
}
|
||
|
||
// 4.3 检查任务状态(双重检查,防止并发处理)
|
||
if currentTask.Status != 1 {
|
||
logx.Infof("冻结任务状态已变更,跳过处理: freezeTaskId=%d, status=%d", task.Id, currentTask.Status)
|
||
return nil // 状态已变更,跳过
|
||
}
|
||
|
||
// 4.4 再次检查解冻时间(防止时间判断误差)
|
||
nowTime := time.Now()
|
||
if nowTime.Before(currentTask.UnfreezeTime) {
|
||
logx.Infof("冻结任务解冻时间未到,跳过处理: freezeTaskId=%d, unfreezeTime=%v", task.Id, currentTask.UnfreezeTime)
|
||
return nil // 时间未到,跳过
|
||
}
|
||
|
||
// 4.5 计算延迟时间(便于监控)
|
||
delay := nowTime.Sub(currentTask.UnfreezeTime)
|
||
if delay > 1*time.Hour {
|
||
logx.Errorf("解冻任务延迟处理: freezeTaskId=%d, 延迟=%v, unfreezeTime=%v", task.Id, delay, currentTask.UnfreezeTime)
|
||
}
|
||
|
||
// 4.6 更新冻结任务状态
|
||
currentTask.Status = 2 // 已解冻
|
||
currentTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(nowTime)
|
||
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, currentTask); updateErr != nil {
|
||
return errors.Wrapf(updateErr, "更新冻结任务状态失败, freezeTaskId: %d", task.Id)
|
||
}
|
||
|
||
// 4.7 更新钱包(解冻余额)
|
||
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, currentTask.AgentId)
|
||
if walletErr != nil {
|
||
return errors.Wrapf(walletErr, "查询钱包失败, agentId: %d", currentTask.AgentId)
|
||
}
|
||
|
||
// 检查冻结余额是否足够(防止数据异常)
|
||
if wallet.FrozenBalance < currentTask.FreezeAmount {
|
||
logx.Errorf("钱包冻结余额不足,数据异常: freezeTaskId=%d, agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
|
||
task.Id, currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
|
||
return errors.Errorf("钱包冻结余额不足: agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
|
||
currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
|
||
}
|
||
|
||
wallet.FrozenBalance -= currentTask.FreezeAmount
|
||
wallet.Balance += currentTask.FreezeAmount
|
||
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
|
||
return errors.Wrapf(updateWalletErr, "更新钱包失败, agentId: %d", currentTask.AgentId)
|
||
}
|
||
|
||
// 更详细的日志(包含更多上下文信息)
|
||
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f, orderPrice=%.2f, freezeTime=%v, unfreezeTime=%v, delay=%v",
|
||
task.Id, currentTask.AgentId, currentTask.FreezeAmount, currentTask.OrderPrice,
|
||
currentTask.FreezeTime, currentTask.UnfreezeTime, delay)
|
||
return nil
|
||
})
|
||
|
||
// 记录处理时间
|
||
taskDuration := time.Since(taskStartTime)
|
||
if taskDuration > 5*time.Second {
|
||
logx.Errorf("解冻任务处理耗时较长: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
|
||
}
|
||
|
||
// 更新计数器(需要加锁保护)
|
||
mu.Lock()
|
||
if err != nil {
|
||
// 错误分类处理
|
||
if isTemporaryError(err) {
|
||
// 临时错误(如超时、网络问题),记录但继续处理其他任务
|
||
failCount++
|
||
logx.Errorf("解冻任务临时失败,将在下次扫描重试: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
|
||
} else {
|
||
// 永久错误(如数据异常),记录详细日志
|
||
failCount++
|
||
logx.Errorf("解冻任务永久失败: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
|
||
}
|
||
} else {
|
||
successCount++
|
||
logx.Infof("解冻任务处理完成: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
|
||
}
|
||
mu.Unlock()
|
||
|
||
}(freezeTask)
|
||
}
|
||
|
||
// 5. 等待所有任务完成
|
||
wg.Wait()
|
||
|
||
// 6. 记录扫描统计信息
|
||
scanDuration := time.Since(scanStartTime)
|
||
logx.Infof("解冻任务扫描完成: 成功=%d, 失败=%d, 跳过=%d, 总计=%d, 扫描耗时=%v",
|
||
successCount, failCount, skipCount, len(freezeTasks), scanDuration)
|
||
|
||
return nil
|
||
}
|
||
|
||
// isTemporaryError 判断是否为临时错误(可以重试的错误)
|
||
func isTemporaryError(err error) bool {
|
||
if err == nil {
|
||
return false
|
||
}
|
||
|
||
errStr := err.Error()
|
||
// 超时错误
|
||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
||
return true
|
||
}
|
||
// 网络相关错误
|
||
errStrLower := strings.ToLower(errStr)
|
||
if strings.Contains(errStrLower, "timeout") || strings.Contains(errStrLower, "connection") || strings.Contains(errStrLower, "network") {
|
||
return true
|
||
}
|
||
// 数据库连接错误
|
||
if strings.Contains(errStrLower, "connection pool") || strings.Contains(errStrLower, "too many connections") {
|
||
return true
|
||
}
|
||
|
||
// 其他错误视为永久错误(如数据异常、业务逻辑错误等)
|
||
return false
|
||
}
|