fix 车辆维保记录

This commit is contained in:
2025-03-21 15:47:11 +08:00
parent 02a382f911
commit c702894f64
14 changed files with 664 additions and 143 deletions

View File

@@ -0,0 +1,184 @@
package queue
import (
"context"
"qnc-server/app/user/cmd/api/internal/svc"
"github.com/hibiken/asynq"
)
// CarMaintenanceQueryHandler 车辆维保记录查询任务处理器
type CarMaintenanceQueryHandler struct {
svcCtx *svc.ServiceContext
}
// NewCarMaintenanceQueryHandler 创建车辆维保记录查询处理器
func NewCarMaintenanceQueryHandler(svcCtx *svc.ServiceContext) *CarMaintenanceQueryHandler {
return &CarMaintenanceQueryHandler{
svcCtx: svcCtx,
}
}
// ProcessTask 处理车辆维保记录查询任务
func (h *CarMaintenanceQueryHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
// var payload types.MsgCarMaintenanceQueryPayload
// if err := json.Unmarshal(task.Payload(), &payload); err != nil {
// logx.Errorf("解析车辆维保记录查询任务参数失败: %v", err)
// return err
// }
// logx.Infof("处理车辆维保记录查询任务,订单号: %s, 重试次数: %d, 查询ID: %d",
// payload.OrderID, payload.RetryCount, payload.QueryID)
// // 调用apirequestService的ProcessCAR059Request方法
// request := map[string]interface{}{
// "order_id": payload.OrderID,
// }
// requestBytes, err := json.Marshal(request)
// if err != nil {
// logx.Errorf("车辆维保记录查询任务,编码请求参数失败: %v", err)
// return err
// }
// // 调用API处理
// result, err := h.svcCtx.ApiRequestService.ProcessCAR059Request(requestBytes)
// // 处理结果
// if err != nil {
// if result != nil {
// // 使用gjson解析返回结果中的状态码
// retcode := gjson.GetBytes(result.Data, "retcode").String()
// logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s", payload.OrderID, retcode)
// // 根据不同状态码进行不同处理
// switch retcode {
// case "warn_029": // 未查询到结果,请稍后重试
// logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", payload.OrderID)
// // 安排重试
// return h.scheduleRetryWithBackoff(payload.OrderID, payload.RetryCount, payload.QueryID)
// case "warn_013": // 查无记录
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查无记录", payload.OrderID)
// // 更新query表为查无记录
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed_no_data")
// if err != nil {
// logx.Errorf("更新query表失败(查无记录): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
// }
// }
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查询失败", payload.OrderID)
// // 更新query表为查询失败
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "failed")
// if err != nil {
// logx.Errorf("更新query表失败(查询失败): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
// return err
// }
// logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", payload.OrderID, len(result.Data))
// // 更新query表为查询成功
// if payload.QueryID > 0 {
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed")
// if err != nil {
// logx.Errorf("更新query表失败(查询成功): %v, 查询ID: %d", err, payload.QueryID)
// }
// }
return nil
}
// // scheduleRetryWithBackoff 安排指数退避重试任务
// func (h *CarMaintenanceQueryHandler) scheduleRetryWithBackoff(orderID string, retryCount int, queryID int64) error {
// // 最大重试次数限制
// maxRetries := 10
// if retryCount >= maxRetries {
// logx.Infof("车辆维保记录查询任务已达最大重试次数 %d停止重试订单号: %s", maxRetries, orderID)
// // 更新query表为查询超时
// if queryID > 0 {
// err := h.updateQueryErrorState(context.Background(), queryID, "查询超时,已达最大重试次数")
// if err != nil {
// logx.Errorf("更新query表超时状态失败: %v, 查询ID: %d", err, queryID)
// }
// }
// return nil
// }
// // 使用指数退避策略的异步任务
// err := h.svcCtx.AsynqService.SendCarMaintenanceQueryTaskWithBackoff(orderID, retryCount, queryID)
// if err != nil {
// logx.Errorf("安排指数退避重试任务失败: %v, 订单号: %s", err, orderID)
// return err
// }
// logx.Infof("已安排指数退避重试任务, 当前重试次数: %d, 订单号: %s, 查询ID: %d", retryCount, orderID, queryID)
// return nil
// }
// // updateQueryResult 更新query表的查询结果
// func (h *CarMaintenanceQueryHandler) updateQueryResult(ctx context.Context, queryID int64, resultData []byte, state string) error {
// // 先查询当前query记录
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
// if err != nil {
// if err == model.ErrNotFound {
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
// }
// return err
// }
// // 更新查询结果
// query.QueryData = sql.NullString{
// String: string(resultData),
// Valid: true,
// }
// query.QueryState = state
// query.Version++
// // 更新记录
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
// if err != nil {
// return fmt.Errorf("更新查询记录失败: %v", err)
// }
// logx.Infof("成功更新查询记录查询ID: %d, 状态: %s", queryID, state)
// return nil
// }
// // updateQueryErrorState 更新query表为错误状态
// func (h *CarMaintenanceQueryHandler) updateQueryErrorState(ctx context.Context, queryID int64, errorMsg string) error {
// // 先查询当前query记录
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
// if err != nil {
// if err == model.ErrNotFound {
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
// }
// return err
// }
// // 构造错误结果
// errorResult := map[string]interface{}{
// "error": errorMsg,
// }
// resultBytes, _ := json.Marshal(errorResult)
// // 更新查询结果
// query.QueryData = sql.NullString{
// String: string(resultBytes),
// Valid: true,
// }
// query.QueryState = "failed"
// query.Version++
// // 更新记录
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
// if err != nil {
// return fmt.Errorf("更新查询记录失败: %v", err)
// }
// logx.Infof("成功更新查询记录为失败状态查询ID: %d, 错误: %s", queryID, errorMsg)
// return nil
// }

View File

@@ -5,12 +5,13 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"qnc-server/app/user/cmd/api/internal/svc"
"qnc-server/app/user/model"
"qnc-server/pkg/lzkit/crypto"
"qnc-server/pkg/lzkit/lzUtils"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type PaySuccessNotifyUserHandler struct {
@@ -53,46 +54,55 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
logx.Errorf("处理任务失败,原因: %v", findQueryErr)
return asynq.SkipRetry
}
if query.QueryState != "pending" {
if query.QueryState != model.QueryStatePending {
err = fmt.Errorf("查询已处理: %d", query.Id)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
query.QueryState = model.QueryStateProcessing
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
handleErrorErr := fmt.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
err = fmt.Errorf("获取AES密钥失败: %v", decodeErr)
return l.handleError(ctx, err, order, query)
handleErrorErr := fmt.Errorf("获取AES密钥失败: %v", decodeErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(query.QueryParams, key)
if aesdecryptErr != nil {
aesdecryptErr = fmt.Errorf("解密响应信息失败: %v", aesdecryptErr)
return l.handleError(ctx, aesdecryptErr, order, query)
handleErrorErr := fmt.Errorf("解密响应信息失败: %v", aesdecryptErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(ctx, decryptData, product.Id)
if err != nil {
return l.handleError(ctx, err, order, query)
handleErrorErr := fmt.Errorf("处理请求失败: %v", err)
return l.handleError(ctx, handleErrorErr, order, query)
}
// 加密返回响应
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, aesEncryptErr, order, query)
handleErrorErr := fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, updateErr, order, query)
handleErrorErr := fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
query.QueryState = model.QueryStateSuccess
updateQueryErr = l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
handleErrorErr := fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, handleErrorErr, order, query)
}
return nil
@@ -102,9 +112,9 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
if order.Status == "paid" && query.QueryState == "pending" {
if order.Status == "paid" && query.QueryState == model.QueryStatePending {
// 更新查询状态为失败
query.QueryState = "failed"
query.QueryState = model.QueryStateFailed
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)

View File

@@ -3,9 +3,10 @@ package queue
import (
"context"
"fmt"
"github.com/hibiken/asynq"
"qnc-server/app/user/cmd/api/internal/svc"
"qnc-server/app/user/cmd/api/internal/types"
"github.com/hibiken/asynq"
)
type CronJob struct {
@@ -34,6 +35,7 @@ func (l *CronJob) Register() *asynq.ServeMux {
mux := asynq.NewServeMux()
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
mux.Handle(types.MsgCarMaintenanceQuery, NewCarMaintenanceQueryHandler(l.svcCtx))
return mux
}