Files
ycc-proxy-server/app/main/api/internal/queue/unfreezeCommissionScan.go
2025-12-02 19:57:10 +08:00

234 lines
9.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"strings"
"sync"
"time"
"ycc-server/app/main/model"
"ycc-server/pkg/lzkit/lzUtils"
"ycc-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// UnfreezeCommissionScanHandler 定时扫描解冻任务处理器
type UnfreezeCommissionScanHandler struct {
svcCtx *svc.ServiceContext
}
func NewUnfreezeCommissionScanHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionScanHandler {
return &UnfreezeCommissionScanHandler{
svcCtx: svcCtx,
}
}
// ProcessTask 定时扫描需要解冻的任务
func (l *UnfreezeCommissionScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
scanStartTime := time.Now()
now := time.Now()
logx.Infof("开始扫描需要解冻的佣金任务,当前时间: %v", now)
// 1. 查询所有待解冻且解冻时间已到的任务
// 使用索引 idx_status 和 idx_unfreeze_time 优化查询
// 不限制查询数量,找到所有需要解冻的任务
builder := l.svcCtx.AgentFreezeTaskModel.SelectBuilder().
Where("status = ? AND unfreeze_time <= ? AND del_state = ?", 1, now, 0). // 1=待解冻0=未删除
OrderBy("unfreeze_time ASC") // 按解冻时间升序,优先处理最早的任务
freezeTasks, err := l.svcCtx.AgentFreezeTaskModel.FindAll(ctx, builder, "")
if err != nil {
logx.Errorf("查询待解冻任务失败: %v", err)
return errors.Wrapf(err, "查询待解冻任务失败")
}
// 如果没有需要解冻的任务,直接返回(不创建任何记录,只记录日志)
if len(freezeTasks) == 0 {
scanDuration := time.Since(scanStartTime)
logx.Infof("没有需要解冻的任务,扫描耗时: %v", scanDuration)
return nil
}
// 2. 批次大小限制:如果任务量过大,分批处理
const maxBatchSize = 1000
originalCount := len(freezeTasks)
if len(freezeTasks) > maxBatchSize {
logx.Errorf("任务数量过多(%d),本次只处理前%d个剩余将在下次扫描处理", len(freezeTasks), maxBatchSize)
freezeTasks = freezeTasks[:maxBatchSize]
}
logx.Infof("找到 %d 个需要解冻的任务(原始数量: %d开始处理最多同时处理2个", len(freezeTasks), originalCount)
// 3. 并发控制使用信号量限制最多同时处理2个任务
const maxConcurrency = 2 // 最多同时处理2个任务
const taskTimeout = 30 * time.Second // 每个任务30秒超时
semaphore := make(chan struct{}, maxConcurrency) // 信号量通道
var wg sync.WaitGroup
var mu sync.Mutex // 保护计数器的互斥锁
successCount := 0
failCount := 0
skipCount := 0 // 跳过的任务数(已处理、时间未到等)
// 4. 并发处理所有任务但最多同时处理2个
for _, freezeTask := range freezeTasks {
// 检查是否被取消(优雅关闭支持)
select {
case <-ctx.Done():
logx.Infof("扫描任务被取消,已处理: 成功=%d, 失败=%d, 跳过=%d", successCount, failCount, skipCount)
return ctx.Err()
default:
// 继续处理
}
wg.Add(1)
semaphore <- struct{}{} // 获取信号量如果已满2个则阻塞
go func(task *model.AgentFreezeTask) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
taskStartTime := time.Now()
// 为每个任务设置超时控制
taskCtx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()
// 使用事务处理每个任务,确保原子性
err := l.svcCtx.AgentFreezeTaskModel.Trans(taskCtx, func(transCtx context.Context, session sqlx.Session) error {
// 4.1 重新查询任务(使用乐观锁,确保并发安全)
currentTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(transCtx, task.Id)
if err != nil {
if errors.Is(err, model.ErrNotFound) {
logx.Infof("冻结任务不存在,可能已被处理: freezeTaskId=%d", task.Id)
return nil // 任务不存在,跳过
}
return errors.Wrapf(err, "查询冻结任务失败, freezeTaskId: %d", task.Id)
}
// 4.2 幂等性增强:检查是否已经解冻过(通过 actual_unfreeze_time
if currentTask.ActualUnfreezeTime.Valid {
logx.Infof("任务已解冻,跳过: freezeTaskId=%d, actualUnfreezeTime=%v", task.Id, currentTask.ActualUnfreezeTime.Time)
return nil // 已解冻,跳过
}
// 4.3 检查任务状态(双重检查,防止并发处理)
if currentTask.Status != 1 {
logx.Infof("冻结任务状态已变更,跳过处理: freezeTaskId=%d, status=%d", task.Id, currentTask.Status)
return nil // 状态已变更,跳过
}
// 4.4 再次检查解冻时间(防止时间判断误差)
nowTime := time.Now()
if nowTime.Before(currentTask.UnfreezeTime) {
logx.Infof("冻结任务解冻时间未到,跳过处理: freezeTaskId=%d, unfreezeTime=%v", task.Id, currentTask.UnfreezeTime)
return nil // 时间未到,跳过
}
// 4.5 计算延迟时间(便于监控)
delay := nowTime.Sub(currentTask.UnfreezeTime)
if delay > 1*time.Hour {
logx.Errorf("解冻任务延迟处理: freezeTaskId=%d, 延迟=%v, unfreezeTime=%v", task.Id, delay, currentTask.UnfreezeTime)
}
// 4.6 更新冻结任务状态
currentTask.Status = 2 // 已解冻
currentTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(nowTime)
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, currentTask); updateErr != nil {
return errors.Wrapf(updateErr, "更新冻结任务状态失败, freezeTaskId: %d", task.Id)
}
// 4.7 更新钱包(解冻余额)
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, currentTask.AgentId)
if walletErr != nil {
return errors.Wrapf(walletErr, "查询钱包失败, agentId: %d", currentTask.AgentId)
}
// 检查冻结余额是否足够(防止数据异常)
if wallet.FrozenBalance < currentTask.FreezeAmount {
logx.Errorf("钱包冻结余额不足,数据异常: freezeTaskId=%d, agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
task.Id, currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
return errors.Errorf("钱包冻结余额不足: agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
}
wallet.FrozenBalance -= currentTask.FreezeAmount
wallet.Balance += currentTask.FreezeAmount
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
return errors.Wrapf(updateWalletErr, "更新钱包失败, agentId: %d", currentTask.AgentId)
}
// 更详细的日志(包含更多上下文信息)
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f, orderPrice=%.2f, freezeTime=%v, unfreezeTime=%v, delay=%v",
task.Id, currentTask.AgentId, currentTask.FreezeAmount, currentTask.OrderPrice,
currentTask.FreezeTime, currentTask.UnfreezeTime, delay)
return nil
})
// 记录处理时间
taskDuration := time.Since(taskStartTime)
if taskDuration > 5*time.Second {
logx.Errorf("解冻任务处理耗时较长: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
}
// 更新计数器(需要加锁保护)
mu.Lock()
if err != nil {
// 错误分类处理
if isTemporaryError(err) {
// 临时错误(如超时、网络问题),记录但继续处理其他任务
failCount++
logx.Errorf("解冻任务临时失败,将在下次扫描重试: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
} else {
// 永久错误(如数据异常),记录详细日志
failCount++
logx.Errorf("解冻任务永久失败: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
}
} else {
successCount++
logx.Infof("解冻任务处理完成: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
}
mu.Unlock()
}(freezeTask)
}
// 5. 等待所有任务完成
wg.Wait()
// 6. 记录扫描统计信息
scanDuration := time.Since(scanStartTime)
logx.Infof("解冻任务扫描完成: 成功=%d, 失败=%d, 跳过=%d, 总计=%d, 扫描耗时=%v",
successCount, failCount, skipCount, len(freezeTasks), scanDuration)
return nil
}
// isTemporaryError 判断是否为临时错误(可以重试的错误)
func isTemporaryError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// 超时错误
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
// 网络相关错误
errStrLower := strings.ToLower(errStr)
if strings.Contains(errStrLower, "timeout") || strings.Contains(errStrLower, "connection") || strings.Contains(errStrLower, "network") {
return true
}
// 数据库连接错误
if strings.Contains(errStrLower, "connection pool") || strings.Contains(errStrLower, "too many connections") {
return true
}
// 其他错误视为永久错误(如数据异常、业务逻辑错误等)
return false
}