package queue import ( "context" "strings" "sync" "time" "ycc-server/app/main/model" "ycc-server/pkg/lzkit/lzUtils" "ycc-server/app/main/api/internal/svc" "github.com/hibiken/asynq" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" ) // UnfreezeCommissionScanHandler 定时扫描解冻任务处理器 type UnfreezeCommissionScanHandler struct { svcCtx *svc.ServiceContext } func NewUnfreezeCommissionScanHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionScanHandler { return &UnfreezeCommissionScanHandler{ svcCtx: svcCtx, } } // ProcessTask 定时扫描需要解冻的任务 func (l *UnfreezeCommissionScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { scanStartTime := time.Now() now := time.Now() logx.Infof("开始扫描需要解冻的佣金任务,当前时间: %v", now) // 1. 查询所有待解冻且解冻时间已到的任务 // 使用索引 idx_status 和 idx_unfreeze_time 优化查询 // 不限制查询数量,找到所有需要解冻的任务 builder := l.svcCtx.AgentFreezeTaskModel.SelectBuilder(). Where("status = ? AND unfreeze_time <= ? AND del_state = ?", 1, now, 0). // 1=待解冻,0=未删除 OrderBy("unfreeze_time ASC") // 按解冻时间升序,优先处理最早的任务 freezeTasks, err := l.svcCtx.AgentFreezeTaskModel.FindAll(ctx, builder, "") if err != nil { logx.Errorf("查询待解冻任务失败: %v", err) return errors.Wrapf(err, "查询待解冻任务失败") } // 如果没有需要解冻的任务,直接返回(不创建任何记录,只记录日志) if len(freezeTasks) == 0 { scanDuration := time.Since(scanStartTime) logx.Infof("没有需要解冻的任务,扫描耗时: %v", scanDuration) return nil } // 2. 批次大小限制:如果任务量过大,分批处理 const maxBatchSize = 1000 originalCount := len(freezeTasks) if len(freezeTasks) > maxBatchSize { logx.Errorf("任务数量过多(%d),本次只处理前%d个,剩余将在下次扫描处理", len(freezeTasks), maxBatchSize) freezeTasks = freezeTasks[:maxBatchSize] } logx.Infof("找到 %d 个需要解冻的任务(原始数量: %d),开始处理(最多同时处理2个)", len(freezeTasks), originalCount) // 3. 并发控制:使用信号量限制最多同时处理2个任务 const maxConcurrency = 2 // 最多同时处理2个任务 const taskTimeout = 30 * time.Second // 每个任务30秒超时 semaphore := make(chan struct{}, maxConcurrency) // 信号量通道 var wg sync.WaitGroup var mu sync.Mutex // 保护计数器的互斥锁 successCount := 0 failCount := 0 skipCount := 0 // 跳过的任务数(已处理、时间未到等) // 4. 并发处理所有任务,但最多同时处理2个 for _, freezeTask := range freezeTasks { // 检查是否被取消(优雅关闭支持) select { case <-ctx.Done(): logx.Infof("扫描任务被取消,已处理: 成功=%d, 失败=%d, 跳过=%d", successCount, failCount, skipCount) return ctx.Err() default: // 继续处理 } wg.Add(1) semaphore <- struct{}{} // 获取信号量,如果已满2个则阻塞 go func(task *model.AgentFreezeTask) { defer wg.Done() defer func() { <-semaphore }() // 释放信号量 taskStartTime := time.Now() // 为每个任务设置超时控制 taskCtx, cancel := context.WithTimeout(ctx, taskTimeout) defer cancel() // 使用事务处理每个任务,确保原子性 err := l.svcCtx.AgentFreezeTaskModel.Trans(taskCtx, func(transCtx context.Context, session sqlx.Session) error { // 4.1 重新查询任务(使用乐观锁,确保并发安全) currentTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(transCtx, task.Id) if err != nil { if errors.Is(err, model.ErrNotFound) { logx.Infof("冻结任务不存在,可能已被处理: freezeTaskId=%d", task.Id) return nil // 任务不存在,跳过 } return errors.Wrapf(err, "查询冻结任务失败, freezeTaskId: %d", task.Id) } // 4.2 幂等性增强:检查是否已经解冻过(通过 actual_unfreeze_time) if currentTask.ActualUnfreezeTime.Valid { logx.Infof("任务已解冻,跳过: freezeTaskId=%d, actualUnfreezeTime=%v", task.Id, currentTask.ActualUnfreezeTime.Time) return nil // 已解冻,跳过 } // 4.3 检查任务状态(双重检查,防止并发处理) if currentTask.Status != 1 { logx.Infof("冻结任务状态已变更,跳过处理: freezeTaskId=%d, status=%d", task.Id, currentTask.Status) return nil // 状态已变更,跳过 } // 4.4 再次检查解冻时间(防止时间判断误差) nowTime := time.Now() if nowTime.Before(currentTask.UnfreezeTime) { logx.Infof("冻结任务解冻时间未到,跳过处理: freezeTaskId=%d, unfreezeTime=%v", task.Id, currentTask.UnfreezeTime) return nil // 时间未到,跳过 } // 4.5 计算延迟时间(便于监控) delay := nowTime.Sub(currentTask.UnfreezeTime) if delay > 1*time.Hour { logx.Errorf("解冻任务延迟处理: freezeTaskId=%d, 延迟=%v, unfreezeTime=%v", task.Id, delay, currentTask.UnfreezeTime) } // 4.6 更新冻结任务状态 currentTask.Status = 2 // 已解冻 currentTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(nowTime) if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, currentTask); updateErr != nil { return errors.Wrapf(updateErr, "更新冻结任务状态失败, freezeTaskId: %d", task.Id) } // 4.7 更新钱包(解冻余额) wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, currentTask.AgentId) if walletErr != nil { return errors.Wrapf(walletErr, "查询钱包失败, agentId: %d", currentTask.AgentId) } // 检查冻结余额是否足够(防止数据异常) if wallet.FrozenBalance < currentTask.FreezeAmount { logx.Errorf("钱包冻结余额不足,数据异常: freezeTaskId=%d, agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f", task.Id, currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount) return errors.Errorf("钱包冻结余额不足: agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f", currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount) } wallet.FrozenBalance -= currentTask.FreezeAmount wallet.Balance += currentTask.FreezeAmount if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil { return errors.Wrapf(updateWalletErr, "更新钱包失败, agentId: %d", currentTask.AgentId) } // 更详细的日志(包含更多上下文信息) logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f, orderPrice=%.2f, freezeTime=%v, unfreezeTime=%v, delay=%v", task.Id, currentTask.AgentId, currentTask.FreezeAmount, currentTask.OrderPrice, currentTask.FreezeTime, currentTask.UnfreezeTime, delay) return nil }) // 记录处理时间 taskDuration := time.Since(taskStartTime) if taskDuration > 5*time.Second { logx.Errorf("解冻任务处理耗时较长: freezeTaskId=%d, duration=%v", task.Id, taskDuration) } // 更新计数器(需要加锁保护) mu.Lock() if err != nil { // 错误分类处理 if isTemporaryError(err) { // 临时错误(如超时、网络问题),记录但继续处理其他任务 failCount++ logx.Errorf("解冻任务临时失败,将在下次扫描重试: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err) } else { // 永久错误(如数据异常),记录详细日志 failCount++ logx.Errorf("解冻任务永久失败: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err) } } else { successCount++ logx.Infof("解冻任务处理完成: freezeTaskId=%d, duration=%v", task.Id, taskDuration) } mu.Unlock() }(freezeTask) } // 5. 等待所有任务完成 wg.Wait() // 6. 记录扫描统计信息 scanDuration := time.Since(scanStartTime) logx.Infof("解冻任务扫描完成: 成功=%d, 失败=%d, 跳过=%d, 总计=%d, 扫描耗时=%v", successCount, failCount, skipCount, len(freezeTasks), scanDuration) return nil } // isTemporaryError 判断是否为临时错误(可以重试的错误) func isTemporaryError(err error) bool { if err == nil { return false } errStr := err.Error() // 超时错误 if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return true } // 网络相关错误 errStrLower := strings.ToLower(errStr) if strings.Contains(errStrLower, "timeout") || strings.Contains(errStrLower, "connection") || strings.Contains(errStrLower, "network") { return true } // 数据库连接错误 if strings.Contains(errStrLower, "connection pool") || strings.Contains(errStrLower, "too many connections") { return true } // 其他错误视为永久错误(如数据异常、业务逻辑错误等) return false }