"""FFmpeg video-processing utilities.

Batch helpers for large-scale video production: concatenation, subtitles,
image overlays and audio mixing.
"""

import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import config

logger = logging.getLogger(__name__)

# FFmpeg/ffprobe locations: prefer binaries bundled with the project,
# fall back to whatever is on PATH.
FFMPEG_PATH = str(config.BASE_DIR / "bin" / "ffmpeg") if (config.BASE_DIR / "bin" / "ffmpeg").exists() else "ffmpeg"
FFPROBE_PATH = str(config.BASE_DIR / "bin" / "ffprobe") if (config.BASE_DIR / "bin" / "ffprobe").exists() else "ffprobe"

# Candidate CJK-capable fonts, probed in order: Linux system fonts first
# (most reliable on servers), then project-bundled fonts (which may be Git
# LFS pointer stubs), finally macOS fonts for local debugging.
DEFAULT_FONT_PATHS = [
    # Linux system-wide CJK fonts (most robust in server environments)
    "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
    "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
    # Project-bundled fonts (make sure they are not LFS pointer files)
    str(config.FONTS_DIR / "HarmonyOS-Sans-SC-Regular.ttf"),
    str(config.FONTS_DIR / "AlibabaPuHuiTi-Regular.ttf"),
    # macOS fonts (local debugging only)
    "/System/Library/Fonts/PingFang.ttc",
    "/System/Library/Fonts/STHeiti Medium.ttc",
    "/System/Library/Fonts/Supplemental/Arial Unicode.ttf",
]


def _get_font_path() -> str:
    """Return the first usable font file from DEFAULT_FONT_PATHS.

    Files of 1000 bytes or less are skipped because they are most likely
    Git LFS pointer stubs rather than real fonts.  As a last resort the
    font *name* "Arial" is returned so drawtext does not crash outright.
    """
    for p in DEFAULT_FONT_PATHS:
        if os.path.exists(p) and os.path.getsize(p) > 1000:
            return p
    return "Arial"  # extreme fallback; avoids a hard failure


def _sanitize_text(text: str) -> str:
    """Normalize subtitle text before it is embedded in an ffmpeg filter.

    Historically this stripped control characters; it now deliberately keeps
    everything (emoji, digits, punctuation, all scripts) and only guards
    against None/empty input.
    """
    if not text:
        return ""
    return text


def add_silence_audio(video_path: str, output_path: str) -> str:
    """Attach a silent stereo 44.1 kHz track to a video without audio.

    Prevents later filter graphs from failing because stream 0:a is missing.
    """
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-f", "lavfi", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100",
        "-shortest",
        "-c:v", "copy",
        "-c:a", "aac",
        output_path
    ]
    _run_ffmpeg(cmd)
    return output_path


def _run_ffmpeg(cmd: List[str], check: bool = True) -> subprocess.CompletedProcess:
    """Execute an FFmpeg command.

    stderr is always echoed (FFmpeg reports font and codec warnings there).

    Raises:
        subprocess.CalledProcessError: when ``check`` is True and FFmpeg
            exits non-zero.
    """
    logger.debug(f"FFmpeg command: {' '.join(cmd)}")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=check
        )
        # Echo stderr regardless of outcome to help diagnose font issues etc.
        if result.stderr:
            print(f"[FFmpeg stderr] {result.stderr}", flush=True)
        if result.returncode != 0:
            logger.error(f"FFmpeg stderr: {result.stderr}")
        return result
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg failed: {e.stderr}")
        raise


def get_video_info(video_path: str) -> Dict[str, Any]:
    """Probe a media file and return duration, resolution and frame rate.

    Returns:
        dict with keys ``duration`` (seconds, float), ``width``, ``height``
        (0 when no video stream exists) and ``fps`` (defaults to 30).

    Raises:
        ValueError: when ffprobe cannot read the file.
    """
    cmd = [
        FFPROBE_PATH,
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise ValueError(f"Failed to probe video: {video_path}")
    data = json.loads(result.stdout)

    info = {
        "duration": float(data.get("format", {}).get("duration", 0)),
        "width": 0,
        "height": 0,
        "fps": 30
    }
    for stream in data.get("streams", []):
        if stream.get("codec_type") == "video":
            info["width"] = stream.get("width", 0)
            info["height"] = stream.get("height", 0)
            # r_frame_rate may be a fraction ("30/1") or a plain number ("29.97")
            fps_str = stream.get("r_frame_rate", "30/1")
            if "/" in fps_str:
                num, den = fps_str.split("/")
                info["fps"] = float(num) / float(den) if float(den) != 0 else 30
            else:
                info["fps"] = float(fps_str)
            break
    return info


def concat_videos(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920)
) -> str:
    """Concatenate several clips into one video (video streams only).

    Each input is scaled with its aspect ratio preserved, padded with black
    bars to ``target_size``, then all streams are joined via the ``concat``
    filter.  Audio is dropped; use concat_videos_with_audio() to keep it.
    (An earlier version also wrote a concat-demuxer list file that the
    command never used; that dead code has been removed.)

    Args:
        video_paths: input file paths (must be non-empty).
        output_path: where to write the result.
        target_size: (width, height); defaults to portrait 1080x1920.

    Returns:
        output_path.

    Raises:
        ValueError: when ``video_paths`` is empty.
    """
    if not video_paths:
        raise ValueError("No video paths provided")

    logger.info(f"Concatenating {len(video_paths)} videos...")

    width, height = target_size

    # Normalize each input: scale to fit, pad to the exact size, reset SAR.
    filter_parts = []
    for i in range(len(video_paths)):
        filter_parts.append(
            f"[{i}:v]scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1[v{i}]"
        )

    # Join all normalized video streams.
    concat_inputs = "".join([f"[v{i}]" for i in range(len(video_paths))])
    filter_parts.append(f"{concat_inputs}concat=n={len(video_paths)}:v=1:a=0[outv]")
    filter_complex = ";".join(filter_parts)

    cmd = [FFMPEG_PATH, "-y"]
    for vp in video_paths:
        cmd.extend(["-i", vp])
    cmd.extend([
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-pix_fmt", "yuv420p",
        output_path
    ])
    _run_ffmpeg(cmd)

    logger.info(f"Concatenated video saved: {output_path}")
    return output_path


def concat_videos_with_audio(
    video_paths: List[str],
    output_path: str,
    target_size: Tuple[int, int] = (1080, 1920)
) -> str:
    """Concatenate clips while keeping their audio tracks.

    NOTE(review): the filter graph references ``[i:a]`` for every input, so
    an input without an audio stream makes the command fail; the except
    branch then falls back to the video-only concat.  Run inputs through
    add_silence_audio() first if audio must be preserved.
    """
    if not video_paths:
        raise ValueError("No video paths provided")

    logger.info(f"Concatenating {len(video_paths)} videos with audio...")

    width, height = target_size
    n = len(video_paths)

    filter_parts = []
    # Video: scale + pad each input to the target size.
    for i in range(n):
        filter_parts.append(
            f"[{i}:v]scale={width}:{height}:force_original_aspect_ratio=decrease,"
            f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:black,setsar=1[v{i}]"
        )
    # Audio: normalize sample rate and channel layout.
    for i in range(n):
        filter_parts.append(f"[{i}:a]aformat=sample_rates=44100:channel_layouts=stereo[a{i}]")

    v_concat = "".join([f"[v{i}]" for i in range(n)])
    a_concat = "".join([f"[a{i}]" for i in range(n)])
    filter_parts.append(f"{v_concat}concat=n={n}:v=1:a=0[outv]")
    filter_parts.append(f"{a_concat}concat=n={n}:v=0:a=1[outa]")
    filter_complex = ";".join(filter_parts)

    cmd = [FFMPEG_PATH, "-y"]
    for vp in video_paths:
        cmd.extend(["-i", vp])
    cmd.extend([
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "[outa]",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        output_path
    ])
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # Audio concat failed (likely a missing audio stream); keep the video.
        logger.warning("Audio concat failed, falling back to video only")
        return concat_videos(video_paths, output_path, target_size)

    logger.info(f"Concatenated video with audio saved: {output_path}")
    return output_path


def add_subtitle(
    video_path: str,
    text: str,
    start: float,
    duration: float,
    output_path: str,
    style: Dict[str, Any] = None
) -> str:
    """Burn a single subtitle into the video with the drawtext filter.

    Args:
        video_path: input video path.
        text: subtitle text.
        start: start time in seconds.
        duration: display duration in seconds.
        output_path: output file path.
        style: optional style overrides: fontsize, fontcolor, borderw,
            bordercolor, x / y (drawtext expressions such as
            "(w-text_w)/2"), font (font file path or name).

    Returns:
        output_path.
    """
    style = style or {}

    # Defaults: centered, near the bottom, white with a black outline.
    fontsize = style.get("fontsize", 48)
    fontcolor = style.get("fontcolor", "white")
    borderw = style.get("borderw", 3)
    bordercolor = style.get("bordercolor", "black")
    x = style.get("x", "(w-text_w)/2")
    y = style.get("y", "h-200")

    # Prefer a dynamically validated font over a possibly broken hard-coded path.
    default_font_path = _get_font_path()
    font = style.get("font", default_font_path)

    # Escape backslash first, then quote/colon/percent — the same escaping
    # add_multiple_subtitles() uses (previously '\' and '%' were missed and
    # broke the drawtext filter).
    escaped_text = (
        text.replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace(":", "\\:")
        .replace("%", "\\%")
    )

    drawtext = (
        f"drawtext=text='{escaped_text}':"
        f"fontfile='{font}':"
        f"fontsize={fontsize}:"
        f"fontcolor={fontcolor}:"
        f"borderw={borderw}:"
        f"bordercolor={bordercolor}:"
        f"x={x}:y={y}:"
        f"enable='between(t,{start},{start + duration})'"
    )

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", drawtext,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Added subtitle: '{text[:20]}...' at {start}s")
    return output_path


def wrap_text(text: str, max_chars: int = 18) -> str:
    """Hard-wrap text every ``max_chars`` characters.

    Existing newlines are taken to mean the caller wrapped manually, so the
    text is returned unchanged.  Every character counts as width 1 — a rough
    approximation for mixed CJK/Latin text.
    """
    if not text:
        return ""
    if "\n" in text:
        return text

    result = ""
    count = 0
    for char in text:
        if count >= max_chars:
            result += "\n"
            count = 0
        result += char
        count += 1
    return result


def mix_audio_at_offset(
    base_audio: str,
    overlay_audio: str,
    offset: float,
    output_path: str,
    base_volume: float = 1.0,
    overlay_volume: float = 1.0
) -> str:
    """Mix ``overlay_audio`` onto ``base_audio`` starting at ``offset`` seconds.

    Returns ``overlay_audio`` unchanged when the base file does not exist.
    The output keeps the base track's duration (amix duration=first).
    """
    if not os.path.exists(base_audio):
        logger.warning(f"Base audio not found: {base_audio}")
        return overlay_audio

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", base_audio,
        "-i", overlay_audio,
        "-filter_complex",
        f"[0:a]volume={base_volume}[a0];[1:a]volume={overlay_volume},adelay={int(offset*1000)}|{int(offset*1000)}[a1];[a0][a1]amix=inputs=2:duration=first:dropout_transition=0:normalize=0[out]",
        "-map", "[out]",
        "-c:a", "mp3",  # Use MP3 for audio only mixing
        output_path
    ]
    _run_ffmpeg(cmd)
    return output_path


def adjust_audio_duration(
    input_path: str,
    target_duration: float,
    output_path: str
) -> str:
    """Speed audio up to fit ``target_duration``; never slow it down.

    Policy:
      - audio longer than the target  -> speed up (capped at 2x to limit
        pitch artifacts);
      - audio at or under the target  -> copy unchanged.

    Returns None when the input file is missing, ``input_path`` when its
    duration cannot be probed, otherwise ``output_path``.
    """
    if not os.path.exists(input_path):
        return None

    current_duration = float(get_audio_info(input_path).get("duration", 0))
    if current_duration <= 0:
        return input_path

    if current_duration <= target_duration:
        # Short enough already — keep the original speed.
        shutil.copy(input_path, output_path)
        logger.info(f"Audio ({current_duration:.2f}s) <= target ({target_duration:.2f}s), keeping original speed")
        return output_path

    # Too long: speed up, capped at 2x.
    speed_ratio = current_duration / target_duration
    speed_ratio = min(speed_ratio, 2.0)
    logger.info(f"Audio ({current_duration:.2f}s) > target ({target_duration:.2f}s), speeding up {speed_ratio:.2f}x")

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", input_path,
        "-filter:a", f"atempo={speed_ratio}",
        output_path
    ]
    _run_ffmpeg(cmd)
    return output_path


def get_audio_info(file_path: str) -> Dict[str, Any]:
    """Probe an audio file; same schema as get_video_info()."""
    return get_video_info(file_path)


def wrap_text_smart(text: str, max_chars: int = 15) -> str:
    """Split text into two lines, preferring a punctuation/space boundary.

    Text at or under ``max_chars`` is returned unchanged.  Among candidate
    break characters the one nearest the middle wins; without any candidate
    the text is split at 40% so the top line is the shorter one.
    """
    if not text or len(text) <= max_chars:
        return text

    split_chars = [",", "。", "!", "?", " ", ",", ".", "!", "?"]
    best_split = -1
    mid = len(text) // 2
    for i in range(len(text)):
        if text[i] in split_chars:
            # Keep the candidate closest to the middle of the string.
            if abs(i - mid) < abs(best_split - mid):
                best_split = i

    if best_split != -1 and best_split < len(text) - 1:
        return text[:best_split+1] + "\n" + text[best_split+1:]

    # Forced split: top line gets 40% (shorter on top, longer below).
    split_idx = int(len(text) * 0.4)
    return text[:split_idx] + "\n" + text[split_idx:]


def add_multiple_subtitles(
    video_path: str,
    subtitles: List[Dict[str, Any]],
    output_path: str,
    default_style: Dict[str, Any] = None
) -> str:
    """Burn a list of subtitles into the video in a single ffmpeg pass.

    Each subtitle dict supports: text, start, duration and a per-subtitle
    ``style`` dict merged over ``default_style``.  Multi-line text is
    rendered as one drawtext filter per line, each centered horizontally
    and stacked upward from the base y position.
    """
    if not subtitles:
        # Nothing to draw: pass the video through untouched.
        shutil.copy(video_path, output_path)
        return output_path

    default_style = default_style or {}

    # Font resolution: prefer the full NotoSansSC, then Droid fallback,
    # then the generic probe.  Files under 100KB are treated as LFS stubs.
    # NOTE(review): this absolute path is deployment-specific — consider
    # moving it into config.FONTS_DIR.
    font = "/root/video-flow/assets/fonts/NotoSansSC-Regular.otf"
    if not (os.path.exists(font) and os.path.getsize(font) > 1024 * 100):
        font = "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf"
        if not (os.path.exists(font) and os.path.getsize(font) > 1024 * 100):
            font = _get_font_path()
    print(f"[SubDebug] Using font for subtitles: {font}", flush=True)

    filters = []
    for sub in subtitles:
        raw_text = sub.get("text", "")
        # Dump repr/hex of the raw text to diagnose problematic characters.
        print(f"[SubDebug] Subtitle text repr: {repr(raw_text)}", flush=True)
        print(f"[SubDebug] Subtitle text hex: {' '.join(hex(ord(c)) for c in raw_text)}", flush=True)

        text = _sanitize_text(raw_text)
        text = wrap_text(text)

        start = sub.get("start", 0)
        duration = sub.get("duration", 3)
        style = {**default_style, **sub.get("style", {})}

        fontsize = style.get("fontsize", 48)
        fontcolor = style.get("fontcolor", "white")
        borderw = style.get("borderw", 3)
        bordercolor = style.get("bordercolor", "black")
        base_y = style.get("y", "h-200")
        # Background box on by default for readability.
        box = style.get("box", 1)
        boxcolor = style.get("boxcolor", "black@0.5")
        boxborderw = style.get("boxborderw", 10)

        # One drawtext per line so every line is individually centered.
        lines = text.split("\n") if "\n" in text else [text]
        line_height = int(fontsize * 1.3)

        for line_idx, line in enumerate(lines):
            if not line.strip():
                continue
            # Escape backslash, quote, colon and percent for drawtext.
            escaped_line = line.replace("\\", "\\\\").replace("'", "\\'").replace(":", "\\:").replace("%", "\\%")

            # base_y positions the LAST line; earlier lines stack above it.
            line_offset = (len(lines) - 1 - line_idx) * line_height
            y_expr = f"({base_y})-{line_offset}"

            drawtext = (
                f"drawtext=text='{escaped_line}':"
                f"fontfile='{font}':"
                f"fontsize={fontsize}:"
                f"fontcolor={fontcolor}:"
                f"borderw={borderw}:"
                f"bordercolor={bordercolor}:"
                f"box={box}:boxcolor={boxcolor}:boxborderw={boxborderw}:"
                f"x=(w-text_w)/2:y={y_expr}:"
                f"enable='between(t,{start},{start + duration})'"
            )
            filters.append(drawtext)

    vf = ",".join(filters)
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", vf,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Added {len(subtitles)} subtitles")
    return output_path


def overlay_image(
    video_path: str,
    image_path: str,
    output_path: str,
    position: Tuple[int, int] = None,
    start: float = 0,
    duration: float = None,
    fade_in: float = 0,
    fade_out: float = 0
) -> str:
    """Overlay a transparent PNG (text art, watermark, ...) on the video.

    Args:
        video_path: input video path.
        image_path: PNG path (alpha channel supported).
        position: (x, y) placement; None centers the image.
        start: overlay start time in seconds.
        duration: overlay duration; None means until the end of the video.
        fade_in / fade_out: alpha fade durations in seconds.

    Returns:
        output_path.
    """
    info = get_video_info(video_path)
    video_duration = info["duration"]
    if duration is None:
        duration = video_duration - start

    if position:
        x, y = position
        pos_str = f"x={x}:y={y}"
    else:
        pos_str = "x=(W-w)/2:y=(H-h)/2"  # centered

    enable = f"enable='between(t,{start},{start + duration})'"
    overlay_filter = f"overlay={pos_str}:{enable}"

    # Optional alpha fades applied to the image stream before overlaying.
    if fade_in > 0 or fade_out > 0:
        fade_filter = []
        if fade_in > 0:
            fade_filter.append(f"fade=t=in:st={start}:d={fade_in}:alpha=1")
        if fade_out > 0:
            fade_out_start = start + duration - fade_out
            fade_filter.append(f"fade=t=out:st={fade_out_start}:d={fade_out}:alpha=1")
        img_filter = ",".join(fade_filter) if fade_filter else ""
        filter_complex = f"[1:v]{img_filter}[img];[0:v][img]{overlay_filter}[outv]"
    else:
        filter_complex = f"[0:v][1:v]{overlay_filter}[outv]"

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-i", image_path,
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "0:a?",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Overlaid image at {position or 'center'}, {start}s-{start+duration}s")
    return output_path


def overlay_multiple_images(
    video_path: str,
    images: List[Dict[str, Any]],
    output_path: str
) -> str:
    """Overlay several transparent PNGs via a chained overlay filter graph.

    Args:
        video_path: input video path.
        images: list of dicts {path, x, y, start, duration}; x/y default to
            centered, duration defaults to 999s (effectively "until end").
        output_path: output file path.

    Returns:
        output_path.
    """
    if not images:
        shutil.copy(video_path, output_path)
        return output_path

    inputs = ["-i", video_path]
    for img in images:
        inputs.extend(["-i", img["path"]])

    # Chain overlays: each step composites one image onto the previous result.
    filter_parts = []
    prev_output = "0:v"
    for i, img in enumerate(images):
        x = img.get("x", "(W-w)/2")
        y = img.get("y", "(H-h)/2")
        start = img.get("start", 0)
        duration = img.get("duration", 999)
        enable = f"enable='between(t,{start},{start + duration})'"
        if i == len(images) - 1:
            out_label = "outv"
        else:
            out_label = f"tmp{i}"
        filter_parts.append(
            f"[{prev_output}][{i+1}:v]overlay=x={x}:y={y}:{enable}[{out_label}]"
        )
        prev_output = out_label

    filter_complex = ";".join(filter_parts)

    cmd = [FFMPEG_PATH, "-y"] + inputs + [
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "0:a?",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        "-pix_fmt", "yuv420p",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Overlaid {len(images)} images")
    return output_path


def mix_audio(
    video_path: str,
    audio_path: str,
    output_path: str,
    audio_volume: float = 1.0,
    video_volume: float = 0.1,
    audio_start: float = 0
) -> str:
    """Mix an audio file (narration, BGM, ...) into a video's soundtrack.

    Args:
        video_path: input video path.
        audio_path: audio file to mix in.
        output_path: output file path.
        audio_volume: gain for the new audio (0-1).
        video_volume: gain for the video's own audio (0-1).
        audio_start: delay before the new audio starts, in seconds.

    Returns:
        output_path.
    """
    logger.info(f"Mixing audio: {audio_path}")

    info = get_video_info(video_path)
    video_duration = info["duration"]

    # adelay expects milliseconds, one value per channel.
    delay_ms = int(audio_start * 1000)
    filter_complex = (
        f"[0:a]volume={video_volume}[va];"
        f"[1:a]adelay={delay_ms}|{delay_ms},volume={audio_volume}[aa];"
        f"[va][aa]amix=inputs=2:duration=longest:dropout_transition=0:normalize=0[outa]"
    )

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-i", audio_path,
        "-filter_complex", filter_complex,
        "-map", "0:v",
        "-map", "[outa]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        output_path
    ]
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # The video has no audio track: attach the new audio directly.
        logger.warning("Video has no audio track, adding audio directly")
        cmd = [
            FFMPEG_PATH, "-y",
            "-i", video_path,
            "-i", audio_path,
            "-map", "0:v",
            "-map", "1:a",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            output_path
        ]
        _run_ffmpeg(cmd)

    logger.info(f"Audio mixed: {output_path}")
    return output_path


def add_bgm(
    video_path: str,
    bgm_path: str,
    output_path: str,
    bgm_volume: float = 0.06,
    loop: bool = True,
    ducking: bool = True,
    duck_gain_db: float = -6.0,
    fade_in: float = 1.0,
    fade_out: float = 1.0
) -> str:
    """Add background music, looped to match the video's duration.

    Args:
        video_path: input video path.
        bgm_path: BGM file path; a missing file copies the video unchanged.
        output_path: output file path.
        bgm_volume: BGM gain after fades.
        loop: loop the BGM to cover the whole video.
        ducking: duck the BGM under the primary audio via sidechaincompress;
            falls back to a plain amix if the sidechain filter fails.
        duck_gain_db: reserved; not currently applied to the filter graph.
        fade_in / fade_out: BGM fade durations in seconds.

    Returns:
        output_path.
    """
    if not bgm_path or not os.path.exists(bgm_path):
        logger.error(f"BGM file not found: {bgm_path}")
        # No BGM available: pass the video through unchanged.
        shutil.copy(video_path, output_path)
        return output_path

    logger.info(f"Adding BGM: {bgm_path} (volume={bgm_volume})")
    info = get_video_info(video_path)
    video_duration = info["duration"]

    if loop:
        bgm_chain = (
            f"[1:a]aloop=-1:size=2e+09,asetpts=N/SR/TB,"
            f"atrim=0:{video_duration},"
            f"afade=t=in:st=0:d={fade_in},"
            f"afade=t=out:st={max(video_duration - fade_out, 0)}:d={fade_out},"
            f"volume={bgm_volume}[bgm]"
        )
    else:
        bgm_chain = (
            f"[1:a]"
            f"afade=t=in:st=0:d={fade_in},"
            f"afade=t=out:st={max(video_duration - fade_out, 0)}:d={fade_out},"
            f"volume={bgm_volume}[bgm]"
        )

    if ducking:
        # Conservative sidechaincompress parameters to avoid "unsupported
        # option" failures on older builds.
        filter_complex = (
            f"{bgm_chain};"
            f"[0:a][bgm]sidechaincompress=threshold=0.1:ratio=4:attack=5:release=250:makeup=1:mix=1:level_in=1:level_sc=1[outa]"
        )
    else:
        filter_complex = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first[outa]"

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-stream_loop", "-1" if loop else "0",
        "-i", bgm_path,
        "-filter_complex", filter_complex,
        "-map", "0:v",
        "-map", "[outa]",
        "-c:v", "copy",
        "-c:a", "aac",
        "-b:a", "192k",
        "-t", str(video_duration),
        output_path
    ]
    try:
        _run_ffmpeg(cmd)
    except subprocess.CalledProcessError:
        # Sidechain failed: fall back to amix (original audio + quiet BGM).
        logger.warning("Sidechain failed, fallback to simple amix for BGM")
        filter_complex = f"{bgm_chain};[0:a][bgm]amix=inputs=2:duration=first[outa]"
        cmd = [
            FFMPEG_PATH, "-y",
            "-i", video_path,
            "-stream_loop", "-1" if loop else "0",
            "-i", bgm_path,
            "-filter_complex", filter_complex,
            "-map", "0:v",
            "-map", "[outa]",
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-t", str(video_duration),
            output_path
        ]
        _run_ffmpeg(cmd)

    logger.info(f"BGM added: {output_path}")
    return output_path


def trim_video(
    video_path: str,
    output_path: str,
    start: float = 0,
    duration: float = None,
    end: float = None
) -> str:
    """Trim a video.

    Args:
        video_path: input video path.
        output_path: output file path.
        start: start time in seconds.
        duration: clip length in seconds (takes precedence over ``end``).
        end: absolute end time in seconds; used only when ``duration`` is None.

    Returns:
        output_path.
    """
    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-ss", str(start)
    ]
    # Explicit None checks so duration=0 / end=0 are not silently ignored.
    if duration is not None:
        cmd.extend(["-t", str(duration)])
    elif end is not None:
        cmd.extend(["-to", str(end)])
    cmd.extend([
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "copy",
        output_path
    ])
    _run_ffmpeg(cmd)

    # Build the end label without crashing when neither bound was given
    # (the old message computed start + None in that case).
    if end is not None:
        end_label = f"{end}s"
    elif duration is not None:
        end_label = f"{start + duration}s"
    else:
        end_label = "end"
    logger.info(f"Trimmed video: {start}s - {end_label}")
    return output_path


def speed_up_video(
    video_path: str,
    output_path: str,
    speed: float = 1.5
) -> str:
    """Change video playback speed (audio pitch-corrected via atempo).

    Args:
        video_path: input video path.
        output_path: output file path.
        speed: speed factor (>1 faster, <1 slower); must be positive.

    Returns:
        output_path.

    Raises:
        ValueError: when ``speed`` is not positive.
    """
    if speed <= 0:
        raise ValueError(f"speed must be positive, got {speed}")

    # setpts drives video speed; atempo drives audio speed.
    video_filter = f"setpts={1/speed}*PTS"

    # atempo accepts only 0.5-2.0 per instance, so chain instances for
    # ratios outside that range (the previous code handled at most one
    # extra hop and produced invalid values for speed > 4 or < 0.25).
    parts = []
    remaining = speed
    while remaining > 2.0:
        parts.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        parts.append("atempo=0.5")
        remaining /= 0.5
    parts.append(f"atempo={remaining}")
    audio_filter = ",".join(parts)

    cmd = [
        FFMPEG_PATH, "-y",
        "-i", video_path,
        "-vf", video_filter,
        "-af", audio_filter,
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        output_path
    ]
    _run_ffmpeg(cmd)
    logger.info(f"Speed changed to {speed}x: {output_path}")
    return output_path