Files
video-flow/api/tasks/video_tasks.py
2026-01-09 14:09:16 +08:00

419 lines
14 KiB
Python

"""
视频处理 Celery 任务
封装现有的 FFmpeg 处理逻辑为异步任务
"""
import os
import time
import logging
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional
from celery import shared_task
import config
from modules.db_manager import db
from modules.composer import VideoComposer
from modules import ffmpeg_utils, factory
from modules.text_renderer import renderer
logger = logging.getLogger(__name__)
@shared_task(bind=True, name="video.compose_from_script")
def compose_from_script_task(
    self,
    project_id: str,
    script_data: Dict[str, Any],
    video_map: Dict[int, str],
    bgm_path: Optional[str] = None,
    voice_type: str = "zh_female_santongyongns_saturn_bigtts",
    output_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Compose a final video from a script (asynchronous Celery task).

    Args:
        project_id: Project ID; used for the default output name and for
            the database status/asset updates.
        script_data: Script payload, passed unchanged to the composer.
        video_map: Mapping of scene id -> local video path. Keys are coerced
            with ``int()``; entries whose path is empty or does not exist on
            disk are skipped (and logged).
        bgm_path: Optional background-music path, forwarded to the composer.
        voice_type: TTS voice identifier handed to ``VideoComposer``.
        output_name: Output name; defaults to
            ``final_<project_id>_<unix timestamp>``.

    Returns:
        ``{"status": "success", "output_path": ..., "output_url": ...,
        "task_id": ...}``

    Raises:
        ValueError: If no entry of ``video_map`` points to an existing file.
        Exception: Any other failure is logged with its traceback, the
            project is marked ``"failed"`` and the exception is re-raised.
    """
    task_id = self.request.id
    logger.info(f"[Task {task_id}] 开始合成视频: {project_id}")
    # Publish the initial progress state.
    self.update_state(state="PROGRESS", meta={"progress": 0.1, "message": "准备素材..."})
    try:
        # Keep only the scenes whose video file actually exists.
        valid_videos = {}
        for scene_id, path in video_map.items():
            if path and os.path.exists(path):
                valid_videos[int(scene_id)] = path
            else:
                # Make silently dropped scenes visible in the logs.
                logger.warning(
                    f"[Task {task_id}] 跳过不可用的视频素材: scene={scene_id}, path={path}"
                )
        if not valid_videos:
            raise ValueError("没有可用的视频素材")

        self.update_state(state="PROGRESS", meta={"progress": 0.2, "message": "创建合成器..."})
        composer = VideoComposer(voice_type=voice_type)

        # Derive a default output name when the caller did not supply one.
        if not output_name:
            output_name = f"final_{project_id}_{int(time.time())}"

        self.update_state(state="PROGRESS", meta={"progress": 0.3, "message": "执行合成..."})
        output_path = composer.compose_from_script(
            script=script_data,
            video_map=valid_videos,
            bgm_path=bgm_path,
            output_name=output_name
        )

        self.update_state(state="PROGRESS", meta={"progress": 0.9, "message": "保存结果..."})
        # Persist the result and mark the project as finished.
        db.save_asset(project_id, 0, "final_video", "completed", local_path=output_path)
        db.update_project_status(project_id, "completed")

        output_url = f"/static/output/{Path(output_path).name}"
        logger.info(f"[Task {task_id}] 合成完成: {output_path}")
        return {
            "status": "success",
            "output_path": output_path,
            "output_url": output_url,
            "task_id": task_id
        }
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"[Task {task_id}] 合成失败: {e}")
        db.update_project_status(project_id, "failed")
        raise
@shared_task(bind=True, name="video.compose_from_tracks")
def compose_from_tracks_task(
    self,
    project_id: str,
    video_clips: List[Dict[str, Any]],
    voiceover_clips: List[Dict[str, Any]],
    subtitle_clips: List[Dict[str, Any]],
    fancy_text_clips: List[Dict[str, Any]],
    bgm_clip: Optional[Dict[str, Any]] = None,
    voice_type: str = "zh_female_santongyongns_saturn_bigtts",
    bgm_volume: float = 0.15,
    output_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Compose a video from editor track data (asynchronous Celery task).

    This is the multi-track pipeline: trim and concatenate the video clips,
    add a silent audio track, synthesize and mix voice-overs, burn subtitles,
    overlay rendered "fancy text" images, add background music, then copy the
    result into ``config.OUTPUT_DIR``.

    Args:
        project_id: Project ID; used for the default output name and for the
            database status/asset updates.
        video_clips: Video clip dicts. Keys read: ``start`` (sort key),
            ``source_path``, ``trim_start``, ``trim_end``. Clips whose
            ``source_path`` is empty or missing on disk are skipped.
        voiceover_clips: Voice-over dicts. Keys read: ``text``, ``start``,
            ``duration`` (default 3).
        subtitle_clips: Subtitle dicts. Keys read: ``text``, ``start``,
            ``duration`` (default 3), ``style``.
        fancy_text_clips: Overlay text dicts. Keys read: ``text``, ``style``,
            ``position`` (``x``/``y``), ``start``, ``duration`` (default 5).
        bgm_clip: Optional dict; only ``source_path`` is read.
        voice_type: TTS voice identifier.
        bgm_volume: Volume passed to ``ffmpeg_utils.add_bgm``.
        output_name: Output base name; defaults to
            ``composed_<project_id>_<unix timestamp>``.

    Returns:
        ``{"status": "success", "output_path": ..., "output_url": ...,
        "task_id": ...}``

    Raises:
        ValueError: If there are no video clips, none of them is usable, or
            a clip has ``trim_end <= trim_start``.
        Exception: Any other failure is logged with its traceback, the
            project is marked ``"failed"`` and the exception is re-raised.
    """
    task_id = self.request.id
    logger.info(f"[Task {task_id}] 开始多轨合成: {project_id}")
    timestamp = int(time.time())
    if not output_name:
        output_name = f"composed_{project_id}_{timestamp}"

    # Every intermediate file is registered here and removed in ``finally``.
    # Paths are registered *before* the tool that writes them runs, so that
    # partially written outputs are cleaned up on failure too.
    temp_files = []
    try:
        self.update_state(state="PROGRESS", meta={"progress": 0.05, "message": "验证素材..."})

        # 1. Collect and validate the video clips, ordered by timeline start.
        video_clips = sorted(video_clips, key=lambda x: x.get("start", 0))
        if not video_clips:
            raise ValueError("没有视频片段")

        video_paths = []
        for clip in video_clips:
            source_path = clip.get("source_path")
            if not source_path or not os.path.exists(source_path):
                continue
            # ``or 0`` also covers an explicit ``None`` value for the key.
            trim_start = clip.get("trim_start") or 0
            trim_end = clip.get("trim_end")
            if trim_start > 0 or trim_end:
                if trim_end and trim_end <= trim_start:
                    raise ValueError(
                        f"裁剪区间无效: trim_start={trim_start}, trim_end={trim_end}"
                    )
                # Prefix with output_name (like all other temp files) so that
                # tasks starting within the same second do not overwrite
                # each other's trimmed clips.
                trimmed_path = str(
                    config.TEMP_DIR / f"{output_name}_trim_{len(video_paths)}.mp4"
                )
                cmd = [
                    ffmpeg_utils.FFMPEG_PATH, "-y",
                    "-ss", str(trim_start),
                    "-i", source_path,
                ]
                # Without trim_end, omit -t so the clip runs to its real end
                # instead of being capped at an arbitrary duration.
                if trim_end:
                    cmd += ["-t", str(trim_end - trim_start)]
                cmd += ["-c", "copy", trimmed_path]
                temp_files.append(trimmed_path)
                ffmpeg_utils._run_ffmpeg(cmd)
                video_paths.append(trimmed_path)
            else:
                video_paths.append(source_path)

        if not video_paths:
            raise ValueError("没有可用的视频片段")

        self.update_state(state="PROGRESS", meta={"progress": 0.15, "message": "拼接视频..."})

        # 2. Concatenate the clips.
        # (1080, 1920) is presumably the target resolution — TODO confirm
        # against ffmpeg_utils.concat_videos.
        merged_path = str(config.TEMP_DIR / f"{output_name}_merged.mp4")
        temp_files.append(merged_path)
        ffmpeg_utils.concat_videos(video_paths, merged_path, (1080, 1920))
        current_video = merged_path

        # Add a silent audio track.
        silent_path = str(config.TEMP_DIR / f"{output_name}_silent.mp4")
        temp_files.append(silent_path)
        ffmpeg_utils.add_silence_audio(current_video, silent_path)
        current_video = silent_path

        # Total duration; falls back to 10 when the probe reports none.
        info = ffmpeg_utils.get_video_info(current_video)
        total_duration = float(info.get("duration", 10))

        self.update_state(state="PROGRESS", meta={"progress": 0.3, "message": "生成旁白..."})

        # 3. Generate the voice-overs and mix them in.
        if voiceover_clips:
            # Start from a silent base track spanning the whole video.
            mixed_audio_path = str(config.TEMP_DIR / f"{output_name}_mixed_vo.mp3")
            temp_files.append(mixed_audio_path)
            ffmpeg_utils._run_ffmpeg([
                ffmpeg_utils.FFMPEG_PATH, "-y",
                "-f", "lavfi", "-i", "anullsrc=r=44100:cl=stereo",
                "-t", str(total_duration),
                "-c:a", "mp3",
                mixed_audio_path
            ])

            for i, clip in enumerate(voiceover_clips):
                text = clip.get("text", "")
                if not text:
                    continue
                start_time = clip.get("start", 0)
                target_duration = clip.get("duration", 3)

                # Synthesize the speech; a falsy result skips this clip.
                tts_path = factory.generate_voiceover_volcengine(
                    text=text,
                    voice_type=voice_type,
                    output_path=str(config.TEMP_DIR / f"{output_name}_vo_{i}.mp3")
                )
                if not tts_path:
                    continue
                temp_files.append(tts_path)

                # Fit the speech to the clip's duration on the timeline.
                adjusted_path = str(config.TEMP_DIR / f"{output_name}_vo_adj_{i}.mp3")
                temp_files.append(adjusted_path)
                ffmpeg_utils.adjust_audio_duration(tts_path, target_duration, adjusted_path)

                # Mix into the running track at the clip's start offset.
                new_mixed = str(config.TEMP_DIR / f"{output_name}_mixed_{i}.mp3")
                temp_files.append(new_mixed)
                ffmpeg_utils.mix_audio_at_offset(
                    mixed_audio_path, adjusted_path, start_time, new_mixed
                )
                mixed_audio_path = new_mixed

            # Mix the voice-over track into the video.
            voiced_path = str(config.TEMP_DIR / f"{output_name}_voiced.mp4")
            temp_files.append(voiced_path)
            ffmpeg_utils.mix_audio(
                current_video, mixed_audio_path, voiced_path,
                audio_volume=1.5,
                video_volume=0.2
            )
            current_video = voiced_path

        self.update_state(state="PROGRESS", meta={"progress": 0.5, "message": "添加字幕..."})

        # 4. Burn in the subtitles.
        if subtitle_clips:
            subtitles = []
            for clip in subtitle_clips:
                text = clip.get("text", "")
                if text:
                    subtitles.append({
                        "text": ffmpeg_utils.wrap_text_smart(text),
                        "start": clip.get("start", 0),
                        "duration": clip.get("duration", 3),
                        "style": clip.get("style", {})
                    })
            if subtitles:
                subtitled_path = str(config.TEMP_DIR / f"{output_name}_subtitled.mp4")
                subtitle_style = {
                    "font": ffmpeg_utils._get_font_path(),
                    "fontsize": 60,
                    "fontcolor": "white",
                    "borderw": 5,
                    "bordercolor": "black",
                    "box": 0,
                    "x": "(w-text_w)/2",
                    "y": "h-200",
                }
                temp_files.append(subtitled_path)
                ffmpeg_utils.add_multiple_subtitles(
                    current_video, subtitles, subtitled_path, default_style=subtitle_style
                )
                current_video = subtitled_path

        self.update_state(state="PROGRESS", meta={"progress": 0.65, "message": "叠加花字..."})

        # 5. Overlay the rendered fancy-text images.
        if fancy_text_clips:
            overlay_configs = []
            for clip in fancy_text_clips:
                text = clip.get("text", "")
                if not text:
                    continue
                style = clip.get("style", {
                    "font_size": 72,
                    "font_color": "#FFFFFF",
                    "stroke": {"color": "#000000", "width": 5}
                })
                img_path = renderer.render(text, style, cache=False)
                temp_files.append(img_path)
                position = clip.get("position", {})
                overlay_configs.append({
                    "path": img_path,
                    "x": position.get("x", "(W-w)/2"),
                    "y": position.get("y", "180"),
                    "start": clip.get("start", 0),
                    "duration": clip.get("duration", 5)
                })
            if overlay_configs:
                fancy_path = str(config.TEMP_DIR / f"{output_name}_fancy.mp4")
                temp_files.append(fancy_path)
                ffmpeg_utils.overlay_multiple_images(current_video, overlay_configs, fancy_path)
                current_video = fancy_path

        self.update_state(state="PROGRESS", meta={"progress": 0.8, "message": "添加背景音乐..."})

        # 6. Add background music when a usable source file is given.
        if bgm_clip:
            bgm_source = bgm_clip.get("source_path")
            if bgm_source and os.path.exists(bgm_source):
                bgm_output = str(config.TEMP_DIR / f"{output_name}_bgm.mp4")
                temp_files.append(bgm_output)
                ffmpeg_utils.add_bgm(
                    current_video, bgm_source, bgm_output,
                    bgm_volume=bgm_volume
                )
                current_video = bgm_output

        self.update_state(state="PROGRESS", meta={"progress": 0.9, "message": "保存输出..."})

        # 7. Copy the last pipeline stage to the final location.
        final_path = str(config.OUTPUT_DIR / f"{output_name}.mp4")
        shutil.copy(current_video, final_path)

        # Persist the result and mark the project as finished.
        db.save_asset(project_id, 0, "final_video", "completed", local_path=final_path)
        db.update_project_status(project_id, "completed")

        output_url = f"/static/output/{Path(final_path).name}"
        logger.info(f"[Task {task_id}] 多轨合成完成: {final_path}")
        return {
            "status": "success",
            "output_path": final_path,
            "output_url": output_url,
            "task_id": task_id
        }
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"[Task {task_id}] 多轨合成失败: {e}")
        db.update_project_status(project_id, "failed")
        raise
    finally:
        # Best-effort cleanup of intermediates. Only filesystem/path errors
        # are suppressed (and logged); a bare ``except`` would also swallow
        # KeyboardInterrupt / SystemExit.
        for temp_path in temp_files:
            if not temp_path:
                continue
            try:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            except (OSError, TypeError, ValueError) as cleanup_error:
                logger.warning(
                    f"[Task {task_id}] 清理临时文件失败: {temp_path}: {cleanup_error}"
                )
@shared_task(bind=True, name="video.trim")
def trim_video_task(
    self,
    source_path: str,
    start_time: float,
    end_time: float,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Trim a segment out of a video (asynchronous Celery task).

    The segment is cut with stream copy (``-c copy``), i.e. without
    re-encoding.

    Args:
        source_path: Path of the source video; must exist.
        start_time: Segment start, passed to ffmpeg ``-ss``.
        end_time: Segment end; must be greater than ``start_time``.
        output_path: Destination file. When omitted, a uniquely named file
            is created in ``config.TEMP_DIR``.

    Returns:
        ``{"status": "success", "output_path": ..., "output_url": ...,
        "duration": ...}``. ``output_url`` is always built as
        ``/static/temp/<file name>``.
        NOTE(review): that URL presumably only resolves when the file lives
        in the temp directory — confirm for caller-supplied ``output_path``.

    Raises:
        FileNotFoundError: If ``source_path`` does not exist.
        ValueError: If ``end_time`` is not greater than ``start_time``.
        Exception: ffmpeg failures are logged with traceback and re-raised.
    """
    import uuid  # local import: only needed for the default output name

    task_id = self.request.id
    logger.info(f"[Task {task_id}] 裁剪视频: {source_path}")

    if not os.path.exists(source_path):
        raise FileNotFoundError(f"源视频不存在: {source_path}")

    # Reject empty/inverted ranges up front instead of handing ffmpeg a
    # non-positive duration.
    duration = end_time - start_time
    if duration <= 0:
        raise ValueError(
            f"裁剪区间无效: start_time={start_time}, end_time={end_time}"
        )

    if not output_path:
        timestamp = int(time.time())
        # The random suffix keeps tasks that start within the same second
        # from overwriting each other's output (ffmpeg runs with -y).
        output_path = str(
            config.TEMP_DIR / f"trimmed_{timestamp}_{uuid.uuid4().hex[:8]}.mp4"
        )

    try:
        cmd = [
            ffmpeg_utils.FFMPEG_PATH, "-y",
            "-ss", str(start_time),
            "-i", source_path,
            "-t", str(duration),
            "-c", "copy",
            output_path
        ]
        ffmpeg_utils._run_ffmpeg(cmd)
        output_url = f"/static/temp/{Path(output_path).name}"
        return {
            "status": "success",
            "output_path": output_path,
            "output_url": output_url,
            "duration": duration
        }
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"[Task {task_id}] 裁剪失败: {e}")
        raise