Files
tyapi-server/cmd/worker/main.go

138 lines
3.6 KiB
Go
Raw Permalink Normal View History

2025-09-01 18:29:59 +08:00
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"tyapi-server/internal/config"
"tyapi-server/internal/domains/article/entities"
2025-09-02 16:37:28 +08:00
"tyapi-server/internal/infrastructure/database"
2025-09-01 18:29:59 +08:00
"github.com/hibiken/asynq"
"go.uber.org/zap"
"gorm.io/gorm"
)
const (
TaskTypeArticlePublish = "article:publish"
)
func main() {
// 加载配置
cfg, err := config.LoadConfig()
if err != nil {
log.Fatal("加载配置失败:", err)
}
// 创建日志器
logger, err := zap.NewProduction()
if err != nil {
log.Fatal("创建日志器失败:", err)
}
defer logger.Sync()
// 连接数据库
2025-09-02 16:37:28 +08:00
// 使用配置文件中的数据库配置
dbCfg := database.Config{
Host: cfg.Database.Host,
Port: cfg.Database.Port,
User: cfg.Database.User,
Password: cfg.Database.Password,
Name: cfg.Database.Name,
SSLMode: cfg.Database.SSLMode,
Timezone: cfg.Database.Timezone,
MaxOpenConns: cfg.Database.MaxOpenConns,
MaxIdleConns: cfg.Database.MaxIdleConns,
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
2025-09-01 18:29:59 +08:00
}
2025-09-02 16:37:28 +08:00
logger.Info("数据库配置", zap.Any("dbCfg", dbCfg))
dbWrapper, err := database.NewConnection(dbCfg)
2025-09-01 18:29:59 +08:00
if err != nil {
logger.Fatal("连接数据库失败", zap.Error(err))
}
2025-09-02 16:37:28 +08:00
db := dbWrapper.DB
2025-09-01 18:29:59 +08:00
2025-09-02 16:37:28 +08:00
// 使用配置文件中的Redis配置
redisAddr := fmt.Sprintf("%s:%s", cfg.Redis.Host, cfg.Redis.Port)
2025-09-01 18:29:59 +08:00
// 创建 Asynq Server
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
// 创建任务处理器
mux := asynq.NewServeMux()
mux.HandleFunc(TaskTypeArticlePublish, func(ctx context.Context, t *asynq.Task) error {
return handleArticlePublish(ctx, t, db, logger)
})
// 启动 Worker
go func() {
logger.Info("启动 Asynq Worker", zap.String("redis_addr", redisAddr))
if err := server.Run(mux); err != nil {
logger.Fatal("启动 Worker 失败", zap.Error(err))
}
}()
// 等待信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
// 优雅关闭
logger.Info("正在关闭 Worker...")
server.Stop()
server.Shutdown()
logger.Info("Worker 已关闭")
}
// handleArticlePublish 处理文章定时发布任务
func handleArticlePublish(ctx context.Context, t *asynq.Task, db *gorm.DB, logger *zap.Logger) error {
var payload map[string]interface{}
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
logger.Error("解析任务载荷失败", zap.Error(err))
return fmt.Errorf("解析任务载荷失败: %w", err)
}
articleID, ok := payload["article_id"].(string)
if !ok {
logger.Error("任务载荷中缺少文章ID")
return fmt.Errorf("任务载荷中缺少文章ID")
}
// 获取文章
var article entities.Article
if err := db.WithContext(ctx).First(&article, "id = ?", articleID).Error; err != nil {
logger.Error("获取文章失败", zap.String("article_id", articleID), zap.Error(err))
return fmt.Errorf("获取文章失败: %w", err)
}
// 发布文章
if err := article.Publish(); err != nil {
logger.Error("发布文章失败", zap.String("article_id", articleID), zap.Error(err))
return fmt.Errorf("发布文章失败: %w", err)
}
// 保存更新
if err := db.WithContext(ctx).Save(&article).Error; err != nil {
logger.Error("保存文章失败", zap.String("article_id", articleID), zap.Error(err))
return fmt.Errorf("保存文章失败: %w", err)
}
logger.Info("定时发布文章成功", zap.String("article_id", articleID))
return nil
}