158 lines
5.2 KiB
Go
158 lines
5.2 KiB
Go
package queue
|
||
|
||
import (
|
||
"context"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"fmt"
|
||
"tyc-server/app/user/cmd/api/internal/svc"
|
||
"tyc-server/app/user/model"
|
||
"tyc-server/pkg/lzkit/crypto"
|
||
"tyc-server/pkg/lzkit/lzUtils"
|
||
|
||
"github.com/hibiken/asynq"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
)
|
||
|
||
type PaySuccessNotifyUserHandler struct {
|
||
svcCtx *svc.ServiceContext
|
||
}
|
||
|
||
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
|
||
return &PaySuccessNotifyUserHandler{
|
||
svcCtx: svcCtx,
|
||
}
|
||
}
|
||
|
||
var payload struct {
|
||
OrderID int64 `json:"order_id"`
|
||
}
|
||
|
||
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||
// 从任务的负载中解码数据
|
||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||
return fmt.Errorf("解析任务负载失败: %w", err)
|
||
}
|
||
|
||
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
|
||
if err != nil {
|
||
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
|
||
}
|
||
if order.Status != "paid" {
|
||
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
|
||
logx.Errorf("处理任务失败,原因: %v", err)
|
||
return asynq.SkipRetry
|
||
}
|
||
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
|
||
if err != nil {
|
||
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
|
||
}
|
||
|
||
query, findQueryErr := l.svcCtx.QueryModel.FindOneByOrderId(ctx, order.Id)
|
||
if findQueryErr != nil {
|
||
findQueryErr = fmt.Errorf("获取任务请求参数失败: %v", findQueryErr)
|
||
logx.Errorf("处理任务失败,原因: %v", findQueryErr)
|
||
return asynq.SkipRetry
|
||
}
|
||
if query.QueryState != model.QueryStatePending {
|
||
err = fmt.Errorf("查询已处理: %d", query.Id)
|
||
logx.Errorf("处理任务失败,原因: %v", err)
|
||
return asynq.SkipRetry
|
||
}
|
||
|
||
query.QueryState = model.QueryStateProcessing
|
||
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||
if updateQueryErr != nil {
|
||
handleErrorErr := fmt.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
|
||
secretKey := l.svcCtx.Config.Encrypt.SecretKey
|
||
key, decodeErr := hex.DecodeString(secretKey)
|
||
if decodeErr != nil {
|
||
handleErrorErr := fmt.Errorf("获取AES密钥失败: %v", decodeErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
|
||
decryptData, aesdecryptErr := crypto.AesDecrypt(query.QueryParams, key)
|
||
if aesdecryptErr != nil {
|
||
handleErrorErr := fmt.Errorf("解密响应信息失败: %v", aesdecryptErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
|
||
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(ctx, decryptData, product.Id)
|
||
if err != nil {
|
||
handleErrorErr := fmt.Errorf("处理请求失败: %v", err)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
// 加密返回响应
|
||
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
|
||
if aesEncryptErr != nil {
|
||
handleErrorErr := fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
query.QueryData = lzUtils.StringToNullString(encryptData)
|
||
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||
if updateErr != nil {
|
||
handleErrorErr := fmt.Errorf("保存响应数据失败: %v", updateErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
|
||
query.QueryState = model.QueryStateSuccess
|
||
updateQueryErr = l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||
if updateQueryErr != nil {
|
||
handleErrorErr := fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
|
||
return l.handleError(ctx, handleErrorErr, order, query)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// 定义一个中间件函数
|
||
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
|
||
logx.Errorf("处理任务失败,原因: %v", err)
|
||
|
||
if order.Status == "paid" && query.QueryState == model.QueryStateProcessing {
|
||
// 更新查询状态为失败
|
||
query.QueryState = model.QueryStateFailed
|
||
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
||
if updateQueryErr != nil {
|
||
logx.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr)
|
||
return asynq.SkipRetry
|
||
}
|
||
|
||
// 退款
|
||
if order.PaymentPlatform == "wechat" {
|
||
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
|
||
if refundErr != nil {
|
||
logx.Error(refundErr)
|
||
return asynq.SkipRetry
|
||
}
|
||
} else {
|
||
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
|
||
if refundErr != nil {
|
||
logx.Error(refundErr)
|
||
return asynq.SkipRetry
|
||
}
|
||
if refund.IsSuccess() {
|
||
logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
|
||
// 更新订单状态为退款
|
||
order.Status = "refunded"
|
||
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
|
||
if updateOrderErr != nil {
|
||
logx.Errorf("更新订单状态失败,订单ID: %d, 错误: %v", order.Id, updateOrderErr)
|
||
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
|
||
}
|
||
return asynq.SkipRetry
|
||
} else {
|
||
logx.Errorf("支付宝退款失败:%v", refundErr)
|
||
return asynq.SkipRetry
|
||
}
|
||
// 直接成功
|
||
}
|
||
|
||
}
|
||
|
||
return asynq.SkipRetry
|
||
}
|