"""Video composer module.

Integrates the full pipeline: video concatenation, fancy-text overlay,
and voiceover (TTS) narration, with optional subtitles and BGM.
"""
import logging
import os
import shutil
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import config
from modules import factory, fancy_text, ffmpeg_utils, storage
from modules.text_renderer import renderer

logger = logging.getLogger(__name__)


class VideoComposer:
    """Video composer.

    Orchestrates concat -> subtitles -> fancy text -> voiceover -> BGM,
    tracking intermediate files so they can be cleaned up afterwards.
    """

    def __init__(
        self,
        output_dir: str = None,
        target_size: tuple = (1080, 1920),
        voice_type: str = "sweet_female"
    ):
        """Initialize the composer.

        Args:
            output_dir: Output directory (defaults to config.OUTPUT_DIR).
            target_size: Target resolution as (width, height).
            voice_type: Default narration voice.
        """
        self.output_dir = Path(output_dir) if output_dir else config.OUTPUT_DIR
        # parents=True: a nested, not-yet-existing output path must not crash here.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.target_size = target_size
        self.voice_type = voice_type
        # Intermediate files created during composition, removed by cleanup().
        self._temp_files: List[str] = []

    def _add_temp(self, path: str):
        """Record a temporary file for later cleanup (ignores falsy paths)."""
        if path:
            self._temp_files.append(path)

    def cleanup(self):
        """Best-effort removal of all recorded temporary files."""
        for f in self._temp_files:
            try:
                if os.path.exists(f):
                    os.remove(f)
            except Exception as e:
                logger.warning(f"Failed to cleanup {f}: {e}")
        self._temp_files = []

    def compose(
        self,
        video_paths: List[str],
        subtitles: List[Dict[str, Any]] = None,
        fancy_texts: List[Dict[str, Any]] = None,
        voiceover_text: str = None,
        voiceover_segments: List[Dict[str, Any]] = None,
        bgm_path: str = None,
        bgm_volume: float = 0.15,
        output_name: str = None,
        upload_to_r2: bool = False
    ) -> str:
        """Run the full composition pipeline.

        Args:
            video_paths: Ordered list of shot/clip video paths.
            subtitles: Subtitle configs [{text, start, duration, style}].
            fancy_texts: Fancy-text configs [{text, style, x, y, start, duration}].
            voiceover_text: Full narration text (auto-TTS'd and mixed in).
            voiceover_segments: Segmented narration [{text, start}];
                mutually exclusive with voiceover_text (voiceover_text wins).
            bgm_path: Background music path.
            bgm_volume: BGM volume.
            output_name: Output file name (without extension).
            upload_to_r2: Whether to upload the result to R2 storage.

        Returns:
            Final video path (or the R2 URL when upload_to_r2 is set).

        Raises:
            ValueError: If video_paths is empty.
        """
        if not video_paths:
            raise ValueError("No video paths provided")

        timestamp = int(time.time())
        output_name = output_name or f"composed_{timestamp}"
        logger.info(f"Starting composition: {len(video_paths)} videos")

        try:
            # Step 1: concatenate clips at the target resolution.
            merged_path = str(config.TEMP_DIR / f"{output_name}_merged.mp4")
            ffmpeg_utils.concat_videos(video_paths, merged_path, self.target_size)
            self._add_temp(merged_path)
            current_video = merged_path

            # Step 1.1: guarantee an audio track (silent base) so later
            # audio filters always find stream 0:a.
            silent_path = str(config.TEMP_DIR / f"{output_name}_silent.mp4")
            ffmpeg_utils.add_silence_audio(current_video, silent_path)
            self._add_temp(silent_path)
            current_video = silent_path

            # Step 2: subtitles (white text, black outline, no box,
            # centered in the lower area).
            if subtitles:
                subtitled_path = str(config.TEMP_DIR / f"{output_name}_subtitled.mp4")
                subtitle_style = {
                    "font": ffmpeg_utils._get_font_path(),
                    "fontsize": 60,
                    "fontcolor": "white",
                    "borderw": 5,
                    "bordercolor": "black",
                    "box": 0,        # no background box
                    "y": "h-200",    # lower area, centered
                }
                ffmpeg_utils.add_multiple_subtitles(
                    current_video, subtitles, subtitled_path,
                    default_style=subtitle_style
                )
                self._add_temp(subtitled_path)
                current_video = subtitled_path

            # Step 3: fancy-text overlays (supports atomic style params).
            if fancy_texts:
                overlay_configs = []
                for ft in fancy_texts:
                    text = ft.get("text", "")
                    style = ft.get("style")
                    custom_style = ft.get("custom_style")

                    if isinstance(style, dict):
                        # A dict style means atomic parameters: render directly.
                        img_path = renderer.render(text, style, cache=False)
                    elif custom_style and isinstance(custom_style, dict):
                        # Legacy compatibility: try the atomic renderer when
                        # custom_style looks atomic, else fall back.
                        if "font_size" in custom_style:
                            img_path = renderer.render(text, custom_style, cache=False)
                        else:
                            img_path = fancy_text.create_fancy_text(
                                text=text,
                                style=style if isinstance(style, str) else "subtitle",
                                custom_style={
                                    **(custom_style or {}),
                                    "font_name": "/System/Library/Fonts/PingFang.ttc",
                                },
                                cache=False
                            )
                    else:
                        # Legacy path: named style via fancy_text.
                        img_path = fancy_text.create_fancy_text(
                            text=text,
                            style=style if isinstance(style, str) else "subtitle",
                            custom_style={
                                "font_name": "/System/Library/Fonts/PingFang.ttc",
                            },
                            cache=False
                        )

                    overlay_configs.append({
                        "path": img_path,
                        "x": ft.get("x", "(W-w)/2"),
                        "y": ft.get("y", "(H-h)/2"),
                        "start": ft.get("start", 0),
                        "duration": ft.get("duration", 999)
                    })

                fancy_path = str(config.TEMP_DIR / f"{output_name}_fancy.mp4")
                ffmpeg_utils.overlay_multiple_images(
                    current_video, overlay_configs, fancy_path
                )
                self._add_temp(fancy_path)
                current_video = fancy_path

            # Step 4: generate and mix narration (Volcengine WS first;
            # the factory falls back to Edge TTS on failure).
            if voiceover_text:
                vo_path = factory.generate_voiceover_volcengine(
                    text=voiceover_text,
                    voice_type=self.voice_type
                )
                self._add_temp(vo_path)

                voiced_path = str(config.TEMP_DIR / f"{output_name}_voiced.mp4")
                ffmpeg_utils.mix_audio(
                    current_video, vo_path, voiced_path,
                    audio_volume=1.5, video_volume=0.2
                )
                self._add_temp(voiced_path)
                current_video = voiced_path
            elif voiceover_segments:
                current_video = self._add_segmented_voiceover(
                    current_video, voiceover_segments, output_name
                )

            # Step 5: BGM with fade in/out (ducking disabled for
            # compatibility; low mix volume instead).
            if bgm_path:
                bgm_output = str(config.TEMP_DIR / f"{output_name}_bgm.mp4")
                ffmpeg_utils.add_bgm(
                    current_video, bgm_path, bgm_output,
                    bgm_volume=bgm_volume,
                    ducking=False,
                    duck_gain_db=-6.0,
                    fade_in=1.0,
                    fade_out=1.0
                )
                self._add_temp(bgm_output)
                current_video = bgm_output

            # Step 6: copy the last intermediate to the final output path.
            final_path = str(self.output_dir / f"{output_name}.mp4")
            shutil.copy(current_video, final_path)
            logger.info(f"Composition complete: {final_path}")

            # Optional upload to R2 object storage.
            if upload_to_r2:
                r2_url = storage.upload_file(final_path)
                logger.info(f"Uploaded to R2: {r2_url}")
                return r2_url
            return final_path
        finally:
            # Remove intermediates; the final output is not tracked.
            self.cleanup()

    def _add_segmented_voiceover(
        self,
        video_path: str,
        segments: List[Dict[str, Any]],
        output_name: str
    ) -> str:
        """Mix per-segment narration into the video at given start offsets.

        Args:
            video_path: Input video (must already have an audio track).
            segments: [{text, start, voice_type?}] narration segments.
            output_name: Base name for intermediate files.

        Returns:
            Path of the video with all segments mixed in (or the input
            unchanged when there is nothing to mix).
        """
        if not segments:
            return video_path

        # Generate one TTS audio file per non-empty segment.
        audio_files = []
        for i, seg in enumerate(segments):
            text = seg.get("text", "")
            if not text:
                continue
            voice = seg.get("voice_type", self.voice_type)
            audio_path = factory.generate_voiceover_volcengine(
                text=text,
                voice_type=voice,
                output_path=str(config.TEMP_DIR / f"{output_name}_seg_{i}.mp3")
            )
            if audio_path:
                audio_files.append({
                    "path": audio_path,
                    "start": seg.get("start", 0)
                })
                self._add_temp(audio_path)

        if not audio_files:
            return video_path

        # Mix the segments in sequentially; the original track is only
        # attenuated on the first pass to avoid compounding the reduction.
        current = video_path
        for i, af in enumerate(audio_files):
            output = str(config.TEMP_DIR / f"{output_name}_seg_mixed_{i}.mp4")
            ffmpeg_utils.mix_audio(
                current, af["path"], output,
                audio_volume=1.0,
                video_volume=0.2 if i == 0 else 1.0,
                audio_start=af["start"]
            )
            self._add_temp(output)
            current = output
        return current

    def compose_from_script(
        self,
        script: Dict[str, Any],
        video_map: Dict[int, str],
        bgm_path: str = None,
        output_name: str = None
    ) -> str:
        """Compose from a normalized shot script plus a scene->video map.

        Args:
            script: Normalized script with "scenes" and optionally
                "voiceover_timeline".
            video_map: Mapping of scene id to rendered video path.
            bgm_path: Optional BGM path.
            output_name: Output file name (without extension).

        Returns:
            Final video path.

        Raises:
            ValueError: If the script has no scenes, or no mapped video
                file exists on disk.
        """
        scenes = script.get("scenes", [])
        if not scenes:
            raise ValueError("Empty script")

        video_paths = []
        fancy_texts = []

        try:
            # 1. Collect video paths and fancy texts in scene order.
            total_duration = 0.0
            for scene in scenes:
                scene_id = scene["id"]
                video_path = video_map.get(scene_id)
                if not video_path or not os.path.exists(video_path):
                    logger.warning(f"Missing video for scene {scene_id}, skipping")
                    continue

                # Use the real clip duration; fall back to 5s on probe failure.
                try:
                    info = ffmpeg_utils.get_video_info(video_path)
                    duration = float(info.get("duration", 5.0))
                except Exception:
                    duration = 5.0

                video_paths.append(video_path)

                # Fancy text: fixed style (white, black outline, no box),
                # centered in the upper area.
                if "fancy_text" in scene:
                    ft = scene["fancy_text"]
                    if isinstance(ft, dict):
                        text = ft.get("text", "")
                        if text:
                            fixed_style = {
                                "font_size": 72,
                                "font_color": "#FFFFFF",
                                "stroke": {"color": "#000000", "width": 5}
                                # no "background" key -> no box
                            }
                            fancy_texts.append({
                                "text": text,
                                "style": fixed_style,
                                "x": "(W-w)/2",  # centered
                                "y": "180",      # upper area
                                # Offsets are relative to the start of this
                                # scene on the concatenated timeline.
                                "start": total_duration + float(ft.get("start_time", 0)),
                                "duration": float(ft.get("duration", duration))
                            })

                total_duration += duration

            # Guard: every scene may have been skipped above.
            if not video_paths:
                raise ValueError("No usable videos found in video_map")

            # 2. Concatenate.
            timestamp = int(time.time())
            output_name = output_name or f"composed_{timestamp}"
            merged_path = str(config.TEMP_DIR / f"{output_name}_merged.mp4")
            ffmpeg_utils.concat_videos(video_paths, merged_path, self.target_size)
            self._add_temp(merged_path)
            current_video = merged_path

            # 3. Build the narration timeline onto one mixed audio track.
            voiceover_timeline = script.get("voiceover_timeline", [])
            mixed_audio_path = str(config.TEMP_DIR / f"{output_name}_mixed_vo.mp3")

            # Silent base track spanning the whole video.
            ffmpeg_utils._run_ffmpeg([
                ffmpeg_utils.FFMPEG_PATH, "-y",
                "-f", "lavfi",
                "-i", "anullsrc=r=44100:cl=stereo",
                "-t", str(total_duration),
                "-c:a", "mp3",
                mixed_audio_path
            ])
            self._add_temp(mixed_audio_path)

            subtitles = []
            if voiceover_timeline:
                for i, item in enumerate(voiceover_timeline):
                    text = item.get("text", "")
                    sub_text = item.get("subtitle", text)

                    # Two supported formats:
                    #   new: start_time / duration in absolute seconds
                    #   old: start_ratio / duration_ratio in [0, 1]
                    if "start_time" in item:
                        target_start = float(item.get("start_time", 0))
                        target_duration = float(item.get("duration", 3))
                    else:
                        start_ratio = float(item.get("start_ratio", 0))
                        duration_ratio = float(item.get("duration_ratio", 0))
                        target_start = start_ratio * total_duration
                        target_duration = duration_ratio * total_duration

                    if not text:
                        continue

                    # TTS for this timeline item.
                    tts_path = factory.generate_voiceover_volcengine(
                        text=text,
                        voice_type=self.voice_type,
                        output_path=str(config.TEMP_DIR / f"{output_name}_vo_{i}.mp3")
                    )
                    self._add_temp(tts_path)

                    # Stretch/shrink speech to the target slot length.
                    adjusted_path = str(config.TEMP_DIR / f"{output_name}_vo_adj_{i}.mp3")
                    ffmpeg_utils.adjust_audio_duration(tts_path, target_duration, adjusted_path)
                    self._add_temp(adjusted_path)

                    # Mix onto the running track at the absolute offset.
                    new_mixed = str(config.TEMP_DIR / f"{output_name}_mixed_{i}.mp3")
                    ffmpeg_utils.mix_audio_at_offset(mixed_audio_path, adjusted_path, target_start, new_mixed)
                    mixed_audio_path = new_mixed
                    self._add_temp(new_mixed)

                    # Subtitle in lockstep with the speech slot.
                    subtitles.append({
                        "text": ffmpeg_utils.wrap_text_smart(sub_text),
                        "start": target_start,
                        "duration": target_duration,
                        "style": {}
                    })

            # 4. Mix the assembled narration track into the video.
            voiced_path = str(config.TEMP_DIR / f"{output_name}_voiced.mp4")
            ffmpeg_utils.mix_audio(
                current_video, mixed_audio_path, voiced_path,
                audio_volume=1.5,
                video_volume=0.2  # attenuate original audio
            )
            self._add_temp(voiced_path)
            current_video = voiced_path

            # 5. Burn in subtitles.
            if subtitles:
                subtitled_path = str(config.TEMP_DIR / f"{output_name}_subtitled.mp4")
                subtitle_style = {
                    "font": ffmpeg_utils._get_font_path(),
                    "fontsize": 60,
                    "fontcolor": "white",
                    "borderw": 5,
                    "bordercolor": "black",
                    "box": 0,        # no background box
                    "y": "h-200",    # lower area, centered
                }
                ffmpeg_utils.add_multiple_subtitles(
                    current_video, subtitles, subtitled_path,
                    default_style=subtitle_style
                )
                self._add_temp(subtitled_path)
                current_video = subtitled_path

            # 6. Overlay fancy texts.
            if fancy_texts:
                fancy_path = str(config.TEMP_DIR / f"{output_name}_fancy.mp4")
                overlay_configs = []
                for ft in fancy_texts:
                    img_path = renderer.render(ft["text"], ft["style"], cache=False)
                    overlay_configs.append({
                        "path": img_path,
                        "x": ft["x"],
                        "y": ft["y"],
                        "start": ft["start"],
                        "duration": ft["duration"]
                    })
                ffmpeg_utils.overlay_multiple_images(
                    current_video, overlay_configs, fancy_path
                )
                self._add_temp(fancy_path)
                current_video = fancy_path

            # 7. BGM.
            if bgm_path:
                bgm_output = str(config.TEMP_DIR / f"{output_name}_bgm.mp4")
                ffmpeg_utils.add_bgm(
                    current_video, bgm_path, bgm_output,
                    bgm_volume=0.15
                )
                self._add_temp(bgm_output)
                current_video = bgm_output

            # 8. Final output.
            final_path = str(self.output_dir / f"{output_name}.mp4")
            shutil.copy(current_video, final_path)
            logger.info(f"Composition complete: {final_path}")
            return final_path
        finally:
            # Clean intermediates even on failure (mirrors compose()).
            self.cleanup()

    def compose_standard_task(self, task_config: Dict[str, Any]) -> str:
        """Run a standard composition task config (legacy format).

        Parses "segments" for clip paths and a flat "timeline" into
        subtitles / fancy texts / voiceover segments, then delegates
        to compose().
        """
        settings = task_config.get("settings", {})
        self.voice_type = settings.get("voice_type", self.voice_type)

        # 1. Collect clip paths.
        video_paths = []
        for seg in task_config.get("segments", []):
            path = seg.get("path") or seg.get("video_path")
            if not path:
                continue
            video_paths.append(path)

        # 2. Parse the timeline; items without an explicit type are
        # classified by which keys they carry.
        subtitles = []
        fancy_texts = []
        voiceover_segments = []
        for item in task_config.get("timeline", []):
            itype = item.get("type")
            if not itype:
                if "text" in item and ("style" in item or "x" in item or "y" in item):
                    itype = "fancy_text"
                elif "text" in item and "duration" in item and "start" in item:
                    itype = "subtitle"
                elif "text" in item and "start" in item:
                    itype = "voiceover"
                else:
                    continue

            if itype == "subtitle":
                subtitles.append(item)
            elif itype == "fancy_text":
                # Legacy "position" dict -> flat x/y.
                if "x" not in item and "position" in item:
                    item["x"] = item["position"].get("x")
                    item["y"] = item["position"].get("y")
                fancy_texts.append(item)
            elif itype == "voiceover":
                voiceover_segments.append(item)

        return self.compose(
            video_paths=video_paths,
            subtitles=subtitles,
            fancy_texts=fancy_texts,
            voiceover_segments=voiceover_segments,
            bgm_path=settings.get("bgm_path"),
            bgm_volume=settings.get("bgm_volume", 0.06),
            output_name=settings.get("output_name"),
            upload_to_r2=settings.get("upload_to_r2", False)
        )


def compose_product_video(
    video_paths: List[str],
    subtitle_configs: List[Dict[str, Any]] = None,
    fancy_text_configs: List[Dict[str, Any]] = None,
    voiceover_text: str = None,
    bgm_path: str = None,
    output_path: str = None,
    voice_type: str = "sweet_female"
) -> str:
    """Convenience wrapper: compose a product short video in one call."""
    composer = VideoComposer(voice_type=voice_type)
    output_name = None
    if output_path:
        # Derive name and directory from the requested output path.
        output_name = Path(output_path).stem
        composer.output_dir = Path(output_path).parent
    return composer.compose(
        video_paths=video_paths,
        subtitles=subtitle_configs,
        fancy_texts=fancy_text_configs,
        voiceover_text=voiceover_text,
        bgm_path=bgm_path,
        output_name=output_name
    )


def quick_compose(
    video_folder: str,
    script: List[Dict[str, Any]],
    output_path: str = None,
    voice_type: str = "sweet_female",
    bgm_path: str = None
) -> str:
    """Quick composition: read clips from a folder and pair with a script.

    Script items may name a specific "video"; otherwise clips are taken
    from the folder in sorted order, one per item.
    """
    folder = Path(video_folder)
    video_files = sorted([
        f for f in folder.iterdir()
        if f.suffix.lower() in ['.mp4', '.mov', '.avi', '.mkv']
    ])

    video_paths = []
    subtitles = []
    fancy_texts = []
    voiceovers = []

    current_time = 0
    for i, item in enumerate(script):
        if "video" in item:
            vp = folder / item["video"]
        elif i < len(video_files):
            vp = video_files[i]
        else:
            logger.warning(f"No video for script item {i}")
            continue
        video_paths.append(str(vp))

        # Probe the real duration; ffprobe-style info may return strings,
        # so coerce to float before timeline arithmetic.
        try:
            info = ffmpeg_utils.get_video_info(str(vp))
            duration = float(info.get("duration", 5))
        except Exception:
            duration = float(item.get("duration", 5))

        if "subtitle" in item:
            subtitles.append({
                "text": item["subtitle"],
                "start": current_time,
                "duration": duration,
                "style": item.get("subtitle_style", {})
            })

        if "fancy_text" in item:
            ft = item["fancy_text"]
            if isinstance(ft, str):
                ft = {"text": ft}
            fancy_texts.append({
                "text": ft.get("text", ""),
                "style": ft.get("style", "highlight"),
                "custom_style": ft.get("custom_style"),
                "x": ft.get("x", "(W-w)/2"),
                "y": ft.get("y", 200),
                "start": current_time,
                "duration": duration
            })

        if "voiceover" in item:
            voiceovers.append(item["voiceover"])

        current_time += duration

    # Join segment narrations into one text (Chinese full stop separator).
    voiceover_text = "。".join(voiceovers) if voiceovers else None

    return compose_product_video(
        video_paths=video_paths,
        subtitle_configs=subtitles if subtitles else None,
        fancy_text_configs=fancy_texts if fancy_texts else None,
        voiceover_text=voiceover_text,
        bgm_path=bgm_path,
        output_path=output_path,
        voice_type=voice_type
    )


# ============================================================
# Example usage
# ============================================================

def example_hairclip_video():
    """Example: compose a hair-clip product video."""
    material_dir = Path("/Volumes/Tony/video-flow/素材/发夹/合成图拆分镜")
    video_paths = [
        str(material_dir / "视频-分镜1.mp4"),
        str(material_dir / "视频-分镜2.mp4"),
        str(material_dir / "视频-分镜3.mp4"),
        str(material_dir / "视频-分镜4.mp4"),
        str(material_dir / "视频-分镜5.mp4"),
    ]
    script = [
        {
            "subtitle": "塌马尾 vs 高颅顶",
            "fancy_text": {
                "text": "塌马尾 vs 高颅顶",
                "style": "comparison",
                "y": 150
            },
            "voiceover": "普通马尾和高颅顶马尾的区别,你看出来了吗",
        },
        {
            "subtitle": "3秒出门,无需皮筋",
            "fancy_text": {"text": "发量+50%", "style": "bubble", "y": 300},
            "voiceover": "只需要三秒钟,不需要皮筋,发量瞬间增加百分之五十",
        },
        {
            "subtitle": "发量+50%",
            "voiceover": "蓬松的高颅顶效果,让你瞬间变美",
        },
        {
            "subtitle": "狂甩不掉!",
            "fancy_text": {"text": "狂甩不掉!", "style": "warning", "y": 400},
            "voiceover": "而且超级牢固,怎么甩都不会掉",
        },
        {
            "subtitle": "¥3.99 立即抢购",
            "fancy_text": {"text": "3.99", "style": "price", "y": 500},
            "voiceover": "只要三块九毛九,点击下方链接立即购买",
        },
    ]
    output = quick_compose(
        video_folder=str(material_dir),
        script=script,
        output_path="/Volumes/Tony/video-flow/output/发夹_合成视频.mp4",
        voice_type="sweet_female"
    )
    print(f"视频合成完成: {output}")
    return output


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_hairclip_video()