180 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			180 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package queue
 | ||
| 
 | ||
| import (
 | ||
| 	"context"
 | ||
| 	"encoding/hex"
 | ||
| 	"encoding/json"
 | ||
| 	"fmt"
 | ||
| 	"github.com/hibiken/asynq"
 | ||
| 	"github.com/zeromicro/go-zero/core/logx"
 | ||
| 	"qnc-server/app/user/cmd/api/internal/svc"
 | ||
| 	"qnc-server/app/user/cmd/api/internal/types"
 | ||
| 	"qnc-server/app/user/model"
 | ||
| 	"qnc-server/pkg/lzkit/crypto"
 | ||
| 	"qnc-server/pkg/lzkit/lzUtils"
 | ||
| )
 | ||
| 
 | ||
| type PaySuccessNotifyUserHandler struct {
 | ||
| 	svcCtx *svc.ServiceContext
 | ||
| }
 | ||
| 
 | ||
| func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
 | ||
| 	return &PaySuccessNotifyUserHandler{
 | ||
| 		svcCtx: svcCtx,
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
// payload mirrors the JSON body of a pay-success task.
//
// NOTE(review): this is mutable package-level state. Any code that decodes
// into it from concurrently running task handlers would race; prefer a
// function-local value of the same shape.
var payload struct {
	// OrderID is the ID of the order the task refers to (JSON key "order_id").
	OrderID int64 `json:"order_id"`
}
 | ||
| 
 | ||
| func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
 | ||
| 	// 从任务的负载中解码数据
 | ||
| 	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
 | ||
| 		return fmt.Errorf("解析任务负载失败: %w", err)
 | ||
| 	}
 | ||
| 
 | ||
| 	order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
 | ||
| 	if err != nil {
 | ||
| 		return fmt.Errorf("无效的订单ID:  %d, %v", payload.OrderID, err)
 | ||
| 	}
 | ||
| 	if order.Status != "paid" {
 | ||
| 		err = fmt.Errorf("无效的订单: %d", payload.OrderID)
 | ||
| 		logx.Errorf("处理任务失败,原因: %v", err)
 | ||
| 		return asynq.SkipRetry
 | ||
| 	}
 | ||
| 	product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
 | ||
| 	if err != nil {
 | ||
| 		return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
 | ||
| 	}
 | ||
| 
 | ||
| 	query, findQueryErr := l.svcCtx.QueryModel.FindOneByOrderId(ctx, order.Id)
 | ||
| 	if findQueryErr != nil {
 | ||
| 		findQueryErr = fmt.Errorf("获取任务请求参数失败: %v", findQueryErr)
 | ||
| 		logx.Errorf("处理任务失败,原因: %v", findQueryErr)
 | ||
| 		return asynq.SkipRetry
 | ||
| 	}
 | ||
| 	if query.QueryState != "pending" {
 | ||
| 		err = fmt.Errorf("查询已处理: %d", query.Id)
 | ||
| 		logx.Errorf("处理任务失败,原因: %v", err)
 | ||
| 		return asynq.SkipRetry
 | ||
| 	}
 | ||
| 	secretKey := l.svcCtx.Config.Encrypt.SecretKey
 | ||
| 	key, decodeErr := hex.DecodeString(secretKey)
 | ||
| 	if decodeErr != nil {
 | ||
| 		err = fmt.Errorf("获取AES密钥失败: %v", decodeErr)
 | ||
| 		return l.handleError(ctx, err, order, query)
 | ||
| 	}
 | ||
| 
 | ||
| 	decryptData, aesdecryptErr := crypto.AesDecrypt(query.QueryParams, key)
 | ||
| 	if aesdecryptErr != nil {
 | ||
| 		aesdecryptErr = fmt.Errorf("加密响应信息失败: %v", aesdecryptErr)
 | ||
| 		return l.handleError(ctx, aesdecryptErr, order, query)
 | ||
| 	}
 | ||
| 	requests, exists := types.WestDexParams[product.ProductEn]
 | ||
| 	if !exists {
 | ||
| 		err = fmt.Errorf("未找到有效的参数配置: productEn: %s", product.ProductEn)
 | ||
| 		return l.handleError(ctx, err, order, query)
 | ||
| 	}
 | ||
| 	// 根据产品类型选择结构体类型
 | ||
| 	var requestData interface{}
 | ||
| 	switch product.ProductEn {
 | ||
| 	case "marriage":
 | ||
| 		requestData = &types.MarriageReq{}
 | ||
| 	case "homeservice":
 | ||
| 		requestData = &types.HomeServiceReq{}
 | ||
| 	case "riskassessment":
 | ||
| 		requestData = &types.RiskAssessmentReq{}
 | ||
| 	case "companyinfo":
 | ||
| 		requestData = &types.CompanyInfoReq{}
 | ||
| 	case "rentalinfo":
 | ||
| 		requestData = &types.RentalInfoReq{}
 | ||
| 	case "preloanbackgroundcheck":
 | ||
| 		requestData = &types.PreLoanBackgroundCheckReq{}
 | ||
| 	case "backgroundcheck":
 | ||
| 		requestData = &types.BackgroundCheckReq{}
 | ||
| 	default:
 | ||
| 		err = fmt.Errorf("未支持的产品类型: productEn: %s", product.ProductEn)
 | ||
| 		return l.handleError(ctx, err, order, query)
 | ||
| 	}
 | ||
| 	unmarshalErr := json.Unmarshal(decryptData, &requestData)
 | ||
| 	if unmarshalErr != nil {
 | ||
| 		unmarshalErr = fmt.Errorf("解析参数失败: %v", unmarshalErr)
 | ||
| 		return l.handleError(ctx, unmarshalErr, order, query)
 | ||
| 	}
 | ||
| 
 | ||
| 	combinedResponse, err := l.svcCtx.WestDexService.ProcessRequests(requestData, requests)
 | ||
| 	if err != nil {
 | ||
| 		return l.handleError(ctx, err, order, query)
 | ||
| 	}
 | ||
| 	// 加密返回响应
 | ||
| 	encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
 | ||
| 	if aesEncryptErr != nil {
 | ||
| 		err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
 | ||
| 		return l.handleError(ctx, aesEncryptErr, order, query)
 | ||
| 	}
 | ||
| 	query.QueryData = lzUtils.StringToNullString(encryptData)
 | ||
| 	updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
 | ||
| 	if updateErr != nil {
 | ||
| 		err = fmt.Errorf("保存响应数据失败: %v", updateErr)
 | ||
| 		return l.handleError(ctx, updateErr, order, query)
 | ||
| 	}
 | ||
| 
 | ||
| 	query.QueryState = "success"
 | ||
| 	updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
 | ||
| 	if updateQueryErr != nil {
 | ||
| 		updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
 | ||
| 		return l.handleError(ctx, updateQueryErr, order, query)
 | ||
| 	}
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // 定义一个中间件函数
 | ||
| func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
 | ||
| 	logx.Errorf("处理任务失败,原因: %v", err)
 | ||
| 
 | ||
| 	if order.Status == "paid" && query.QueryState == "pending" {
 | ||
| 		// 更新查询状态为失败
 | ||
| 		query.QueryState = "failed"
 | ||
| 		updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
 | ||
| 		if updateQueryErr != nil {
 | ||
| 			logx.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr)
 | ||
| 			return asynq.SkipRetry
 | ||
| 		}
 | ||
| 
 | ||
| 		// 退款
 | ||
| 		if order.PaymentPlatform == "wechat" {
 | ||
| 			refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
 | ||
| 			if refundErr != nil {
 | ||
| 				logx.Error(refundErr)
 | ||
| 				return asynq.SkipRetry
 | ||
| 			}
 | ||
| 		} else {
 | ||
| 			refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
 | ||
| 			if refundErr != nil {
 | ||
| 				logx.Error(refundErr)
 | ||
| 				return asynq.SkipRetry
 | ||
| 			}
 | ||
| 			if refund.IsSuccess() {
 | ||
| 				logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
 | ||
| 				// 更新订单状态为退款
 | ||
| 				order.Status = "refunded"
 | ||
| 				updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
 | ||
| 				if updateOrderErr != nil {
 | ||
| 					logx.Errorf("更新订单状态失败,订单ID: %d, 错误: %v", order.Id, updateOrderErr)
 | ||
| 					return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
 | ||
| 				}
 | ||
| 				return asynq.SkipRetry
 | ||
| 			} else {
 | ||
| 				logx.Errorf("支付宝退款失败:%v", refundErr)
 | ||
| 				return asynq.SkipRetry
 | ||
| 			}
 | ||
| 			// 直接成功
 | ||
| 		}
 | ||
| 
 | ||
| 	}
 | ||
| 
 | ||
| 	return asynq.SkipRetry
 | ||
| }
 |