package handlers import ( "context" "encoding/json" "time" "github.com/hibiken/asynq" "github.com/shopspring/decimal" "go.uber.org/zap" "tyapi-server/internal/application/api" finance_services "tyapi-server/internal/domains/finance/services" product_services "tyapi-server/internal/domains/product/services" "tyapi-server/internal/infrastructure/task/entities" "tyapi-server/internal/infrastructure/task/repositories" "tyapi-server/internal/infrastructure/task/types" ) // ApiTaskHandler API任务处理器 type ApiTaskHandler struct { logger *zap.Logger apiApplicationService api.ApiApplicationService walletService finance_services.WalletAggregateService subscriptionService *product_services.ProductSubscriptionService asyncTaskRepo repositories.AsyncTaskRepository } // NewApiTaskHandler 创建API任务处理器 func NewApiTaskHandler( logger *zap.Logger, apiApplicationService api.ApiApplicationService, walletService finance_services.WalletAggregateService, subscriptionService *product_services.ProductSubscriptionService, asyncTaskRepo repositories.AsyncTaskRepository, ) *ApiTaskHandler { return &ApiTaskHandler{ logger: logger, apiApplicationService: apiApplicationService, walletService: walletService, subscriptionService: subscriptionService, asyncTaskRepo: asyncTaskRepo, } } // HandleApiCall 处理API调用任务 func (h *ApiTaskHandler) HandleApiCall(ctx context.Context, t *asynq.Task) error { h.logger.Info("开始处理API调用任务") var payload types.ApiCallPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析API调用任务载荷失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "解析任务载荷失败") return err } h.logger.Info("处理API调用任务", zap.String("api_call_id", payload.ApiCallID), zap.String("user_id", payload.UserID), zap.String("product_id", payload.ProductID)) // 这里实现API调用的具体逻辑 // 例如:记录API调用、更新使用统计等 // 更新任务状态为成功 h.updateTaskStatus(ctx, t, "completed", "") h.logger.Info("API调用任务处理完成", zap.String("api_call_id", payload.ApiCallID)) return nil } // HandleDeduction 处理扣款任务 func (h *ApiTaskHandler) HandleDeduction(ctx context.Context, t *asynq.Task) error { h.logger.Info("开始处理扣款任务") var payload types.DeductionPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析扣款任务载荷失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "解析任务载荷失败") return err } h.logger.Info("处理扣款任务", zap.String("user_id", payload.UserID), zap.String("amount", payload.Amount), zap.String("transaction_id", payload.TransactionID)) // 调用钱包服务进行扣款 if h.walletService != nil { amount, err := decimal.NewFromString(payload.Amount) if err != nil { h.logger.Error("金额格式错误", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "金额格式错误") return err } if err := h.walletService.Deduct(ctx, payload.UserID, amount, payload.ApiCallID, payload.TransactionID, payload.ProductID); err != nil { h.logger.Error("扣款处理失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "扣款处理失败: "+err.Error()) return err } } else { h.logger.Warn("钱包服务未初始化,跳过扣款", zap.String("user_id", payload.UserID)) h.updateTaskStatus(ctx, t, "failed", "钱包服务未初始化") return nil } // 更新任务状态为成功 h.updateTaskStatus(ctx, t, "completed", "") h.logger.Info("扣款任务处理完成", zap.String("transaction_id", payload.TransactionID)) return nil } // HandleCompensation 处理补偿任务 func (h *ApiTaskHandler) HandleCompensation(ctx context.Context, t *asynq.Task) error { h.logger.Info("开始处理补偿任务") var payload types.CompensationPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析补偿任务载荷失败", zap.Error(err)) return err } h.logger.Info("处理补偿任务", zap.String("transaction_id", payload.TransactionID), zap.String("type", payload.Type)) // 这里实现补偿的具体逻辑 // 例如:调用钱包服务进行退款等 h.logger.Info("补偿任务处理完成", zap.String("transaction_id", payload.TransactionID)) return nil } // HandleUsageStats 处理使用统计任务 func (h *ApiTaskHandler) HandleUsageStats(ctx context.Context, t *asynq.Task) error { h.logger.Info("开始处理使用统计任务") var payload types.UsageStatsPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析使用统计任务载荷失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "解析任务载荷失败") return err } h.logger.Info("处理使用统计任务", zap.String("subscription_id", payload.SubscriptionID), zap.String("user_id", payload.UserID), zap.Int("increment", payload.Increment)) // 调用订阅服务更新使用统计 if h.subscriptionService != nil { if err := h.subscriptionService.IncrementSubscriptionAPIUsage(ctx, payload.SubscriptionID, int64(payload.Increment)); err != nil { h.logger.Error("更新使用统计失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "更新使用统计失败: "+err.Error()) return err } } else { h.logger.Warn("订阅服务未初始化,跳过使用统计更新", zap.String("subscription_id", payload.SubscriptionID)) h.updateTaskStatus(ctx, t, "failed", "订阅服务未初始化") return nil } // 更新任务状态为成功 h.updateTaskStatus(ctx, t, "completed", "") h.logger.Info("使用统计任务处理完成", zap.String("subscription_id", payload.SubscriptionID)) return nil } // HandleApiLog 处理API日志任务 func (h *ApiTaskHandler) HandleApiLog(ctx context.Context, t *asynq.Task) error { h.logger.Info("开始处理API日志任务") var payload types.ApiLogPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析API日志任务载荷失败", zap.Error(err)) h.updateTaskStatus(ctx, t, "failed", "解析任务载荷失败") return err } h.logger.Info("处理API日志任务", zap.String("transaction_id", payload.TransactionID), zap.String("user_id", payload.UserID), zap.String("api_name", payload.ApiName), zap.String("product_id", payload.ProductID)) // 记录结构化日志 h.logger.Info("API调用日志", zap.String("transaction_id", payload.TransactionID), zap.String("user_id", payload.UserID), zap.String("api_name", payload.ApiName), zap.String("product_id", payload.ProductID), zap.Time("timestamp", time.Now())) // 这里可以添加其他日志记录逻辑 // 例如:写入专门的日志文件、发送到日志系统、写入数据库等 // 更新任务状态为成功 h.updateTaskStatus(ctx, t, "completed", "") h.logger.Info("API日志任务处理完成", zap.String("transaction_id", payload.TransactionID)) return nil } // updateTaskStatus 更新任务状态 func (h *ApiTaskHandler) updateTaskStatus(ctx context.Context, t *asynq.Task, status string, errorMsg string) { // 从任务载荷中提取任务ID var payload map[string]interface{} if err := json.Unmarshal(t.Payload(), &payload); err != nil { h.logger.Error("解析任务载荷失败,无法更新状态", zap.Error(err)) return } // 尝试从payload中获取任务ID taskID, ok := payload["task_id"].(string) if !ok { h.logger.Error("无法从任务载荷中获取任务ID") return } // 根据状态决定更新方式 if status == "failed" { // 失败时:需要检查是否达到最大重试次数 h.handleTaskFailure(ctx, taskID, errorMsg) } else if status == "completed" { // 成功时:清除错误信息并更新状态 if err := h.asyncTaskRepo.UpdateStatusWithSuccess(ctx, taskID, entities.TaskStatus(status)); err != nil { h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", status), zap.Error(err)) } } else { // 其他状态:只更新状态 if err := h.asyncTaskRepo.UpdateStatus(ctx, taskID, entities.TaskStatus(status)); err != nil { h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", status), zap.Error(err)) } } h.logger.Info("任务状态已更新", zap.String("task_id", taskID), zap.String("status", status), zap.String("error_msg", errorMsg)) } // handleTaskFailure 处理任务失败 func (h *ApiTaskHandler) handleTaskFailure(ctx context.Context, taskID string, errorMsg string) { // 获取当前任务信息 task, err := h.asyncTaskRepo.GetByID(ctx, taskID) if err != nil { h.logger.Error("获取任务信息失败", zap.String("task_id", taskID), zap.Error(err)) return } // 增加重试次数 newRetryCount := task.RetryCount + 1 // 检查是否达到最大重试次数 if newRetryCount >= task.MaxRetries { // 达到最大重试次数,标记为最终失败 if err := h.asyncTaskRepo.UpdateStatusWithRetryAndError(ctx, taskID, entities.TaskStatusFailed, errorMsg); err != nil { h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", "failed"), zap.Error(err)) } h.logger.Info("任务最终失败,已达到最大重试次数", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Int("max_retries", task.MaxRetries)) } else { // 未达到最大重试次数,保持pending状态,记录错误信息 if err := h.asyncTaskRepo.UpdateRetryCountAndError(ctx, taskID, newRetryCount, errorMsg); err != nil { h.logger.Error("更新任务重试次数失败", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Error(err)) } h.logger.Info("任务失败,准备重试", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Int("max_retries", task.MaxRetries)) } }