ycc-server/app/main/api/internal/queue/paySuccessNotify.go
2025-06-20 15:12:34 +08:00

491 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"strings"
"time"
"ycc-server/app/main/api/internal/svc"
"ycc-server/app/main/api/internal/types"
"ycc-server/app/main/model"
"ycc-server/pkg/lzkit/crypto"
"ycc-server/pkg/lzkit/lzUtils"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload struct {
OrderID int64 `json:"order_id"`
}
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
}
env := os.Getenv("ENV")
if order.Status != "paid" && env != "development" {
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
}
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
if cacheErr != nil {
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
}
var data types.QueryCacheLoad
err = json.Unmarshal([]byte(cache), &data)
if err != nil {
return fmt.Errorf("解析缓存内容失败: %+v", err)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
if aesdecryptErr != nil {
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
}
// 敏感数据脱敏处理
desensitizedParams, err := l.desensitizeParams(decryptData)
if err != nil {
return fmt.Errorf("脱敏处理失败: %+v", err)
}
// 对脱敏后的数据进行AES加密
encryptedParams, encryptErr := crypto.AesEncrypt(desensitizedParams, key)
if encryptErr != nil {
return fmt.Errorf("加密脱敏数据失败: %+v", encryptErr)
}
query := &model.Query{
OrderId: order.Id,
UserId: order.UserId,
ProductId: product.Id,
QueryParams: encryptedParams,
QueryState: "pending",
}
result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
if insertQueryErr != nil {
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
}
// 获取插入后的ID
queryId, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("获取插入的查询ID失败: %+v", err)
}
// 从数据库中查询完整的查询记录
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
if err != nil {
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
}
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
if err != nil {
return l.handleError(ctx, err, order, query)
}
// 加密返回响应
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, err, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
if err != nil {
return l.handleError(ctx, err, order, query)
}
err = l.promotionOrderStats(ctx, order)
if err != nil {
logx.Errorf("处理推广订单统计失败订单ID: %d, 错误: %v", order.Id, err)
}
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败但任务已成功处理订单ID: %d, 错误: %v", order.Id, delErr)
}
return nil
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败订单ID: %d, 错误: %v", order.Id, delErr)
}
if order.Status == "paid" && query.QueryState == "pending" {
// 更新查询状态为失败
query.QueryState = "failed"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 发起退款并创建退款记录
refundErr := l.processRefund(ctx, order, "业务处理失败,自动退款")
if refundErr != nil {
logx.Errorf("退款处理失败订单ID: %d, 错误: %v", order.Id, refundErr)
return asynq.SkipRetry
}
}
return asynq.SkipRetry
}
// processRefund 处理退款逻辑
func (l *PaySuccessNotifyUserHandler) processRefund(ctx context.Context, order *model.Order, refundReason string) error {
refundNo := fmt.Sprintf("refund-%s", order.OrderNo)
if order.PaymentPlatform == "wechat" {
// 微信退款(异步)
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
// 微信退款调用失败,创建失败记录
createRefundErr := l.createRefundRecord(ctx, order, refundNo, "", model.OrderRefundStatusFailed, refundReason)
if createRefundErr != nil {
logx.Errorf("创建微信退款失败记录时出错: %v", createRefundErr)
}
return refundErr
}
// 微信退款调用成功创建pending记录并更新订单状态
return l.svcCtx.OrderModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
// 创建pending状态的退款记录
if err := l.createRefundRecordWithSession(ctx, session, order, refundNo, "", model.OrderRefundStatusPending, refundReason); err != nil {
return fmt.Errorf("创建微信退款记录失败: %v", err)
}
// 更新订单状态为退款中
order.Status = model.OrderStatusRefunding
if _, err := l.svcCtx.OrderModel.Update(ctx, session, order); err != nil {
return fmt.Errorf("更新订单状态失败: %v", err)
}
return nil
})
} else if order.PaymentPlatform == "alipay" {
// 支付宝退款(同步)
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
// 支付宝退款调用失败,创建失败记录
createRefundErr := l.createRefundRecord(ctx, order, refundNo, "", model.OrderRefundStatusFailed, refundReason)
if createRefundErr != nil {
logx.Errorf("创建支付宝退款失败记录时出错: %v", createRefundErr)
}
return refundErr
}
if refund.IsSuccess() {
// 支付宝退款成功,创建成功记录并更新订单状态
return l.svcCtx.OrderModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
// 创建成功状态的退款记录
if err := l.createRefundRecordWithSession(ctx, session, order, refundNo, refund.TradeNo, model.OrderRefundStatusSuccess, refundReason); err != nil {
return fmt.Errorf("创建支付宝退款成功记录失败: %v", err)
}
// 更新订单状态为已退款
order.Status = model.OrderStatusRefunded
order.RefundTime = sql.NullTime{Time: time.Now(), Valid: true}
if _, err := l.svcCtx.OrderModel.Update(ctx, session, order); err != nil {
return fmt.Errorf("更新订单状态失败: %v", err)
}
return nil
})
} else {
// 支付宝退款失败,创建失败记录
createRefundErr := l.createRefundRecord(ctx, order, refundNo, refund.TradeNo, model.OrderRefundStatusFailed, refundReason)
if createRefundErr != nil {
logx.Errorf("创建支付宝退款失败记录时出错: %v", createRefundErr)
}
return fmt.Errorf("支付宝退款失败: %s", refund.Msg)
}
} else {
return fmt.Errorf("不支持的支付平台: %s", order.PaymentPlatform)
}
}
// createRefundRecord 创建退款记录(无事务)
func (l *PaySuccessNotifyUserHandler) createRefundRecord(ctx context.Context, order *model.Order, refundNo, platformRefundId, status, reason string) error {
refund := &model.OrderRefund{
RefundNo: refundNo,
PlatformRefundId: l.createNullString(platformRefundId),
OrderId: order.Id,
UserId: order.UserId,
ProductId: order.ProductId,
RefundAmount: order.Amount,
RefundReason: l.createNullString(reason),
Status: status,
RefundTime: sql.NullTime{Time: time.Now(), Valid: true},
}
_, err := l.svcCtx.OrderRefundModel.Insert(ctx, nil, refund)
return err
}
// createRefundRecordWithSession 创建退款记录(带事务)
func (l *PaySuccessNotifyUserHandler) createRefundRecordWithSession(ctx context.Context, session sqlx.Session, order *model.Order, refundNo, platformRefundId, status, reason string) error {
refund := &model.OrderRefund{
RefundNo: refundNo,
PlatformRefundId: l.createNullString(platformRefundId),
OrderId: order.Id,
UserId: order.UserId,
ProductId: order.ProductId,
RefundAmount: order.Amount,
RefundReason: l.createNullString(reason),
Status: status,
RefundTime: sql.NullTime{Time: time.Now(), Valid: true},
}
_, err := l.svcCtx.OrderRefundModel.Insert(ctx, session, refund)
return err
}
// createNullString 创建 sql.NullString
func (l *PaySuccessNotifyUserHandler) createNullString(value string) sql.NullString {
return sql.NullString{
String: value,
Valid: value != "",
}
}
// 处理推广订单统计
func (l *PaySuccessNotifyUserHandler) promotionOrderStats(ctx context.Context, order *model.Order) error {
promotionOrder, err := l.svcCtx.AdminPromotionOrderModel.FindOneByOrderId(ctx, order.Id)
if err != nil && !errors.Is(err, model.ErrNotFound) {
return fmt.Errorf("获取推广订单失败: %+v", err)
}
if promotionOrder != nil {
err = l.svcCtx.AdminPromotionLinkStatsService.UpdatePaymentStats(ctx, promotionOrder.LinkId, float64(order.Amount))
if err != nil {
return fmt.Errorf("更新推广链接支付统计失败: %+v", err)
}
}
return nil
}
// desensitizeParams 对敏感数据进行脱敏处理
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
// 解析JSON数据到map
var paramsMap map[string]interface{}
if err := json.Unmarshal(data, &paramsMap); err != nil {
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
}
// 处理可能包含敏感信息的字段
for key, value := range paramsMap {
if strValue, ok := value.(string); ok {
// 根据字段名和内容判断并脱敏
if isNameField(key) && len(strValue) > 0 {
// 姓名脱敏
paramsMap[key] = maskName(strValue)
} else if isIDCardField(key) && len(strValue) > 10 {
// 身份证号脱敏
paramsMap[key] = maskIDCard(strValue)
} else if isPhoneField(key) && len(strValue) >= 8 {
// 手机号脱敏
paramsMap[key] = maskPhone(strValue)
} else if len(strValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
paramsMap[key] = maskGeneral(strValue)
}
} else if mapValue, ok := value.(map[string]interface{}); ok {
// 递归处理嵌套的map
for subKey, subValue := range mapValue {
if subStrValue, ok := subValue.(string); ok {
if isNameField(subKey) && len(subStrValue) > 0 {
mapValue[subKey] = maskName(subStrValue)
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
mapValue[subKey] = maskIDCard(subStrValue)
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
mapValue[subKey] = maskPhone(subStrValue)
} else if len(subStrValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
mapValue[subKey] = maskGeneral(subStrValue)
}
}
}
}
}
// 将处理后的map重新序列化为JSON
return json.Marshal(paramsMap)
}
// 判断是否为姓名字段
func isNameField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
strings.Contains(key, "owner") || strings.Contains(key, "main")
}
// 判断是否为身份证字段
func isIDCardField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
}
// 判断是否为手机号字段
func isPhoneField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
}
// 判断是否包含敏感数据模式
func containsSensitivePattern(value string) bool {
// 检查是否包含连续的数字或字母模式
numPattern := regexp.MustCompile(`\d{6,}`)
return numPattern.MatchString(value)
}
// 姓名脱敏
func maskName(name string) string {
// 将字符串转换为rune切片以正确处理中文字符
runes := []rune(name)
length := len(runes)
if length <= 1 {
return name
}
if length == 2 {
// 两个字:保留第一个字,第二个字用*替代
return string(runes[0]) + "*"
}
// 三个字及以上:保留首尾字,中间用*替代
first := string(runes[0])
last := string(runes[length-1])
mask := strings.Repeat("*", length-2)
return first + mask + last
}
// 身份证号脱敏
func maskIDCard(idCard string) string {
length := len(idCard)
if length <= 10 {
return idCard // 如果长度太短,可能不是身份证,不处理
}
// 保留前3位和后4位
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
}
// 手机号脱敏
func maskPhone(phone string) string {
length := len(phone)
if length < 8 {
return phone // 如果长度太短,可能不是手机号,不处理
}
// 保留前3位和后4位
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
}
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
func maskGeneral(value string) string {
length := len(value)
// 小于3个字符的不脱敏
if length <= 3 {
return value
}
// 根据字符串长度计算保留字符数
var prefixLen, suffixLen int
switch {
case length <= 6: // 短字符串
// 保留首尾各1个字符
prefixLen, suffixLen = 1, 1
case length <= 10: // 中等长度字符串
// 保留首部30%和尾部20%的字符
prefixLen = int(float64(length) * 0.3)
suffixLen = int(float64(length) * 0.2)
case length <= 20: // 较长字符串
// 保留首部25%和尾部15%的字符
prefixLen = int(float64(length) * 0.25)
suffixLen = int(float64(length) * 0.15)
default: // 非常长的字符串
// 保留首部20%和尾部10%的字符
prefixLen = int(float64(length) * 0.2)
suffixLen = int(float64(length) * 0.1)
}
// 确保至少有一个字符被保留
if prefixLen < 1 {
prefixLen = 1
}
if suffixLen < 1 {
suffixLen = 1
}
// 确保前缀和后缀总长不超过总长度的80%
if prefixLen+suffixLen > int(float64(length)*0.8) {
// 调整为总长度的80%
totalVisible := int(float64(length) * 0.8)
// 前缀占60%后缀占40%
prefixLen = int(float64(totalVisible) * 0.6)
suffixLen = totalVisible - prefixLen
}
// 创建脱敏后的字符串
prefix := value[:prefixLen]
suffix := value[length-suffixLen:]
masked := strings.Repeat("*", length-prefixLen-suffixLen)
return prefix + masked + suffix
}