package queue import ( "context" "database/sql" "fmt" "strconv" "time" "ycc-server/app/main/api/internal/svc" "ycc-server/app/main/model" "ycc-server/common/globalkey" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" ) // TASKTIME 定义为每天凌晨3点执行 const TASKTIME = "0 3 * * *" type CleanQueryDataHandler struct { svcCtx *svc.ServiceContext } func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler { return &CleanQueryDataHandler{ svcCtx: svcCtx, } } // 获取配置值 func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) { // 通过缓存获取配置 config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key) if err != nil { if err == model.ErrNotFound { return "", fmt.Errorf("配置项 %s 不存在", key) } return "", err } // 检查配置状态 if config.Status != 1 { return "", fmt.Errorf("配置项 %s 已禁用或已删除", key) } return config.ConfigValue, nil } func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { // 添加超时控制:最多运行1小时 taskCtx, cancel := context.WithTimeout(ctx, 1*time.Hour) defer cancel() startTime := time.Now() now := time.Now() logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05")) // 1. 检查是否启用清理 enableCleanup, err := l.getConfigValue(taskCtx, "enable_cleanup") if err != nil { return err } if enableCleanup != "1" { logx.Infof("查询数据清理任务已禁用") return nil } // 2. 获取保留天数 retentionDaysStr, err := l.getConfigValue(taskCtx, "retention_days") if err != nil { return err } retentionDays, err := strconv.Atoi(retentionDaysStr) if err != nil { return fmt.Errorf("保留天数配置无效: %v", err) } if retentionDays < 0 { return fmt.Errorf("保留天数不能为负数: %d", retentionDays) } // 3. 获取批次大小 batchSizeStr, err := l.getConfigValue(taskCtx, "batch_size") if err != nil { return err } batchSize, err := strconv.Atoi(batchSizeStr) if err != nil { return fmt.Errorf("批次大小配置无效: %v", err) } if batchSize <= 0 || batchSize > 10000 { return fmt.Errorf("批次大小必须在1-10000之间: %d", batchSize) } // 计算清理截止时间 cleanupBefore := now.AddDate(0, 0, -retentionDays) // 先快速检查是否有数据需要清理(避免创建无用的日志记录) checkBuilder := l.svcCtx.QueryModel.SelectBuilder(). Where("create_time < ?", cleanupBefore). Where("del_state = ?", globalkey.DelStateNo). Limit(1) // 只查询1条,用于判断是否有数据 checkQueries, checkErr := l.svcCtx.QueryModel.FindAll(taskCtx, checkBuilder, "") if checkErr != nil { logx.Errorf("检查是否有数据需要清理失败: %v", checkErr) return checkErr } // 如果没有数据需要清理,直接返回,不创建日志记录 if len(checkQueries) == 0 { logx.Infof("%s - 没有需要清理的数据(清理截止时间: %s)", now.Format("2006-01-02 15:04:05"), cleanupBefore.Format("2006-01-02 15:04:05")) return nil } // 创建清理日志记录(只创建一次) cleanupLog := &model.QueryCleanupLog{ CleanupTime: now, CleanupBefore: cleanupBefore, Status: 1, Remark: sql.NullString{String: "定时清理数据", Valid: true}, } // 先创建清理日志记录 var cleanupLogId int64 err = l.svcCtx.QueryCleanupLogModel.Trans(taskCtx, func(logCtx context.Context, logSession sqlx.Session) error { cleanupLogInsertResult, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog) if insertErr != nil { return insertErr } cleanupLogId, insertErr = cleanupLogInsertResult.LastInsertId() return insertErr }) if err != nil { logx.Errorf("创建清理日志记录失败: %v", err) return err } logx.Infof("创建清理日志记录成功,日志ID: %d", cleanupLogId) // 分批处理,每个批次使用独立事务 batchCount := 0 lastProcessedId := int64(0) for { // 检查是否被取消(优雅关闭支持) select { case <-taskCtx.Done(): logx.Infof("清理任务被取消,已处理 %d 批次,共删除 %d 条记录", batchCount, cleanupLog.AffectedRows) // 更新清理日志状态 l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, fmt.Errorf("任务被取消")) return taskCtx.Err() default: // 继续处理 } // 每个批次使用独立事务 var batchQueries []*model.Query batchErr := l.svcCtx.QueryModel.Trans(taskCtx, func(batchCtx context.Context, batchSession sqlx.Session) error { // 1. 查询一批要删除的记录(添加排序确保一致性) builder := l.svcCtx.QueryModel.SelectBuilder(). Where("create_time < ?", cleanupBefore). Where("del_state = ?", globalkey.DelStateNo). OrderBy("id ASC"). // 添加排序,确保处理顺序一致 Limit(uint64(batchSize)) // 如果已处理过,从上次处理的ID之后继续 if lastProcessedId > 0 { builder = builder.Where("id > ?", lastProcessedId) } var queryErr error batchQueries, queryErr = l.svcCtx.QueryModel.FindAll(batchCtx, builder, "") if queryErr != nil { return queryErr } if len(batchQueries) == 0 { // 没有更多数据需要清理,标记为完成 return nil } // 2. 执行清理 for _, query := range batchQueries { deleteErr := l.svcCtx.QueryModel.DeleteSoft(batchCtx, batchSession, query) if deleteErr != nil { return deleteErr } } // 3. 保存清理明细 for _, query := range batchQueries { detail := &model.QueryCleanupDetail{ CleanupLogId: cleanupLogId, QueryId: query.Id, OrderId: query.OrderId, UserId: query.UserId, ProductId: query.ProductId, QueryState: query.QueryState, CreateTimeOld: query.CreateTime, } _, insertErr := l.svcCtx.QueryCleanupDetailModel.Insert(batchCtx, batchSession, detail) if insertErr != nil { return insertErr } } // 4. 记录最后处理的ID(用于下次查询) lastProcessedId = batchQueries[len(batchQueries)-1].Id return nil }) if batchErr != nil { // 批次失败,更新清理日志状态 logx.Errorf("批次处理失败(批次 %d): %v", batchCount+1, batchErr) l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, batchErr) return batchErr } // 如果查询结果为空,说明没有更多数据 if len(batchQueries) == 0 { logx.Infof("所有数据已处理完成") break } // 更新影响行数(在事务外更新,避免重复计算) actualBatchSize := int64(len(batchQueries)) cleanupLog.AffectedRows += actualBatchSize batchCount++ logx.Infof("批次 %d 处理完成,本批次删除 %d 条记录,累计删除 %d 条记录", batchCount, actualBatchSize, cleanupLog.AffectedRows) // 如果本批次查询到的数据少于批次大小,说明已经处理完所有数据 if actualBatchSize < int64(batchSize) { logx.Infof("所有数据已处理完成(本批次数据量少于批次大小)") break } } // 更新清理日志状态为成功 l.updateCleanupLogStatus(taskCtx, cleanupLogId, cleanupLog, nil) duration := time.Since(startTime) logx.Infof("%s - 查询数据清理完成,共处理 %d 批次,删除 %d 条记录,耗时 %v", now.Format("2006-01-02 15:04:05"), batchCount, cleanupLog.AffectedRows, duration) return nil } // updateCleanupLogStatus 更新清理日志状态 func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId int64, cleanupLog *model.QueryCleanupLog, err error) { err = l.svcCtx.QueryCleanupLogModel.Trans(ctx, func(updateCtx context.Context, updateSession sqlx.Session) error { // 查询当前日志记录 currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, logId) if findErr != nil { return findErr } // 更新状态和影响行数 currentLog.AffectedRows = cleanupLog.AffectedRows if err != nil { currentLog.Status = 2 // 失败 currentLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} } else { currentLog.Status = 1 // 成功 } _, updateErr := l.svcCtx.QueryCleanupLogModel.Update(updateCtx, updateSession, currentLog) return updateErr }) if err != nil { logx.Errorf("更新清理日志状态失败: %v", err) } }