// Package entities holds the asynchronous task entities and the TaskFactory
// that builds them.
package entities
|
||
|
||
import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"time"

	"tyapi-server/internal/infrastructure/task/types"
)
|
||
|
||
// TaskFactory builds AsyncTask entities and, when a task manager has been
// supplied, can also hand the built task to that manager for enqueueing.
type TaskFactory struct {
	// taskManager is held as interface{} rather than a concrete type to avoid
	// an import cycle. Methods that need it type-assert for the one method
	// they call. It is nil when the factory was built with NewTaskFactory.
	taskManager interface{}
}
|
||
|
||
// NewTaskFactory 创建任务工厂
|
||
func NewTaskFactory() *TaskFactory {
|
||
return &TaskFactory{}
|
||
}
|
||
|
||
// NewTaskFactoryWithManager 创建带管理器的任务工厂
|
||
func NewTaskFactoryWithManager(taskManager interface{}) *TaskFactory {
|
||
return &TaskFactory{
|
||
taskManager: taskManager,
|
||
}
|
||
}
|
||
|
||
// CreateArticlePublishTask 创建文章发布任务
|
||
func (f *TaskFactory) CreateArticlePublishTask(articleID string, publishAt time.Time, userID string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeArticlePublish),
|
||
Status: TaskStatusPending,
|
||
ScheduledAt: &publishAt,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务ID(将在保存后更新)
|
||
payloadWithID := map[string]interface{}{
|
||
"article_id": articleID,
|
||
"publish_at": publishAt,
|
||
"user_id": userID,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateArticleCancelTask 创建文章取消任务
|
||
func (f *TaskFactory) CreateArticleCancelTask(articleID string, userID string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeArticleCancel),
|
||
Status: TaskStatusPending,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"article_id": articleID,
|
||
"user_id": userID,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateArticleModifyTask 创建文章修改任务
|
||
func (f *TaskFactory) CreateArticleModifyTask(articleID string, newPublishAt time.Time, userID string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeArticleModify),
|
||
Status: TaskStatusPending,
|
||
ScheduledAt: &newPublishAt,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"article_id": articleID,
|
||
"new_publish_at": newPublishAt,
|
||
"user_id": userID,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateApiCallTask 创建API调用任务
|
||
func (f *TaskFactory) CreateApiCallTask(apiCallID string, userID string, productID string, amount string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeApiCall),
|
||
Status: TaskStatusPending,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"api_call_id": apiCallID,
|
||
"user_id": userID,
|
||
"product_id": productID,
|
||
"amount": amount,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateDeductionTask 创建扣款任务
|
||
func (f *TaskFactory) CreateDeductionTask(apiCallID string, userID string, productID string, amount string, transactionID string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeDeduction),
|
||
Status: TaskStatusPending,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"api_call_id": apiCallID,
|
||
"user_id": userID,
|
||
"product_id": productID,
|
||
"amount": amount,
|
||
"transaction_id": transactionID,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateApiCallLogTask 创建API调用日志任务
|
||
func (f *TaskFactory) CreateApiCallLogTask(transactionID string, userID string, apiName string, productID string) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeApiLog),
|
||
Status: TaskStatusPending,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"transaction_id": transactionID,
|
||
"user_id": userID,
|
||
"api_name": apiName,
|
||
"product_id": productID,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateUsageStatsTask 创建使用统计任务
|
||
func (f *TaskFactory) CreateUsageStatsTask(subscriptionID string, userID string, productID string, increment int) (*AsyncTask, error) {
|
||
// 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID
|
||
task := &AsyncTask{
|
||
Type: string(types.TaskTypeUsageStats),
|
||
Status: TaskStatusPending,
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
}
|
||
|
||
// 在payload中添加任务数据
|
||
payloadWithID := map[string]interface{}{
|
||
"subscription_id": subscriptionID,
|
||
"user_id": userID,
|
||
"product_id": productID,
|
||
"increment": increment,
|
||
}
|
||
|
||
payloadDataWithID, err := json.Marshal(payloadWithID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
task.Payload = string(payloadDataWithID)
|
||
return task, nil
|
||
}
|
||
|
||
// CreateAndEnqueueArticlePublishTask 创建并入队文章发布任务
|
||
func (f *TaskFactory) CreateAndEnqueueArticlePublishTask(ctx context.Context, articleID string, publishAt time.Time, userID string) error {
|
||
if f.taskManager == nil {
|
||
return fmt.Errorf("TaskManager未初始化")
|
||
}
|
||
|
||
task, err := f.CreateArticlePublishTask(articleID, publishAt, userID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
delay := publishAt.Sub(time.Now())
|
||
if delay < 0 {
|
||
delay = 0
|
||
}
|
||
|
||
// 使用类型断言调用TaskManager方法
|
||
if tm, ok := f.taskManager.(interface {
|
||
CreateAndEnqueueDelayedTask(ctx context.Context, task *AsyncTask, delay time.Duration) error
|
||
}); ok {
|
||
return tm.CreateAndEnqueueDelayedTask(ctx, task, delay)
|
||
}
|
||
|
||
return fmt.Errorf("TaskManager类型不匹配")
|
||
}
|
||
|
||
// CreateAndEnqueueApiLogTask 创建并入队API日志任务
|
||
func (f *TaskFactory) CreateAndEnqueueApiLogTask(ctx context.Context, transactionID string, userID string, apiName string, productID string) error {
|
||
if f.taskManager == nil {
|
||
return fmt.Errorf("TaskManager未初始化")
|
||
}
|
||
|
||
task, err := f.CreateApiCallLogTask(transactionID, userID, apiName, productID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 使用类型断言调用TaskManager方法
|
||
if tm, ok := f.taskManager.(interface {
|
||
CreateAndEnqueueTask(ctx context.Context, task *AsyncTask) error
|
||
}); ok {
|
||
return tm.CreateAndEnqueueTask(ctx, task)
|
||
}
|
||
|
||
return fmt.Errorf("TaskManager类型不匹配")
|
||
}
|
||
|
||
// CreateAndEnqueueApiCallTask 创建并入队API调用任务
|
||
func (f *TaskFactory) CreateAndEnqueueApiCallTask(ctx context.Context, apiCallID string, userID string, productID string, amount string) error {
|
||
if f.taskManager == nil {
|
||
return fmt.Errorf("TaskManager未初始化")
|
||
}
|
||
|
||
task, err := f.CreateApiCallTask(apiCallID, userID, productID, amount)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 使用类型断言调用TaskManager方法
|
||
if tm, ok := f.taskManager.(interface {
|
||
CreateAndEnqueueTask(ctx context.Context, task *AsyncTask) error
|
||
}); ok {
|
||
return tm.CreateAndEnqueueTask(ctx, task)
|
||
}
|
||
|
||
return fmt.Errorf("TaskManager类型不匹配")
|
||
}
|
||
|
||
// CreateAndEnqueueDeductionTask 创建并入队扣款任务
|
||
func (f *TaskFactory) CreateAndEnqueueDeductionTask(ctx context.Context, apiCallID string, userID string, productID string, amount string, transactionID string) error {
|
||
if f.taskManager == nil {
|
||
return fmt.Errorf("TaskManager未初始化")
|
||
}
|
||
|
||
task, err := f.CreateDeductionTask(apiCallID, userID, productID, amount, transactionID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 使用类型断言调用TaskManager方法
|
||
if tm, ok := f.taskManager.(interface {
|
||
CreateAndEnqueueTask(ctx context.Context, task *AsyncTask) error
|
||
}); ok {
|
||
return tm.CreateAndEnqueueTask(ctx, task)
|
||
}
|
||
|
||
return fmt.Errorf("TaskManager类型不匹配")
|
||
}
|
||
|
||
// CreateAndEnqueueUsageStatsTask 创建并入队使用统计任务
|
||
func (f *TaskFactory) CreateAndEnqueueUsageStatsTask(ctx context.Context, subscriptionID string, userID string, productID string, increment int) error {
|
||
if f.taskManager == nil {
|
||
return fmt.Errorf("TaskManager未初始化")
|
||
}
|
||
|
||
task, err := f.CreateUsageStatsTask(subscriptionID, userID, productID, increment)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 使用类型断言调用TaskManager方法
|
||
if tm, ok := f.taskManager.(interface {
|
||
CreateAndEnqueueTask(ctx context.Context, task *AsyncTask) error
|
||
}); ok {
|
||
return tm.CreateAndEnqueueTask(ctx, task)
|
||
}
|
||
|
||
return fmt.Errorf("TaskManager类型不匹配")
|
||
}
|
||
|
||
// generateRandomString returns a string of exactly length characters, each
// picked independently from the 62 ASCII letters and digits.
//
// A length of zero or less yields the empty string.
//
// Characters come from math/rand rather than from the wall clock: reading
// time.Now() once per character in a tight loop gives strongly correlated
// (and, on coarse clocks, identical) characters. math/rand is not
// cryptographically secure, so the result must not be used as a secret or
// token.
func generateRandomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

	// make panics on a negative length; treat it like an empty request.
	if length <= 0 {
		return ""
	}

	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}