f
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -26,7 +26,7 @@ Thumbs.db
|
||||
tmp/
|
||||
temp/
|
||||
console
|
||||
worker
|
||||
|
||||
|
||||
# 依赖目录
|
||||
vendor/
|
||||
|
||||
193
cmd/worker/main.go
Normal file
193
cmd/worker/main.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"hyapi-server/internal/config"
|
||||
"hyapi-server/internal/domains/article/entities"
|
||||
"hyapi-server/internal/infrastructure/database"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const (
|
||||
TaskTypeArticlePublish = "article:publish"
|
||||
TaskTypeAnnouncementPublish = "announcement_publish"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 加载配置
|
||||
cfg, err := config.LoadConfig()
|
||||
if err != nil {
|
||||
log.Fatal("加载配置失败:", err)
|
||||
}
|
||||
|
||||
// 创建日志器
|
||||
logger, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
log.Fatal("创建日志器失败:", err)
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
// 连接数据库
|
||||
// 使用配置文件中的数据库配置
|
||||
dbCfg := database.Config{
|
||||
Host: cfg.Database.Host,
|
||||
Port: cfg.Database.Port,
|
||||
User: cfg.Database.User,
|
||||
Password: cfg.Database.Password,
|
||||
Name: cfg.Database.Name,
|
||||
SSLMode: cfg.Database.SSLMode,
|
||||
Timezone: cfg.Database.Timezone,
|
||||
MaxOpenConns: cfg.Database.MaxOpenConns,
|
||||
MaxIdleConns: cfg.Database.MaxIdleConns,
|
||||
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
||||
}
|
||||
logger.Info("数据库配置", zap.Any("dbCfg", dbCfg))
|
||||
dbWrapper, err := database.NewConnection(dbCfg)
|
||||
if err != nil {
|
||||
logger.Fatal("连接数据库失败", zap.Error(err))
|
||||
}
|
||||
db := dbWrapper.DB
|
||||
|
||||
// 使用配置文件中的Redis配置
|
||||
redisAddr := fmt.Sprintf("%s:%s", cfg.Redis.Host, cfg.Redis.Port)
|
||||
|
||||
// 创建 Asynq Server
|
||||
server := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: redisAddr},
|
||||
asynq.Config{
|
||||
Concurrency: 10,
|
||||
Queues: map[string]int{
|
||||
"critical": 6,
|
||||
"default": 3,
|
||||
"low": 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// 创建任务处理器
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc(TaskTypeArticlePublish, func(ctx context.Context, t *asynq.Task) error {
|
||||
return handleArticlePublish(ctx, t, db, logger)
|
||||
})
|
||||
mux.HandleFunc(TaskTypeAnnouncementPublish, func(ctx context.Context, t *asynq.Task) error {
|
||||
return handleAnnouncementPublish(ctx, t, db, logger)
|
||||
})
|
||||
|
||||
// 启动 Worker
|
||||
go func() {
|
||||
logger.Info("启动 Asynq Worker", zap.String("redis_addr", redisAddr))
|
||||
if err := server.Run(mux); err != nil {
|
||||
logger.Fatal("启动 Worker 失败", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
// 等待信号
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-quit
|
||||
|
||||
// 优雅关闭
|
||||
logger.Info("正在关闭 Worker...")
|
||||
server.Stop()
|
||||
server.Shutdown()
|
||||
logger.Info("Worker 已关闭")
|
||||
}
|
||||
|
||||
// handleArticlePublish 处理文章定时发布任务
|
||||
func handleArticlePublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error {
|
||||
var payload map[string]interface{}
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
logger.Error("解析任务载荷失败", zap.Error(err))
|
||||
return fmt.Errorf("解析任务载荷失败: %w", err)
|
||||
}
|
||||
|
||||
articleID, ok := payload["article_id"].(string)
|
||||
if !ok {
|
||||
logger.Error("任务载荷中缺少文章ID")
|
||||
return fmt.Errorf("任务载荷中缺少文章ID")
|
||||
}
|
||||
|
||||
// 获取文章
|
||||
var article entities.Article
|
||||
if err := db.WithContext(ctx).First(&article, "id = ?", articleID).Error; err != nil {
|
||||
logger.Error("获取文章失败", zap.String("article_id", articleID), zap.Error(err))
|
||||
return fmt.Errorf("获取文章失败: %w", err)
|
||||
}
|
||||
|
||||
// 发布文章
|
||||
if err := article.Publish(); err != nil {
|
||||
logger.Error("发布文章失败", zap.String("article_id", articleID), zap.Error(err))
|
||||
return fmt.Errorf("发布文章失败: %w", err)
|
||||
}
|
||||
|
||||
// 保存更新
|
||||
if err := db.WithContext(ctx).Save(&article).Error; err != nil {
|
||||
logger.Error("保存文章失败", zap.String("article_id", articleID), zap.Error(err))
|
||||
return fmt.Errorf("保存文章失败: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("定时发布文章成功", zap.String("article_id", articleID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleAnnouncementPublish 处理公告定时发布任务
|
||||
func handleAnnouncementPublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error {
|
||||
var payload map[string]interface{}
|
||||
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
|
||||
logger.Error("解析任务载荷失败", zap.Error(err))
|
||||
return fmt.Errorf("解析任务载荷失败: %w", err)
|
||||
}
|
||||
|
||||
announcementID, ok := payload["announcement_id"].(string)
|
||||
if !ok {
|
||||
logger.Error("任务载荷中缺少公告ID")
|
||||
return fmt.Errorf("任务载荷中缺少公告ID")
|
||||
}
|
||||
|
||||
// 获取公告
|
||||
var announcement entities.Announcement
|
||||
if err := db.WithContext(ctx).First(&announcement, "id = ?", announcementID).Error; err != nil {
|
||||
logger.Error("获取公告失败", zap.String("announcement_id", announcementID), zap.Error(err))
|
||||
return fmt.Errorf("获取公告失败: %w", err)
|
||||
}
|
||||
|
||||
// 检查是否已取消定时发布
|
||||
if !announcement.IsScheduled() {
|
||||
logger.Info("公告定时发布已取消,跳过执行",
|
||||
zap.String("announcement_id", announcementID),
|
||||
zap.String("status", string(announcement.Status)))
|
||||
return nil // 静默返回,不报错
|
||||
}
|
||||
|
||||
// 检查定时发布时间是否匹配
|
||||
if announcement.ScheduledAt == nil {
|
||||
logger.Info("公告没有定时发布时间,跳过执行",
|
||||
zap.String("announcement_id", announcementID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// 发布公告
|
||||
if err := announcement.Publish(); err != nil {
|
||||
logger.Error("发布公告失败", zap.String("announcement_id", announcementID), zap.Error(err))
|
||||
return fmt.Errorf("发布公告失败: %w", err)
|
||||
}
|
||||
|
||||
// 保存更新
|
||||
if err := db.WithContext(ctx).Save(&announcement).Error; err != nil {
|
||||
logger.Error("保存公告失败", zap.String("announcement_id", announcementID), zap.Error(err))
|
||||
return fmt.Errorf("保存公告失败: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("定时发布公告成功", zap.String("announcement_id", announcementID))
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user