""" MatchMe Studio - Editor Module (Assembly + BGM) """ import logging import requests from pathlib import Path from typing import Dict, Any, List, Optional from moviepy.editor import ( VideoFileClip, AudioFileClip, TextClip, CompositeVideoClip, CompositeAudioClip, concatenate_videoclips ) import config from modules import storage logger = logging.getLogger(__name__) # ============================================================ # Video Assembly # ============================================================ def download_video(url: str) -> str: """Download video from URL to temp.""" filename = f"dl_{Path(url).name}" local_path = config.TEMP_DIR / filename with open(local_path, "wb") as f: f.write(requests.get(url).content) return str(local_path) def concatenate_scenes(video_urls: List[str]) -> str: """Concatenate multiple video clips into one.""" logger.info(f"Concatenating {len(video_urls)} clips...") clips = [] for url in video_urls: local_path = download_video(url) clip = VideoFileClip(local_path) # Resize to 9:16 if needed if clip.w != 1080 or clip.h != 1920: clip = clip.resize(newsize=(1080, 1920)) clips.append(clip) final = concatenate_videoclips(clips, method="compose") output_path = config.TEMP_DIR / f"merged_{int(__import__('time').time())}.mp4" final.write_videofile( str(output_path), fps=30, codec="libx264", audio_codec="aac", threads=4, logger=None ) # Cleanup for clip in clips: clip.close() final.close() return str(output_path) # ============================================================ # Subtitle Burning # ============================================================ def burn_subtitles( video_path: str, scenes: List[Dict[str, Any]] ) -> str: """Burn subtitles onto video.""" logger.info("Burning subtitles...") clip = VideoFileClip(video_path) subtitle_clips = [] current_time = 0 for scene in scenes: voiceover = scene.get("voiceover", "") duration = scene.get("duration", 5) if voiceover: try: txt = TextClip( voiceover, fontsize=48, color='white', stroke_color='black', stroke_width=2, font='DejaVu-Sans', method='caption', size=(900, None) ).set_position(('center', 1600)).set_start(current_time).set_duration(duration) subtitle_clips.append(txt) except Exception as e: logger.warning(f"Subtitle error: {e}") current_time += duration if subtitle_clips: final = CompositeVideoClip([clip] + subtitle_clips) else: final = clip output_path = config.TEMP_DIR / f"subtitled_{int(__import__('time').time())}.mp4" final.write_videofile( str(output_path), fps=30, codec="libx264", audio_codec="aac", threads=4, logger=None ) clip.close() final.close() return str(output_path) # ============================================================ # Voiceover Mixing # ============================================================ def mix_voiceover(video_path: str, voiceover_url: str) -> str: """Mix voiceover audio with video.""" if not voiceover_url: return video_path logger.info("Mixing voiceover...") # Download voiceover vo_local = download_video(voiceover_url) video = VideoFileClip(video_path) voiceover = AudioFileClip(vo_local) # Trim voiceover if longer than video if voiceover.duration > video.duration: voiceover = voiceover.subclip(0, video.duration) # Mix with original audio (if any) if video.audio: mixed = CompositeAudioClip([ video.audio.volumex(0.3), # Lower original voiceover.volumex(1.0) ]) else: mixed = voiceover final = video.set_audio(mixed) output_path = config.TEMP_DIR / f"voiced_{int(__import__('time').time())}.mp4" final.write_videofile( str(output_path), fps=30, codec="libx264", audio_codec="aac", threads=4, logger=None ) video.close() voiceover.close() final.close() return str(output_path) # ============================================================ # BGM Mixing # ============================================================ def mix_bgm( video_path: str, bgm_path: str, bgm_volume: float = 0.2 ) -> str: """Mix background music with video.""" logger.info("Mixing BGM...") video = VideoFileClip(video_path) bgm = AudioFileClip(bgm_path) # Loop BGM if shorter than video if bgm.duration < video.duration: loops_needed = int(video.duration / bgm.duration) + 1 bgm = bgm.loop(loops_needed) # Trim to video length bgm = bgm.subclip(0, video.duration).volumex(bgm_volume) # Mix with existing audio if video.audio: mixed = CompositeAudioClip([video.audio, bgm]) else: mixed = bgm final = video.set_audio(mixed) output_path = config.TEMP_DIR / f"bgm_{int(__import__('time').time())}.mp4" final.write_videofile( str(output_path), fps=30, codec="libx264", audio_codec="aac", threads=4, logger=None ) video.close() bgm.close() final.close() return str(output_path) # ============================================================ # Full Pipeline # ============================================================ def assemble_final_video( video_urls: List[str], scenes: List[Dict[str, Any]], voiceover_url: str = "", bgm_url: str = "" ) -> str: """ Full assembly pipeline: 1. Concatenate scene videos 2. Burn subtitles 3. Mix voiceover 4. Mix BGM 5. Upload to R2 """ logger.info("Starting full assembly...") # Step 1: Concatenate merged = concatenate_scenes(video_urls) # Step 2: Subtitles subtitled = burn_subtitles(merged, scenes) # Step 3: Voiceover if voiceover_url: voiced = mix_voiceover(subtitled, voiceover_url) else: voiced = subtitled # Step 4: BGM if bgm_url: bgm_local = download_video(bgm_url) final_path = mix_bgm(voiced, bgm_local) else: final_path = voiced # Step 5: Upload final_url = storage.upload_file(final_path) logger.info(f"Final video uploaded: {final_url}") return final_url