Files
ycc-proxy-server/app/main/api/internal/queue/paySuccessNotify.go
2025-12-09 18:55:28 +08:00

429 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"ycc-server/app/main/api/internal/svc"
"ycc-server/app/main/api/internal/types"
"ycc-server/app/main/model"
"ycc-server/pkg/lzkit/crypto"
"ycc-server/pkg/lzkit/lzUtils"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload types.MsgPaySuccessQueryPayload
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
// 订单不存在,记录详细日志并跳过重试
logx.Errorf("支付成功通知任务失败订单不存在订单ID: %s, 错误: %v", payload.OrderID, err)
return asynq.SkipRetry // 订单不存在时跳过重试,避免重复失败
}
env := os.Getenv("ENV")
if order.Status != "paid" && env != "development" {
err = fmt.Errorf("无效的订单: %s", payload.OrderID)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %s, productID: %s", payload.OrderID, order.ProductId)
}
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
if cacheErr != nil {
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
}
var data types.QueryCacheLoad
err = json.Unmarshal([]byte(cache), &data)
if err != nil {
return fmt.Errorf("解析缓存内容失败: %+v", err)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
if aesdecryptErr != nil {
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
}
query := &model.Query{
Id: uuid.NewString(),
OrderId: order.Id,
UserId: order.UserId,
ProductId: product.Id,
QueryParams: data.Params,
QueryState: "pending",
}
_, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
if insertQueryErr != nil {
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
}
// 插入后使用预生成的查询ID
queryId := query.Id
// 从数据库中查询完整的查询记录
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
if err != nil {
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
}
// 解析解密后的参数获取用户信息
var userInfo map[string]interface{}
if err := json.Unmarshal(decryptData, &userInfo); err != nil {
return fmt.Errorf("解析用户信息失败: %+v", err)
}
// 生成授权书
authDoc, err := l.svcCtx.AuthorizationService.GenerateAuthorizationDocument(
ctx, order.UserId, order.Id, queryId, userInfo,
)
if err != nil {
logx.Errorf("生成授权书失败: %v", err)
}
// 将授权书URL添加到解密数据中
if authDoc != nil {
// 生成完整的授权书访问URL
fullAuthDocURL := l.svcCtx.AuthorizationService.GetFullFileURL(authDoc.FileUrl)
userInfo["authorization_url"] = fullAuthDocURL
// 重新序列化用户信息
updatedDecryptData, marshalErr := json.Marshal(userInfo)
if marshalErr != nil {
logx.Errorf("序列化用户信息失败: %v", marshalErr)
} else {
// 重新加密更新后的数据
encryptedUpdatedData, encryptErr := crypto.AesEncrypt(updatedDecryptData, key)
if encryptErr != nil {
logx.Errorf("重新加密用户信息失败: %v", encryptErr)
} else {
// 更新查询记录中的参数
query.QueryParams = string(encryptedUpdatedData)
updateParamsErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateParamsErr != nil {
logx.Errorf("更新查询参数失败: %v", updateParamsErr)
} else {
logx.Infof("成功更新查询参数包含授权书URL: %s", fullAuthDocURL)
}
}
decryptData = updatedDecryptData
}
}
// 检查是否为空报告模式(开发环境)
isEmptyReportMode := env == "development" && order.PaymentPlatform == "test"
var encryptData string
if isEmptyReportMode {
// 空报告模式生成空的报告数据跳过API调用
logx.Infof("空报告模式:订单 %s (ID: %s) 跳过API调用生成空报告", order.OrderNo, order.Id)
// 生成空报告数据结构(根据实际报告格式生成)
emptyReportData := []byte(`[]`) // 空数组,表示没有数据
// 加密空报告数据
encryptedEmptyData, aesEncryptErr := crypto.AesEncrypt(emptyReportData, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密空报告数据失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
encryptData = encryptedEmptyData
} else {
// 正常模式调用API请求服务
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
if err != nil {
return l.handleError(ctx, err, order, query)
}
// 加密返回响应
encryptedResponse, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
encryptData = encryptedResponse
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, err, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
// 报告生成成功后,发送代理处理异步任务(不阻塞报告流程)
if asyncErr := l.svcCtx.AsynqService.SendAgentProcessTask(order.Id); asyncErr != nil {
// 代理处理任务发送失败,只记录日志,不影响报告流程
logx.Errorf("发送代理处理任务失败订单ID: %s, 错误: %v", order.Id, asyncErr)
}
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败但任务已成功处理订单ID: %s, 错误: %v", order.Id, delErr)
}
return nil
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败订单ID: %s, 错误: %v", order.Id, delErr)
}
if order.Status == "paid" && query.QueryState == "pending" {
// 更新查询状态为失败
query.QueryState = "failed"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 退款
if order.PaymentPlatform == "wechat" {
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
} else {
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
if refund.IsSuccess() {
logx.Errorf("支付宝退款成功, orderID: %s", order.Id)
// 更新订单状态为退款
order.Status = "refunded"
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单状态失败订单ID: %s, 错误: %v", order.Id, updateOrderErr)
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
return asynq.SkipRetry
} else {
logx.Errorf("支付宝退款失败:%v", refundErr)
return asynq.SkipRetry
}
// 直接成功
}
}
return asynq.SkipRetry
}
// desensitizeParams 对敏感数据进行脱敏处理
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
// 解析JSON数据到map
var paramsMap map[string]interface{}
if err := json.Unmarshal(data, &paramsMap); err != nil {
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
}
// 处理可能包含敏感信息的字段
for key, value := range paramsMap {
if strValue, ok := value.(string); ok {
// 根据字段名和内容判断并脱敏
if isNameField(key) && len(strValue) > 0 {
// 姓名脱敏
paramsMap[key] = maskName(strValue)
} else if isIDCardField(key) && len(strValue) > 10 {
// 身份证号脱敏
paramsMap[key] = maskIDCard(strValue)
} else if isPhoneField(key) && len(strValue) >= 8 {
// 手机号脱敏
paramsMap[key] = maskPhone(strValue)
} else if len(strValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
paramsMap[key] = maskGeneral(strValue)
}
} else if mapValue, ok := value.(map[string]interface{}); ok {
// 递归处理嵌套的map
for subKey, subValue := range mapValue {
if subStrValue, ok := subValue.(string); ok {
if isNameField(subKey) && len(subStrValue) > 0 {
mapValue[subKey] = maskName(subStrValue)
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
mapValue[subKey] = maskIDCard(subStrValue)
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
mapValue[subKey] = maskPhone(subStrValue)
} else if len(subStrValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
mapValue[subKey] = maskGeneral(subStrValue)
}
}
}
}
}
// 将处理后的map重新序列化为JSON
return json.Marshal(paramsMap)
}
// 判断是否为姓名字段
func isNameField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
strings.Contains(key, "owner") || strings.Contains(key, "main")
}
// 判断是否为身份证字段
func isIDCardField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
}
// 判断是否为手机号字段
func isPhoneField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
}
// 判断是否包含敏感数据模式
func containsSensitivePattern(value string) bool {
// 检查是否包含连续的数字或字母模式
numPattern := regexp.MustCompile(`\d{6,}`)
return numPattern.MatchString(value)
}
// 姓名脱敏
func maskName(name string) string {
// 将字符串转换为rune切片以正确处理中文字符
runes := []rune(name)
length := len(runes)
if length <= 1 {
return name
}
if length == 2 {
// 两个字:保留第一个字,第二个字用*替代
return string(runes[0]) + "*"
}
// 三个字及以上:保留首尾字,中间用*替代
first := string(runes[0])
last := string(runes[length-1])
mask := strings.Repeat("*", length-2)
return first + mask + last
}
// 身份证号脱敏
func maskIDCard(idCard string) string {
length := len(idCard)
if length <= 10 {
return idCard // 如果长度太短,可能不是身份证,不处理
}
// 保留前3位和后4位
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
}
// 手机号脱敏
func maskPhone(phone string) string {
length := len(phone)
if length < 8 {
return phone // 如果长度太短,可能不是手机号,不处理
}
// 保留前3位和后4位
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
}
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
func maskGeneral(value string) string {
length := len(value)
// 小于3个字符的不脱敏
if length <= 3 {
return value
}
// 根据字符串长度计算保留字符数
var prefixLen, suffixLen int
switch {
case length <= 6: // 短字符串
// 保留首尾各1个字符
prefixLen, suffixLen = 1, 1
case length <= 10: // 中等长度字符串
// 保留首部30%和尾部20%的字符
prefixLen = int(float64(length) * 0.3)
suffixLen = int(float64(length) * 0.2)
case length <= 20: // 较长字符串
// 保留首部25%和尾部15%的字符
prefixLen = int(float64(length) * 0.25)
suffixLen = int(float64(length) * 0.15)
default: // 非常长的字符串
// 保留首部20%和尾部10%的字符
prefixLen = int(float64(length) * 0.2)
suffixLen = int(float64(length) * 0.1)
}
// 确保至少有一个字符被保留
if prefixLen < 1 {
prefixLen = 1
}
if suffixLen < 1 {
suffixLen = 1
}
// 确保前缀和后缀总长不超过总长度的80%
if prefixLen+suffixLen > int(float64(length)*0.8) {
// 调整为总长度的80%
totalVisible := int(float64(length) * 0.8)
// 前缀占60%后缀占40%
prefixLen = int(float64(totalVisible) * 0.6)
suffixLen = totalVisible - prefixLen
}
// 创建脱敏后的字符串
prefix := value[:prefixLen]
suffix := value[length-suffixLen:]
masked := strings.Repeat("*", length-prefixLen-suffixLen)
return prefix + masked + suffix
}