""" Video Flow v2.0 - 命令行主流程控制器 独立的 CLI 入口,支持命令行参数调用完整的视频生成流程。 与 app.py (Streamlit UI) 分离,共用 modules 层。 Usage: python main_flow.py --help python main_flow.py \ --product-name "网红气质大号发量多!高马尾香蕉夹" \ --images /path/to/主图1.png /path/to/主图2.png \ --category "钟表配饰-时尚饰品-发饰" \ --price "3.99元" \ --tags "回头客|款式好看|材质好" \ --model doubao \ --output final_hairclip """ import argparse import logging import sys import json import time import random from pathlib import Path from typing import Dict, List, Optional # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("video_flow.log") ] ) import config from modules.script_gen import ScriptGenerator from modules.image_gen import ImageGenerator from modules.video_gen import VideoGenerator from modules.composer import VideoComposer logger = logging.getLogger("MainFlow") def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser( description="Video Flow CLI - 商品短视频自动生成命令行工具", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 使用默认测试数据 python main_flow.py --demo # 指定商品信息 python main_flow.py \\ --product-name "网红气质大号发量多!高马尾香蕉夹" \\ --images ./素材/发夹/原始稿/主图1.png ./素材/发夹/原始稿/主图2.png \\ --category "钟表配饰-时尚饰品-发饰" \\ --price "3.99元" \\ --tags "回头客|款式好看|材质好" \\ --model doubao \\ --output final_hairclip """ ) # 基本参数 parser.add_argument("--demo", action="store_true", help="使用内置测试数据(发夹案例)") parser.add_argument("--product-name", type=str, help="商品标题") parser.add_argument("--images", nargs="+", type=str, help="商品主图路径列表 (建议 3-5 张)") # 商品信息 parser.add_argument("--category", type=str, default="", help="商品类目") parser.add_argument("--price", type=str, default="", help="商品价格") parser.add_argument("--tags", type=str, default="", help="评价标签 (用于提炼卖点)") parser.add_argument("--params", type=str, default="", help="商品参数") parser.add_argument("--style-hint", type=str, default="", help="风格提示 (如: 韩风、高级感)") # 模型选择 parser.add_argument("--script-model", choices=["shubiaobiao", "doubao"], default="doubao", help="脚本生成模型 (default: doubao)") parser.add_argument("--image-model", choices=["shubiaobiao", "doubao", "gemini", "doubao-group"], default="doubao", help="图片生成模型 (default: doubao)") # 输出选项 parser.add_argument("--output", type=str, default="final_video", help="输出文件名 (不含扩展名)") parser.add_argument("--project-id", type=str, default=None, help="项目ID (默认自动生成)") # 可选步骤控制 parser.add_argument("--skip-video", action="store_true", help="跳过视频生成步骤 (仅生成脚本和图片)") parser.add_argument("--skip-compose", action="store_true", help="跳过合成步骤") return parser.parse_args() def get_demo_data() -> tuple: """获取内置测试数据 (发夹案例)""" product_name = "网红气质大号发量多!高马尾香蕉夹 马尾显发量蓬松神器马尾夹" product_info = { "category": "钟表配饰-时尚饰品-发饰", "price": "3.99元", "tags": "回头客|款式好看|材质好|尺寸合适|颜色好看|很好用|做工好|质感不错|很牢固", "params": "金属材质:非金属; 非金属材质:树脂; 发夹分类:香蕉夹; 风格:日韩|简约风|法式|瑞丽风", "style_hint": "" } # 原始图片路径 base_image_dir = config.BASE_DIR / "素材" / "发夹" / "原始稿" original_images = [ str(base_image_dir / "主图1.png"), str(base_image_dir / "主图2.png"), str(base_image_dir / "主图3.png") ] return product_name, product_info, original_images def match_bgm_by_style(bgm_style: str, bgm_dir: Path) -> Optional[str]: """ 根据脚本 bgm_style 智能匹配 BGM 文件 - 匹配成功:随机选一个匹配的 BGM - 匹配失败:随机选任意一个 BGM """ # 获取所有 BGM 文件 (支持 .mp3 和 .mp4) bgm_files = list(bgm_dir.glob("*.[mM][pP][34]")) + list(bgm_dir.glob("*.[mM][pP]3")) bgm_files = [f for f in bgm_files if f.is_file() and not f.name.startswith('.')] if not bgm_files: return None # 关键词匹配 if bgm_style: style_lower = bgm_style.lower() keywords = ["活泼", "欢快", "轻松", "舒缓", "休闲", "温柔", "随性", "百搭", "bling", "节奏"] matched_keywords = [kw for kw in keywords if kw in style_lower] matched_files = [] for f in bgm_files: fname = f.name if any(kw in fname for kw in matched_keywords): matched_files.append(f) if matched_files: return str(random.choice(matched_files)) # 无匹配则随机选一个 return str(random.choice(bgm_files)) def run_video_flow(args) -> Optional[str]: """执行完整的视频生成流程""" # ===== 1. 准备输入数据 ===== if args.demo: logger.info("Using DEMO data (发夹案例)...") product_name, product_info, original_images = get_demo_data() else: if not args.product_name or not args.images: logger.error("Must provide --product-name and --images, or use --demo") return None product_name = args.product_name product_info = { "category": args.category, "price": args.price, "tags": args.tags, "params": args.params, "style_hint": args.style_hint } original_images = args.images # 验证图片是否存在 valid_images = [p for p in original_images if Path(p).exists()] if not valid_images: logger.error("No valid input images found!") logger.error(f"Checked paths: {original_images}") return None logger.info(f"Found {len(valid_images)} valid images") # 生成项目 ID project_id = args.project_id or f"CLI-{int(time.time())}" logger.info(f"Project ID: {project_id}") # ===== 2. 生成脚本 ===== logger.info("="*50) logger.info("Step 1: Generating Script...") logger.info("="*50) script_gen = ScriptGenerator() script = script_gen.generate_script( product_name, product_info, valid_images, model_provider=args.script_model ) if not script: logger.error("Script generation failed.") return None # 保存脚本供检查 script_path = config.OUTPUT_DIR / f"script_{project_id}.json" with open(script_path, "w", encoding="utf-8") as f: json.dump(script, f, ensure_ascii=False, indent=2) logger.info(f"Script saved to {script_path}") scenes = script.get("scenes", []) logger.info(f"Generated {len(scenes)} scenes") # ===== 3. 生成分镜图片 ===== logger.info("="*50) logger.info("Step 2: Generating Scene Images...") logger.info("="*50) image_gen = ImageGenerator() visual_anchor = script.get("visual_anchor", "") scene_images: Dict[int, str] = {} if args.image_model == "doubao-group": # 组图生成模式 logger.info("Using Doubao Group Image Generation...") scene_images = image_gen.generate_group_images_doubao( scenes=scenes, reference_images=valid_images, visual_anchor=visual_anchor ) else: # 顺序生成模式 current_refs = list(valid_images) for idx, scene in enumerate(scenes): scene_id = scene["id"] logger.info(f"Generating image for Scene {scene_id} ({idx+1}/{len(scenes)})...") img_path = image_gen.generate_single_scene_image( scene=scene, original_image_path=current_refs, previous_image_path=None, model_provider=args.image_model, visual_anchor=visual_anchor ) if img_path: scene_images[scene_id] = img_path current_refs.append(img_path) logger.info(f"Scene {scene_id} image: {img_path}") else: logger.warning(f"Failed to generate image for Scene {scene_id}") if not scene_images: logger.error("Image generation failed (no images generated).") return None logger.info(f"Generated {len(scene_images)} scene images.") if args.skip_video: logger.info("Skipping video generation (--skip-video)") return None # ===== 4. 生成分镜视频 ===== logger.info("="*50) logger.info("Step 3: Generating Scene Videos...") logger.info("="*50) video_gen = VideoGenerator() scene_videos = video_gen.generate_scene_videos(project_id, script, scene_images) if not scene_videos: logger.error("Video generation failed (or partially failed).") return None logger.info(f"Generated {len(scene_videos)} scene videos.") if args.skip_compose: logger.info("Skipping composition (--skip-compose)") return None # ===== 5. 合成最终视频 ===== logger.info("="*50) logger.info("Step 4: Composing Final Video...") logger.info("="*50) composer = VideoComposer(voice_type=config.VOLC_TTS_DEFAULT_VOICE) # 智能匹配 BGM bgm_style = script.get("bgm_style", "") bgm_path = match_bgm_by_style(bgm_style, config.ASSETS_DIR / "bgm") if bgm_path: logger.info(f"Selected BGM: {Path(bgm_path).name} (style: {bgm_style or 'default'})") # 合成 output_name = f"{args.output}_{project_id}" final_video = composer.compose_from_script( script=script, video_map=scene_videos, bgm_path=bgm_path, output_name=output_name ) logger.info("="*50) logger.info(f"✅ Workflow Complete!") logger.info(f" Final Video: {final_video}") logger.info(f" Script: {script_path}") logger.info("="*50) return final_video def main(): """CLI 入口""" args = parse_args() # 验证参数 if not args.demo and not args.product_name: print("Error: Must provide --product-name and --images, or use --demo") print("Run with --help for usage information.") sys.exit(1) try: result = run_video_flow(args) if result: sys.exit(0) else: sys.exit(1) except KeyboardInterrupt: logger.info("Interrupted by user") sys.exit(130) except Exception as e: logger.exception(f"Unexpected error: {e}") sys.exit(1) if __name__ == "__main__": main()