Files
video-flow/api/routes/compose.py
2026-01-09 14:09:16 +08:00

838 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频合成 API 路由
基于编辑器状态进行最终合成
支持同步/异步两种模式,异步模式通过 Celery 任务队列处理
"""
import os
import time
import logging
import subprocess
from typing import List, Optional, Dict, Any
from pathlib import Path
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel
import config
from modules.db_manager import db
from modules.composer import VideoComposer
from modules import ffmpeg_utils, factory
from modules.text_renderer import renderer
# Celery task imports (optional, off by default; must be explicitly enabled
# with USE_CELERY=1/true/yes/on in the environment).
CELERY_AVAILABLE = False
if str(os.getenv("USE_CELERY", "")).lower() in ("1", "true", "yes", "on"):
    try:
        from api.tasks.video_tasks import compose_from_script_task, compose_from_tracks_task
        CELERY_AVAILABLE = True
    except ImportError:
        # Celery was requested but the task module is not importable;
        # the routes below fall back to FastAPI BackgroundTasks.
        CELERY_AVAILABLE = False

logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================
# Pydantic Models
# ============================================================
class ComposeRequest(BaseModel):
    """Composition request built from the editor state."""
    project_id: str
    # Track data (converted from the editor state). Each clip is a raw dict;
    # its keys (start, duration, source_path, style, ...) are interpreted by
    # _compose_video.
    video_clips: List[Dict[str, Any]] = []
    voiceover_clips: List[Dict[str, Any]] = []
    subtitle_clips: List[Dict[str, Any]] = []
    fancy_text_clips: List[Dict[str, Any]] = []
    sticker_clips: List[Dict[str, Any]] = []
    bgm_clip: Optional[Dict[str, Any]] = None
    # Global settings
    voice_type: str = "zh_female_santongyongns_saturn_bigtts"  # TTS voice id
    bgm_volume: float = 0.15  # fallback BGM gain when bgm_clip has no "volume"
    output_name: Optional[str] = None  # output basename (without extension)
def _validate_no_overlap(clips: List[Dict[str, Any]], *, min_gap: float = 0.0) -> None:
"""Validate clips are non-overlapping when sorted by start time."""
ordered = sorted(clips, key=lambda x: float(x.get("start") or 0))
prev_end = 0.0
for c in ordered:
start = float(c.get("start") or 0.0)
dur = float(c.get("duration") or 0.0)
end = start + dur
if start + 1e-6 < prev_end - min_gap:
raise ValueError("视频片段存在重叠,请先在时间轴调整避免同轨道覆盖。")
prev_end = max(prev_end, end)
def _validate_trim_bounds(video_clips: List[Dict[str, Any]]) -> None:
"""
Validate trim bounds are sane and, if source_duration is provided, do not exceed it.
"""
for c in video_clips:
trim_start = float(c.get("trim_start") or 0.0)
trim_end = c.get("trim_end")
duration = float(c.get("duration") or 0.0)
if trim_end is None:
trim_end = trim_start + duration
trim_end = float(trim_end)
if trim_end <= trim_start + 1e-6:
raise ValueError("视频片段裁剪区间无效trim_end 必须大于 trim_start。")
src_dur = c.get("source_duration")
if src_dur is not None:
try:
src_dur_f = float(src_dur)
if src_dur_f > 0 and trim_end > src_dur_f + 1e-3:
raise ValueError("视频片段裁剪越界trim_end 超过源视频时长,请先缩短片段或调整 trim。")
except ValueError:
raise
except Exception:
# ignore parsing errors
pass
class ComposeResponse(BaseModel):
    """API response for the composition endpoints."""
    success: bool
    message: str
    output_path: Optional[str] = None  # filesystem path; set in sync mode only
    output_url: Optional[str] = None   # /static/output/... URL; sync mode only
    task_id: Optional[str] = None      # set in async mode only
class ComposeStatus(BaseModel):
    """Status payload returned by /status/{task_id}."""
    status: str  # queued, pending, processing, completed, failed, cancelled
    progress: float = 0  # 0.0 .. 1.0
    message: str = ""
    output_path: Optional[str] = None  # populated once status == completed
    output_url: Optional[str] = None
def _persist_job(task_id: str, project_id: str, *, status: str, progress: float, message: str, request: Optional[Dict[str, Any]] = None):
    """Persist status to DB (render_jobs)."""
    try:
        if db.get_render_job(task_id):
            # Job row already exists: only refresh the mutable fields.
            db.update_render_job(task_id, {"status": status, "progress": progress, "message": message})
        else:
            db.create_render_job(task_id, project_id, status=status, progress=progress, message=message, request=request or {})
    except Exception as exc:
        # Bookkeeping is best-effort; never let it break the request path.
        logger.debug(f"persist_job failed (non-fatal): {exc}")
# ============================================================
# API Endpoints
# ============================================================
@router.post("/render", response_model=ComposeResponse)
async def render_video(request: ComposeRequest, background_tasks: BackgroundTasks):
    """
    Compose the video asynchronously.

    Prefers the Celery task queue (horizontally scalable); if Celery is
    unavailable or submission fails, degrades to FastAPI BackgroundTasks
    running in-process.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    project = db.get_project(request.project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    # NOTE(review): unlike /render-sync, no _validate_no_overlap /
    # _validate_trim_bounds is run here — confirm whether async renders
    # should be pre-validated too.
    # Prefer the Celery task queue.
    if CELERY_AVAILABLE:
        try:
            # NOTE(review): request.sticker_clips is not forwarded, so
            # stickers appear to be dropped on the Celery path — verify
            # against compose_from_tracks_task's signature.
            task = compose_from_tracks_task.delay(
                project_id=request.project_id,
                video_clips=request.video_clips,
                voiceover_clips=request.voiceover_clips,
                subtitle_clips=request.subtitle_clips,
                fancy_text_clips=request.fancy_text_clips,
                bgm_clip=request.bgm_clip,
                voice_type=request.voice_type,
                bgm_volume=request.bgm_volume,
                output_name=request.output_name
            )
            # Persist the job record (for tracing / retry).
            _persist_job(
                task.id,
                request.project_id,
                status="queued",
                progress=0.0,
                message="合成任务已提交到队列",
                request=request.model_dump(),
            )
            return ComposeResponse(
                success=True,
                message="合成任务已提交到队列",
                task_id=task.id
            )
        except Exception as e:
            logger.warning(f"Celery 任务提交失败,降级为同步模式: {e}")
    # Fallback: FastAPI BackgroundTasks (runs in this process).
    task_id = f"compose_{request.project_id}_{int(time.time())}"
    _persist_job(task_id, request.project_id, status="queued", progress=0.0, message="任务已创建(本地模式)", request=request.model_dump())
    background_tasks.add_task(
        _do_compose,
        task_id,
        request
    )
    return ComposeResponse(
        success=True,
        message="合成任务已提交",
        task_id=task_id
    )
@router.post("/render-sync", response_model=ComposeResponse)
async def render_video_sync(request: ComposeRequest):
    """
    Compose the video synchronously (suitable for short clips) and return
    the result directly.

    Raises:
        HTTPException: 404 unknown project, 400 invalid track data
            (overlap / trim-bound validation), 500 composition failure.
    """
    project = db.get_project(request.project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    try:
        # Lightweight pre-export validation (MVP): forbid same-track video
        # overlap and out-of-range trims.
        if request.video_clips:
            _validate_no_overlap(request.video_clips)
            _validate_trim_bounds(request.video_clips)
        output_path = await _compose_video(None, request)
        return ComposeResponse(
            success=True,
            message="合成完成",
            output_path=output_path,
            output_url=f"/static/output/{Path(output_path).name}"
        )
    except ValueError as e:
        # Validation problems are client errors — report 400, not 500.
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"合成失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/status/{task_id}")
async def get_compose_status(task_id: str):
    """
    Get composition task status.

    Supports both Celery tasks and local BackgroundTask jobs: the Celery
    result backend is consulted first (and its state mirrored into the
    render_jobs table), then the DB record serves as the fallback for
    local-mode jobs.

    Raises:
        HTTPException: 404 when the task is unknown in both places.
    """
    # First check whether this is a Celery task.
    if CELERY_AVAILABLE:
        try:
            from celery.result import AsyncResult
            from api.celery_app import celery_app
            result = AsyncResult(task_id, app=celery_app)
            # NOTE(review): Celery reports PENDING for ids it has never seen,
            # so a purely local (BackgroundTasks) job queried here returns
            # "pending" instead of falling through to the DB — confirm this
            # is acceptable.
            if result.state == "PENDING":
                _persist_job(task_id, (db.get_render_job(task_id) or {}).get("project_id") or "", status="pending", progress=0.0, message="等待处理...")
                return ComposeStatus(status="pending", progress=0, message="等待处理...")
            elif result.state == "PROGRESS":
                # Task-reported progress metadata (set via update_state).
                meta = result.info or {}
                _persist_job(task_id, (db.get_render_job(task_id) or {}).get("project_id") or "", status="processing", progress=meta.get("progress", 0), message=meta.get("message", "处理中..."))
                return ComposeStatus(
                    status="processing",
                    progress=meta.get("progress", 0),
                    message=meta.get("message", "处理中...")
                )
            elif result.state == "SUCCESS":
                data = result.result or {}
                _persist_job(task_id, (db.get_render_job(task_id) or {}).get("project_id") or "", status="completed", progress=1.0, message="合成完成")
                try:
                    # Best-effort: also record the output location in the DB.
                    db.update_render_job(task_id, {"output_path": data.get("output_path"), "output_url": data.get("output_url")})
                except Exception:
                    pass
                return ComposeStatus(
                    status="completed",
                    progress=1.0,
                    message="合成完成",
                    output_path=data.get("output_path"),
                    output_url=data.get("output_url")
                )
            elif result.state == "FAILURE":
                _persist_job(task_id, (db.get_render_job(task_id) or {}).get("project_id") or "", status="failed", progress=0.0, message=str(result.info) if result.info else "任务失败")
                try:
                    db.update_render_job(task_id, {"error": str(result.info) if result.info else "任务失败"})
                except Exception:
                    pass
                return ComposeStatus(
                    status="failed",
                    progress=0,
                    message=str(result.info) if result.info else "任务失败"
                )
            else:
                # Any other Celery state (RETRY, REVOKED, ...) is passed
                # through lower-cased.
                return ComposeStatus(status=result.state.lower(), progress=0, message=result.state)
        except Exception as e:
            logger.debug(f"Celery 状态查询失败: {e}")
    # Local / persisted jobs: read from the DB.
    job = db.get_render_job(task_id)
    if job:
        return ComposeStatus(
            status=job.get("status") or "pending",
            progress=float(job.get("progress") or 0.0),
            message=job.get("message") or "",
            output_path=job.get("output_path"),
            output_url=job.get("output_url"),
        )
    raise HTTPException(status_code=404, detail="任务不存在")
@router.post("/retry/{task_id}", response_model=ComposeResponse)
async def retry_compose(task_id: str, background_tasks: BackgroundTasks):
    """One-click retry for a failed job: reuse the stored request to create a new task."""
    job = db.get_render_job(task_id)
    if not job or not job.get("request"):
        raise HTTPException(status_code=404, detail="任务不存在或无可重试的请求数据")
    state = (job.get("status") or "").lower()
    if state not in ("failed", "cancelled"):
        raise HTTPException(status_code=400, detail="仅失败/取消的任务可重试")
    original_request = job["request"]
    project_id = job.get("project_id")
    new_task_id = f"compose_{project_id}_{int(time.time())}"
    db.create_render_job(new_task_id, project_id or "", status="queued", progress=0.0, message="重试任务已创建", request=original_request, parent_id=task_id)
    # Dispatch via BackgroundTasks so the HTTP request is not blocked.
    background_tasks.add_task(_do_compose, new_task_id, ComposeRequest(**original_request))
    return ComposeResponse(success=True, message="重试任务已提交", task_id=new_task_id)
@router.post("/quick", response_model=ComposeResponse)
async def quick_compose(project_id: str, bgm_id: Optional[str] = None, async_mode: bool = False):
    """
    Quick composition using the project's default settings (one-click
    workflow export).

    Args:
        project_id: project id.
        bgm_id: BGM file name under ASSETS_DIR/bgm.
        async_mode: submit to the Celery queue instead of rendering inline
            (recommended under heavy concurrency).

    Raises:
        HTTPException: 404 unknown project, 400 missing script/video assets,
            500 composition failure.
    """
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")
    script_data = project.get("script_data")
    if not script_data:
        raise HTTPException(status_code=400, detail="项目缺少脚本数据")
    # Fetch completed video assets, keyed by scene id.
    assets = db.get_assets(project_id, "video")
    video_map = {a["scene_id"]: a["local_path"] for a in assets if a["status"] == "completed"}
    if not video_map:
        raise HTTPException(status_code=400, detail="项目缺少视频素材")
    # Resolve the BGM path; silently drop it if the file is missing.
    # NOTE(review): bgm_id is user-supplied and joined into a filesystem
    # path without sanitization — a "../" value could reference files
    # outside ASSETS_DIR/bgm; consider validating the basename.
    bgm_path = None
    if bgm_id:
        bgm_path = str(config.ASSETS_DIR / "bgm" / bgm_id)
        if not os.path.exists(bgm_path):
            bgm_path = None
    # Async mode: submit to the Celery queue.
    if async_mode and CELERY_AVAILABLE:
        try:
            task = compose_from_script_task.delay(
                project_id=project_id,
                script_data=script_data,
                video_map=video_map,
                bgm_path=bgm_path,
                voice_type=config.VOLC_TTS_DEFAULT_VOICE
            )
            return ComposeResponse(
                success=True,
                message="合成任务已提交到队列",
                task_id=task.id
            )
        except Exception as e:
            logger.warning(f"Celery 任务提交失败,降级为同步模式: {e}")
    # Sync mode (also the fallback when queue submission fails).
    try:
        composer = VideoComposer(voice_type=config.VOLC_TTS_DEFAULT_VOICE)
        output_name = f"final_{project_id}_{int(time.time())}"
        output_path = composer.compose_from_script(
            script=script_data,
            video_map=video_map,
            bgm_path=bgm_path,
            output_name=output_name
        )
        # Record the artifact and mark the project completed.
        db.save_asset(project_id, 0, "final_video", "completed", local_path=output_path)
        db.update_project_status(project_id, "completed")
        return ComposeResponse(
            success=True,
            message="合成完成",
            output_path=output_path,
            output_url=f"/static/output/{Path(output_path).name}"
        )
    except Exception as e:
        logger.error(f"快速合成失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================
# Internal Functions
# ============================================================
async def _do_compose(task_id: str, request: ComposeRequest):
    """Background composition worker: runs the pipeline and persists the outcome."""
    try:
        db.update_render_job(
            task_id,
            {"status": "processing", "progress": 0.05, "message": "正在准备素材..."},
        )
        output_path = await _compose_video(task_id, request)
        completed_fields = {
            "status": "completed",
            "progress": 1.0,
            "message": "合成完成",
            "output_path": output_path,
            "output_url": f"/static/output/{Path(output_path).name}",
        }
        db.update_render_job(task_id, completed_fields)
        # Register the final artifact and mark the whole project finished.
        db.save_asset(request.project_id, 0, "final_video", "completed", local_path=output_path)
        db.update_project_status(request.project_id, "completed")
    except Exception as exc:
        logger.error(f"合成任务失败: {exc}")
        failure = str(exc)
        db.update_render_job(
            task_id,
            {"status": "failed", "progress": 0.0, "message": failure, "error": failure},
        )
async def _compose_video(task_id: Optional[str], request: ComposeRequest) -> str:
    """
    Run the composition pipeline from editor track data.

    Pipeline: trim + concat video clips -> ensure an audio stream exists ->
    generate/mix voiceovers -> overlay subtitles, fancy text and stickers ->
    mix BGM (with optional ducking under voiceovers) -> copy the result into
    config.OUTPUT_DIR.

    Args:
        task_id: render_jobs id used for progress updates; None when called
            synchronously (no progress is persisted).
        request: editor track data and global settings.

    Returns:
        Path (str) of the final mp4 inside config.OUTPUT_DIR.

    Raises:
        ValueError: when the request contains no video clips.
    """
    timestamp = int(time.time())
    output_name = request.output_name or f"composed_{request.project_id}_{timestamp}"
    if task_id:
        db.update_render_job(task_id, {"status": "processing", "progress": 0.10, "message": "收集视频片段..."})
    # 1. Collect video clips, ordered by timeline start.
    video_clips = sorted(request.video_clips, key=lambda x: x.get("start", 0))
    if not video_clips:
        raise ValueError("没有视频片段")
    video_paths = []
    video_fades = []
    for clip in video_clips:
        source_path = clip.get("source_path")
        if source_path and os.path.exists(source_path):
            # Trim first if the clip requests it.
            trim_start = clip.get("trim_start", 0)
            trim_end = clip.get("trim_end")
            # Transition parameters live in `style` (must stay WYSIWYG:
            # preview == export).
            style = clip.get("style") or {}
            vf_in = float(style.get("vFadeIn") or style.get("v_fade_in") or 0.0)
            vf_out = float(style.get("vFadeOut") or style.get("v_fade_out") or 0.0)
            # Transition library (WYSIWYG): only vTransitionType/vTransitionDur
            # are used; ffmpeg_utils applies them during the concat stage.
            t_type = str(style.get("vTransitionType") or style.get("v_transition_type") or "")
            try:
                t_dur = float(style.get("vTransitionDur") or style.get("v_transition_dur") or 0.0)
            except Exception:
                t_dur = 0.0
            if trim_start > 0 or trim_end:
                # Trimming needed.
                trimmed_path = str(config.TEMP_DIR / f"trim_{timestamp}_{len(video_paths)}.mp4")
                # NOTE(review): 999 is an arbitrary "to the end" cap used when
                # trim_end is absent — confirm clips never exceed 999 seconds.
                duration = (trim_end or 999) - trim_start
                # NOTE(review): -ss before -i combined with "-c copy" snaps to
                # keyframes, so the cut point can drift slightly — confirm
                # this is acceptable for WYSIWYG export.
                cmd = [
                    ffmpeg_utils.FFMPEG_PATH, "-y",
                    "-ss", str(trim_start),
                    "-i", source_path,
                    "-t", str(duration),
                    "-c", "copy",
                    trimmed_path
                ]
                ffmpeg_utils._run_ffmpeg(cmd)
                video_paths.append(trimmed_path)
                video_fades.append({"in": max(0.0, vf_in), "out": max(0.0, vf_out), "type": t_type, "dur": max(0.0, t_dur)})
            else:
                video_paths.append(source_path)
                video_fades.append({"in": max(0.0, vf_in), "out": max(0.0, vf_out), "type": t_type, "dur": max(0.0, t_dur)})
    # 2. Concatenate (keep original audio where possible; helper falls back
    # to a no-audio concat on failure).
    if task_id:
        db.update_render_job(task_id, {"progress": 0.25, "message": "拼接视频..."})
    merged_path = str(config.TEMP_DIR / f"{output_name}_merged.mp4")
    ffmpeg_utils.concat_videos_with_audio(video_paths, merged_path, (1080, 1920), fades=video_fades)
    current_video = merged_path
    # Add a silent track (only when the concat result has no audio stream).
    if task_id:
        db.update_render_job(task_id, {"progress": 0.30, "message": "添加静音轨..."})
    has_audio = False
    try:
        # ffprobe lists audio stream indexes; any non-empty output means the
        # file already carries audio.
        r = subprocess.run(
            [ffmpeg_utils.FFPROBE_PATH, "-v", "error", "-select_streams", "a", "-show_entries", "stream=index", "-of", "csv=p=0", current_video],
            capture_output=True,
            text=True,
        )
        has_audio = bool((r.stdout or "").strip())
    except Exception:
        has_audio = False
    if not has_audio:
        silent_path = str(config.TEMP_DIR / f"{output_name}_silent.mp4")
        ffmpeg_utils.add_silence_audio(current_video, silent_path)
        current_video = silent_path
    # Total duration drives the voiceover base track length.
    info = ffmpeg_utils.get_video_info(current_video)
    total_duration = float(info.get("duration", 10))
    # 3. Generate and mix in voiceovers.
    if request.voiceover_clips:
        if task_id:
            db.update_render_job(task_id, {"progress": 0.38, "message": "生成/混入旁白..."})
        mixed_audio_path = str(config.TEMP_DIR / f"{output_name}_mixed_vo.mp3")
        # Initialize a silent base track spanning the whole video.
        ffmpeg_utils._run_ffmpeg([
            ffmpeg_utils.FFMPEG_PATH, "-y",
            "-f", "lavfi", "-i", "anullsrc=r=44100:cl=stereo",
            "-t", str(total_duration),
            "-c:a", "mp3",
            mixed_audio_path
        ])
        for i, clip in enumerate(request.voiceover_clips):
            start_time = float(clip.get("start", 0) or 0.0)
            target_duration = float(clip.get("duration", 3) or 3.0)
            vol = float(clip.get("volume", 1.0) or 1.0)
            # Defaults give a tiny fade to avoid clicks; an explicit 0 disables it.
            fade_in = float(clip.get("fade_in", 0.05) or 0.0)
            fade_out = float(clip.get("fade_out", 0.05) or 0.0)
            playback_rate = clip.get("playback_rate", clip.get("playbackRate", None))
            try:
                playback_rate = float(playback_rate) if playback_rate is not None else None
            except Exception:
                playback_rate = None
            # Prefer a voiceover file already generated by the editor preview.
            existing_path = clip.get("source_path")
            voice_path = None
            if existing_path and os.path.exists(existing_path):
                voice_path = existing_path
            else:
                text = (clip.get("text") or "").strip()
                if not text:
                    continue
                voice_path = factory.generate_voiceover_volcengine(
                    text=text,
                    voice_type=request.voice_type,
                    output_path=str(config.TEMP_DIR / f"{output_name}_vo_{i}.mp3")
                )
            if not voice_path:
                continue
            # Voiceover: pure playback-rate fitting to the clip duration
            # (can speed up or slow down; preview and export must match).
            adjusted_path = str(config.TEMP_DIR / f"{output_name}_vo_adj_{i}.mp3")
            if playback_rate and playback_rate > 0:
                # Apply the user's rate first, then hard-clip/pad to the clip
                # duration (no second rate computation).
                sped = str(config.TEMP_DIR / f"{output_name}_vo_sp_{i}.mp3")
                ffmpeg_utils.change_audio_speed(voice_path, playback_rate, sped)
                ffmpeg_utils.force_audio_duration(sped, target_duration, adjusted_path)
            else:
                ffmpeg_utils.fit_audio_to_duration_by_speed(voice_path, target_duration, adjusted_path)
            # Volume / fades (identical to the preview); best-effort — fall
            # back to the unprocessed file on any ffmpeg failure.
            processed_path = adjusted_path
            try:
                if vol != 1.0 or fade_in > 0 or fade_out > 0:
                    processed_path = str(config.TEMP_DIR / f"{output_name}_vo_fx_{i}.mp3")
                    fo_st = max(target_duration - fade_out, 0.0)
                    af = (
                        f"afade=t=in:st=0:d={fade_in},"
                        f"afade=t=out:st={fo_st}:d={fade_out},"
                        f"volume={vol}"
                    )
                    ffmpeg_utils._run_ffmpeg([
                        ffmpeg_utils.FFMPEG_PATH, "-y",
                        "-i", adjusted_path,
                        "-filter:a", af,
                        processed_path
                    ])
            except Exception:
                processed_path = adjusted_path
            # Mix this clip onto the accumulated voiceover track.
            new_mixed = str(config.TEMP_DIR / f"{output_name}_mixed_{i}.mp3")
            ffmpeg_utils.mix_audio_at_offset(mixed_audio_path, processed_path, start_time, new_mixed, overlay_volume=1.0)
            mixed_audio_path = new_mixed
        # Mix the voiceover track into the video.
        if task_id:
            db.update_render_job(task_id, {"progress": 0.55, "message": "混音合成..."})
        voiced_path = str(config.TEMP_DIR / f"{output_name}_voiced.mp4")
        ffmpeg_utils.mix_audio(
            current_video, mixed_audio_path, voiced_path,
            audio_volume=1.5,
            video_volume=0.2
        )
        current_video = voiced_path
    # 4. Subtitles: rendered to PNG via the text renderer and overlaid
    # (supports font/color/bold/italic/underline).
    if request.subtitle_clips:
        if task_id:
            db.update_render_job(task_id, {"progress": 0.65, "message": "渲染字幕..."})
        overlay_configs = []
        for clip in request.subtitle_clips:
            text = (clip.get("text") or "").strip()
            if not text:
                continue
            style = clip.get("style") or {}
            # WYSIWYG: line wrapping is delegated to TextRenderer
            # (controlled by max_width).
            # Map frontend style keys to renderer style keys.
            r_style = {
                "font_family": style.get("font_family") or style.get("fontFamily") or None,
                "font_size": int(style.get("font_size") or style.get("fontSize") or 60),
                "font_color": style.get("font_color") or style.get("fontColor") or "#FFFFFF",
                "bold": bool(style.get("bold", False)),
                "italic": bool(style.get("italic", False)),
                "underline": bool(style.get("underline", False)),
                # Default stroke improves subtitle readability.
                "stroke": style.get("stroke") or {"color": "#000000", "width": int(style.get("stroke_width") or 5)},
            }
            # Text-box width: the frontend uses 0~1 (fraction of canvas width)
            # to drive wrapping; map to pixels on export (canvas width 1080).
            try:
                box_w = style.get("box_w") or style.get("boxW") or style.get("max_width") or style.get("maxWidth")
                if isinstance(box_w, (int, float)):
                    if 0 < float(box_w) <= 1:
                        r_style["max_width"] = int(1080 * float(box_w))
                    elif float(box_w) > 1:
                        r_style["max_width"] = int(float(box_w))
            except Exception:
                pass
            img_path = renderer.render(text, r_style, cache=False)
            # NOTE(review): _pos is redefined on every iteration (and again in
            # the fancy-text and sticker sections) — it could be hoisted to a
            # single module/function-level helper.
            def _pos(v, axis: str):
                # WYSIWYG: the frontend position may be 0~1 (fraction of the
                # frame) or a raw ffmpeg expression string.
                if isinstance(v, (int, float)):
                    fv = float(v)
                    if 0 <= fv <= 1:
                        return f"({axis}-w)*{fv}" if axis == "W" else f"({axis}-h)*{fv}"
                    return str(fv)
                if isinstance(v, str) and v.strip():
                    return v
                return None
            position = clip.get("position") or {}
            overlay_configs.append({
                "path": img_path,
                "x": _pos(position.get("x"), "W") or "(W-w)/2",
                "y": _pos(position.get("y"), "H") or "H*0.78",
                "start": clip.get("start", 0),
                "duration": clip.get("duration", 3)
            })
        if overlay_configs:
            subtitled_path = str(config.TEMP_DIR / f"{output_name}_subtitled.mp4")
            ffmpeg_utils.overlay_multiple_images(current_video, overlay_configs, subtitled_path)
            current_video = subtitled_path
    # 5. Fancy-text overlays.
    if request.fancy_text_clips:
        if task_id:
            db.update_render_job(task_id, {"progress": 0.78, "message": "叠加花字..."})
        overlay_configs = []
        for clip in request.fancy_text_clips:
            text = clip.get("text", "")
            if not text:
                continue
            style = clip.get("style", {
                "font_size": 72,
                "font_color": "#FFFFFF",
                "stroke": {"color": "#000000", "width": 5}
            })
            # Text-box width: 0~1 (fraction of canvas width) -> pixels (1080).
            try:
                box_w = style.get("box_w") or style.get("boxW") or style.get("max_width") or style.get("maxWidth")
                if isinstance(box_w, (int, float)):
                    if 0 < float(box_w) <= 1:
                        style["max_width"] = int(1080 * float(box_w))
                    elif float(box_w) > 1:
                        style["max_width"] = int(float(box_w))
            except Exception:
                pass
            img_path = renderer.render(text, style, cache=False)
            def _pos(v, axis: str):
                if isinstance(v, (int, float)):
                    fv = float(v)
                    if 0 <= fv <= 1:
                        return f"({axis}-w)*{fv}" if axis == "W" else f"({axis}-h)*{fv}"
                    return str(fv)
                if isinstance(v, str) and v.strip():
                    return v
                return None
            position = clip.get("position", {})
            overlay_configs.append({
                "path": img_path,
                "x": _pos(position.get("x"), "W") or "(W-w)/2",
                "y": _pos(position.get("y"), "H") or "180",
                "start": clip.get("start", 0),
                "duration": clip.get("duration", 5)
            })
        if overlay_configs:
            fancy_path = str(config.TEMP_DIR / f"{output_name}_fancy.mp4")
            ffmpeg_utils.overlay_multiple_images(current_video, overlay_configs, fancy_path)
            current_video = fancy_path
    # 5.5 Sticker overlays (PNG/SVG).
    if request.sticker_clips:
        if task_id:
            db.update_render_job(task_id, {"progress": 0.82, "message": "叠加贴纸..."})
        overlay_configs = []
        for i, clip in enumerate(request.sticker_clips):
            src_url = clip.get("source_url") or clip.get("sourceUrl")
            src_path = clip.get("source_path") or clip.get("sourcePath")
            # Stickers may come from /static/assets/...; prefer source_path on
            # export, otherwise map the URL back to a local path.
            path = src_path
            if not path and isinstance(src_url, str) and src_url.startswith("/static/"):
                # /static/assets/... maps to ASSETS_DIR inside the container.
                try:
                    rel = src_url.replace("/static/assets/", "")
                    path = str(config.ASSETS_DIR / rel)
                except Exception:
                    path = None
            if not path or not os.path.exists(path):
                continue
            # Normalize to PNG (SVG etc. converted); skip the clip on failure.
            png_path = str(config.TEMP_DIR / f"{output_name}_st_{i}.png")
            try:
                norm = ffmpeg_utils.normalize_sticker_to_png(path, png_path)
            except Exception:
                norm = None
            if not norm or not os.path.exists(norm):
                continue
            pos = clip.get("position") or {}
            x = pos.get("x", 0.8)
            y = pos.get("y", 0.2)
            # x/y in 0~1 are treated as fractions of the frame.
            def _pos(v, axis: str):
                if isinstance(v, (int, float)):
                    fv = float(v)
                    if 0 <= fv <= 1:
                        return f"({axis}-w)*{fv}" if axis == "W" else f"({axis}-h)*{fv}"
                    return str(fv)
                if isinstance(v, str) and v.strip():
                    return v
                return None
            st = clip.get("style") or {}
            scale = 1.0
            try:
                scale = float(st.get("scale", 1.0) or 1.0)
            except Exception:
                scale = 1.0
            # Clamp to a sane range.
            scale = max(0.3, min(3.0, scale))
            # overlay_multiple_images cannot scale individual images, so the
            # PNG is pre-scaled into a new file before overlaying.
            scaled_path = str(config.TEMP_DIR / f"{output_name}_st_sc_{i}.png")
            try:
                if abs(scale - 1.0) > 1e-3:
                    ffmpeg_utils._run_ffmpeg([
                        ffmpeg_utils.FFMPEG_PATH, "-y",
                        "-i", norm,
                        "-vf", f"scale=iw*{scale}:ih*{scale}:flags=lanczos",
                        scaled_path
                    ])
                    norm = scaled_path
            except Exception:
                pass
            overlay_configs.append({
                "path": norm,
                "x": _pos(x, "W") or "(W-w)*0.8",
                "y": _pos(y, "H") or "(H-h)*0.2",
                "start": clip.get("start", 0),
                "duration": clip.get("duration", 2),
            })
        if overlay_configs:
            st_path = str(config.TEMP_DIR / f"{output_name}_stickers.mp4")
            ffmpeg_utils.overlay_multiple_images(current_video, overlay_configs, st_path)
            current_video = st_path
    # 6. Mix in BGM.
    if request.bgm_clip:
        if task_id:
            db.update_render_job(task_id, {"progress": 0.88, "message": "混入 BGM..."})
        bgm_source = request.bgm_clip.get("source_path")
        if bgm_source and os.path.exists(bgm_source):
            bgm_output = str(config.TEMP_DIR / f"{output_name}_bgm.mp4")
            # BGM parameters: clip-level settings take precedence over the
            # request-level bgm_volume default.
            bgm_start = float(request.bgm_clip.get("start", 0) or 0.0)
            bgm_dur = request.bgm_clip.get("duration")
            try:
                bgm_dur = float(bgm_dur) if bgm_dur is not None else None
            except Exception:
                bgm_dur = None
            bgm_vol = float(request.bgm_clip.get("volume", request.bgm_volume) or request.bgm_volume)
            bgm_fade_in = float(request.bgm_clip.get("fade_in", 0.8) or 0.0)
            bgm_fade_out = float(request.bgm_clip.get("fade_out", 0.8) or 0.0)
            bgm_ducking = request.bgm_clip.get("ducking")
            if not isinstance(bgm_ducking, bool):
                bgm_ducking = True
            bgm_duck_volume = float(request.bgm_clip.get("duck_volume", 0.25) or 0.25)
            # Ducking ranges are derived from the voiceover timeline (more
            # controllable than detection).
            duck_ranges = None
            if request.voiceover_clips and bgm_ducking:
                duck_ranges = []
                for vc in request.voiceover_clips:
                    s = float(vc.get("start", 0) or 0.0)
                    d = float(vc.get("duration", 0) or 0.0)
                    if d <= 0:
                        continue
                    # Small padding avoids clipping speech edges.
                    duck_ranges.append((max(0.0, s - 0.03), s + d + 0.05))
            ffmpeg_utils.add_bgm(
                current_video, bgm_source, bgm_output,
                bgm_volume=bgm_vol,
                ducking=bool(bgm_ducking),
                duck_volume=bgm_duck_volume,
                duck_ranges=duck_ranges,
                start_time=bgm_start,
                clip_duration=bgm_dur,
                fade_in=bgm_fade_in,
                fade_out=bgm_fade_out,
            )
            current_video = bgm_output
    # 7. Write the final file into the output directory.
    if task_id:
        db.update_render_job(task_id, {"progress": 0.95, "message": "写出最终文件..."})
    final_path = str(config.OUTPUT_DIR / f"{output_name}.mp4")
    import shutil
    shutil.copy(current_video, final_path)
    logger.info(f"合成完成: {final_path}")
    return final_path