From 89367fb2ee94dbe61fe1cfeb33035b983ee83a86 Mon Sep 17 00:00:00 2001 From: 18278715334 <18278715334@163.com> Date: Sat, 6 Dec 2025 13:56:00 +0800 Subject: [PATCH] 18278715334@163.com --- cmd/worker/main.go | 58 ++- internal/app/app.go | 2 + .../announcement_application_service.go | 30 ++ .../announcement_application_service_impl.go | 484 ++++++++++++++++++ .../dto/commands/announcement_commands.go | 104 ++++ .../dto/queries/announcement_queries.go | 18 + .../dto/responses/announcement_responses.go | 79 +++ internal/container/container.go | 39 ++ .../domains/article/entities/announcement.go | 137 +++++ .../article/repositories/announcement.go | 24 + .../queries/announcement_queries.go | 13 + .../article/services/announcement_service.go | 133 +++++ internal/infrastructure/database/database.go | 2 +- .../article/gorm_announcement_repository.go | 328 ++++++++++++ .../http/handlers/announcement_handler.go | 411 +++++++++++++++ .../http/routes/announcement_routes.go | 73 +++ .../task/entities/task_factory.go | 53 ++ .../task/handlers/article_task_handler.go | 116 ++++- .../implementations/asynq/asynq_worker.go | 14 +- .../task/implementations/task_manager.go | 87 ++-- .../repositories/async_task_repository.go | 48 +- .../infrastructure/task/types/task_types.go | 7 +- 22 files changed, 2186 insertions(+), 74 deletions(-) create mode 100644 internal/application/article/announcement_application_service.go create mode 100644 internal/application/article/announcement_application_service_impl.go create mode 100644 internal/application/article/dto/commands/announcement_commands.go create mode 100644 internal/application/article/dto/queries/announcement_queries.go create mode 100644 internal/application/article/dto/responses/announcement_responses.go create mode 100644 internal/domains/article/entities/announcement.go create mode 100644 internal/domains/article/repositories/announcement.go create mode 100644 internal/domains/article/repositories/queries/announcement_queries.go create mode 100644 internal/domains/article/services/announcement_service.go create mode 100644 internal/infrastructure/database/repositories/article/gorm_announcement_repository.go create mode 100644 internal/infrastructure/http/handlers/announcement_handler.go create mode 100644 internal/infrastructure/http/routes/announcement_routes.go diff --git a/cmd/worker/main.go b/cmd/worker/main.go index a738317..cb1b587 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -19,7 +19,8 @@ import ( ) const ( - TaskTypeArticlePublish = "article:publish" + TaskTypeArticlePublish = "article:publish" + TaskTypeAnnouncementPublish = "announcement_publish" ) func main() { @@ -78,6 +79,9 @@ func main() { mux.HandleFunc(TaskTypeArticlePublish, func(ctx context.Context, t *asynq.Task) error { return handleArticlePublish(ctx, t, db, logger) }) + mux.HandleFunc(TaskTypeAnnouncementPublish, func(ctx context.Context, t *asynq.Task) error { + return handleAnnouncementPublish(ctx, t, db, logger) + }) // 启动 Worker go func() { @@ -135,3 +139,55 @@ func handleArticlePublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logge logger.Info("定时发布文章成功", zap.String("article_id", articleID)) return nil } + +// handleAnnouncementPublish 处理公告定时发布任务 +func handleAnnouncementPublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error { + var payload map[string]interface{} + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + logger.Error("解析任务载荷失败", zap.Error(err)) + return fmt.Errorf("解析任务载荷失败: %w", err) + } + + announcementID, ok := payload["announcement_id"].(string) + if !ok { + logger.Error("任务载荷中缺少公告ID") + return fmt.Errorf("任务载荷中缺少公告ID") + } + + // 获取公告 + var announcement entities.Announcement + if err := db.WithContext(ctx).First(&announcement, "id = ?", announcementID).Error; err != nil { + logger.Error("获取公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("获取公告失败: %w", err) + } + + // 检查是否已取消定时发布 + if !announcement.IsScheduled() { + logger.Info("公告定时发布已取消,跳过执行", + zap.String("announcement_id", announcementID), + zap.String("status", string(announcement.Status))) + return nil // 静默返回,不报错 + } + + // 检查定时发布时间是否匹配 + if announcement.ScheduledAt == nil { + logger.Info("公告没有定时发布时间,跳过执行", + zap.String("announcement_id", announcementID)) + return nil + } + + // 发布公告 + if err := announcement.Publish(); err != nil { + logger.Error("发布公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("发布公告失败: %w", err) + } + + // 保存更新 + if err := db.WithContext(ctx).Save(&announcement).Error; err != nil { + logger.Error("保存公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("保存公告失败: %w", err) + } + + logger.Info("定时发布公告成功", zap.String("announcement_id", announcementID)) + return nil +} diff --git a/internal/app/app.go b/internal/app/app.go index bb7568e..65b7022 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -245,6 +245,8 @@ func (a *Application) autoMigrate(db *gorm.DB) error { &articleEntities.Category{}, &articleEntities.Tag{}, &articleEntities.ScheduledTask{}, + // 公告 + &articleEntities.Announcement{}, // 统计域 &statisticsEntities.StatisticsMetric{}, diff --git a/internal/application/article/announcement_application_service.go b/internal/application/article/announcement_application_service.go new file mode 100644 index 0000000..519082a --- /dev/null +++ b/internal/application/article/announcement_application_service.go @@ -0,0 +1,30 @@ +package article + +import ( + "context" + "tyapi-server/internal/application/article/dto/commands" + appQueries "tyapi-server/internal/application/article/dto/queries" + "tyapi-server/internal/application/article/dto/responses" +) + +// AnnouncementApplicationService 公告应用服务接口 +type AnnouncementApplicationService interface { + // 公告管理 + CreateAnnouncement(ctx context.Context, cmd *commands.CreateAnnouncementCommand) error + UpdateAnnouncement(ctx context.Context, cmd *commands.UpdateAnnouncementCommand) error + DeleteAnnouncement(ctx context.Context, cmd *commands.DeleteAnnouncementCommand) error + GetAnnouncementByID(ctx context.Context, query *appQueries.GetAnnouncementQuery) (*responses.AnnouncementInfoResponse, error) + ListAnnouncements(ctx context.Context, query *appQueries.ListAnnouncementQuery) (*responses.AnnouncementListResponse, error) + + // 公告状态管理 + PublishAnnouncement(ctx context.Context, cmd *commands.PublishAnnouncementCommand) error + PublishAnnouncementByID(ctx context.Context, announcementID string) error // 通过ID发布公告 (用于定时任务) + WithdrawAnnouncement(ctx context.Context, cmd *commands.WithdrawAnnouncementCommand) error + ArchiveAnnouncement(ctx context.Context, cmd *commands.ArchiveAnnouncementCommand) error + SchedulePublishAnnouncement(ctx context.Context, cmd *commands.SchedulePublishAnnouncementCommand) error + UpdateSchedulePublishAnnouncement(ctx context.Context, cmd *commands.UpdateSchedulePublishAnnouncementCommand) error + CancelSchedulePublishAnnouncement(ctx context.Context, cmd *commands.CancelSchedulePublishAnnouncementCommand) error + + // 统计信息 + GetAnnouncementStats(ctx context.Context) (*responses.AnnouncementStatsResponse, error) +} diff --git a/internal/application/article/announcement_application_service_impl.go b/internal/application/article/announcement_application_service_impl.go new file mode 100644 index 0000000..f349afb --- /dev/null +++ b/internal/application/article/announcement_application_service_impl.go @@ -0,0 +1,484 @@ +package article + +import ( + "context" + "fmt" + "tyapi-server/internal/application/article/dto/commands" + appQueries "tyapi-server/internal/application/article/dto/queries" + "tyapi-server/internal/application/article/dto/responses" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories" + repoQueries "tyapi-server/internal/domains/article/repositories/queries" + "tyapi-server/internal/domains/article/services" + task_entities "tyapi-server/internal/infrastructure/task/entities" + task_interfaces "tyapi-server/internal/infrastructure/task/interfaces" + + "go.uber.org/zap" +) + +// AnnouncementApplicationServiceImpl 公告应用服务实现 +type AnnouncementApplicationServiceImpl struct { + announcementRepo repositories.AnnouncementRepository + announcementService *services.AnnouncementService + taskManager task_interfaces.TaskManager + logger *zap.Logger +} + +// NewAnnouncementApplicationService 创建公告应用服务 +func NewAnnouncementApplicationService( + announcementRepo repositories.AnnouncementRepository, + announcementService *services.AnnouncementService, + taskManager task_interfaces.TaskManager, + logger *zap.Logger, +) AnnouncementApplicationService { + return &AnnouncementApplicationServiceImpl{ + announcementRepo: announcementRepo, + announcementService: announcementService, + taskManager: taskManager, + logger: logger, + } +} + +// CreateAnnouncement 创建公告 +func (s *AnnouncementApplicationServiceImpl) CreateAnnouncement(ctx context.Context, cmd *commands.CreateAnnouncementCommand) error { + // 1. 创建公告实体 + announcement := &entities.Announcement{ + Title: cmd.Title, + Content: cmd.Content, + Status: entities.AnnouncementStatusDraft, + } + + // 2. 调用领域服务验证 + if err := s.announcementService.ValidateAnnouncement(announcement); err != nil { + return fmt.Errorf("业务验证失败: %w", err) + } + + // 3. 保存公告 + _, err := s.announcementRepo.Create(ctx, *announcement) + if err != nil { + s.logger.Error("创建公告失败", zap.Error(err)) + return fmt.Errorf("创建公告失败: %w", err) + } + + s.logger.Info("创建公告成功", zap.String("id", announcement.ID), zap.String("title", announcement.Title)) + return nil +} + +// UpdateAnnouncement 更新公告 +func (s *AnnouncementApplicationServiceImpl) UpdateAnnouncement(ctx context.Context, cmd *commands.UpdateAnnouncementCommand) error { + // 1. 获取原公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否可以编辑 + if err := s.announcementService.CanEdit(&announcement); err != nil { + return fmt.Errorf("公告状态不允许编辑: %w", err) + } + + // 3. 更新字段 + if cmd.Title != "" { + announcement.Title = cmd.Title + } + if cmd.Content != "" { + announcement.Content = cmd.Content + } + + // 4. 验证更新后的公告 + if err := s.announcementService.ValidateAnnouncement(&announcement); err != nil { + return fmt.Errorf("业务验证失败: %w", err) + } + + // 5. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("更新公告失败: %w", err) + } + + s.logger.Info("更新公告成功", zap.String("id", announcement.ID)) + return nil +} + +// DeleteAnnouncement 删除公告 +func (s *AnnouncementApplicationServiceImpl) DeleteAnnouncement(ctx context.Context, cmd *commands.DeleteAnnouncementCommand) error { + // 1. 检查公告是否存在 + _, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 删除公告 + if err := s.announcementRepo.Delete(ctx, cmd.ID); err != nil { + s.logger.Error("删除公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("删除公告失败: %w", err) + } + + s.logger.Info("删除公告成功", zap.String("id", cmd.ID)) + return nil +} + +// GetAnnouncementByID 获取公告详情 +func (s *AnnouncementApplicationServiceImpl) GetAnnouncementByID(ctx context.Context, query *appQueries.GetAnnouncementQuery) (*responses.AnnouncementInfoResponse, error) { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, query.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", query.ID), zap.Error(err)) + return nil, fmt.Errorf("公告不存在: %w", err) + } + + // 2. 转换为响应对象 + response := responses.FromAnnouncementEntity(&announcement) + + return response, nil +} + +// ListAnnouncements 获取公告列表 +func (s *AnnouncementApplicationServiceImpl) ListAnnouncements(ctx context.Context, query *appQueries.ListAnnouncementQuery) (*responses.AnnouncementListResponse, error) { + // 1. 构建仓储查询 + repoQuery := &repoQueries.ListAnnouncementQuery{ + Page: query.Page, + PageSize: query.PageSize, + Status: query.Status, + Title: query.Title, + OrderBy: query.OrderBy, + OrderDir: query.OrderDir, + } + + // 2. 调用仓储 + announcements, total, err := s.announcementRepo.ListAnnouncements(ctx, repoQuery) + if err != nil { + s.logger.Error("获取公告列表失败", zap.Error(err)) + return nil, fmt.Errorf("获取公告列表失败: %w", err) + } + + // 3. 转换为响应对象 + items := responses.FromAnnouncementEntityList(announcements) + + response := &responses.AnnouncementListResponse{ + Total: total, + Page: query.Page, + Size: query.PageSize, + Items: items, + } + + s.logger.Info("获取公告列表成功", zap.Int64("total", total)) + return response, nil +} + +// PublishAnnouncement 发布公告 +func (s *AnnouncementApplicationServiceImpl) PublishAnnouncement(ctx context.Context, cmd *commands.PublishAnnouncementCommand) error { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否可以发布 + if err := s.announcementService.CanPublish(&announcement); err != nil { + return fmt.Errorf("无法发布公告: %w", err) + } + + // 3. 发布公告 + if err := announcement.Publish(); err != nil { + return fmt.Errorf("发布公告失败: %w", err) + } + + // 4. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("发布公告失败: %w", err) + } + + s.logger.Info("发布公告成功", zap.String("id", announcement.ID)) + return nil +} + +// PublishAnnouncementByID 通过ID发布公告 (用于定时任务) +func (s *AnnouncementApplicationServiceImpl) PublishAnnouncementByID(ctx context.Context, announcementID string) error { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, announcementID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", announcementID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否已取消定时发布 + if !announcement.IsScheduled() { + s.logger.Info("公告定时发布已取消,跳过执行", + zap.String("id", announcementID), + zap.String("status", string(announcement.Status))) + return nil // 静默返回,不报错 + } + + // 3. 检查定时发布时间是否匹配 + if announcement.ScheduledAt == nil { + s.logger.Info("公告没有定时发布时间,跳过执行", + zap.String("id", announcementID)) + return nil + } + + // 4. 发布公告 + if err := announcement.Publish(); err != nil { + return fmt.Errorf("发布公告失败: %w", err) + } + + // 5. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("发布公告失败: %w", err) + } + + s.logger.Info("定时发布公告成功", zap.String("id", announcement.ID)) + return nil +} + +// WithdrawAnnouncement 撤回公告 +func (s *AnnouncementApplicationServiceImpl) WithdrawAnnouncement(ctx context.Context, cmd *commands.WithdrawAnnouncementCommand) error { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否可以撤回 + if err := s.announcementService.CanWithdraw(&announcement); err != nil { + return fmt.Errorf("无法撤回公告: %w", err) + } + + // 3. 撤回公告 + if err := announcement.Withdraw(); err != nil { + return fmt.Errorf("撤回公告失败: %w", err) + } + + // 4. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("撤回公告失败: %w", err) + } + + s.logger.Info("撤回公告成功", zap.String("id", announcement.ID)) + return nil +} + +// ArchiveAnnouncement 归档公告 +func (s *AnnouncementApplicationServiceImpl) ArchiveAnnouncement(ctx context.Context, cmd *commands.ArchiveAnnouncementCommand) error { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否可以归档 + if err := s.announcementService.CanArchive(&announcement); err != nil { + return fmt.Errorf("无法归档公告: %w", err) + } + + // 3. 归档公告 + announcement.Status = entities.AnnouncementStatusArchived + + // 4. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("归档公告失败: %w", err) + } + + s.logger.Info("归档公告成功", zap.String("id", announcement.ID)) + return nil +} + +// SchedulePublishAnnouncement 定时发布公告 +func (s *AnnouncementApplicationServiceImpl) SchedulePublishAnnouncement(ctx context.Context, cmd *commands.SchedulePublishAnnouncementCommand) error { + // 1. 解析定时发布时间 + scheduledTime, err := cmd.GetScheduledTime() + if err != nil { + s.logger.Error("解析定时发布时间失败", zap.String("scheduled_time", cmd.ScheduledTime), zap.Error(err)) + return fmt.Errorf("定时发布时间格式错误: %w", err) + } + + // 2. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 3. 检查是否可以定时发布 + if err := s.announcementService.CanSchedulePublish(&announcement, scheduledTime); err != nil { + return fmt.Errorf("无法设置定时发布: %w", err) + } + + // 4. 取消旧任务(如果存在) + if err := s.taskManager.CancelTask(ctx, cmd.ID); err != nil { + s.logger.Warn("取消旧任务失败", zap.String("announcement_id", cmd.ID), zap.Error(err)) + } + + // 5. 创建任务工厂 + taskFactory := task_entities.NewTaskFactoryWithManager(s.taskManager) + + // 6. 创建并异步入队公告发布任务 + if err := taskFactory.CreateAndEnqueueAnnouncementPublishTask( + ctx, + cmd.ID, + scheduledTime, + "system", // 暂时使用系统用户ID + ); err != nil { + s.logger.Error("创建并入队公告发布任务失败", zap.Error(err)) + return fmt.Errorf("创建定时发布任务失败: %w", err) + } + + // 7. 设置定时发布 + if err := announcement.SchedulePublish(scheduledTime); err != nil { + return fmt.Errorf("设置定时发布失败: %w", err) + } + + // 8. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("设置定时发布失败: %w", err) + } + + s.logger.Info("设置定时发布成功", zap.String("id", announcement.ID), zap.Time("scheduled_at", scheduledTime)) + return nil +} + +// UpdateSchedulePublishAnnouncement 更新定时发布公告 +func (s *AnnouncementApplicationServiceImpl) UpdateSchedulePublishAnnouncement(ctx context.Context, cmd *commands.UpdateSchedulePublishAnnouncementCommand) error { + // 1. 解析定时发布时间 + scheduledTime, err := cmd.GetScheduledTime() + if err != nil { + s.logger.Error("解析定时发布时间失败", zap.String("scheduled_time", cmd.ScheduledTime), zap.Error(err)) + return fmt.Errorf("定时发布时间格式错误: %w", err) + } + + // 2. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 3. 检查是否已设置定时发布 + if !announcement.IsScheduled() { + return fmt.Errorf("公告未设置定时发布,无法修改时间") + } + + // 4. 取消旧任务 + if err := s.taskManager.CancelTask(ctx, cmd.ID); err != nil { + s.logger.Warn("取消旧任务失败", zap.String("announcement_id", cmd.ID), zap.Error(err)) + } + + // 5. 创建任务工厂 + taskFactory := task_entities.NewTaskFactoryWithManager(s.taskManager) + + // 6. 创建并异步入队新的公告发布任务 + if err := taskFactory.CreateAndEnqueueAnnouncementPublishTask( + ctx, + cmd.ID, + scheduledTime, + "system", // 暂时使用系统用户ID + ); err != nil { + s.logger.Error("创建并入队公告发布任务失败", zap.Error(err)) + return fmt.Errorf("创建定时发布任务失败: %w", err) + } + + // 7. 更新定时发布时间 + if err := announcement.UpdateSchedulePublish(scheduledTime); err != nil { + return fmt.Errorf("更新定时发布时间失败: %w", err) + } + + // 8. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("修改定时发布时间失败: %w", err) + } + + s.logger.Info("修改定时发布时间成功", zap.String("id", announcement.ID), zap.Time("scheduled_at", scheduledTime)) + return nil +} + +// CancelSchedulePublishAnnouncement 取消定时发布公告 +func (s *AnnouncementApplicationServiceImpl) CancelSchedulePublishAnnouncement(ctx context.Context, cmd *commands.CancelSchedulePublishAnnouncementCommand) error { + // 1. 获取公告 + announcement, err := s.announcementRepo.GetByID(ctx, cmd.ID) + if err != nil { + s.logger.Error("获取公告失败", zap.String("id", cmd.ID), zap.Error(err)) + return fmt.Errorf("公告不存在: %w", err) + } + + // 2. 检查是否已设置定时发布 + if !announcement.IsScheduled() { + return fmt.Errorf("公告未设置定时发布,无需取消") + } + + // 3. 取消任务 + if err := s.taskManager.CancelTask(ctx, cmd.ID); err != nil { + s.logger.Warn("取消任务失败", zap.String("announcement_id", cmd.ID), zap.Error(err)) + // 继续执行,即使取消任务失败也尝试取消定时发布状态 + } + + // 4. 取消定时发布 + if err := announcement.CancelSchedulePublish(); err != nil { + return fmt.Errorf("取消定时发布失败: %w", err) + } + + // 5. 保存更新 + if err := s.announcementRepo.Update(ctx, announcement); err != nil { + s.logger.Error("更新公告失败", zap.String("id", announcement.ID), zap.Error(err)) + return fmt.Errorf("取消定时发布失败: %w", err) + } + + s.logger.Info("取消定时发布成功", zap.String("id", announcement.ID)) + return nil +} + +// GetAnnouncementStats 获取公告统计信息 +func (s *AnnouncementApplicationServiceImpl) GetAnnouncementStats(ctx context.Context) (*responses.AnnouncementStatsResponse, error) { + // 1. 统计总数 + total, err := s.announcementRepo.CountByStatus(ctx, entities.AnnouncementStatusDraft) + if err != nil { + s.logger.Error("统计公告总数失败", zap.Error(err)) + return nil, fmt.Errorf("获取统计信息失败: %w", err) + } + + // 2. 统计各状态数量 + published, err := s.announcementRepo.CountByStatus(ctx, entities.AnnouncementStatusPublished) + if err != nil { + s.logger.Error("统计已发布公告数失败", zap.Error(err)) + return nil, fmt.Errorf("获取统计信息失败: %w", err) + } + + draft, err := s.announcementRepo.CountByStatus(ctx, entities.AnnouncementStatusDraft) + if err != nil { + s.logger.Error("统计草稿公告数失败", zap.Error(err)) + return nil, fmt.Errorf("获取统计信息失败: %w", err) + } + + archived, err := s.announcementRepo.CountByStatus(ctx, entities.AnnouncementStatusArchived) + if err != nil { + s.logger.Error("统计归档公告数失败", zap.Error(err)) + return nil, fmt.Errorf("获取统计信息失败: %w", err) + } + + // 3. 统计定时发布数量(需要查询有scheduled_at的草稿) + scheduled, err := s.announcementRepo.FindScheduled(ctx) + if err != nil { + s.logger.Error("统计定时发布公告数失败", zap.Error(err)) + return nil, fmt.Errorf("获取统计信息失败: %w", err) + } + + response := &responses.AnnouncementStatsResponse{ + TotalAnnouncements: total + published + archived, + PublishedAnnouncements: published, + DraftAnnouncements: draft, + ArchivedAnnouncements: archived, + ScheduledAnnouncements: int64(len(scheduled)), + } + + return response, nil +} diff --git a/internal/application/article/dto/commands/announcement_commands.go b/internal/application/article/dto/commands/announcement_commands.go new file mode 100644 index 0000000..a9415a2 --- /dev/null +++ b/internal/application/article/dto/commands/announcement_commands.go @@ -0,0 +1,104 @@ +package commands + +import ( + "fmt" + "time" +) + +// CreateAnnouncementCommand 创建公告命令 +type CreateAnnouncementCommand struct { + Title string `json:"title" binding:"required" comment:"公告标题"` + Content string `json:"content" binding:"required" comment:"公告内容"` +} + +// UpdateAnnouncementCommand 更新公告命令 +type UpdateAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` + Title string `json:"title" comment:"公告标题"` + Content string `json:"content" comment:"公告内容"` +} + +// DeleteAnnouncementCommand 删除公告命令 +type DeleteAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` +} + +// PublishAnnouncementCommand 发布公告命令 +type PublishAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` +} + +// WithdrawAnnouncementCommand 撤回公告命令 +type WithdrawAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` +} + +// ArchiveAnnouncementCommand 归档公告命令 +type ArchiveAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` +} + +// SchedulePublishAnnouncementCommand 定时发布公告命令 +type SchedulePublishAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` + ScheduledTime string `json:"scheduled_time" binding:"required" comment:"定时发布时间"` +} + +// GetScheduledTime 获取解析后的定时发布时间 +func (cmd *SchedulePublishAnnouncementCommand) GetScheduledTime() (time.Time, error) { + // 定义中国东八区时区 + cst := time.FixedZone("CST", 8*3600) + + // 支持多种时间格式 + formats := []string{ + "2006-01-02 15:04:05", // "2025-09-02 14:12:01" + "2006-01-02T15:04:05", // "2025-09-02T14:12:01" + "2006-01-02T15:04:05Z", // "2025-09-02T14:12:01Z" + "2006-01-02 15:04", // "2025-09-02 14:12" + time.RFC3339, // "2025-09-02T14:12:01+08:00" + } + + for _, format := range formats { + if t, err := time.ParseInLocation(format, cmd.ScheduledTime, cst); err == nil { + // 确保返回的时间是东八区时区 + return t.In(cst), nil + } + } + + return time.Time{}, fmt.Errorf("不支持的时间格式: %s,请使用 YYYY-MM-DD HH:mm:ss 格式", cmd.ScheduledTime) +} + +// UpdateSchedulePublishAnnouncementCommand 更新定时发布公告命令 +type UpdateSchedulePublishAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` + ScheduledTime string `json:"scheduled_time" binding:"required" comment:"定时发布时间"` +} + +// GetScheduledTime 获取解析后的定时发布时间 +func (cmd *UpdateSchedulePublishAnnouncementCommand) GetScheduledTime() (time.Time, error) { + // 定义中国东八区时区 + cst := time.FixedZone("CST", 8*3600) + + // 支持多种时间格式 + formats := []string{ + "2006-01-02 15:04:05", // "2025-09-02 14:12:01" + "2006-01-02T15:04:05", // "2025-09-02T14:12:01" + "2006-01-02T15:04:05Z", // "2025-09-02T14:12:01Z" + "2006-01-02 15:04", // "2025-09-02 14:12" + time.RFC3339, // "2025-09-02T14:12:01+08:00" + } + + for _, format := range formats { + if t, err := time.ParseInLocation(format, cmd.ScheduledTime, cst); err == nil { + // 确保返回的时间是东八区时区 + return t.In(cst), nil + } + } + + return time.Time{}, fmt.Errorf("不支持的时间格式: %s,请使用 YYYY-MM-DD HH:mm:ss 格式", cmd.ScheduledTime) +} + +// CancelSchedulePublishAnnouncementCommand 取消定时发布公告命令 +type CancelSchedulePublishAnnouncementCommand struct { + ID string `json:"-" uri:"id" binding:"required" comment:"公告ID"` +} diff --git a/internal/application/article/dto/queries/announcement_queries.go b/internal/application/article/dto/queries/announcement_queries.go new file mode 100644 index 0000000..9b22041 --- /dev/null +++ b/internal/application/article/dto/queries/announcement_queries.go @@ -0,0 +1,18 @@ +package queries + +import "tyapi-server/internal/domains/article/entities" + +// ListAnnouncementQuery 公告列表查询 +type ListAnnouncementQuery struct { + Page int `form:"page" binding:"min=1" comment:"页码"` + PageSize int `form:"page_size" binding:"min=1,max=100" comment:"每页数量"` + Status entities.AnnouncementStatus `form:"status" comment:"公告状态"` + Title string `form:"title" comment:"标题关键词"` + OrderBy string `form:"order_by" comment:"排序字段"` + OrderDir string `form:"order_dir" comment:"排序方向"` +} + +// GetAnnouncementQuery 获取公告详情查询 +type GetAnnouncementQuery struct { + ID string `uri:"id" binding:"required" comment:"公告ID"` +} diff --git a/internal/application/article/dto/responses/announcement_responses.go b/internal/application/article/dto/responses/announcement_responses.go new file mode 100644 index 0000000..a40a89f --- /dev/null +++ b/internal/application/article/dto/responses/announcement_responses.go @@ -0,0 +1,79 @@ +package responses + +import ( + "time" + "tyapi-server/internal/domains/article/entities" +) + +// AnnouncementInfoResponse 公告详情响应 +type AnnouncementInfoResponse struct { + ID string `json:"id" comment:"公告ID"` + Title string `json:"title" comment:"公告标题"` + Content string `json:"content" comment:"公告内容"` + Status string `json:"status" comment:"公告状态"` + ScheduledAt *time.Time `json:"scheduled_at" comment:"定时发布时间"` + CreatedAt time.Time `json:"created_at" comment:"创建时间"` + UpdatedAt time.Time `json:"updated_at" comment:"更新时间"` +} + +// AnnouncementListItemResponse 公告列表项响应 +type AnnouncementListItemResponse struct { + ID string `json:"id" comment:"公告ID"` + Title string `json:"title" comment:"公告标题"` + Content string `json:"content" comment:"公告内容"` + Status string `json:"status" comment:"公告状态"` + ScheduledAt *time.Time `json:"scheduled_at" comment:"定时发布时间"` + CreatedAt time.Time `json:"created_at" comment:"创建时间"` + UpdatedAt time.Time `json:"updated_at" comment:"更新时间"` +} + +// AnnouncementListResponse 公告列表响应 +type AnnouncementListResponse struct { + Total int64 `json:"total" comment:"总数"` + Page int `json:"page" comment:"页码"` + Size int `json:"size" comment:"每页数量"` + Items []AnnouncementListItemResponse `json:"items" comment:"公告列表"` +} + +// AnnouncementStatsResponse 公告统计响应 +type AnnouncementStatsResponse struct { + TotalAnnouncements int64 `json:"total_announcements" comment:"公告总数"` + PublishedAnnouncements int64 `json:"published_announcements" comment:"已发布公告数"` + DraftAnnouncements int64 `json:"draft_announcements" comment:"草稿公告数"` + ArchivedAnnouncements int64 `json:"archived_announcements" comment:"归档公告数"` + ScheduledAnnouncements int64 `json:"scheduled_announcements" comment:"定时发布公告数"` +} + +// FromAnnouncementEntity 从公告实体转换为响应对象 +func FromAnnouncementEntity(announcement *entities.Announcement) *AnnouncementInfoResponse { + if announcement == nil { + return nil + } + + return &AnnouncementInfoResponse{ + ID: announcement.ID, + Title: announcement.Title, + Content: announcement.Content, + Status: string(announcement.Status), + ScheduledAt: announcement.ScheduledAt, + CreatedAt: announcement.CreatedAt, + UpdatedAt: announcement.UpdatedAt, + } +} + +// FromAnnouncementEntityList 从公告实体列表转换为列表项响应 +func FromAnnouncementEntityList(announcements []*entities.Announcement) []AnnouncementListItemResponse { + items := make([]AnnouncementListItemResponse, 0, len(announcements)) + for _, announcement := range announcements { + items = append(items, AnnouncementListItemResponse{ + ID: announcement.ID, + Title: announcement.Title, + Content: announcement.Content, + Status: string(announcement.Status), + ScheduledAt: announcement.ScheduledAt, + CreatedAt: announcement.CreatedAt, + UpdatedAt: announcement.UpdatedAt, + }) + } + return items +} diff --git a/internal/container/container.go b/internal/container/container.go index b877863..72a3fc3 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -574,6 +574,11 @@ func NewContainer() *Container { article_repo.NewGormScheduledTaskRepository, fx.As(new(domain_article_repo.ScheduledTaskRepository)), ), + // 公告仓储 - 同时注册具体类型和接口类型 + fx.Annotate( + article_repo.NewGormAnnouncementRepository, + fx.As(new(domain_article_repo.AnnouncementRepository)), + ), ), // API域仓储层 @@ -678,6 +683,8 @@ func NewContainer() *Container { certification_service.NewEnterpriseInfoSubmitRecordService, // 文章领域服务 article_service.NewArticleService, + // 公告领域服务 + article_service.NewAnnouncementService, // 统计领域服务 statistics_service.NewStatisticsAggregateService, statistics_service.NewStatisticsCalculationService, @@ -778,6 +785,7 @@ func NewContainer() *Container { cfg *config.Config, logger *zap.Logger, articleApplicationService article.ArticleApplicationService, + announcementApplicationService article.AnnouncementApplicationService, apiApplicationService api_app.ApiApplicationService, walletService finance_services.WalletAggregateService, subscriptionService *product_services.ProductSubscriptionService, @@ -788,6 +796,7 @@ func NewContainer() *Container { redisAddr, logger, articleApplicationService, + announcementApplicationService, apiApplicationService, walletService, subscriptionService, @@ -944,6 +953,23 @@ func NewContainer() *Container { }, fx.As(new(article.ArticleApplicationService)), ), + // 公告应用服务 - 绑定到接口 + fx.Annotate( + func( + announcementRepo domain_article_repo.AnnouncementRepository, + announcementService *article_service.AnnouncementService, + taskManager task_interfaces.TaskManager, + logger *zap.Logger, + ) article.AnnouncementApplicationService { + return article.NewAnnouncementApplicationService( + announcementRepo, + announcementService, + taskManager, + logger, + ) + }, + fx.As(new(article.AnnouncementApplicationService)), + ), // 统计应用服务 - 绑定到接口 fx.Annotate( func( @@ -1064,6 +1090,15 @@ func NewContainer() *Container { ) *handlers.ArticleHandler { return handlers.NewArticleHandler(appService, responseBuilder, validator, logger) }, + // 公告HTTP处理器 + func( + appService article.AnnouncementApplicationService, + responseBuilder interfaces.ResponseBuilder, + validator interfaces.RequestValidator, + logger *zap.Logger, + ) *handlers.AnnouncementHandler { + return handlers.NewAnnouncementHandler(appService, responseBuilder, validator, logger) + }, ), // 路由注册 @@ -1080,6 +1115,8 @@ func NewContainer() *Container { routes.NewProductAdminRoutes, // 文章路由 routes.NewArticleRoutes, + // 公告路由 + routes.NewAnnouncementRoutes, // API路由 routes.NewApiRoutes, // 统计路由 @@ -1191,6 +1228,7 @@ func RegisterRoutes( productRoutes *routes.ProductRoutes, productAdminRoutes *routes.ProductAdminRoutes, articleRoutes *routes.ArticleRoutes, + announcementRoutes *routes.AnnouncementRoutes, apiRoutes *routes.ApiRoutes, statisticsRoutes *routes.StatisticsRoutes, cfg *config.Config, @@ -1208,6 +1246,7 @@ func RegisterRoutes( productRoutes.Register(router) productAdminRoutes.Register(router) articleRoutes.Register(router) + announcementRoutes.Register(router) statisticsRoutes.Register(router) // 打印注册的路由信息 diff --git a/internal/domains/article/entities/announcement.go b/internal/domains/article/entities/announcement.go new file mode 100644 index 0000000..08609f0 --- /dev/null +++ b/internal/domains/article/entities/announcement.go @@ -0,0 +1,137 @@ +package entities + +import ( + "time" + + "github.com/google/uuid" + "gorm.io/gorm" +) + +// AnnouncementStatus 公告状态枚举 +type AnnouncementStatus string + +const ( + AnnouncementStatusDraft AnnouncementStatus = "draft" // 草稿 + AnnouncementStatusPublished AnnouncementStatus = "published" // 已发布 + AnnouncementStatusArchived AnnouncementStatus = "archived" // 已归档 +) + +// Announcement 公告聚合根 +// 用于对系统公告进行管理,支持发布、撤回、定时发布等功能 +type Announcement struct { + // 基础标识 + ID string `gorm:"primaryKey;type:varchar(36)" json:"id" comment:"公告唯一标识"` + Title string `gorm:"type:varchar(200);not null;index" json:"title" comment:"公告标题"` + Content string `gorm:"type:text;not null" json:"content" comment:"公告内容"` + Status AnnouncementStatus `gorm:"type:varchar(20);not null;default:'draft';index" json:"status" comment:"公告状态"` + ScheduledAt *time.Time `gorm:"index" json:"scheduled_at" comment:"定时发布时间"` + CreatedAt time.Time `gorm:"autoCreateTime;index" json:"created_at" comment:"创建时间"` + UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at" comment:"更新时间"` + DeletedAt gorm.DeletedAt `gorm:"index" json:"-" comment:"软删除时间"` +} + +// TableName 指定表名 +func (Announcement) TableName() string { + return "announcements" +} + +// BeforeCreate GORM钩子:创建前自动生成UUID +func (a *Announcement) BeforeCreate(tx *gorm.DB) error { + if a.ID == "" { + a.ID = uuid.New().String() + } + return nil +} + +// 实现 Entity 接口 - 提供统一的实体管理接口 +// GetID 获取实体唯一标识 +func (a *Announcement) GetID() string { + return a.ID +} + +// GetCreatedAt 获取创建时间 +func (a *Announcement) GetCreatedAt() time.Time { + return a.CreatedAt +} + +// GetUpdatedAt 获取更新时间 +func (a *Announcement) GetUpdatedAt() time.Time { + return a.UpdatedAt +} + +// 验证公告信息 +func (a *Announcement) Validate() error { + if a.Title == "" { + return NewValidationError("公告标题不能为空") + } + if a.Content == "" { + return NewValidationError("公告内容不能为空") + } + return nil +} + +// 发布公告 +func (a *Announcement) Publish() error { + if a.Status == AnnouncementStatusPublished { + return NewValidationError("公告已经是发布状态") + } + a.Status = AnnouncementStatusPublished + now := time.Now() + a.CreatedAt = now + + return nil +} + +// 撤回公告 +func (a *Announcement) Withdraw() error { + if a.Status == AnnouncementStatusDraft { + return NewValidationError("公告已经是草稿状态") + } + a.Status = AnnouncementStatusDraft + now := time.Now() + a.CreatedAt = now + + return nil +} + +// 定时发布公告 +func (a *Announcement) SchedulePublish(scheduledTime time.Time) error { + if a.Status == AnnouncementStatusPublished { + return NewValidationError("公告已经是发布状态") + } + a.Status = AnnouncementStatusDraft // 保持草稿状态,等待定时发布 + a.ScheduledAt = &scheduledTime + + return nil +} + +// 更新定时发布时间 +func (a *Announcement) UpdateSchedulePublish(scheduledTime time.Time) error { + if a.Status == AnnouncementStatusPublished { + return NewValidationError("公告已经是发布状态") + } + if scheduledTime.Before(time.Now()) { + return NewValidationError("定时发布时间不能早于当前时间") + } + a.ScheduledAt = &scheduledTime + return nil +} + +// CancelSchedulePublish 取消定时发布 +func (a *Announcement) CancelSchedulePublish() error { + if a.Status == AnnouncementStatusPublished { + return NewValidationError("公告已经是发布状态") + } + a.ScheduledAt = nil + return nil +} + +// IsScheduled 判断是否已设置定时发布 +func (a *Announcement) IsScheduled() bool { + return a.ScheduledAt != nil && a.Status == AnnouncementStatusDraft +} + +// GetScheduledTime 获取定时发布时间 +func (a *Announcement) GetScheduledTime() *time.Time { + return a.ScheduledAt +} diff --git a/internal/domains/article/repositories/announcement.go b/internal/domains/article/repositories/announcement.go new file mode 100644 index 0000000..8b8da03 --- /dev/null +++ b/internal/domains/article/repositories/announcement.go @@ -0,0 +1,24 @@ +// 存储公告的仓储接口 +package repositories + +import ( + "context" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories/queries" + "tyapi-server/internal/shared/interfaces" +) + +// AnnouncementRepository 公告仓储接口 +type AnnouncementRepository interface { + interfaces.Repository[entities.Announcement] + + // 自定义查询方法 + FindByStatus(ctx context.Context, status entities.AnnouncementStatus) ([]*entities.Announcement, error) + FindScheduled(ctx context.Context) ([]*entities.Announcement, error) + ListAnnouncements(ctx context.Context, query *queries.ListAnnouncementQuery) ([]*entities.Announcement, int64, error) + + // 统计方法 + CountByStatus(ctx context.Context, status entities.AnnouncementStatus) (int64, error) + // 更新统计信息 + UpdateStatistics(ctx context.Context, announcementID string) error +} diff --git a/internal/domains/article/repositories/queries/announcement_queries.go b/internal/domains/article/repositories/queries/announcement_queries.go new file mode 100644 index 0000000..c6c8a2b --- /dev/null +++ b/internal/domains/article/repositories/queries/announcement_queries.go @@ -0,0 +1,13 @@ +package queries + +import "tyapi-server/internal/domains/article/entities" + +// ListAnnouncementQuery 公告列表查询 +type ListAnnouncementQuery struct { + Page int `json:"page"` + PageSize int `json:"page_size"` + Status entities.AnnouncementStatus `json:"status"` + Title string `json:"title"` + OrderBy string `json:"order_by"` + OrderDir string `json:"order_dir"` +} diff --git a/internal/domains/article/services/announcement_service.go b/internal/domains/article/services/announcement_service.go new file mode 100644 index 0000000..f4058a2 --- /dev/null +++ b/internal/domains/article/services/announcement_service.go @@ -0,0 +1,133 @@ +package services + +import ( + "tyapi-server/internal/domains/article/entities" +) + +// AnnouncementService 公告领域服务 +// 处理公告相关的业务逻辑,包括验证、状态管理等 +type AnnouncementService struct{} + +// NewAnnouncementService 创建公告领域服务 +func NewAnnouncementService() *AnnouncementService { + return &AnnouncementService{} +} + +// ValidateAnnouncement 验证公告 +// 检查公告是否符合业务规则 +func (s *AnnouncementService) ValidateAnnouncement(announcement *entities.Announcement) error { + // 1. 基础验证 + if err := announcement.Validate(); err != nil { + return err + } + + // 2. 业务规则验证 + // 标题不能包含敏感词 + if s.containsSensitiveWords(announcement.Title) { + return entities.NewValidationError("公告标题包含敏感词") + } + + // 内容不能包含敏感词 + if s.containsSensitiveWords(announcement.Content) { + return entities.NewValidationError("公告内容包含敏感词") + } + + // 标题长度验证 + if len(announcement.Title) > 200 { + return entities.NewValidationError("公告标题不能超过200个字符") + } + + return nil +} + +// CanPublish 检查是否可以发布 +func (s *AnnouncementService) CanPublish(announcement *entities.Announcement) error { + if announcement.Status == entities.AnnouncementStatusPublished { + return entities.NewValidationError("公告已经是发布状态") + } + + if announcement.Status == entities.AnnouncementStatusArchived { + return entities.NewValidationError("已归档的公告不能发布") + } + + // 检查必填字段 + if announcement.Title == "" { + return entities.NewValidationError("公告标题不能为空") + } + if announcement.Content == "" { + return entities.NewValidationError("公告内容不能为空") + } + + return nil +} + +// CanEdit 检查是否可以编辑 +func (s *AnnouncementService) CanEdit(announcement *entities.Announcement) error { + if announcement.Status == entities.AnnouncementStatusPublished { + return entities.NewValidationError("已发布的公告不能编辑,请先撤回") + } + + if announcement.Status == entities.AnnouncementStatusArchived { + return entities.NewValidationError("已归档的公告不能编辑") + } + + return nil +} + +// CanArchive 检查是否可以归档 +func (s *AnnouncementService) CanArchive(announcement *entities.Announcement) error { + if announcement.Status != entities.AnnouncementStatusPublished { + return entities.NewValidationError("只有已发布的公告才能归档") + } + + return nil +} + +// CanWithdraw 检查是否可以撤回 +func (s *AnnouncementService) CanWithdraw(announcement *entities.Announcement) error { + if announcement.Status != entities.AnnouncementStatusPublished { + return entities.NewValidationError("只有已发布的公告才能撤回") + } + + return nil +} + +// CanSchedulePublish 检查是否可以定时发布 +func (s *AnnouncementService) CanSchedulePublish(announcement *entities.Announcement, scheduledTime interface{}) error { + if announcement.Status == entities.AnnouncementStatusPublished { + return entities.NewValidationError("已发布的公告不能设置定时发布") + } + + if announcement.Status == entities.AnnouncementStatusArchived { + return entities.NewValidationError("已归档的公告不能设置定时发布") + } + + return nil +} + +// containsSensitiveWords 检查是否包含敏感词 +func (s *AnnouncementService) containsSensitiveWords(text string) bool { + // TODO: 实现敏感词检查逻辑 + // 这里可以集成敏感词库或调用外部服务 + sensitiveWords := []string{ + "敏感词1", + "敏感词2", + "敏感词3", + } + + for _, word := range sensitiveWords { + if len(word) > 0 && len(text) > 0 { + // 简单的字符串包含检查 + // 实际项目中应该使用更复杂的算法 + if len(text) >= len(word) { + for i := 0; i <= len(text)-len(word); i++ { + if text[i:i+len(word)] == word { + return true + } + } + } + } + } + + return false +} diff --git a/internal/infrastructure/database/database.go b/internal/infrastructure/database/database.go index f3078e0..05d0d1c 100644 --- a/internal/infrastructure/database/database.go +++ b/internal/infrastructure/database/database.go @@ -46,7 +46,7 @@ func NewConnection(config Config) (*DB, error) { NowFunc: func() time.Time { return time.Now().In(time.FixedZone("CST", 8*3600)) // 强制使用北京时间 }, - PrepareStmt: true, + PrepareStmt: true, DisableAutomaticPing: false, } diff --git a/internal/infrastructure/database/repositories/article/gorm_announcement_repository.go b/internal/infrastructure/database/repositories/article/gorm_announcement_repository.go new file mode 100644 index 0000000..20c5513 --- /dev/null +++ b/internal/infrastructure/database/repositories/article/gorm_announcement_repository.go @@ -0,0 +1,328 @@ +package repositories + +import ( + "context" + "fmt" + "strings" + "time" + "tyapi-server/internal/domains/article/entities" + "tyapi-server/internal/domains/article/repositories" + repoQueries "tyapi-server/internal/domains/article/repositories/queries" + "tyapi-server/internal/shared/interfaces" + + "go.uber.org/zap" + "gorm.io/gorm" +) + +// GormAnnouncementRepository GORM公告仓储实现 +type GormAnnouncementRepository struct { + db *gorm.DB + logger *zap.Logger +} + +// 编译时检查接口实现 +var _ repositories.AnnouncementRepository = (*GormAnnouncementRepository)(nil) + +// NewGormAnnouncementRepository 创建GORM公告仓储 +func NewGormAnnouncementRepository(db *gorm.DB, logger *zap.Logger) *GormAnnouncementRepository { + return &GormAnnouncementRepository{ + db: db, + logger: logger, + } +} + +// Create 创建公告 +func (r *GormAnnouncementRepository) Create(ctx context.Context, entity entities.Announcement) (entities.Announcement, error) { + r.logger.Info("创建公告", zap.String("id", entity.ID), zap.String("title", entity.Title)) + + err := r.db.WithContext(ctx).Create(&entity).Error + + if err != nil { + r.logger.Error("创建公告失败", zap.Error(err)) + return entity, err + } + + return entity, nil +} + +// GetByID 根据ID获取公告 +func (r *GormAnnouncementRepository) GetByID(ctx context.Context, id string) (entities.Announcement, error) { + var entity entities.Announcement + + err := r.db.WithContext(ctx). + Where("id = ?", id). + First(&entity).Error + + if err != nil { + if err == gorm.ErrRecordNotFound { + return entity, fmt.Errorf("公告不存在") + } + r.logger.Error("获取公告失败", zap.String("id", id), zap.Error(err)) + return entity, err + } + + return entity, nil +} + +// Update 更新公告 +func (r *GormAnnouncementRepository) Update(ctx context.Context, entity entities.Announcement) error { + r.logger.Info("更新公告", zap.String("id", entity.ID)) + + err := r.db.WithContext(ctx).Save(&entity).Error + if err != nil { + r.logger.Error("更新公告失败", zap.String("id", entity.ID), zap.Error(err)) + return err + } + + return nil +} + +// Delete 删除公告 +func (r *GormAnnouncementRepository) Delete(ctx context.Context, id string) error { + r.logger.Info("删除公告", zap.String("id", id)) + + err := r.db.WithContext(ctx).Delete(&entities.Announcement{}, "id = ?", id).Error + if err != nil { + r.logger.Error("删除公告失败", zap.String("id", id), zap.Error(err)) + return err + } + + return nil +} + +// FindByStatus 根据状态查找公告 +func (r *GormAnnouncementRepository) FindByStatus(ctx context.Context, status entities.AnnouncementStatus) ([]*entities.Announcement, error) { + var announcements []entities.Announcement + + err := r.db.WithContext(ctx). + Where("status = ?", status). + Order("created_at DESC"). + Find(&announcements).Error + + if err != nil { + r.logger.Error("根据状态查找公告失败", zap.String("status", string(status)), zap.Error(err)) + return nil, err + } + + // 转换为指针切片 + result := make([]*entities.Announcement, len(announcements)) + for i := range announcements { + result[i] = &announcements[i] + } + + return result, nil +} + +// FindScheduled 查找定时发布的公告 +func (r *GormAnnouncementRepository) FindScheduled(ctx context.Context) ([]*entities.Announcement, error) { + var announcements []entities.Announcement + now := time.Now() + + err := r.db.WithContext(ctx). + Where("status = ? AND scheduled_at IS NOT NULL AND scheduled_at <= ?", entities.AnnouncementStatusDraft, now). + Order("scheduled_at ASC"). + Find(&announcements).Error + + if err != nil { + r.logger.Error("查找定时发布公告失败", zap.Error(err)) + return nil, err + } + + // 转换为指针切片 + result := make([]*entities.Announcement, len(announcements)) + for i := range announcements { + result[i] = &announcements[i] + } + + return result, nil +} + +// ListAnnouncements 获取公告列表 +func (r *GormAnnouncementRepository) ListAnnouncements(ctx context.Context, query *repoQueries.ListAnnouncementQuery) ([]*entities.Announcement, int64, error) { + var announcements []entities.Announcement + var total int64 + + dbQuery := r.db.WithContext(ctx).Model(&entities.Announcement{}) + + // 应用筛选条件 + if query.Status != "" { + dbQuery = dbQuery.Where("status = ?", query.Status) + } + + if query.Title != "" { + dbQuery = dbQuery.Where("title ILIKE ?", "%"+query.Title+"%") + } + + // 获取总数 + if err := dbQuery.Count(&total).Error; err != nil { + r.logger.Error("获取公告列表总数失败", zap.Error(err)) + return nil, 0, err + } + + // 应用排序 + if query.OrderBy != "" { + orderDir := "DESC" + if query.OrderDir != "" { + orderDir = strings.ToUpper(query.OrderDir) + } + dbQuery = dbQuery.Order(fmt.Sprintf("%s %s", query.OrderBy, orderDir)) + } else { + dbQuery = dbQuery.Order("created_at DESC") + } + + // 应用分页 + if query.Page > 0 && query.PageSize > 0 { + offset := (query.Page - 1) * query.PageSize + dbQuery = dbQuery.Offset(offset).Limit(query.PageSize) + } + + // 获取数据 + if err := dbQuery.Find(&announcements).Error; err != nil { + r.logger.Error("获取公告列表失败", zap.Error(err)) + return nil, 0, err + } + + // 转换为指针切片 + result := make([]*entities.Announcement, len(announcements)) + for i := range announcements { + result[i] = &announcements[i] + } + + return result, total, nil +} + +// CountByStatus 根据状态统计公告数量 +func (r *GormAnnouncementRepository) CountByStatus(ctx context.Context, status entities.AnnouncementStatus) (int64, error) { + var count int64 + + err := r.db.WithContext(ctx).Model(&entities.Announcement{}). + Where("status = ?", status). + Count(&count).Error + + if err != nil { + r.logger.Error("统计公告数量失败", zap.String("status", string(status)), zap.Error(err)) + return 0, err + } + + return count, nil +} + +// UpdateStatistics 更新统计信息 +// 注意:公告实体目前没有统计字段,此方法预留扩展 +func (r *GormAnnouncementRepository) UpdateStatistics(ctx context.Context, announcementID string) error { + r.logger.Info("更新公告统计信息", zap.String("announcement_id", announcementID)) + // TODO: 如果将来需要统计字段(如阅读量等),可以在这里实现 + return nil +} + +// ================ 实现 BaseRepository 接口的其他方法 ================ + +// Count 统计数量 +func (r *GormAnnouncementRepository) Count(ctx context.Context, options interfaces.CountOptions) (int64, error) { + dbQuery := r.db.WithContext(ctx).Model(&entities.Announcement{}) + + // 应用筛选条件 + if options.Filters != nil { + for key, value := range options.Filters { + dbQuery = dbQuery.Where(key+" = ?", value) + } + } + + if options.Search != "" { + search := "%" + options.Search + "%" + dbQuery = dbQuery.Where("title LIKE ? OR content LIKE ?", search, search) + } + + var count int64 + err := dbQuery.Count(&count).Error + return count, err +} + +// Exists 检查是否存在 +func (r *GormAnnouncementRepository) Exists(ctx context.Context, id string) (bool, error) { + var count int64 + err := r.db.WithContext(ctx).Model(&entities.Announcement{}). + Where("id = ?", id). + Count(&count).Error + return count > 0, err +} + +// SoftDelete 软删除 +func (r *GormAnnouncementRepository) SoftDelete(ctx context.Context, id string) error { + return r.db.WithContext(ctx).Delete(&entities.Announcement{}, "id = ?", id).Error +} + +// Restore 恢复软删除 +func (r *GormAnnouncementRepository) Restore(ctx context.Context, id string) error { + return r.db.WithContext(ctx).Unscoped().Model(&entities.Announcement{}). + Where("id = ?", id). + Update("deleted_at", nil).Error +} + +// CreateBatch 批量创建 +func (r *GormAnnouncementRepository) CreateBatch(ctx context.Context, entities []entities.Announcement) error { + return r.db.WithContext(ctx).Create(&entities).Error +} + +// GetByIDs 根据ID列表获取 +func (r *GormAnnouncementRepository) GetByIDs(ctx context.Context, ids []string) ([]entities.Announcement, error) { + var announcements []entities.Announcement + err := r.db.WithContext(ctx).Where("id IN ?", ids).Find(&announcements).Error + return announcements, err +} + +// UpdateBatch 批量更新 +func (r *GormAnnouncementRepository) UpdateBatch(ctx context.Context, entities []entities.Announcement) error { + return r.db.WithContext(ctx).Save(&entities).Error +} + +// DeleteBatch 批量删除 +func (r *GormAnnouncementRepository) DeleteBatch(ctx context.Context, ids []string) error { + return r.db.WithContext(ctx).Delete(&entities.Announcement{}, "id IN ?", ids).Error +} + +// List 列表查询 +func (r *GormAnnouncementRepository) List(ctx context.Context, options interfaces.ListOptions) ([]entities.Announcement, error) { + var announcements []entities.Announcement + + dbQuery := r.db.WithContext(ctx).Model(&entities.Announcement{}) + + // 应用筛选条件 + if options.Filters != nil { + for key, value := range options.Filters { + dbQuery = dbQuery.Where(key+" = ?", value) + } + } + + if options.Search != "" { + search := "%" + options.Search + "%" + dbQuery = dbQuery.Where("title LIKE ? OR content LIKE ?", search, search) + } + + // 应用排序 + if options.Sort != "" { + order := "DESC" + if options.Order != "" { + order = strings.ToUpper(options.Order) + } + dbQuery = dbQuery.Order(fmt.Sprintf("%s %s", options.Sort, order)) + } else { + dbQuery = dbQuery.Order("created_at DESC") + } + + // 应用分页 + if options.Page > 0 && options.PageSize > 0 { + offset := (options.Page - 1) * options.PageSize + dbQuery = dbQuery.Offset(offset).Limit(options.PageSize) + } + + // 预加载关联数据 + if len(options.Include) > 0 { + for _, include := range options.Include { + dbQuery = dbQuery.Preload(include) + } + } + + err := dbQuery.Find(&announcements).Error + return announcements, err +} diff --git a/internal/infrastructure/http/handlers/announcement_handler.go b/internal/infrastructure/http/handlers/announcement_handler.go new file mode 100644 index 0000000..38ff867 --- /dev/null +++ b/internal/infrastructure/http/handlers/announcement_handler.go @@ -0,0 +1,411 @@ +package handlers + +import ( + "tyapi-server/internal/application/article" + "tyapi-server/internal/application/article/dto/commands" + appQueries "tyapi-server/internal/application/article/dto/queries" + "tyapi-server/internal/shared/interfaces" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// AnnouncementHandler 公告HTTP处理器 +type AnnouncementHandler struct { + appService article.AnnouncementApplicationService + responseBuilder interfaces.ResponseBuilder + validator interfaces.RequestValidator + logger *zap.Logger +} + +// NewAnnouncementHandler 创建公告HTTP处理器 +func NewAnnouncementHandler( + appService article.AnnouncementApplicationService, + responseBuilder interfaces.ResponseBuilder, + validator interfaces.RequestValidator, + logger *zap.Logger, +) *AnnouncementHandler { + return &AnnouncementHandler{ + appService: appService, + responseBuilder: responseBuilder, + validator: validator, + logger: logger, + } +} + +// CreateAnnouncement 创建公告 +// @Summary 创建公告 +// @Description 创建新的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param request body commands.CreateAnnouncementCommand true "创建公告请求" +// @Success 201 {object} map[string]interface{} "公告创建成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements [post] +func (h *AnnouncementHandler) CreateAnnouncement(c *gin.Context) { + var cmd commands.CreateAnnouncementCommand + if err := h.validator.BindAndValidate(c, &cmd); err != nil { + return + } + + // 验证用户是否已登录 + if _, exists := c.Get("user_id"); !exists { + h.responseBuilder.Unauthorized(c, "用户未登录") + return + } + + if err := h.appService.CreateAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("创建公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Created(c, nil, "公告创建成功") +} + +// GetAnnouncementByID 获取公告详情 +// @Summary 获取公告详情 +// @Description 根据ID获取公告详情 +// @Tags 公告管理-用户端 +// @Accept json +// @Produce json +// @Param id path string true "公告ID" +// @Success 200 {object} responses.AnnouncementInfoResponse "获取公告详情成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 404 {object} map[string]interface{} "公告不存在" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/announcements/{id} [get] +func (h *AnnouncementHandler) GetAnnouncementByID(c *gin.Context) { + var query appQueries.GetAnnouncementQuery + + // 绑定URI参数(公告ID) + if err := h.validator.ValidateParam(c, &query); err != nil { + return + } + + response, err := h.appService.GetAnnouncementByID(c.Request.Context(), &query) + if err != nil { + h.logger.Error("获取公告详情失败", zap.Error(err)) + h.responseBuilder.NotFound(c, "公告不存在") + return + } + + h.responseBuilder.Success(c, response, "获取公告详情成功") +} + +// ListAnnouncements 获取公告列表 +// @Summary 获取公告列表 +// @Description 分页获取公告列表,支持多种筛选条件 +// @Tags 公告管理-用户端 +// @Accept json +// @Produce json +// @Param page query int false "页码" default(1) +// @Param page_size query int false "每页数量" default(10) +// @Param status query string false "公告状态" +// @Param title query string false "标题关键词" +// @Param order_by query string false "排序字段" +// @Param order_dir query string false "排序方向" +// @Success 200 {object} responses.AnnouncementListResponse "获取公告列表成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/announcements [get] +func (h *AnnouncementHandler) ListAnnouncements(c *gin.Context) { + var query appQueries.ListAnnouncementQuery + if err := h.validator.ValidateQuery(c, &query); err != nil { + return + } + + // 设置默认值 + if query.Page <= 0 { + query.Page = 1 + } + if query.PageSize <= 0 { + query.PageSize = 10 + } + if query.PageSize > 100 { + query.PageSize = 100 + } + + response, err := h.appService.ListAnnouncements(c.Request.Context(), &query) + if err != nil { + h.logger.Error("获取公告列表失败", zap.Error(err)) + h.responseBuilder.InternalError(c, "获取公告列表失败") + return + } + + h.responseBuilder.Success(c, response, "获取公告列表成功") +} + +// PublishAnnouncement 发布公告 +// @Summary 发布公告 +// @Description 发布指定的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Success 200 {object} map[string]interface{} "发布成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/publish [post] +func (h *AnnouncementHandler) PublishAnnouncement(c *gin.Context) { + var cmd commands.PublishAnnouncementCommand + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + if err := h.appService.PublishAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("发布公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "发布成功") +} + +// WithdrawAnnouncement 撤回公告 +// @Summary 撤回公告 +// @Description 撤回已发布的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Success 200 {object} map[string]interface{} "撤回成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/withdraw [post] +func (h *AnnouncementHandler) WithdrawAnnouncement(c *gin.Context) { + var cmd commands.WithdrawAnnouncementCommand + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + if err := h.appService.WithdrawAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("撤回公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "撤回成功") +} + +// ArchiveAnnouncement 归档公告 +// @Summary 归档公告 +// @Description 归档指定的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Success 200 {object} map[string]interface{} "归档成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/archive [post] +func (h *AnnouncementHandler) ArchiveAnnouncement(c *gin.Context) { + var cmd commands.ArchiveAnnouncementCommand + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + if err := h.appService.ArchiveAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("归档公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "归档成功") +} + +// UpdateAnnouncement 更新公告 +// @Summary 更新公告 +// @Description 更新指定的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Param request body commands.UpdateAnnouncementCommand true "更新公告请求" +// @Success 200 {object} map[string]interface{} "更新成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id} [put] +func (h *AnnouncementHandler) UpdateAnnouncement(c *gin.Context) { + var cmd commands.UpdateAnnouncementCommand + + // 先绑定URI参数(公告ID) + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + // 再绑定JSON请求体(公告信息) + if err := h.validator.BindAndValidate(c, &cmd); err != nil { + return + } + + if err := h.appService.UpdateAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("更新公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "更新成功") +} + +// DeleteAnnouncement 删除公告 +// @Summary 删除公告 +// @Description 删除指定的公告 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Success 200 {object} map[string]interface{} "删除成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id} [delete] +func (h *AnnouncementHandler) DeleteAnnouncement(c *gin.Context) { + var cmd commands.DeleteAnnouncementCommand + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + if err := h.appService.DeleteAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("删除公告失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "删除成功") +} + +// SchedulePublishAnnouncement 定时发布公告 +// @Summary 定时发布公告 +// @Description 设置公告的定时发布时间 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Param request body commands.SchedulePublishAnnouncementCommand true "定时发布请求" +// @Success 200 {object} map[string]interface{} "设置成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/schedule-publish [post] +func (h *AnnouncementHandler) SchedulePublishAnnouncement(c *gin.Context) { + var cmd commands.SchedulePublishAnnouncementCommand + + // 先绑定URI参数(公告ID) + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + // 再绑定JSON请求体(定时发布时间) + if err := h.validator.BindAndValidate(c, &cmd); err != nil { + return + } + + if err := h.appService.SchedulePublishAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("设置定时发布失败", zap.String("id", cmd.ID), zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "设置成功") +} + +// UpdateSchedulePublishAnnouncement 更新定时发布公告 +// @Summary 更新定时发布公告 +// @Description 修改公告的定时发布时间 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Param request body commands.UpdateSchedulePublishAnnouncementCommand true "更新定时发布请求" +// @Success 200 {object} map[string]interface{} "更新成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/update-schedule-publish [post] +func (h *AnnouncementHandler) UpdateSchedulePublishAnnouncement(c *gin.Context) { + var cmd commands.UpdateSchedulePublishAnnouncementCommand + + // 先绑定URI参数(公告ID) + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + // 再绑定JSON请求体(定时发布时间) + if err := h.validator.BindAndValidate(c, &cmd); err != nil { + return + } + + if err := h.appService.UpdateSchedulePublishAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("更新定时发布时间失败", zap.String("id", cmd.ID), zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "更新成功") +} + +// CancelSchedulePublishAnnouncement 取消定时发布公告 +// @Summary 取消定时发布公告 +// @Description 取消公告的定时发布 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Param id path string true "公告ID" +// @Success 200 {object} map[string]interface{} "取消成功" +// @Failure 400 {object} map[string]interface{} "请求参数错误" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/{id}/cancel-schedule [post] +func (h *AnnouncementHandler) CancelSchedulePublishAnnouncement(c *gin.Context) { + var cmd commands.CancelSchedulePublishAnnouncementCommand + if err := h.validator.ValidateParam(c, &cmd); err != nil { + return + } + + if err := h.appService.CancelSchedulePublishAnnouncement(c.Request.Context(), &cmd); err != nil { + h.logger.Error("取消定时发布失败", zap.Error(err)) + h.responseBuilder.BadRequest(c, err.Error()) + return + } + + h.responseBuilder.Success(c, nil, "取消成功") +} + +// GetAnnouncementStats 获取公告统计信息 +// @Summary 获取公告统计信息 +// @Description 获取公告的统计数据 +// @Tags 公告管理-管理端 +// @Accept json +// @Produce json +// @Security Bearer +// @Success 200 {object} responses.AnnouncementStatsResponse "获取统计信息成功" +// @Failure 401 {object} map[string]interface{} "未认证" +// @Failure 500 {object} map[string]interface{} "服务器内部错误" +// @Router /api/v1/admin/announcements/stats [get] +func (h *AnnouncementHandler) GetAnnouncementStats(c *gin.Context) { + response, err := h.appService.GetAnnouncementStats(c.Request.Context()) + if err != nil { + h.logger.Error("获取公告统计信息失败", zap.Error(err)) + h.responseBuilder.InternalError(c, "获取统计信息失败") + return + } + + h.responseBuilder.Success(c, response, "获取统计信息成功") +} diff --git a/internal/infrastructure/http/routes/announcement_routes.go b/internal/infrastructure/http/routes/announcement_routes.go new file mode 100644 index 0000000..2cfe93f --- /dev/null +++ b/internal/infrastructure/http/routes/announcement_routes.go @@ -0,0 +1,73 @@ +package routes + +import ( + "tyapi-server/internal/infrastructure/http/handlers" + sharedhttp "tyapi-server/internal/shared/http" + "tyapi-server/internal/shared/middleware" + + "go.uber.org/zap" +) + +// AnnouncementRoutes 公告路由 +type AnnouncementRoutes struct { + handler *handlers.AnnouncementHandler + auth *middleware.JWTAuthMiddleware + admin *middleware.AdminAuthMiddleware + logger *zap.Logger +} + +// NewAnnouncementRoutes 创建公告路由 +func NewAnnouncementRoutes( + handler *handlers.AnnouncementHandler, + auth *middleware.JWTAuthMiddleware, + admin *middleware.AdminAuthMiddleware, + logger *zap.Logger, +) *AnnouncementRoutes { + return &AnnouncementRoutes{ + handler: handler, + auth: auth, + admin: admin, + logger: logger, + } +} + +// Register 注册路由 +func (r *AnnouncementRoutes) Register(router *sharedhttp.GinRouter) { + engine := router.GetEngine() + + // ==================== 用户端路由 ==================== + // 公告相关路由 - 用户端(只显示已发布的公告) + announcementGroup := engine.Group("/api/v1/announcements") + { + // 公开路由 - 不需要认证 + announcementGroup.GET("/:id", r.handler.GetAnnouncementByID) // 获取公告详情 + announcementGroup.GET("", r.handler.ListAnnouncements) // 获取公告列表 + } + + // ==================== 管理员端路由 ==================== + // 管理员公告管理路由 + adminAnnouncementGroup := engine.Group("/api/v1/admin/announcements") + adminAnnouncementGroup.Use(r.admin.Handle()) + { + // 统计信息 + adminAnnouncementGroup.GET("/stats", r.handler.GetAnnouncementStats) // 获取公告统计 + + // 公告列表查询 + adminAnnouncementGroup.GET("", r.handler.ListAnnouncements) // 获取公告列表(管理员端,包含所有状态) + + // 公告管理 + adminAnnouncementGroup.POST("", r.handler.CreateAnnouncement) // 创建公告 + adminAnnouncementGroup.PUT("/:id", r.handler.UpdateAnnouncement) // 更新公告 + adminAnnouncementGroup.DELETE("/:id", r.handler.DeleteAnnouncement) // 删除公告 + + // 公告状态管理 + adminAnnouncementGroup.POST("/:id/publish", r.handler.PublishAnnouncement) // 发布公告 + adminAnnouncementGroup.POST("/:id/withdraw", r.handler.WithdrawAnnouncement) // 撤回公告 + adminAnnouncementGroup.POST("/:id/archive", r.handler.ArchiveAnnouncement) // 归档公告 + adminAnnouncementGroup.POST("/:id/schedule-publish", r.handler.SchedulePublishAnnouncement) // 定时发布公告 + adminAnnouncementGroup.POST("/:id/update-schedule-publish", r.handler.UpdateSchedulePublishAnnouncement) // 修改定时发布时间 + adminAnnouncementGroup.POST("/:id/cancel-schedule", r.handler.CancelSchedulePublishAnnouncement) // 取消定时发布 + } + + r.logger.Info("公告路由注册完成") +} diff --git a/internal/infrastructure/task/entities/task_factory.go b/internal/infrastructure/task/entities/task_factory.go index 1b70041..47df54e 100644 --- a/internal/infrastructure/task/entities/task_factory.go +++ b/internal/infrastructure/task/entities/task_factory.go @@ -53,6 +53,33 @@ func (f *TaskFactory) CreateArticlePublishTask(articleID string, publishAt time. return task, nil } +// CreateAnnouncementPublishTask 创建公告发布任务 +func (f *TaskFactory) CreateAnnouncementPublishTask(announcementID string, publishAt time.Time, userID string) (*AsyncTask, error) { + // 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID + task := &AsyncTask{ + Type: string(types.TaskTypeAnnouncementPublish), + Status: TaskStatusPending, + ScheduledAt: &publishAt, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + // 在payload中添加任务ID(将在保存后更新) + payloadWithID := map[string]interface{}{ + "announcement_id": announcementID, + "publish_at": publishAt, + "user_id": userID, + } + + payloadDataWithID, err := json.Marshal(payloadWithID) + if err != nil { + return nil, err + } + + task.Payload = string(payloadDataWithID) + return task, nil +} + // CreateArticleCancelTask 创建文章取消任务 func (f *TaskFactory) CreateArticleCancelTask(articleID string, userID string) (*AsyncTask, error) { // 创建任务实体,ID将由GORM的BeforeCreate钩子自动生成UUID @@ -240,6 +267,32 @@ func (f *TaskFactory) CreateAndEnqueueArticlePublishTask(ctx context.Context, ar return fmt.Errorf("TaskManager类型不匹配") } +// CreateAndEnqueueAnnouncementPublishTask 创建并入队公告发布任务 +func (f *TaskFactory) CreateAndEnqueueAnnouncementPublishTask(ctx context.Context, announcementID string, publishAt time.Time, userID string) error { + if f.taskManager == nil { + return fmt.Errorf("TaskManager未初始化") + } + + task, err := f.CreateAnnouncementPublishTask(announcementID, publishAt, userID) + if err != nil { + return err + } + + delay := publishAt.Sub(time.Now()) + if delay < 0 { + delay = 0 + } + + // 使用类型断言调用TaskManager方法 + if tm, ok := f.taskManager.(interface { + CreateAndEnqueueDelayedTask(ctx context.Context, task *AsyncTask, delay time.Duration) error + }); ok { + return tm.CreateAndEnqueueDelayedTask(ctx, task, delay) + } + + return fmt.Errorf("TaskManager类型不匹配") +} + // CreateAndEnqueueApiLogTask 创建并入队API日志任务 func (f *TaskFactory) CreateAndEnqueueApiLogTask(ctx context.Context, transactionID string, userID string, apiName string, productID string) error { if f.taskManager == nil { diff --git a/internal/infrastructure/task/handlers/article_task_handler.go b/internal/infrastructure/task/handlers/article_task_handler.go index 2a49eb1..aa6dbd2 100644 --- a/internal/infrastructure/task/handlers/article_task_handler.go +++ b/internal/infrastructure/task/handlers/article_task_handler.go @@ -17,17 +17,24 @@ import ( // ArticleTaskHandler 文章任务处理器 type ArticleTaskHandler struct { - logger *zap.Logger - articleApplicationService article.ArticleApplicationService - asyncTaskRepo repositories.AsyncTaskRepository + logger *zap.Logger + articleApplicationService article.ArticleApplicationService + announcementApplicationService article.AnnouncementApplicationService + asyncTaskRepo repositories.AsyncTaskRepository } // NewArticleTaskHandler 创建文章任务处理器 -func NewArticleTaskHandler(logger *zap.Logger, articleApplicationService article.ArticleApplicationService, asyncTaskRepo repositories.AsyncTaskRepository) *ArticleTaskHandler { +func NewArticleTaskHandler( + logger *zap.Logger, + articleApplicationService article.ArticleApplicationService, + announcementApplicationService article.AnnouncementApplicationService, + asyncTaskRepo repositories.AsyncTaskRepository, +) *ArticleTaskHandler { return &ArticleTaskHandler{ - logger: logger, - articleApplicationService: articleApplicationService, - asyncTaskRepo: asyncTaskRepo, + logger: logger, + articleApplicationService: articleApplicationService, + announcementApplicationService: announcementApplicationService, + asyncTaskRepo: asyncTaskRepo, } } @@ -112,6 +119,47 @@ func (h *ArticleTaskHandler) HandleArticleModify(ctx context.Context, t *asynq.T return nil } +// HandleAnnouncementPublish 处理公告发布任务 +func (h *ArticleTaskHandler) HandleAnnouncementPublish(ctx context.Context, t *asynq.Task) error { + h.logger.Info("开始处理公告发布任务") + + var payload AnnouncementPublishPayload + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + h.logger.Error("解析公告发布任务载荷失败", zap.Error(err)) + h.updateTaskStatus(ctx, t, "failed", "解析任务载荷失败") + return err + } + + h.logger.Info("处理公告发布任务", + zap.String("announcement_id", payload.AnnouncementID), + zap.Time("publish_at", payload.PublishAt)) + + // 检查任务是否已被取消 + if err := h.checkTaskStatus(ctx, t); err != nil { + h.logger.Info("任务已被取消,跳过执行", zap.String("announcement_id", payload.AnnouncementID)) + return nil // 静默返回,不报错 + } + + // 调用公告应用服务发布公告 + if h.announcementApplicationService != nil { + err := h.announcementApplicationService.PublishAnnouncementByID(ctx, payload.AnnouncementID) + if err != nil { + h.logger.Error("公告发布失败", zap.String("announcement_id", payload.AnnouncementID), zap.Error(err)) + h.updateTaskStatus(ctx, t, "failed", "公告发布失败: "+err.Error()) + return err + } + } else { + h.logger.Warn("公告应用服务未初始化,跳过发布", zap.String("announcement_id", payload.AnnouncementID)) + h.updateTaskStatus(ctx, t, "failed", "公告应用服务未初始化") + return nil + } + + // 更新任务状态为成功 + h.updateTaskStatus(ctx, t, "completed", "") + h.logger.Info("公告发布任务处理完成", zap.String("announcement_id", payload.AnnouncementID)) + return nil +} + // ArticlePublishPayload 文章发布任务载荷 type ArticlePublishPayload struct { ArticleID string `json:"article_id"` @@ -157,9 +205,9 @@ func (p *ArticleCancelPayload) FromJSON(data []byte) error { // ArticleModifyPayload 文章修改任务载荷 type ArticleModifyPayload struct { - ArticleID string `json:"article_id"` - NewPublishAt time.Time `json:"new_publish_at"` - UserID string `json:"user_id"` + ArticleID string `json:"article_id"` + NewPublishAt time.Time `json:"new_publish_at"` + UserID string `json:"user_id"` } // GetType 获取任务类型 @@ -177,6 +225,28 @@ func (p *ArticleModifyPayload) FromJSON(data []byte) error { return json.Unmarshal(data, p) } +// AnnouncementPublishPayload 公告发布任务载荷 +type AnnouncementPublishPayload struct { + AnnouncementID string `json:"announcement_id"` + PublishAt time.Time `json:"publish_at"` + UserID string `json:"user_id"` +} + +// GetType 获取任务类型 +func (p *AnnouncementPublishPayload) GetType() types.TaskType { + return types.TaskTypeAnnouncementPublish +} + +// ToJSON 序列化为JSON +func (p *AnnouncementPublishPayload) ToJSON() ([]byte, error) { + return json.Marshal(p) +} + +// FromJSON 从JSON反序列化 +func (p *AnnouncementPublishPayload) FromJSON(data []byte) error { + return json.Unmarshal(data, p) +} + // updateTaskStatus 更新任务状态 func (h *ArticleTaskHandler) updateTaskStatus(ctx context.Context, t *asynq.Task, status string, errorMsg string) { // 从任务载荷中提取任务ID @@ -189,9 +259,11 @@ func (h *ArticleTaskHandler) updateTaskStatus(ctx context.Context, t *asynq.Task // 尝试从payload中获取任务ID taskID, ok := payload["task_id"].(string) if !ok { - // 如果没有task_id,尝试从article_id生成 + // 如果没有task_id,尝试从article_id或announcement_id生成 if articleID, ok := payload["article_id"].(string); ok { taskID = fmt.Sprintf("article-publish-%s", articleID) + } else if announcementID, ok := payload["announcement_id"].(string); ok { + taskID = fmt.Sprintf("announcement-publish-%s", announcementID) } else { h.logger.Error("无法从任务载荷中获取任务ID") return @@ -205,7 +277,7 @@ func (h *ArticleTaskHandler) updateTaskStatus(ctx context.Context, t *asynq.Task } else if status == "completed" { // 成功时:清除错误信息并更新状态 if err := h.asyncTaskRepo.UpdateStatusWithSuccess(ctx, taskID, entities.TaskStatus(status)); err != nil { - h.logger.Error("更新任务状态失败", + h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", status), zap.Error(err)) @@ -213,14 +285,14 @@ func (h *ArticleTaskHandler) updateTaskStatus(ctx context.Context, t *asynq.Task } else { // 其他状态:只更新状态 if err := h.asyncTaskRepo.UpdateStatus(ctx, taskID, entities.TaskStatus(status)); err != nil { - h.logger.Error("更新任务状态失败", + h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", status), zap.Error(err)) } } - h.logger.Info("任务状态已更新", + h.logger.Info("任务状态已更新", zap.String("task_id", taskID), zap.String("status", status), zap.String("error_msg", errorMsg)) @@ -237,29 +309,29 @@ func (h *ArticleTaskHandler) handleTaskFailure(ctx context.Context, taskID strin // 增加重试次数 newRetryCount := task.RetryCount + 1 - + // 检查是否达到最大重试次数 if newRetryCount >= task.MaxRetries { // 达到最大重试次数,标记为最终失败 if err := h.asyncTaskRepo.UpdateStatusWithRetryAndError(ctx, taskID, entities.TaskStatusFailed, errorMsg); err != nil { - h.logger.Error("更新任务状态失败", + h.logger.Error("更新任务状态失败", zap.String("task_id", taskID), zap.String("status", "failed"), zap.Error(err)) } - h.logger.Info("任务最终失败,已达到最大重试次数", + h.logger.Info("任务最终失败,已达到最大重试次数", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Int("max_retries", task.MaxRetries)) } else { // 未达到最大重试次数,保持pending状态,记录错误信息 if err := h.asyncTaskRepo.UpdateRetryCountAndError(ctx, taskID, newRetryCount, errorMsg); err != nil { - h.logger.Error("更新任务重试次数失败", + h.logger.Error("更新任务重试次数失败", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Error(err)) } - h.logger.Info("任务失败,准备重试", + h.logger.Info("任务失败,准备重试", zap.String("task_id", taskID), zap.Int("retry_count", newRetryCount), zap.Int("max_retries", task.MaxRetries)) @@ -278,9 +350,11 @@ func (h *ArticleTaskHandler) checkTaskStatus(ctx context.Context, t *asynq.Task) // 尝试从payload中获取任务ID taskID, ok := payload["task_id"].(string) if !ok { - // 如果没有task_id,尝试从article_id生成 + // 如果没有task_id,尝试从article_id或announcement_id生成 if articleID, ok := payload["article_id"].(string); ok { taskID = fmt.Sprintf("article-publish-%s", articleID) + } else if announcementID, ok := payload["announcement_id"].(string); ok { + taskID = fmt.Sprintf("announcement-publish-%s", announcementID) } else { h.logger.Error("无法从任务载荷中获取任务ID") return fmt.Errorf("无法获取任务ID") @@ -301,4 +375,4 @@ func (h *ArticleTaskHandler) checkTaskStatus(ctx context.Context, t *asynq.Task) } return nil -} \ No newline at end of file +} diff --git a/internal/infrastructure/task/implementations/asynq/asynq_worker.go b/internal/infrastructure/task/implementations/asynq/asynq_worker.go index 652085b..2857c32 100644 --- a/internal/infrastructure/task/implementations/asynq/asynq_worker.go +++ b/internal/infrastructure/task/implementations/asynq/asynq_worker.go @@ -29,6 +29,7 @@ func NewAsynqWorker( redisAddr string, logger *zap.Logger, articleApplicationService article.ArticleApplicationService, + announcementApplicationService article.AnnouncementApplicationService, apiApplicationService api.ApiApplicationService, walletService finance_services.WalletAggregateService, subscriptionService *product_services.ProductSubscriptionService, @@ -39,15 +40,16 @@ func NewAsynqWorker( asynq.Config{ Concurrency: 6, // 降低总并发数 Queues: map[string]int{ - "default": 2, // 2个goroutine - "api": 3, // 3个goroutine (扣款任务) - "article": 1, // 1个goroutine + "default": 2, // 2个goroutine + "api": 3, // 3个goroutine (扣款任务) + "article": 1, // 1个goroutine + "announcement": 1, // 1个goroutine }, }, ) // 创建任务处理器 - articleHandler := handlers.NewArticleTaskHandler(logger, articleApplicationService, asyncTaskRepo) + articleHandler := handlers.NewArticleTaskHandler(logger, articleApplicationService, announcementApplicationService, asyncTaskRepo) apiHandler := handlers.NewApiTaskHandler(logger, apiApplicationService, walletService, subscriptionService, asyncTaskRepo) // 创建ServeMux @@ -105,6 +107,9 @@ func (w *AsynqWorker) registerAllHandlers() { w.mux.HandleFunc(string(types.TaskTypeArticleCancel), w.articleHandler.HandleArticleCancel) w.mux.HandleFunc(string(types.TaskTypeArticleModify), w.articleHandler.HandleArticleModify) + // 注册公告任务处理器 + w.mux.HandleFunc(string(types.TaskTypeAnnouncementPublish), w.articleHandler.HandleAnnouncementPublish) + // 注册API任务处理器 w.mux.HandleFunc(string(types.TaskTypeApiCall), w.apiHandler.HandleApiCall) w.mux.HandleFunc(string(types.TaskTypeApiLog), w.apiHandler.HandleApiLog) @@ -116,6 +121,7 @@ func (w *AsynqWorker) registerAllHandlers() { zap.String("article_publish", string(types.TaskTypeArticlePublish)), zap.String("article_cancel", string(types.TaskTypeArticleCancel)), zap.String("article_modify", string(types.TaskTypeArticleModify)), + zap.String("announcement_publish", string(types.TaskTypeAnnouncementPublish)), zap.String("api_call", string(types.TaskTypeApiCall)), zap.String("api_log", string(types.TaskTypeApiLog)), ) diff --git a/internal/infrastructure/task/implementations/task_manager.go b/internal/infrastructure/task/implementations/task_manager.go index 9d5b160..6041dec 100644 --- a/internal/infrastructure/task/implementations/task_manager.go +++ b/internal/infrastructure/task/implementations/task_manager.go @@ -17,10 +17,10 @@ import ( // TaskManagerImpl 任务管理器实现 type TaskManagerImpl struct { - asynqClient *asynq.Client - asyncTaskRepo repositories.AsyncTaskRepository - logger *zap.Logger - config *interfaces.TaskManagerConfig + asynqClient *asynq.Client + asyncTaskRepo repositories.AsyncTaskRepository + logger *zap.Logger + config *interfaces.TaskManagerConfig } // NewTaskManager 创建任务管理器 @@ -42,7 +42,7 @@ func NewTaskManager( func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entities.AsyncTask) error { // 1. 保存任务到数据库(GORM会自动生成UUID) if err := tm.asyncTaskRepo.Create(ctx, task); err != nil { - tm.logger.Error("保存任务到数据库失败", + tm.logger.Error("保存任务到数据库失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("保存任务失败: %w", err) @@ -50,7 +50,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit // 2. 更新payload中的task_id if err := tm.updatePayloadTaskID(task); err != nil { - tm.logger.Error("更新payload中的任务ID失败", + tm.logger.Error("更新payload中的任务ID失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("更新payload中的任务ID失败: %w", err) @@ -58,7 +58,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit // 3. 更新数据库中的payload if err := tm.asyncTaskRepo.Update(ctx, task); err != nil { - tm.logger.Error("更新任务payload失败", + tm.logger.Error("更新任务payload失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("更新任务payload失败: %w", err) @@ -71,7 +71,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueTask(ctx context.Context, task *entit return fmt.Errorf("任务入队失败: %w", err) } - tm.logger.Info("任务创建并入队成功", + tm.logger.Info("任务创建并入队成功", zap.String("task_id", task.ID), zap.String("task_type", task.Type)) @@ -86,7 +86,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task // 2. 保存任务到数据库(GORM会自动生成UUID) if err := tm.asyncTaskRepo.Create(ctx, task); err != nil { - tm.logger.Error("保存延时任务到数据库失败", + tm.logger.Error("保存延时任务到数据库失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("保存延时任务失败: %w", err) @@ -94,7 +94,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task // 3. 更新payload中的task_id if err := tm.updatePayloadTaskID(task); err != nil { - tm.logger.Error("更新payload中的任务ID失败", + tm.logger.Error("更新payload中的任务ID失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("更新payload中的任务ID失败: %w", err) @@ -102,7 +102,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task // 4. 更新数据库中的payload if err := tm.asyncTaskRepo.Update(ctx, task); err != nil { - tm.logger.Error("更新任务payload失败", + tm.logger.Error("更新任务payload失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("更新任务payload失败: %w", err) @@ -115,7 +115,7 @@ func (tm *TaskManagerImpl) CreateAndEnqueueDelayedTask(ctx context.Context, task return fmt.Errorf("延时任务入队失败: %w", err) } - tm.logger.Info("延时任务创建并入队成功", + tm.logger.Info("延时任务创建并入队成功", zap.String("task_id", task.ID), zap.String("task_type", task.Type), zap.Duration("delay", delay)) @@ -131,13 +131,13 @@ func (tm *TaskManagerImpl) CancelTask(ctx context.Context, taskID string) error } if err := tm.asyncTaskRepo.UpdateStatus(ctx, task.ID, entities.TaskStatusCancelled); err != nil { - tm.logger.Error("更新任务状态为取消失败", + tm.logger.Error("更新任务状态为取消失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("更新任务状态失败: %w", err) } - tm.logger.Info("任务已标记为取消", + tm.logger.Info("任务已标记为取消", zap.String("task_id", task.ID), zap.String("task_type", task.Type)) @@ -152,14 +152,14 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string return err } - tm.logger.Info("找到要更新的任务", + tm.logger.Info("找到要更新的任务", zap.String("task_id", task.ID), zap.String("current_status", string(task.Status)), zap.Time("current_scheduled_at", *task.ScheduledAt)) // 2. 取消旧任务 if err := tm.asyncTaskRepo.UpdateStatus(ctx, task.ID, entities.TaskStatusCancelled); err != nil { - tm.logger.Error("取消旧任务失败", + tm.logger.Error("取消旧任务失败", zap.String("task_id", task.ID), zap.Error(err)) return fmt.Errorf("取消旧任务失败: %w", err) @@ -173,7 +173,7 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string return err } - tm.logger.Info("新任务已创建", + tm.logger.Info("新任务已创建", zap.String("new_task_id", newTask.ID), zap.Time("new_scheduled_at", newScheduledAt)) @@ -189,7 +189,7 @@ func (tm *TaskManagerImpl) UpdateTaskSchedule(ctx context.Context, taskID string return fmt.Errorf("重新入队任务失败: %w", err) } - tm.logger.Info("任务调度时间更新成功", + tm.logger.Info("任务调度时间更新成功", zap.String("old_task_id", task.ID), zap.String("new_task_id", newTask.ID), zap.Time("new_scheduled_at", newScheduledAt)) @@ -237,7 +237,7 @@ func (tm *TaskManagerImpl) RetryTask(ctx context.Context, taskID string) error { return fmt.Errorf("重试任务入队失败: %w", err) } - tm.logger.Info("任务重试成功", + tm.logger.Info("任务重试成功", zap.String("task_id", taskID), zap.Int("retry_count", task.RetryCount)) @@ -248,7 +248,7 @@ func (tm *TaskManagerImpl) RetryTask(ctx context.Context, taskID string) error { func (tm *TaskManagerImpl) CleanupExpiredTasks(ctx context.Context, olderThan time.Time) error { // 这里可以实现清理逻辑,比如删除超过一定时间的已完成任务 tm.logger.Info("开始清理过期任务", zap.Time("older_than", olderThan)) - + // TODO: 实现清理逻辑 return nil } @@ -274,8 +274,7 @@ func (tm *TaskManagerImpl) updatePayloadTaskID(task *entities.AsyncTask) error { return nil } - -// findTask 查找任务(支持taskID和articleID双重查找) +// findTask 查找任务(支持taskID、articleID和announcementID三重查找) func (tm *TaskManagerImpl) findTask(ctx context.Context, taskID string) (*entities.AsyncTask, error) { // 先尝试通过任务ID查找 task, err := tm.asyncTaskRepo.GetByID(ctx, taskID) @@ -285,21 +284,34 @@ func (tm *TaskManagerImpl) findTask(ctx context.Context, taskID string) (*entiti // 如果通过任务ID找不到,尝试通过文章ID查找 tm.logger.Info("通过任务ID查找失败,尝试通过文章ID查找", zap.String("task_id", taskID)) - + tasks, err := tm.asyncTaskRepo.GetByArticleID(ctx, taskID) - if err != nil || len(tasks) == 0 { - tm.logger.Error("通过文章ID也找不到任务", + if err == nil && len(tasks) > 0 { + // 使用找到的第一个任务 + task = tasks[0] + tm.logger.Info("通过文章ID找到任务", zap.String("article_id", taskID), + zap.String("task_id", task.ID)) + return task, nil + } + + // 如果通过文章ID也找不到,尝试通过公告ID查找 + tm.logger.Info("通过文章ID查找失败,尝试通过公告ID查找", zap.String("task_id", taskID)) + + announcementTasks, err := tm.asyncTaskRepo.GetByAnnouncementID(ctx, taskID) + if err != nil || len(announcementTasks) == 0 { + tm.logger.Error("通过公告ID也找不到任务", + zap.String("announcement_id", taskID), zap.Error(err)) return nil, fmt.Errorf("获取任务信息失败: %w", err) } - + // 使用找到的第一个任务 - task = tasks[0] - tm.logger.Info("通过文章ID找到任务", - zap.String("article_id", taskID), + task = announcementTasks[0] + tm.logger.Info("通过公告ID找到任务", + zap.String("announcement_id", taskID), zap.String("task_id", task.ID)) - + return task, nil } @@ -317,7 +329,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask * // 保存到数据库(GORM会自动生成UUID) if err := tm.asyncTaskRepo.Create(ctx, newTask); err != nil { - tm.logger.Error("创建新任务失败", + tm.logger.Error("创建新任务失败", zap.String("new_task_id", newTask.ID), zap.Error(err)) return nil, fmt.Errorf("创建新任务失败: %w", err) @@ -325,7 +337,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask * // 更新payload中的task_id if err := tm.updatePayloadTaskID(newTask); err != nil { - tm.logger.Error("更新payload中的任务ID失败", + tm.logger.Error("更新payload中的任务ID失败", zap.String("new_task_id", newTask.ID), zap.Error(err)) return nil, fmt.Errorf("更新payload中的任务ID失败: %w", err) @@ -333,7 +345,7 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask * // 更新数据库中的payload if err := tm.asyncTaskRepo.Update(ctx, newTask); err != nil { - tm.logger.Error("更新新任务payload失败", + tm.logger.Error("更新新任务payload失败", zap.String("new_task_id", newTask.ID), zap.Error(err)) return nil, fmt.Errorf("更新新任务payload失败: %w", err) @@ -346,23 +358,24 @@ func (tm *TaskManagerImpl) createAndSaveTask(ctx context.Context, originalTask * func (tm *TaskManagerImpl) enqueueTaskWithDelay(ctx context.Context, task *entities.AsyncTask, delay time.Duration) error { queueName := tm.getQueueName(task.Type) asynqTask := asynq.NewTask(task.Type, []byte(task.Payload)) - + var err error if delay > 0 { - _, err = tm.asynqClient.EnqueueContext(ctx, asynqTask, + _, err = tm.asynqClient.EnqueueContext(ctx, asynqTask, asynq.Queue(queueName), asynq.ProcessIn(delay)) } else { _, err = tm.asynqClient.EnqueueContext(ctx, asynqTask, asynq.Queue(queueName)) } - + return err } // getQueueName 根据任务类型获取队列名称 func (tm *TaskManagerImpl) getQueueName(taskType string) string { switch taskType { - case string(types.TaskTypeArticlePublish), string(types.TaskTypeArticleCancel), string(types.TaskTypeArticleModify): + case string(types.TaskTypeArticlePublish), string(types.TaskTypeArticleCancel), string(types.TaskTypeArticleModify), + string(types.TaskTypeAnnouncementPublish): return "article" case string(types.TaskTypeApiCall), string(types.TaskTypeApiLog), string(types.TaskTypeDeduction), string(types.TaskTypeUsageStats): return "api" diff --git a/internal/infrastructure/task/repositories/async_task_repository.go b/internal/infrastructure/task/repositories/async_task_repository.go index 9d73186..9f40576 100644 --- a/internal/infrastructure/task/repositories/async_task_repository.go +++ b/internal/infrastructure/task/repositories/async_task_repository.go @@ -42,6 +42,9 @@ type AsyncTaskRepository interface { GetByArticleID(ctx context.Context, articleID string) ([]*entities.AsyncTask, error) CancelArticlePublishTask(ctx context.Context, articleID string) error UpdateArticlePublishTaskSchedule(ctx context.Context, articleID string, newScheduledAt time.Time) error + + // 公告任务专用方法 + GetByAnnouncementID(ctx context.Context, announcementID string) ([]*entities.AsyncTask, error) } // AsyncTaskRepositoryImpl 异步任务仓库实现 @@ -219,8 +222,8 @@ func (r *AsyncTaskRepositoryImpl) DeleteBatch(ctx context.Context, ids []string) func (r *AsyncTaskRepositoryImpl) GetArticlePublishTask(ctx context.Context, articleID string) (*entities.AsyncTask, error) { var task entities.AsyncTask err := r.db.WithContext(ctx). - Where("type = ? AND payload LIKE ? AND status IN ?", - types.TaskTypeArticlePublish, + Where("type = ? AND payload LIKE ? AND status IN ?", + types.TaskTypeArticlePublish, "%\"article_id\":\""+articleID+"\"%", []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). First(&task).Error @@ -234,7 +237,7 @@ func (r *AsyncTaskRepositoryImpl) GetArticlePublishTask(ctx context.Context, art func (r *AsyncTaskRepositoryImpl) GetByArticleID(ctx context.Context, articleID string) ([]*entities.AsyncTask, error) { var tasks []*entities.AsyncTask err := r.db.WithContext(ctx). - Where("payload LIKE ? AND status IN ?", + Where("payload LIKE ? AND status IN ?", "%\"article_id\":\""+articleID+"\"%", []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). Find(&tasks).Error @@ -248,8 +251,8 @@ func (r *AsyncTaskRepositoryImpl) GetByArticleID(ctx context.Context, articleID func (r *AsyncTaskRepositoryImpl) CancelArticlePublishTask(ctx context.Context, articleID string) error { return r.db.WithContext(ctx). Model(&entities.AsyncTask{}). - Where("type = ? AND payload LIKE ? AND status IN ?", - types.TaskTypeArticlePublish, + Where("type = ? AND payload LIKE ? AND status IN ?", + types.TaskTypeArticlePublish, "%\"article_id\":\""+articleID+"\"%", []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). Update("status", entities.TaskStatusCancelled).Error @@ -259,9 +262,38 @@ func (r *AsyncTaskRepositoryImpl) CancelArticlePublishTask(ctx context.Context, func (r *AsyncTaskRepositoryImpl) UpdateArticlePublishTaskSchedule(ctx context.Context, articleID string, newScheduledAt time.Time) error { return r.db.WithContext(ctx). Model(&entities.AsyncTask{}). - Where("type = ? AND payload LIKE ? AND status IN ?", - types.TaskTypeArticlePublish, + Where("type = ? AND payload LIKE ? AND status IN ?", + types.TaskTypeArticlePublish, "%\"article_id\":\""+articleID+"\"%", []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). Update("scheduled_at", newScheduledAt).Error -} \ No newline at end of file +} + +// GetPendingArticlePublishTaskByArticleID 根据公告ID获取待执行的公告发布任务 +func (r *AsyncTaskRepositoryImpl) GetPendingAnnouncementPublishTaskByAnnouncementID(ctx context.Context, announcementID string) (*entities.AsyncTask, error) { + var task entities.AsyncTask + err := r.db.WithContext(ctx). + Where("type = ? AND payload LIKE ? AND status IN ?", + types.TaskTypeAnnouncementPublish, + "%\"announcement_id\":\""+announcementID+"\"%", + []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). + First(&task).Error + if err != nil { + return nil, err + } + return &task, nil +} + +// GetByAnnouncementID 根据公告ID获取所有相关任务 +func (r *AsyncTaskRepositoryImpl) GetByAnnouncementID(ctx context.Context, announcementID string) ([]*entities.AsyncTask, error) { + var tasks []*entities.AsyncTask + err := r.db.WithContext(ctx). + Where("payload LIKE ? AND status IN ?", + "%\"announcement_id\":\""+announcementID+"\"%", + []entities.TaskStatus{entities.TaskStatusPending, entities.TaskStatusRunning}). + Find(&tasks).Error + if err != nil { + return nil, err + } + return tasks, nil +} diff --git a/internal/infrastructure/task/types/task_types.go b/internal/infrastructure/task/types/task_types.go index 2a5d88b..c0616fd 100644 --- a/internal/infrastructure/task/types/task_types.go +++ b/internal/infrastructure/task/types/task_types.go @@ -9,12 +9,15 @@ const ( TaskTypeArticleCancel TaskType = "article_cancel" TaskTypeArticleModify TaskType = "article_modify" + // 公告相关任务 + TaskTypeAnnouncementPublish TaskType = "announcement_publish" + // API相关任务 TaskTypeApiCall TaskType = "api_call" TaskTypeApiLog TaskType = "api_log" // 财务相关任务 - TaskTypeDeduction TaskType = "deduction" + TaskTypeDeduction TaskType = "deduction" TaskTypeCompensation TaskType = "compensation" // 产品相关任务 @@ -26,4 +29,4 @@ type TaskPayload interface { GetType() TaskType ToJSON() ([]byte, error) FromJSON(data []byte) error -} \ No newline at end of file +}