first commit

This commit is contained in:
2025-01-10 00:09:25 +08:00
commit f54ead0f90
215 changed files with 16058 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
package queue
import (
"context"
"fmt"
"github.com/hibiken/asynq"
"tydata-server/app/user/cmd/api/internal/svc"
)
const TASKTIME = "32 * * * *"
type CleanQueryDataHandler struct {
svcCtx *svc.ServiceContext
}
func NewCleanQueryDataHandler(svcCtx *svc.ServiceContext) *CleanQueryDataHandler {
return &CleanQueryDataHandler{
svcCtx: svcCtx,
}
}
func (l *CleanQueryDataHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
fmt.Println("企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅企鹅-ProcessTask")
return nil
}

View File

@@ -0,0 +1,147 @@
package queue
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"github.com/zeromicro/go-zero/core/logx"
"tydata-server/app/user/cmd/api/internal/svc"
"tydata-server/app/user/model"
"tydata-server/pkg/lzkit/crypto"
"tydata-server/pkg/lzkit/lzUtils"
)
type PaySuccessNotifyUserHandler struct {
svcCtx *svc.ServiceContext
}
func NewPaySuccessNotifyUserHandler(svcCtx *svc.ServiceContext) *PaySuccessNotifyUserHandler {
return &PaySuccessNotifyUserHandler{
svcCtx: svcCtx,
}
}
var payload struct {
OrderID int64 `json:"order_id"`
}
func (l *PaySuccessNotifyUserHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
// 从任务的负载中解码数据
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务负载失败: %w", err)
}
order, err := l.svcCtx.OrderModel.FindOne(ctx, payload.OrderID)
if err != nil {
return fmt.Errorf("无效的订单ID: %d, %v", payload.OrderID, err)
}
if order.Status != "paid" {
err = fmt.Errorf("无效的订单: %d", payload.OrderID)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
product, err := l.svcCtx.ProductModel.FindOne(ctx, order.ProductId)
if err != nil {
return fmt.Errorf("找不到相关产品: orderID: %d, productID: %d", payload.OrderID, order.ProductId)
}
query, findQueryErr := l.svcCtx.QueryModel.FindOneByOrderId(ctx, order.Id)
if findQueryErr != nil {
findQueryErr = fmt.Errorf("获取任务请求参数失败: %v", findQueryErr)
logx.Errorf("处理任务失败,原因: %v", findQueryErr)
return asynq.SkipRetry
}
if query.QueryState != "pending" {
err = fmt.Errorf("查询已处理: %d", query.Id)
logx.Errorf("处理任务失败,原因: %v", err)
return asynq.SkipRetry
}
secretKey := l.svcCtx.Config.Encrypt.SecretKey
key, decodeErr := hex.DecodeString(secretKey)
if decodeErr != nil {
err = fmt.Errorf("获取AES密钥失败: %v", decodeErr)
return l.handleError(ctx, err, order, query)
}
decryptData, aesdecryptErr := crypto.AesDecrypt(query.QueryParams, key)
if aesdecryptErr != nil {
aesdecryptErr = fmt.Errorf("解密响应信息失败: %v", aesdecryptErr)
return l.handleError(ctx, aesdecryptErr, order, query)
}
combinedResponse, err := l.svcCtx.ApiRequestService.ProcessRequests(decryptData, product.Id)
if err != nil {
return l.handleError(ctx, err, order, query)
}
// 加密返回响应
encryptData, aesEncryptErr := crypto.AesEncrypt(combinedResponse, key)
if aesEncryptErr != nil {
err = fmt.Errorf("加密响应信息失败: %v", aesEncryptErr)
return l.handleError(ctx, aesEncryptErr, order, query)
}
query.QueryData = lzUtils.StringToNullString(encryptData)
updateErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateErr != nil {
err = fmt.Errorf("保存响应数据失败: %v", updateErr)
return l.handleError(ctx, updateErr, order, query)
}
query.QueryState = "success"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
updateQueryErr = fmt.Errorf("修改查询状态失败: %v", updateQueryErr)
return l.handleError(ctx, updateQueryErr, order, query)
}
return nil
}
// 定义一个中间件函数
func (l *PaySuccessNotifyUserHandler) handleError(ctx context.Context, err error, order *model.Order, query *model.Query) error {
logx.Errorf("处理任务失败,原因: %v", err)
if order.Status == "paid" && query.QueryState == "pending" {
// 更新查询状态为失败
query.QueryState = "failed"
updateQueryErr := l.svcCtx.QueryModel.UpdateWithVersion(ctx, nil, query)
if updateQueryErr != nil {
logx.Errorf("更新查询状态失败订单ID: %d, 错误: %v", order.Id, updateQueryErr)
return asynq.SkipRetry
}
// 退款
if order.PaymentPlatform == "wechat" {
refundErr := l.svcCtx.WechatPayService.WeChatRefund(ctx, order.OrderNo, order.Amount, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
} else {
refund, refundErr := l.svcCtx.AlipayService.AliRefund(ctx, order.OrderNo, order.Amount)
if refundErr != nil {
logx.Error(refundErr)
return asynq.SkipRetry
}
if refund.IsSuccess() {
logx.Errorf("支付宝退款成功, orderID: %d", order.Id)
// 更新订单状态为退款
order.Status = "refunded"
updateOrderErr := l.svcCtx.OrderModel.UpdateWithVersion(ctx, nil, order)
if updateOrderErr != nil {
logx.Errorf("更新订单状态失败订单ID: %d, 错误: %v", order.Id, updateOrderErr)
return fmt.Errorf("更新订单状态失败: %v", updateOrderErr)
}
return asynq.SkipRetry
} else {
logx.Errorf("支付宝退款失败:%v", refundErr)
return asynq.SkipRetry
}
// 直接成功
}
}
return asynq.SkipRetry
}

View File

@@ -0,0 +1,39 @@
package queue
import (
"context"
"fmt"
"github.com/hibiken/asynq"
"tydata-server/app/user/cmd/api/internal/svc"
"tydata-server/app/user/cmd/api/internal/types"
)
type CronJob struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCronJob(ctx context.Context, svcCtx *svc.ServiceContext) *CronJob {
return &CronJob{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CronJob) Register() *asynq.ServeMux {
redisClientOpt := asynq.RedisClientOpt{Addr: l.svcCtx.Config.CacheRedis[0].Host, Password: l.svcCtx.Config.CacheRedis[0].Pass}
scheduler := asynq.NewScheduler(redisClientOpt, nil)
task := asynq.NewTask(types.MsgCleanQueryData, nil, nil)
_, err := scheduler.Register(TASKTIME, task)
if err != nil {
panic(fmt.Sprintf("定时任务注册失败:%v", err))
}
scheduler.Start()
fmt.Println("定时任务启动!!!")
mux := asynq.NewServeMux()
mux.Handle(types.MsgPaySuccessQuery, NewPaySuccessNotifyUserHandler(l.svcCtx))
mux.Handle(types.MsgCleanQueryData, NewCleanQueryDataHandler(l.svcCtx))
return mux
}