first commit
This commit is contained in:
167
app/main/api/internal/queue/cleanQueryData.go
Normal file
167
app/main/api/internal/queue/cleanQueryData.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"aedata-server/app/main/api/internal/svc"
|
||||
"aedata-server/app/main/model"
|
||||
"aedata-server/common/globalkey"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
// TASKTIME 定义为每天凌晨3点执行
|
||||
const TASKTIME = "0 3 * * *"
|
||||
|
||||
type CleanQueryDataHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
|
||||
return &CleanQueryDataHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
// 获取配置值
|
||||
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
|
||||
// 通过缓存获取配置
|
||||
config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
|
||||
if err != nil {
|
||||
if err == model.ErrNotFound {
|
||||
return "", fmt.Errorf("配置项 %s 不存在", key)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
// 检查配置状态
|
||||
if config.Status != 1 {
|
||||
return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
|
||||
}
|
||||
|
||||
return config.ConfigValue, nil
|
||||
}
|
||||
|
||||
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
now := time.Now()
|
||||
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
|
||||
|
||||
// 1. 检查是否启用清理
|
||||
enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if enableCleanup != "1" {
|
||||
logx.Infof("查询数据清理任务已禁用")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 获取保留天数
|
||||
retentionDaysStr, err := l.getConfigValue(ctx, "retention_days")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retentionDays, err := strconv.Atoi(retentionDaysStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. 获取批次大小
|
||||
batchSizeStr, err := l.getConfigValue(ctx, "batch_size")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
batchSize, err := strconv.Atoi(batchSizeStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 计算清理截止时间
|
||||
cleanupBefore := now.AddDate(0, 0, -retentionDays)
|
||||
|
||||
// 创建清理日志记录
|
||||
cleanupLog := &model.QueryCleanupLog{
|
||||
CleanupTime: now,
|
||||
CleanupBefore: cleanupBefore,
|
||||
Status: 1,
|
||||
Remark: sql.NullString{String: "定时清理数据", Valid: true},
|
||||
}
|
||||
|
||||
// 使用事务处理清理操作和日志记录
|
||||
err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
|
||||
// 分批处理
|
||||
for {
|
||||
// 1. 查询一批要删除的记录
|
||||
builder := l.svcCtx.QueryModel.SelectBuilder().
|
||||
Where("create_time < ?", cleanupBefore).
|
||||
Where("del_state = ?", globalkey.DelStateNo).
|
||||
Limit(uint64(batchSize))
|
||||
|
||||
queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
|
||||
if err != nil {
|
||||
cleanupLog.Status = 2
|
||||
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
||||
return err
|
||||
}
|
||||
|
||||
if len(queries) == 0 {
|
||||
break // 没有更多数据需要清理
|
||||
}
|
||||
|
||||
// 2. 执行清理
|
||||
for _, query := range queries {
|
||||
err = l.svcCtx.QueryModel.DeleteSoft(ctx, session, query)
|
||||
if err != nil {
|
||||
cleanupLog.Status = 2
|
||||
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 更新影响行数
|
||||
cleanupLog.AffectedRows += int64(len(queries))
|
||||
|
||||
// 4. 保存清理日志(每批次都记录)
|
||||
cleanupLogInsertResult, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cleanupLogId, err := cleanupLogInsertResult.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 5. 保存清理明细
|
||||
for _, query := range queries {
|
||||
detail := &model.QueryCleanupDetail{
|
||||
CleanupLogId: cleanupLogId,
|
||||
QueryId: query.Id,
|
||||
OrderId: query.OrderId,
|
||||
UserId: query.UserId,
|
||||
ProductId: query.ProductId,
|
||||
QueryState: query.QueryState,
|
||||
CreateTimeOld: query.CreateTime,
|
||||
}
|
||||
_, err = l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logx.Errorf("%s - 清理查询数据失败: %v", now.Format("2006-01-02 15:04:05"), err)
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", now.Format("2006-01-02 15:04:05"), cleanupLog.AffectedRows)
|
||||
return nil
|
||||
}
|
||||
370
app/main/api/internal/queue/paySuccessNotify.go
Normal file
370
app/main/api/internal/queue/paySuccessNotify.go
Normal file
@@ -0,0 +1,370 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"aedata-server/app/main/api/internal/svc"
|
||||
"aedata-server/app/main/api/internal/types"
|
||||
"aedata-server/app/main/model"
|
||||
"aedata-server/pkg/lzkit/crypto"
|
||||
"aedata-server/pkg/lzkit/lzUtils"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type PaySuccessNotifyUserHandler struct {
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
|
||||
return &PaySuccessNotifyUserHandler{
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
var payload struct {
|
||||
OrderID int64 `json:"order_id"`
|
||||
}
|
||||
|
||||
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||
// 从任务的负载中解码数据
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
return fmt.Errorf("解析任务负载失败: %w", err)
|
||||
}
|
||||
|
||||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
|
||||
}
|
||||
env := os.Getenv("ENV")
|
||||
if order.Status != "paid" && env != "development" {
|
||||
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
|
||||
logx.Errorf("处理任务失败,原因: %v", err)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
|
||||
}
|
||||
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
|
||||
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
|
||||
if cacheErr != nil {
|
||||
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
|
||||
}
|
||||
var data types.QueryCacheLoad
|
||||
err = json.Unmarshal([]byte(cache), &data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("解析缓存内容失败: %+v", err)
|
||||
}
|
||||
secretKey := l.svcCtx.Config.Encrypt.SecretKey
|
||||
key, decodeErr := hex.DecodeString(secretKey)
|
||||
if decodeErr != nil {
|
||||
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
|
||||
}
|
||||
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
|
||||
if aesdecryptErr != nil {
|
||||
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
|
||||
}
|
||||
|
||||
// 敏感数据脱敏处理
|
||||
desensitizedParams, err := l.desensitizeParams(decryptData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("脱敏处理失败: %+v", err)
|
||||
}
|
||||
|
||||
// 对脱敏后的数据进行AES加密
|
||||
encryptedParams, encryptErr := crypto.AesEncrypt(desensitizedParams, key)
|
||||
if encryptErr != nil {
|
||||
return fmt.Errorf("加密脱敏数据失败: %+v", encryptErr)
|
||||
}
|
||||
|
||||
query := &model.Query{
|
||||
OrderId: order.Id,
|
||||
UserId: order.UserId,
|
||||
ProductId: product.Id,
|
||||
QueryParams: encryptedParams,
|
||||
QueryState: "pending",
|
||||
}
|
||||
result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
|
||||
if insertQueryErr != nil {
|
||||
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
|
||||
}
|
||||
|
||||
// 获取插入后的ID
|
||||
queryId, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取插入的查询ID失败: %+v", err)
|
||||
}
|
||||
|
||||
// 从数据库中查询完整的查询记录
|
||||
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
|
||||
}
|
||||
|
||||
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
|
||||
if err != nil {
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
// 加密返回响应
|
||||
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
|
||||
if aesEncryptErr != nil {
|
||||
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
query.QueryData = lzUtils.StringToNullString(encryptData)
|
||||
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||||
if updateErr != nil {
|
||||
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
|
||||
return l.handleError(ctx, err, order, query)
|
||||
}
|
||||
|
||||
query.QueryState = "success"
|
||||
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||||
if updateQueryErr != nil {
|
||||
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
|
||||
return l.handleError(ctx, updateQueryErr, order, query)
|
||||
}
|
||||
|
||||
|
||||
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
|
||||
if delErr != nil {
|
||||
logx.Errorf("删除Redis缓存失败,但任务已成功处理,订单ID: %d, 错误: %v", order.Id, delErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 定义一个中间件函数
|
||||
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
|
||||
logx.Errorf("处理任务失败,原因: %v", err)
|
||||
|
||||
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
|
||||
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
|
||||
if delErr != nil {
|
||||
logx.Errorf("删除Redis缓存失败,订单ID: %d, 错误: %v", order.Id, delErr)
|
||||
}
|
||||
|
||||
if order.Status == "paid" && query.QueryState == "pending" {
|
||||
// 更新查询状态为失败
|
||||
query.QueryState = "failed"
|
||||
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||||
if updateQueryErr != nil {
|
||||
logx.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
// 退款
|
||||
if order.PaymentPlatform == "wechat" {
|
||||
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
|
||||
if refundErr != nil {
|
||||
logx.Error(refundErr)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
} else {
|
||||
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
|
||||
if refundErr != nil {
|
||||
logx.Error(refundErr)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
if refund.IsSuccess() {
|
||||
logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
|
||||
// 更新订单状态为退款
|
||||
order.Status = "refunded"
|
||||
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
|
||||
if updateOrderErr != nil {
|
||||
logx.Errorf("更新订单状态失败,订单ID: %d, 错误: %v", order.Id, updateOrderErr)
|
||||
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
|
||||
}
|
||||
return asynq.SkipRetry
|
||||
} else {
|
||||
logx.Errorf("支付宝退款失败:%v", refundErr)
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
// 直接成功
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
// desensitizeParams 对敏感数据进行脱敏处理
|
||||
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
|
||||
// 解析JSON数据到map
|
||||
var paramsMap map[string]interface{}
|
||||
if err := json.Unmarshal(data, ¶msMap); err != nil {
|
||||
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
|
||||
}
|
||||
|
||||
// 处理可能包含敏感信息的字段
|
||||
for key, value := range paramsMap {
|
||||
if strValue, ok := value.(string); ok {
|
||||
// 根据字段名和内容判断并脱敏
|
||||
if isNameField(key) && len(strValue) > 0 {
|
||||
// 姓名脱敏
|
||||
paramsMap[key] = maskName(strValue)
|
||||
} else if isIDCardField(key) && len(strValue) > 10 {
|
||||
// 身份证号脱敏
|
||||
paramsMap[key] = maskIDCard(strValue)
|
||||
} else if isPhoneField(key) && len(strValue) >= 8 {
|
||||
// 手机号脱敏
|
||||
paramsMap[key] = maskPhone(strValue)
|
||||
} else if len(strValue) > 3 {
|
||||
// 其他所有未匹配的字段都进行通用脱敏
|
||||
paramsMap[key] = maskGeneral(strValue)
|
||||
}
|
||||
} else if mapValue, ok := value.(map[string]interface{}); ok {
|
||||
// 递归处理嵌套的map
|
||||
for subKey, subValue := range mapValue {
|
||||
if subStrValue, ok := subValue.(string); ok {
|
||||
if isNameField(subKey) && len(subStrValue) > 0 {
|
||||
mapValue[subKey] = maskName(subStrValue)
|
||||
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
|
||||
mapValue[subKey] = maskIDCard(subStrValue)
|
||||
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
|
||||
mapValue[subKey] = maskPhone(subStrValue)
|
||||
} else if len(subStrValue) > 3 {
|
||||
// 其他所有未匹配的字段都进行通用脱敏
|
||||
mapValue[subKey] = maskGeneral(subStrValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 将处理后的map重新序列化为JSON
|
||||
return json.Marshal(paramsMap)
|
||||
}
|
||||
|
||||
// 判断是否为姓名字段
|
||||
func isNameField(key string) bool {
|
||||
key = strings.ToLower(key)
|
||||
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
|
||||
strings.Contains(key, "owner") || strings.Contains(key, "main")
|
||||
}
|
||||
|
||||
// 判断是否为身份证字段
|
||||
func isIDCardField(key string) bool {
|
||||
key = strings.ToLower(key)
|
||||
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
|
||||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
|
||||
}
|
||||
|
||||
// 判断是否为手机号字段
|
||||
func isPhoneField(key string) bool {
|
||||
key = strings.ToLower(key)
|
||||
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
|
||||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
|
||||
}
|
||||
|
||||
// 判断是否包含敏感数据模式
|
||||
func containsSensitivePattern(value string) bool {
|
||||
// 检查是否包含连续的数字或字母模式
|
||||
numPattern := regexp.MustCompile(`\d{6,}`)
|
||||
return numPattern.MatchString(value)
|
||||
}
|
||||
|
||||
// 姓名脱敏
|
||||
func maskName(name string) string {
|
||||
// 将字符串转换为rune切片以正确处理中文字符
|
||||
runes := []rune(name)
|
||||
length := len(runes)
|
||||
|
||||
if length <= 1 {
|
||||
return name
|
||||
}
|
||||
|
||||
if length == 2 {
|
||||
// 两个字:保留第一个字,第二个字用*替代
|
||||
return string(runes[0]) + "*"
|
||||
}
|
||||
|
||||
// 三个字及以上:保留首尾字,中间用*替代
|
||||
first := string(runes[0])
|
||||
last := string(runes[length-1])
|
||||
mask := strings.Repeat("*", length-2)
|
||||
|
||||
return first + mask + last
|
||||
}
|
||||
|
||||
// 身份证号脱敏
|
||||
func maskIDCard(idCard string) string {
|
||||
length := len(idCard)
|
||||
if length <= 10 {
|
||||
return idCard // 如果长度太短,可能不是身份证,不处理
|
||||
}
|
||||
// 保留前3位和后4位
|
||||
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
|
||||
}
|
||||
|
||||
// 手机号脱敏
|
||||
func maskPhone(phone string) string {
|
||||
length := len(phone)
|
||||
if length < 8 {
|
||||
return phone // 如果长度太短,可能不是手机号,不处理
|
||||
}
|
||||
// 保留前3位和后4位
|
||||
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
|
||||
}
|
||||
|
||||
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
|
||||
func maskGeneral(value string) string {
|
||||
length := len(value)
|
||||
|
||||
// 小于3个字符的不脱敏
|
||||
if length <= 3 {
|
||||
return value
|
||||
}
|
||||
|
||||
// 根据字符串长度计算保留字符数
|
||||
var prefixLen, suffixLen int
|
||||
|
||||
switch {
|
||||
case length <= 6: // 短字符串
|
||||
// 保留首尾各1个字符
|
||||
prefixLen, suffixLen = 1, 1
|
||||
case length <= 10: // 中等长度字符串
|
||||
// 保留首部30%和尾部20%的字符
|
||||
prefixLen = int(float64(length) * 0.3)
|
||||
suffixLen = int(float64(length) * 0.2)
|
||||
case length <= 20: // 较长字符串
|
||||
// 保留首部25%和尾部15%的字符
|
||||
prefixLen = int(float64(length) * 0.25)
|
||||
suffixLen = int(float64(length) * 0.15)
|
||||
default: // 非常长的字符串
|
||||
// 保留首部20%和尾部10%的字符
|
||||
prefixLen = int(float64(length) * 0.2)
|
||||
suffixLen = int(float64(length) * 0.1)
|
||||
}
|
||||
|
||||
// 确保至少有一个字符被保留
|
||||
if prefixLen < 1 {
|
||||
prefixLen = 1
|
||||
}
|
||||
if suffixLen < 1 {
|
||||
suffixLen = 1
|
||||
}
|
||||
|
||||
// 确保前缀和后缀总长不超过总长度的80%
|
||||
if prefixLen+suffixLen > int(float64(length)*0.8) {
|
||||
// 调整为总长度的80%
|
||||
totalVisible := int(float64(length) * 0.8)
|
||||
// 前缀占60%,后缀占40%
|
||||
prefixLen = int(float64(totalVisible) * 0.6)
|
||||
suffixLen = totalVisible - prefixLen
|
||||
}
|
||||
|
||||
// 创建脱敏后的字符串
|
||||
prefix := value[:prefixLen]
|
||||
suffix := value[length-suffixLen:]
|
||||
masked := strings.Repeat("*", length-prefixLen-suffixLen)
|
||||
|
||||
return prefix + masked + suffix
|
||||
}
|
||||
40
app/main/api/internal/queue/routes.go
Normal file
40
app/main/api/internal/queue/routes.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"aedata-server/app/main/api/internal/svc"
|
||||
"aedata-server/app/main/api/internal/types"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
type CronJob struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
|
||||
return &CronJob{
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *CronJob) Register() *asynq.ServeMux {
|
||||
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
|
||||
scheduler := asynq.NewScheduler(redisClientOpt, nil)
|
||||
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
|
||||
_, err := scheduler.Register(TASKTIME, task)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("定时任务注册失败:%v", err))
|
||||
}
|
||||
scheduler.Start()
|
||||
fmt.Println("定时任务启动!!!")
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
|
||||
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
|
||||
|
||||
return mux
|
||||
}
|
||||
Reference in New Issue
Block a user