package api import ( "context" "encoding/json" "errors" "fmt" "time" "tyapi-server/internal/application/api/commands" "tyapi-server/internal/application/api/dto" "tyapi-server/internal/application/api/utils" "tyapi-server/internal/config" entities "tyapi-server/internal/domains/api/entities" "tyapi-server/internal/domains/api/repositories" "tyapi-server/internal/domains/api/services" "tyapi-server/internal/domains/api/services/processors" finance_services "tyapi-server/internal/domains/finance/services" product_entities "tyapi-server/internal/domains/product/entities" product_services "tyapi-server/internal/domains/product/services" user_repositories "tyapi-server/internal/domains/user/repositories" task_entities "tyapi-server/internal/infrastructure/task/entities" "tyapi-server/internal/infrastructure/task/interfaces" "tyapi-server/internal/shared/crypto" "tyapi-server/internal/shared/database" "tyapi-server/internal/shared/export" shared_interfaces "tyapi-server/internal/shared/interfaces" "github.com/shopspring/decimal" "go.uber.org/zap" ) type ApiApplicationService interface { CallApi(ctx context.Context, cmd *commands.ApiCallCommand) (string, string, error) // 获取用户API密钥 GetUserApiKeys(ctx context.Context, userID string) (*dto.ApiKeysResponse, error) // 用户白名单管理 GetUserWhiteList(ctx context.Context, userID string) (*dto.WhiteListListResponse, error) AddWhiteListIP(ctx context.Context, userID string, ipAddress string) error DeleteWhiteListIP(ctx context.Context, userID string, ipAddress string) error // 获取用户API调用记录 GetUserApiCalls(ctx context.Context, userID string, filters map[string]interface{}, options shared_interfaces.ListOptions) (*dto.ApiCallListResponse, error) // 管理端API调用记录 GetAdminApiCalls(ctx context.Context, filters map[string]interface{}, options shared_interfaces.ListOptions) (*dto.ApiCallListResponse, error) // 导出功能 ExportAdminApiCalls(ctx context.Context, filters map[string]interface{}, format string) ([]byte, error) // 加密参数接口 EncryptParams(ctx context.Context, userID string, cmd *commands.EncryptCommand) (string, error) // 解密参数接口 DecryptParams(ctx context.Context, userID string, cmd *commands.DecryptCommand) (map[string]interface{}, error) // 获取表单配置 GetFormConfig(ctx context.Context, apiCode string) (*dto.FormConfigResponse, error) // 异步任务处理接口 SaveApiCall(ctx context.Context, cmd *commands.SaveApiCallCommand) error ProcessDeduction(ctx context.Context, cmd *commands.ProcessDeductionCommand) error UpdateUsageStats(ctx context.Context, cmd *commands.UpdateUsageStatsCommand) error RecordApiLog(ctx context.Context, cmd *commands.RecordApiLogCommand) error ProcessCompensation(ctx context.Context, cmd *commands.ProcessCompensationCommand) error // 余额预警设置 GetUserBalanceAlertSettings(ctx context.Context, userID string) (map[string]interface{}, error) UpdateUserBalanceAlertSettings(ctx context.Context, userID string, enabled bool, threshold float64, alertPhone string) error TestBalanceAlertSms(ctx context.Context, userID string, phone string, balance float64, alertType string) error } type ApiApplicationServiceImpl struct { apiCallService services.ApiCallAggregateService apiUserService services.ApiUserAggregateService apiRequestService *services.ApiRequestService formConfigService services.FormConfigService apiCallRepository repositories.ApiCallRepository contractInfoService user_repositories.ContractInfoRepository productManagementService *product_services.ProductManagementService userRepo user_repositories.UserRepository txManager *database.TransactionManager config *config.Config logger *zap.Logger taskManager interfaces.TaskManager exportManager *export.ExportManager // 其他域的服务 walletService finance_services.WalletAggregateService subscriptionService *product_services.ProductSubscriptionService balanceAlertService finance_services.BalanceAlertService } func NewApiApplicationService( apiCallService services.ApiCallAggregateService, apiUserService services.ApiUserAggregateService, apiRequestService *services.ApiRequestService, formConfigService services.FormConfigService, apiCallRepository repositories.ApiCallRepository, productManagementService *product_services.ProductManagementService, userRepo user_repositories.UserRepository, txManager *database.TransactionManager, config *config.Config, logger *zap.Logger, contractInfoService user_repositories.ContractInfoRepository, taskManager interfaces.TaskManager, walletService finance_services.WalletAggregateService, subscriptionService *product_services.ProductSubscriptionService, exportManager *export.ExportManager, balanceAlertService finance_services.BalanceAlertService, ) ApiApplicationService { service := &ApiApplicationServiceImpl{ apiCallService: apiCallService, apiUserService: apiUserService, apiRequestService: apiRequestService, formConfigService: formConfigService, apiCallRepository: apiCallRepository, productManagementService: productManagementService, userRepo: userRepo, txManager: txManager, config: config, logger: logger, contractInfoService: contractInfoService, taskManager: taskManager, exportManager: exportManager, walletService: walletService, subscriptionService: subscriptionService, balanceAlertService: balanceAlertService, } return service } // CallApi 优化后的应用服务层统一入口 func (s *ApiApplicationServiceImpl) CallApi(ctx context.Context, cmd *commands.ApiCallCommand) (string, string, error) { // ==================== 第一阶段:同步关键验证 ==================== // 1. 创建ApiCall(内存中,不保存) apiCall, err := entities.NewApiCall(cmd.AccessId, cmd.Data, cmd.ClientIP) if err != nil { s.logger.Error("创建ApiCall失败", zap.Error(err)) return "", "", ErrSystem } transactionId := apiCall.TransactionId // 2. 同步验证用户和产品(关键路径) validationResult, err := s.validateApiCall(ctx, cmd, apiCall) if err != nil { // 异步记录失败状态 go s.asyncRecordFailure(context.Background(), apiCall, err) return "", "", err } // 3. 同步调用外部API(核心业务) response, err := s.callExternalApi(ctx, cmd, validationResult) if err != nil { // 异步记录失败状态 go s.asyncRecordFailure(context.Background(), apiCall, err) return "", "", err } // 4. 同步加密响应 encryptedResponse, err := crypto.AesEncrypt([]byte(response), validationResult.GetSecretKey()) if err != nil { s.logger.Error("加密响应失败", zap.Error(err)) go s.asyncRecordFailure(context.Background(), apiCall, err) return "", "", ErrSystem } // ==================== 第二阶段:异步处理非关键操作 ==================== // 5. 异步保存API调用记录 go s.asyncSaveApiCall(context.Background(), apiCall, validationResult, response) // 6. 异步扣款处理 go s.asyncProcessDeduction(context.Background(), apiCall, validationResult) // 7. 异步更新使用统计 // go s.asyncUpdateUsageStats(context.Background(), validationResult) // ==================== 第三阶段:立即返回结果 ==================== s.logger.Info("API调用成功,异步处理后续操作", zap.String("transaction_id", transactionId), zap.String("user_id", validationResult.GetUserID()), zap.String("api_name", cmd.ApiName)) return transactionId, string(encryptedResponse), nil } // validateApiCall 同步验证用户和产品信息 func (s *ApiApplicationServiceImpl) validateApiCall(ctx context.Context, cmd *commands.ApiCallCommand, apiCall *entities.ApiCall) (*dto.ApiCallValidationResult, error) { result := dto.NewApiCallValidationResult() // 1. 验证ApiUser apiUser, err := s.apiUserService.LoadApiUserByAccessId(ctx, cmd.AccessId) if err != nil { s.logger.Error("查ApiUser失败", zap.Error(err)) return nil, ErrInvalidAccessId } result.SetApiUser(apiUser) // 2. 验证产品 product, err := s.productManagementService.GetProductByCode(ctx, cmd.ApiName) if err != nil { s.logger.Error("查产品失败", zap.Error(err)) return nil, ErrProductNotFound } result.SetProduct(product) // 3. 验证用户状态 if apiUser.IsFrozen() { s.logger.Error("账户已冻结", zap.String("userId", apiUser.UserId)) return nil, ErrFrozenAccount } // 4. 验证IP白名单(非开发环境) if !s.config.App.IsDevelopment() && !cmd.Options.IsDebug { if !apiUser.IsWhiteListed(cmd.ClientIP) { s.logger.Error("IP不在白名单内", zap.String("userId", apiUser.UserId), zap.String("ip", cmd.ClientIP)) return nil, ErrInvalidIP } } // 5. 验证钱包状态 if err := s.validateWalletStatus(ctx, apiUser.UserId, product); err != nil { return nil, err } // 6. 验证订阅状态并获取订阅信息 subscription, err := s.validateSubscriptionStatus(ctx, apiUser.UserId, product) if err != nil { return nil, err } result.SetSubscription(subscription) // 7. 解密参数 requestParams, err := crypto.AesDecrypt(cmd.Data, apiUser.SecretKey) if err != nil { s.logger.Error("解密参数失败", zap.Error(err)) return nil, ErrDecryptFail } // 将解密后的字节数组转换为map var paramsMap map[string]interface{} if err := json.Unmarshal(requestParams, ¶msMap); err != nil { s.logger.Error("解析解密参数失败", zap.Error(err)) return nil, ErrDecryptFail } result.SetRequestParams(paramsMap) // 8. 获取合同信息 contractInfo, err := s.contractInfoService.FindByUserID(ctx, apiUser.UserId) if err == nil && len(contractInfo) > 0 { result.SetContractCode(contractInfo[0].ContractCode) } // 更新ApiCall信息 apiCall.ProductId = &product.ID apiCall.UserId = &apiUser.UserId result.SetApiCall(apiCall) return result, nil } // callExternalApi 同步调用外部API func (s *ApiApplicationServiceImpl) callExternalApi(ctx context.Context, cmd *commands.ApiCallCommand, validation *dto.ApiCallValidationResult) (string, error) { // 创建CallContext callContext := &processors.CallContext{ ContractCode: validation.ContractCode, } // 将transactionId放入ctx中 ctxWithTransactionId := context.WithValue(ctx, "transaction_id", validation.ApiCall.TransactionId) // 将map转换为字节数组 requestParamsBytes, err := json.Marshal(validation.RequestParams) if err != nil { s.logger.Error("序列化请求参数失败", zap.Error(err)) return "", ErrSystem } // 调用外部API response, err := s.apiRequestService.PreprocessRequestApi( ctxWithTransactionId, cmd.ApiName, requestParamsBytes, &cmd.Options, callContext) if err != nil { if errors.Is(err, processors.ErrDatasource) { return "", ErrSystem } else if errors.Is(err, processors.ErrInvalidParam) { return "", ErrInvalidParam } else if errors.Is(err, processors.ErrNotFound) { return "", ErrQueryEmpty } else { return "", ErrSystem } } return string(response), nil } // asyncSaveApiCall 异步保存API调用记录 func (s *ApiApplicationServiceImpl) asyncSaveApiCall(ctx context.Context, apiCall *entities.ApiCall, validation *dto.ApiCallValidationResult, response string) { // 标记为成功 apiCall.MarkSuccess(validation.GetAmount()) // 检查TransactionID是否已存在,避免重复创建 existingCall, err := s.apiCallRepository.FindByTransactionId(ctx, apiCall.TransactionId) if err == nil && existingCall != nil { s.logger.Warn("API调用记录已存在,跳过创建", zap.String("transaction_id", apiCall.TransactionId), zap.String("user_id", validation.GetUserID())) return // 静默返回,不报错 } // 直接保存到数据库 if err := s.apiCallRepository.Create(ctx, apiCall); err != nil { s.logger.Error("异步保存API调用记录失败", zap.Error(err)) return } // 创建任务工厂 taskFactory := task_entities.NewTaskFactoryWithManager(s.taskManager) // 创建并异步入队API调用日志任务 if err := taskFactory.CreateAndEnqueueApiLogTask( ctx, apiCall.TransactionId, validation.GetUserID(), validation.Product.Code, validation.Product.Code, ); err != nil { s.logger.Error("创建并入队API日志任务失败", zap.Error(err)) } } // asyncProcessDeduction 异步扣款处理 func (s *ApiApplicationServiceImpl) asyncProcessDeduction(ctx context.Context, apiCall *entities.ApiCall, validation *dto.ApiCallValidationResult) { // 创建任务工厂 taskFactory := task_entities.NewTaskFactoryWithManager(s.taskManager) // 为扣款任务生成独立的TransactionID,避免与API调用的TransactionID冲突 deductionTransactionID := entities.GenerateTransactionID() // 创建并异步入队扣款任务 if err := taskFactory.CreateAndEnqueueDeductionTask( ctx, apiCall.ID, validation.GetUserID(), validation.GetProductID(), validation.GetAmount().String(), deductionTransactionID, // 使用独立的TransactionID ); err != nil { s.logger.Error("创建并入队扣款任务失败", zap.Error(err)) } } // asyncUpdateUsageStats 异步更新使用统计 func (s *ApiApplicationServiceImpl) asyncUpdateUsageStats(ctx context.Context, validation *dto.ApiCallValidationResult) { // 创建任务工厂 taskFactory := task_entities.NewTaskFactoryWithManager(s.taskManager) // 创建并异步入队使用统计任务 if err := taskFactory.CreateAndEnqueueUsageStatsTask( ctx, validation.GetSubscriptionID(), validation.GetUserID(), validation.GetProductID(), 1, ); err != nil { s.logger.Error("创建并入队使用统计任务失败", zap.Error(err)) } } // asyncRecordFailure 异步记录失败状态 func (s *ApiApplicationServiceImpl) asyncRecordFailure(ctx context.Context, apiCall *entities.ApiCall, err error) { // 根据错误类型标记失败状态 var errorType string var errorMsg string switch { case errors.Is(err, ErrInvalidAccessId): errorType = entities.ApiCallErrorInvalidAccess errorMsg = err.Error() case errors.Is(err, ErrFrozenAccount): errorType = entities.ApiCallErrorFrozenAccount case errors.Is(err, ErrInvalidIP): errorType = entities.ApiCallErrorInvalidIP case errors.Is(err, ErrArrears): errorType = entities.ApiCallErrorArrears case errors.Is(err, ErrInsufficientBalance): errorType = entities.ApiCallErrorArrears case errors.Is(err, ErrProductNotFound): errorType = entities.ApiCallErrorProductNotFound errorMsg = err.Error() case errors.Is(err, ErrProductDisabled): errorType = entities.ApiCallErrorProductDisabled case errors.Is(err, ErrNotSubscribed): errorType = entities.ApiCallErrorNotSubscribed case errors.Is(err, ErrProductNotSubscribed): errorType = entities.ApiCallErrorNotSubscribed case errors.Is(err, ErrSubscriptionExpired): errorType = entities.ApiCallErrorNotSubscribed case errors.Is(err, ErrSubscriptionSuspended): errorType = entities.ApiCallErrorNotSubscribed case errors.Is(err, ErrDecryptFail): errorType = entities.ApiCallErrorDecryptFail errorMsg = err.Error() case errors.Is(err, ErrInvalidParam): errorType = entities.ApiCallErrorInvalidParam errorMsg = err.Error() case errors.Is(err, ErrQueryEmpty): errorType = entities.ApiCallErrorQueryEmpty default: errorType = entities.ApiCallErrorSystem errorMsg = err.Error() } apiCall.MarkFailed(errorType, errorMsg) // 失败请求不创建任务,只记录日志 s.logger.Info("API调用失败,记录失败状态", zap.String("transaction_id", apiCall.TransactionId), zap.String("error_type", errorType), zap.String("error_msg", errorMsg)) // 可选:如果需要统计失败请求,可以在这里添加计数器 // s.failureCounter.Inc() } // GetUserApiKeys 获取用户API密钥 func (s *ApiApplicationServiceImpl) GetUserApiKeys(ctx context.Context, userID string) (*dto.ApiKeysResponse, error) { apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { return nil, err } return &dto.ApiKeysResponse{ ID: apiUser.ID, UserID: apiUser.UserId, AccessID: apiUser.AccessId, SecretKey: apiUser.SecretKey, Status: apiUser.Status, CreatedAt: apiUser.CreatedAt, UpdatedAt: apiUser.UpdatedAt, }, nil } // GetUserWhiteList 获取用户白名单列表 func (s *ApiApplicationServiceImpl) GetUserWhiteList(ctx context.Context, userID string) (*dto.WhiteListListResponse, error) { apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { return nil, err } // 确保WhiteList不为nil if apiUser.WhiteList == nil { apiUser.WhiteList = []string{} } // 将白名单字符串数组转换为响应格式 var items []dto.WhiteListResponse for _, ip := range apiUser.WhiteList { items = append(items, dto.WhiteListResponse{ ID: apiUser.ID, // 使用API用户ID作为标识 UserID: apiUser.UserId, IPAddress: ip, CreatedAt: apiUser.CreatedAt, // 使用API用户创建时间 }) } return &dto.WhiteListListResponse{ Items: items, Total: len(items), }, nil } // AddWhiteListIP 添加白名单IP func (s *ApiApplicationServiceImpl) AddWhiteListIP(ctx context.Context, userID string, ipAddress string) error { apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { return err } // 确保WhiteList不为nil if apiUser.WhiteList == nil { apiUser.WhiteList = []string{} } // 使用实体的领域方法添加IP到白名单 err = apiUser.AddToWhiteList(ipAddress) if err != nil { return err } // 保存更新 err = s.apiUserService.SaveApiUser(ctx, apiUser) if err != nil { return err } return nil } // DeleteWhiteListIP 删除白名单IP func (s *ApiApplicationServiceImpl) DeleteWhiteListIP(ctx context.Context, userID string, ipAddress string) error { apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { return err } // 确保WhiteList不为nil if apiUser.WhiteList == nil { apiUser.WhiteList = []string{} } // 使用实体的领域方法删除IP err = apiUser.RemoveFromWhiteList(ipAddress) if err != nil { return err } // 保存更新 err = s.apiUserService.SaveApiUser(ctx, apiUser) if err != nil { return err } return nil } // GetUserApiCalls 获取用户API调用记录 func (s *ApiApplicationServiceImpl) GetUserApiCalls(ctx context.Context, userID string, filters map[string]interface{}, options shared_interfaces.ListOptions) (*dto.ApiCallListResponse, error) { // 查询API调用记录(包含产品名称) productNameMap, calls, total, err := s.apiCallRepository.ListByUserIdWithFiltersAndProductName(ctx, userID, filters, options) if err != nil { s.logger.Error("查询API调用记录失败", zap.Error(err), zap.String("userID", userID)) return nil, err } // 转换为响应DTO var items []dto.ApiCallRecordResponse for _, call := range calls { item := dto.ApiCallRecordResponse{ ID: call.ID, AccessId: call.AccessId, UserId: *call.UserId, TransactionId: call.TransactionId, ClientIp: call.ClientIp, Status: call.Status, StartAt: call.StartAt.Format("2006-01-02 15:04:05"), CreatedAt: call.CreatedAt.Format("2006-01-02 15:04:05"), UpdatedAt: call.UpdatedAt.Format("2006-01-02 15:04:05"), } // 处理可选字段 if call.ProductId != nil { item.ProductId = call.ProductId } // 从映射中获取产品名称 if productName, exists := productNameMap[call.ID]; exists { item.ProductName = &productName } if call.EndAt != nil { endAt := call.EndAt.Format("2006-01-02 15:04:05") item.EndAt = &endAt } if call.Cost != nil { cost := call.Cost.String() item.Cost = &cost } if call.ErrorType != nil { item.ErrorType = call.ErrorType } if call.ErrorMsg != nil { item.ErrorMsg = call.ErrorMsg // 添加翻译后的错误信息 item.TranslatedErrorMsg = utils.TranslateErrorMsg(call.ErrorType, call.ErrorMsg) } items = append(items, item) } return &dto.ApiCallListResponse{ Items: items, Total: total, Page: options.Page, Size: options.PageSize, }, nil } // GetAdminApiCalls 获取管理端API调用记录 func (s *ApiApplicationServiceImpl) GetAdminApiCalls(ctx context.Context, filters map[string]interface{}, options shared_interfaces.ListOptions) (*dto.ApiCallListResponse, error) { // 查询API调用记录(包含产品名称) productNameMap, calls, total, err := s.apiCallRepository.ListWithFiltersAndProductName(ctx, filters, options) if err != nil { s.logger.Error("查询API调用记录失败", zap.Error(err)) return nil, err } // 转换为响应DTO var items []dto.ApiCallRecordResponse for _, call := range calls { // 基础字段安全检查 if call.ID == "" { s.logger.Warn("跳过无效的API调用记录:ID为空") continue } item := dto.ApiCallRecordResponse{ ID: call.ID, AccessId: call.AccessId, TransactionId: call.TransactionId, ClientIp: call.ClientIp, Status: call.Status, } // 安全设置用户ID if call.UserId != nil && *call.UserId != "" { item.UserId = *call.UserId } else { item.UserId = "未知用户" } // 安全设置时间字段 if !call.StartAt.IsZero() { item.StartAt = call.StartAt.Format("2006-01-02 15:04:05") } else { item.StartAt = "未知时间" } if !call.CreatedAt.IsZero() { item.CreatedAt = call.CreatedAt.Format("2006-01-02 15:04:05") } else { item.CreatedAt = "未知时间" } if !call.UpdatedAt.IsZero() { item.UpdatedAt = call.UpdatedAt.Format("2006-01-02 15:04:05") } else { item.UpdatedAt = "未知时间" } // 处理可选字段 if call.ProductId != nil && *call.ProductId != "" { item.ProductId = call.ProductId } // 从映射中获取产品名称 if productName, exists := productNameMap[call.ID]; exists && productName != "" { item.ProductName = &productName } // 安全设置结束时间 if call.EndAt != nil && !call.EndAt.IsZero() { endAt := call.EndAt.Format("2006-01-02 15:04:05") item.EndAt = &endAt } // 安全设置费用 if call.Cost != nil { cost := call.Cost.String() if cost != "" { item.Cost = &cost } } // 安全设置错误类型 if call.ErrorType != nil && *call.ErrorType != "" { item.ErrorType = call.ErrorType } // 安全设置错误信息 if call.ErrorMsg != nil && *call.ErrorMsg != "" { item.ErrorMsg = call.ErrorMsg // 添加翻译后的错误信息 if call.ErrorType != nil && *call.ErrorType != "" { item.TranslatedErrorMsg = utils.TranslateErrorMsg(call.ErrorType, call.ErrorMsg) } } // 获取用户信息和企业名称(增强空指针防护) if call.UserId != nil && *call.UserId != "" { user, err := s.userRepo.GetByIDWithEnterpriseInfo(ctx, *call.UserId) if err == nil && user.ID != "" { companyName := "未知企业" // 安全获取企业名称 if user.EnterpriseInfo != nil && user.EnterpriseInfo.CompanyName != "" { companyName = user.EnterpriseInfo.CompanyName } item.CompanyName = &companyName // 安全构建用户响应 item.User = &dto.UserSimpleResponse{ ID: user.ID, CompanyName: companyName, Phone: user.Phone, } // 验证用户数据的完整性 if user.Phone == "" { s.logger.Warn("用户手机号为空", zap.String("user_id", user.ID), zap.String("call_id", call.ID)) item.User.Phone = "未知手机号" } } else { // 用户查询失败或用户数据不完整时的处理 if err != nil { s.logger.Warn("获取用户信息失败", zap.String("user_id", *call.UserId), zap.String("call_id", call.ID), zap.Error(err)) } else if user.ID == "" { s.logger.Warn("用户ID为空", zap.String("call_user_id", *call.UserId), zap.String("call_id", call.ID)) } // 设置默认值 defaultCompanyName := "未知企业" item.CompanyName = &defaultCompanyName item.User = &dto.UserSimpleResponse{ ID: "未知用户", CompanyName: defaultCompanyName, Phone: "未知手机号", } } } else { // 用户ID为空时的处理 defaultCompanyName := "未知企业" item.CompanyName = &defaultCompanyName item.User = &dto.UserSimpleResponse{ ID: "未知用户", CompanyName: defaultCompanyName, Phone: "未知手机号", } } items = append(items, item) } return &dto.ApiCallListResponse{ Items: items, Total: total, Page: options.Page, Size: options.PageSize, }, nil } // ExportAdminApiCalls 导出管理端API调用记录 func (s *ApiApplicationServiceImpl) ExportAdminApiCalls(ctx context.Context, filters map[string]interface{}, format string) ([]byte, error) { const batchSize = 1000 // 每批处理1000条记录 var allCalls []*entities.ApiCall var productNameMap map[string]string // 分批获取数据 page := 1 for { // 查询当前批次的数据 batchProductNameMap, calls, _, err := s.apiCallRepository.ListWithFiltersAndProductName(ctx, filters, shared_interfaces.ListOptions{ Page: page, PageSize: batchSize, Sort: "created_at", Order: "desc", }) if err != nil { s.logger.Error("查询导出API调用记录失败", zap.Error(err)) return nil, err } // 合并产品名称映射 if productNameMap == nil { productNameMap = batchProductNameMap } else { for k, v := range batchProductNameMap { productNameMap[k] = v } } // 添加到总数据中 allCalls = append(allCalls, calls...) // 如果当前批次数据少于批次大小,说明已经是最后一批 if len(calls) < batchSize { break } page++ } // 批量获取企业名称映射,避免N+1查询问题 companyNameMap, err := s.batchGetCompanyNamesForApiCalls(ctx, allCalls) if err != nil { s.logger.Warn("批量获取企业名称失败,使用默认值", zap.Error(err)) companyNameMap = make(map[string]string) } // 准备导出数据 headers := []string{"企业名称", "产品名称", "交易ID", "客户端IP", "状态", "开始时间", "结束时间"} columnWidths := []float64{30, 20, 40, 15, 10, 20, 20} data := make([][]interface{}, len(allCalls)) for i, call := range allCalls { // 从映射中获取企业名称 companyName := "未知企业" if call.UserId != nil { companyName = companyNameMap[*call.UserId] if companyName == "" { companyName = "未知企业" } } // 获取产品名称 productName := "未知产品" if call.ID != "" { productName = productNameMap[call.ID] if productName == "" { productName = "未知产品" } } // 格式化时间 startAt := call.StartAt.Format("2006-01-02 15:04:05") endAt := "" if call.EndAt != nil { endAt = call.EndAt.Format("2006-01-02 15:04:05") } data[i] = []interface{}{ companyName, productName, call.TransactionId, call.ClientIp, call.Status, startAt, endAt, } } // 创建导出配置 config := &export.ExportConfig{ SheetName: "API调用记录", Headers: headers, Data: data, ColumnWidths: columnWidths, } // 使用导出管理器生成文件 return s.exportManager.Export(ctx, config, format) } // EncryptParams 加密参数 func (s *ApiApplicationServiceImpl) EncryptParams(ctx context.Context, userID string, cmd *commands.EncryptCommand) (string, error) { // 1. 将数据转换为JSON字节数组 jsonData, err := json.Marshal(cmd.Data) if err != nil { s.logger.Error("序列化参数失败", zap.Error(err)) return "", err } // 2. 使用前端传来的SecretKey进行加密 encryptedData, err := crypto.AesEncrypt(jsonData, cmd.SecretKey) if err != nil { s.logger.Error("加密参数失败", zap.Error(err)) return "", err } return encryptedData, nil } // DecryptParams 解密参数 func (s *ApiApplicationServiceImpl) DecryptParams(ctx context.Context, userID string, cmd *commands.DecryptCommand) (map[string]interface{}, error) { // 1. 使用前端传来的SecretKey进行解密 decryptedData, err := crypto.AesDecrypt(cmd.EncryptedData, cmd.SecretKey) if err != nil { s.logger.Error("解密参数失败", zap.Error(err)) return nil, err } // 2. 将解密后的JSON字节数组转换为map var result map[string]interface{} err = json.Unmarshal(decryptedData, &result) if err != nil { s.logger.Error("反序列化解密数据失败", zap.Error(err)) return nil, err } return result, nil } // GetFormConfig 获取指定API的表单配置 func (s *ApiApplicationServiceImpl) GetFormConfig(ctx context.Context, apiCode string) (*dto.FormConfigResponse, error) { // 调用领域服务获取表单配置 config, err := s.formConfigService.GetFormConfig(apiCode) if err != nil { s.logger.Error("获取表单配置失败", zap.String("api_code", apiCode), zap.Error(err)) return nil, err } if config == nil { return nil, nil } // 转换为应用层DTO response := &dto.FormConfigResponse{ ApiCode: config.ApiCode, Fields: make([]dto.FormField, len(config.Fields)), } for i, field := range config.Fields { response.Fields[i] = dto.FormField{ Name: field.Name, Label: field.Label, Type: field.Type, Required: field.Required, Validation: field.Validation, Description: field.Description, Example: field.Example, Placeholder: field.Placeholder, } } return response, nil } // ==================== 异步任务处理方法 ==================== // SaveApiCall 保存API调用记录 func (s *ApiApplicationServiceImpl) SaveApiCall(ctx context.Context, cmd *commands.SaveApiCallCommand) error { s.logger.Debug("开始保存API调用记录", zap.String("transaction_id", cmd.TransactionID), zap.String("user_id", cmd.UserID)) // 创建ApiCall实体 apiCall := &entities.ApiCall{ ID: cmd.ApiCallID, AccessId: "", // SaveApiCallCommand中没有AccessID字段 UserId: &cmd.UserID, TransactionId: cmd.TransactionID, ClientIp: cmd.ClientIP, Status: cmd.Status, StartAt: time.Now(), // SaveApiCallCommand中没有StartAt字段 EndAt: nil, // SaveApiCallCommand中没有EndAt字段 Cost: &[]decimal.Decimal{decimal.NewFromFloat(cmd.Cost)}[0], // 转换float64为*decimal.Decimal ErrorType: &cmd.ErrorType, // 转换string为*string ErrorMsg: &cmd.ErrorMsg, // 转换string为*string CreatedAt: time.Now(), UpdatedAt: time.Now(), } // 保存到数据库 if err := s.apiCallRepository.Create(ctx, apiCall); err != nil { s.logger.Error("保存API调用记录失败", zap.Error(err)) return err } s.logger.Info("API调用记录保存成功", zap.String("transaction_id", cmd.TransactionID), zap.String("status", cmd.Status)) return nil } // ProcessDeduction 处理扣款 func (s *ApiApplicationServiceImpl) ProcessDeduction(ctx context.Context, cmd *commands.ProcessDeductionCommand) error { s.logger.Debug("开始处理扣款", zap.String("transaction_id", cmd.TransactionID), zap.String("user_id", cmd.UserID), zap.String("amount", cmd.Amount)) // 直接调用钱包服务进行扣款 amount, err := decimal.NewFromString(cmd.Amount) if err != nil { s.logger.Error("金额格式错误", zap.Error(err)) return err } if err := s.walletService.Deduct(ctx, cmd.UserID, amount, cmd.ApiCallID, cmd.TransactionID, cmd.ProductID); err != nil { s.logger.Error("扣款处理失败", zap.String("transaction_id", cmd.TransactionID), zap.Error(err)) return err } s.logger.Info("扣款处理成功", zap.String("transaction_id", cmd.TransactionID), zap.String("user_id", cmd.UserID)) return nil } // UpdateUsageStats 更新使用统计 func (s *ApiApplicationServiceImpl) UpdateUsageStats(ctx context.Context, cmd *commands.UpdateUsageStatsCommand) error { s.logger.Debug("开始更新使用统计", zap.String("subscription_id", cmd.SubscriptionID), zap.String("user_id", cmd.UserID), zap.Int("increment", cmd.Increment)) // 直接调用订阅服务更新使用统计 if err := s.subscriptionService.IncrementSubscriptionAPIUsage(ctx, cmd.SubscriptionID, int64(cmd.Increment)); err != nil { s.logger.Error("更新使用统计失败", zap.String("subscription_id", cmd.SubscriptionID), zap.Error(err)) return err } s.logger.Info("使用统计更新成功", zap.String("subscription_id", cmd.SubscriptionID), zap.String("user_id", cmd.UserID)) return nil } // RecordApiLog 记录API日志 func (s *ApiApplicationServiceImpl) RecordApiLog(ctx context.Context, cmd *commands.RecordApiLogCommand) error { s.logger.Debug("开始记录API日志", zap.String("transaction_id", cmd.TransactionID), zap.String("api_name", cmd.ApiName), zap.String("user_id", cmd.UserID)) // 记录结构化日志 s.logger.Info("API调用日志", zap.String("transaction_id", cmd.TransactionID), zap.String("user_id", cmd.UserID), zap.String("api_name", cmd.ApiName), zap.String("client_ip", cmd.ClientIP), zap.Int64("response_size", cmd.ResponseSize), zap.Time("timestamp", time.Now())) // 这里可以添加其他日志记录逻辑 // 例如:写入专门的日志文件、发送到日志系统、写入数据库等 s.logger.Info("API日志记录成功", zap.String("transaction_id", cmd.TransactionID), zap.String("api_name", cmd.ApiName), zap.String("user_id", cmd.UserID)) return nil } // ProcessCompensation 处理补偿 func (s *ApiApplicationServiceImpl) ProcessCompensation(ctx context.Context, cmd *commands.ProcessCompensationCommand) error { s.logger.Debug("开始处理补偿", zap.String("transaction_id", cmd.TransactionID), zap.String("type", cmd.Type)) // 根据补偿类型处理不同的补偿逻辑 switch cmd.Type { case "refund": // 退款补偿 - ProcessCompensationCommand中没有Amount字段,暂时只记录日志 s.logger.Info("退款补偿处理", zap.String("transaction_id", cmd.TransactionID)) case "credit": // 积分补偿 - ProcessCompensationCommand中没有CreditAmount字段,暂时只记录日志 s.logger.Info("积分补偿处理", zap.String("transaction_id", cmd.TransactionID)) case "subscription_extension": // 订阅延期补偿 - ProcessCompensationCommand中没有ExtensionDays字段,暂时只记录日志 s.logger.Info("订阅延期补偿处理", zap.String("transaction_id", cmd.TransactionID)) default: s.logger.Warn("未知的补偿类型", zap.String("type", cmd.Type)) return fmt.Errorf("未知的补偿类型: %s", cmd.Type) } s.logger.Info("补偿处理成功", zap.String("transaction_id", cmd.TransactionID), zap.String("type", cmd.Type)) return nil } // validateWalletStatus 验证钱包状态 func (s *ApiApplicationServiceImpl) validateWalletStatus(ctx context.Context, userID string, product *product_entities.Product) error { // 1. 获取用户钱包信息 wallet, err := s.walletService.LoadWalletByUserId(ctx, userID) if err != nil { s.logger.Error("获取钱包信息失败", zap.String("user_id", userID), zap.Error(err)) return ErrSystem } // 2. 检查钱包是否激活 if !wallet.IsActive { s.logger.Error("钱包未激活", zap.String("user_id", userID), zap.String("wallet_id", wallet.ID)) return ErrFrozenAccount } // 3. 检查钱包余额是否充足 requiredAmount := product.Price if wallet.Balance.LessThan(requiredAmount) { s.logger.Error("钱包余额不足", zap.String("user_id", userID), zap.String("balance", wallet.Balance.String()), zap.String("required_amount", requiredAmount.String()), zap.String("product_code", product.Code)) return ErrInsufficientBalance } // 4. 检查是否欠费 if wallet.IsArrears() { s.logger.Error("钱包存在欠费", zap.String("user_id", userID), zap.String("wallet_id", wallet.ID)) return ErrFrozenAccount } s.logger.Info("钱包状态验证通过", zap.String("user_id", userID), zap.String("wallet_id", wallet.ID), zap.String("balance", wallet.Balance.String())) return nil } // validateSubscriptionStatus 验证订阅状态并返回订阅信息 func (s *ApiApplicationServiceImpl) validateSubscriptionStatus(ctx context.Context, userID string, product *product_entities.Product) (*product_entities.Subscription, error) { // 1. 检查用户是否已订阅该产品 subscription, err := s.subscriptionService.UserSubscribedProductByCode(ctx, userID, product.Code) if err != nil { // 如果没有找到订阅记录,说明用户未订阅 s.logger.Error("用户未订阅该产品", zap.String("user_id", userID), zap.String("product_code", product.Code), zap.Error(err)) return nil, ErrProductNotSubscribed } // 2. 检查订阅是否有效(未删除) if !subscription.IsValid() { s.logger.Error("订阅已失效", zap.String("user_id", userID), zap.String("subscription_id", subscription.ID), zap.String("product_code", product.Code)) return nil, ErrSubscriptionExpired } s.logger.Info("订阅状态验证通过", zap.String("user_id", userID), zap.String("subscription_id", subscription.ID), zap.String("product_code", product.Code), zap.String("subscription_price", subscription.Price.String()), zap.Int64("api_used", subscription.APIUsed)) return subscription, nil } // batchGetCompanyNamesForApiCalls 批量获取企业名称映射(用于API调用记录) func (s *ApiApplicationServiceImpl) batchGetCompanyNamesForApiCalls(ctx context.Context, calls []*entities.ApiCall) (map[string]string, error) { // 收集所有唯一的用户ID userIDSet := make(map[string]bool) for _, call := range calls { if call.UserId != nil && *call.UserId != "" { userIDSet[*call.UserId] = true } } // 转换为切片 userIDs := make([]string, 0, len(userIDSet)) for userID := range userIDSet { userIDs = append(userIDs, userID) } // 批量查询用户信息 users, err := s.userRepo.BatchGetByIDsWithEnterpriseInfo(ctx, userIDs) if err != nil { return nil, err } // 构建企业名称映射 companyNameMap := make(map[string]string) for _, user := range users { companyName := "未知企业" if user.EnterpriseInfo != nil { companyName = user.EnterpriseInfo.CompanyName } companyNameMap[user.ID] = companyName } return companyNameMap, nil } // GetUserBalanceAlertSettings 获取用户余额预警设置 func (s *ApiApplicationServiceImpl) GetUserBalanceAlertSettings(ctx context.Context, userID string) (map[string]interface{}, error) { // 获取API用户信息 apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { s.logger.Error("获取API用户信息失败", zap.String("user_id", userID), zap.Error(err)) return nil, fmt.Errorf("获取API用户信息失败: %w", err) } if apiUser == nil { return nil, fmt.Errorf("API用户不存在") } // 返回预警设置 settings := map[string]interface{}{ "enabled": apiUser.BalanceAlertEnabled, "threshold": apiUser.BalanceAlertThreshold, "alert_phone": apiUser.AlertPhone, } return settings, nil } // UpdateUserBalanceAlertSettings 更新用户余额预警设置 func (s *ApiApplicationServiceImpl) UpdateUserBalanceAlertSettings(ctx context.Context, userID string, enabled bool, threshold float64, alertPhone string) error { // 获取API用户信息 apiUser, err := s.apiUserService.LoadApiUserByUserId(ctx, userID) if err != nil { s.logger.Error("获取API用户信息失败", zap.String("user_id", userID), zap.Error(err)) return fmt.Errorf("获取API用户信息失败: %w", err) } if apiUser == nil { return fmt.Errorf("API用户不存在") } // 更新预警设置 if err := apiUser.UpdateBalanceAlertSettings(enabled, threshold, alertPhone); err != nil { s.logger.Error("更新预警设置失败", zap.String("user_id", userID), zap.Error(err)) return fmt.Errorf("更新预警设置失败: %w", err) } // 保存到数据库 if err := s.apiUserService.SaveApiUser(ctx, apiUser); err != nil { s.logger.Error("保存API用户信息失败", zap.String("user_id", userID), zap.Error(err)) return fmt.Errorf("保存API用户信息失败: %w", err) } s.logger.Info("用户余额预警设置更新成功", zap.String("user_id", userID), zap.Bool("enabled", enabled), zap.Float64("threshold", threshold), zap.String("alert_phone", alertPhone)) return nil } // TestBalanceAlertSms 测试余额预警短信 func (s *ApiApplicationServiceImpl) TestBalanceAlertSms(ctx context.Context, userID string, phone string, balance float64, alertType string) error { // 获取用户信息以获取企业名称 user, err := s.userRepo.GetByID(ctx, userID) if err != nil { s.logger.Error("获取用户信息失败", zap.String("user_id", userID), zap.Error(err)) return fmt.Errorf("获取用户信息失败: %w", err) } // 获取企业名称 enterpriseName := "天远数据用户" if user.EnterpriseInfo != nil && user.EnterpriseInfo.CompanyName != "" { enterpriseName = user.EnterpriseInfo.CompanyName } // 调用短信服务发送测试短信 if err := s.balanceAlertService.CheckAndSendAlert(ctx, userID, decimal.NewFromFloat(balance)); err != nil { s.logger.Error("发送测试预警短信失败", zap.String("user_id", userID), zap.String("phone", phone), zap.Float64("balance", balance), zap.String("alert_type", alertType), zap.Error(err)) return fmt.Errorf("发送测试短信失败: %w", err) } s.logger.Info("测试预警短信发送成功", zap.String("user_id", userID), zap.String("phone", phone), zap.Float64("balance", balance), zap.String("alert_type", alertType), zap.String("enterprise_name", enterpriseName)) return nil }