From a82aae9ab3335ebfbd82edb156d166f3477c582b Mon Sep 17 00:00:00 2001 From: liangzai <2440983361@qq.com> Date: Tue, 21 Apr 2026 23:25:18 +0800 Subject: [PATCH] f --- .gitignore | 2 +- cmd/worker/main.go | 193 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) create mode 100644 cmd/worker/main.go diff --git a/.gitignore b/.gitignore index ed9f32e..db5acfd 100644 --- a/.gitignore +++ b/.gitignore @@ -26,7 +26,7 @@ Thumbs.db tmp/ temp/ console -worker + # 依赖目录 vendor/ diff --git a/cmd/worker/main.go b/cmd/worker/main.go new file mode 100644 index 0000000..37984e8 --- /dev/null +++ b/cmd/worker/main.go @@ -0,0 +1,193 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "hyapi-server/internal/config" + "hyapi-server/internal/domains/article/entities" + "hyapi-server/internal/infrastructure/database" + + "github.com/hibiken/asynq" + "go.uber.org/zap" + "gorm.io/gorm" +) + +const ( + TaskTypeArticlePublish = "article:publish" + TaskTypeAnnouncementPublish = "announcement_publish" +) + +func main() { + // 加载配置 + cfg, err := config.LoadConfig() + if err != nil { + log.Fatal("加载配置失败:", err) + } + + // 创建日志器 + logger, err := zap.NewProduction() + if err != nil { + log.Fatal("创建日志器失败:", err) + } + defer logger.Sync() + + // 连接数据库 + // 使用配置文件中的数据库配置 + dbCfg := database.Config{ + Host: cfg.Database.Host, + Port: cfg.Database.Port, + User: cfg.Database.User, + Password: cfg.Database.Password, + Name: cfg.Database.Name, + SSLMode: cfg.Database.SSLMode, + Timezone: cfg.Database.Timezone, + MaxOpenConns: cfg.Database.MaxOpenConns, + MaxIdleConns: cfg.Database.MaxIdleConns, + ConnMaxLifetime: cfg.Database.ConnMaxLifetime, + } + logger.Info("数据库配置", zap.Any("dbCfg", dbCfg)) + dbWrapper, err := database.NewConnection(dbCfg) + if err != nil { + logger.Fatal("连接数据库失败", zap.Error(err)) + } + db := dbWrapper.DB + + // 使用配置文件中的Redis配置 + redisAddr := fmt.Sprintf("%s:%s", cfg.Redis.Host, cfg.Redis.Port) + + // 创建 Asynq Server + server := asynq.NewServer( + asynq.RedisClientOpt{Addr: redisAddr}, + asynq.Config{ + Concurrency: 10, + Queues: map[string]int{ + "critical": 6, + "default": 3, + "low": 1, + }, + }, + ) + + // 创建任务处理器 + mux := asynq.NewServeMux() + mux.HandleFunc(TaskTypeArticlePublish, func(ctx context.Context, t *asynq.Task) error { + return handleArticlePublish(ctx, t, db, logger) + }) + mux.HandleFunc(TaskTypeAnnouncementPublish, func(ctx context.Context, t *asynq.Task) error { + return handleAnnouncementPublish(ctx, t, db, logger) + }) + + // 启动 Worker + go func() { + logger.Info("启动 Asynq Worker", zap.String("redis_addr", redisAddr)) + if err := server.Run(mux); err != nil { + logger.Fatal("启动 Worker 失败", zap.Error(err)) + } + }() + + // 等待信号 + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + // 优雅关闭 + logger.Info("正在关闭 Worker...") + server.Stop() + server.Shutdown() + logger.Info("Worker 已关闭") +} + +// handleArticlePublish 处理文章定时发布任务 +func handleArticlePublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error { + var payload map[string]interface{} + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + logger.Error("解析任务载荷失败", zap.Error(err)) + return fmt.Errorf("解析任务载荷失败: %w", err) + } + + articleID, ok := payload["article_id"].(string) + if !ok { + logger.Error("任务载荷中缺少文章ID") + return fmt.Errorf("任务载荷中缺少文章ID") + } + + // 获取文章 + var article entities.Article + if err := db.WithContext(ctx).First(&article, "id = ?", articleID).Error; err != nil { + logger.Error("获取文章失败", zap.String("article_id", articleID), zap.Error(err)) + return fmt.Errorf("获取文章失败: %w", err) + } + + // 发布文章 + if err := article.Publish(); err != nil { + logger.Error("发布文章失败", zap.String("article_id", articleID), zap.Error(err)) + return fmt.Errorf("发布文章失败: %w", err) + } + + // 保存更新 + if err := db.WithContext(ctx).Save(&article).Error; err != nil { + logger.Error("保存文章失败", zap.String("article_id", articleID), zap.Error(err)) + return fmt.Errorf("保存文章失败: %w", err) + } + + logger.Info("定时发布文章成功", zap.String("article_id", articleID)) + return nil +} + +// handleAnnouncementPublish 处理公告定时发布任务 +func handleAnnouncementPublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error { + var payload map[string]interface{} + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + logger.Error("解析任务载荷失败", zap.Error(err)) + return fmt.Errorf("解析任务载荷失败: %w", err) + } + + announcementID, ok := payload["announcement_id"].(string) + if !ok { + logger.Error("任务载荷中缺少公告ID") + return fmt.Errorf("任务载荷中缺少公告ID") + } + + // 获取公告 + var announcement entities.Announcement + if err := db.WithContext(ctx).First(&announcement, "id = ?", announcementID).Error; err != nil { + logger.Error("获取公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("获取公告失败: %w", err) + } + + // 检查是否已取消定时发布 + if !announcement.IsScheduled() { + logger.Info("公告定时发布已取消,跳过执行", + zap.String("announcement_id", announcementID), + zap.String("status", string(announcement.Status))) + return nil // 静默返回,不报错 + } + + // 检查定时发布时间是否匹配 + if announcement.ScheduledAt == nil { + logger.Info("公告没有定时发布时间,跳过执行", + zap.String("announcement_id", announcementID)) + return nil + } + + // 发布公告 + if err := announcement.Publish(); err != nil { + logger.Error("发布公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("发布公告失败: %w", err) + } + + // 保存更新 + if err := db.WithContext(ctx).Save(&announcement).Error; err != nil { + logger.Error("保存公告失败", zap.String("announcement_id", announcementID), zap.Error(err)) + return fmt.Errorf("保存公告失败: %w", err) + } + + logger.Info("定时发布公告成功", zap.String("announcement_id", announcementID)) + return nil +}