tydata-server/app/main/api/internal/queue/cleanQueryData.go
2025-06-08 15:14:34 +08:00

168 lines
4.2 KiB
Go

package queue
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"tydata-server/app/main/api/internal/svc"
"tydata-server/app/main/model"
"tydata-server/common/globalkey"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// TASKTIME is the schedule for the cleanup task, written as a five-field cron
// expression: minute 0, hour 3, every day — i.e. daily at 03:00.
const TASKTIME = "0 3 * * *"
// CleanQueryDataHandler is the handler type for the query-data cleanup task.
type CleanQueryDataHandler struct {
// svcCtx is the service context through which the handler reaches its models.
svcCtx *svc.ServiceContext
}
// NewCleanQueryDataHandler returns a CleanQueryDataHandler that uses svcCtx for
// all of its model access.
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
	handler := new(CleanQueryDataHandler)
	handler.svcCtx = svcCtx
	return handler
}
// getConfigValue looks up the cleanup configuration item identified by key and
// returns its ConfigValue.
//
// An error is returned when the lookup fails, when no item with that key
// exists (model.ErrNotFound), or when the item's Status is anything other
// than 1.
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
	// NOTE(review): the original comment says this lookup goes through a cache;
	// that depends on the model implementation, which is not visible here.
	item, lookupErr := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)

	switch {
	case lookupErr == model.ErrNotFound:
		return "", fmt.Errorf("配置项 %s 不存在", key)
	case lookupErr != nil:
		return "", lookupErr
	case item.Status != 1:
		// Only items whose Status is 1 are usable; every other status is
		// reported as disabled or deleted.
		return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
	}

	return item.ConfigValue, nil
}
// ProcessTask runs one pass of the query-data cleanup. The task payload t is
// not inspected.
//
// Steps:
//  1. Read "enable_cleanup"; unless it is exactly "1" the task logs that it is
//     disabled and returns nil.
//  2. Read and validate "retention_days" (must be >= 0) and "batch_size"
//     (must be > 0).
//  3. Repeatedly select up to batch_size rows whose create_time is before
//     now - retention_days and whose del_state is globalkey.DelStateNo, and
//     for each batch, in its own transaction: soft-delete the rows, insert a
//     cleanup-log row and insert one cleanup-detail row per deleted query.
//
// Each batch is committed separately. FindAll is not given the transaction
// session, so a single long transaction would presumably keep re-selecting
// rows whose deletion had not been committed yet; committing per batch lets
// the next select observe the previous deletions. As a consequence, batches
// committed before a failure stay deleted.
//
// When any step of the batch loop fails, a cleanup-log row with Status 2 and
// the error text is written in a separate transaction (best effort), the
// failure is logged, and the original error is returned.
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
	now := time.Now()
	stamp := now.Format("2006-01-02 15:04:05")
	logx.Infof("%s - 开始执行查询数据清理任务", stamp)

	// 1. Check whether cleanup is enabled at all.
	enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
	if err != nil {
		return err
	}
	if enableCleanup != "1" {
		logx.Infof("查询数据清理任务已禁用")
		return nil
	}

	// 2. Retention period. A negative value would move the cut-off into the
	// future and wipe every row, so it is rejected.
	retentionDays, err := l.getIntConfig(ctx, "retention_days")
	if err != nil {
		return err
	}
	if retentionDays < 0 {
		return fmt.Errorf("配置项 retention_days 的值 %d 无效: 不能为负数", retentionDays)
	}

	// 3. Batch size. Zero would silently clean nothing and a negative value
	// would wrap to a huge uint64 limit, so both are rejected.
	batchSize, err := l.getIntConfig(ctx, "batch_size")
	if err != nil {
		return err
	}
	if batchSize <= 0 {
		return fmt.Errorf("配置项 batch_size 的值 %d 无效: 必须大于 0", batchSize)
	}

	// Rows created before this instant are eligible for cleanup.
	cleanupBefore := now.AddDate(0, 0, -retentionDays)

	// Template for the log row written with every batch. AffectedRows is the
	// running total, so each batch's row carries the cumulative count.
	cleanupLog := &model.QueryCleanupLog{
		CleanupTime:   now,
		CleanupBefore: cleanupBefore,
		Status:        1,
		Remark:        sql.NullString{String: "定时清理数据", Valid: true},
	}

	var (
		totalDeleted int64 // rows whose deletion has been committed
		runErr       error // first error that stopped the batch loop
	)

	for {
		// Select the next batch outside any transaction so that it reflects
		// the deletions committed by previous batches.
		builder := l.svcCtx.QueryModel.SelectBuilder().
			Where("create_time < ?", cleanupBefore).
			Where("del_state = ?", globalkey.DelStateNo).
			Limit(uint64(batchSize))
		queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
		if err != nil {
			runErr = err
			break
		}
		if len(queries) == 0 {
			break // nothing left to clean
		}

		cleanupLog.AffectedRows = totalDeleted + int64(len(queries))

		// Delete the batch and record it atomically.
		err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
			for _, query := range queries {
				if delErr := l.svcCtx.QueryModel.DeleteSoft(ctx, session, query); delErr != nil {
					return delErr
				}
			}

			insertResult, insErr := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
			if insErr != nil {
				return insErr
			}
			cleanupLogId, idErr := insertResult.LastInsertId()
			if idErr != nil {
				return idErr
			}

			for _, query := range queries {
				detail := &model.QueryCleanupDetail{
					CleanupLogId:  cleanupLogId,
					QueryId:       query.Id,
					OrderId:       query.OrderId,
					UserId:        query.UserId,
					ProductId:     query.ProductId,
					QueryState:    query.QueryState,
					CreateTimeOld: query.CreateTime,
				}
				if _, detErr := l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail); detErr != nil {
					return detErr
				}
			}
			return nil
		})
		if err != nil {
			runErr = err
			break
		}
		totalDeleted = cleanupLog.AffectedRows
	}

	if runErr != nil {
		// The failed batch was rolled back, so the failure has to be recorded
		// in a transaction of its own or it would be lost with the rollback.
		cleanupLog.Status = 2
		cleanupLog.AffectedRows = totalDeleted
		cleanupLog.ErrorMsg = sql.NullString{String: runErr.Error(), Valid: true}
		logErr := l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
			_, insErr := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
			return insErr
		})
		if logErr != nil {
			logx.Errorf("%s - 保存清理失败日志出错: %v", stamp, logErr)
		}
		logx.Errorf("%s - 清理查询数据失败: %v", stamp, runErr)
		return runErr
	}

	logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", stamp, totalDeleted)
	return nil
}

// getIntConfig reads the configuration item key via getConfigValue and parses
// it as a base-10 integer, naming the key and raw value when parsing fails.
func (l *CleanQueryDataHandler) getIntConfig(ctx context.Context, key string) (int, error) {
	raw, err := l.getConfigValue(ctx, key)
	if err != nil {
		return 0, err
	}
	value, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("配置项 %s 的值 %q 无效: %w", key, raw, err)
	}
	return value, nil
}