This commit is contained in:
2025-04-27 12:17:18 +08:00
parent b60f6ffb3e
commit 2aea96db2c
128 changed files with 396 additions and 222 deletions

View File

@@ -0,0 +1,184 @@
package queue
import (
"context"
"tyc-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
)
// CarMaintenanceQueryHandler 车辆维保记录查询任务处理器
type CarMaintenanceQueryHandler struct {
svcCtx *svc.ServiceContext
}
// NewCarMaintenanceQueryHandler 创建车辆维保记录查询处理器
func NewCarMaintenanceQueryHandler(svcCtx *svc.ServiceContext) *CarMaintenanceQueryHandler {
return &CarMaintenanceQueryHandler{
svcCtx: svcCtx,
}
}
// ProcessTask 处理车辆维保记录查询任务
func (h *CarMaintenanceQueryHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
// var payload types.MsgCarMaintenanceQueryPayload
// if err := json.Unmarshal(task.Payload(), &payload); err != nil {
// logx.Errorf("解析车辆维保记录查询任务参数失败: %v", err)
// return err
// }
// logx.Infof("处理车辆维保记录查询任务,订单号: %s, 重试次数: %d, 查询ID: %d",
// payload.OrderID, payload.RetryCount, payload.QueryID)
// // 调用apirequestService的ProcessCAR059Request方法
// request := map[string]interface{}{
// "order_id": payload.OrderID,
// }
// requestBytes, err := json.Marshal(request)
// if err != nil {
// logx.Errorf("车辆维保记录查询任务,编码请求参数失败: %v", err)
// return err
// }
// // 调用API处理
// result, err := h.svcCtx.ApiRequestService.ProcessCAR059Request(requestBytes)
// // 处理结果
// if err != nil {
// if result != nil {
// // 使用gjson解析返回结果中的状态码
// retcode := gjson.GetBytes(result.Data, "retcode").String()
// logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s", payload.OrderID, retcode)
// // 根据不同状态码进行不同处理
// switch retcode {
// case "warn_029": // 未查询到结果,请稍后重试
// logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", payload.OrderID)
// // 安排重试
// return h.scheduleRetryWithBackoff(payload.OrderID, payload.RetryCount, payload.QueryID)
// case "warn_013": // 查无记录
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查无记录", payload.OrderID)
// // 更新query表为查无记录
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed_no_data")
// if err != nil {
// logx.Errorf("更新query表失败(查无记录): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
// }
// }
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查询失败", payload.OrderID)
// // 更新query表为查询失败
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "failed")
// if err != nil {
// logx.Errorf("更新query表失败(查询失败): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
// return err
// }
// logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", payload.OrderID, len(result.Data))
// // 更新query表为查询成功
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed")
// if err != nil {
// logx.Errorf("更新query表失败(查询成功): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
return nil
}
// // scheduleRetryWithBackoff 安排指数退避重试任务
// func (h *CarMaintenanceQueryHandler) scheduleRetryWithBackoff(orderID string, retryCount int, queryID int64) error {
// // 最大重试次数限制
// maxRetries := 10
// if retryCount >= maxRetries {
// logx.Infof("车辆维保记录查询任务已达最大重试次数 %d停止重试订单号: %s", maxRetries, orderID)
// // 更新query表为查询超时
// if queryID > 0 {
// err := h.updateQueryErrorState(context.Background(), queryID, "查询超时,已达最大重试次数")
// if err != nil {
// logx.Errorf("更新query表超时状态失败: %v, 查询ID: %d", err, queryID)
// }
// }
// return nil
// }
// // 使用指数退避策略的异步任务
// err := h.svcCtx.AsynqService.SendCarMaintenanceQueryTaskWithBackoff(orderID, retryCount, queryID)
// if err != nil {
// logx.Errorf("安排指数退避重试任务失败: %v, 订单号: %s", err, orderID)
// return err
// }
// logx.Infof("已安排指数退避重试任务, 当前重试次数: %d, 订单号: %s, 查询ID: %d", retryCount, orderID, queryID)
// return nil
// }
// // updateQueryResult 更新query表的查询结果
// func (h *CarMaintenanceQueryHandler) updateQueryResult(ctx context.Context, queryID int64, resultData []byte, state string) error {
// // 先查询当前query记录
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
// if err != nil {
// if err == model.ErrNotFound {
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
// }
// return err
// }
// // 更新查询结果
// query.QueryData = sql.NullString{
// String: string(resultData),
// Valid: true,
// }
// query.QueryState = state
// query.Version++
// // 更新记录
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
// if err != nil {
// return fmt.Errorf("更新查询记录失败: %v", err)
// }
// logx.Infof("成功更新查询记录查询ID: %d, 状态: %s", queryID, state)
// return nil
// }
// // updateQueryErrorState 更新query表为错误状态
// func (h *CarMaintenanceQueryHandler) updateQueryErrorState(ctx context.Context, queryID int64, errorMsg string) error {
// // 先查询当前query记录
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
// if err != nil {
// if err == model.ErrNotFound {
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
// }
// return err
// }
// // 构造错误结果
// errorResult := map[string]interface{}{
// "error": errorMsg,
// }
// resultBytes, _ := json.Marshal(errorResult)
// // 更新查询结果
// query.QueryData = sql.NullString{
// String: string(resultBytes),
// Valid: true,
// }
// query.QueryState = "failed"
// query.Version++
// // 更新记录
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
// if err != nil {
// return fmt.Errorf("更新查询记录失败: %v", err)
// }
// logx.Infof("成功更新查询记录为失败状态查询ID: %d, 错误: %s", queryID, errorMsg)
// return nil
// }

View File

@@ -0,0 +1,43 @@
package queue
import (
"context"
"time"
"tyc-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
const TASKTIME = "0 3 * * *"
type CleanQueryDataHandler struct {
svcCtx *svc.ServiceContext
}
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
return &CleanQueryDataHandler{
svcCtx: svcCtx,
}
}
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
now := time.Now().Format("2006-01-02 15:04:05")
logx.Infof("%s - 开始执行查询数据清理任务", now)
// 计算7天前的时间
sevenDaysAgo := time.Now().AddDate(0, 0, -7)
// 创建查询条件排除product_id为4的记录
conditions := l.svcCtx.QueryModel.SelectBuilder().Where("product_id != ?", 4)
// 调用QueryModel删除7天前的数据排除product_id为4的记录
result, err := l.svcCtx.QueryModel.DeleteBefore(ctx, sevenDaysAgo, conditions)
if err != nil {
logx.Errorf("%s - 清理查询数据失败: %v", time.Now().Format("2006-01-02 15:04:05"), err)
return err
}
logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", time.Now().Format("2006-01-02 15:04:05"), result)
return nil
}

View File

@@ -0,0 +1,366 @@
package queue
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"strings"
"tyc-server/app/main/api/internal/svc"
"tyc-server/app/main/api/internal/types"
"tyc-server/app/main/model"
"tyc-server/pkg/lzkit/crypto"
"tyc-server/pkg/lzkit/lzUtils"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload struct {
OrderID int64 `json:"order_id"`
}
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
}
if order.Status != "paid" {
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
}
redisKey := fmt.Sprintf("%d:%s", order.UserId, order.OrderNo)
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
if cacheErr != nil {
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
}
var data types.QueryCacheLoad
err = json.Unmarshal([]byte(cache), &data)
if err != nil {
return fmt.Errorf("解析缓存内容失败: %+v", err)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
if aesdecryptErr != nil {
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
}
// 敏感数据脱敏处理
desensitizedParams, err := l.desensitizeParams(decryptData)
if err != nil {
return fmt.Errorf("脱敏处理失败: %+v", err)
}
// 对脱敏后的数据进行AES加密
encryptedParams, encryptErr := crypto.AesEncrypt(desensitizedParams, key)
if encryptErr != nil {
return fmt.Errorf("加密脱敏数据失败: %+v", encryptErr)
}
query := &model.Query{
OrderId: order.Id,
UserId: order.UserId,
ProductId: product.Id,
QueryParams: encryptedParams,
QueryState: model.QueryStateProcessing,
}
result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
if insertQueryErr != nil {
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
}
// 获取插入后的ID
queryId, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("获取插入的查询ID失败: %+v", err)
}
// 从数据库中查询完整的查询记录
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
if err != nil {
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
}
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(ctx, decryptData, product.Id)
if err != nil {
handleErrorErr := fmt.Errorf("处理请求失败: %v", err)
return l.handleError(ctx, handleErrorErr, order, query)
}
// 加密返回响应
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, err, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败但任务已成功处理订单ID: %d, 错误: %v", order.Id, delErr)
}
return nil
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
redisKey := fmt.Sprintf("%d:%s", order.UserId, order.OrderNo)
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败订单ID: %d, 错误: %v", order.Id, delErr)
}
if order.Status == "paid" && query.QueryState == model.QueryStateProcessing {
// 更新查询状态为失败
query.QueryState = model.QueryStateFailed
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 退款
if order.PaymentPlatform == "wechat" {
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
} else {
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
if refund.IsSuccess() {
logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
// 更新订单状态为退款
order.Status = "refunded"
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单状态失败订单ID: %d, 错误: %v", order.Id, updateOrderErr)
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
return asynq.SkipRetry
} else {
logx.Errorf("支付宝退款失败:%v", refundErr)
return asynq.SkipRetry
}
// 直接成功
}
}
return asynq.SkipRetry
}
// desensitizeParams 对敏感数据进行脱敏处理
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
// 解析JSON数据到map
var paramsMap map[string]interface{}
if err := json.Unmarshal(data, &paramsMap); err != nil {
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
}
// 处理可能包含敏感信息的字段
for key, value := range paramsMap {
if strValue, ok := value.(string); ok {
// 根据字段名和内容判断并脱敏
if isNameField(key) && len(strValue) > 0 {
// 姓名脱敏
paramsMap[key] = maskName(strValue)
} else if isIDCardField(key) && len(strValue) > 10 {
// 身份证号脱敏
paramsMap[key] = maskIDCard(strValue)
} else if isPhoneField(key) && len(strValue) >= 8 {
// 手机号脱敏
paramsMap[key] = maskPhone(strValue)
} else if len(strValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
paramsMap[key] = maskGeneral(strValue)
}
} else if mapValue, ok := value.(map[string]interface{}); ok {
// 递归处理嵌套的map
for subKey, subValue := range mapValue {
if subStrValue, ok := subValue.(string); ok {
if isNameField(subKey) && len(subStrValue) > 0 {
mapValue[subKey] = maskName(subStrValue)
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
mapValue[subKey] = maskIDCard(subStrValue)
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
mapValue[subKey] = maskPhone(subStrValue)
} else if len(subStrValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
mapValue[subKey] = maskGeneral(subStrValue)
}
}
}
}
}
// 将处理后的map重新序列化为JSON
return json.Marshal(paramsMap)
}
// 判断是否为姓名字段
func isNameField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
strings.Contains(key, "owner") || strings.Contains(key, "user")
}
// 判断是否为身份证字段
func isIDCardField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
}
// 判断是否为手机号字段
func isPhoneField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
}
// 判断是否包含敏感数据模式
func containsSensitivePattern(value string) bool {
// 检查是否包含连续的数字或字母模式
numPattern := regexp.MustCompile(`\d{6,}`)
return numPattern.MatchString(value)
}
// 姓名脱敏
func maskName(name string) string {
// 将字符串转换为rune切片以正确处理中文字符
runes := []rune(name)
length := len(runes)
if length <= 1 {
return name
}
if length == 2 {
// 两个字:保留第一个字,第二个字用*替代
return string(runes[0]) + "*"
}
// 三个字及以上:保留首尾字,中间用*替代
first := string(runes[0])
last := string(runes[length-1])
mask := strings.Repeat("*", length-2)
return first + mask + last
}
// 身份证号脱敏
func maskIDCard(idCard string) string {
length := len(idCard)
if length <= 10 {
return idCard // 如果长度太短,可能不是身份证,不处理
}
// 保留前3位和后4位
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
}
// 手机号脱敏
func maskPhone(phone string) string {
length := len(phone)
if length < 8 {
return phone // 如果长度太短,可能不是手机号,不处理
}
// 保留前3位和后4位
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
}
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
func maskGeneral(value string) string {
length := len(value)
// 小于3个字符的不脱敏
if length <= 3 {
return value
}
// 根据字符串长度计算保留字符数
var prefixLen, suffixLen int
switch {
case length <= 6: // 短字符串
// 保留首尾各1个字符
prefixLen, suffixLen = 1, 1
case length <= 10: // 中等长度字符串
// 保留首部30%和尾部20%的字符
prefixLen = int(float64(length) * 0.3)
suffixLen = int(float64(length) * 0.2)
case length <= 20: // 较长字符串
// 保留首部25%和尾部15%的字符
prefixLen = int(float64(length) * 0.25)
suffixLen = int(float64(length) * 0.15)
default: // 非常长的字符串
// 保留首部20%和尾部10%的字符
prefixLen = int(float64(length) * 0.2)
suffixLen = int(float64(length) * 0.1)
}
// 确保至少有一个字符被保留
if prefixLen < 1 {
prefixLen = 1
}
if suffixLen < 1 {
suffixLen = 1
}
// 确保前缀和后缀总长不超过总长度的80%
if prefixLen+suffixLen > int(float64(length)*0.8) {
// 调整为总长度的80%
totalVisible := int(float64(length) * 0.8)
// 前缀占60%后缀占40%
prefixLen = int(float64(totalVisible) * 0.6)
suffixLen = totalVisible - prefixLen
}
// 创建脱敏后的字符串
prefix := value[:prefixLen]
suffix := value[length-suffixLen:]
masked := strings.Repeat("*", length-prefixLen-suffixLen)
return prefix + masked + suffix
}

View File

@@ -0,0 +1,41 @@
package queue
import (
"context"
"fmt"
"tyc-server/app/main/api/internal/svc"
"tyc-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
)
type CronJob struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
return &CronJob{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CronJob) Register() *asynq.ServeMux {
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
scheduler := asynq.NewScheduler(redisClientOpt, nil)
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
_, err := scheduler.Register(TASKTIME, task)
if err != nil {
panic(fmt.Sprintf("定时任务注册失败:%v", err))
}
scheduler.Start()
fmt.Println("定时任务启动!!!")
mux := asynq.NewServeMux()
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
mux.Handle(types.MsgCarMaintenanceQuery, NewCarMaintenanceQueryHandler(l.svcCtx))
return mux
}