This commit is contained in:
2025-12-13 17:44:18 +08:00
commit c7a30fb094
599 changed files with 65988 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
package queue
import (
"context"
"encoding/json"
"errors"
"fmt"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/api/internal/types"
"qnc-server/app/main/model"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type AgentProcessHandler struct {
svcCtx *svc.ServiceContext
}
func NewAgentProcessHandler(svcCtx *svc.ServiceContext) *AgentProcessHandler {
return &AgentProcessHandler{
svcCtx: svcCtx,
}
}
func (l *AgentProcessHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload types.MsgAgentProcessPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析代理处理任务负载失败: %w", err)
}
// 获取订单信息
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
if errors.Is(err, model.ErrNotFound) {
logx.Errorf("代理处理任务失败,订单不存在: orderID=%s", payload.OrderID)
return asynq.SkipRetry // 订单不存在,跳过重试
}
return fmt.Errorf("查询订单失败: orderID=%s, err=%w", payload.OrderID, err)
}
// 检查订单状态
if order.Status != "paid" {
logx.Infof("代理处理任务跳过,订单未支付: orderID=%s, status=%s", payload.OrderID, order.Status)
return nil // 订单未支付,不处理,不重试
}
// 调用代理处理服务
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
if err != nil {
// 记录错误日志,但不阻塞报告流程
logx.Errorf("代理处理失败订单ID: %s, 错误: %v", payload.OrderID, err)
// 返回错误以触发重试机制
return fmt.Errorf("代理处理失败: orderID=%s, err=%w", payload.OrderID, err)
}
// 注意:解冻任务现在通过定时任务扫描处理,不再需要发送延迟任务
// 定时任务每5分钟扫描一次待解冻的任务更加可靠
logx.Infof("代理处理成功订单ID: %s冻结任务如有将由定时任务自动处理", payload.OrderID)
logx.Infof("代理处理成功订单ID: %s", payload.OrderID)
return nil
}

View File

@@ -0,0 +1,275 @@
package queue
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/model"
"qnc-server/common/globalkey"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// TASKTIME 定义为每天凌晨3点执行
const TASKTIME = "0 3 * * *"
type CleanQueryDataHandler struct {
svcCtx *svc.ServiceContext
}
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
return &CleanQueryDataHandler{
svcCtx: svcCtx,
}
}
// 获取配置值
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
// 通过缓存获取配置
config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
if err != nil {
if err == model.ErrNotFound {
return "", fmt.Errorf("配置项 %s 不存在", key)
}
return "", err
}
// 检查配置状态
if config.Status != 1 {
return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
}
return config.ConfigValue, nil
}
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 添加超时控制最多运行1小时
taskCtx, cancel := context.WithTimeout(ctx, 1*time.Hour)
defer cancel()
startTime := time.Now()
now := time.Now()
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
// 1. 检查是否启用清理
enableCleanup, err := l.getConfigValue(taskCtx, "enable_cleanup")
if err != nil {
return err
}
if enableCleanup != "1" {
logx.Infof("查询数据清理任务已禁用")
return nil
}
// 2. 获取保留天数
retentionDaysStr, err := l.getConfigValue(taskCtx, "retention_days")
if err != nil {
return err
}
retentionDays, err := strconv.Atoi(retentionDaysStr)
if err != nil {
return fmt.Errorf("保留天数配置无效: %v", err)
}
if retentionDays < 0 {
return fmt.Errorf("保留天数不能为负数: %d", retentionDays)
}
// 3. 获取批次大小
batchSizeStr, err := l.getConfigValue(taskCtx, "batch_size")
if err != nil {
return err
}
batchSize, err := strconv.Atoi(batchSizeStr)
if err != nil {
return fmt.Errorf("批次大小配置无效: %v", err)
}
if batchSize <= 0 || batchSize > 10000 {
return fmt.Errorf("批次大小必须在1-10000之间: %d", batchSize)
}
// 计算清理截止时间
cleanupBefore := now.AddDate(0, 0, -retentionDays)
// 先快速检查是否有数据需要清理(避免创建无用的日志记录)
checkBuilder := l.svcCtx.QueryModel.SelectBuilder().
Where("create_time < ?", cleanupBefore).
Where("del_state = ?", globalkey.DelStateNo).
Limit(1) // 只查询1条用于判断是否有数据
checkQueries, checkErr := l.svcCtx.QueryModel.FindAll(taskCtx, checkBuilder, "")
if checkErr != nil {
logx.Errorf("检查是否有数据需要清理失败: %v", checkErr)
return checkErr
}
// 如果没有数据需要清理,直接返回,不创建日志记录
if len(checkQueries) == 0 {
logx.Infof("%s - 没有需要清理的数据(清理截止时间: %s", now.Format("2006-01-02 15:04:05"), cleanupBefore.Format("2006-01-02 15:04:05"))
return nil
}
// 创建清理日志记录(只创建一次)
cleanupLog := &model.QueryCleanupLog{
Id: uuid.New().String(),
CleanupTime: now,
CleanupBefore: cleanupBefore,
Status: 1,
Remark: sql.NullString{String: "定时清理数据", Valid: true},
}
// 先创建清理日志记录
err = l.svcCtx.QueryCleanupLogModel.Trans(taskCtx, func(logCtx context.Context, logSession sqlx.Session) error {
_, insertErr := l.svcCtx.QueryCleanupLogModel.Insert(logCtx, logSession, cleanupLog)
if insertErr != nil {
return insertErr
}
return insertErr
})
if err != nil {
logx.Errorf("创建清理日志记录失败: %v", err)
return err
}
logx.Infof("创建清理日志记录成功日志ID: %s", cleanupLog.Id)
// 分批处理,每个批次使用独立事务
batchCount := 0
lastProcessedId := ""
for {
// 检查是否被取消(优雅关闭支持)
select {
case <-taskCtx.Done():
logx.Infof("清理任务被取消,已处理 %d 批次,共删除 %d 条记录", batchCount, cleanupLog.AffectedRows)
// 更新清理日志状态
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, fmt.Errorf("任务被取消"))
return taskCtx.Err()
default:
// 继续处理
}
// 每个批次使用独立事务
var batchQueries []*model.Query
batchErr := l.svcCtx.QueryModel.Trans(taskCtx, func(batchCtx context.Context, batchSession sqlx.Session) error {
// 1. 查询一批要删除的记录(添加排序确保一致性)
builder := l.svcCtx.QueryModel.SelectBuilder().
Where("create_time < ?", cleanupBefore).
Where("del_state = ?", globalkey.DelStateNo).
OrderBy("id ASC"). // 添加排序,确保处理顺序一致
Limit(uint64(batchSize))
// 如果已处理过从上次处理的ID之后继续
if lastProcessedId != "" {
builder = builder.Where("id > ?", lastProcessedId)
}
var queryErr error
batchQueries, queryErr = l.svcCtx.QueryModel.FindAll(batchCtx, builder, "")
if queryErr != nil {
return queryErr
}
if len(batchQueries) == 0 {
// 没有更多数据需要清理,标记为完成
return nil
}
// 2. 执行清理
for _, query := range batchQueries {
deleteErr := l.svcCtx.QueryModel.DeleteSoft(batchCtx, batchSession, query)
if deleteErr != nil {
return deleteErr
}
}
// 3. 保存清理明细
for _, query := range batchQueries {
detail := &model.QueryCleanupDetail{
Id: uuid.New().String(),
CleanupLogId: cleanupLog.Id,
QueryId: query.Id,
OrderId: query.OrderId,
UserId: query.UserId,
ProductId: query.ProductId,
QueryState: query.QueryState,
CreateTimeOld: query.CreateTime,
}
_, insertErr := l.svcCtx.QueryCleanupDetailModel.Insert(batchCtx, batchSession, detail)
if insertErr != nil {
return insertErr
}
}
// 4. 记录最后处理的ID用于下次查询
lastProcessedId = batchQueries[len(batchQueries)-1].Id
return nil
})
if batchErr != nil {
// 批次失败,更新清理日志状态
logx.Errorf("批次处理失败(批次 %d: %v", batchCount+1, batchErr)
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, batchErr)
return batchErr
}
// 如果查询结果为空,说明没有更多数据
if len(batchQueries) == 0 {
logx.Infof("所有数据已处理完成")
break
}
// 更新影响行数(在事务外更新,避免重复计算)
actualBatchSize := int64(len(batchQueries))
cleanupLog.AffectedRows += actualBatchSize
batchCount++
logx.Infof("批次 %d 处理完成,本批次删除 %d 条记录,累计删除 %d 条记录", batchCount, actualBatchSize, cleanupLog.AffectedRows)
// 如果本批次查询到的数据少于批次大小,说明已经处理完所有数据
if actualBatchSize < int64(batchSize) {
logx.Infof("所有数据已处理完成(本批次数据量少于批次大小)")
break
}
}
// 更新清理日志状态为成功
l.updateCleanupLogStatus(taskCtx, cleanupLog.Id, cleanupLog, nil)
duration := time.Since(startTime)
logx.Infof("%s - 查询数据清理完成,共处理 %d 批次,删除 %d 条记录,耗时 %v",
now.Format("2006-01-02 15:04:05"), batchCount, cleanupLog.AffectedRows, duration)
return nil
}
// updateCleanupLogStatus 更新清理日志状态
func (l *CleanQueryDataHandler) updateCleanupLogStatus(ctx context.Context, logId string, cleanupLog *model.QueryCleanupLog, err error) {
err = l.svcCtx.QueryCleanupLogModel.Trans(ctx, func(updateCtx context.Context, updateSession sqlx.Session) error {
// 查询当前日志记录
currentLog, findErr := l.svcCtx.QueryCleanupLogModel.FindOne(updateCtx, cleanupLog.Id)
if findErr != nil {
return findErr
}
// 更新状态和影响行数
currentLog.AffectedRows = cleanupLog.AffectedRows
if err != nil {
currentLog.Status = 2 // 失败
currentLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
} else {
currentLog.Status = 1 // 成功
}
_, updateErr := l.svcCtx.QueryCleanupLogModel.Update(updateCtx, updateSession, currentLog)
return updateErr
})
if err != nil {
logx.Errorf("更新清理日志状态失败: %v", err)
}
}

View File

@@ -0,0 +1,428 @@
package queue
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/api/internal/types"
"qnc-server/app/main/model"
"qnc-server/pkg/lzkit/crypto"
"qnc-server/pkg/lzkit/lzUtils"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload types.MsgPaySuccessQueryPayload
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
// 订单不存在,记录详细日志并跳过重试
logx.Errorf("支付成功通知任务失败订单不存在订单ID: %s, 错误: %v", payload.OrderID, err)
return asynq.SkipRetry // 订单不存在时跳过重试,避免重复失败
}
env := os.Getenv("ENV")
if order.Status != "paid" && env != "development" {
err = fmt.Errorf("无效的订单: %s", payload.OrderID)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %s, productID: %s", payload.OrderID, order.ProductId)
}
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
if cacheErr != nil {
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
}
var data types.QueryCacheLoad
err = json.Unmarshal([]byte(cache), &data)
if err != nil {
return fmt.Errorf("解析缓存内容失败: %+v", err)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
if aesdecryptErr != nil {
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
}
query := &model.Query{
Id: uuid.NewString(),
OrderId: order.Id,
UserId: order.UserId,
ProductId: product.Id,
QueryParams: data.Params,
QueryState: "pending",
}
_, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
if insertQueryErr != nil {
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
}
// 插入后使用预生成的查询ID
queryId := query.Id
// 从数据库中查询完整的查询记录
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
if err != nil {
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
}
// 解析解密后的参数获取用户信息
var userInfo map[string]interface{}
if err := json.Unmarshal(decryptData, &userInfo); err != nil {
return fmt.Errorf("解析用户信息失败: %+v", err)
}
// 生成授权书
authDoc, err := l.svcCtx.AuthorizationService.GenerateAuthorizationDocument(
ctx, order.UserId, order.Id, queryId, userInfo,
)
if err != nil {
logx.Errorf("生成授权书失败: %v", err)
}
// 将授权书URL添加到解密数据中
if authDoc != nil {
// 生成完整的授权书访问URL
fullAuthDocURL := l.svcCtx.AuthorizationService.GetFullFileURL(authDoc.FileUrl)
userInfo["authorization_url"] = fullAuthDocURL
// 重新序列化用户信息
updatedDecryptData, marshalErr := json.Marshal(userInfo)
if marshalErr != nil {
logx.Errorf("序列化用户信息失败: %v", marshalErr)
} else {
// 重新加密更新后的数据
encryptedUpdatedData, encryptErr := crypto.AesEncrypt(updatedDecryptData, key)
if encryptErr != nil {
logx.Errorf("重新加密用户信息失败: %v", encryptErr)
} else {
// 更新查询记录中的参数
query.QueryParams = string(encryptedUpdatedData)
updateParamsErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateParamsErr != nil {
logx.Errorf("更新查询参数失败: %v", updateParamsErr)
} else {
logx.Infof("成功更新查询参数包含授权书URL: %s", fullAuthDocURL)
}
}
decryptData = updatedDecryptData
}
}
// 检查是否为空报告模式(开发环境)
isEmptyReportMode := env == "development" && order.PaymentPlatform == "test"
var encryptData string
if isEmptyReportMode {
// 空报告模式生成空的报告数据跳过API调用
logx.Infof("空报告模式:订单 %s (ID: %s) 跳过API调用生成空报告", order.OrderNo, order.Id)
// 生成空报告数据结构(根据实际报告格式生成)
emptyReportData := []byte(`[]`) // 空数组,表示没有数据
// 加密空报告数据
encryptedEmptyData, aesEncryptErr := crypto.AesEncrypt(emptyReportData, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密空报告数据失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
encryptData = encryptedEmptyData
} else {
// 正常模式调用API请求服务
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
if err != nil {
return l.handleError(ctx, err, order, query)
}
// 加密返回响应
encryptedResponse, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
encryptData = encryptedResponse
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, err, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
// 报告生成成功后,发送代理处理异步任务(不阻塞报告流程)
if asyncErr := l.svcCtx.AsynqService.SendAgentProcessTask(order.Id); asyncErr != nil {
// 代理处理任务发送失败,只记录日志,不影响报告流程
logx.Errorf("发送代理处理任务失败订单ID: %s, 错误: %v", order.Id, asyncErr)
}
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败但任务已成功处理订单ID: %s, 错误: %v", order.Id, delErr)
}
return nil
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败订单ID: %s, 错误: %v", order.Id, delErr)
}
if order.Status == "paid" && query.QueryState == "pending" {
// 更新查询状态为失败
query.QueryState = "failed"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 退款
if order.PaymentPlatform == "wechat" {
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
} else {
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
if refund.IsSuccess() {
logx.Errorf("支付宝退款成功, orderID: %s", order.Id)
// 更新订单状态为退款
order.Status = "refunded"
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单状态失败订单ID: %s, 错误: %v", order.Id, updateOrderErr)
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
return asynq.SkipRetry
} else {
logx.Errorf("支付宝退款失败:%v", refundErr)
return asynq.SkipRetry
}
// 直接成功
}
}
return asynq.SkipRetry
}
// desensitizeParams 对敏感数据进行脱敏处理
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
// 解析JSON数据到map
var paramsMap map[string]interface{}
if err := json.Unmarshal(data, &paramsMap); err != nil {
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
}
// 处理可能包含敏感信息的字段
for key, value := range paramsMap {
if strValue, ok := value.(string); ok {
// 根据字段名和内容判断并脱敏
if isNameField(key) && len(strValue) > 0 {
// 姓名脱敏
paramsMap[key] = maskName(strValue)
} else if isIDCardField(key) && len(strValue) > 10 {
// 身份证号脱敏
paramsMap[key] = maskIDCard(strValue)
} else if isPhoneField(key) && len(strValue) >= 8 {
// 手机号脱敏
paramsMap[key] = maskPhone(strValue)
} else if len(strValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
paramsMap[key] = maskGeneral(strValue)
}
} else if mapValue, ok := value.(map[string]interface{}); ok {
// 递归处理嵌套的map
for subKey, subValue := range mapValue {
if subStrValue, ok := subValue.(string); ok {
if isNameField(subKey) && len(subStrValue) > 0 {
mapValue[subKey] = maskName(subStrValue)
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
mapValue[subKey] = maskIDCard(subStrValue)
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
mapValue[subKey] = maskPhone(subStrValue)
} else if len(subStrValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
mapValue[subKey] = maskGeneral(subStrValue)
}
}
}
}
}
// 将处理后的map重新序列化为JSON
return json.Marshal(paramsMap)
}
// 判断是否为姓名字段
func isNameField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
strings.Contains(key, "owner") || strings.Contains(key, "main")
}
// 判断是否为身份证字段
func isIDCardField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
}
// 判断是否为手机号字段
func isPhoneField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
}
// 判断是否包含敏感数据模式
func containsSensitivePattern(value string) bool {
// 检查是否包含连续的数字或字母模式
numPattern := regexp.MustCompile(`\d{6,}`)
return numPattern.MatchString(value)
}
// 姓名脱敏
func maskName(name string) string {
// 将字符串转换为rune切片以正确处理中文字符
runes := []rune(name)
length := len(runes)
if length <= 1 {
return name
}
if length == 2 {
// 两个字:保留第一个字,第二个字用*替代
return string(runes[0]) + "*"
}
// 三个字及以上:保留首尾字,中间用*替代
first := string(runes[0])
last := string(runes[length-1])
mask := strings.Repeat("*", length-2)
return first + mask + last
}
// 身份证号脱敏
func maskIDCard(idCard string) string {
length := len(idCard)
if length <= 10 {
return idCard // 如果长度太短,可能不是身份证,不处理
}
// 保留前3位和后4位
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
}
// 手机号脱敏
func maskPhone(phone string) string {
length := len(phone)
if length < 8 {
return phone // 如果长度太短,可能不是手机号,不处理
}
// 保留前3位和后4位
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
}
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
func maskGeneral(value string) string {
length := len(value)
// 小于3个字符的不脱敏
if length <= 3 {
return value
}
// 根据字符串长度计算保留字符数
var prefixLen, suffixLen int
switch {
case length <= 6: // 短字符串
// 保留首尾各1个字符
prefixLen, suffixLen = 1, 1
case length <= 10: // 中等长度字符串
// 保留首部30%和尾部20%的字符
prefixLen = int(float64(length) * 0.3)
suffixLen = int(float64(length) * 0.2)
case length <= 20: // 较长字符串
// 保留首部25%和尾部15%的字符
prefixLen = int(float64(length) * 0.25)
suffixLen = int(float64(length) * 0.15)
default: // 非常长的字符串
// 保留首部20%和尾部10%的字符
prefixLen = int(float64(length) * 0.2)
suffixLen = int(float64(length) * 0.1)
}
// 确保至少有一个字符被保留
if prefixLen < 1 {
prefixLen = 1
}
if suffixLen < 1 {
suffixLen = 1
}
// 确保前缀和后缀总长不超过总长度的80%
if prefixLen+suffixLen > int(float64(length)*0.8) {
// 调整为总长度的80%
totalVisible := int(float64(length) * 0.8)
// 前缀占60%后缀占40%
prefixLen = int(float64(totalVisible) * 0.6)
suffixLen = totalVisible - prefixLen
}
// 创建脱敏后的字符串
prefix := value[:prefixLen]
suffix := value[length-suffixLen:]
masked := strings.Repeat("*", length-prefixLen-suffixLen)
return prefix + masked + suffix
}

View File

@@ -0,0 +1,53 @@
package queue
import (
"context"
"fmt"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
)
type CronJob struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
return &CronJob{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CronJob) Register() *asynq.ServeMux {
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
scheduler := asynq.NewScheduler(redisClientOpt, nil)
// 注册数据清理定时任务每天凌晨3点
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
_, err := scheduler.Register(TASKTIME, task)
if err != nil {
panic(fmt.Sprintf("定时任务注册失败:%v", err))
}
// 注册解冻佣金扫描定时任务每2小时执行一次
unfreezeScanTask := asynq.NewTask(types.MsgUnfreezeCommissionScan, nil, nil)
_, err = scheduler.Register("0 */2 * * *", unfreezeScanTask) // 每2小时执行一次每小时的第0分钟
if err != nil {
panic(fmt.Sprintf("解冻佣金扫描定时任务注册失败:%v", err))
}
scheduler.Start()
fmt.Println("定时任务启动!!!")
mux := asynq.NewServeMux()
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
mux.Handle(types.MsgAgentProcess, NewAgentProcessHandler(l.svcCtx))
mux.Handle(types.MsgUnfreezeCommission, NewUnfreezeCommissionHandler(l.svcCtx))
mux.Handle(types.MsgUnfreezeCommissionScan, NewUnfreezeCommissionScanHandler(l.svcCtx))
return mux
}

View File

@@ -0,0 +1,94 @@
package queue
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"qnc-server/app/main/model"
"qnc-server/pkg/lzkit/lzUtils"
"qnc-server/app/main/api/internal/svc"
"qnc-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
pkgerrors "github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
type UnfreezeCommissionHandler struct {
svcCtx *svc.ServiceContext
}
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
return &UnfreezeCommissionHandler{
svcCtx: svcCtx,
}
}
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload types.MsgUnfreezeCommissionPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析解冻任务负载失败: %w", err)
}
// 1. 查询冻结任务
freezeTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(ctx, payload.FreezeTaskId)
if err != nil {
if errors.Is(err, model.ErrNotFound) {
logx.Errorf("解冻任务失败,冻结任务不存在: freezeTaskId=%s", payload.FreezeTaskId)
return asynq.SkipRetry // 任务不存在,跳过重试
}
return fmt.Errorf("查询冻结任务失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
}
// 2. 检查任务状态
if freezeTask.Status != 1 {
logx.Infof("解冻任务跳过,任务已处理: freezeTaskId=%s, status=%d", payload.FreezeTaskId, freezeTask.Status)
return nil // 任务已处理,不重试
}
// 3. 检查解冻时间是否已到
if time.Now().Before(freezeTask.UnfreezeTime) {
logx.Infof("解冻任务跳过,未到解冻时间: freezeTaskId=%s, unfreezeTime=%v", payload.FreezeTaskId, freezeTask.UnfreezeTime)
// 重新发送延迟任务
if err := l.svcCtx.AsynqService.SendUnfreezeTask(payload.FreezeTaskId, freezeTask.UnfreezeTime); err != nil {
logx.Errorf("重新发送解冻任务失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
}
return nil
}
// 4. 使用事务处理解冻
err = l.svcCtx.AgentFreezeTaskModel.Trans(ctx, func(transCtx context.Context, session sqlx.Session) error {
// 4.1 更新冻结任务状态
freezeTask.Status = 2 // 已解冻
freezeTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(time.Now())
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, freezeTask); updateErr != nil {
return pkgerrors.Wrapf(updateErr, "更新冻结任务状态失败")
}
// 4.2 更新钱包(解冻余额)
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, freezeTask.AgentId)
if walletErr != nil {
return pkgerrors.Wrapf(walletErr, "查询钱包失败, agentId: %s", freezeTask.AgentId)
}
wallet.FrozenBalance -= freezeTask.FreezeAmount
wallet.Balance += freezeTask.FreezeAmount
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
return pkgerrors.Wrapf(updateWalletErr, "更新钱包失败")
}
return nil
})
if err != nil {
logx.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%v", payload.FreezeTaskId, err)
return fmt.Errorf("解冻任务处理失败: freezeTaskId=%s, err=%w", payload.FreezeTaskId, err)
}
logx.Infof("解冻任务处理成功: freezeTaskId=%s, agentId=%s, amount=%.2f", payload.FreezeTaskId, freezeTask.AgentId, freezeTask.FreezeAmount)
return nil
}

View File

@@ -0,0 +1,233 @@
package queue
import (
"context"
"strings"
"sync"
"time"
"qnc-server/app/main/model"
"qnc-server/pkg/lzkit/lzUtils"
"qnc-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// UnfreezeCommissionScanHandler 定时扫描解冻任务处理器
type UnfreezeCommissionScanHandler struct {
svcCtx *svc.ServiceContext
}
func NewUnfreezeCommissionScanHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionScanHandler {
return &UnfreezeCommissionScanHandler{
svcCtx: svcCtx,
}
}
// ProcessTask 定时扫描需要解冻的任务
func (l *UnfreezeCommissionScanHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
scanStartTime := time.Now()
now := time.Now()
logx.Infof("开始扫描需要解冻的佣金任务,当前时间: %v", now)
// 1. 查询所有待解冻且解冻时间已到的任务
// 使用索引 idx_status 和 idx_unfreeze_time 优化查询
// 不限制查询数量,找到所有需要解冻的任务
builder := l.svcCtx.AgentFreezeTaskModel.SelectBuilder().
Where("status = ? AND unfreeze_time <= ? AND del_state = ?", 1, now, 0). // 1=待解冻0=未删除
OrderBy("unfreeze_time ASC") // 按解冻时间升序,优先处理最早的任务
freezeTasks, err := l.svcCtx.AgentFreezeTaskModel.FindAll(ctx, builder, "")
if err != nil {
logx.Errorf("查询待解冻任务失败: %v", err)
return errors.Wrapf(err, "查询待解冻任务失败")
}
// 如果没有需要解冻的任务,直接返回(不创建任何记录,只记录日志)
if len(freezeTasks) == 0 {
scanDuration := time.Since(scanStartTime)
logx.Infof("没有需要解冻的任务,扫描耗时: %v", scanDuration)
return nil
}
// 2. 批次大小限制:如果任务量过大,分批处理
const maxBatchSize = 1000
originalCount := len(freezeTasks)
if len(freezeTasks) > maxBatchSize {
logx.Errorf("任务数量过多(%d),本次只处理前%d个剩余将在下次扫描处理", len(freezeTasks), maxBatchSize)
freezeTasks = freezeTasks[:maxBatchSize]
}
logx.Infof("找到 %d 个需要解冻的任务(原始数量: %d开始处理最多同时处理2个", len(freezeTasks), originalCount)
// 3. 并发控制使用信号量限制最多同时处理2个任务
const maxConcurrency = 2 // 最多同时处理2个任务
const taskTimeout = 30 * time.Second // 每个任务30秒超时
semaphore := make(chan struct{}, maxConcurrency) // 信号量通道
var wg sync.WaitGroup
var mu sync.Mutex // 保护计数器的互斥锁
successCount := 0
failCount := 0
skipCount := 0 // 跳过的任务数(已处理、时间未到等)
// 4. 并发处理所有任务但最多同时处理2个
for _, freezeTask := range freezeTasks {
// 检查是否被取消(优雅关闭支持)
select {
case <-ctx.Done():
logx.Infof("扫描任务被取消,已处理: 成功=%d, 失败=%d, 跳过=%d", successCount, failCount, skipCount)
return ctx.Err()
default:
// 继续处理
}
wg.Add(1)
semaphore <- struct{}{} // 获取信号量如果已满2个则阻塞
go func(task *model.AgentFreezeTask) {
defer wg.Done()
defer func() { <-semaphore }() // 释放信号量
taskStartTime := time.Now()
// 为每个任务设置超时控制
taskCtx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()
// 使用事务处理每个任务,确保原子性
err := l.svcCtx.AgentFreezeTaskModel.Trans(taskCtx, func(transCtx context.Context, session sqlx.Session) error {
// 4.1 重新查询任务(使用乐观锁,确保并发安全)
currentTask, err := l.svcCtx.AgentFreezeTaskModel.FindOne(transCtx, task.Id)
if err != nil {
if errors.Is(err, model.ErrNotFound) {
logx.Infof("冻结任务不存在,可能已被处理: freezeTaskId=%d", task.Id)
return nil // 任务不存在,跳过
}
return errors.Wrapf(err, "查询冻结任务失败, freezeTaskId: %d", task.Id)
}
// 4.2 幂等性增强:检查是否已经解冻过(通过 actual_unfreeze_time
if currentTask.ActualUnfreezeTime.Valid {
logx.Infof("任务已解冻,跳过: freezeTaskId=%d, actualUnfreezeTime=%v", task.Id, currentTask.ActualUnfreezeTime.Time)
return nil // 已解冻,跳过
}
// 4.3 检查任务状态(双重检查,防止并发处理)
if currentTask.Status != 1 {
logx.Infof("冻结任务状态已变更,跳过处理: freezeTaskId=%d, status=%d", task.Id, currentTask.Status)
return nil // 状态已变更,跳过
}
// 4.4 再次检查解冻时间(防止时间判断误差)
nowTime := time.Now()
if nowTime.Before(currentTask.UnfreezeTime) {
logx.Infof("冻结任务解冻时间未到,跳过处理: freezeTaskId=%d, unfreezeTime=%v", task.Id, currentTask.UnfreezeTime)
return nil // 时间未到,跳过
}
// 4.5 计算延迟时间(便于监控)
delay := nowTime.Sub(currentTask.UnfreezeTime)
if delay > 1*time.Hour {
logx.Errorf("解冻任务延迟处理: freezeTaskId=%d, 延迟=%v, unfreezeTime=%v", task.Id, delay, currentTask.UnfreezeTime)
}
// 4.6 更新冻结任务状态
currentTask.Status = 2 // 已解冻
currentTask.ActualUnfreezeTime = lzUtils.TimeToNullTime(nowTime)
if updateErr := l.svcCtx.AgentFreezeTaskModel.UpdateWithVersion(transCtx, session, currentTask); updateErr != nil {
return errors.Wrapf(updateErr, "更新冻结任务状态失败, freezeTaskId: %d", task.Id)
}
// 4.7 更新钱包(解冻余额)
wallet, walletErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(transCtx, currentTask.AgentId)
if walletErr != nil {
return errors.Wrapf(walletErr, "查询钱包失败, agentId: %d", currentTask.AgentId)
}
// 检查冻结余额是否足够(防止数据异常)
if wallet.FrozenBalance < currentTask.FreezeAmount {
logx.Errorf("钱包冻结余额不足,数据异常: freezeTaskId=%d, agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
task.Id, currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
return errors.Errorf("钱包冻结余额不足: agentId=%d, frozenBalance=%.2f, freezeAmount=%.2f",
currentTask.AgentId, wallet.FrozenBalance, currentTask.FreezeAmount)
}
wallet.FrozenBalance -= currentTask.FreezeAmount
wallet.Balance += currentTask.FreezeAmount
if updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(transCtx, session, wallet); updateWalletErr != nil {
return errors.Wrapf(updateWalletErr, "更新钱包失败, agentId: %d", currentTask.AgentId)
}
// 更详细的日志(包含更多上下文信息)
logx.Infof("解冻任务处理成功: freezeTaskId=%d, agentId=%d, amount=%.2f, orderPrice=%.2f, freezeTime=%v, unfreezeTime=%v, delay=%v",
task.Id, currentTask.AgentId, currentTask.FreezeAmount, currentTask.OrderPrice,
currentTask.FreezeTime, currentTask.UnfreezeTime, delay)
return nil
})
// 记录处理时间
taskDuration := time.Since(taskStartTime)
if taskDuration > 5*time.Second {
logx.Errorf("解冻任务处理耗时较长: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
}
// 更新计数器(需要加锁保护)
mu.Lock()
if err != nil {
// 错误分类处理
if isTemporaryError(err) {
// 临时错误(如超时、网络问题),记录但继续处理其他任务
failCount++
logx.Errorf("解冻任务临时失败,将在下次扫描重试: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
} else {
// 永久错误(如数据异常),记录详细日志
failCount++
logx.Errorf("解冻任务永久失败: freezeTaskId=%d, duration=%v, err=%v", task.Id, taskDuration, err)
}
} else {
successCount++
logx.Infof("解冻任务处理完成: freezeTaskId=%d, duration=%v", task.Id, taskDuration)
}
mu.Unlock()
}(freezeTask)
}
// 5. 等待所有任务完成
wg.Wait()
// 6. 记录扫描统计信息
scanDuration := time.Since(scanStartTime)
logx.Infof("解冻任务扫描完成: 成功=%d, 失败=%d, 跳过=%d, 总计=%d, 扫描耗时=%v",
successCount, failCount, skipCount, len(freezeTasks), scanDuration)
return nil
}
// isTemporaryError 判断是否为临时错误(可以重试的错误)
func isTemporaryError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// 超时错误
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
// 网络相关错误
errStrLower := strings.ToLower(errStr)
if strings.Contains(errStrLower, "timeout") || strings.Contains(errStrLower, "connection") || strings.Contains(errStrLower, "network") {
return true
}
// 数据库连接错误
if strings.Contains(errStrLower, "connection pool") || strings.Contains(errStrLower, "too many connections") {
return true
}
// 其他错误视为永久错误(如数据异常、业务逻辑错误等)
return false
}