// Package queue holds asynq task handlers, including the scheduled cleanup
// of expired query data.
package queue

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"time"

	"tydata-server/app/main/api/internal/svc"
	"tydata-server/app/main/model"
	"tydata-server/common/globalkey"

	"github.com/hibiken/asynq"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/stores/sqlx"
)

// TASKTIME is the cron expression for the cleanup task: every day at 03:00.
const TASKTIME = "0 3 * * *"

// cleanQueryDataTimeLayout is the timestamp layout used in this handler's
// log lines.
const cleanQueryDataTimeLayout = "2006-01-02 15:04:05"

// CleanQueryDataHandler soft-deletes query records older than the configured
// retention period and records what it removed.
type CleanQueryDataHandler struct {
	svcCtx *svc.ServiceContext
}

// NewCleanQueryDataHandler returns a handler bound to the given service
// context.
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
	return &CleanQueryDataHandler{
		svcCtx: svcCtx,
	}
}

// getConfigValue returns the value stored under key in the cleanup
// configuration. It fails when the key is missing or when the row's Status
// is not 1 (reported as disabled or deleted).
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
	config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
	if err != nil {
		if errors.Is(err, model.ErrNotFound) {
			return "", fmt.Errorf("配置项 %s 不存在", key)
		}
		return "", err
	}

	// Only rows with Status 1 are usable.
	if config.Status != 1 {
		return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
	}

	return config.ConfigValue, nil
}

// getConfigInt reads the configuration value under key and parses it as a
// decimal integer. A parse failure is wrapped with the key and the raw value
// so the offending row can be identified from the error alone.
func (l *CleanQueryDataHandler) getConfigInt(ctx context.Context, key string) (int, error) {
	raw, err := l.getConfigValue(ctx, key)
	if err != nil {
		return 0, err
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("配置项 %s 的值 %q 不是有效整数: %w", key, raw, err)
	}
	return n, nil
}

// ProcessTask runs one cleanup pass. The task payload t is not used.
//
// Steps:
//  1. Return nil immediately unless the "enable_cleanup" config value is "1".
//  2. Read "retention_days" and "batch_size"; a negative retention or a
//     non-positive batch size is rejected.
//  3. Repeatedly soft-delete batches of query rows created before
//     now - retention_days until no matching rows remain.
//
// Each batch is committed in its own transaction. On failure the batches that
// already committed stay deleted, a failure log row is written on a
// best-effort basis, and the error is returned to asynq.
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
	now := time.Now()
	startedAt := now.Format(cleanQueryDataTimeLayout)
	logx.Infof("%s - 开始执行查询数据清理任务", startedAt)

	// 1. Check whether cleanup is enabled.
	enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
	if err != nil {
		return err
	}
	if enableCleanup != "1" {
		logx.Infof("查询数据清理任务已禁用")
		return nil
	}

	// 2. Retention period in days. A negative value would move the cutoff
	// into the future and delete every row, so it is rejected.
	retentionDays, err := l.getConfigInt(ctx, "retention_days")
	if err != nil {
		return err
	}
	if retentionDays < 0 {
		return fmt.Errorf("配置项 retention_days 不能为负数: %d", retentionDays)
	}

	// 3. Batch size. Zero would select nothing and report success; a negative
	// value would wrap around when converted to uint64 for the LIMIT clause.
	batchSize, err := l.getConfigInt(ctx, "batch_size")
	if err != nil {
		return err
	}
	if batchSize <= 0 {
		return fmt.Errorf("配置项 batch_size 必须大于 0: %d", batchSize)
	}

	// Rows created before this instant are eligible for cleanup.
	cleanupBefore := now.AddDate(0, 0, -retentionDays)

	// Log record reused for every batch; AffectedRows accumulates across
	// batches. Status 1 is used for a normal run.
	cleanupLog := &model.QueryCleanupLog{
		CleanupTime:   now,
		CleanupBefore: cleanupBefore,
		Status:        1,
		Remark:        sql.NullString{String: "定时清理数据", Valid: true},
	}

	for {
		cleaned, err := l.cleanBatch(ctx, cleanupLog, cleanupBefore, batchSize)
		if err != nil {
			l.recordFailure(ctx, cleanupLog, err)
			logx.Errorf("%s - 清理查询数据失败: %v", startedAt, err)
			return err
		}
		if cleaned == 0 {
			break // nothing left to clean
		}
	}

	logx.Infof("%s - 查询数据清理完成，共删除 %d 条记录", startedAt, cleanupLog.AffectedRows)
	return nil
}

// cleanBatch selects up to batchSize not-yet-deleted query rows created before
// cleanupBefore, then, inside a single transaction, soft-deletes them, inserts
// a cleanup log row and inserts one detail row per deleted query. It returns
// the number of rows handled; 0 means no matching rows were found.
//
// The select is issued without the transaction session, so it presumably does
// not observe uncommitted changes (NOTE(review): confirm against the model's
// FindAll). Committing every batch before the next select is what lets the
// caller's loop make progress and terminate.
func (l *CleanQueryDataHandler) cleanBatch(ctx context.Context, cleanupLog *model.QueryCleanupLog, cleanupBefore time.Time, batchSize int) (int, error) {
	builder := l.svcCtx.QueryModel.SelectBuilder().
		Where("create_time < ?", cleanupBefore).
		Where("del_state = ?", globalkey.DelStateNo).
		Limit(uint64(batchSize))

	queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
	if err != nil {
		return 0, err
	}
	if len(queries) == 0 {
		return 0, nil
	}

	// The log row written for this batch carries the running total.
	batchRows := int64(len(queries))
	cleanupLog.AffectedRows += batchRows

	err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
		// Soft-delete every row of the batch.
		for _, query := range queries {
			if err := l.svcCtx.QueryModel.DeleteSoft(ctx, session, query); err != nil {
				return err
			}
		}

		// Write the cleanup log row for this batch.
		result, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
		if err != nil {
			return err
		}
		cleanupLogID, err := result.LastInsertId()
		if err != nil {
			return err
		}

		// Write one detail row per deleted query, linked to the log row.
		for _, query := range queries {
			detail := &model.QueryCleanupDetail{
				CleanupLogId:  cleanupLogID,
				QueryId:       query.Id,
				OrderId:       query.OrderId,
				UserId:        query.UserId,
				ProductId:     query.ProductId,
				QueryState:    query.QueryState,
				CreateTimeOld: query.CreateTime,
			}
			if _, err := l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		// The batch was rolled back, so its rows must not be counted.
		cleanupLog.AffectedRows -= batchRows
		return 0, err
	}

	return len(queries), nil
}

// recordFailure marks cleanupLog as failed (Status 2, ErrorMsg set to the
// cause) and tries to insert it in a transaction of its own, because the
// transaction that failed has already been rolled back. This is best-effort:
// if the insert fails too, the problem is only logged.
func (l *CleanQueryDataHandler) recordFailure(ctx context.Context, cleanupLog *model.QueryCleanupLog, cause error) {
	cleanupLog.Status = 2
	cleanupLog.ErrorMsg = sql.NullString{String: cause.Error(), Valid: true}

	err := l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
		_, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
		return err
	})
	if err != nil {
		logx.Errorf("保存清理失败日志出错: %v", err)
	}
}