""" Celery 应用配置 支持异步任务处理,可水平扩展 Worker """ import os import sys from pathlib import Path # 确保项目根目录在 path 中 PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from celery import Celery # Redis 配置 (可通过环境变量覆盖) REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") # 创建 Celery 应用 celery_app = Celery( "video_flow", broker=REDIS_URL, backend=REDIS_URL, include=["api.tasks.video_tasks", "api.tasks.audio_tasks"] ) # Celery 配置 celery_app.conf.update( # 任务序列化 task_serializer="json", accept_content=["json"], result_serializer="json", # 时区 timezone="Asia/Shanghai", enable_utc=True, # 任务追踪 task_track_started=True, task_time_limit=600, # 10分钟超时 task_soft_time_limit=540, # 9分钟软超时 # 结果保存 result_expires=3600, # 1小时后过期 # Worker 配置 worker_prefetch_multiplier=1, # 每次只取一个任务,适合长任务 worker_concurrency=2, # 每个 Worker 的并发数 # 任务路由 (可选,用于任务分类) task_routes={ "api.tasks.video_tasks.*": {"queue": "video"}, "api.tasks.audio_tasks.*": {"queue": "audio"}, }, # 默认队列 task_default_queue="default", ) # 健康检查任务 @celery_app.task(bind=True) def health_check(self): """Worker 健康检查""" return { "status": "ok", "worker_id": self.request.id, "hostname": self.request.hostname } if __name__ == "__main__": celery_app.start()