168 lines
4.2 KiB
Go
168 lines
4.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
"tydata-server/app/main/api/internal/svc"
|
|
"tydata-server/app/main/model"
|
|
"tydata-server/common/globalkey"
|
|
|
|
"github.com/hibiken/asynq"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
|
)
|
|
|
|
// TASKTIME 定义为每天凌晨3点执行
|
|
const TASKTIME = "0 3 * * *"
|
|
|
|
type CleanQueryDataHandler struct {
|
|
svcCtx *svc.ServiceContext
|
|
}
|
|
|
|
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
|
|
return &CleanQueryDataHandler{
|
|
svcCtx: svcCtx,
|
|
}
|
|
}
|
|
|
|
// 获取配置值
|
|
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
|
|
// 通过缓存获取配置
|
|
config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
|
|
if err != nil {
|
|
if err == model.ErrNotFound {
|
|
return "", fmt.Errorf("配置项 %s 不存在", key)
|
|
}
|
|
return "", err
|
|
}
|
|
|
|
// 检查配置状态
|
|
if config.Status != 1 {
|
|
return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
|
|
}
|
|
|
|
return config.ConfigValue, nil
|
|
}
|
|
|
|
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
|
now := time.Now()
|
|
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
|
|
|
|
// 1. 检查是否启用清理
|
|
enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if enableCleanup != "1" {
|
|
logx.Infof("查询数据清理任务已禁用")
|
|
return nil
|
|
}
|
|
|
|
// 2. 获取保留天数
|
|
retentionDaysStr, err := l.getConfigValue(ctx, "retention_days")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
retentionDays, err := strconv.Atoi(retentionDaysStr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 3. 获取批次大小
|
|
batchSizeStr, err := l.getConfigValue(ctx, "batch_size")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
batchSize, err := strconv.Atoi(batchSizeStr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 计算清理截止时间
|
|
cleanupBefore := now.AddDate(0, 0, -retentionDays)
|
|
|
|
// 创建清理日志记录
|
|
cleanupLog := &model.QueryCleanupLog{
|
|
CleanupTime: now,
|
|
CleanupBefore: cleanupBefore,
|
|
Status: 1,
|
|
Remark: sql.NullString{String: "定时清理数据", Valid: true},
|
|
}
|
|
|
|
// 使用事务处理清理操作和日志记录
|
|
err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
|
|
// 分批处理
|
|
for {
|
|
// 1. 查询一批要删除的记录
|
|
builder := l.svcCtx.QueryModel.SelectBuilder().
|
|
Where("create_time < ?", cleanupBefore).
|
|
Where("del_state = ?", globalkey.DelStateNo).
|
|
Limit(uint64(batchSize))
|
|
|
|
queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
|
|
if err != nil {
|
|
cleanupLog.Status = 2
|
|
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
|
return err
|
|
}
|
|
|
|
if len(queries) == 0 {
|
|
break // 没有更多数据需要清理
|
|
}
|
|
|
|
// 2. 执行清理
|
|
for _, query := range queries {
|
|
err = l.svcCtx.QueryModel.DeleteSoft(ctx, session, query)
|
|
if err != nil {
|
|
cleanupLog.Status = 2
|
|
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// 3. 更新影响行数
|
|
cleanupLog.AffectedRows += int64(len(queries))
|
|
|
|
// 4. 保存清理日志(每批次都记录)
|
|
cleanupLogInsertResult, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cleanupLogId, err := cleanupLogInsertResult.LastInsertId()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 5. 保存清理明细
|
|
for _, query := range queries {
|
|
detail := &model.QueryCleanupDetail{
|
|
CleanupLogId: cleanupLogId,
|
|
QueryId: query.Id,
|
|
OrderId: query.OrderId,
|
|
UserId: query.UserId,
|
|
ProductId: query.ProductId,
|
|
QueryState: query.QueryState,
|
|
CreateTimeOld: query.CreateTime,
|
|
}
|
|
_, err = l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
logx.Errorf("%s - 清理查询数据失败: %v", now.Format("2006-01-02 15:04:05"), err)
|
|
return err
|
|
}
|
|
|
|
logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", now.Format("2006-01-02 15:04:05"), cleanupLog.AffectedRows)
|
|
return nil
|
|
}
|