package queue import ( "context" "qnc-server/app/user/cmd/api/internal/svc" "github.com/hibiken/asynq" ) // CarMaintenanceQueryHandler 车辆维保记录查询任务处理器 type CarMaintenanceQueryHandler struct { svcCtx *svc.ServiceContext } // NewCarMaintenanceQueryHandler 创建车辆维保记录查询处理器 func NewCarMaintenanceQueryHandler(svcCtx *svc.ServiceContext) *CarMaintenanceQueryHandler { return &CarMaintenanceQueryHandler{ svcCtx: svcCtx, } } // ProcessTask 处理车辆维保记录查询任务 func (h *CarMaintenanceQueryHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { // var payload types.MsgCarMaintenanceQueryPayload // if err := json.Unmarshal(task.Payload(), &payload); err != nil { // logx.Errorf("解析车辆维保记录查询任务参数失败: %v", err) // return err // } // logx.Infof("处理车辆维保记录查询任务,订单号: %s, 重试次数: %d, 查询ID: %d", // payload.OrderID, payload.RetryCount, payload.QueryID) // // 调用apirequestService的ProcessCAR059Request方法 // request := map[string]interface{}{ // "order_id": payload.OrderID, // } // requestBytes, err := json.Marshal(request) // if err != nil { // logx.Errorf("车辆维保记录查询任务,编码请求参数失败: %v", err) // return err // } // // 调用API处理 // result, err := h.svcCtx.ApiRequestService.ProcessCAR059Request(requestBytes) // // 处理结果 // if err != nil { // if result != nil { // // 使用gjson解析返回结果中的状态码 // retcode := gjson.GetBytes(result.Data, "retcode").String() // logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s", payload.OrderID, retcode) // // 根据不同状态码进行不同处理 // switch retcode { // case "warn_029": // 未查询到结果,请稍后重试 // logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", payload.OrderID) // // 安排重试 // return h.scheduleRetryWithBackoff(payload.OrderID, payload.RetryCount, payload.QueryID) // case "warn_013": // 查无记录 // logx.Infof("车辆维保记录查询任务,订单号: %s, 查无记录", payload.OrderID) // // 更新query表为查无记录 // if payload.QueryID > 0 { // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed_no_data") // if err != nil { // logx.Errorf("更新query表失败(查无记录): %v, 查询ID: %d", err, payload.QueryID) // } // } // } // } // logx.Infof("车辆维保记录查询任务,订单号: %s, 查询失败", payload.OrderID) // // 更新query表为查询失败 // if payload.QueryID > 0 { // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "failed") // if err != nil { // logx.Errorf("更新query表失败(查询失败): %v, 查询ID: %d", err, payload.QueryID) // } // } // return err // } // logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", payload.OrderID, len(result.Data)) // // 更新query表为查询成功 // if payload.QueryID > 0 { // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed") // if err != nil { // logx.Errorf("更新query表失败(查询成功): %v, 查询ID: %d", err, payload.QueryID) // } // } return nil } // // scheduleRetryWithBackoff 安排指数退避重试任务 // func (h *CarMaintenanceQueryHandler) scheduleRetryWithBackoff(orderID string, retryCount int, queryID int64) error { // // 最大重试次数限制 // maxRetries := 10 // if retryCount >= maxRetries { // logx.Infof("车辆维保记录查询任务已达最大重试次数 %d,停止重试,订单号: %s", maxRetries, orderID) // // 更新query表为查询超时 // if queryID > 0 { // err := h.updateQueryErrorState(context.Background(), queryID, "查询超时,已达最大重试次数") // if err != nil { // logx.Errorf("更新query表超时状态失败: %v, 查询ID: %d", err, queryID) // } // } // return nil // } // // 使用指数退避策略的异步任务 // err := h.svcCtx.AsynqService.SendCarMaintenanceQueryTaskWithBackoff(orderID, retryCount, queryID) // if err != nil { // logx.Errorf("安排指数退避重试任务失败: %v, 订单号: %s", err, orderID) // return err // } // logx.Infof("已安排指数退避重试任务, 当前重试次数: %d, 订单号: %s, 查询ID: %d", retryCount, orderID, queryID) // return nil // } // // updateQueryResult 更新query表的查询结果 // func (h *CarMaintenanceQueryHandler) updateQueryResult(ctx context.Context, queryID int64, resultData []byte, state string) error { // // 先查询当前query记录 // query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID) // if err != nil { // if err == model.ErrNotFound { // return fmt.Errorf("查询记录不存在, ID: %d", queryID) // } // return err // } // // 更新查询结果 // query.QueryData = sql.NullString{ // String: string(resultData), // Valid: true, // } // query.QueryState = state // query.Version++ // // 更新记录 // err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) // if err != nil { // return fmt.Errorf("更新查询记录失败: %v", err) // } // logx.Infof("成功更新查询记录,查询ID: %d, 状态: %s", queryID, state) // return nil // } // // updateQueryErrorState 更新query表为错误状态 // func (h *CarMaintenanceQueryHandler) updateQueryErrorState(ctx context.Context, queryID int64, errorMsg string) error { // // 先查询当前query记录 // query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID) // if err != nil { // if err == model.ErrNotFound { // return fmt.Errorf("查询记录不存在, ID: %d", queryID) // } // return err // } // // 构造错误结果 // errorResult := map[string]interface{}{ // "error": errorMsg, // } // resultBytes, _ := json.Marshal(errorResult) // // 更新查询结果 // query.QueryData = sql.NullString{ // String: string(resultBytes), // Valid: true, // } // query.QueryState = "failed" // query.Version++ // // 更新记录 // err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) // if err != nil { // return fmt.Errorf("更新查询记录失败: %v", err) // } // logx.Infof("成功更新查询记录为失败状态,查询ID: %d, 错误: %s", queryID, errorMsg) // return nil // }