package queue import ( "context" "encoding/hex" "encoding/json" "fmt" "os" "qnc-server/app/user/cmd/api/internal/svc" "qnc-server/app/user/cmd/api/internal/types" "qnc-server/app/user/model" "qnc-server/pkg/lzkit/crypto" "qnc-server/pkg/lzkit/lzUtils" "regexp" "strings" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/logx" ) type PaySuccessNotifyUserHandler struct { svcCtx *svc.ServiceContext } func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler { return &PaySuccessNotifyUserHandler{ svcCtx: svcCtx, } } var payload struct { OrderID int64 `json:"order_id"` } func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error { // 从任务的负载中解码数据 if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("解析任务负载失败: %w", err) } order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID) if err != nil { return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err) } env := os.Getenv("ENV") if order.Status != "paid" && env != "development" { err = fmt.Errorf("无效的订单: %d", payload.OrderID) logx.Errorf("处理任务失败,原因: %v", err) return asynq.SkipRetry } product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId) if err != nil { return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId) } redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo) cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey) if cacheErr != nil { return fmt.Errorf("获取缓存内容失败: %+v", cacheErr) } var data types.QueryCacheLoad err = json.Unmarshal([]byte(cache), &data) if err != nil { return fmt.Errorf("解析缓存内容失败: %+v", err) } secretKey := l.svcCtx.Config.Encrypt.SecretKey key, decodeErr := hex.DecodeString(secretKey) if decodeErr != nil { return fmt.Errorf("获取AES密钥失败: %+v", decodeErr) } decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key) if aesdecryptErr != nil { return fmt.Errorf("解密参数失败: %+v", aesdecryptErr) } // 敏感数据脱敏处理 desensitizedParams, err := l.desensitizeParams(decryptData) if err != nil { return fmt.Errorf("脱敏处理失败: %+v", err) } // 对脱敏后的数据进行AES加密 encryptedParams, encryptErr := crypto.AesEncrypt(desensitizedParams, key) if encryptErr != nil { return fmt.Errorf("加密脱敏数据失败: %+v", encryptErr) } query := &model.Query{ OrderId: order.Id, UserId: order.UserId, ProductId: product.Id, QueryParams: encryptedParams, QueryState: "pending", } result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query) if insertQueryErr != nil { return fmt.Errorf("保存查询失败: %+v", insertQueryErr) } // 获取插入后的ID queryId, err := result.LastInsertId() if err != nil { return fmt.Errorf("获取插入的查询ID失败: %+v", err) } // 从数据库中查询完整的查询记录 query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId) if err != nil { return fmt.Errorf("获取插入后的查询记录失败: %+v", err) } combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id) if err != nil { return l.handleError(ctx, err, order, query) } // 加密返回响应 encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key) if aesEncryptErr != nil { err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr) return l.handleError(ctx, err, order, query) } query.QueryData = lzUtils.StringToNullString(encryptData) updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateErr != nil { err = fmt.Errorf("保存响应数据失败: %v", updateErr) return l.handleError(ctx, err, order, query) } query.QueryState = "success" updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateQueryErr != nil { updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr) return l.handleError(ctx, updateQueryErr, order, query) } err = l.svcCtx.AgentService.AgentProcess(ctx, order) if err != nil { return l.handleError(ctx, err, order, query) } _, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey) if delErr != nil { logx.Errorf("删除Redis缓存失败,但任务已成功处理,订单ID: %d, 错误: %v", order.Id, delErr) } return nil } // 定义一个中间件函数 func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error { logx.Errorf("处理任务失败,原因: %v", err) redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo) _, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey) if delErr != nil { logx.Errorf("删除Redis缓存失败,订单ID: %d, 错误: %v", order.Id, delErr) } if order.Status == "paid" && query.QueryState == "pending" { // 更新查询状态为失败 query.QueryState = "failed" updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateQueryErr != nil { logx.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr) return asynq.SkipRetry } // 退款 if order.PaymentPlatform == "wechat" { refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount) if refundErr != nil { logx.Error(refundErr) return asynq.SkipRetry } } else { refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount) if refundErr != nil { logx.Error(refundErr) return asynq.SkipRetry } if refund.IsSuccess() { logx.Errorf("支付宝退款成功, orderID: %d", order.Id) // 更新订单状态为退款 order.Status = "refunded" updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order) if updateOrderErr != nil { logx.Errorf("更新订单状态失败,订单ID: %d, 错误: %v", order.Id, updateOrderErr) return fmt.Errorf("更新订单状态失败: %v", updateOrderErr) } return asynq.SkipRetry } else { logx.Errorf("支付宝退款失败:%v", refundErr) return asynq.SkipRetry } // 直接成功 } } return asynq.SkipRetry } // desensitizeParams 对敏感数据进行脱敏处理 func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) { // 解析JSON数据到map var paramsMap map[string]interface{} if err := json.Unmarshal(data, ¶msMap); err != nil { return nil, fmt.Errorf("解析JSON数据失败: %v", err) } // 处理可能包含敏感信息的字段 for key, value := range paramsMap { if strValue, ok := value.(string); ok { // 根据字段名和内容判断并脱敏 if isNameField(key) && len(strValue) > 0 { // 姓名脱敏 paramsMap[key] = maskName(strValue) } else if isIDCardField(key) && len(strValue) > 10 { // 身份证号脱敏 paramsMap[key] = maskIDCard(strValue) } else if isPhoneField(key) && len(strValue) >= 8 { // 手机号脱敏 paramsMap[key] = maskPhone(strValue) } else if len(strValue) > 3 { // 其他所有未匹配的字段都进行通用脱敏 paramsMap[key] = maskGeneral(strValue) } } else if mapValue, ok := value.(map[string]interface{}); ok { // 递归处理嵌套的map for subKey, subValue := range mapValue { if subStrValue, ok := subValue.(string); ok { if isNameField(subKey) && len(subStrValue) > 0 { mapValue[subKey] = maskName(subStrValue) } else if isIDCardField(subKey) && len(subStrValue) > 10 { mapValue[subKey] = maskIDCard(subStrValue) } else if isPhoneField(subKey) && len(subStrValue) >= 8 { mapValue[subKey] = maskPhone(subStrValue) } else if len(subStrValue) > 3 { // 其他所有未匹配的字段都进行通用脱敏 mapValue[subKey] = maskGeneral(subStrValue) } } } } } // 将处理后的map重新序列化为JSON return json.Marshal(paramsMap) } // 判断是否为姓名字段 func isNameField(key string) bool { key = strings.ToLower(key) return strings.Contains(key, "name") || strings.Contains(key, "姓名") || strings.Contains(key, "owner") || strings.Contains(key, "user") } // 判断是否为身份证字段 func isIDCardField(key string) bool { key = strings.ToLower(key) return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") || strings.Contains(key, "身份证") || strings.Contains(key, "证件号") } // 判断是否为手机号字段 func isPhoneField(key string) bool { key = strings.ToLower(key) return strings.Contains(key, "phone") || strings.Contains(key, "mobile") || strings.Contains(key, "手机") || strings.Contains(key, "电话") } // 判断是否包含敏感数据模式 func containsSensitivePattern(value string) bool { // 检查是否包含连续的数字或字母模式 numPattern := regexp.MustCompile(`\d{6,}`) return numPattern.MatchString(value) } // 姓名脱敏 func maskName(name string) string { // 将字符串转换为rune切片以正确处理中文字符 runes := []rune(name) length := len(runes) if length <= 1 { return name } if length == 2 { // 两个字:保留第一个字,第二个字用*替代 return string(runes[0]) + "*" } // 三个字及以上:保留首尾字,中间用*替代 first := string(runes[0]) last := string(runes[length-1]) mask := strings.Repeat("*", length-2) return first + mask + last } // 身份证号脱敏 func maskIDCard(idCard string) string { length := len(idCard) if length <= 10 { return idCard // 如果长度太短,可能不是身份证,不处理 } // 保留前3位和后4位 return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:] } // 手机号脱敏 func maskPhone(phone string) string { length := len(phone) if length < 8 { return phone // 如果长度太短,可能不是手机号,不处理 } // 保留前3位和后4位 return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:] } // 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏 func maskGeneral(value string) string { length := len(value) // 小于3个字符的不脱敏 if length <= 3 { return value } // 根据字符串长度计算保留字符数 var prefixLen, suffixLen int switch { case length <= 6: // 短字符串 // 保留首尾各1个字符 prefixLen, suffixLen = 1, 1 case length <= 10: // 中等长度字符串 // 保留首部30%和尾部20%的字符 prefixLen = int(float64(length) * 0.3) suffixLen = int(float64(length) * 0.2) case length <= 20: // 较长字符串 // 保留首部25%和尾部15%的字符 prefixLen = int(float64(length) * 0.25) suffixLen = int(float64(length) * 0.15) default: // 非常长的字符串 // 保留首部20%和尾部10%的字符 prefixLen = int(float64(length) * 0.2) suffixLen = int(float64(length) * 0.1) } // 确保至少有一个字符被保留 if prefixLen < 1 { prefixLen = 1 } if suffixLen < 1 { suffixLen = 1 } // 确保前缀和后缀总长不超过总长度的80% if prefixLen+suffixLen > int(float64(length)*0.8) { // 调整为总长度的80% totalVisible := int(float64(length) * 0.8) // 前缀占60%,后缀占40% prefixLen = int(float64(totalVisible) * 0.6) suffixLen = totalVisible - prefixLen } // 创建脱敏后的字符串 prefix := value[:prefixLen] suffix := value[length-suffixLen:] masked := strings.Repeat("*", length-prefixLen-suffixLen) return prefix + masked + suffix }