This commit is contained in:
2026-01-30 16:51:19 +08:00
commit 4609219a4d
606 changed files with 65580 additions and 0 deletions

View File

@@ -0,0 +1,167 @@
package queue
import (
"context"
"tyass-server/app/main/api/internal/svc"
"tyass-server/app/main/model"
"tyass-server/common/globalkey"
"database/sql"
"fmt"
"strconv"
"time"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// TASKTIME 定义为每天凌晨3点执行
const TASKTIME = "0 3 * * *"
type CleanQueryDataHandler struct {
svcCtx *svc.ServiceContext
}
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
return &CleanQueryDataHandler{
svcCtx: svcCtx,
}
}
// 获取配置值
func (l *CleanQueryDataHandler) getConfigValue(ctx context.Context, key string) (string, error) {
// 通过缓存获取配置
config, err := l.svcCtx.QueryCleanupConfigModel.FindOneByConfigKey(ctx, key)
if err != nil {
if err == model.ErrNotFound {
return "", fmt.Errorf("配置项 %s 不存在", key)
}
return "", err
}
// 检查配置状态
if config.Status != 1 {
return "", fmt.Errorf("配置项 %s 已禁用或已删除", key)
}
return config.ConfigValue, nil
}
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
now := time.Now()
logx.Infof("%s - 开始执行查询数据清理任务", now.Format("2006-01-02 15:04:05"))
// 1. 检查是否启用清理
enableCleanup, err := l.getConfigValue(ctx, "enable_cleanup")
if err != nil {
return err
}
if enableCleanup != "1" {
logx.Infof("查询数据清理任务已禁用")
return nil
}
// 2. 获取保留天数
retentionDaysStr, err := l.getConfigValue(ctx, "retention_days")
if err != nil {
return err
}
retentionDays, err := strconv.Atoi(retentionDaysStr)
if err != nil {
return err
}
// 3. 获取批次大小
batchSizeStr, err := l.getConfigValue(ctx, "batch_size")
if err != nil {
return err
}
batchSize, err := strconv.Atoi(batchSizeStr)
if err != nil {
return err
}
// 计算清理截止时间
cleanupBefore := now.AddDate(0, 0, -retentionDays)
// 创建清理日志记录
cleanupLog := &model.QueryCleanupLog{
CleanupTime: now,
CleanupBefore: cleanupBefore,
Status: 1,
Remark: sql.NullString{String: "定时清理数据", Valid: true},
}
// 使用事务处理清理操作和日志记录
err = l.svcCtx.QueryModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
// 分批处理
for {
// 1. 查询一批要删除的记录
builder := l.svcCtx.QueryModel.SelectBuilder().
Where("create_time < ?", cleanupBefore).
Where("del_state = ?", globalkey.DelStateNo).
Limit(uint64(batchSize))
queries, err := l.svcCtx.QueryModel.FindAll(ctx, builder, "")
if err != nil {
cleanupLog.Status = 2
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
return err
}
if len(queries) == 0 {
break // 没有更多数据需要清理
}
// 2. 执行清理
for _, query := range queries {
err = l.svcCtx.QueryModel.DeleteSoft(ctx, session, query)
if err != nil {
cleanupLog.Status = 2
cleanupLog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
return err
}
}
// 3. 更新影响行数
cleanupLog.AffectedRows += int64(len(queries))
// 4. 保存清理日志(每批次都记录)
cleanupLogInsertResult, err := l.svcCtx.QueryCleanupLogModel.Insert(ctx, session, cleanupLog)
if err != nil {
return err
}
cleanupLogId, err := cleanupLogInsertResult.LastInsertId()
if err != nil {
return err
}
// 5. 保存清理明细
for _, query := range queries {
detail := &model.QueryCleanupDetail{
CleanupLogId: cleanupLogId,
QueryId: query.Id,
OrderId: query.OrderId,
UserId: query.UserId,
ProductId: query.ProductId,
QueryState: query.QueryState,
CreateTimeOld: query.CreateTime,
}
_, err = l.svcCtx.QueryCleanupDetailModel.Insert(ctx, session, detail)
if err != nil {
return err
}
}
}
return nil
})
if err != nil {
logx.Errorf("%s - 清理查询数据失败: %v", now.Format("2006-01-02 15:04:05"), err)
return err
}
logx.Infof("%s - 查询数据清理完成,共删除 %d 条记录", now.Format("2006-01-02 15:04:05"), cleanupLog.AffectedRows)
return nil
}

View File

@@ -0,0 +1,474 @@
package queue
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/url"
"os"
"path"
"regexp"
"strings"
paylogic "tyass-server/app/main/api/internal/logic/pay"
"tyass-server/app/main/api/internal/service"
"tyass-server/app/main/api/internal/svc"
"tyass-server/app/main/api/internal/types"
"tyass-server/app/main/model"
"tyass-server/pkg/lzkit/crypto"
"tyass-server/pkg/lzkit/lzUtils"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload struct {
OrderID int64 `json:"order_id"`
}
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
}
// 必须已支付才处理:仅支付宝/微信/苹果回调或 pay_method=test 的异步流程会将订单标为 paid此处不再按 ENV 放宽
if order.Status != "paid" {
err = fmt.Errorf("无效的订单状态(非已支付): orderID=%d, status=%s", payload.OrderID, order.Status)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
env := os.Getenv("ENV")
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
}
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
cache, cacheErr := l.svcCtx.Redis.GetCtx(ctx, redisKey)
if cacheErr != nil {
return fmt.Errorf("获取缓存内容失败: %+v", cacheErr)
}
var data types.QueryCacheLoad
err = json.Unmarshal([]byte(cache), &data)
if err != nil {
return fmt.Errorf("解析缓存内容失败: %+v", err)
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
return fmt.Errorf("获取AES密钥失败: %+v", decodeErr)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(data.Params, key)
if aesdecryptErr != nil {
return fmt.Errorf("解密参数失败: %+v", aesdecryptErr)
}
query := &model.Query{
OrderId: order.Id,
UserId: order.UserId,
ProductId: product.Id,
QueryParams: data.Params,
QueryState: "pending",
}
result, insertQueryErr := l.svcCtx.QueryModel.Insert(ctx, nil, query)
if insertQueryErr != nil {
return fmt.Errorf("保存查询失败: %+v", insertQueryErr)
}
// 获取插入后的ID
queryId, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("获取插入的查询ID失败: %+v", err)
}
// 从数据库中查询完整的查询记录
query, err = l.svcCtx.QueryModel.FindOne(ctx, queryId)
if err != nil {
return fmt.Errorf("获取插入后的查询记录失败: %+v", err)
}
// 解析解密后的参数获取用户信息
var userInfo map[string]interface{}
if err := json.Unmarshal(decryptData, &userInfo); err != nil {
return fmt.Errorf("解析用户信息失败: %+v", err)
}
// 生成授权书
authDoc, err := l.svcCtx.AuthorizationService.GenerateAuthorizationDocument(
ctx, order.UserId, order.Id, queryId, userInfo,
)
if err != nil {
logx.Errorf("生成授权书失败: %v", err)
}
// 将授权书URL添加到解密数据中
if authDoc != nil {
// 生成授权书下载接口URL
downloadURL := l.buildAuthorizationDownloadURL(authDoc.FileName)
userInfo["authorization_url"] = downloadURL
// 重新序列化用户信息
updatedDecryptData, marshalErr := json.Marshal(userInfo)
if marshalErr != nil {
logx.Errorf("序列化用户信息失败: %v", marshalErr)
} else {
// 重新加密更新后的数据
encryptedUpdatedData, encryptErr := crypto.AesEncrypt(updatedDecryptData, key)
if encryptErr != nil {
logx.Errorf("重新加密用户信息失败: %v", encryptErr)
} else {
// 更新查询记录中的参数
query.QueryParams = string(encryptedUpdatedData)
updateParamsErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateParamsErr != nil {
logx.Errorf("更新查询参数失败: %v", updateParamsErr)
} else {
logx.Infof("成功更新查询参数包含授权书URL: %s", downloadURL)
}
}
decryptData = updatedDecryptData
}
}
// 调用API请求服务开发环境下不调用其它产品使用默认空报告
var responseData []service.APIResponseData
if env == "development" {
// 开发环境:生成仅包含基本信息的默认空报告,不调用外部 API
// 空报告模式生成空的报告数据跳过API调用
logx.Infof("空报告模式:订单 %s (ID: %s) 跳过API调用生成空报告", order.OrderNo, order.Id)
// 空数组,表示没有数据;与 json.Marshal 配合得到 []
responseData = []service.APIResponseData{}
} else {
var processErr error
responseData, processErr = l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
if processErr != nil {
return l.handleError(ctx, processErr, order, query)
}
}
// 计算成功模块的总成本价
totalCostPrice := 0.0
if responseData != nil {
for _, item := range responseData {
if item.Success {
// 根据API ID查找功能模块
feature, err := l.svcCtx.FeatureModel.FindOneByApiId(ctx, item.ApiID)
if err != nil {
logx.Errorf("查找功能模块失败, API ID: %s, 错误: %v", item.ApiID, err)
continue
}
// 累加成本价
totalCostPrice += feature.CostPrice
}
}
}
// 更新订单的销售成本
order.SalesCost = totalCostPrice
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单销售成本失败, 订单ID: %d, 错误: %v", order.Id, updateOrderErr)
// 注意:这里不返回错误,因为订单销售成本更新失败不应影响整个查询流程
} else {
logx.Infof("成功更新订单销售成本, 订单ID: %d, 总成本价: %f", order.Id, totalCostPrice)
}
// 对返回的类型进行二进制转换
combinedResponse, marshalErr := json.Marshal(responseData)
if marshalErr != nil {
err = fmt.Errorf("响应数据转 JSON 失败: %v", marshalErr)
return l.handleError(ctx, err, order, query)
}
// 加密返回响应
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, err, order, query)
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, err, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
err = l.svcCtx.AgentService.AgentProcess(ctx, order)
if err != nil {
return l.handleError(ctx, err, order, query)
}
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败但任务已成功处理订单ID: %d, 错误: %v", order.Id, delErr)
}
return nil
}
func (l *PaySuccessNotifyUserHandler) buildAuthorizationDownloadURL(fileName string) string {
escapedFileName := url.PathEscape(fileName)
base := l.svcCtx.Config.Authorization.FileBaseURL
if parsed, err := url.Parse(base); err == nil && parsed.Scheme != "" && parsed.Host != "" {
parsed.Path = path.Join("/api/v1/authorization/download/file", escapedFileName)
parsed.RawQuery = ""
parsed.Fragment = ""
return parsed.String()
}
return fmt.Sprintf("/api/v1/authorization/download/file/%s", escapedFileName)
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
redisKey := fmt.Sprintf(types.QueryCacheKey, order.UserId, order.OrderNo)
_, delErr := l.svcCtx.Redis.DelCtx(ctx, redisKey)
if delErr != nil {
logx.Errorf("删除Redis缓存失败订单ID: %d, 错误: %v", order.Id, delErr)
}
if order.Status == "paid" && query.QueryState == "pending" {
// 更新查询状态为失败
query.QueryState = "failed"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 退款
if order.PaymentPlatform == "wechat" {
// 微信退款为异步结果,这里只发起退款申请,订单状态与佣金/钱包扣减交由退款回调统一处理
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
logx.Infof("已发起微信退款申请, orderID: %d, amount: %f", order.Id, order.Amount)
return asynq.SkipRetry
} else {
// 支付宝退款为同步结果,这里直接根据返回结果更新订单和佣金/钱包
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
if refund.IsSuccess() {
logx.Infof("支付宝退款成功, orderID: %d", order.Id)
// 更新订单状态为退款
order.Status = "refunded"
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单状态失败订单ID: %d, 错误: %v", order.Id, updateOrderErr)
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
// 使用公共函数按本次退款金额处理佣金和钱包扣除
_ = paylogic.HandleCommissionAndWalletDeduction(ctx, l.svcCtx, nil, order, order.Amount)
return asynq.SkipRetry
} else {
logx.Errorf("支付宝退款失败:%v", refundErr)
return asynq.SkipRetry
}
// 直接成功
}
}
return asynq.SkipRetry
}
// desensitizeParams 对敏感数据进行脱敏处理
func (l *PaySuccessNotifyUserHandler) desensitizeParams(data []byte) ([]byte, error) {
// 解析JSON数据到map
var paramsMap map[string]interface{}
if err := json.Unmarshal(data, &paramsMap); err != nil {
return nil, fmt.Errorf("解析JSON数据失败: %v", err)
}
// 处理可能包含敏感信息的字段
for key, value := range paramsMap {
if strValue, ok := value.(string); ok {
// 根据字段名和内容判断并脱敏
if isNameField(key) && len(strValue) > 0 {
// 姓名脱敏
paramsMap[key] = maskName(strValue)
} else if isIDCardField(key) && len(strValue) > 10 {
// 身份证号脱敏
paramsMap[key] = maskIDCard(strValue)
} else if isPhoneField(key) && len(strValue) >= 8 {
// 手机号脱敏
paramsMap[key] = maskPhone(strValue)
} else if len(strValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
paramsMap[key] = maskGeneral(strValue)
}
} else if mapValue, ok := value.(map[string]interface{}); ok {
// 递归处理嵌套的map
for subKey, subValue := range mapValue {
if subStrValue, ok := subValue.(string); ok {
if isNameField(subKey) && len(subStrValue) > 0 {
mapValue[subKey] = maskName(subStrValue)
} else if isIDCardField(subKey) && len(subStrValue) > 10 {
mapValue[subKey] = maskIDCard(subStrValue)
} else if isPhoneField(subKey) && len(subStrValue) >= 8 {
mapValue[subKey] = maskPhone(subStrValue)
} else if len(subStrValue) > 3 {
// 其他所有未匹配的字段都进行通用脱敏
mapValue[subKey] = maskGeneral(subStrValue)
}
}
}
}
}
// 将处理后的map重新序列化为JSON
return json.Marshal(paramsMap)
}
// 判断是否为姓名字段
func isNameField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "name") || strings.Contains(key, "姓名") ||
strings.Contains(key, "owner") || strings.Contains(key, "main")
}
// 判断是否为身份证字段
func isIDCardField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "idcard") || strings.Contains(key, "id_card") ||
strings.Contains(key, "身份证") || strings.Contains(key, "证件号")
}
// 判断是否为手机号字段
func isPhoneField(key string) bool {
key = strings.ToLower(key)
return strings.Contains(key, "phone") || strings.Contains(key, "mobile") ||
strings.Contains(key, "手机") || strings.Contains(key, "电话")
}
// 判断是否包含敏感数据模式
func containsSensitivePattern(value string) bool {
// 检查是否包含连续的数字或字母模式
numPattern := regexp.MustCompile(`\d{6,}`)
return numPattern.MatchString(value)
}
// 姓名脱敏
func maskName(name string) string {
// 将字符串转换为rune切片以正确处理中文字符
runes := []rune(name)
length := len(runes)
if length <= 1 {
return name
}
if length == 2 {
// 两个字:保留第一个字,第二个字用*替代
return string(runes[0]) + "*"
}
// 三个字及以上:保留首尾字,中间用*替代
first := string(runes[0])
last := string(runes[length-1])
mask := strings.Repeat("*", length-2)
return first + mask + last
}
// 身份证号脱敏
func maskIDCard(idCard string) string {
length := len(idCard)
if length <= 10 {
return idCard // 如果长度太短,可能不是身份证,不处理
}
// 保留前3位和后4位
return idCard[:3] + strings.Repeat("*", length-7) + idCard[length-4:]
}
// 手机号脱敏
func maskPhone(phone string) string {
length := len(phone)
if length < 8 {
return phone // 如果长度太短,可能不是手机号,不处理
}
// 保留前3位和后4位
return phone[:3] + strings.Repeat("*", length-7) + phone[length-4:]
}
// 通用敏感信息脱敏 - 根据字符串长度比例进行脱敏
func maskGeneral(value string) string {
length := len(value)
// 小于3个字符的不脱敏
if length <= 3 {
return value
}
// 根据字符串长度计算保留字符数
var prefixLen, suffixLen int
switch {
case length <= 6: // 短字符串
// 保留首尾各1个字符
prefixLen, suffixLen = 1, 1
case length <= 10: // 中等长度字符串
// 保留首部30%和尾部20%的字符
prefixLen = int(float64(length) * 0.3)
suffixLen = int(float64(length) * 0.2)
case length <= 20: // 较长字符串
// 保留首部25%和尾部15%的字符
prefixLen = int(float64(length) * 0.25)
suffixLen = int(float64(length) * 0.15)
default: // 非常长的字符串
// 保留首部20%和尾部10%的字符
prefixLen = int(float64(length) * 0.2)
suffixLen = int(float64(length) * 0.1)
}
// 确保至少有一个字符被保留
if prefixLen < 1 {
prefixLen = 1
}
if suffixLen < 1 {
suffixLen = 1
}
// 确保前缀和后缀总长不超过总长度的80%
if prefixLen+suffixLen > int(float64(length)*0.8) {
// 调整为总长度的80%
totalVisible := int(float64(length) * 0.8)
// 前缀占60%后缀占40%
prefixLen = int(float64(totalVisible) * 0.6)
suffixLen = totalVisible - prefixLen
}
// 创建脱敏后的字符串
prefix := value[:prefixLen]
suffix := value[length-suffixLen:]
masked := strings.Repeat("*", length-prefixLen-suffixLen)
return prefix + masked + suffix
}

View File

@@ -0,0 +1,43 @@
package queue
import (
"context"
"fmt"
"tyass-server/app/main/api/internal/svc"
"tyass-server/app/main/api/internal/types"
"github.com/hibiken/asynq"
)
type CronJob struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
return &CronJob{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CronJob) Register() *asynq.ServeMux {
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
scheduler := asynq.NewScheduler(redisClientOpt, nil)
// 注册清理查询数据任务
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
_, err := scheduler.Register(TASKTIME, task)
if err != nil {
panic(fmt.Sprintf("定时任务注册失败:%v", err))
}
scheduler.Start()
fmt.Println("定时任务启动!!!")
mux := asynq.NewServeMux()
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
mux.Handle(types.MsgUnfreezeCommission, NewUnfreezeCommissionHandler(l.svcCtx))
return mux
}

View File

@@ -0,0 +1,159 @@
package queue
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"tyass-server/app/main/api/internal/svc"
"tyass-server/app/main/api/internal/types"
"tyass-server/app/main/model"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// 定义佣金状态常量
const (
CommissionStatusReleased = 0 // 已发放
CommissionStatusFrozen = 1 // 冻结佣金
)
type UnfreezeCommissionHandler struct {
svcCtx *svc.ServiceContext
}
func NewUnfreezeCommissionHandler(svcCtx *svc.ServiceContext) *UnfreezeCommissionHandler {
return &UnfreezeCommissionHandler{
svcCtx: svcCtx,
}
}
func (l *UnfreezeCommissionHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
now := time.Now()
logx.Infof("%s - 开始执行佣金解冻任务", now.Format("2006-01-02 15:04:05"))
// 解析任务payload获取佣金ID
var payload types.MsgUnfreezeCommissionPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
logx.Errorf("解析佣金解冻任务payload失败: %v", err)
return err
}
commissionID := payload.CommissionID
if commissionID <= 0 {
logx.Errorf("无效的佣金ID: %d", commissionID)
return fmt.Errorf("无效的佣金ID: %d", commissionID)
}
// 根据佣金ID查询特定佣金记录
commission, err := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID)
if err != nil {
logx.Errorf("查询佣金记录ID %d 失败: %v", commissionID, err)
return err
}
// 检查佣金状态是否为冻结状态
if commission.Status != CommissionStatusFrozen {
logx.Infof("佣金记录ID %d 状态不是冻结状态,当前状态: %d无需处理", commissionID, commission.Status)
return nil
}
// 使用事务处理解冻操作
err = l.svcCtx.AgentCommissionModel.Trans(ctx, func(ctx context.Context, session sqlx.Session) error {
// 获取代理钱包记录
agentWallet, err := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId)
if err != nil {
logx.Errorf("查询代理ID %d 的钱包记录失败: %v", commission.AgentId, err)
return err
}
// 计算当前佣金在发生退款后的“净佣金金额”
commissionAmount := commission.Amount - commission.RefundedAmount
if commissionAmount <= 0 {
logx.Infof("佣金记录ID %d 已被全部退款或无可解冻金额,跳过解冻", commissionID)
return nil
}
// 更新钱包余额:增加净佣金金额到 balance减少相应的 frozen_balance
agentWallet.Balance += commissionAmount
agentWallet.FrozenBalance -= commissionAmount
agentWallet.UpdateTime = now
// 更新钱包数据库(使用 UpdateWithVersion 保持乐观锁)
updateWalletErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, agentWallet)
if updateWalletErr != nil {
// 如果是版本冲突错误,重新查询最新的数据后重试
if errors.Is(updateWalletErr, model.ErrNoRowsUpdate) {
logx.Infof("代理ID %d 的钱包版本冲突,重新查询最新数据重试", commission.AgentId)
latestWallet, findErr := l.svcCtx.AgentWalletModel.FindOneByAgentId(ctx, commission.AgentId)
if findErr != nil {
logx.Errorf("重新查询代理ID %d 的钱包记录失败: %v", commission.AgentId, findErr)
return findErr
}
// 重新累加金额
latestWallet.Balance += commissionAmount
latestWallet.FrozenBalance -= commissionAmount
latestWallet.UpdateTime = now
retryUpdateErr := l.svcCtx.AgentWalletModel.UpdateWithVersion(ctx, session, latestWallet)
if retryUpdateErr != nil {
logx.Errorf("重试更新代理ID %d 的钱包记录失败: %v", commission.AgentId, retryUpdateErr)
return retryUpdateErr
}
logx.Infof("重试成功已更新代理ID %d 的钱包记录", commission.AgentId)
} else {
logx.Errorf("更新代理ID %d 的钱包记录失败: %v", commission.AgentId, updateWalletErr)
return updateWalletErr
}
}
// 钱包更新成功后,再更新佣金状态为已发放
commission.Status = CommissionStatusReleased
commission.UpdateTime = now
// 更新佣金数据库(使用 UpdateWithVersion 保持乐观锁)
err = l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, commission)
if err != nil {
// 如果是版本冲突错误,重新查询最新的数据后重试
if errors.Is(err, model.ErrNoRowsUpdate) {
logx.Infof("佣金记录ID %d 版本冲突,重新查询最新数据重试", commissionID)
latestCommission, findErr := l.svcCtx.AgentCommissionModel.FindOne(ctx, commissionID)
if findErr != nil {
logx.Errorf("重新查询佣金记录ID %d 失败: %v", commissionID, findErr)
return findErr
}
// 检查状态是否已被其他操作修改
if latestCommission.Status != CommissionStatusFrozen {
logx.Errorf("佣金记录ID %d 的状态已被其他操作修改,当前状态: %d", commissionID, latestCommission.Status)
return fmt.Errorf("佣金记录状态已被修改")
}
// 重新更新状态
latestCommission.Status = CommissionStatusReleased
latestCommission.UpdateTime = now
retryUpdateErr := l.svcCtx.AgentCommissionModel.UpdateWithVersion(ctx, session, latestCommission)
if retryUpdateErr != nil {
logx.Errorf("重试更新佣金记录ID %d 失败: %v", commissionID, retryUpdateErr)
return retryUpdateErr
}
logx.Infof("重试成功已更新佣金记录ID %d", commissionID)
} else {
logx.Errorf("更新佣金记录ID %d 失败: %v", commissionID, err)
return err
}
}
logx.Infof("成功解冻佣金记录ID %d代理ID %d佣金金额 %.2f,已将佣金金额从冻结余额转移到可用余额",
commissionID, commission.AgentId, commissionAmount)
return nil
})
if err != nil {
logx.Errorf("%s - 佣金解冻任务失败: %v", now.Format("2006-01-02 15:04:05"), err)
return err
}
logx.Infof("%s - 佣金解冻任务完成佣金ID: %d", now.Format("2006-01-02 15:04:05"), commissionID)
return nil
}