133 lines
4.6 KiB
Markdown
133 lines
4.6 KiB
Markdown
|
|
# 解冻任务实现方案说明
|
|||
|
|
|
|||
|
|
## 方案对比
|
|||
|
|
|
|||
|
|
### 方案1:Asynq 延迟任务(已实现但未使用)
|
|||
|
|
**优点:**
|
|||
|
|
- ✅ 精确到秒级执行
|
|||
|
|
- ✅ 自动重试机制
|
|||
|
|
- ✅ 无需额外调度器
|
|||
|
|
|
|||
|
|
**缺点:**
|
|||
|
|
- ❌ 依赖 Redis 持久化,Redis 数据丢失会导致任务丢失
|
|||
|
|
- ❌ 系统长时间停机可能导致延迟任务过期
|
|||
|
|
- ❌ 需要补偿机制
|
|||
|
|
|
|||
|
|
### 方案2:定时任务扫描(✅ 已采用)
|
|||
|
|
**优点:**
|
|||
|
|
- ✅ **数据持久化在数据库,更可靠**(核心优势)
|
|||
|
|
- ✅ **系统停机后重启,定时任务会继续扫描并处理**(核心优势)
|
|||
|
|
- ✅ 可以批量处理,效率高
|
|||
|
|
- ✅ 已有定时任务基础设施
|
|||
|
|
- ✅ 不依赖 Redis 持久化
|
|||
|
|
|
|||
|
|
**缺点:**
|
|||
|
|
- ⚠️ 执行时间不够精确(取决于扫描频率,如每5分钟扫描一次)
|
|||
|
|
- ⚠️ 需要处理并发扫描(已通过乐观锁解决)
|
|||
|
|
|
|||
|
|
## 最终选择:定时任务扫描方案
|
|||
|
|
|
|||
|
|
### 选择理由
|
|||
|
|
1. **金融场景,可靠性优先**:涉及资金解冻,必须保证任务不丢失
|
|||
|
|
2. **解冻时间允许延迟**:解冻时间可以有一定的延迟(比如几分钟内都可以接受)
|
|||
|
|
3. **已有基础设施**:项目中已有定时任务实现(`cleanQueryData.go`)
|
|||
|
|
4. **数据库表已设计好**:`status` 和 `unfreeze_time` 字段支持扫描查询
|
|||
|
|
|
|||
|
|
## 实现细节
|
|||
|
|
|
|||
|
|
### 1. 定时任务配置
|
|||
|
|
- **执行频率**:每2小时执行一次(`0 */2 * * *`)- 节省性能
|
|||
|
|
- **任务类型**:`MsgUnfreezeCommissionScan`
|
|||
|
|
- **处理器**:`UnfreezeCommissionScanHandler`
|
|||
|
|
- **批次大小**:每次最多处理2个任务,避免并发太多
|
|||
|
|
|
|||
|
|
### 2. 扫描逻辑
|
|||
|
|
```go
|
|||
|
|
// 查询条件:
|
|||
|
|
// - status = 1(待解冻)
|
|||
|
|
// - unfreeze_time <= 当前时间
|
|||
|
|
// - del_state = 0(未删除)
|
|||
|
|
// - 按 unfreeze_time 升序排序
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3. 并发安全
|
|||
|
|
- 使用**乐观锁**(`version` 字段)确保并发安全
|
|||
|
|
- 每个任务在事务中处理,确保原子性
|
|||
|
|
- 双重检查:查询后再次检查状态,防止并发处理
|
|||
|
|
|
|||
|
|
### 4. 错误处理
|
|||
|
|
- 单个任务失败不影响其他任务
|
|||
|
|
- 记录详细的错误日志
|
|||
|
|
- 失败的任务会在下次扫描时重试
|
|||
|
|
|
|||
|
|
### 5. 性能优化
|
|||
|
|
- 使用数据库索引优化查询(`idx_status` 和 `idx_unfreeze_time`)
|
|||
|
|
- **扫描频率**:每2小时扫描一次,减少数据库查询压力
|
|||
|
|
- **查询所有任务**:每次扫描找到所有需要解冻的任务(不限制数量)
|
|||
|
|
- **并发控制**:使用信号量(Semaphore)限制最多同时处理2个任务
|
|||
|
|
- **批量处理**:所有任务都会处理,但通过并发控制避免同时处理太多,节省性能
|
|||
|
|
|
|||
|
|
## 代码文件
|
|||
|
|
|
|||
|
|
### 新增文件
|
|||
|
|
- `app/main/api/internal/queue/unfreezeCommissionScan.go` - 定时扫描处理器
|
|||
|
|
|
|||
|
|
### 修改文件
|
|||
|
|
- `app/main/api/internal/queue/routes.go` - 注册定时任务
|
|||
|
|
- `app/main/api/internal/queue/agentProcess.go` - 移除发送延迟任务的逻辑
|
|||
|
|
- `app/main/api/internal/types/taskname.go` - 添加任务类型常量
|
|||
|
|
|
|||
|
|
### 保留文件(备用)
|
|||
|
|
- `app/main/api/internal/queue/unfreezeCommission.go` - 延迟任务处理器(保留作为备用)
|
|||
|
|
- `app/main/api/internal/service/asynqService.go` - `SendUnfreezeTask` 方法(保留作为备用)
|
|||
|
|
|
|||
|
|
## 执行流程
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
定时任务启动(每2小时)
|
|||
|
|
↓
|
|||
|
|
扫描数据库:status=1 AND unfreeze_time <= 当前时间
|
|||
|
|
↓
|
|||
|
|
查询所有需要解冻的任务(不限制数量)
|
|||
|
|
↓
|
|||
|
|
并发处理(使用信号量限制最多同时2个)
|
|||
|
|
├─ 任务1(goroutine 1)
|
|||
|
|
├─ 任务2(goroutine 2)
|
|||
|
|
├─ 任务3(等待,直到前2个完成)
|
|||
|
|
├─ 任务4(等待,直到前2个完成)
|
|||
|
|
└─ ...(以此类推,两个两个处理)
|
|||
|
|
↓
|
|||
|
|
每个任务使用事务 + 乐观锁处理
|
|||
|
|
↓
|
|||
|
|
更新任务状态:status = 2(已解冻)
|
|||
|
|
↓
|
|||
|
|
更新钱包:FrozenBalance -= 冻结金额, Balance += 冻结金额
|
|||
|
|
↓
|
|||
|
|
等待所有任务完成
|
|||
|
|
↓
|
|||
|
|
记录日志
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 监控建议
|
|||
|
|
|
|||
|
|
1. **监控扫描任务执行情况**
|
|||
|
|
- 检查定时任务是否正常执行
|
|||
|
|
- 监控每次扫描找到的任务数量
|
|||
|
|
- 监控成功/失败数量
|
|||
|
|
|
|||
|
|
2. **监控解冻延迟**
|
|||
|
|
- 记录 `actual_unfreeze_time - unfreeze_time` 的差值
|
|||
|
|
- 如果延迟超过10分钟,需要检查定时任务是否正常
|
|||
|
|
|
|||
|
|
3. **监控异常情况**
|
|||
|
|
- 冻结余额不足的情况(数据异常)
|
|||
|
|
- 任务状态异常的情况
|
|||
|
|
|
|||
|
|
## 后续优化建议
|
|||
|
|
|
|||
|
|
1. **可配置扫描频率**:将扫描频率(当前5分钟)配置到配置表
|
|||
|
|
2. **批次大小限制**:如果任务量很大,可以限制每次处理的数量
|
|||
|
|
3. **告警机制**:如果连续多次扫描都失败,发送告警
|
|||
|
|
4. **补偿机制**:提供手动触发扫描的接口,用于紧急情况
|
|||
|
|
|