diff --git a/.gitignore b/.gitignore index 3f91ef1..b287a06 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,6 @@ data/* .vscode .vscode/ -/tmp/ +# 临时文件 +/tmp +/tmp/* diff --git a/app/user/cmd/api/internal/logic/pay/paymentlogic.go b/app/user/cmd/api/internal/logic/pay/paymentlogic.go index 4b5233f..e3d0b81 100644 --- a/app/user/cmd/api/internal/logic/pay/paymentlogic.go +++ b/app/user/cmd/api/internal/logic/pay/paymentlogic.go @@ -5,15 +5,16 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/pkg/errors" - "github.com/zeromicro/go-zero/core/logx" - "github.com/zeromicro/go-zero/core/stores/sqlx" "qnc-server/app/user/cmd/api/internal/svc" "qnc-server/app/user/cmd/api/internal/types" "qnc-server/app/user/model" "qnc-server/common/ctxdata" "qnc-server/common/xerr" "qnc-server/pkg/lzkit/crypto" + + "github.com/pkg/errors" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/stores/sqlx" ) type PaymentLogic struct { diff --git a/app/user/cmd/api/internal/logic/query/querysingletestlogic.go b/app/user/cmd/api/internal/logic/query/querysingletestlogic.go index 75688d3..02e2a1c 100644 --- a/app/user/cmd/api/internal/logic/query/querysingletestlogic.go +++ b/app/user/cmd/api/internal/logic/query/querysingletestlogic.go @@ -28,21 +28,21 @@ func NewQuerySingleTestLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Q } func (l *QuerySingleTestLogic) QuerySingleTest(req *types.QuerySingleTestReq) (resp *types.QuerySingleTestResp, err error) { - _, err = l.svcCtx.FeatureModel.FindOneByApiId(l.ctx, req.Api) - if err != nil { - return nil, errors.Wrapf(xerr.NewErrCode(xerr.SERVER_COMMON_ERROR), "单查测试, 获取接口失败 : %d", err) - } + // _, err = l.svcCtx.FeatureModel.FindOneByApiId(l.ctx, req.Api) + // if err != nil { + // return nil, errors.Wrapf(xerr.NewErrCode(xerr.SERVER_COMMON_ERROR), "单查测试, 获取接口失败 : %d", err) + // } marshalParams, err := json.Marshal(req.Params) if err != nil { return nil, errors.Wrapf(xerr.NewErrCode(xerr.SERVER_COMMON_ERROR), "单查测试, 序列化参数失败 : %d", err) } - apiResp, err := l.svcCtx.ApiRequestService.PreprocessRequestApi(marshalParams, req.Api) + apiResp, err := l.svcCtx.ApiRequestService.PreprocessRequestApi(l.ctx, marshalParams, req.Api) if err != nil { return nil, errors.Wrapf(xerr.NewErrCode(xerr.SERVER_COMMON_ERROR), "单查测试, 获取接口失败 : %d", err) } var respData interface{} - err = json.Unmarshal(apiResp, &respData) + err = json.Unmarshal(apiResp.Data, &respData) if err != nil { return nil, errors.Wrapf(xerr.NewErrCode(xerr.SERVER_COMMON_ERROR), "单查测试, 反序列化接口失败 : %d", err) } diff --git a/app/user/cmd/api/internal/queue/carMaintenanceQuery.go b/app/user/cmd/api/internal/queue/carMaintenanceQuery.go new file mode 100644 index 0000000..8176005 --- /dev/null +++ b/app/user/cmd/api/internal/queue/carMaintenanceQuery.go @@ -0,0 +1,184 @@ +package queue + +import ( + "context" + "qnc-server/app/user/cmd/api/internal/svc" + + "github.com/hibiken/asynq" +) + +// CarMaintenanceQueryHandler 车辆维保记录查询任务处理器 +type CarMaintenanceQueryHandler struct { + svcCtx *svc.ServiceContext +} + +// NewCarMaintenanceQueryHandler 创建车辆维保记录查询处理器 +func NewCarMaintenanceQueryHandler(svcCtx *svc.ServiceContext) *CarMaintenanceQueryHandler { + return &CarMaintenanceQueryHandler{ + svcCtx: svcCtx, + } +} + +// ProcessTask 处理车辆维保记录查询任务 +func (h *CarMaintenanceQueryHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { + // var payload types.MsgCarMaintenanceQueryPayload + // if err := json.Unmarshal(task.Payload(), &payload); err != nil { + // logx.Errorf("解析车辆维保记录查询任务参数失败: %v", err) + // return err + // } + + // logx.Infof("处理车辆维保记录查询任务,订单号: %s, 重试次数: %d, 查询ID: %d", + // payload.OrderID, payload.RetryCount, payload.QueryID) + + // // 调用apirequestService的ProcessCAR059Request方法 + // request := map[string]interface{}{ + // "order_id": payload.OrderID, + // } + // requestBytes, err := json.Marshal(request) + // if err != nil { + // logx.Errorf("车辆维保记录查询任务,编码请求参数失败: %v", err) + // return err + // } + + // // 调用API处理 + // result, err := h.svcCtx.ApiRequestService.ProcessCAR059Request(requestBytes) + + // // 处理结果 + // if err != nil { + // if result != nil { + // // 使用gjson解析返回结果中的状态码 + // retcode := gjson.GetBytes(result.Data, "retcode").String() + // logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s", payload.OrderID, retcode) + + // // 根据不同状态码进行不同处理 + // switch retcode { + // case "warn_029": // 未查询到结果,请稍后重试 + // logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", payload.OrderID) + + // // 安排重试 + // return h.scheduleRetryWithBackoff(payload.OrderID, payload.RetryCount, payload.QueryID) + + // case "warn_013": // 查无记录 + // logx.Infof("车辆维保记录查询任务,订单号: %s, 查无记录", payload.OrderID) + // // 更新query表为查无记录 + // if payload.QueryID > 0 { + // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed_no_data") + // if err != nil { + // logx.Errorf("更新query表失败(查无记录): %v, 查询ID: %d", err, payload.QueryID) + // } + // } + // } + // } + // logx.Infof("车辆维保记录查询任务,订单号: %s, 查询失败", payload.OrderID) + // // 更新query表为查询失败 + // if payload.QueryID > 0 { + // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "failed") + // if err != nil { + // logx.Errorf("更新query表失败(查询失败): %v, 查询ID: %d", err, payload.QueryID) + // } + // } + // return err + // } + + // logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", payload.OrderID, len(result.Data)) + // // 更新query表为查询成功 + // if payload.QueryID > 0 { + // err = h.updateQueryResult(ctx, payload.QueryID, result.Data, "completed") + // if err != nil { + // logx.Errorf("更新query表失败(查询成功): %v, 查询ID: %d", err, payload.QueryID) + // } + // } + + return nil +} + +// // scheduleRetryWithBackoff 安排指数退避重试任务 +// func (h *CarMaintenanceQueryHandler) scheduleRetryWithBackoff(orderID string, retryCount int, queryID int64) error { +// // 最大重试次数限制 +// maxRetries := 10 +// if retryCount >= maxRetries { +// logx.Infof("车辆维保记录查询任务已达最大重试次数 %d,停止重试,订单号: %s", maxRetries, orderID) +// // 更新query表为查询超时 +// if queryID > 0 { +// err := h.updateQueryErrorState(context.Background(), queryID, "查询超时,已达最大重试次数") +// if err != nil { +// logx.Errorf("更新query表超时状态失败: %v, 查询ID: %d", err, queryID) +// } +// } +// return nil +// } + +// // 使用指数退避策略的异步任务 +// err := h.svcCtx.AsynqService.SendCarMaintenanceQueryTaskWithBackoff(orderID, retryCount, queryID) +// if err != nil { +// logx.Errorf("安排指数退避重试任务失败: %v, 订单号: %s", err, orderID) +// return err +// } + +// logx.Infof("已安排指数退避重试任务, 当前重试次数: %d, 订单号: %s, 查询ID: %d", retryCount, orderID, queryID) +// return nil +// } + +// // updateQueryResult 更新query表的查询结果 +// func (h *CarMaintenanceQueryHandler) updateQueryResult(ctx context.Context, queryID int64, resultData []byte, state string) error { +// // 先查询当前query记录 +// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID) +// if err != nil { +// if err == model.ErrNotFound { +// return fmt.Errorf("查询记录不存在, ID: %d", queryID) +// } +// return err +// } + +// // 更新查询结果 +// query.QueryData = sql.NullString{ +// String: string(resultData), +// Valid: true, +// } +// query.QueryState = state +// query.Version++ + +// // 更新记录 +// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) +// if err != nil { +// return fmt.Errorf("更新查询记录失败: %v", err) +// } + +// logx.Infof("成功更新查询记录,查询ID: %d, 状态: %s", queryID, state) +// return nil +// } + +// // updateQueryErrorState 更新query表为错误状态 +// func (h *CarMaintenanceQueryHandler) updateQueryErrorState(ctx context.Context, queryID int64, errorMsg string) error { +// // 先查询当前query记录 +// query, err := h.svcCtx.QueryModel.FindOne(ctx, queryID) +// if err != nil { +// if err == model.ErrNotFound { +// return fmt.Errorf("查询记录不存在, ID: %d", queryID) +// } +// return err +// } + +// // 构造错误结果 +// errorResult := map[string]interface{}{ +// "error": errorMsg, +// } +// resultBytes, _ := json.Marshal(errorResult) + +// // 更新查询结果 +// query.QueryData = sql.NullString{ +// String: string(resultBytes), +// Valid: true, +// } +// query.QueryState = "failed" +// query.Version++ + +// // 更新记录 +// err = h.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) +// if err != nil { +// return fmt.Errorf("更新查询记录失败: %v", err) +// } + +// logx.Infof("成功更新查询记录为失败状态,查询ID: %d, 错误: %s", queryID, errorMsg) +// return nil +// } diff --git a/app/user/cmd/api/internal/queue/paySuccessNotify.go b/app/user/cmd/api/internal/queue/paySuccessNotify.go index 4995d79..ed20aec 100644 --- a/app/user/cmd/api/internal/queue/paySuccessNotify.go +++ b/app/user/cmd/api/internal/queue/paySuccessNotify.go @@ -5,12 +5,13 @@ import ( "encoding/hex" "encoding/json" "fmt" - "github.com/hibiken/asynq" - "github.com/zeromicro/go-zero/core/logx" "qnc-server/app/user/cmd/api/internal/svc" "qnc-server/app/user/model" "qnc-server/pkg/lzkit/crypto" "qnc-server/pkg/lzkit/lzUtils" + + "github.com/hibiken/asynq" + "github.com/zeromicro/go-zero/core/logx" ) type PaySuccessNotifyUserHandler struct { @@ -53,46 +54,55 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq. logx.Errorf("处理任务失败,原因: %v", findQueryErr) return asynq.SkipRetry } - if query.QueryState != "pending" { + if query.QueryState != model.QueryStatePending { err = fmt.Errorf("查询已处理: %d", query.Id) logx.Errorf("处理任务失败,原因: %v", err) return asynq.SkipRetry } + + query.QueryState = model.QueryStateProcessing + updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) + if updateQueryErr != nil { + handleErrorErr := fmt.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr) + return l.handleError(ctx, handleErrorErr, order, query) + } + secretKey := l.svcCtx.Config.Encrypt.SecretKey key, decodeErr := hex.DecodeString(secretKey) if decodeErr != nil { - err = fmt.Errorf("获取AES密钥失败: %v", decodeErr) - return l.handleError(ctx, err, order, query) + handleErrorErr := fmt.Errorf("获取AES密钥失败: %v", decodeErr) + return l.handleError(ctx, handleErrorErr, order, query) } decryptData, aesdecryptErr := crypto.AesDecrypt(query.QueryParams, key) if aesdecryptErr != nil { - aesdecryptErr = fmt.Errorf("解密响应信息失败: %v", aesdecryptErr) - return l.handleError(ctx, aesdecryptErr, order, query) + handleErrorErr := fmt.Errorf("解密响应信息失败: %v", aesdecryptErr) + return l.handleError(ctx, handleErrorErr, order, query) } - combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id) + combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(ctx, decryptData, product.Id) if err != nil { - return l.handleError(ctx, err, order, query) + handleErrorErr := fmt.Errorf("处理请求失败: %v", err) + return l.handleError(ctx, handleErrorErr, order, query) } // 加密返回响应 encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key) if aesEncryptErr != nil { - err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr) - return l.handleError(ctx, aesEncryptErr, order, query) + handleErrorErr := fmt.Errorf("加密响应信息失败: %v", aesEncryptErr) + return l.handleError(ctx, handleErrorErr, order, query) } query.QueryData = lzUtils.StringToNullString(encryptData) updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateErr != nil { - err = fmt.Errorf("保存响应数据失败: %v", updateErr) - return l.handleError(ctx, updateErr, order, query) + handleErrorErr := fmt.Errorf("保存响应数据失败: %v", updateErr) + return l.handleError(ctx, handleErrorErr, order, query) } - query.QueryState = "success" - updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) + query.QueryState = model.QueryStateSuccess + updateQueryErr = l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateQueryErr != nil { - updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr) - return l.handleError(ctx, updateQueryErr, order, query) + handleErrorErr := fmt.Errorf("修改查询状态失败: %v", updateQueryErr) + return l.handleError(ctx, handleErrorErr, order, query) } return nil @@ -102,9 +112,9 @@ func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq. func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error { logx.Errorf("处理任务失败,原因: %v", err) - if order.Status == "paid" && query.QueryState == "pending" { + if order.Status == "paid" && query.QueryState == model.QueryStatePending { // 更新查询状态为失败 - query.QueryState = "failed" + query.QueryState = model.QueryStateFailed updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query) if updateQueryErr != nil { logx.Errorf("更新查询状态失败,订单ID: %d, 错误: %v", order.Id, updateQueryErr) diff --git a/app/user/cmd/api/internal/queue/routes.go b/app/user/cmd/api/internal/queue/routes.go index d22f89b..dd62a9b 100644 --- a/app/user/cmd/api/internal/queue/routes.go +++ b/app/user/cmd/api/internal/queue/routes.go @@ -3,9 +3,10 @@ package queue import ( "context" "fmt" - "github.com/hibiken/asynq" "qnc-server/app/user/cmd/api/internal/svc" "qnc-server/app/user/cmd/api/internal/types" + + "github.com/hibiken/asynq" ) type CronJob struct { @@ -34,6 +35,7 @@ func (l *CronJob) Register() *asynq.ServeMux { mux := asynq.NewServeMux() mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx)) mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx)) + mux.Handle(types.MsgCarMaintenanceQuery, NewCarMaintenanceQueryHandler(l.svcCtx)) return mux } diff --git a/app/user/cmd/api/internal/service/apirequestService.go b/app/user/cmd/api/internal/service/apirequestService.go index d401513..18bf3d2 100644 --- a/app/user/cmd/api/internal/service/apirequestService.go +++ b/app/user/cmd/api/internal/service/apirequestService.go @@ -47,16 +47,21 @@ func NewApiRequestService(c config.Config, westDexService *WestDexService, yusha type APIResponseData struct { ApiID string `json:"apiID"` Data json.RawMessage `json:"data"` // 这里用 RawMessage 来存储原始的 data + TaskID string `json:"task_id,omitempty"` + IsAsync bool `json:"is_async,omitempty"` Sort int64 `json:"sort"` Success bool `json:"success"` Timestamp string `json:"timestamp"` Error string `json:"error,omitempty"` } +type APIInternalResult struct { + IsAsync bool `json:"is_async,omitempty"` + TaskID string `json:"task_id,omitempty"` + Data json.RawMessage `json:"data"` +} // ProcessRequests 处理请求 -func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]byte, error) { - var ctx, cancel = context.WithCancel(context.Background()) - defer cancel() +func (a *ApiRequestService) ProcessRequests(ctx context.Context, params []byte, productID int64) ([]byte, error) { build := a.productFeatureModel.SelectBuilder().Where(squirrel.Eq{ "product_id": productID, }) @@ -66,11 +71,11 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b } var featureIDs []int64 isImportantMap := make(map[int64]int64, len(productFeatureList)) - sortMap := make(map[int64]int64, len(productFeatureList)) // 新增 + sortMap := make(map[int64]int64, len(productFeatureList)) for _, pf := range productFeatureList { featureIDs = append(featureIDs, pf.FeatureId) isImportantMap[pf.FeatureId] = pf.IsImportant - sortMap[pf.FeatureId] = pf.Sort // 新增 + sortMap[pf.FeatureId] = pf.Sort } if len(featureIDs) == 0 { return nil, errors.New("featureIDs 是空的") @@ -92,13 +97,16 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b retryNum = 5 ) + childCtx, cancel := context.WithCancel(ctx) + defer cancel() + for i, feature := range featureList { wg.Add(1) go func(i int, feature *model.Feature) { defer wg.Done() select { - case <-ctx.Done(): + case <-childCtx.Done(): return default: } @@ -109,15 +117,14 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b } timestamp := time.Now().Format("2006-01-02 15:04:05") var ( - resp json.RawMessage + resp *APIInternalResult preprocessErr error ) - // 若 isImportantMap[feature.ID] == 1,则表示需要在出错时重试 isImportant := isImportantMap[feature.Id] == 1 tryCount := 0 for { tryCount++ - resp, preprocessErr = a.PreprocessRequestApi(params, feature.ApiId) + resp, preprocessErr = a.PreprocessRequestApi(childCtx, params, feature.ApiId) if preprocessErr == nil { break } @@ -130,7 +137,13 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b if preprocessErr != nil { result.Timestamp = timestamp result.Error = preprocessErr.Error() - result.Data = resp + if resp != nil { + result.Data = resp.Data + if resp.IsAsync { + result.IsAsync = true + result.TaskID = resp.TaskID + } + } resultsCh <- result errorsCh <- fmt.Errorf("请求失败: %v", preprocessErr) atomic.AddInt32(&errorCount, 1) @@ -140,9 +153,13 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b return } - result.Data = resp + result.Data = resp.Data result.Success = true result.Timestamp = timestamp + if resp.IsAsync { + result.IsAsync = true + result.TaskID = resp.TaskID + } resultsCh <- result }(i, feature) } @@ -152,7 +169,6 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b close(resultsCh) close(errorsCh) }() - // 收集所有结果并合并 var responseData []APIResponseData for result := range resultsCh { responseData = append(responseData, result) @@ -174,7 +190,7 @@ func (a *ApiRequestService) ProcessRequests(params []byte, productID int64) ([]b } // ------------------------------------请求处理器-------------------------- -var requestProcessors = map[string]func(*ApiRequestService, []byte) ([]byte, error){ +var requestProcessors = map[string]func(*ApiRequestService, context.Context, []byte) (*APIInternalResult, error){ "G09SC02": (*ApiRequestService).ProcessG09SC02Request, "G27BJ05": (*ApiRequestService).ProcessG27BJ05Request, "G26BJ05": (*ApiRequestService).ProcessG26BJ05Request, @@ -197,6 +213,7 @@ var requestProcessors = map[string]func(*ApiRequestService, []byte) ([]byte, err "CAR061": (*ApiRequestService).ProcessCAR061Request, // 名下车辆 "CAR074": (*ApiRequestService).ProcessCAR074Request, // 车辆出险信息 "CAR058": (*ApiRequestService).ProcessCAR058Request, // 车辆维保记录 + "CAR059": (*ApiRequestService).ProcessCAR059Request, // 车辆维保记录结果查询 "CAR079": (*ApiRequestService).ProcessCAR079Request, // 车架号查车 "CAR066": (*ApiRequestService).ProcessCAR066Request, // 车辆过户次数 "CAR100": (*ApiRequestService).ProcessCAR100Request, // 车辆估值 @@ -221,15 +238,15 @@ var requestProcessors = map[string]func(*ApiRequestService, []byte) ([]byte, err } // PreprocessRequestApi 调用指定的请求处理函数 -func (a *ApiRequestService) PreprocessRequestApi(params []byte, apiID string) ([]byte, error) { +func (a *ApiRequestService) PreprocessRequestApi(ctx context.Context, params []byte, apiID string) (*APIInternalResult, error) { if processor, exists := requestProcessors[apiID]; exists { - return processor(a, params) // 调用 ApiRequestService 方法 + return processor(a, ctx, params) // 调用 ApiRequestService 方法 } return nil, errors.New("api请求, 未找到相应的处理程序") } -func (a *ApiRequestService) ProcessG09SC02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG09SC02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -255,13 +272,15 @@ func (a *ApiRequestService) ProcessG09SC02Request(params []byte) ([]byte, error) if err != nil { return nil, err } - return jsonResponse, nil + return &APIInternalResult{ + Data: jsonResponse, + }, nil } else { return nil, errors.New("查询为空") } } -func (a *ApiRequestService) ProcessG27BJ05Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG27BJ05Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -311,10 +330,12 @@ func (a *ApiRequestService) ProcessG27BJ05Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("编码修改后的 data 失败: %v", err) } - return modifiedData, nil + return &APIInternalResult{ + Data: modifiedData, + }, nil } -func (a *ApiRequestService) ProcessG26BJ05Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG26BJ05Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -364,10 +385,12 @@ func (a *ApiRequestService) ProcessG26BJ05Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("编码修改后的 data 失败: %v", err) } - return modifiedData, nil + return &APIInternalResult{ + Data: modifiedData, + }, nil } -func (a *ApiRequestService) ProcessG34BJ03Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG34BJ03Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -393,13 +416,15 @@ func (a *ApiRequestService) ProcessG34BJ03Request(params []byte) ([]byte, error) if err != nil { return nil, err } - return jsonResponse, nil + return &APIInternalResult{ + Data: jsonResponse, + }, nil } else { return nil, errors.New("查询为空") } } -func (a *ApiRequestService) ProcessG35SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG35SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -451,10 +476,12 @@ func (a *ApiRequestService) ProcessG35SC01Request(params []byte) ([]byte, error) return nil, fmt.Errorf("编码最终的 JSON 对象失败: %v", err) } - return finalDataBytes, nil + return &APIInternalResult{ + Data: finalDataBytes, + }, nil } -func (a *ApiRequestService) ProcessG28BJ05Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG28BJ05Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -504,10 +531,12 @@ func (a *ApiRequestService) ProcessG28BJ05Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("编码修改后的 data 失败: %v", err) } - return modifiedData, nil + return &APIInternalResult{ + Data: modifiedData, + }, nil } -func (a *ApiRequestService) ProcessG05HZ01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG05HZ01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { idCard := gjson.GetBytes(params, "id_card") if !idCard.Exists() { @@ -535,14 +564,16 @@ func (a *ApiRequestService) ProcessG05HZ01Request(params []byte) ([]byte, error) return nil, fmt.Errorf("响应中缺少 data 字段") } // 返回 data 字段的内容 - return []byte(data.Raw), nil + return &APIInternalResult{ + Data: []byte(data.Raw), + }, nil } // code 不等于 "0000",返回错误 return nil, fmt.Errorf("响应code错误%s", code.String()) } -func (a *ApiRequestService) ProcessQ23SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessQ23SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { entName := gjson.GetBytes(params, "ent_name") entCode := gjson.GetBytes(params, "ent_code") @@ -596,10 +627,12 @@ func (a *ApiRequestService) ProcessQ23SC01Request(params []byte) ([]byte, error) if statusResult.Exists() || statusResult.Int() == -1 { return nil, fmt.Errorf("企业涉诉为空: %+v", finalDataBytes) } - return finalDataBytes, nil + return &APIInternalResult{ + Data: finalDataBytes, + }, nil } -func (a *ApiRequestService) ProcessG15BJ02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG15BJ02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -626,13 +659,15 @@ func (a *ApiRequestService) ProcessG15BJ02Request(params []byte) ([]byte, error) code := dataResult.Int() // 处理允许的 code 值 if code == 1000 || code == 1003 || code == 1004 || code == 1005 { - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } return nil, fmt.Errorf("三要素核验失败: %+v", resp) } -func (a *ApiRequestService) ProcessG17BJ02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG17BJ02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") mobile := gjson.GetBytes(params, "mobile") @@ -657,13 +692,15 @@ func (a *ApiRequestService) ProcessG17BJ02Request(params []byte) ([]byte, error) code := dataResult.Int() // 处理允许的 code 值 if code == 1000 || code == 1001 { - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } return nil, fmt.Errorf("手机二要素核验失败: %+v", resp) } -func (a *ApiRequestService) ProcessG08SC02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG08SC02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -681,10 +718,12 @@ func (a *ApiRequestService) ProcessG08SC02Request(params []byte) ([]byte, error) if callApiErr != nil { return nil, callApiErr } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } -func (a *ApiRequestService) ProcessKZEYSRequest(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessKZEYSRequest(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -738,11 +777,13 @@ func (a *ApiRequestService) ProcessKZEYSRequest(params []byte) ([]byte, error) { } dataRaw := respData.Raw // 成功返回 - return []byte(dataRaw), nil + return &APIInternalResult{ + Data: []byte(dataRaw), + }, nil } // 人车核验 -func (a *ApiRequestService) ProcessP_C_B332Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessP_C_B332Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") carType := gjson.GetBytes(params, "car_type") carLicense := gjson.GetBytes(params, "car_license") @@ -759,11 +800,13 @@ func (a *ApiRequestService) ProcessP_C_B332Request(params []byte) ([]byte, error if err != nil { return nil, fmt.Errorf("人车核验查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 车辆出险信息 -func (a *ApiRequestService) ProcessCAR074Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR074Request(ctx context.Context, params []byte) (*APIInternalResult, error) { vinCode := gjson.GetBytes(params, "vin_code") if !vinCode.Exists() { return nil, errors.New("api请求, CAR074, 获取相关参数失败") @@ -776,11 +819,13 @@ func (a *ApiRequestService) ProcessCAR074Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("车辆出险信息查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 车辆维保记录 -func (a *ApiRequestService) ProcessCAR058Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR058Request(ctx context.Context, params []byte) (*APIInternalResult, error) { vinCode := gjson.GetBytes(params, "vin_code") carType := gjson.GetBytes(params, "car_driving_permit") if !vinCode.Exists() || !carType.Exists() { @@ -794,13 +839,131 @@ func (a *ApiRequestService) ProcessCAR058Request(params []byte) ([]byte, error) } resp, err := a.yushanService.request("CAR058", request) if err != nil { - return nil, fmt.Errorf("人车核验查询失败: %+v", err) + return nil, fmt.Errorf("车辆维保记录查询失败: %+v", err) } - return resp, nil + + orderID := gjson.GetBytes(resp, "orderId").String() + if orderID == "" { + return nil, fmt.Errorf("获取订单ID失败") + } + + // 如果需要查询结果(queryID > 0),直接调用ProcessCAR059Request + // 构建查询参数 + queryParams := map[string]interface{}{ + "order_id": orderID, + } + queryBytes, _ := json.Marshal(queryParams) + + // 直接调用ProcessCAR059Request进行查询,传递上级context + result, err := a.ProcessCAR059Request(ctx, queryBytes) + if err != nil { + return nil, err + } + + // 返回CAR058响应与CAR059结果的组合 + return &APIInternalResult{ + Data: result.Data, // 原始CAR058响应 + }, nil +} + +// 车辆维保记录结果查询 +func (a *ApiRequestService) ProcessCAR059Request(ctx context.Context, params []byte) (*APIInternalResult, error) { + orderID := gjson.GetBytes(params, "order_id") + if !orderID.Exists() { + return nil, errors.New("api请求, CAR059, 获取相关参数失败") + } + retryCount := 0 + if gjson.GetBytes(params, "retry_count").Exists() { + retryCount = int(gjson.GetBytes(params, "retry_count").Int()) + } + // 构建请求参数 + request := map[string]interface{}{ + "orderId": orderID.String(), + } + // 调用API + resp, err := a.yushanService.request("CAR059", request) + if err != nil { + if resp != nil { + // 使用gjson解析返回结果中的状态码 + retcode := gjson.GetBytes(resp, "retcode").String() + logx.Infof("车辆维保记录查询任务,订单号: %s, 状态码: %s, 重试次数: %d", + orderID.String(), retcode, retryCount) + // 根据不同状态码进行不同处理 + switch retcode { + case "warn_029": // 未查询到结果,请稍后重试 + logx.Infof("车辆维保记录查询任务,订单号: %s, 未查询到结果,需要重试", orderID.String()) + + // 检查重试次数 + maxRetries := 10 + if retryCount >= maxRetries { + fmt.Errorf("车辆维保记录查询任务已达最大重试次数 %d,停止重试,订单号: %s", + maxRetries, orderID.String()) + } + + // 计算延迟时间 + var delay time.Duration + if retryCount == 0 { + delay = 0 // 第一次无延迟 + } else { + // 指数退避策略 + baseDelay := 3 * time.Second + maxDelay := 1 * time.Hour + + delay = baseDelay + for i := 1; i < int(retryCount); i++ { + delay = delay * 2 + if delay > maxDelay { + delay = maxDelay + break + } + } + } + + // 检查ctx是否已经有超时 + deadline, hasDeadline := ctx.Deadline() + if hasDeadline { + timeRemaining := time.Until(deadline) + if timeRemaining <= delay { + // 如果剩余时间不足,使用剩余时间的90%作为超时 + delay = time.Duration(float64(timeRemaining) * 0.9) + logx.Infof("调整延迟时间以适应上下文超时,新延迟: %v", delay) + } + } + + // 等待指定延迟时间 + logx.Infof("安排延迟重试,订单号: %s, 延迟: %v, 重试次数: %d", + orderID.String(), delay, retryCount+1) + + select { + case <-time.After(delay): + // 延迟时间到,继续处理 + case <-ctx.Done(): + // 上下文被取消,返回错误 + return nil, ctx.Err() + } + + // 构建新的重试参数 + retryParams := map[string]interface{}{ + "order_id": orderID.String(), + "retry_count": retryCount + 1, + } + retryBytes, _ := json.Marshal(retryParams) + + // 递归调用自身进行重试 + return a.ProcessCAR059Request(ctx, retryBytes) + } + } + return nil, fmt.Errorf("车辆维保记录结果查询失败: %+v", err) + } + logx.Infof("车辆维保记录查询任务完成,订单号: %s, 结果数据长度: %d", + orderID.String(), len(resp)) + return &APIInternalResult{ + Data: resp, + }, nil } // 车架号查车 -func (a *ApiRequestService) ProcessCAR079Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR079Request(ctx context.Context, params []byte) (*APIInternalResult, error) { vinCode := gjson.GetBytes(params, "vin_code") if !vinCode.Exists() { return nil, errors.New("api请求, CAR079, 获取相关参数失败") @@ -813,11 +976,13 @@ func (a *ApiRequestService) ProcessCAR079Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("车架号查车查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 车辆过户次数 -func (a *ApiRequestService) ProcessCAR066Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR066Request(ctx context.Context, params []byte) (*APIInternalResult, error) { vinCode := gjson.GetBytes(params, "vin_code") if !vinCode.Exists() { return nil, errors.New("api请求, CAR066, 获取相关参数失败") @@ -830,11 +995,13 @@ func (a *ApiRequestService) ProcessCAR066Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("车辆过户次数查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 车辆估值 -func (a *ApiRequestService) ProcessCAR100Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR100Request(ctx context.Context, params []byte) (*APIInternalResult, error) { vinCode := gjson.GetBytes(params, "vin_code") carLicense := gjson.GetBytes(params, "car_license") if !vinCode.Exists() || !carLicense.Exists() { @@ -850,11 +1017,13 @@ func (a *ApiRequestService) ProcessCAR100Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("车辆估值查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 银行卡黑名单 -func (a *ApiRequestService) ProcessFIN019Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessFIN019Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -873,11 +1042,13 @@ func (a *ApiRequestService) ProcessFIN019Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("银行卡黑名单查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 名下车辆 -func (a *ApiRequestService) ProcessCAR061Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCAR061Request(ctx context.Context, params []byte) (*APIInternalResult, error) { idCard := gjson.GetBytes(params, "id_card") if !idCard.Exists() { @@ -890,10 +1061,12 @@ func (a *ApiRequestService) ProcessCAR061Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("名下车辆查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } -func (a *ApiRequestService) ProcessG10SC02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG10SC02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { // 提取男方和女方信息 nameMan := gjson.GetBytes(params, "nameMan") idCardMan := gjson.GetBytes(params, "idCardMan") @@ -937,14 +1110,16 @@ func (a *ApiRequestService) ProcessG10SC02Request(params []byte) ([]byte, error) if err != nil { return nil, err } - return jsonResponse, nil + return &APIInternalResult{ + Data: jsonResponse, + }, nil } else { return nil, errors.New("查询为空") } } // 手机号码风险 -func (a *ApiRequestService) ProcessG03HZ01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG03HZ01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { @@ -969,11 +1144,13 @@ func (a *ApiRequestService) ProcessG03HZ01Request(params []byte) ([]byte, error) if !data.Exists() { return nil, fmt.Errorf("查询手机号码风险失败, %s", string(resp)) } - return []byte(data.Raw), nil + return &APIInternalResult{ + Data: []byte(data.Raw), + }, nil } // 手机在网时长 -func (a *ApiRequestService) ProcessG02BJ02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG02BJ02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { @@ -999,11 +1176,13 @@ func (a *ApiRequestService) ProcessG02BJ02Request(params []byte) ([]byte, error) if !data.Exists() { return nil, fmt.Errorf("查询手机在网时长失败, %s", string(resp)) } - return []byte(data.Raw), nil + return &APIInternalResult{ + Data: []byte(data.Raw), + }, nil } // 手机二次卡 -func (a *ApiRequestService) ProcessG19BJ02Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG19BJ02Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") startDate := gjson.GetBytes(params, "startDate") if !mobile.Exists() { @@ -1029,11 +1208,13 @@ func (a *ApiRequestService) ProcessG19BJ02Request(params []byte) ([]byte, error) if !data.Exists() { return nil, fmt.Errorf("手机二次卡失败, %s", string(resp)) } - return []byte(data.Raw), nil + return &APIInternalResult{ + Data: []byte(data.Raw), + }, nil } // 银行卡四要素 -func (a *ApiRequestService) ProcessG20GZ01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG20GZ01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") mobile := gjson.GetBytes(params, "mobile") @@ -1077,11 +1258,13 @@ func (a *ApiRequestService) ProcessG20GZ01Request(params []byte) ([]byte, error) return nil, fmt.Errorf("重新编码 data 失败: %v", err) } - return resultBytes, nil + return &APIInternalResult{ + Data: resultBytes, + }, nil } // G37SC01 自然人失信信息 -func (a *ApiRequestService) ProcessG37SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG37SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -1111,11 +1294,13 @@ func (a *ApiRequestService) ProcessG37SC01Request(params []byte) ([]byte, error) if !sxbzxr.Exists() { return nil, fmt.Errorf("内层 sxbzxr 字段不存在") } - return []byte(sxbzxr.Raw), nil + return &APIInternalResult{ + Data: []byte(sxbzxr.Raw), + }, nil } // G36SC01 自然人限高信息 -func (a *ApiRequestService) ProcessG36SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG36SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -1145,11 +1330,13 @@ func (a *ApiRequestService) ProcessG36SC01Request(params []byte) ([]byte, error) if !xgbzxr.Exists() { return nil, fmt.Errorf("内层 xgbzxr 字段不存在") } - return []byte(xgbzxr.Raw), nil + return &APIInternalResult{ + Data: []byte(xgbzxr.Raw), + }, nil } // G22SC01 自然人司法模型 -func (a *ApiRequestService) ProcessG22SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessG22SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -1182,11 +1369,13 @@ func (a *ApiRequestService) ProcessG22SC01Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("序列化失败: %v", err) } - return marshal, nil + return &APIInternalResult{ + Data: marshal, + }, nil } // Q03SC01 企业涉诉信息 -func (a *ApiRequestService) ProcessQ03SC01Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessQ03SC01Request(ctx context.Context, params []byte) (*APIInternalResult, error) { entName := gjson.GetBytes(params, "ent_name") entCode := gjson.GetBytes(params, "ent_code") @@ -1219,11 +1408,13 @@ func (a *ApiRequestService) ProcessQ03SC01Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("序列化失败: %v", err) } - return marshal, nil + return &APIInternalResult{ + Data: marshal, + }, nil } // 出境限制查询 -func (a *ApiRequestService) ProcessCOM187Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessCOM187Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") if !name.Exists() { return nil, errors.New("api请求, COM187, 获取相关参数失败") @@ -1236,11 +1427,13 @@ func (a *ApiRequestService) ProcessCOM187Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("出境限制查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 手机月消费档次查询 -func (a *ApiRequestService) ProcessMOB035Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessMOB035Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { return nil, errors.New("api请求, MOB035, 获取相关参数失败") @@ -1253,11 +1446,13 @@ func (a *ApiRequestService) ProcessMOB035Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("手机月消费档次查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 学历信息验证 -func (a *ApiRequestService) ProcessPCB915Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessPCB915Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") certificateNumber := gjson.GetBytes(params, "certificate_number") @@ -1275,11 +1470,13 @@ func (a *ApiRequestService) ProcessPCB915Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("学历信息验证失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 反诈反赌风险核验 -func (a *ApiRequestService) ProcessRIS031Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessRIS031Request(ctx context.Context, params []byte) (*APIInternalResult, error) { keyword := gjson.GetBytes(params, "keyword") typeName := gjson.GetBytes(params, "type") @@ -1295,11 +1492,13 @@ func (a *ApiRequestService) ProcessRIS031Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("反诈反赌风险核验失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 手机号空号检测 -func (a *ApiRequestService) ProcessPCB601Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessPCB601Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { return nil, errors.New("api请求, PCB601, 获取相关参数失败") @@ -1312,11 +1511,13 @@ func (a *ApiRequestService) ProcessPCB601Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("手机号空号检测失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 银行卡归属地查询 -func (a *ApiRequestService) ProcessPCB148Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessPCB148Request(ctx context.Context, params []byte) (*APIInternalResult, error) { bankCard := gjson.GetBytes(params, "bank_card") if !bankCard.Exists() { return nil, errors.New("api请求, PCB148, 获取相关参数失败") @@ -1329,11 +1530,13 @@ func (a *ApiRequestService) ProcessPCB148Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("银行卡归属地查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 银行卡姓名二要素验证 -func (a *ApiRequestService) ProcessFIN011Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessFIN011Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") bankCard := gjson.GetBytes(params, "bank_card") @@ -1349,11 +1552,13 @@ func (a *ApiRequestService) ProcessFIN011Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("银行卡姓名二要素验证失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 银行卡号码二要素验证 -func (a *ApiRequestService) ProcessFIN020Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessFIN020Request(ctx context.Context, params []byte) (*APIInternalResult, error) { idCard := gjson.GetBytes(params, "id_card") bankCard := gjson.GetBytes(params, "bank_card") @@ -1369,11 +1574,13 @@ func (a *ApiRequestService) ProcessFIN020Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("银行卡号码二要素验证失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 银行卡三要素综合验证 -func (a *ApiRequestService) ProcessFIN018Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessFIN018Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") bankCard := gjson.GetBytes(params, "bank_card") @@ -1391,11 +1598,13 @@ func (a *ApiRequestService) ProcessFIN018Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("银行卡三要素综合验证失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 手机号码风险评估 -func (a *ApiRequestService) ProcessMOB032Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessMOB032Request(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { return nil, errors.New("api请求, MOB032, 获取相关参数失败") @@ -1408,11 +1617,13 @@ func (a *ApiRequestService) ProcessMOB032Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("手机号码风险评估失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 手机号贩毒反诈风险查询 -func (a *ApiRequestService) ProcessFIN032Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessFIN032Request(ctx context.Context, params []byte) (*APIInternalResult, error) { keyword := gjson.GetBytes(params, "keyword") typeName := gjson.GetBytes(params, "type") @@ -1428,11 +1639,13 @@ func (a *ApiRequestService) ProcessFIN032Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("手机号贩毒反诈风险查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 技能资格证书 -func (a *ApiRequestService) ProcessHRD004Request(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessHRD004Request(ctx context.Context, params []byte) (*APIInternalResult, error) { name := gjson.GetBytes(params, "name") idCard := gjson.GetBytes(params, "id_card") @@ -1448,11 +1661,13 @@ func (a *ApiRequestService) ProcessHRD004Request(params []byte) ([]byte, error) if err != nil { return nil, fmt.Errorf("HRD004请求失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 手机归属地查询 -func (a *ApiRequestService) ProcessMobilelocalRequest(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessMobilelocalRequest(ctx context.Context, params []byte) (*APIInternalResult, error) { mobile := gjson.GetBytes(params, "mobile") if !mobile.Exists() { return nil, errors.New("api请求, mobilelocal, 获取相关参数失败") @@ -1466,11 +1681,13 @@ func (a *ApiRequestService) ProcessMobilelocalRequest(params []byte) ([]byte, er if err != nil { return nil, fmt.Errorf("手机归属地查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } // 身份证归属地查询 -func (a *ApiRequestService) ProcessSfzRequest(params []byte) ([]byte, error) { +func (a *ApiRequestService) ProcessSfzRequest(ctx context.Context, params []byte) (*APIInternalResult, error) { idCard := gjson.GetBytes(params, "id_card") if !idCard.Exists() { return nil, errors.New("api请求, sfz, 获取相关参数失败") @@ -1484,5 +1701,7 @@ func (a *ApiRequestService) ProcessSfzRequest(params []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("身份证归属地查询失败: %+v", err) } - return resp, nil + return &APIInternalResult{ + Data: resp, + }, nil } diff --git a/app/user/cmd/api/internal/service/asynqService.go b/app/user/cmd/api/internal/service/asynqService.go index f95e5da..328a98b 100644 --- a/app/user/cmd/api/internal/service/asynqService.go +++ b/app/user/cmd/api/internal/service/asynqService.go @@ -4,10 +4,12 @@ package service import ( "encoding/json" - "github.com/hibiken/asynq" - "github.com/zeromicro/go-zero/core/logx" "qnc-server/app/user/cmd/api/internal/config" "qnc-server/app/user/cmd/api/internal/types" + "time" + + "github.com/hibiken/asynq" + "github.com/zeromicro/go-zero/core/logx" ) type AsynqService struct { @@ -29,6 +31,7 @@ func NewAsynqService(c config.Config) *AsynqService { func (s *AsynqService) Close() error { return s.client.Close() } + func (s *AsynqService) SendQueryTask(orderID int64) error { // 准备任务的 payload payload := types.MsgPaySuccessQueryPayload{ @@ -57,3 +60,73 @@ func (s *AsynqService) SendQueryTask(orderID int64) error { logx.Infof("发送异步任务成功,任务ID: %s, 队列: %s, 订单号: %d", info.ID, info.Queue, orderID) return nil } + +// SendCarMaintenanceQueryTaskWithBackoff 发送带有指数退避重试策略的车辆维保记录查询的异步任务 +func (s *AsynqService) SendCarMaintenanceQueryTaskWithBackoff(orderID string, retryCount int, queryID int64) error { + // 准备任务的 payload + payload := types.MsgCarMaintenanceQueryPayload{ + OrderID: orderID, + RetryCount: retryCount + 1, // 增加重试次数 + QueryID: queryID, // 关联的query表记录ID + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + logx.Errorf("发送重试车辆维保记录查询任务失败 (无法编码 payload): %v, 订单号: %s", err, orderID) + return err + } + + // 确定延迟时间 + var delay time.Duration + + if retryCount == 0 { + // 第一次执行,立即进行 + delay = 0 + } else { + // 非首次执行,使用指数退避策略 + // 基础延迟为3秒,每次重试后延迟时间翻倍,最长不超过1小时 + baseDelay := 3 * time.Second + maxDelay := 1 * time.Hour + + delay = baseDelay + for i := 1; i < retryCount; i++ { + delay = delay * 2 + if delay > maxDelay { + delay = maxDelay + break + } + } + } + + // 保存延迟时间到payload + payload.DelayDuration = int64(delay) + payloadBytes, _ = json.Marshal(payload) + + options := []asynq.Option{ + asynq.MaxRetry(0), // 使用我们自己的重试逻辑,不使用asynq的自动重试 + } + + // 如果有延迟,添加延迟选项 + if delay > 0 { + options = append(options, asynq.ProcessIn(delay)) + } + + // 创建任务 + task := asynq.NewTask(types.MsgCarMaintenanceQuery, payloadBytes, options...) + + // 将任务加入队列并获取任务信息 + info, err := s.client.Enqueue(task) + if err != nil { + logx.Errorf("发送重试车辆维保记录查询任务失败 (加入队列失败): %+v, 订单号: %s", err, orderID) + return err + } + + // 记录成功日志,带上任务 ID 和队列信息 + if delay == 0 { + logx.Infof("发送车辆维保记录查询任务成功,立即执行,任务ID: %s, 队列: %s, 订单号: %s, 查询ID: %d", + info.ID, info.Queue, orderID, queryID) + } else { + logx.Infof("发送重试车辆维保记录查询任务成功,任务ID: %s, 队列: %s, 延迟: %v, 重试次数: %d, 订单号: %s, 查询ID: %d", + info.ID, info.Queue, delay, retryCount+1, orderID, queryID) + } + return nil +} diff --git a/app/user/cmd/api/internal/service/tianjuService.go b/app/user/cmd/api/internal/service/tianjuService.go index ece5d1e..9993fcf 100644 --- a/app/user/cmd/api/internal/service/tianjuService.go +++ b/app/user/cmd/api/internal/service/tianjuService.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "net/url" "qnc-server/app/user/cmd/api/internal/config" "strings" @@ -38,14 +39,27 @@ func (t *TianjuService) Request(apiPath string, params map[string]interface{}) ( // 构建完整的URL,假设BaseURL已包含https://前缀 fullURL := fmt.Sprintf("%s/%s/index", strings.TrimRight(t.config.BaseURL, "/"), apiPath) - // 将请求数据转换为JSON - messageBytes, err := json.Marshal(reqParams) - if err != nil { - return nil, fmt.Errorf("参数序列化失败: %w", err) + // 构建表单数据 + formData := url.Values{} + for k, v := range reqParams { + // 将不同类型的值转换为字符串 + switch val := v.(type) { + case string: + formData.Add(k, val) + case int, int64, float64: + formData.Add(k, fmt.Sprintf("%v", val)) + default: + // 对于复杂类型,转为JSON字符串 + jsonBytes, err := json.Marshal(val) + if err != nil { + return nil, fmt.Errorf("参数值序列化失败: %w", err) + } + formData.Add(k, string(jsonBytes)) + } } - // 发起HTTP请求 - resp, err := http.Post(fullURL, "application/json", strings.NewReader(string(messageBytes))) + // 发起HTTP请求 - 使用表单数据 + resp, err := http.PostForm(fullURL, formData) if err != nil { return nil, fmt.Errorf("发送请求失败: %w", err) } diff --git a/app/user/cmd/api/internal/service/yushanService.go b/app/user/cmd/api/internal/service/yushanService.go index fc4f23f..8ecd3b2 100644 --- a/app/user/cmd/api/internal/service/yushanService.go +++ b/app/user/cmd/api/internal/service/yushanService.go @@ -98,16 +98,16 @@ func (y *YushanService) request(prodID string, params map[string]interface{}) ([ if retCode == "100000" { // retcode 为 100000,表示查询为空 - return nil, fmt.Errorf("羽山请求查空: %s", string(respData)) + return respData, fmt.Errorf("羽山请求查空: %s", string(respData)) } else if retCode == "000000" || retCode == "000001" || retCode == "000002" || retCode == "000003" { // retcode 为 000000,表示有数据,返回 retdata retData := gjson.GetBytes(respData, "retdata") if !retData.Exists() { - return nil, fmt.Errorf("羽山请求retdata为空: %s", string(respData)) + return respData, fmt.Errorf("羽山请求retdata为空: %s", string(respData)) } return []byte(retData.Raw), nil } else { - return nil, fmt.Errorf("羽山请求未知的状态码: %s", string(respData)) + return respData, fmt.Errorf("羽山请求未知的状态码: %s", string(respData)) } } diff --git a/app/user/cmd/api/internal/types/payload.go b/app/user/cmd/api/internal/types/payload.go index 672918f..2a06be8 100644 --- a/app/user/cmd/api/internal/types/payload.go +++ b/app/user/cmd/api/internal/types/payload.go @@ -3,3 +3,10 @@ package types type MsgPaySuccessQueryPayload struct { OrderID int64 `json:"order_id"` } + +type MsgCarMaintenanceQueryPayload struct { + OrderID string `json:"order_id"` + RetryCount int `json:"retry_count"` + DelayDuration int64 `json:"delay_duration"` // 延迟时间(纳秒) + QueryID int64 `json:"query_id"` // 关联的query表记录ID +} diff --git a/app/user/cmd/api/internal/types/taskname.go b/app/user/cmd/api/internal/types/taskname.go index 33329ee..1a2bc9f 100644 --- a/app/user/cmd/api/internal/types/taskname.go +++ b/app/user/cmd/api/internal/types/taskname.go @@ -2,3 +2,4 @@ package types const MsgPaySuccessQuery = "msg:pay_success:query" const MsgCleanQueryData = "msg:clean_query_data" +const MsgCarMaintenanceQuery = "msg:car:maintenance:query" diff --git a/app/user/model/vars.go b/app/user/model/vars.go index 4723f1b..4c6dfcb 100644 --- a/app/user/model/vars.go +++ b/app/user/model/vars.go @@ -2,6 +2,7 @@ package model import ( "errors" + "github.com/zeromicro/go-zero/core/stores/sqlx" ) @@ -13,3 +14,10 @@ var UserAuthTypeAppWechat string = "app_wechat" //微信小程序 var UserAuthTypeH5Mobile string = "h5_mobile" var UserAuthTypeWxMini string = "wx_mini" var UserAuthTypeWxOfficialAccount string = "wx_official_account" + +const ( + QueryStatePending = "pending" + QueryStateFailed = "failed" + QueryStateSuccess = "success" + QueryStateProcessing = "processing" +) diff --git a/tmp/main.exe b/tmp/main.exe index 4857176..e8c0025 100644 Binary files a/tmp/main.exe and b/tmp/main.exe differ