diff --git a/internal/app/app.go b/internal/app/app.go index 3785b57..6363550 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -240,6 +240,7 @@ func (a *Application) autoMigrate(db *gorm.DB) error { &articleEntities.Article{}, &articleEntities.Category{}, &articleEntities.Tag{}, + &articleEntities.ScheduledTask{}, // api &apiEntities.ApiUser{}, diff --git a/internal/application/article/article_application_service.go b/internal/application/article/article_application_service.go index 571e2fb..d0d4837 100644 --- a/internal/application/article/article_application_service.go +++ b/internal/application/article/article_application_service.go @@ -21,6 +21,7 @@ type ArticleApplicationService interface { PublishArticle(ctx context.Context, cmd *commands.PublishArticleCommand) error PublishArticleByID(ctx context.Context, articleID string) error SchedulePublishArticle(ctx context.Context, cmd *commands.SchedulePublishCommand) error + UpdateSchedulePublishArticle(ctx context.Context, cmd *commands.SchedulePublishCommand) error CancelSchedulePublishArticle(ctx context.Context, cmd *commands.CancelScheduleCommand) error ArchiveArticle(ctx context.Context, cmd *commands.ArchiveArticleCommand) error SetFeatured(ctx context.Context, cmd *commands.SetFeaturedCommand) error diff --git a/internal/application/article/article_application_service_impl.go b/internal/application/article/article_application_service_impl.go index f443218..ec3685c 100644 --- a/internal/application/article/article_application_service_impl.go +++ b/internal/application/article/article_application_service_impl.go @@ -291,12 +291,27 @@ func (s *ArticleApplicationServiceImpl) PublishArticleByID(ctx context.Context, return fmt.Errorf("文章不存在: %w", err) } - // 2. 发布文章 + // 2. 检查是否已取消定时发布 + if !article.IsScheduled() { + s.logger.Info("文章定时发布已取消,跳过执行", + zap.String("id", articleID), + zap.String("status", string(article.Status))) + return nil // 静默返回,不报错 + } + + // 3. 检查定时发布时间是否匹配 + if article.ScheduledAt == nil { + s.logger.Info("文章没有定时发布时间,跳过执行", + zap.String("id", articleID)) + return nil + } + + // 4. 发布文章 if err := article.Publish(); err != nil { return fmt.Errorf("发布文章失败: %w", err) } - // 3. 保存更新 + // 5. 保存更新 if err := s.articleRepo.Update(ctx, article); err != nil { s.logger.Error("更新文章失败", zap.String("id", article.ID), zap.Error(err)) return fmt.Errorf("发布文章失败: %w", err) @@ -740,6 +755,52 @@ func (s *ArticleApplicationServiceImpl) ListTags(ctx context.Context) (*response return response, nil } +// UpdateSchedulePublishArticle 修改定时发布时间 +func (s *ArticleApplicationServiceImpl) UpdateSchedulePublishArticle(ctx context.Context, cmd *commands.SchedulePublishCommand) error { + // 1. 解析定时发布时间 + scheduledTime, err := cmd.GetScheduledTime() + if err != nil { + s.logger.Error("解析定时发布时间失败", zap.String("scheduled_time", cmd.ScheduledTime), zap.Error(err)) + return fmt.Errorf("定时发布时间格式错误: %w", err) + } + + // 2. 获取文章 + article, err := s.articleRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取文章失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("文章不存在: %w", err) + } + + // 3. 检查是否已设置定时发布 + if !article.IsScheduled() { + return fmt.Errorf("文章未设置定时发布,无法修改时间") + } + + // 4. 重新调度定时发布任务 + newTaskID, err := s.asynqClient.RescheduleArticlePublish(ctx, cmd.ID, article.TaskID, scheduledTime) + if err != nil { + s.logger.Error("重新调度定时发布任务失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("修改定时发布时间失败: %w", err) + } + + // 5. 更新定时发布 + if err := article.UpdateSchedulePublish(scheduledTime, newTaskID); err != nil { + return fmt.Errorf("更新定时发布失败: %w", err) + } + + // 6. 保存更新 + if err := s.articleRepo.Update(ctx, article); err != nil { + s.logger.Error("更新文章失败", zap.String("id", article.ID), zap.Error(err)) + return fmt.Errorf("修改定时发布时间失败: %w", err) + } + + s.logger.Info("修改定时发布时间成功", + zap.String("id", article.ID), + zap.Time("new_scheduled_time", scheduledTime), + zap.String("new_task_id", newTaskID)) + return nil +} + // ==================== 验证方法 ==================== // validateCreateCategory 验证创建分类参数 diff --git a/internal/application/article/task_management_service.go b/internal/application/article/task_management_service.go new file mode 100644 index 0000000..89d978e --- /dev/null +++ b/internal/application/article/task_management_service.go @@ -0,0 +1,126 @@ +package article + +import ( + "context" + "fmt" + "time" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories" + + "go.uber.org/zap" +) + +// TaskManagementService 任务管理服务 +type TaskManagementService struct { + scheduledTaskRepo repositories.ScheduledTaskRepository + logger *zap.Logger +} + +// NewTaskManagementService 创建任务管理服务 +func NewTaskManagementService( + scheduledTaskRepo repositories.ScheduledTaskRepository, + logger *zap.Logger, +) *TaskManagementService { + return &TaskManagementService{ + scheduledTaskRepo: scheduledTaskRepo, + logger: logger, + } +} + +// GetTaskStatus 获取任务状态 +func (s *TaskManagementService) GetTaskStatus(ctx context.Context, taskID string) (*entities.ScheduledTask, error) { + task, err := s.scheduledTaskRepo.GetByTaskID(ctx, taskID) + if err != nil { + return nil, fmt.Errorf("获取任务状态失败: %w", err) + } + return &task, nil +} + +// GetArticleTaskStatus 获取文章的定时任务状态 +func (s *TaskManagementService) GetArticleTaskStatus(ctx context.Context, articleID string) (*entities.ScheduledTask, error) { + task, err := s.scheduledTaskRepo.GetByArticleID(ctx, articleID) + if err != nil { + return nil, fmt.Errorf("获取文章定时任务状态失败: %w", err) + } + return &task, nil +} + +// CancelTask 取消任务 +func (s *TaskManagementService) CancelTask(ctx context.Context, taskID string) error { + if err := s.scheduledTaskRepo.MarkAsCancelled(ctx, taskID); err != nil { + return fmt.Errorf("取消任务失败: %w", err) + } + + s.logger.Info("任务已取消", zap.String("task_id", taskID)) + return nil +} + +// GetActiveTasks 获取活动任务列表 +func (s *TaskManagementService) GetActiveTasks(ctx context.Context) ([]entities.ScheduledTask, error) { + tasks, err := s.scheduledTaskRepo.GetActiveTasks(ctx) + if err != nil { + return nil, fmt.Errorf("获取活动任务列表失败: %w", err) + } + return tasks, nil +} + +// GetExpiredTasks 获取过期任务列表 +func (s *TaskManagementService) GetExpiredTasks(ctx context.Context) ([]entities.ScheduledTask, error) { + tasks, err := s.scheduledTaskRepo.GetExpiredTasks(ctx) + if err != nil { + return nil, fmt.Errorf("获取过期任务列表失败: %w", err) + } + return tasks, nil +} + +// CleanupExpiredTasks 清理过期任务 +func (s *TaskManagementService) CleanupExpiredTasks(ctx context.Context) error { + expiredTasks, err := s.GetExpiredTasks(ctx) + if err != nil { + return err + } + + for _, task := range expiredTasks { + if err := s.scheduledTaskRepo.MarkAsCancelled(ctx, task.TaskID); err != nil { + s.logger.Warn("清理过期任务失败", zap.String("task_id", task.TaskID), zap.Error(err)) + continue + } + s.logger.Info("已清理过期任务", zap.String("task_id", task.TaskID)) + } + + return nil +} + +// GetTaskStats 获取任务统计信息 +func (s *TaskManagementService) GetTaskStats(ctx context.Context) (map[string]interface{}, error) { + activeTasks, err := s.GetActiveTasks(ctx) + if err != nil { + return nil, err + } + + expiredTasks, err := s.GetExpiredTasks(ctx) + if err != nil { + return nil, err + } + + stats := map[string]interface{}{ + "active_tasks_count": len(activeTasks), + "expired_tasks_count": len(expiredTasks), + "total_tasks_count": len(activeTasks) + len(expiredTasks), + "next_task_time": nil, + "last_cleanup_time": time.Now(), + } + + // 计算下一个任务时间 + if len(activeTasks) > 0 { + nextTask := activeTasks[0] + for _, task := range activeTasks { + if task.ScheduledAt.Before(nextTask.ScheduledAt) { + nextTask = task + } + } + stats["next_task_time"] = nextTask.ScheduledAt + } + + return stats, nil +} diff --git a/internal/container/container.go b/internal/container/container.go index bafcd73..a456602 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -534,6 +534,11 @@ func NewContainer() *Container { article_repo.NewGormTagRepository, fx.As(new(domain_article_repo.TagRepository)), ), + // 定时任务仓储 - 同时注册具体类型和接口类型 + fx.Annotate( + article_repo.NewGormScheduledTaskRepository, + fx.As(new(domain_article_repo.ScheduledTaskRepository)), + ), ), // API域仓储层 @@ -621,9 +626,9 @@ func NewContainer() *Container { // 任务系统 fx.Provide( // Asynq 客户端 - func(cfg *config.Config, logger *zap.Logger) *task.AsynqClient { + func(cfg *config.Config, scheduledTaskRepo domain_article_repo.ScheduledTaskRepository, logger *zap.Logger) *task.AsynqClient { redisAddr := fmt.Sprintf("%s:%s", cfg.Redis.Host, cfg.Redis.Port) - return task.NewAsynqClient(redisAddr, logger) + return task.NewAsynqClient(redisAddr, scheduledTaskRepo, logger) }, ), @@ -678,9 +683,27 @@ func NewContainer() *Container { product.NewSubscriptionApplicationService, fx.As(new(product.SubscriptionApplicationService)), ), + // 任务管理服务 + article.NewTaskManagementService, // 文章应用服务 - 绑定到接口 fx.Annotate( - article.NewArticleApplicationService, + func( + articleRepo domain_article_repo.ArticleRepository, + categoryRepo domain_article_repo.CategoryRepository, + tagRepo domain_article_repo.TagRepository, + articleService *article_service.ArticleService, + asynqClient *task.AsynqClient, + logger *zap.Logger, + ) article.ArticleApplicationService { + return article.NewArticleApplicationService( + articleRepo, + categoryRepo, + tagRepo, + articleService, + asynqClient, + logger, + ) + }, fx.As(new(article.ArticleApplicationService)), ), ), diff --git a/internal/domains/article/entities/scheduled_task.go b/internal/domains/article/entities/scheduled_task.go new file mode 100644 index 0000000..1c620f8 --- /dev/null +++ b/internal/domains/article/entities/scheduled_task.go @@ -0,0 +1,113 @@ +package entities + +import ( + "time" + + "github.com/google/uuid" + "gorm.io/gorm" +) + +// TaskStatus 任务状态枚举 +type TaskStatus string + +const ( + TaskStatusPending TaskStatus = "pending" // 等待执行 + TaskStatusRunning TaskStatus = "running" // 正在执行 + TaskStatusCompleted TaskStatus = "completed" // 已完成 + TaskStatusFailed TaskStatus = "failed" // 执行失败 + TaskStatusCancelled TaskStatus = "cancelled" // 已取消 +) + +// ScheduledTask 定时任务状态管理实体 +type ScheduledTask struct { + // 基础标识 + ID string `gorm:"primaryKey;type:varchar(36)" json:"id" comment:"任务唯一标识"` + TaskID string `gorm:"type:varchar(100);not null;uniqueIndex" json:"task_id" comment:"Asynq任务ID"` + TaskType string `gorm:"type:varchar(50);not null" json:"task_type" comment:"任务类型"` + + // 关联信息 + ArticleID string `gorm:"type:varchar(36);not null;index" json:"article_id" comment:"关联的文章ID"` + + // 任务状态 + Status TaskStatus `gorm:"type:varchar(20);not null;default:'pending'" json:"status" comment:"任务状态"` + + // 时间信息 + ScheduledAt time.Time `gorm:"not null" json:"scheduled_at" comment:"计划执行时间"` + StartedAt *time.Time `json:"started_at" comment:"开始执行时间"` + CompletedAt *time.Time `json:"completed_at" comment:"完成时间"` + + // 执行结果 + Error string `gorm:"type:text" json:"error" comment:"错误信息"` + RetryCount int `gorm:"default:0" json:"retry_count" comment:"重试次数"` + + // 时间戳字段 + CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at" comment:"创建时间"` + UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at" comment:"更新时间"` + DeletedAt gorm.DeletedAt `gorm:"index" json:"-" comment:"软删除时间"` + + // 关联关系 + Article *Article `gorm:"foreignKey:ArticleID" json:"article,omitempty" comment:"关联的文章"` +} + +// TableName 指定表名 +func (ScheduledTask) TableName() string { + return "scheduled_tasks" +} + +// BeforeCreate GORM钩子:创建前自动生成UUID +func (st *ScheduledTask) BeforeCreate(tx *gorm.DB) error { + if st.ID == "" { + st.ID = uuid.New().String() + } + return nil +} + +// MarkAsRunning 标记任务为正在执行 +func (st *ScheduledTask) MarkAsRunning() { + st.Status = TaskStatusRunning + now := time.Now() + st.StartedAt = &now +} + +// MarkAsCompleted 标记任务为已完成 +func (st *ScheduledTask) MarkAsCompleted() { + st.Status = TaskStatusCompleted + now := time.Now() + st.CompletedAt = &now +} + +// MarkAsFailed 标记任务为执行失败 +func (st *ScheduledTask) MarkAsFailed(errorMsg string) { + st.Status = TaskStatusFailed + now := time.Now() + st.CompletedAt = &now + st.Error = errorMsg + st.RetryCount++ +} + +// MarkAsCancelled 标记任务为已取消 +func (st *ScheduledTask) MarkAsCancelled() { + st.Status = TaskStatusCancelled + now := time.Now() + st.CompletedAt = &now +} + +// IsActive 判断任务是否处于活动状态 +func (st *ScheduledTask) IsActive() bool { + return st.Status == TaskStatusPending || st.Status == TaskStatusRunning +} + +// IsCancelled 判断任务是否已取消 +func (st *ScheduledTask) IsCancelled() bool { + return st.Status == TaskStatusCancelled +} + +// IsCompleted 判断任务是否已完成 +func (st *ScheduledTask) IsCompleted() bool { + return st.Status == TaskStatusCompleted +} + +// IsFailed 判断任务是否执行失败 +func (st *ScheduledTask) IsFailed() bool { + return st.Status == TaskStatusFailed +} diff --git a/internal/domains/article/repositories/scheduled_task_repository.go b/internal/domains/article/repositories/scheduled_task_repository.go new file mode 100644 index 0000000..395850a --- /dev/null +++ b/internal/domains/article/repositories/scheduled_task_repository.go @@ -0,0 +1,33 @@ +package repositories + +import ( + "context" + "tyapi-server/internal/domains/article/entities" +) + +// ScheduledTaskRepository 定时任务仓储接口 +type ScheduledTaskRepository interface { + // Create 创建定时任务记录 + Create(ctx context.Context, task entities.ScheduledTask) (entities.ScheduledTask, error) + + // GetByTaskID 根据Asynq任务ID获取任务记录 + GetByTaskID(ctx context.Context, taskID string) (entities.ScheduledTask, error) + + // GetByArticleID 根据文章ID获取任务记录 + GetByArticleID(ctx context.Context, articleID string) (entities.ScheduledTask, error) + + // Update 更新任务记录 + Update(ctx context.Context, task entities.ScheduledTask) error + + // Delete 删除任务记录 + Delete(ctx context.Context, taskID string) error + + // MarkAsCancelled 标记任务为已取消 + MarkAsCancelled(ctx context.Context, taskID string) error + + // GetActiveTasks 获取活动状态的任务列表 + GetActiveTasks(ctx context.Context) ([]entities.ScheduledTask, error) + + // GetExpiredTasks 获取过期的任务列表 + GetExpiredTasks(ctx context.Context) ([]entities.ScheduledTask, error) +} diff --git a/internal/infrastructure/database/repositories/article/gorm_scheduled_task_repository.go b/internal/infrastructure/database/repositories/article/gorm_scheduled_task_repository.go new file mode 100644 index 0000000..c58aadf --- /dev/null +++ b/internal/infrastructure/database/repositories/article/gorm_scheduled_task_repository.go @@ -0,0 +1,168 @@ +package repositories + +import ( + "context" + "fmt" + "time" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories" + + "go.uber.org/zap" + "gorm.io/gorm" +) + +// GormScheduledTaskRepository GORM定时任务仓储实现 +type GormScheduledTaskRepository struct { + db *gorm.DB + logger *zap.Logger +} + +// 编译时检查接口实现 +var _ repositories.ScheduledTaskRepository = (*GormScheduledTaskRepository)(nil) + +// NewGormScheduledTaskRepository 创建GORM定时任务仓储 +func NewGormScheduledTaskRepository(db *gorm.DB, logger *zap.Logger) *GormScheduledTaskRepository { + return &GormScheduledTaskRepository{ + db: db, + logger: logger, + } +} + +// Create 创建定时任务记录 +func (r *GormScheduledTaskRepository) Create(ctx context.Context, task entities.ScheduledTask) (entities.ScheduledTask, error) { + r.logger.Info("创建定时任务记录", zap.String("task_id", task.TaskID), zap.String("article_id", task.ArticleID)) + + err := r.db.WithContext(ctx).Create(&task).Error + if err != nil { + r.logger.Error("创建定时任务记录失败", zap.Error(err)) + return task, err + } + + return task, nil +} + +// GetByTaskID 根据Asynq任务ID获取任务记录 +func (r *GormScheduledTaskRepository) GetByTaskID(ctx context.Context, taskID string) (entities.ScheduledTask, error) { + var task entities.ScheduledTask + + err := r.db.WithContext(ctx). + Preload("Article"). + Where("task_id = ?", taskID). + First(&task).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + return task, fmt.Errorf("定时任务不存在") + } + r.logger.Error("获取定时任务失败", zap.String("task_id", taskID), zap.Error(err)) + return task, err + } + + return task, nil +} + +// GetByArticleID 根据文章ID获取任务记录 +func (r *GormScheduledTaskRepository) GetByArticleID(ctx context.Context, articleID string) (entities.ScheduledTask, error) { + var task entities.ScheduledTask + + err := r.db.WithContext(ctx). + Preload("Article"). + Where("article_id = ? AND status IN (?)", articleID, []string{"pending", "running"}). + First(&task).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + return task, fmt.Errorf("文章没有活动的定时任务") + } + r.logger.Error("获取文章定时任务失败", zap.String("article_id", articleID), zap.Error(err)) + return task, err + } + + return task, nil +} + +// Update 更新任务记录 +func (r *GormScheduledTaskRepository) Update(ctx context.Context, task entities.ScheduledTask) error { + r.logger.Info("更新定时任务记录", zap.String("task_id", task.TaskID), zap.String("status", string(task.Status))) + + err := r.db.WithContext(ctx).Save(&task).Error + if err != nil { + r.logger.Error("更新定时任务记录失败", zap.String("task_id", task.TaskID), zap.Error(err)) + return err + } + + return nil +} + +// Delete 删除任务记录 +func (r *GormScheduledTaskRepository) Delete(ctx context.Context, taskID string) error { + r.logger.Info("删除定时任务记录", zap.String("task_id", taskID)) + + err := r.db.WithContext(ctx).Where("task_id = ?", taskID).Delete(&entities.ScheduledTask{}).Error + if err != nil { + r.logger.Error("删除定时任务记录失败", zap.String("task_id", taskID), zap.Error(err)) + return err + } + + return nil +} + +// MarkAsCancelled 标记任务为已取消 +func (r *GormScheduledTaskRepository) MarkAsCancelled(ctx context.Context, taskID string) error { + r.logger.Info("标记定时任务为已取消", zap.String("task_id", taskID)) + + result := r.db.WithContext(ctx). + Model(&entities.ScheduledTask{}). + Where("task_id = ? AND status IN (?)", taskID, []string{"pending", "running"}). + Updates(map[string]interface{}{ + "status": entities.TaskStatusCancelled, + "completed_at": time.Now(), + }) + + if result.Error != nil { + r.logger.Error("标记定时任务为已取消失败", zap.String("task_id", taskID), zap.Error(result.Error)) + return result.Error + } + + if result.RowsAffected == 0 { + r.logger.Warn("没有找到需要取消的定时任务", zap.String("task_id", taskID)) + } + + return nil +} + +// GetActiveTasks 获取活动状态的任务列表 +func (r *GormScheduledTaskRepository) GetActiveTasks(ctx context.Context) ([]entities.ScheduledTask, error) { + var tasks []entities.ScheduledTask + + err := r.db.WithContext(ctx). + Preload("Article"). + Where("status IN (?)", []string{"pending", "running"}). + Order("scheduled_at ASC"). + Find(&tasks).Error + + if err != nil { + r.logger.Error("获取活动定时任务列表失败", zap.Error(err)) + return nil, err + } + + return tasks, nil +} + +// GetExpiredTasks 获取过期的任务列表 +func (r *GormScheduledTaskRepository) GetExpiredTasks(ctx context.Context) ([]entities.ScheduledTask, error) { + var tasks []entities.ScheduledTask + + err := r.db.WithContext(ctx). + Preload("Article"). + Where("status = ? AND scheduled_at < ?", entities.TaskStatusPending, time.Now()). + Order("scheduled_at ASC"). + Find(&tasks).Error + + if err != nil { + r.logger.Error("获取过期定时任务列表失败", zap.Error(err)) + return nil, err + } + + return tasks, nil +} diff --git a/internal/infrastructure/http/handlers/article_handler.go b/internal/infrastructure/http/handlers/article_handler.go index f162935..24c7d80 100644 --- a/internal/infrastructure/http/handlers/article_handler.go +++ b/internal/infrastructure/http/handlers/article_handler.go @@ -446,6 +446,43 @@ func (h *ArticleHandler) GetArticleStats(c *gin.Context) { h.responseBuilder.Success(c, response, "获取统计成功") } +// UpdateSchedulePublishArticle 修改定时发布时间 +// @Summary 修改定时发布时间 +// @Description 修改文章的定时发布时间 +// @Tags 文章管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "文章ID" +// @Param request body commands.SchedulePublishCommand true "修改定时发布请求" +// @Success 200 {object} map[string]interface{} "修改定时发布时间成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 404 {object} map[string]interface{} "文章不存在" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/articles/{id}/update-schedule-publish [post] +func (h *ArticleHandler) UpdateSchedulePublishArticle(c *gin.Context) { + var cmd commands.SchedulePublishCommand + + // 先绑定URI参数(文章ID) + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + // 再绑定JSON请求体(定时发布时间) + if err := h.validator.BindAndValidate(c, &cmd); err != nil { + return + } + + if err := h.appService.UpdateSchedulePublishArticle(c.Request.Context(), &cmd); err != nil { + h.logger.Error("修改定时发布时间失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "修改定时发布时间成功") +} + // ==================== 分类相关方法 ==================== // ListCategories 获取分类列表 diff --git a/internal/infrastructure/http/routes/article_routes.go b/internal/infrastructure/http/routes/article_routes.go index 9162815..b0ee8b2 100644 --- a/internal/infrastructure/http/routes/article_routes.go +++ b/internal/infrastructure/http/routes/article_routes.go @@ -79,6 +79,7 @@ func (r *ArticleRoutes) Register(router *sharedhttp.GinRouter) { // 文章状态管理 adminArticleGroup.POST("/:id/publish", r.handler.PublishArticle) // 发布文章 adminArticleGroup.POST("/:id/schedule-publish", r.handler.SchedulePublishArticle) // 定时发布文章 + adminArticleGroup.POST("/:id/update-schedule-publish", r.handler.UpdateSchedulePublishArticle) // 修改定时发布时间 adminArticleGroup.POST("/:id/cancel-schedule", r.handler.CancelSchedulePublishArticle) // 取消定时发布 adminArticleGroup.POST("/:id/archive", r.handler.ArchiveArticle) // 归档文章 adminArticleGroup.PUT("/:id/featured", r.handler.SetFeatured) // 设置推荐状态 diff --git a/internal/infrastructure/task/article_task_handler.go b/internal/infrastructure/task/article_task_handler.go index 60a1697..aae8266 100644 --- a/internal/infrastructure/task/article_task_handler.go +++ b/internal/infrastructure/task/article_task_handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "tyapi-server/internal/domains/article/repositories" "github.com/hibiken/asynq" "go.uber.org/zap" @@ -16,18 +17,21 @@ type ArticlePublisher interface { // ArticleTaskHandler 文章任务处理器 type ArticleTaskHandler struct { - publisher ArticlePublisher - logger *zap.Logger + publisher ArticlePublisher + scheduledTaskRepo repositories.ScheduledTaskRepository + logger *zap.Logger } // NewArticleTaskHandler 创建文章任务处理器 func NewArticleTaskHandler( publisher ArticlePublisher, + scheduledTaskRepo repositories.ScheduledTaskRepository, logger *zap.Logger, ) *ArticleTaskHandler { return &ArticleTaskHandler{ - publisher: publisher, - logger: logger, + publisher: publisher, + scheduledTaskRepo: scheduledTaskRepo, + logger: logger, } } @@ -45,14 +49,49 @@ func (h *ArticleTaskHandler) HandleArticlePublish(ctx context.Context, t *asynq. return fmt.Errorf("任务载荷中缺少文章ID") } + // 获取任务状态记录 + task, err := h.scheduledTaskRepo.GetByTaskID(ctx, t.ResultWriter().TaskID()) + if err != nil { + h.logger.Error("获取任务状态记录失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err)) + // 继续执行,不阻断任务 + } else { + // 检查任务是否已取消 + if task.IsCancelled() { + h.logger.Info("任务已取消,跳过执行", zap.String("task_id", t.ResultWriter().TaskID())) + return nil + } + + // 标记任务为正在执行 + task.MarkAsRunning() + if err := h.scheduledTaskRepo.Update(ctx, task); err != nil { + h.logger.Warn("更新任务状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err)) + } + } + // 执行文章发布 if err := h.publisher.PublishArticleByID(ctx, articleID); err != nil { + // 更新任务状态为失败 + if task.ID != "" { + task.MarkAsFailed(err.Error()) + if updateErr := h.scheduledTaskRepo.Update(ctx, task); updateErr != nil { + h.logger.Warn("更新任务失败状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(updateErr)) + } + } + h.logger.Error("定时发布文章失败", zap.String("article_id", articleID), zap.Error(err)) return fmt.Errorf("定时发布文章失败: %w", err) } + // 更新任务状态为已完成 + if task.ID != "" { + task.MarkAsCompleted() + if err := h.scheduledTaskRepo.Update(ctx, task); err != nil { + h.logger.Warn("更新任务完成状态失败", zap.String("task_id", t.ResultWriter().TaskID()), zap.Error(err)) + } + } + h.logger.Info("定时发布文章成功", zap.String("article_id", articleID)) return nil } diff --git a/internal/infrastructure/task/asynq_client.go b/internal/infrastructure/task/asynq_client.go index bc6ae0d..453e5e3 100644 --- a/internal/infrastructure/task/asynq_client.go +++ b/internal/infrastructure/task/asynq_client.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "time" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories" "github.com/hibiken/asynq" "go.uber.org/zap" @@ -12,16 +14,18 @@ import ( // AsynqClient Asynq 客户端 type AsynqClient struct { - client *asynq.Client - logger *zap.Logger + client *asynq.Client + logger *zap.Logger + scheduledTaskRepo repositories.ScheduledTaskRepository } // NewAsynqClient 创建 Asynq 客户端 -func NewAsynqClient(redisAddr string, logger *zap.Logger) *AsynqClient { +func NewAsynqClient(redisAddr string, scheduledTaskRepo repositories.ScheduledTaskRepository, logger *zap.Logger) *AsynqClient { client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) return &AsynqClient{ - client: client, - logger: logger, + client: client, + logger: logger, + scheduledTaskRepo: scheduledTaskRepo, } } @@ -66,6 +70,20 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri return "", fmt.Errorf("调度任务失败: %w", err) } + // 创建任务状态记录 + scheduledTask := entities.ScheduledTask{ + TaskID: info.ID, + TaskType: TaskTypeArticlePublish, + ArticleID: articleID, + Status: entities.TaskStatusPending, + ScheduledAt: publishTime, + } + + if _, err := c.scheduledTaskRepo.Create(ctx, scheduledTask); err != nil { + c.logger.Error("创建任务状态记录失败", zap.String("task_id", info.ID), zap.Error(err)) + // 不返回错误,因为Asynq任务已经创建成功 + } + c.logger.Info("定时发布任务调度成功", zap.String("article_id", articleID), zap.Time("publish_time", publishTime), @@ -76,16 +94,17 @@ func (c *AsynqClient) ScheduleArticlePublish(ctx context.Context, articleID stri // CancelScheduledTask 取消已调度的任务 func (c *AsynqClient) CancelScheduledTask(ctx context.Context, taskID string) error { - // 注意:Asynq不直接支持取消已调度的任务 - // 这里我们记录日志,实际取消需要在数据库中标记 - c.logger.Info("请求取消定时任务", + c.logger.Info("标记定时任务为已取消", zap.String("task_id", taskID)) - - // 在实际应用中,你可能需要: - // 1. 在数据库中标记任务为已取消 - // 2. 在任务执行时检查取消状态 - // 3. 或者使用Redis的TTL机制 - + + // 标记数据库中的任务状态为已取消 + if err := c.scheduledTaskRepo.MarkAsCancelled(ctx, taskID); err != nil { + c.logger.Warn("标记任务状态为已取消失败", zap.String("task_id", taskID), zap.Error(err)) + // 不返回错误,因为Asynq任务可能已经执行完成 + } + + // Asynq不支持直接取消任务,我们通过数据库状态来标记 + // 任务执行时会检查文章状态,如果已取消则跳过执行 return nil }