""" 编辑器 API 路由 提供时间轴编辑、轨道管理功能 """ import os import time import json import logging from typing import List, Optional, Dict, Any from pathlib import Path from functools import lru_cache from fastapi import APIRouter, HTTPException, Body from pydantic import BaseModel, Field import config from modules.db_manager import db from modules import factory, ffmpeg_utils from modules.text_renderer import renderer from modules.legacy_path_mapper import map_legacy_local_path from modules.preview_proxy import ensure_video_proxy logger = logging.getLogger(__name__) router = APIRouter() def _safe_float(v: Any) -> Optional[float]: try: if v is None: return None return float(v) except Exception: return None @lru_cache(maxsize=2048) def _probe_source_duration_cache_key(path: str, mtime_ns: int, size: int) -> float: """ Cache wrapper to avoid repeated ffprobe for same file version. Note: caller must pass (path, stat.mtime_ns, stat.size). """ info = ffmpeg_utils.get_video_info(path) return float(info.get("duration") or 0.0) def _get_source_duration_seconds(path: Optional[str]) -> Optional[float]: if not path: return None if not os.path.exists(path): return None try: st = os.stat(path) dur = _probe_source_duration_cache_key(path, st.st_mtime_ns, st.st_size) return dur if dur and dur > 0 else None except Exception: return None # ============================================================ # Pydantic Models - 编辑器数据结构 # ============================================================ class TimelineClip(BaseModel): """时间轴片段""" id: str type: str # video, audio, subtitle, fancy_text, bgm start: float # 开始时间(秒) duration: float # 持续时间(秒) source_path: Optional[str] = None # 源文件路径 source_url: Optional[str] = None # 源文件 URL # 视频特有 trim_start: float = 0 # 裁剪起点 trim_end: Optional[float] = None # 裁剪终点 source_duration: Optional[float] = None # 源素材时长(秒) # 文本特有 text: Optional[str] = None style: Optional[Dict[str, Any]] = None position: Optional[Dict[str, Any]] = None # {x, y} # 音频特有 volume: float = 1.0 fade_in: Optional[float] = None fade_out: Optional[float] = None ducking: Optional[bool] = None duck_volume: Optional[float] = None playback_rate: Optional[float] = None class Track(BaseModel): """轨道""" id: str name: str type: str # video, audio, subtitle, fancy_text, bgm, sticker clips: List[TimelineClip] = [] locked: bool = False visible: bool = True muted: bool = False class EditorState(BaseModel): """编辑器状态""" project_id: str total_duration: float = 0 tracks: List[Track] = [] current_time: float = 0 zoom: float = 1.0 ripple_mode: bool = True subtitle_style: Optional[Dict[str, Any]] = None class VoiceoverRequest(BaseModel): """旁白生成请求""" text: str voice_type: str = "zh_female_santongyongns_saturn_bigtts" target_duration: Optional[float] = None class FancyTextRequest(BaseModel): """花字生成请求""" text: str style: Dict[str, Any] = Field(default_factory=lambda: { "font_size": 72, "font_color": "#FFFFFF", "stroke": {"color": "#000000", "width": 5} }) class TrimRequest(BaseModel): """视频裁剪请求""" source_path: str start_time: float end_time: float # ============================================================ # API Endpoints # ============================================================ @router.get("/{project_id}/state", response_model=EditorState) async def get_editor_state(project_id: str, use_proxy: bool = True): """ 获取编辑器状态 从项目数据和素材自动构建时间轴 """ project = db.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="项目不存在") # 获取素材 assets = db.get_assets(project_id) script_data = project.get("script_data", {}) product_info = project.get("product_info", {}) or {} # 如果已保存过 editor_state,优先回放(非破坏式:轨道时间轴优先) saved_state = product_info.get("editor_state") if isinstance(saved_state, dict) and saved_state.get("tracks"): try: state_obj = EditorState.model_validate(saved_state) # pydantic v2 # 回放兜底补全:source_duration 可能是旧版本保存为 null for t in state_obj.tracks: for c in t.clips: # 统一用 source_duration 表示“源素材时长”(视频/音频都适用) if c.source_duration is None or (isinstance(c.source_duration, (int, float)) and c.source_duration <= 0): if c.source_path: try: if t.type in ("audio", "bgm") or c.type in ("audio", "bgm"): c.source_duration = float(ffmpeg_utils.get_audio_info(c.source_path).get("duration") or 0) or None else: c.source_duration = _get_source_duration_seconds(c.source_path) except Exception: c.source_duration = _get_source_duration_seconds(c.source_path) # normalize trim_end if missing(仅视频有意义,但这里兜底不会出错) if c.trim_end is None: c.trim_end = (c.trim_start or 0) + (c.duration or 0) # 回放兜底补全:如果保存态里“旁白/字幕/花字”为空,但 script_data 里有内容,则自动回填 voiceover_timeline = (script_data or {}).get("voiceover_timeline") or [] scenes = (script_data or {}).get("scenes") or [] def _ensure_track(tid: str, ttype: str, name: str) -> Track: existing = next((t for t in state_obj.tracks if t.id == tid), None) if existing: return existing t = Track(id=tid, name=name, type=ttype, clips=[]) state_obj.tracks.append(t) return t # ensure total_duration for ratio-derived items total_duration = float(state_obj.total_duration or 0.0) # voiceover vo_t = _ensure_track("audio-voiceover", "audio", "旁白") if (not vo_t.clips) and voiceover_timeline: for i, item in enumerate(voiceover_timeline): start_time = float(item.get("start_time", item.get("start_ratio", 0) * total_duration)) duration = float(item.get("duration", item.get("duration_ratio", 0.25) * total_duration)) vo_t.clips.append(TimelineClip( id=f"vo-{i}", type="audio", start=start_time, duration=duration, text=item.get("text", ""), volume=1.0 )) # subtitle sub_t = _ensure_track("subtitle-main", "subtitle", "字幕") if (not sub_t.clips) and voiceover_timeline: for i, item in enumerate(voiceover_timeline): start_time = float(item.get("start_time", item.get("start_ratio", 0) * total_duration)) duration = float(item.get("duration", item.get("duration_ratio", 0.25) * total_duration)) sub_t.clips.append(TimelineClip( id=f"sub-{i}", type="subtitle", start=start_time, duration=duration, text=item.get("subtitle", item.get("text", "")), style={"fontsize": 60, "fontcolor": "white"}, position={"x": "(w-text_w)/2", "y": "h-200"} )) # fancy text fancy_t = _ensure_track("fancy-text", "fancy_text", "花字") if (not fancy_t.clips) and scenes: # best effort: align with existing video duration if any scene_start = 0.0 video_track = next((t for t in state_obj.tracks if t.type == "video"), None) # build scene durations from video clips if possible scene_durations = [] if video_track and video_track.clips: scene_durations = [float(c.duration or 0.0) for c in video_track.clips] for idx, scene in enumerate(scenes): scene_duration = scene_durations[idx] if idx < len(scene_durations) and scene_durations[idx] > 0 else 5.0 ft = scene.get("fancy_text", {}) if isinstance(scene, dict) else {} if isinstance(ft, dict) and ft.get("text"): fancy_t.clips.append(TimelineClip( id=f"fancy-{scene.get('id', idx)}", type="fancy_text", start=scene_start, duration=scene_duration, text=ft.get("text", ""), style={ "font_size": 72, "font_color": "#FFFFFF", "stroke": {"color": "#000000", "width": 5} }, position={"x": "(W-w)/2", "y": "180"} )) scene_start += scene_duration # stickers(贴纸轨道:默认存在,便于拖拽添加) _ensure_track("sticker-main", "sticker", "贴纸") # video(如果保存态里没有视频轨或为空,但 assets 里有视频,则回填,避免“从 video flow 进来却黑屏”) video_assets = sorted( [a for a in assets if a.get("asset_type") == "video" and a.get("status") == "completed"], key=lambda x: x.get("scene_id", 0) ) video_t = _ensure_track("video-main", "video", "视频") if (not video_t.clips) and video_assets: cur_t = 0.0 for asset in video_assets: remote_url = asset.get("remote_url") source_path, _ = map_legacy_local_path(asset.get("local_path")) duration = 5.0 if source_path and os.path.exists(source_path): try: duration = float(ffmpeg_utils.get_video_info(source_path).get("duration", 5.0)) except Exception: duration = 5.0 # 统一走 file proxy(内部会处理 legacy 映射 / remote_url) url = f"/api/assets/file/{asset.get('id')}" video_t.clips.append(TimelineClip( id=f"video-{asset.get('scene_id')}", type="video", start=cur_t, duration=duration, source_path=source_path, source_url=url, trim_start=0, trim_end=duration, source_duration=duration )) cur_t += duration continue if remote_url and isinstance(remote_url, str) and remote_url.strip(): meta = asset.get("metadata") or {} duration = float(meta.get("duration") or meta.get("source_duration") or 5.0) url = f"/api/assets/file/{asset.get('id')}" video_t.clips.append(TimelineClip( id=f"video-{asset.get('scene_id')}", type="video", start=cur_t, duration=duration, source_path=None, source_url=url, trim_start=0, trim_end=duration, source_duration=duration )) cur_t += duration # ------------------------------ # 时间轴对齐:如果视频片段是真实时长(例如 3s),而字幕/花字按 5s 切段, # 会出现“花字超出视频、拖拽时看不对齐”的体验。 # 这里按视频轨道重排花字/字幕/旁白的 start/duration,并把 total_duration 收敛到 videoEnd。 # ------------------------------ def _clip_end(c: TimelineClip) -> float: try: return float(c.start or 0.0) + float(c.duration or 0.0) except Exception: return 0.0 scene_timeline = [] if video_t and video_t.clips: for vc in sorted(video_t.clips, key=lambda c: float(c.start or 0.0)): # prefer parse scene_id from id=video-{scene_id} sid = None if isinstance(vc.id, str) and vc.id.startswith("video-"): try: sid = int(vc.id.replace("video-", "")) except Exception: sid = None scene_timeline.append({ "scene_id": sid, "start": float(vc.start or 0.0), "duration": max(0.01, float(vc.duration or 0.0)), }) if scene_timeline: # 1) 花字:按 fancy-{scene_id} 精确对齐 fancy_t = next((t for t in state_obj.tracks if t.type == "fancy_text"), None) if fancy_t and fancy_t.clips: by_id = {c.id: c for c in fancy_t.clips if isinstance(c.id, str)} for seg in scene_timeline: sid = seg["scene_id"] if sid is None: continue cid = f"fancy-{sid}" c = by_id.get(cid) if not c: continue c.start = float(seg["start"]) c.duration = float(seg["duration"]) # 修复“表达式坐标导致拖拽不直观”:初始化为居中百分比坐标(后续拖拽会改成数值) if isinstance(c.position, dict): if not isinstance(c.position.get("x"), (int, float)): c.position["x"] = 0.5 if not isinstance(c.position.get("y"), (int, float)): c.position["y"] = 0.2 # 2) 字幕/旁白:如果片段数与场景数一致,则按索引对齐 subtitle_t = next((t for t in state_obj.tracks if t.type == "subtitle"), None) voice_t = next((t for t in state_obj.tracks if t.id == "audio-voiceover"), None) for tr in [subtitle_t, voice_t]: if not tr or not tr.clips: continue if len(tr.clips) != len(scene_timeline): continue for i, seg in enumerate(scene_timeline): tr.clips[i].start = float(seg["start"]) tr.clips[i].duration = float(seg["duration"]) # 3) total_duration:收敛到视频结束时间 video_end = max(0.0, max((seg["start"] + seg["duration"]) for seg in scene_timeline)) if video_end > 0: state_obj.total_duration = float(video_end) # 4) 兜底裁剪:任何片段不允许超出 total_duration(避免视频结束后黑屏但字幕/花字继续) td = float(state_obj.total_duration or 0.0) if td > 0: for tr in state_obj.tracks: kept = [] for c in (tr.clips or []): if float(c.start or 0.0) >= td: continue end = _clip_end(c) if end > td: c.duration = max(0.01, td - float(c.start or 0.0)) kept.append(c) tr.clips = kept # ------------------------------ # BGM:如果没有片段,但脚本给了 bgm_style,则默认塞一条(可在前端再调整/替换) # ------------------------------ bgm_t = next((t for t in state_obj.tracks if t.type == "bgm" or t.id == "audio-bgm"), None) if bgm_t is None: bgm_t = _ensure_track("audio-bgm", "bgm", "背景音乐") if bgm_t and (not bgm_t.clips): bgm_style = (script_data or {}).get("bgm_style") or "" bgm_dir = config.ASSETS_DIR / "bgm" chosen = None if bgm_dir.exists(): files = [f for f in bgm_dir.iterdir() if f.is_file() and f.suffix.lower() in [".mp3", ".mp4", ".m4a", ".wav"]] files.sort(key=lambda p: p.name) if isinstance(bgm_style, str) and bgm_style.strip(): # very small heuristic: pick file that shares any keyword kws = [k.strip() for k in bgm_style.replace(",", " ").replace(",", " ").split() if len(k.strip()) >= 2] for f in files: name = f.stem if any(k in name for k in kws): chosen = f break if chosen is None and files: chosen = files[0] if chosen is not None: td = float(state_obj.total_duration or 0.0) if td <= 0: # fallback: use max end across all tracks td = max(0.0, max((_clip_end(c) for t in state_obj.tracks for c in (t.clips or [])), default=0.0)) if td > 0: bgm_t.clips.append(TimelineClip( id="bgm-0", type="bgm", start=0.0, duration=float(td), source_path=str(chosen), source_url=f"/static/assets/bgm/{chosen.name}", volume=0.25, style={"loop": True}, )) return state_obj except Exception: # fall back to rebuild pass # 构建轨道 tracks = [] # 1. 视频轨道 video_track = Track( id="video-main", name="视频", type="video", clips=[] ) current_time = 0 video_assets = sorted( [a for a in assets if a["asset_type"] == "video" and a["status"] == "completed"], key=lambda x: x["scene_id"] ) for asset in video_assets: remote_url = asset.get("remote_url") source_path, mapped_url = map_legacy_local_path(asset.get("local_path")) # 1) 本地存在:正常走本地(统一用 /api/assets/file 作为 source_url,更稳) if source_path and os.path.exists(source_path): try: info = ffmpeg_utils.get_video_info(source_path) duration = float(info.get("duration", 5.0)) except Exception: duration = 5.0 url = f"/api/assets/file/{asset['id']}" video_track.clips.append(TimelineClip( id=f"video-{asset['scene_id']}", type="video", start=current_time, duration=duration, source_path=source_path, source_url=url, trim_start=0, trim_end=duration, source_duration=duration )) current_time += duration continue # 2) 本地缺失但有 remote_url:也要能预览(至少不黑屏) if remote_url and isinstance(remote_url, str) and remote_url.strip(): meta = asset.get("metadata") or {} duration = float(meta.get("duration") or meta.get("source_duration") or 5.0) url = f"/api/assets/file/{asset['id']}" # 统一走 file proxy(会 307 到 remote_url) video_track.clips.append(TimelineClip( id=f"video-{asset['scene_id']}", type="video", start=current_time, duration=duration, source_path=None, source_url=url, trim_start=0, trim_end=duration, source_duration=duration )) current_time += duration tracks.append(video_track) total_duration = current_time # 2. 旁白/TTS 轨道 voiceover_track = Track( id="audio-voiceover", name="旁白", type="audio", clips=[] ) voiceover_timeline = script_data.get("voiceover_timeline", []) for i, item in enumerate(voiceover_timeline): start_time = float(item.get("start_time", item.get("start_ratio", 0) * total_duration)) duration = float(item.get("duration", item.get("duration_ratio", 0.25) * total_duration)) voiceover_track.clips.append(TimelineClip( id=f"vo-{i}", type="audio", start=start_time, duration=duration, text=item.get("text", ""), volume=1.0 )) tracks.append(voiceover_track) # 3. 字幕轨道 subtitle_track = Track( id="subtitle-main", name="字幕", type="subtitle", clips=[] ) for i, item in enumerate(voiceover_timeline): start_time = float(item.get("start_time", item.get("start_ratio", 0) * total_duration)) duration = float(item.get("duration", item.get("duration_ratio", 0.25) * total_duration)) subtitle_track.clips.append(TimelineClip( id=f"sub-{i}", type="subtitle", start=start_time, duration=duration, text=item.get("subtitle", item.get("text", "")), style={"fontsize": 60, "fontcolor": "white"}, position={"x": "(w-text_w)/2", "y": "h-200"} )) tracks.append(subtitle_track) # 4. 花字轨道 fancy_track = Track( id="fancy-text", name="花字", type="fancy_text", clips=[] ) scenes = script_data.get("scenes", []) scene_start = 0 for scene in scenes: # 计算该场景的时长 scene_video = next( (a for a in video_assets if a["scene_id"] == scene["id"]), None ) source_path, _ = map_legacy_local_path(scene_video.get("local_path") if scene_video else None) if source_path and os.path.exists(source_path): try: info = ffmpeg_utils.get_video_info(source_path) scene_duration = float(info.get("duration", 5.0)) except: scene_duration = 5.0 else: scene_duration = 5.0 ft = scene.get("fancy_text", {}) if isinstance(ft, dict) and ft.get("text"): fancy_track.clips.append(TimelineClip( id=f"fancy-{scene['id']}", type="fancy_text", start=scene_start, duration=scene_duration, text=ft.get("text", ""), style={ "font_size": 72, "font_color": "#FFFFFF", "stroke": {"color": "#000000", "width": 5} }, position={"x": "(W-w)/2", "y": "180"} )) scene_start += scene_duration tracks.append(fancy_track) # 5. BGM 轨道 bgm_track = Track( id="audio-bgm", name="背景音乐", type="bgm", clips=[], muted=False ) # 默认 BGM:如果脚本给了 bgm_style,则塞一条,便于一键出片(用户可在前端替换/删除) try: bgm_style = (script_data or {}).get("bgm_style") or "" bgm_dir = config.ASSETS_DIR / "bgm" chosen = None if bgm_dir.exists(): files = [f for f in bgm_dir.iterdir() if f.is_file() and f.suffix.lower() in [".mp3", ".mp4", ".m4a", ".wav"]] files.sort(key=lambda p: p.name) if isinstance(bgm_style, str) and bgm_style.strip(): kws = [k.strip() for k in bgm_style.replace(",", " ").replace(",", " ").split() if len(k.strip()) >= 2] for f in files: name = f.stem if any(k in name for k in kws): chosen = f break if chosen is None and files: chosen = files[0] if chosen is not None and float(total_duration or 0.0) > 0: bgm_track.clips.append(TimelineClip( id="bgm-0", type="bgm", start=0.0, duration=float(total_duration), source_path=str(chosen), source_url=f"/static/assets/bgm/{chosen.name}", volume=0.25, style={"loop": True}, )) except Exception: pass tracks.append(bgm_track) # 6. 贴纸轨道 sticker_track = Track( id="sticker-main", name="贴纸", type="sticker", clips=[], muted=False ) tracks.append(sticker_track) return EditorState( project_id=project_id, total_duration=total_duration, tracks=tracks, current_time=0, zoom=1.0 ) @router.post("/{project_id}/state") async def save_editor_state(project_id: str, state: EditorState): """保存编辑器状态到数据库""" project = db.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="项目不存在") # 1) 持久化 editor_state(用于 Cut/Trim/Split 回放,不改表结构) product_info = project.get("product_info", {}) or {} product_info["editor_state"] = state.model_dump() db.update_project_product_info(project_id, product_info) # 将编辑器状态转换回 script_data 格式 script_data = project.get("script_data", {}) # 更新 voiceover_timeline voiceover_timeline = [] subtitle_clips = [] for track in state.tracks: if track.type == "audio" and track.id == "audio-voiceover": for clip in track.clips: voiceover_timeline.append({ "text": clip.text or "", "start_time": clip.start, "duration": clip.duration }) elif track.type == "subtitle": for clip in track.clips: subtitle_clips.append({ "text": clip.text or "", "subtitle": clip.text or "", "start_time": clip.start, "duration": clip.duration }) # 合并字幕到 voiceover_timeline for i, vo in enumerate(voiceover_timeline): if i < len(subtitle_clips): vo["subtitle"] = subtitle_clips[i].get("text", vo.get("text", "")) script_data["voiceover_timeline"] = voiceover_timeline # 更新花字 for track in state.tracks: if track.type == "fancy_text": for clip in track.clips: # 找到对应的 scene scene_id_str = clip.id.replace("fancy-", "") try: scene_id = int(scene_id_str) for scene in script_data.get("scenes", []): if scene["id"] == scene_id: if "fancy_text" not in scene: scene["fancy_text"] = {} scene["fancy_text"]["text"] = clip.text or "" scene["fancy_text"]["start_time"] = clip.start scene["fancy_text"]["duration"] = clip.duration break except ValueError: pass db.update_project_script(project_id, script_data) return {"message": "编辑器状态已保存"} @router.post("/generate-voiceover") async def generate_voiceover(request: VoiceoverRequest): """ 生成 TTS 音频 返回音频文件路径 """ try: output_path = str(config.TEMP_DIR / f"vo_{int(time.time())}.mp3") audio_path = factory.generate_voiceover_volcengine( text=request.text, voice_type=request.voice_type, output_path=output_path ) if audio_path and os.path.exists(audio_path): # 如果需要调整时长 if request.target_duration: adjusted_path = str(config.TEMP_DIR / f"vo_adj_{int(time.time())}.mp3") ffmpeg_utils.fit_audio_to_duration_by_speed(audio_path, request.target_duration, adjusted_path) audio_path = adjusted_path # 返回源时长(用于前端计算倍速) try: dur = float(ffmpeg_utils.get_audio_info(audio_path).get("duration") or 0.0) except Exception: dur = 0.0 return { "success": True, "path": audio_path, "url": f"/static/temp/{Path(audio_path).name}", "duration": dur if dur > 0 else None, } else: raise HTTPException(status_code=500, detail="TTS 生成失败") except Exception as e: logger.error(f"TTS 生成错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/generate-fancy-text") async def generate_fancy_text(request: FancyTextRequest): """ 生成花字图片 返回图片路径 """ try: img_path = renderer.render( text=request.text, style=request.style, cache=False ) if img_path and os.path.exists(img_path): return { "success": True, "path": img_path, "url": f"/static/temp/{Path(img_path).name}" } else: raise HTTPException(status_code=500, detail="花字生成失败") except Exception as e: logger.error(f"花字生成错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/trim-video") async def trim_video(request: TrimRequest): """ 裁剪视频片段 返回新视频路径 """ if not os.path.exists(request.source_path): raise HTTPException(status_code=404, detail="源视频不存在") try: output_path = str(config.TEMP_DIR / f"trimmed_{int(time.time())}.mp4") # 使用 ffmpeg 裁剪 duration = request.end_time - request.start_time cmd = [ ffmpeg_utils.FFMPEG_PATH, "-y", "-ss", str(request.start_time), "-i", request.source_path, "-t", str(duration), "-c", "copy", output_path ] ffmpeg_utils._run_ffmpeg(cmd) return { "success": True, "path": output_path, "url": f"/static/temp/{Path(output_path).name}", "duration": duration } except Exception as e: logger.error(f"视频裁剪错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.delete("/{project_id}/clip/{clip_id}") async def delete_clip(project_id: str, clip_id: str): """删除时间轴上的片段""" # 这里主要是前端状态管理,后端只做记录 logger.info(f"删除片段: {project_id}/{clip_id}") return {"message": "片段已删除", "clip_id": clip_id}