Files
video-flow/modules/ffmpeg_utils.py
2026-01-09 14:09:16 +08:00

1272 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FFmpeg 视频处理工具模块
支持规模化批量视频处理:拼接、字幕、叠加、混音
"""
import os
import re
import subprocess
import tempfile
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import config
logger = logging.getLogger(__name__)
def _pick_exec(preferred_path: str, fallback_name: str) -> str:
"""
Pick an executable path.
Why:
- In docker, /app/bin may accidentally contain binaries built for another OS/arch,
causing `Exec format error` at runtime (seen on /app/bin/ffprobe).
Strategy:
- Prefer preferred_path if it exists AND is runnable.
- Otherwise fall back to PATH-resolved command (fallback_name).
"""
if preferred_path and os.path.exists(preferred_path):
try:
# Validate it can be executed (arch OK) and is a real binary.
# ffmpeg/ffprobe both support `-version`.
result = subprocess.run(
[preferred_path, "-version"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return preferred_path
except OSError:
# Exec format error / permission error -> fall back
pass
resolved = shutil.which(fallback_name)
return resolved or fallback_name
# FFmpeg/FFprobe paths: prefer the project-bundled binaries, but verify they
# are actually runnable first (see _pick_exec) and fall back to PATH otherwise.
FFMPEG_PATH = _pick_exec(str(config.BASE_DIR / "bin" / "ffmpeg"), "ffmpeg")
FFPROBE_PATH = _pick_exec(str(config.BASE_DIR / "bin" / "ffprobe"), "ffprobe")
# Font lookup order: bundled project fonts first, then per-platform system fonts.
DEFAULT_FONT_PATHS = [
    # Bundled project fonts (cross-platform)
    str(config.FONTS_DIR / "NotoSansSC-Regular.otf"),
    str(config.FONTS_DIR / "HarmonyOS-Sans-SC-Regular.ttf"),
    str(config.FONTS_DIR / "AlibabaPuHuiTi-Regular.ttf"),
    # Linux system fonts
    "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
    "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
    "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    # macOS system fonts
    "/System/Library/Fonts/PingFang.ttc",
    "/System/Library/Fonts/STHeiti Medium.ttc",
    # Windows system fonts
    "C:/Windows/Fonts/msyh.ttc",
    "C:/Windows/Fonts/simhei.ttf",
]
def _get_font_path() -> str:
    """Return the first usable font from DEFAULT_FONT_PATHS.

    A candidate counts as usable when the file exists and is larger than
    1000 bytes (guards against truncated/placeholder font files). Falls back
    to the plain family name "Arial" so drawtext never crashes outright.
    """
    usable = (
        candidate
        for candidate in DEFAULT_FONT_PATHS
        if os.path.exists(candidate) and os.path.getsize(candidate) > 1000
    )
    return next(usable, "Arial")
def _sanitize_text(text: str) -> str:
"""
去除可能导致 ffmpeg 命令行错误的特殊控制字符,但保留 Emoji、数字、标点和各国语言。
"""
if not text:
return ""
# 不再过滤任何字符,只确保不是 None
return text
def add_silence_audio(video_path: str, output_path: str) -> str:
    """
    Add a silent stereo 44.1 kHz audio track to a video that has none, so
    later filter graphs that reference stream 0:a do not fail.
    """
    silence_source = "anullsrc=channel_layout=stereo:sample_rate=44100"
    cmd = [FFMPEG_PATH, "-y", "-i", video_path]
    cmd += ["-f", "lavfi", "-i", silence_source]
    # Keep the video untouched, encode only the generated silence.
    cmd += ["-shortest", "-c:v", "copy", "-c:a", "aac", output_path]
    _run_ffmpeg(cmd)
    return output_path
def _run_ffmpeg(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
    """Run an FFmpeg command, echoing stderr for diagnostics.

    Args:
        cmd: Full argv list (first element is the ffmpeg binary).
        check: When True, raise on a non-zero exit code.

    Raises:
        subprocess.CalledProcessError: when `check` is True and ffmpeg fails.
    """
    logger.debug(f"FFmpeg command: {' '.join(cmd)}")
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=check)
    except subprocess.CalledProcessError as exc:
        logger.error(f"FFmpeg failed: {exc.stderr}")
        raise
    # Surface stderr on success and failure alike so font warnings etc. are visible.
    if proc.stderr:
        print(f"[FFmpeg stderr] {proc.stderr}", flush=True)
    if proc.returncode != 0:
        logger.error(f"FFmpeg stderr: {proc.stderr}")
    return proc
def get_video_info(video_path: str) -> Dict[str, Any]:
    """Probe a media file with ffprobe and return duration / size / frame rate.

    Returns:
        dict with keys: duration (float, seconds), width, height (0 when
        there is no video stream) and fps (float, defaults to 30).

    Raises:
        ValueError: if ffprobe cannot read the file.
    """
    import json

    probe_cmd = [
        FFPROBE_PATH,
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        video_path,
    ]
    proc = subprocess.run(probe_cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise ValueError(f"Failed to probe video: {video_path}")
    data = json.loads(proc.stdout)

    info: Dict[str, Any] = {
        "duration": float(data.get("format", {}).get("duration", 0)),
        "width": 0,
        "height": 0,
        "fps": 30,
    }
    # Only the first video stream matters.
    video_streams = (s for s in data.get("streams", []) if s.get("codec_type") == "video")
    stream = next(video_streams, None)
    if stream is not None:
        info["width"] = stream.get("width", 0)
        info["height"] = stream.get("height", 0)
        # r_frame_rate is either a ratio like "30/1" or a plain number like "29.97".
        rate = stream.get("r_frame_rate", "30/1")
        if "/" in rate:
            num, den = rate.split("/")
            info["fps"] = float(num) / float(den) if float(den) != 0 else 30
        else:
            info["fps"] = float(rate)
    return info
def concat_videos(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920),
    fades: Optional[List[Dict[str, float]]] = None
) -> str:
    """
    Concatenate multiple clips with the FFmpeg concat filter (video only).

    Each input is first scaled (aspect preserved) and padded onto black bars
    to the target size, so heterogeneous clips can be joined safely.

    Args:
        video_paths: List of input video file paths.
        output_path: Output file path.
        target_size: Target resolution (width, height); defaults to portrait 1080x1920.
        fades: Optional per-clip effect dicts, parallel to video_paths. Keys:
            "in"/"out" (fade durations, seconds), "type" (end-of-clip effect
            name: "fade", "fadeWhite", "blurOut", "blurFade", "flash",
            "desaturate", "colorPop", "hueShift", "darken", "slide*",
            "zoomIn"/"zoomOut", "rotateOut") and "dur" (effect duration,
            seconds). Effects keep the clip duration unchanged and do not
            overlap adjacent clips; the frontend preview must render the
            same way (WYSIWYG).

    Returns:
        Output file path.

    Raises:
        ValueError: if video_paths is empty.
    """
    if not video_paths:
        raise ValueError("No video paths provided")
    logger.info(f"Concatenating {len(video_paths)} videos...")
    width, height = target_size
    # Normalize every clip's resolution via filter_complex before concat:
    # each video is scaled + padded to the target size.
    filter_parts = []
    for i in range(len(video_paths)):
        # scale keeps the aspect ratio; pad centers the frame on black bars.
        chain = (
            f"[{i}:v]scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1"
        )
        # Optional end-of-clip transition effect (duration unchanged, no overlap).
        if fades and i < len(fades):
            fx = fades[i] or {}
            fi = float(fx.get("in", 0) or 0.0)
            fo = float(fx.get("out", 0) or 0.0)
            t_type = str(fx.get("type") or "")
            t_dur = float(fx.get("dur") or 0.0)
            try:
                dur = float(get_video_info(video_paths[i]).get("duration") or 0.0)
            except Exception:
                dur = 0.0
            # Basic fade in / fade out.
            if fi > 0:
                chain += f",fade=t=in:st=0:d={fi}"
            if fo > 0 and dur > 0:
                st = max(dur - fo, 0.0)
                chain += f",fade=t=out:st={st}:d={fo}"
            # End-of-clip effect (WYSIWYG: frontend preview must match exactly).
            if t_type and t_dur > 0 and dur > 0:
                st = max(dur - t_dur, 0.0)
                td = max(t_dur, 0.001)
                # p: effect-progress expression, ramping 0 -> 1 over [st, dur].
                p = f"if(between(t\\,{st}\\,{dur})\\,(t-{st})/{td}\\,0)"
                if t_type == "fade":
                    chain += f",fade=t=out:st={st}:d={t_dur}"
                elif t_type == "fadeWhite":
                    chain += f",fade=t=out:st={st}:d={t_dur}:color=white"
                elif t_type == "blurOut":
                    chain += f",gblur=sigma='10*{p}':steps=1"
                elif t_type == "blurFade":
                    chain += f",gblur=sigma='8*{p}':steps=1,fade=t=out:st={st}:d={t_dur}"
                elif t_type == "flash":
                    chain += f",eq=brightness='0.7*(1-abs(0.5-{p})*2)'"
                elif t_type == "desaturate":
                    chain += f",hue=s='1-0.9*{p}'"
                elif t_type == "colorPop":
                    chain += f",hue=s='1+0.8*{p}',eq=contrast='1+0.3*{p}'"
                elif t_type == "hueShift":
                    chain += f",hue=h='60*{p}'"
                elif t_type == "darken":
                    chain += f",eq=brightness='-0.4*{p}'"
                elif t_type in ("slideLeft", "slideRight", "slideUp", "slideDown"):
                    # Slide by `off` pixels via an animated pad offset, then crop back.
                    off = 80
                    if t_type == "slideLeft":
                        chain += f",pad={width+off}:{height}:{off/2}-{off}*{p}:0:black,crop={width}:{height}:{off/2}:0"
                    if t_type == "slideRight":
                        chain += f",pad={width+off}:{height}:{off/2}+{off}*{p}:0:black,crop={width}:{height}:{off/2}:0"
                    if t_type == "slideUp":
                        chain += f",pad={width}:{height+off}:0:{off/2}-{off}*{p}:black,crop={width}:{height}:0:{off/2}"
                    if t_type == "slideDown":
                        chain += f",pad={width}:{height+off}:0:{off/2}+{off}*{p}:black,crop={width}:{height}:0:{off/2}"
                elif t_type in ("zoomOut", "zoomIn"):
                    # Animated rescale (max 10%); zoomOut pads, zoomIn crops back to size.
                    if t_type == "zoomOut":
                        chain += f",scale=w='{width}*(1-0.10*{p})':h='{height}*(1-0.10*{p})':eval=frame,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
                    else:
                        chain += f",scale=w='{width}*(1+0.10*{p})':h='{height}*(1+0.10*{p})':eval=frame,crop={width}:{height}"
                elif t_type == "rotateOut":
                    chain += f",rotate=a='0.12*{p}':c=black@1:ow={width}:oh={height}"
        chain += f"[v{i}]"
        filter_parts.append(chain)
    # Concatenate all normalized video streams.
    concat_inputs = "".join([f"[v{i}]" for i in range(len(video_paths))])
    filter_parts.append(f"{concat_inputs}concat=n={len(video_paths)}:v=1:a=0[outv]")
    filter_complex = ";".join(filter_parts)
    # Build the ffmpeg command.
    cmd = [FFMPEG_PATH, "-y"]
    for vp in video_paths:
        cmd.extend(["-i", vp])
    cmd.extend([
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-pix_fmt", "yuv420p",
        output_path
    ])
    _run_ffmpeg(cmd)
    logger.info(f"Concatenated video saved: {output_path}")
    return output_path
def concat_videos_with_audio(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920),
    fades: Optional[List[Dict[str, float]]] = None
) -> str:
    """
    Concatenate clips while keeping their audio tracks.

    Video handling mirrors concat_videos (scale + pad + optional per-clip
    end effects); audio streams are normalized to stereo 44.1 kHz and
    concatenated in parallel. Inputs are expected to HAVE an audio stream
    (see add_silence_audio); if the audio graph fails, falls back to the
    video-only concat.

    Args:
        video_paths: List of input video file paths.
        output_path: Output file path.
        target_size: Target resolution (width, height).
        fades: Optional per-clip effect dicts; same schema as concat_videos.

    Returns:
        Output file path.

    Raises:
        ValueError: if video_paths is empty.
    """
    if not video_paths:
        raise ValueError("No video paths provided")
    logger.info(f"Concatenating {len(video_paths)} videos with audio...")
    width, height = target_size
    n = len(video_paths)
    # Build the filter_complex graph.
    filter_parts = []
    # Video processing: scale + pad each clip to the target size.
    for i in range(n):
        chain = (
            f"[{i}:v]scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1"
        )
        # Optional end-of-clip transition effect (duration unchanged, no overlap).
        if fades and i < len(fades):
            fx = fades[i] or {}
            fi = float(fx.get("in", 0) or 0.0)
            fo = float(fx.get("out", 0) or 0.0)
            t_type = str(fx.get("type") or "")
            t_dur = float(fx.get("dur") or 0.0)
            try:
                dur = float(get_video_info(video_paths[i]).get("duration") or 0.0)
            except Exception:
                dur = 0.0
            if fi > 0:
                chain += f",fade=t=in:st=0:d={fi}"
            if fo > 0 and dur > 0:
                st = max(dur - fo, 0.0)
                chain += f",fade=t=out:st={st}:d={fo}"
            if t_type and t_dur > 0 and dur > 0:
                st = max(dur - t_dur, 0.0)
                td = max(t_dur, 0.001)
                # p: effect-progress expression, ramping 0 -> 1 over [st, dur].
                p = f"if(between(t\\,{st}\\,{dur})\\,(t-{st})/{td}\\,0)"
                if t_type == "fade":
                    chain += f",fade=t=out:st={st}:d={t_dur}"
                elif t_type == "fadeWhite":
                    chain += f",fade=t=out:st={st}:d={t_dur}:color=white"
                elif t_type == "blurOut":
                    chain += f",gblur=sigma='10*{p}':steps=1"
                elif t_type == "blurFade":
                    chain += f",gblur=sigma='8*{p}':steps=1,fade=t=out:st={st}:d={t_dur}"
                elif t_type == "flash":
                    chain += f",eq=brightness='0.7*(1-abs(0.5-{p})*2)'"
                elif t_type == "desaturate":
                    chain += f",hue=s='1-0.9*{p}'"
                elif t_type == "colorPop":
                    chain += f",hue=s='1+0.8*{p}',eq=contrast='1+0.3*{p}'"
                elif t_type == "hueShift":
                    chain += f",hue=h='60*{p}'"
                elif t_type == "darken":
                    chain += f",eq=brightness='-0.4*{p}'"
                elif t_type in ("slideLeft", "slideRight", "slideUp", "slideDown"):
                    # Slide by `off` pixels via an animated pad offset, then crop back.
                    off = 80
                    if t_type == "slideLeft":
                        chain += f",pad={width+off}:{height}:{off/2}-{off}*{p}:0:black,crop={width}:{height}:{off/2}:0"
                    if t_type == "slideRight":
                        chain += f",pad={width+off}:{height}:{off/2}+{off}*{p}:0:black,crop={width}:{height}:{off/2}:0"
                    if t_type == "slideUp":
                        chain += f",pad={width}:{height+off}:0:{off/2}-{off}*{p}:black,crop={width}:{height}:0:{off/2}"
                    if t_type == "slideDown":
                        chain += f",pad={width}:{height+off}:0:{off/2}+{off}*{p}:black,crop={width}:{height}:0:{off/2}"
                elif t_type in ("zoomOut", "zoomIn"):
                    # Animated rescale (max 10%); zoomOut pads, zoomIn crops back to size.
                    if t_type == "zoomOut":
                        chain += f",scale=w='{width}*(1-0.10*{p})':h='{height}*(1-0.10*{p})':eval=frame,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black"
                    else:
                        chain += f",scale=w='{width}*(1+0.10*{p})':h='{height}*(1+0.10*{p})':eval=frame,crop={width}:{height}"
                elif t_type == "rotateOut":
                    chain += f",rotate=a='0.12*{p}':c=black@1:ow={width}:oh={height}"
        chain += f"[v{i}]"
        filter_parts.append(chain)
    # Audio processing: normalize each track to a common format before concat.
    for i in range(n):
        filter_parts.append(f"[{i}:a]aformat=sample_rates=44100:channel_layouts=stereo[a{i}]")
    # Concatenate video and audio streams separately.
    v_concat = "".join([f"[v{i}]" for i in range(n)])
    a_concat = "".join([f"[a{i}]" for i in range(n)])
    filter_parts.append(f"{v_concat}concat=n={n}:v=1:a=0[outv]")
    filter_parts.append(f"{a_concat}concat=n={n}:v=0:a=1[outa]")
    filter_complex = ";".join(filter_parts)
    cmd = [FFMPEG_PATH, "-y"]
    for vp in video_paths:
        cmd.extend(["-i", vp])
    cmd.extend([
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "[outa]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        output_path
    ])
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # Audio graph failed (e.g. an input with no audio stream): fall back to
        # video-only concat. NOTE(review): the fallback drops `fades` — confirm
        # whether effects should be forwarded here as well.
        logger.warning("Audio concat failed, falling back to video only")
        return concat_videos(video_paths, output_path, target_size)
    logger.info(f"Concatenated video with audio saved: {output_path}")
    return output_path
def add_subtitle(
    video_path: str,
    text: str,
    start: float,
    duration: float,
    output_path: str,
    style: Dict[str, Any] = None
) -> str:
    """
    Burn a single subtitle into the video with the drawtext filter.

    Args:
        video_path: Input video path.
        text: Subtitle text.
        start: Start time in seconds.
        duration: Display duration in seconds.
        output_path: Output path.
        style: Style overrides {
            fontsize: font size,
            fontcolor: font color,
            borderw: outline width,
            bordercolor: outline color,
            x: x position (expressions allowed, e.g. "(w-text_w)/2"),
            y: y position,
            font: font file path or family name
        }

    Returns:
        Output file path.
    """
    style = style or {}
    # Default style values.
    fontsize = style.get("fontsize", 48)
    fontcolor = style.get("fontcolor", "white")
    borderw = style.get("borderw", 3)
    bordercolor = style.get("bordercolor", "black")
    x = style.get("x", "(w-text_w)/2")  # horizontally centered by default
    y = style.get("y", "h-200")  # near the bottom by default
    # Prefer a dynamically detected usable font over a possibly broken hard-coded path.
    default_font_path = _get_font_path()
    font = style.get("font", default_font_path)
    # Escape drawtext special characters — backslash FIRST, then quote/colon/percent
    # (same escaping as add_multiple_subtitles so both paths render identically;
    # the old version missed "\\" and "%").
    escaped_text = (
        text.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace(":", "\\:")
        .replace("%", "\\%")
    )
    # drawtext filter, enabled only inside the [start, start+duration] window.
    drawtext = (
        f"drawtext=text='{escaped_text}':"
        f"fontfile='{font}':"
        f"fontsize={fontsize}:"
        f"fontcolor={fontcolor}:"
        f"borderw={borderw}:"
        f"bordercolor={bordercolor}:"
        f"x={x}:y={y}:"
        f"enable='between(t,{start},{start + duration})'"
    )
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", drawtext,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Added subtitle: '{text[:20]}...' at {start}s")
    return output_path
def wrap_text(text: str, max_chars: int = 18) -> str:
    """
    Hard-wrap text into lines of at most max_chars characters.

    Text that already contains a newline is assumed to be manually wrapped
    and is returned unchanged. Width counting is simplistic: every character
    (CJK or Latin) counts as one — mixed-width text is not measured precisely.
    """
    if not text:
        return ""
    if "\n" in text:
        return text
    chunks = [text[pos:pos + max_chars] for pos in range(0, len(text), max_chars)]
    return "\n".join(chunks)
def mix_audio_at_offset(
    base_audio: str,
    overlay_audio: str,
    offset: float,
    output_path: str,
    base_volume: float = 1.0,
    overlay_volume: float = 1.0
) -> str:
    """
    Mix overlay_audio on top of base_audio starting at `offset` seconds.

    If the base track is missing, returns the overlay path unchanged as a
    best-effort fallback.
    """
    if not os.path.exists(base_audio):
        logger.warning(f"Base audio not found: {base_audio}")
        return overlay_audio
    delay_ms = int(offset * 1000)
    graph = (
        f"[0:a]volume={base_volume}[a0];"
        f"[1:a]volume={overlay_volume},adelay={delay_ms}|{delay_ms}[a1];"
        f"[a0][a1]amix=inputs=2:duration=first:dropout_transition=0:normalize=0[out]"
    )
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", base_audio,
        "-i", overlay_audio,
        "-filter_complex", graph,
        "-map", "[out]",
        "-c:a", "mp3",  # Use MP3 for audio only mixing
        output_path,
    ]
    _run_ffmpeg(cmd)
    return output_path
def adjust_audio_duration(
    input_path: str,
    target_duration: float,
    output_path: str
) -> str:
    """
    Adjust audio duration (speed up only when the audio is too long).

    Requirement:
    - audio longer than target  -> speed it up (capped at 2x)
    - audio shorter or equal    -> keep the original speed (never slow down)

    Returns:
        output_path on success, input_path when the source duration cannot
        be probed, or None when the input file does not exist.
    """
    if not os.path.exists(input_path):
        return None
    current_duration = float(get_audio_info(input_path).get("duration", 0))
    if current_duration <= 0:
        return input_path
    # Only speed up when the audio is too long; otherwise copy through.
    if current_duration <= target_duration:
        # shutil is already imported at module level — no local import needed.
        shutil.copy(input_path, output_path)
        logger.info(f"Audio ({current_duration:.2f}s) <= target ({target_duration:.2f}s), keeping original speed")
        return output_path
    # Audio is too long: speed it up, capped at 2x to limit pitch distortion.
    speed_ratio = current_duration / target_duration
    speed_ratio = min(speed_ratio, 2.0)
    logger.info(f"Audio ({current_duration:.2f}s) > target ({target_duration:.2f}s), speeding up {speed_ratio:.2f}x")
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", input_path,
        "-filter:a", f"atempo={speed_ratio}",
        output_path
    ]
    _run_ffmpeg(cmd)
    return output_path
def _atempo_chain(speed: float) -> str:
"""
构造 atempo 链,支持 <0.5 或 >2.0 的倍速(通过链式 atempo
"""
try:
s = float(speed)
except Exception:
s = 1.0
if s <= 0:
s = 1.0
parts = []
# atempo 支持 0.5~2.0
while s > 2.0:
parts.append("atempo=2.0")
s /= 2.0
while s < 0.5:
parts.append("atempo=0.5")
s /= 0.5
parts.append(f"atempo={s}")
return ",".join(parts)
def change_audio_speed(input_path: str, speed: float, output_path: str) -> str:
    """Re-encode audio at a different playback speed (pure tempo change).

    Returns output_path, or None when the input file does not exist.
    """
    if not os.path.exists(input_path):
        return None
    tempo_filter = _atempo_chain(speed)
    _run_ffmpeg([FFMPEG_PATH, "-y", "-i", input_path, "-filter:a", tempo_filter, output_path])
    return output_path
def fit_audio_to_duration_by_speed(input_path: str, target_duration: float, output_path: str) -> str:
    """
    Fit audio to a target duration by changing playback speed (faster or
    slower), then trim/pad to the exact length.

    Intended for narration: when the user stretches a segment they expect
    the speech rate to change, not silence padding.

    Returns:
        output_path on success (or when the input is copied through
        unchanged), None when the input file does not exist.
    """
    if not os.path.exists(input_path):
        return None
    try:
        td = float(target_duration or 0)
    except Exception:
        td = 0.0
    if td <= 0:
        # No usable target: pass the audio through unchanged.
        # (shutil is imported at module level — local imports removed.)
        shutil.copy(input_path, output_path)
        return output_path
    cur = float(get_audio_info(input_path).get("duration") or 0.0)
    if cur <= 0:
        shutil.copy(input_path, output_path)
        return output_path
    speed = cur / td
    af_speed = _atempo_chain(speed)
    # After retiming, still trim+pad once to the exact duration to avoid
    # accumulated rounding drift.
    af = f"{af_speed},atrim=0:{td},apad=pad_dur=0,atrim=0:{td}"
    cmd = [FFMPEG_PATH, "-y", "-i", input_path, "-filter:a", af, output_path]
    _run_ffmpeg(cmd)
    return output_path
def force_audio_duration(input_path: str, target_duration: float, output_path: str) -> str:
    """Trim/pad audio to an exact duration WITHOUT changing speed (for when
    tempo adjustments were already applied upstream).

    Returns:
        output_path on success (or when copied through for a non-positive
        target), None when the input file does not exist.
    """
    if not os.path.exists(input_path):
        return None
    try:
        td = float(target_duration or 0)
    except Exception:
        td = 0.0
    if td <= 0:
        # shutil is imported at module level — local import removed.
        shutil.copy(input_path, output_path)
        return output_path
    af = f"atrim=0:{td},apad=pad_dur=0,atrim=0:{td}"
    cmd = [FFMPEG_PATH, "-y", "-i", input_path, "-filter:a", af, output_path]
    _run_ffmpeg(cmd)
    return output_path
def _which(cmd: str) -> Optional[str]:
try:
import shutil
return shutil.which(cmd)
except Exception:
return None
def normalize_sticker_to_png(input_path: str, output_path: str) -> str:
    """
    Normalize a sticker image to PNG for ffmpeg overlay.

    - PNG: returned as-is.
    - WEBP: converted to PNG (some ffmpeg builds handle webp inconsistently).
    - SVG: prefer rsvg-convert; otherwise try ffmpeg's (build-dependent) decoder.
    - Anything else: best-effort ffmpeg conversion.

    Returns:
        Path to a PNG file, or None when the input is missing.
    """
    if not input_path or not os.path.exists(input_path):
        return None
    ext = Path(input_path).suffix.lower()
    if ext in [".png"]:
        return input_path
    if ext in [".webp"]:
        cmd = [FFMPEG_PATH, "-y", "-i", input_path, output_path]
        _run_ffmpeg(cmd)
        return output_path
    if ext == ".svg":
        rsvg = _which("rsvg-convert")
        if rsvg:
            # subprocess is imported at module level — local import removed.
            subprocess.check_call([rsvg, "-o", output_path, input_path])
            return output_path
        # fallback: let ffmpeg try to decode the svg (depends on the build)
        cmd = [FFMPEG_PATH, "-y", "-i", input_path, output_path]
        _run_ffmpeg(cmd)
        return output_path
    # Other formats: best-effort conversion via ffmpeg.
    cmd = [FFMPEG_PATH, "-y", "-i", input_path, output_path]
    _run_ffmpeg(cmd)
    return output_path
def get_audio_info(file_path: str) -> Dict[str, Any]:
    """Probe an audio file; delegates to get_video_info (ffprobe reads both).

    The returned dict has the same keys (duration/width/height/fps); only
    "duration" is meaningful for audio-only files.
    """
    return get_video_info(file_path)
def wrap_text_smart(text: str, max_chars: int = 15) -> str:
    """
    Smart two-line subtitle wrapping ("short top, long bottom" strategy).

    Prefers to break right after a punctuation mark or space near the middle
    of the text; otherwise forces a break at 40% of the length.
    """
    if not text or len(text) <= max_chars:
        return text
    # Break preferentially at punctuation or spaces. The fullwidth CJK marks
    # had been corrupted into empty strings (which can never match a single
    # character) — restored here so Chinese text actually splits at punctuation.
    split_chars = ["，", "。", "！", "？", " ", ",", ".", "!", "?"]
    best_split = -1
    # Look for the split point closest to the middle.
    mid = len(text) // 2
    for i in range(len(text)):
        if text[i] in split_chars:
            # Keep the candidate nearest the middle ("short top, long bottom").
            if abs(i - mid) < abs(best_split - mid):
                best_split = i
    if best_split != -1 and best_split < len(text) - 1:
        return text[:best_split+1] + "\n" + text[best_split+1:]
    # Forced break ("short top, long bottom"): the top line gets ~40%.
    split_idx = int(len(text) * 0.4)
    return text[:split_idx] + "\n" + text[split_idx:]
def add_multiple_subtitles(
    video_path: str,
    subtitles: List[Dict[str, Any]],
    output_path: str,
    default_style: Dict[str, Any] = None
) -> str:
    """
    Burn multiple subtitles into a video with stacked drawtext filters.

    Args:
        video_path: Input video path.
        subtitles: List of {text, start, duration, style} dicts; a subtitle's
            `style` overrides `default_style` key by key.
        output_path: Output path.
        default_style: Base drawtext style (fontsize, fontcolor, borderw,
            bordercolor, y, box, boxcolor, boxborderw).

    Returns:
        Output file path (a plain copy of the input when `subtitles` is empty).
    """
    if not subtitles:
        # No subtitles: just copy the video through.
        import shutil
        shutil.copy(video_path, output_path)
        return output_path
    default_style = default_style or {}
    # Use the shared, cross-platform font lookup.
    font = _get_font_path()
    print(f"[SubDebug] Using font for subtitles: {font}", flush=True)
    # Build one drawtext filter per rendered subtitle line.
    filters = []
    for sub in subtitles:
        raw_text = sub.get("text", "")
        # Dump repr/hex of the raw text to help debug exotic characters.
        print(f"[SubDebug] Subtitle text repr: {repr(raw_text)}", flush=True)
        print(f"[SubDebug] Subtitle text hex: {' '.join(hex(ord(c)) for c in raw_text)}", flush=True)
        text = _sanitize_text(raw_text)
        # Automatic line wrapping.
        text = wrap_text(text)
        start = sub.get("start", 0)
        duration = sub.get("duration", 3)
        style = {**default_style, **sub.get("style", {})}
        fontsize = style.get("fontsize", 48)
        fontcolor = style.get("fontcolor", "white")
        borderw = style.get("borderw", 3)
        bordercolor = style.get("bordercolor", "black")
        base_y = style.get("y", "h-200")
        # Background box is on by default for readability.
        box = style.get("box", 1)
        boxcolor = style.get("boxcolor", "black@0.5")
        boxborderw = style.get("boxborderw", 10)
        # Multi-line subtitles: one drawtext per line so each line centers itself.
        lines = text.split("\n") if "\n" in text else [text]
        line_height = int(fontsize * 1.3)  # line height
        for line_idx, line in enumerate(lines):
            if not line.strip():
                continue
            # Escape drawtext specials: backslash, single quote, colon, percent.
            escaped_line = line.replace("\\", "\\\\").replace("'", "\\'").replace(":", "\\:").replace("%", "\\%")
            # Per-line y position stacking upwards from the bottom:
            # base_y positions the LAST line; earlier lines sit line_height above.
            line_offset = (len(lines) - 1 - line_idx) * line_height
            # NOTE(review): both branches below produce the same expression —
            # the "h-" prefix check is currently redundant; confirm intent.
            if isinstance(base_y, str) and base_y.startswith("h-"):
                y_expr = f"({base_y})-{line_offset}"
            else:
                y_expr = f"({base_y})-{line_offset}"
            drawtext = (
                f"drawtext=text='{escaped_line}':"
                f"fontfile='{font}':"
                f"fontsize={fontsize}:"
                f"fontcolor={fontcolor}:"
                f"borderw={borderw}:"
                f"bordercolor={bordercolor}:"
                f"box={box}:boxcolor={boxcolor}:boxborderw={boxborderw}:"
                f"x=(w-text_w)/2:y={y_expr}:"  # every line horizontally centered
                f"enable='between(t,{start},{start + duration})'"
            )
            filters.append(drawtext)
    # Chain all drawtext filters with commas.
    vf = ",".join(filters)
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", vf,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Added {len(subtitles)} subtitles")
    return output_path
def overlay_image(
    video_path: str,
    image_path: str,
    output_path: str,
    position: Tuple[int, int] = None,
    start: float = 0,
    duration: float = None,
    fade_in: float = 0,
    fade_out: float = 0
) -> str:
    """
    Overlay a transparent PNG (decorated text, watermark, ...) onto a video.

    Args:
        video_path: Input video path.
        image_path: PNG image path (alpha channel supported).
        output_path: Output path.
        position: (x, y) position; None centers the image.
        start: Start time in seconds.
        duration: Display duration; None means until the end of the video.
        fade_in: Fade-in time in seconds (alpha fade).
        fade_out: Fade-out time in seconds (alpha fade).

    Returns:
        Output file path.
    """
    # Probe the video to resolve the default duration.
    info = get_video_info(video_path)
    video_duration = info["duration"]
    if duration is None:
        duration = video_duration - start
    # Position: explicit coordinates, or centered by default.
    if position:
        x, y = position
        pos_str = f"x={x}:y={y}"
    else:
        pos_str = "x=(W-w)/2:y=(H-h)/2"  # centered
    # Time window during which the overlay is visible.
    enable = f"enable='between(t,{start},{start + duration})'"
    # Build the overlay filter.
    overlay_filter = f"overlay={pos_str}:{enable}"
    # Optional alpha fades applied to the image input before overlaying.
    # NOTE(review): fade `st` is interpreted on the image stream's own
    # timeline; for a single-frame still (no -loop 1 on the input) this may
    # have no visible effect — confirm against actual output.
    if fade_in > 0 or fade_out > 0:
        fade_filter = []
        if fade_in > 0:
            fade_filter.append(f"fade=t=in:st={start}:d={fade_in}:alpha=1")
        if fade_out > 0:
            fade_out_start = start + duration - fade_out
            fade_filter.append(f"fade=t=out:st={fade_out_start}:d={fade_out}:alpha=1")
        img_filter = ",".join(fade_filter) if fade_filter else ""
        filter_complex = f"[1:v]{img_filter}[img];[0:v][img]{overlay_filter}[outv]"
    else:
        filter_complex = f"[0:v][1:v]{overlay_filter}[outv]"
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-i", image_path,
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "0:a?",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Overlaid image at {position or 'center'}, {start}s-{start+duration}s")
    return output_path
def overlay_multiple_images(
    video_path: str,
    images: List[Dict[str, Any]],
    output_path: str
) -> str:
    """
    Overlay multiple transparent PNG images onto a video.

    Args:
        video_path: Input video path.
        images: Image configs [{path, x, y, start, duration}]. x/y default to
            centered; start defaults to 0; duration defaults to 999 seconds
            (practically "until the end").
        output_path: Output path.

    Returns:
        Output file path (a plain copy of the input when `images` is empty).
    """
    if not images:
        # shutil is imported at module level — local import removed.
        shutil.copy(video_path, output_path)
        return output_path
    # One extra ffmpeg input per image.
    inputs = ["-i", video_path]
    for img in images:
        inputs.extend(["-i", img["path"]])
    # Chained overlays: each step composites image i+1 onto the previous result.
    filter_parts = []
    prev_output = "0:v"
    for i, img in enumerate(images):
        x = img.get("x", "(W-w)/2")
        y = img.get("y", "(H-h)/2")
        start = img.get("start", 0)
        duration = img.get("duration", 999)
        enable = f"enable='between(t,{start},{start + duration})'"
        # The last overlay writes the final [outv] label.
        if i == len(images) - 1:
            out_label = "outv"
        else:
            out_label = f"tmp{i}"
        filter_parts.append(
            f"[{prev_output}][{i+1}:v]overlay=x={x}:y={y}:{enable}[{out_label}]"
        )
        prev_output = out_label
    filter_complex = ";".join(filter_parts)
    cmd = [FFMPEG_PATH, "-y"] + inputs + [
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "0:a?",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Overlaid {len(images)} images")
    return output_path
def mix_audio(
    video_path: str,
    audio_path: str,
    output_path: str,
    audio_volume: float = 1.0,
    video_volume: float = 0.1,
    audio_start: float = 0
) -> str:
    """
    Mix an extra audio track (narration, BGM, ...) into a video.

    Args:
        video_path: Input video path.
        audio_path: Audio file path.
        output_path: Output path.
        audio_volume: Volume of the new audio (0-1).
        video_volume: Volume of the original video audio (0-1).
        audio_start: Offset of the new audio in seconds.

    Returns:
        Output file path.

    Raises:
        ValueError: if the input video cannot be probed.
    """
    logger.info(f"Mixing audio: {audio_path}")
    # Probe first so an unreadable video fails fast with ValueError.
    # (The old code stored the result in unused locals; the probe is kept.)
    get_video_info(video_path)
    # adelay shifts the new audio to audio_start (milliseconds, per channel).
    delay_ms = int(audio_start * 1000)
    filter_complex = (
        f"[0:a]volume={video_volume}[va];"
        f"[1:a]adelay={delay_ms}|{delay_ms},volume={audio_volume}[aa];"
        f"[va][aa]amix=inputs=2:duration=longest:dropout_transition=0:normalize=0[outa]"
    )
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-i", audio_path,
        "-filter_complex", filter_complex,
        "-map", "0:v",
        "-map", "[outa]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        output_path
    ]
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # The source video likely has no audio stream: map the new audio directly.
        logger.warning("Video has no audio track, adding audio directly")
        cmd = [
            FFMPEG_PATH, "-y",
            "-i", video_path,
            "-i", audio_path,
            "-map", "0:v",
            "-map", "1:a",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            output_path
        ]
        _run_ffmpeg(cmd)
    logger.info(f"Audio mixed: {output_path}")
    return output_path
def add_bgm(
    video_path: str,
    bgm_path: str,
    output_path: str,
    bgm_volume: float = 0.06,
    loop: bool = True,
    ducking: bool = True,
    duck_gain_db: float = -6.0,
    # Time-range ducking (deterministic, aligned exactly with the narration timeline)
    duck_volume: float = 0.25,
    duck_ranges: Optional[List[Tuple[float, float]]] = None,
    # The BGM clip may have its own start/duration (need not cover the whole video)
    start_time: float = 0.0,
    clip_duration: Optional[float] = None,
    fade_in: float = 1.0,
    fade_out: float = 1.0
) -> str:
    """
    Add background music (looped automatically to match the video length).

    Args:
        video_path: Input video path.
        bgm_path: BGM file path.
        output_path: Output path.
        bgm_volume: Base BGM volume.
        loop: Whether to loop the BGM.
        ducking: Lower the BGM under speech. With duck_ranges given, uses
            deterministic per-range volume drops; otherwise falls back to
            sidechain compression driven by the video's own audio.
        duck_gain_db: NOTE(review): currently unread — ducking depth comes
            from duck_volume / the sidechaincompress settings instead.
        duck_volume: BGM volume inside duck ranges (clamped to 0.05–1.0).
        duck_ranges: (start, end) second pairs where the BGM should duck.
        start_time: Delay before the BGM clip starts, in seconds.
        clip_duration: BGM clip length; None/<=0 covers the whole video.
        fade_in: BGM fade-in seconds.
        fade_out: BGM fade-out seconds.

    Returns:
        Output file path (a plain copy of the input when the BGM is missing).
    """
    # Validate the BGM file exists (kept backward compatible: still emits a
    # video, but logs the problem explicitly).
    if not bgm_path or not os.path.exists(bgm_path):
        logger.error(f"BGM file not found (skip add_bgm): {bgm_path}")
        # Copy the original video through without BGM; the caller should
        # notify the user / record it in metadata.
        import shutil
        shutil.copy(video_path, output_path)
        return output_path
    logger.info(f"Adding BGM: {bgm_path} (volume={bgm_volume})")
    info = get_video_info(video_path)
    video_duration = info["duration"]
    # Clip duration: defaults to covering the whole video.
    dur = float(clip_duration) if (clip_duration is not None and float(clip_duration) > 0) else float(video_duration)
    st = max(0.0, float(start_time or 0.0))
    end_for_fade = max(dur - float(fade_out or 0.0), 0.0)
    # Base chain: loop/trim -> fades -> base volume.
    if loop:
        bgm_chain = f"[1:a]aloop=-1:size=2e+09,asetpts=N/SR/TB,atrim=0:{dur}"
    else:
        bgm_chain = f"[1:a]atrim=0:{dur}"
    bgm_chain += f",afade=t=in:st=0:d={float(fade_in or 0.0)},afade=t=out:st={end_for_fade}:d={float(fade_out or 0.0)},volume={bgm_volume}"
    # Delay the clip to start_time.
    if st > 1e-6:
        ms = int(st * 1000)
        bgm_chain += f",adelay={ms}|{ms}"
    # Time-range ducking: `enable` makes each volume filter act only inside
    # its range (pure passthrough outside it).
    if ducking and duck_ranges:
        dv = max(0.05, min(1.0, float(duck_volume or 0.25)))
        for (rs, re) in duck_ranges:
            rsf = max(0.0, float(rs))
            ref = max(rsf, float(re))
            bgm_chain += f",volume={dv}:enable='between(t,{rsf},{ref})'"
    bgm_chain += "[bgm]"
    # With duck_ranges, mix deterministically (ducking already baked into bgm_chain).
    if ducking and duck_ranges:
        filter_complex = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first:dropout_transition=0:normalize=0[outa]"
    elif ducking:
        # Otherwise fall back to sidechaincompress keyed on the video's audio.
        filter_complex = (
            f"{bgm_chain};"
            f"[0:a][bgm]sidechaincompress=threshold=0.1:ratio=4:attack=5:release=250:makeup=1:mix=1:level_in=1:level_sc=1[outa]"
        )
    else:
        filter_complex = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first:dropout_transition=0:normalize=0[outa]"
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-stream_loop", "-1" if loop else "0",
        "-i", bgm_path,
        "-filter_complex", filter_complex,
        "-map", "0:v",
        "-map", "[outa]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        "-t", str(video_duration),
        output_path
    ]
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # Sidechain failed: fall back to a plain amix (original audio + quiet BGM).
        logger.warning("Sidechain failed, fallback to simple amix for BGM")
        filter_complex = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first[outa]"
        cmd = [
            FFMPEG_PATH, "-y",
            "-i", video_path,
            "-stream_loop", "-1" if loop else "0",
            "-i", bgm_path,
            "-filter_complex", filter_complex,
            "-map", "0:v",
            "-map", "[outa]",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-t", str(video_duration),
            output_path
        ]
        _run_ffmpeg(cmd)
    logger.info(f"BGM added: {output_path}")
    return output_path
def trim_video(
    video_path: str,
    output_path: str,
    start: float = 0,
    duration: float = None,
    end: float = None
) -> str:
    """
    Trim a video.

    Args:
        video_path: Input video path.
        output_path: Output path.
        start: Start time in seconds.
        duration: Clip length in seconds.
        end: End time in seconds; alternative to duration (duration wins
            when both are given). With neither set, the clip runs from
            `start` to the end of the input.

    Returns:
        Output file path.
    """
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-ss", str(start)
    ]
    # `is not None` (not truthiness) so an explicit 0 is honoured.
    if duration is not None:
        cmd.extend(["-t", str(duration)])
        clip_end = start + duration
    elif end is not None:
        cmd.extend(["-to", str(end)])
        clip_end = end
    else:
        clip_end = None  # trims to the end of the input
    cmd.extend([
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        output_path
    ])
    _run_ffmpeg(cmd)
    # The old log line computed `end or start + duration`, which raised
    # TypeError when both duration and end were None.
    logger.info(f"Trimmed video: {start}s - {clip_end if clip_end is not None else 'EOF'}s")
    return output_path
def speed_up_video(
    video_path: str,
    output_path: str,
    speed: float = 1.5
) -> str:
    """
    Speed up / slow down a video.

    Args:
        video_path: Input video path.
        output_path: Output path.
        speed: Speed multiplier (>1 faster, <1 slower). Must be > 0.

    Returns:
        Output file path.

    Raises:
        ValueError: if speed is not positive (the old code raised
        ZeroDivisionError for 0 and produced broken filters for negatives).
    """
    if speed <= 0:
        raise ValueError(f"speed must be > 0, got {speed}")
    # setpts retimes the video; atempo retimes the audio.
    video_filter = f"setpts={1/speed}*PTS"
    # atempo only accepts 0.5-2.0 per stage; _atempo_chain builds a valid
    # chain for ANY ratio (the old two-stage code produced out-of-range
    # stages for speed > 4 or speed < 0.25).
    audio_filter = _atempo_chain(speed)
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", video_filter,
        "-af", audio_filter,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Speed changed to {speed}x: {output_path}")
    return output_path