Files
qnc-server-v3/app/main/api/internal/queue/cleanQueryData.go
2025-12-13 17:44:18 +08:00

276 lines
8.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/model"
"qnc-server/common/globalkey"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// TASKTIME 定义为每天凌晨3点执行
const TASKTIME = "0 3 * * *"
type CleanQueryDataHandler struct {
svcCtx *svc.ServiceContext
}
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
return &CleanQueryDataHandler{
svcCtx: svcCtx,
}
}
// 获取配置值
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
// 通过缓存获取配置
config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
if err != nil {
if err == model.ErrNotFound {
return "", fmt.Errorf("配置项 %s 不存在", key)
}
return "", err
}
// 检查配置状态
if config.Status != 1 {
return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
}
return config.ConfigValue, nil
}
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 添加超时控制最多运行1小时
taskCtx, cancel := context.WithTimeout(ctx, 1*time.Hour)
defer cancel()
startTime := time.Now()
now := time.Now()
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
// 1. 检查是否启用清理
enableCleanup, err := l.getConfigValue(taskCtx, "enable_cleanup")
if err != nil {
return err
}
if enableCleanup != "1" {
logx.Infof("查询数据清理任务已禁用")
return nil
}
// 2. 获取保留天数
retentionDaysStr, err := l.getConfigValue(taskCtx, "retention_days")
if err != nil {
return err
}
retentionDays, err := strconv.Atoi(retentionDaysStr)
if err != nil {
return fmt.Errorf("保留天数配置无效: %v", err)
}
if retentionDays < 0 {
return fmt.Errorf("保留天数不能为负数: %d", retentionDays)
}
// 3. 获取批次大小
batchSizeStr, err := l.getConfigValue(taskCtx, "batch_size")
if err != nil {
return err
}
batchSize, err := strconv.Atoi(batchSizeStr)
if err != nil {
return fmt.Errorf("批次大小配置无效: %v", err)
}
if batchSize <= 0 || batchSize > 10000 {
return fmt.Errorf("批次大小必须在1-10000之间: %d", batchSize)
}
// 计算清理截止时间
cleanupBefore := now.AddDate(0, 0, -retentionDays)
// 先快速检查是否有数据需要清理(避免创建无用的日志记录)
checkBuilder := l.svcCtx.QueryModel.SelectBuilder().
Where("create_time < ?", cleanupBefore).
Where("del_state = ?", globalkey.DelStateNo).
Limit(1) // 只查询1条用于判断是否有数据
checkQueries, checkErr := l.svcCtx.QueryModel.FindAll(taskCtx, checkBuilder, "")
if checkErr != nil {
logx.Errorf("检查是否有数据需要清理失败: %v", checkErr)
return checkErr
}
// 如果没有数据需要清理,直接返回,不创建日志记录
if len(checkQueries) == 0 {
logx.Infof("%s - 没有需要清理的数据(清理截止时间: %s", now.Format("2006-01-02 15:04:05"), cleanupBefore.Format("2006-01-02 15:04:05"))
return nil
}
// 创建清理日志记录(只创建一次)
cleanupLog := &model.QueryCleanupLog{
Id: uuid.New().String(),
CleanupTime: now,
CleanupBefore: cleanupBefore,
Status: 1,
Remark: sql.NullString{String: "定时清理数据", Valid: true},
}
// 先创建清理日志记录
err = l.svcCtx.QueryCleanupLogModel.Trans(taskCtx, func(logCtx context.Context, logSession sqlx.Session) error {
_, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog)
if insertErr != nil {
return insertErr
}
return insertErr
})
if err != nil {
logx.Errorf("创建清理日志记录失败: %v", err)
return err
}
logx.Infof("创建清理日志记录成功日志ID: %s", cleanupLog.Id)
// 分批处理,每个批次使用独立事务
batchCount := 0
lastProcessedId := ""
for {
// 检查是否被取消(优雅关闭支持)
select {
case <-taskCtx.Done():
logx.Infof("清理任务被取消,已处理 %d 批次,共删除 %d 条记录", batchCount, cleanupLog.AffectedRows)
// 更新清理日志状态
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, fmt.Errorf("任务被取消"))
return taskCtx.Err()
default:
// 继续处理
}
// 每个批次使用独立事务
var batchQueries []*model.Query
batchErr := l.svcCtx.QueryModel.Trans(taskCtx, func(batchCtx context.Context, batchSession sqlx.Session) error {
// 1. 查询一批要删除的记录(添加排序确保一致性)
builder := l.svcCtx.QueryModel.SelectBuilder().
Where("create_time < ?", cleanupBefore).
Where("del_state = ?", globalkey.DelStateNo).
OrderBy("id ASC"). // 添加排序,确保处理顺序一致
Limit(uint64(batchSize))
// 如果已处理过从上次处理的ID之后继续
if lastProcessedId != "" {
builder = builder.Where("id > ?", lastProcessedId)
}
var queryErr error
batchQueries, queryErr = l.svcCtx.QueryModel.FindAll(batchCtx, builder, "")
if queryErr != nil {
return queryErr
}
if len(batchQueries) == 0 {
// 没有更多数据需要清理,标记为完成
return nil
}
// 2. 执行清理
for _, query := range batchQueries {
deleteErr := l.svcCtx.QueryModel.DeleteSoft(batchCtx, batchSession, query)
if deleteErr != nil {
return deleteErr
}
}
// 3. 保存清理明细
for _, query := range batchQueries {
detail := &model.QueryCleanupDetail{
Id: uuid.New().String(),
CleanupLogId: cleanupLog.Id,
QueryId: query.Id,
OrderId: query.OrderId,
UserId: query.UserId,
ProductId: query.ProductId,
QueryState: query.QueryState,
CreateTimeOld: query.CreateTime,
}
_, insertErr := l.svcCtx.QueryCleanupDetailModel.Insert(batchCtx, batchSession, detail)
if insertErr != nil {
return insertErr
}
}
// 4. 记录最后处理的ID用于下次查询
lastProcessedId = batchQueries[len(batchQueries)-1].Id
return nil
})
if batchErr != nil {
// 批次失败,更新清理日志状态
logx.Errorf("批次处理失败(批次 %d: %v", batchCount+1, batchErr)
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, batchErr)
return batchErr
}
// 如果查询结果为空,说明没有更多数据
if len(batchQueries) == 0 {
logx.Infof("所有数据已处理完成")
break
}
// 更新影响行数(在事务外更新,避免重复计算)
actualBatchSize := int64(len(batchQueries))
cleanupLog.AffectedRows += actualBatchSize
batchCount++
logx.Infof("批次 %d 处理完成,本批次删除 %d 条记录,累计删除 %d 条记录", batchCount, actualBatchSize, cleanupLog.AffectedRows)
// 如果本批次查询到的数据少于批次大小,说明已经处理完所有数据
if actualBatchSize < int64(batchSize) {
logx.Infof("所有数据已处理完成(本批次数据量少于批次大小)")
break
}
}
// 更新清理日志状态为成功
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, nil)
duration := time.Since(startTime)
logx.Infof("%s - 查询数据清理完成,共处理 %d 批次,删除 %d 条记录,耗时 %v",
now.Format("2006-01-02 15:04:05"), batchCount, cleanupLog.AffectedRows, duration)
return nil
}
// updateCleanupLogStatus 更新清理日志状态
func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId string, cleanupLog *model.QueryCleanupLog, err error) {
err = l.svcCtx.QueryCleanupLogModel.Trans(ctx, func(updateCtx context.Context, updateSession sqlx.Session) error {
// 查询当前日志记录
currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, cleanupLog.Id)
if findErr != nil {
return findErr
}
// 更新状态和影响行数
currentLog.AffectedRows = cleanupLog.AffectedRows
if err != nil {
currentLog.Status = 2 // 失败
currentLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
} else {
currentLog.Status = 1 // 成功
}
_, updateErr := l.svcCtx.QueryCleanupLogModel.Update(updateCtx, updateSession, currentLog)
return updateErr
})
if err != nil {
logx.Errorf("更新清理日志状态失败: %v", err)
}
}