Files
ycc-proxy-server/app/main/api/internal/queue/syncAlipayComplaint.go
2026-01-12 16:43:08 +08:00

131 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package queue
import (
"context"
"os"
"time"
"ycc-server/app/main/api/internal/svc"
"github.com/hibiken/asynq"
"github.com/pkg/errors"
"github.com/smartwalle/alipay/v3"
"github.com/zeromicro/go-zero/core/logx"
)
// SyncAlipayComplaintHandler 支付宝投诉同步任务处理器
type SyncAlipayComplaintHandler struct {
svcCtx *svc.ServiceContext
}
func NewSyncAlipayComplaintHandler(svcCtx *svc.ServiceContext) *SyncAlipayComplaintHandler {
return &SyncAlipayComplaintHandler{
svcCtx: svcCtx,
}
}
// ProcessTask 处理投诉同步任务
func (l *SyncAlipayComplaintHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
if os.Getenv("ENV") == "development" {
return nil
}
startTime := time.Now()
logx.Infof("开始同步支付宝投诉数据,当前时间: %v", startTime)
// 1. 查询数据库中最新投诉的投诉时间
latestComplainTime, err := l.svcCtx.AlipayComplaintService.GetLatestComplainTime(ctx)
if err != nil {
logx.Errorf("查询最新投诉时间失败: %v", err)
return errors.Wrapf(err, "查询最新投诉时间失败")
}
// 2. 如果数据库为空查询最近7天的投诉否则查询最新投诉时间之后的投诉
var gmtComplainStart string
if latestComplainTime.IsZero() {
// 数据库为空查询最近7天
sevenDaysAgo := time.Now().AddDate(0, 0, -7)
gmtComplainStart = sevenDaysAgo.Format("2006-01-02 15:04:05")
logx.Infof("数据库为空查询最近7天的投诉起始时间: %s", gmtComplainStart)
} else {
// 查询最新投诉时间之后的投诉往前推1分钟避免边界问题
gmtComplainStart = latestComplainTime.Add(-1 * time.Minute).Format("2006-01-02 15:04:05")
logx.Infof("查询最新投诉时间之后的投诉,起始时间: %s (最新投诉时间: %v)", gmtComplainStart, latestComplainTime)
}
// 3. 分页查询投诉列表
pageNum := int64(1)
pageSize := int64(10)
totalSynced := 0
for {
// 调用支付宝接口查询投诉列表
req := alipay.SecurityRiskComplaintInfoBatchQueryReq{
CurrentPageNum: pageNum,
PageSize: pageSize,
GmtComplaintStart: gmtComplainStart,
GmtComplaintEnd: time.Now().Format("2006-01-02 15:04:05"),
}
resp, err := l.svcCtx.AlipayService.AlipayClient.SecurityRiskComplaintInfoBatchQuery(ctx, req)
if err != nil {
logx.Errorf("查询支付宝投诉列表失败: %v", err)
return errors.Wrapf(err, "查询支付宝投诉列表失败")
}
// 检查响应是否成功
if !resp.IsSuccess() {
logx.Errorf("支付宝返回错误: code=%s, msg=%s", resp.Code, resp.Msg)
return errors.Errorf("支付宝返回错误: %s-%s", resp.Code, resp.Msg)
}
// 如果没有数据,退出循环
if len(resp.ComplaintList) == 0 {
logx.Infof("第 %d 页没有数据,同步完成", pageNum)
break
}
// 4. 保存投诉数据到数据库
synced, err := l.saveComplaints(ctx, resp.ComplaintList)
if err != nil {
logx.Errorf("保存投诉数据失败: %v", err)
return errors.Wrapf(err, "保存投诉数据失败")
}
totalSynced += synced
logx.Infof("第 %d 页同步完成,本页同步 %d 条,累计同步 %d 条", pageNum, synced, totalSynced)
// 5. 检查是否还有下一页
if pageNum >= resp.CurrentPage {
// 当前页已经是最后一页
if len(resp.ComplaintList) < int(pageSize) {
// 当前页数据不足一页,说明没有更多数据了
break
}
}
// 继续查询下一页
pageNum++
}
duration := time.Since(startTime)
logx.Infof("支付宝投诉数据同步完成,共同步 %d 条投诉,耗时: %v", totalSynced, duration)
return nil
}
// saveComplaints 保存投诉列表到数据库
func (l *SyncAlipayComplaintHandler) saveComplaints(ctx context.Context, complaintList []alipay.SecurityRiskComplaintInfo) (int, error) {
synced := 0
for _, complaint := range complaintList {
// 使用 service 保存投诉数据
if err := l.svcCtx.AlipayComplaintService.SaveComplaint(ctx, &complaint); err != nil {
logx.Errorf("保存投诉失败, task_id: %s, error: %v", complaint.TaskId, err)
continue // 继续处理下一条
}
synced++
}
return synced, nil
}