2025-03-21 15:47:11 +08:00
|
|
|
|
package queue
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2025-04-09 15:58:06 +08:00
|
|
|
|
"tyc-server/app/user/cmd/api/internal/svc"
|
2025-03-21 15:47:11 +08:00
|
|
|
|
|
|
|
|
|
"github.com/hibiken/asynq"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// CarMaintenanceQueryHandler 车辆维保记录查询任务处理器
|
|
|
|
|
type CarMaintenanceQueryHandler struct {
|
|
|
|
|
svcCtx *svc.ServiceContext
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewCarMaintenanceQueryHandler 创建车辆维保记录查询处理器
|
|
|
|
|
func NewCarMaintenanceQueryHandler(svcCtx *svc.ServiceContext) *CarMaintenanceQueryHandler {
|
|
|
|
|
return &CarMaintenanceQueryHandler{
|
|
|
|
|
svcCtx: svcCtx,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ProcessTask 处理车辆维保记录查询任务
|
|
|
|
|
func (h *CarMaintenanceQueryHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
|
|
|
|
// var payload types.MsgCarMaintenanceQueryPayload
|
|
|
|
|
// if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
|
|
|
|
// logx.Errorf("解析车辆维保记录查询任务参数失败: %v", err)
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// logx.Infof("处理车辆维保记录查询任务,订单号: %s, 重试次数: %d, 查询ID: %d",
|
|
|
|
|
// payload.OrderID, payload.RetryCount, payload.QueryID)
|
|
|
|
|
|
|
|
|
|
// // 调用apirequestService的ProcessCAR059Request方法
|
|
|
|
|
// request := map[string]interface{}{
|
|
|
|
|
// "order_id": payload.OrderID,
|
|
|
|
|
// }
|
|
|
|
|
// requestBytes, err := json.Marshal(request)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("车辆维保记录查询任务,编码请求参数失败: %v", err)
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // 调用API处理
|
|
|
|
|
// result, err := h.svcCtx.ApiRequestService.ProcessCAR059Request(requestBytes)
|
|
|
|
|
|
|
|
|
|
// // 处理结果
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// if result != nil {
|
|
|
|
|
// // 使用gjson解析返回结果中的状态码
|
|
|
|
|
// retcode := gjson.GetBytes(result.Data, "retcode").String()
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s", payload.OrderID, retcode)
|
|
|
|
|
|
|
|
|
|
// // 根据不同状态码进行不同处理
|
|
|
|
|
// switch retcode {
|
|
|
|
|
// case "warn_029": // 未查询到结果,请稍后重试
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", payload.OrderID)
|
|
|
|
|
|
|
|
|
|
// // 安排重试
|
|
|
|
|
// return h.scheduleRetryWithBackoff(payload.OrderID, payload.RetryCount, payload.QueryID)
|
|
|
|
|
|
|
|
|
|
// case "warn_013": // 查无记录
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查无记录", payload.OrderID)
|
|
|
|
|
// // 更新query表为查无记录
|
|
|
|
|
// if payload.QueryID > 0 {
|
|
|
|
|
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed_no_data")
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("更新query表失败(查无记录): %v, 查询ID: %d", err, payload.QueryID)
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务,订单号: %s, 查询失败", payload.OrderID)
|
|
|
|
|
// // 更新query表为查询失败
|
|
|
|
|
// if payload.QueryID > 0 {
|
|
|
|
|
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "failed")
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("更新query表失败(查询失败): %v, 查询ID: %d", err, payload.QueryID)
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", payload.OrderID, len(result.Data))
|
|
|
|
|
// // 更新query表为查询成功
|
|
|
|
|
// if payload.QueryID > 0 {
|
|
|
|
|
// err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed")
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("更新query表失败(查询成功): %v, 查询ID: %d", err, payload.QueryID)
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// // scheduleRetryWithBackoff 安排指数退避重试任务
|
|
|
|
|
// func (h *CarMaintenanceQueryHandler) scheduleRetryWithBackoff(orderID string, retryCount int, queryID int64) error {
|
|
|
|
|
// // 最大重试次数限制
|
|
|
|
|
// maxRetries := 10
|
|
|
|
|
// if retryCount >= maxRetries {
|
|
|
|
|
// logx.Infof("车辆维保记录查询任务已达最大重试次数 %d,停止重试,订单号: %s", maxRetries, orderID)
|
|
|
|
|
// // 更新query表为查询超时
|
|
|
|
|
// if queryID > 0 {
|
|
|
|
|
// err := h.updateQueryErrorState(context.Background(), queryID, "查询超时,已达最大重试次数")
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("更新query表超时状态失败: %v, 查询ID: %d", err, queryID)
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // 使用指数退避策略的异步任务
|
|
|
|
|
// err := h.svcCtx.AsynqService.SendCarMaintenanceQueryTaskWithBackoff(orderID, retryCount, queryID)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// logx.Errorf("安排指数退避重试任务失败: %v, 订单号: %s", err, orderID)
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// logx.Infof("已安排指数退避重试任务, 当前重试次数: %d, 订单号: %s, 查询ID: %d", retryCount, orderID, queryID)
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // updateQueryResult 更新query表的查询结果
|
|
|
|
|
// func (h *CarMaintenanceQueryHandler) updateQueryResult(ctx context.Context, queryID int64, resultData []byte, state string) error {
|
|
|
|
|
// // 先查询当前query记录
|
|
|
|
|
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// if err == model.ErrNotFound {
|
|
|
|
|
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
|
|
|
|
|
// }
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // 更新查询结果
|
|
|
|
|
// query.QueryData = sql.NullString{
|
|
|
|
|
// String: string(resultData),
|
|
|
|
|
// Valid: true,
|
|
|
|
|
// }
|
|
|
|
|
// query.QueryState = state
|
|
|
|
|
// query.Version++
|
|
|
|
|
|
|
|
|
|
// // 更新记录
|
|
|
|
|
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return fmt.Errorf("更新查询记录失败: %v", err)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// logx.Infof("成功更新查询记录,查询ID: %d, 状态: %s", queryID, state)
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // updateQueryErrorState 更新query表为错误状态
|
|
|
|
|
// func (h *CarMaintenanceQueryHandler) updateQueryErrorState(ctx context.Context, queryID int64, errorMsg string) error {
|
|
|
|
|
// // 先查询当前query记录
|
|
|
|
|
// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// if err == model.ErrNotFound {
|
|
|
|
|
// return fmt.Errorf("查询记录不存在, ID: %d", queryID)
|
|
|
|
|
// }
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// // 构造错误结果
|
|
|
|
|
// errorResult := map[string]interface{}{
|
|
|
|
|
// "error": errorMsg,
|
|
|
|
|
// }
|
|
|
|
|
// resultBytes, _ := json.Marshal(errorResult)
|
|
|
|
|
|
|
|
|
|
// // 更新查询结果
|
|
|
|
|
// query.QueryData = sql.NullString{
|
|
|
|
|
// String: string(resultBytes),
|
|
|
|
|
// Valid: true,
|
|
|
|
|
// }
|
|
|
|
|
// query.QueryState = "failed"
|
|
|
|
|
// query.Version++
|
|
|
|
|
|
|
|
|
|
// // 更新记录
|
|
|
|
|
// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return fmt.Errorf("更新查询记录失败: %v", err)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// logx.Infof("成功更新查询记录为失败状态,查询ID: %d, 错误: %s", queryID, errorMsg)
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|