1005 lines
29 KiB
Python
1005 lines
29 KiB
Python
"""
|
||
FFmpeg video processing utilities.

Supports large-scale batch video processing: concatenation, subtitles, overlays and audio mixing.
|
||
"""
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import tempfile
|
||
import logging
|
||
import shutil
|
||
from pathlib import Path
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def _pick_exec(preferred_path: str, fallback_name: str) -> str:
    """
    Pick an executable path, preferring a bundled binary only if it really runs.

    Why:
    - In docker, /app/bin may accidentally contain binaries built for another
      OS/arch, causing `Exec format error` at runtime (seen on /app/bin/ffprobe).

    Strategy:
    - Prefer ``preferred_path`` if it exists AND ``<path> -version`` exits with 0.
    - Otherwise fall back to the PATH-resolved command (``fallback_name``).

    Args:
        preferred_path: Candidate binary path; may be empty.
        fallback_name: Command name to resolve through PATH.

    Returns:
        ``preferred_path`` when it passed the check, else the ``shutil.which``
        result for ``fallback_name``, else ``fallback_name`` unchanged.
    """
    if preferred_path and os.path.exists(preferred_path):
        try:
            # ffmpeg/ffprobe both support `-version`. The output is not needed,
            # so it is discarded instead of being captured and decoded.
            probe = subprocess.run(
                [preferred_path, "-version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,  # this runs at import time; never hang on a bad binary
            )
        except (OSError, subprocess.SubprocessError):
            # Exec format error / permission error / timeout -> fall back
            pass
        else:
            if probe.returncode == 0:
                return preferred_path

    return shutil.which(fallback_name) or fallback_name
|
||
|
||
|
||
# FFmpeg/FFprobe paths: the binaries shipped under <BASE_DIR>/bin are preferred,
# but only after _pick_exec has checked that they can actually be executed.
FFMPEG_PATH, FFPROBE_PATH = (
    _pick_exec(str(config.BASE_DIR / "bin" / tool), tool)
    for tool in ("ffmpeg", "ffprobe")
)
|
||
|
||
# Font candidates in priority order: fonts bundled with the project first,
# then per-platform system fonts as fallbacks.
DEFAULT_FONT_PATHS = [
    # Bundled fonts (usable on every platform)
    *(
        str(config.FONTS_DIR / font_file)
        for font_file in (
            "NotoSansSC-Regular.otf",
            "HarmonyOS-Sans-SC-Regular.ttf",
            "AlibabaPuHuiTi-Regular.ttf",
        )
    ),
    # Linux system fonts
    "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
    "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
    "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    # macOS system fonts
    "/System/Library/Fonts/PingFang.ttc",
    "/System/Library/Fonts/STHeiti Medium.ttc",
    # Windows system fonts
    "C:/Windows/Fonts/msyh.ttc",
    "C:/Windows/Fonts/simhei.ttf",
]
|
||
|
||
|
||
def _get_font_path() -> str:
    """
    Return the first usable entry of DEFAULT_FONT_PATHS.

    An entry is usable when the file exists and is larger than 1000 bytes.
    When nothing qualifies, the name "Arial" is returned so callers still get
    a string instead of failing.
    """
    usable = (
        candidate
        for candidate in DEFAULT_FONT_PATHS
        if os.path.exists(candidate) and os.path.getsize(candidate) > 1000
    )
    return next(usable, "Arial")
|
||
|
||
|
||
def _sanitize_text(text: str) -> str:
    """
    Normalise subtitle text before it is handed to ffmpeg.

    No characters are filtered any more: emoji, digits, punctuation and text in
    any language pass through untouched. The only guarantee is that a falsy
    input (None, "") comes back as an empty string.
    """
    return text if text else ""
|
||
|
||
|
||
def add_silence_audio(video_path: str, output_path: str) -> str:
    """
    Add a silent audio track (stereo, 44.1 kHz) to a video that has none.

    This keeps later filter graphs that reference ``0:a`` from failing. The
    video stream is copied, the generated silence is encoded as AAC, and
    ``-shortest`` ends the output with the shorter input.

    Args:
        video_path: Input video path.
        output_path: Path of the file to write.

    Returns:
        ``output_path``.
    """
    silence_source = "anullsrc=channel_layout=stereo:sample_rate=44100"

    cmd = [FFMPEG_PATH, "-y"]
    cmd += ["-i", video_path]
    cmd += ["-f", "lavfi", "-i", silence_source]
    cmd += ["-shortest", "-c:v", "copy", "-c:a", "aac"]
    cmd.append(output_path)

    _run_ffmpeg(cmd)
    return output_path
|
||
|
||
|
||
def _run_ffmpeg(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
    """
    Run an FFmpeg command and return the completed process.

    Args:
        cmd: Full argument vector, executable first.
        check: When True, a non-zero exit status raises CalledProcessError.

    Returns:
        The CompletedProcess, with stdout/stderr captured as text.

    Raises:
        subprocess.CalledProcessError: Non-zero exit while ``check`` is True
            (stderr is logged before re-raising).
    """
    logger.debug(f"FFmpeg command: {' '.join(cmd)}")
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=check
        )
    except subprocess.CalledProcessError as exc:
        logger.error(f"FFmpeg failed: {exc.stderr}")
        raise

    captured_stderr = completed.stderr
    # Echo stderr whenever a result comes back, so font warnings and similar
    # diagnostics are visible in the process output.
    if captured_stderr:
        print(f"[FFmpeg stderr] {captured_stderr}", flush=True)
    if completed.returncode != 0:
        # Only reachable with check=False.
        logger.error(f"FFmpeg stderr: {captured_stderr}")
    return completed
|
||
|
||
|
||
def get_video_info(video_path: str) -> Dict[str, Any]:
    """
    Probe a media file with ffprobe and return its key properties.

    Args:
        video_path: Path of the file to inspect.

    Returns:
        A dict with ``duration`` (float, 0 when the container reports none),
        ``width``, ``height`` (0 when there is no video stream) and ``fps``
        (30 by default). Width, height and fps come from the first video stream.

    Raises:
        ValueError: ffprobe exited with a non-zero status.
    """
    import json

    probe = subprocess.run(
        [
            FFPROBE_PATH,
            "-v", "quiet",
            "-print_format", "json",
            "-show_format",
            "-show_streams",
            video_path,
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0:
        raise ValueError(f"Failed to probe video: {video_path}")

    payload = json.loads(probe.stdout)
    container = payload.get("format", {})

    info = {
        "duration": float(container.get("duration", 0)),
        "width": 0,
        "height": 0,
        "fps": 30
    }

    first_video = next(
        (s for s in payload.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    if first_video is None:
        return info

    info["width"] = first_video.get("width", 0)
    info["height"] = first_video.get("height", 0)

    # The frame rate arrives either as a fraction ("30/1") or a plain number ("29.97").
    rate = first_video.get("r_frame_rate", "30/1")
    if "/" in rate:
        numerator, denominator = rate.split("/")
        info["fps"] = float(numerator) / float(denominator) if float(denominator) != 0 else 30
    else:
        info["fps"] = float(rate)

    return info
|
||
|
||
|
||
def concat_videos(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920)
) -> str:
    """
    Concatenate several clips into one video-only file with the concat filter.

    Each input is scaled to fit the target size while keeping its aspect ratio,
    padded with centred black bars, and then all clips are joined. Audio is
    not mapped into the output.

    Args:
        video_paths: Paths of the clips, in playback order.
        output_path: Path of the file to write.
        target_size: Target resolution (width, height); portrait 1080x1920 by default.

    Returns:
        ``output_path``.

    Raises:
        ValueError: ``video_paths`` is empty.
    """
    if not video_paths:
        raise ValueError("No video paths provided")

    clip_count = len(video_paths)
    logger.info(f"Concatenating {clip_count} videos...")

    width, height = target_size

    # One normalisation chain shared by every input: fit, pad, square pixels.
    normalize = (
        f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
        f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1"
    )
    graph = [f"[{idx}:v]{normalize}[v{idx}]" for idx in range(clip_count)]

    joined_labels = "".join(f"[v{idx}]" for idx in range(clip_count))
    graph.append(f"{joined_labels}concat=n={clip_count}:v=1:a=0[outv]")

    cmd = [FFMPEG_PATH, "-y"]
    for clip in video_paths:
        cmd += ["-i", clip]
    cmd += [
        "-filter_complex", ";".join(graph),
        "-map", "[outv]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-pix_fmt", "yuv420p",
        output_path,
    ]

    _run_ffmpeg(cmd)

    logger.info(f"Concatenated video saved: {output_path}")
    return output_path
|
||
|
||
|
||
def concat_videos_with_audio(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920)
) -> str:
    """
    Concatenate several clips and keep their audio tracks.

    Video is normalised exactly as in concat_videos; every input's audio is
    converted to 44.1 kHz stereo and the audio streams are joined in the same
    order. If ffmpeg fails, the function falls back to concat_videos and
    produces a video-only file.

    Args:
        video_paths: Paths of the clips, in playback order.
        output_path: Path of the file to write.
        target_size: Target resolution (width, height).

    Returns:
        ``output_path``.

    Raises:
        ValueError: ``video_paths`` is empty.
    """
    if not video_paths:
        raise ValueError("No video paths provided")

    logger.info(f"Concatenating {len(video_paths)} videos with audio...")

    width, height = target_size
    n = len(video_paths)
    indices = range(n)

    video_norm = (
        f"scale={width}:{height}:force_original_aspect_ratio=decrease,"
        f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1"
    )
    audio_norm = "aformat=sample_rates=44100:channel_layouts=stereo"

    # Order matters for readability of the graph only: video chains, audio
    # chains, then the two concat stages.
    graph = [f"[{k}:v]{video_norm}[v{k}]" for k in indices]
    graph += [f"[{k}:a]{audio_norm}[a{k}]" for k in indices]

    video_labels = "".join(f"[v{k}]" for k in indices)
    audio_labels = "".join(f"[a{k}]" for k in indices)
    graph.append(f"{video_labels}concat=n={n}:v=1:a=0[outv]")
    graph.append(f"{audio_labels}concat=n={n}:v=0:a=1[outa]")

    cmd = [FFMPEG_PATH, "-y"]
    for clip in video_paths:
        cmd += ["-i", clip]
    cmd += [
        "-filter_complex", ";".join(graph),
        "-map", "[outv]",
        "-map", "[outa]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        output_path,
    ]

    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # Audio concat did not work: retry without audio.
        logger.warning("Audio concat failed, falling back to video only")
        return concat_videos(video_paths, output_path, target_size)

    logger.info(f"Concatenated video with audio saved: {output_path}")
    return output_path
|
||
|
||
|
||
def add_subtitle(
    video_path: str,
    text: str,
    start: float,
    duration: float,
    output_path: str,
    style: Optional[Dict[str, Any]] = None
) -> str:
    """
    Burn a single subtitle into a video with the drawtext filter.

    Args:
        video_path: Input video path.
        text: Subtitle text.
        start: Time the subtitle appears (seconds).
        duration: How long it stays visible (seconds).
        output_path: Path of the file to write.
        style: Optional style overrides:
            fontsize: font size (default 48)
            fontcolor: font colour (default "white")
            borderw: outline width (default 3)
            bordercolor: outline colour (default "black")
            x: x position, may be an expression such as "(w-text_w)/2" (default)
            y: y position (default "h-200")
            font: font file path or name (default: result of _get_font_path())

    Returns:
        ``output_path``.
    """
    style = style or {}

    fontsize = style.get("fontsize", 48)
    fontcolor = style.get("fontcolor", "white")
    borderw = style.get("borderw", 3)
    bordercolor = style.get("bordercolor", "black")
    x = style.get("x", "(w-text_w)/2")  # horizontally centred by default
    y = style.get("y", "h-200")  # slightly above the bottom edge by default

    # Prefer a font that was verified to exist over a hard-coded path.
    font = style.get("font", _get_font_path())

    # Same escaping as add_multiple_subtitles: backslash first (so the escapes
    # added afterwards are not doubled), then quote, colon and percent. Without
    # the percent escape drawtext would try to expand "%{...}" sequences.
    escaped_text = (
        text.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace(":", "\\:")
        .replace("%", "\\%")
    )

    drawtext = (
        f"drawtext=text='{escaped_text}':"
        f"fontfile='{font}':"
        f"fontsize={fontsize}:"
        f"fontcolor={fontcolor}:"
        f"borderw={borderw}:"
        f"bordercolor={bordercolor}:"
        f"x={x}:y={y}:"
        f"enable='between(t,{start},{start + duration})'"
    )

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", drawtext,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]

    _run_ffmpeg(cmd)
    logger.info(f"Added subtitle: '{text[:20]}...' at {start}s")
    return output_path
|
||
|
||
|
||
def wrap_text(text: str, max_chars: int = 18) -> str:
    """
    Hard-wrap text by inserting a newline after every ``max_chars`` characters.

    Text that already contains a newline is returned unchanged, on the
    assumption that the caller wrapped it by hand. Every character counts as
    one unit, whatever its script or display width. Falsy input yields "".
    """
    if not text:
        return ""
    if "\n" in text:
        return text

    pieces = []
    run_length = 0
    for ch in text:
        if run_length >= max_chars:
            pieces.append("\n")
            run_length = 0
        pieces.append(ch)
        run_length += 1
    return "".join(pieces)
|
||
|
||
|
||
def mix_audio_at_offset(
    base_audio: str,
    overlay_audio: str,
    offset: float,
    output_path: str,
    base_volume: float = 1.0,
    overlay_volume: float = 1.0
) -> str:
    """
    Mix an overlay audio file into a base audio file at a time offset.

    The overlay is delayed by ``offset`` seconds; the result lasts as long as
    the base track and is written as MP3.

    Args:
        base_audio: Path of the base track.
        overlay_audio: Path of the track to mix in.
        offset: Delay applied to the overlay (seconds).
        output_path: Path of the file to write.
        base_volume: Volume factor for the base track.
        overlay_volume: Volume factor for the overlay.

    Returns:
        ``output_path``; or ``overlay_audio`` unchanged when the base file does
        not exist (nothing is mixed in that case).
    """
    if not os.path.exists(base_audio):
        logger.warning(f"Base audio not found: {base_audio}")
        return overlay_audio

    # adelay takes milliseconds, one value per channel.
    delay_ms = int(offset * 1000)
    graph = ";".join([
        f"[0:a]volume={base_volume}[a0]",
        f"[1:a]volume={overlay_volume},adelay={delay_ms}|{delay_ms}[a1]",
        "[a0][a1]amix=inputs=2:duration=first:dropout_transition=0:normalize=0[out]",
    ])

    cmd = [FFMPEG_PATH, "-y", "-i", base_audio, "-i", overlay_audio]
    cmd += ["-filter_complex", graph, "-map", "[out]"]
    cmd += ["-c:a", "mp3"]  # audio-only output, encoded as MP3
    cmd.append(output_path)

    _run_ffmpeg(cmd)
    return output_path
|
||
|
||
|
||
def adjust_audio_duration(
    input_path: str,
    target_duration: float,
    output_path: str
) -> Optional[str]:
    """
    Fit an audio file into a target duration by speeding it up when too long.

    Rules:
    - audio longer than the target  -> sped up (at most 2x)
    - audio no longer than the target -> copied unchanged (never slowed down)

    Args:
        input_path: Source audio path.
        target_duration: Desired maximum duration in seconds.
        output_path: Path of the file to write.

    Returns:
        ``output_path`` after a copy or a speed-up; ``input_path`` when the
        probed duration is not positive; ``None`` when the input is missing.
    """
    if not os.path.exists(input_path):
        return None

    current_duration = float(get_audio_info(input_path).get("duration", 0))
    if current_duration <= 0:
        return input_path

    if current_duration <= target_duration:
        # Short enough already: keep the original speed, just copy.
        import shutil
        shutil.copy(input_path, output_path)
        logger.info(f"Audio ({current_duration:.2f}s) <= target ({target_duration:.2f}s), keeping original speed")
        return output_path

    # Cap the speed-up at 2x so the voice does not get distorted too much.
    max_speed = 2.0
    if target_duration > 0:
        speed_ratio = min(current_duration / target_duration, max_speed)
    else:
        # A zero or negative target cannot be met; dividing by it would raise
        # or yield a negative tempo, so use the maximum allowed speed instead.
        speed_ratio = max_speed

    logger.info(f"Audio ({current_duration:.2f}s) > target ({target_duration:.2f}s), speeding up {speed_ratio:.2f}x")

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", input_path,
        "-filter:a", f"atempo={speed_ratio}",
        output_path
    ]
    _run_ffmpeg(cmd)
    return output_path
|
||
|
||
|
||
def get_audio_info(file_path: str) -> Dict[str, Any]:
    """
    Probe an audio file.

    Delegates to get_video_info, so the result has the same keys.
    """
    audio_info = get_video_info(file_path)
    return audio_info
|
||
|
||
|
||
def wrap_text_smart(text: str, max_chars: int = 15) -> str:
    """
    Split a subtitle into two lines, preferring a short top line.

    Text that is empty or no longer than ``max_chars`` is returned unchanged.
    Otherwise the break goes right after the punctuation mark or space closest
    to the middle (the earliest one wins a tie). If there is no such character,
    or the closest one is the last character, the text is cut at 40% of its
    length.
    """
    if not text or len(text) <= max_chars:
        return text

    breakable = frozenset([",", "。", "!", "?", " ", ",", ".", "!", "?"])
    midpoint = len(text) // 2

    candidates = [pos for pos, ch in enumerate(text) if ch in breakable]
    if candidates:
        # min() keeps the first of equally close positions.
        nearest = min(candidates, key=lambda pos: abs(pos - midpoint))
        if nearest < len(text) - 1:
            head, tail = text[:nearest + 1], text[nearest + 1:]
            return f"{head}\n{tail}"

    # Forced break: the top line gets 40% of the characters.
    cut = int(len(text) * 0.4)
    return "\n".join((text[:cut], text[cut:]))
|
||
|
||
|
||
def add_multiple_subtitles(
    video_path: str,
    subtitles: List[Dict[str, Any]],
    output_path: str,
    default_style: Optional[Dict[str, Any]] = None
) -> str:
    """
    Burn several subtitles into a video in a single ffmpeg pass.

    Each subtitle is wrapped with wrap_text; every resulting line becomes its
    own drawtext filter so that each line is centred horizontally. Lines are
    stacked upwards from the ``y`` position, which belongs to the last line.

    Args:
        video_path: Input video path.
        subtitles: Items with ``text``, ``start`` (default 0), ``duration``
            (default 3) and an optional per-item ``style`` dict.
        output_path: Path of the file to write.
        default_style: Style applied to every item; per-item ``style`` wins.
            Recognised keys: fontsize, fontcolor, borderw, bordercolor, y,
            box, boxcolor, boxborderw.

    Returns:
        ``output_path``. When there is nothing to draw (no subtitles, or only
        blank text) the input file is copied unchanged.
    """
    import shutil

    if not subtitles:
        # No subtitles: plain copy.
        shutil.copy(video_path, output_path)
        return output_path

    default_style = default_style or {}
    # One font lookup shared by all subtitles (cross-platform search).
    font = _get_font_path()

    print(f"[SubDebug] Using font for subtitles: {font}", flush=True)

    filters = []
    for sub in subtitles:
        raw_text = sub.get("text", "")
        # Debug output of repr and code points to track down odd characters.
        # The hex dump iterates a None-safe copy so "text": None does not crash
        # before _sanitize_text gets the chance to normalise it.
        print(f"[SubDebug] Subtitle text repr: {repr(raw_text)}", flush=True)
        print(f"[SubDebug] Subtitle text hex: {' '.join(hex(ord(c)) for c in (raw_text or ''))}", flush=True)

        text = wrap_text(_sanitize_text(raw_text))

        start = sub.get("start", 0)
        duration = sub.get("duration", 3)
        # "style": None is treated like a missing style.
        style = {**default_style, **(sub.get("style") or {})}

        fontsize = style.get("fontsize", 48)
        fontcolor = style.get("fontcolor", "white")
        borderw = style.get("borderw", 3)
        bordercolor = style.get("bordercolor", "black")
        base_y = style.get("y", "h-200")

        # A background box is on by default for readability.
        box = style.get("box", 1)
        boxcolor = style.get("boxcolor", "black@0.5")
        boxborderw = style.get("boxborderw", 10)

        lines = text.split("\n")
        line_height = int(fontsize * 1.3)

        for line_idx, line in enumerate(lines):
            if not line.strip():
                continue

            # Escape backslash, single quote, colon and percent.
            escaped_line = (
                line.replace("\\", "\\\\")
                .replace("'", "\\'")
                .replace(":", "\\:")
                .replace("%", "\\%")
            )

            # base_y is the position of the last line; earlier lines sit above it.
            line_offset = (len(lines) - 1 - line_idx) * line_height
            y_expr = f"({base_y})-{line_offset}"

            filters.append(
                f"drawtext=text='{escaped_line}':"
                f"fontfile='{font}':"
                f"fontsize={fontsize}:"
                f"fontcolor={fontcolor}:"
                f"borderw={borderw}:"
                f"bordercolor={bordercolor}:"
                f"box={box}:boxcolor={boxcolor}:boxborderw={boxborderw}:"
                f"x=(w-text_w)/2:y={y_expr}:"
                f"enable='between(t,{start},{start + duration})'"
            )

    if not filters:
        # Every subtitle was blank; an empty -vf value would make ffmpeg fail.
        logger.warning("No drawable subtitle text, copying video unchanged")
        shutil.copy(video_path, output_path)
        return output_path

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", ",".join(filters),  # filters in one chain are comma-separated
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]

    _run_ffmpeg(cmd)
    logger.info(f"Added {len(subtitles)} subtitles")
    return output_path
|
||
|
||
|
||
def overlay_image(
    video_path: str,
    image_path: str,
    output_path: str,
    position: Tuple[int, int] = None,
    start: float = 0,
    duration: float = None,
    fade_in: float = 0,
    fade_out: float = 0
) -> str:
    """
    Overlay a transparent PNG (caption art, watermark, ...) onto a video.

    Args:
        video_path: Input video path.
        image_path: PNG path (alpha channel supported).
        output_path: Path of the file to write.
        position: (x, y) of the image; centred when None.
        start: Time the image appears (seconds).
        duration: How long it stays visible (seconds); until the end of the
            video when None.
        fade_in: Fade-in length (seconds).
        fade_out: Fade-out length (seconds).

    Returns:
        ``output_path``.
    """
    # The video is always probed, even when an explicit duration is given.
    video_duration = get_video_info(video_path)["duration"]
    if duration is None:
        duration = video_duration - start

    if position:
        pos_x, pos_y = position
        placement = f"x={pos_x}:y={pos_y}"
    else:
        placement = "x=(W-w)/2:y=(H-h)/2"  # centred

    visible = f"enable='between(t,{start},{start + duration})'"
    overlay_filter = f"overlay={placement}:{visible}"

    # Alpha fades are applied to the image stream before it is overlaid.
    fades = []
    if fade_in > 0:
        fades.append(f"fade=t=in:st={start}:d={fade_in}:alpha=1")
    if fade_out > 0:
        fade_out_start = start + duration - fade_out
        fades.append(f"fade=t=out:st={fade_out_start}:d={fade_out}:alpha=1")

    if fades:
        image_chain = ",".join(fades)
        filter_complex = f"[1:v]{image_chain}[img];[0:v][img]{overlay_filter}[outv]"
    else:
        filter_complex = f"[0:v][1:v]{overlay_filter}[outv]"

    cmd = [FFMPEG_PATH, "-y", "-i", video_path, "-i", image_path]
    cmd += ["-filter_complex", filter_complex]
    cmd += ["-map", "[outv]", "-map", "0:a?"]  # "?" keeps audio optional
    cmd += ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]
    cmd += ["-c:a", "copy", "-pix_fmt", "yuv420p"]
    cmd.append(output_path)

    _run_ffmpeg(cmd)
    logger.info(f"Overlaid image at {position or 'center'}, {start}s-{start+duration}s")
    return output_path
|
||
|
||
|
||
def overlay_multiple_images(
    video_path: str,
    images: List[Dict[str, Any]],
    output_path: str
) -> str:
    """
    Overlay several transparent PNGs onto a video in one ffmpeg pass.

    Args:
        video_path: Input video path.
        images: Image configs ``{path, x, y, start, duration}``. ``path`` is
            required; x/y default to centred, start to 0, duration to 999.
        output_path: Path of the file to write.

    Returns:
        ``output_path``. With no images the input file is copied unchanged.
    """
    if not images:
        import shutil
        shutil.copy(video_path, output_path)
        return output_path

    # Input 0 is the video; image k becomes input k + 1.
    input_args = ["-i", video_path]
    for spec in images:
        input_args += ["-i", spec["path"]]

    # Chain the overlays: each stage draws on the previous stage's output.
    last_index = len(images) - 1
    stages = []
    current = "0:v"
    for idx, spec in enumerate(images):
        x = spec.get("x", "(W-w)/2")
        y = spec.get("y", "(H-h)/2")
        start = spec.get("start", 0)
        duration = spec.get("duration", 999)

        visible = f"enable='between(t,{start},{start + duration})'"
        target = "outv" if idx == last_index else f"tmp{idx}"

        stages.append(
            f"[{current}][{idx+1}:v]overlay=x={x}:y={y}:{visible}[{target}]"
        )
        current = target

    cmd = [FFMPEG_PATH, "-y", *input_args]
    cmd += ["-filter_complex", ";".join(stages)]
    cmd += ["-map", "[outv]", "-map", "0:a?"]
    cmd += ["-c:v", "libx264", "-preset", "fast", "-crf", "23"]
    cmd += ["-c:a", "copy", "-pix_fmt", "yuv420p"]
    cmd.append(output_path)

    _run_ffmpeg(cmd)
    logger.info(f"Overlaid {len(images)} images")
    return output_path
|
||
|
||
|
||
def mix_audio(
    video_path: str,
    audio_path: str,
    output_path: str,
    audio_volume: float = 1.0,
    video_volume: float = 0.1,
    audio_start: float = 0
) -> str:
    """
    Mix an extra audio file (narration, BGM, ...) into a video.

    Args:
        video_path: Input video path.
        audio_path: Audio file to mix in.
        output_path: Path of the file to write.
        audio_volume: Volume of the new audio (0-1).
        video_volume: Volume of the video's own audio (0-1).
        audio_start: Time at which the new audio starts (seconds).

    Returns:
        ``output_path``.

    If the mixing command fails, the video is assumed to have no audio track
    and the new audio is attached directly; in that fallback the volumes and
    ``audio_start`` are not applied.
    """
    logger.info(f"Mixing audio: {audio_path}")

    # Probe the input up front. The result is not needed afterwards, but a
    # file ffprobe cannot read raises here before any ffmpeg run.
    get_video_info(video_path)

    # adelay takes milliseconds, one value per channel.
    delay_ms = int(audio_start * 1000)
    graph = ";".join([
        f"[0:a]volume={video_volume}[va]",
        f"[1:a]adelay={delay_ms}|{delay_ms},volume={audio_volume}[aa]",
        "[va][aa]amix=inputs=2:duration=longest:dropout_transition=0:normalize=0[outa]",
    ])

    inputs = [FFMPEG_PATH, "-y", "-i", video_path, "-i", audio_path]
    encode_tail = ["-c:v", "copy", "-c:a", "aac", "-b:a", "192k", output_path]

    mix_cmd = inputs + ["-filter_complex", graph, "-map", "0:v", "-map", "[outa]"] + encode_tail
    attach_cmd = inputs + ["-map", "0:v", "-map", "1:a"] + encode_tail

    try:
        _run_ffmpeg(mix_cmd)
    except subprocess.CalledProcessError:
        logger.warning("Video has no audio track, adding audio directly")
        _run_ffmpeg(attach_cmd)

    logger.info(f"Audio mixed: {output_path}")
    return output_path
|
||
|
||
|
||
def add_bgm(
    video_path: str,
    bgm_path: str,
    output_path: str,
    bgm_volume: float = 0.06,
    loop: bool = True,
    ducking: bool = True,
    duck_gain_db: float = -6.0,
    fade_in: float = 1.0,
    fade_out: float = 1.0
) -> str:
    """
    Add background music to a video, looping it to cover the whole duration.

    Args:
        video_path: Input video path.
        bgm_path: BGM file path.
        output_path: Path of the file to write.
        bgm_volume: BGM volume factor.
        loop: Whether to loop the BGM.
        ducking: Lower the BGM while the video's own audio is loud
            (sidechain compression); falls back to a plain mix on failure.
        duck_gain_db: Accepted for interface compatibility; the filter graph
            does not use it.
        fade_in: BGM fade-in length (seconds).
        fade_out: BGM fade-out length (seconds).

    Returns:
        ``output_path``. When the BGM file is missing, the input video is
        copied unchanged and an error is logged.
    """
    # Missing BGM stays non-fatal for compatibility: the video is still
    # produced, and the problem is logged for the caller to surface.
    if not bgm_path or not os.path.exists(bgm_path):
        logger.error(f"BGM file not found (skip add_bgm): {bgm_path}")
        import shutil
        shutil.copy(video_path, output_path)
        return output_path

    logger.info(f"Adding BGM: {bgm_path} (volume={bgm_volume})")

    video_duration = get_video_info(video_path)["duration"]
    fade_out_start = max(video_duration - fade_out, 0)

    bgm_steps = []
    if loop:
        bgm_steps += [
            "aloop=-1:size=2e+09",
            "asetpts=N/SR/TB",
            f"atrim=0:{video_duration}",
        ]
    bgm_steps += [
        f"afade=t=in:st=0:d={fade_in}",
        f"afade=t=out:st={fade_out_start}:d={fade_out}",
        f"volume={bgm_volume}",
    ]
    bgm_chain = "[1:a]" + ",".join(bgm_steps) + "[bgm]"

    plain_mix = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first[outa]"

    # sidechaincompress outputs only its FIRST input, compressed according to
    # the level of the second. So the BGM must be the first input and the
    # video's audio the sidechain; the ducked BGM is then mixed back with the
    # video's audio. asplit provides the two copies of the video's audio.
    ducked_mix = (
        f"{bgm_chain};"
        "[0:a]asplit=2[main][sc];"
        "[bgm][sc]sidechaincompress=threshold=0.1:ratio=4:attack=5:release=250:makeup=1:mix=1:level_in=1:level_sc=1[ducked];"
        "[main][ducked]amix=inputs=2:duration=first[outa]"
    )

    def build_cmd(filter_complex: str) -> List[str]:
        """Assemble the ffmpeg command for one filter graph."""
        return [
            FFMPEG_PATH, "-y",
            "-i", video_path,
            "-stream_loop", "-1" if loop else "0",
            "-i", bgm_path,
            "-filter_complex", filter_complex,
            "-map", "0:v",
            "-map", "[outa]",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-t", str(video_duration),  # stop at the video's length
            output_path
        ]

    if ducking:
        try:
            _run_ffmpeg(build_cmd(ducked_mix))
        except subprocess.CalledProcessError:
            # Sidechain not usable: keep the original audio plus quiet BGM.
            logger.warning("Sidechain failed, fallback to simple amix for BGM")
            _run_ffmpeg(build_cmd(plain_mix))
    else:
        # No fallback here: retrying the identical command cannot succeed.
        _run_ffmpeg(build_cmd(plain_mix))

    logger.info(f"BGM added: {output_path}")
    return output_path
|
||
|
||
|
||
def trim_video(
    video_path: str,
    output_path: str,
    start: float = 0,
    duration: float = None,
    end: float = None
) -> str:
    """
    Cut a section out of a video.

    Args:
        video_path: Input video path.
        output_path: Path of the file to write.
        start: Start time (seconds).
        duration: Length of the section (seconds).
        end: End time (seconds); alternative to ``duration``. When both are
            given, ``duration`` is used. When neither is given, the section
            runs to the end of the input.

    Returns:
        ``output_path``.
    """
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-ss", str(start)
    ]

    # The label is built per branch so the log line below never has to add
    # None to a number when neither duration nor end was supplied.
    if duration:
        cmd.extend(["-t", str(duration)])
        stop_label = f"{start + duration}s"
    elif end:
        cmd.extend(["-to", str(end)])
        stop_label = f"{end}s"
    else:
        stop_label = "end"

    cmd.extend([
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        output_path
    ])

    _run_ffmpeg(cmd)
    logger.info(f"Trimmed video: {start}s - {stop_label}")
    return output_path
|
||
|
||
|
||
def _atempo_chain(speed: float) -> str:
    """Build an atempo filter chain for ``speed`` with every factor in 0.5-2.0."""
    factors = []
    remaining = speed
    while remaining > 2.0:
        factors.append(2.0)
        remaining /= 2.0
    while remaining < 0.5:
        factors.append(0.5)
        remaining /= 0.5
    factors.append(remaining)
    return ",".join(f"atempo={factor}" for factor in factors)


def speed_up_video(
    video_path: str,
    output_path: str,
    speed: float = 1.5
) -> str:
    """
    Speed a video up or slow it down, audio included.

    Args:
        video_path: Input video path.
        output_path: Path of the file to write.
        speed: Speed factor (>1 faster, <1 slower); must be positive.

    Returns:
        ``output_path``.

    Raises:
        ValueError: ``speed`` is zero or negative.
    """
    if speed <= 0:
        raise ValueError(f"speed must be positive, got {speed}")

    # setpts rescales video timestamps; atempo changes the audio tempo.
    video_filter = f"setpts={1/speed}*PTS"

    # A single atempo stage is kept within 0.5-2.0, so larger changes are
    # expressed as a chain of stages, however many are needed.
    audio_filter = _atempo_chain(speed)

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", video_filter,
        "-af", audio_filter,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        output_path
    ]

    _run_ffmpeg(cmd)
    logger.info(f"Speed changed to {speed}x: {output_path}")
    return output_path
|