168 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			168 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|  | package queue | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"context" | ||
|  | 	"database/sql" | ||
|  | 	"fmt" | ||
|  | 	"strconv" | ||
|  | 	"time" | ||
|  | 	"znc-server/app/main/api/internal/svc" | ||
|  | 	"znc-server/app/main/model" | ||
|  | 	"znc-server/common/globalkey" | ||
|  | 
 | ||
|  | 	"github.com/hibiken/asynq" | ||
|  | 	"github.com/zeromicro/go-zero/core/logx" | ||
|  | 	"github.com/zeromicro/go-zero/core/stores/sqlx" | ||
|  | ) | ||
|  | 
 | ||
// TASKTIME is the cron expression for the cleanup schedule: minute 0 of
// hour 3, every day (03:00 daily).
const TASKTIME = "0 3 * * *"
|  | 
 | ||
// CleanQueryDataHandler is the task handler for the query-data cleanup job.
// Its methods reach configuration and the data models through svcCtx.
type CleanQueryDataHandler struct {
	svcCtx *svc.ServiceContext
}
|  | 
 | ||
|  | func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler { | ||
|  | 	return &CleanQueryDataHandler{ | ||
|  | 		svcCtx: svcCtx, | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // 获取配置值 | ||
|  | func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) { | ||
|  | 	// 通过缓存获取配置 | ||
|  | 	config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key) | ||
|  | 	if err != nil { | ||
|  | 		if err == model.ErrNotFound { | ||
|  | 			return "", fmt.Errorf("配置项 %s 不存在", key) | ||
|  | 		} | ||
|  | 		return "", err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 检查配置状态 | ||
|  | 	if config.Status != 1 { | ||
|  | 		return "", fmt.Errorf("配置项 %s 已禁用或已删除", key) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	return config.ConfigValue, nil | ||
|  | } | ||
|  | 
 | ||
|  | func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { | ||
|  | 	now := time.Now() | ||
|  | 	logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05")) | ||
|  | 
 | ||
|  | 	// 1. 检查是否启用清理 | ||
|  | 	enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup") | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	if enableCleanup != "1" { | ||
|  | 		logx.Infof("查询数据清理任务已禁用") | ||
|  | 		return nil | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 2. 获取保留天数 | ||
|  | 	retentionDaysStr, err := l.getConfigValue(ctx, "retention_days") | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	retentionDays, err := strconv.Atoi(retentionDaysStr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 3. 获取批次大小 | ||
|  | 	batchSizeStr, err := l.getConfigValue(ctx, "batch_size") | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	batchSize, err := strconv.Atoi(batchSizeStr) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 计算清理截止时间 | ||
|  | 	cleanupBefore := now.AddDate(0, 0, -retentionDays) | ||
|  | 
 | ||
|  | 	// 创建清理日志记录 | ||
|  | 	cleanupLog := &model.QueryCleanupLog{ | ||
|  | 		CleanupTime:   now, | ||
|  | 		CleanupBefore: cleanupBefore, | ||
|  | 		Status:        1, | ||
|  | 		Remark:        sql.NullString{String: "定时清理数据", Valid: true}, | ||
|  | 	} | ||
|  | 
 | ||
|  | 	// 使用事务处理清理操作和日志记录 | ||
|  | 	err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error { | ||
|  | 		// 分批处理 | ||
|  | 		for { | ||
|  | 			// 1. 查询一批要删除的记录 | ||
|  | 			builder := l.svcCtx.QueryModel.SelectBuilder(). | ||
|  | 				Where("create_time < ?", cleanupBefore). | ||
|  | 				Where("del_state = ?", globalkey.DelStateNo). | ||
|  | 				Limit(uint64(batchSize)) | ||
|  | 
 | ||
|  | 			queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "") | ||
|  | 			if err != nil { | ||
|  | 				cleanupLog.Status = 2 | ||
|  | 				cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} | ||
|  | 				return err | ||
|  | 			} | ||
|  | 
 | ||
|  | 			if len(queries) == 0 { | ||
|  | 				break // 没有更多数据需要清理 | ||
|  | 			} | ||
|  | 
 | ||
|  | 			// 2. 执行清理 | ||
|  | 			for _, query := range queries { | ||
|  | 				err = l.svcCtx.QueryModel.DeleteSoft(ctx, session, query) | ||
|  | 				if err != nil { | ||
|  | 					cleanupLog.Status = 2 | ||
|  | 					cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true} | ||
|  | 					return err | ||
|  | 				} | ||
|  | 			} | ||
|  | 
 | ||
|  | 			// 3. 更新影响行数 | ||
|  | 			cleanupLog.AffectedRows += int64(len(queries)) | ||
|  | 
 | ||
|  | 			// 4. 保存清理日志(每批次都记录) | ||
|  | 			cleanupLogInsertResult, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog) | ||
|  | 			if err != nil { | ||
|  | 				return err | ||
|  | 			} | ||
|  | 			cleanupLogId, err := cleanupLogInsertResult.LastInsertId() | ||
|  | 			if err != nil { | ||
|  | 				return err | ||
|  | 			} | ||
|  | 
 | ||
|  | 			// 5. 保存清理明细 | ||
|  | 			for _, query := range queries { | ||
|  | 				detail := &model.QueryCleanupDetail{ | ||
|  | 					CleanupLogId:  cleanupLogId, | ||
|  | 					QueryId:       query.Id, | ||
|  | 					OrderId:       query.OrderId, | ||
|  | 					UserId:        query.UserId, | ||
|  | 					ProductId:     query.ProductId, | ||
|  | 					QueryState:    query.QueryState, | ||
|  | 					CreateTimeOld: query.CreateTime, | ||
|  | 				} | ||
|  | 				_, err = l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail) | ||
|  | 				if err != nil { | ||
|  | 					return err | ||
|  | 				} | ||
|  | 			} | ||
|  | 		} | ||
|  | 
 | ||
|  | 		return nil | ||
|  | 	}) | ||
|  | 
 | ||
|  | 	if err != nil { | ||
|  | 		logx.Errorf("%s - 清理查询数据失败: %v", now.Format("2006-01-02 15:04:05"), err) | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", now.Format("2006-01-02 15:04:05"), cleanupLog.AffectedRows) | ||
|  | 	return nil | ||
|  | } |