249 lines
8.2 KiB
Python
249 lines
8.2 KiB
Python
"""
|
|
Legacy project JSON normalizer.
|
|
|
|
Goal:
|
|
- Convert legacy project JSON (from /opt/gloda-factory/temp/project_*.json)
|
|
into the script_data schema expected by current Streamlit UI (`app.py`)
|
|
and composer (`modules/composer.py`).
|
|
|
|
Principles:
|
|
- Pure rule-based, no AI generation.
|
|
- Never drop legacy information: keep full raw doc under `script_data["_legacy"]`
|
|
and per-scene under `scene["_legacy"]`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
def _as_str(v: Any) -> str:
|
|
return v if isinstance(v, str) else ""
|
|
|
|
|
|
def _as_dict(v: Any) -> Dict[str, Any]:
|
|
return v if isinstance(v, dict) else {}
|
|
|
|
|
|
def _as_list(v: Any) -> List[Any]:
|
|
return v if isinstance(v, list) else []
|
|
|
|
|
|
def _detect_schema_variant(doc: Dict[str, Any]) -> str:
    """
    Classify a legacy document by the keys found on its scenes.

    Returns ``"Schema_A"`` when any dict scene carries a prompt key,
    ``"Schema_B"`` when none does but some dict scene carries a
    keyframe/motion-style key, and ``"Unknown"`` otherwise (including when
    ``doc["scenes"]`` is missing, empty, or not a list).
    """
    scene_dicts = [sc for sc in _as_list(doc.get("scenes")) if isinstance(sc, dict)]

    # Checked in priority order: a single prompt key anywhere in the document
    # makes it Schema_A even if other scenes look like Schema_B.
    signatures = (
        ("Schema_A", {"image_prompt", "visual_prompt", "video_prompt"}),
        ("Schema_B", {"keyframe", "story_beat", "camera_movement", "image_url"}),
    )
    for label, marker_keys in signatures:
        if any(not marker_keys.isdisjoint(sc) for sc in scene_dicts):
            return label
    return "Unknown"
|
|
|
|
|
|
def _derive_visual_prompt_from_keyframe(scene: Dict[str, Any]) -> str:
    """
    Summarize a scene's keyframe and story beat as prompt-like text.

    The result is a structured, rule-based description (not an AI prompt)
    used so the visual prompt field is not left empty. The keyframe is read
    from ``scene["keyframe"]`` or, failing that, ``scene["keyframes"]``;
    anything that is not a dict is treated as absent.

    Returns the lines joined by newlines, or ``""`` when there is neither a
    keyframe dict nor a story beat.
    """
    keyframe = _as_dict(scene.get("keyframe") or scene.get("keyframes"))
    lines: List[str] = []

    if keyframe:
        # Marker line flags the text as derived rather than authored.
        lines.append("[DerivedFromKeyframe]")

    # Sorted keys keep the output deterministic and readable.
    for name in sorted(keyframe):
        value = keyframe[name]
        if isinstance(value, (str, int, float)):
            if str(value).strip():
                lines.append(f"{name}: {value}")
            continue
        if not isinstance(value, dict) or not value:
            # Lists, None and empty dicts are skipped.
            continue
        # Flatten exactly one level of nesting into "key=value" pairs.
        pairs = [f"{sk}={sv}" for sk, sv in sorted(value.items()) if str(sv).strip()]
        if pairs:
            lines.append(f"{name}: " + ", ".join(pairs))

    beat = _as_str(scene.get("story_beat"))
    if beat:
        lines.append(f"story_beat: {beat}")

    return "\n".join(lines).strip()
|
|
|
|
|
|
def _derive_video_prompt_from_motion(scene: Dict[str, Any]) -> str:
    """
    Summarize a scene's motion-related fields as prompt-like text.

    Emits one line each for ``camera_movement`` (string), ``rhythm``
    (non-empty dict, rendered as sorted ``key=value`` pairs) and
    ``story_beat`` (string), whenever present.

    The ``[DerivedFromMotion]`` marker line is always emitted, so the result
    is never empty even when none of those fields exist.
    """
    lines: List[str] = ["[DerivedFromMotion]"]

    camera = _as_str(scene.get("camera_movement"))
    if camera:
        lines.append(f"camera_movement: {camera}")

    rhythm = scene.get("rhythm")
    if isinstance(rhythm, dict) and rhythm:
        # Sorted keys keep the rendering stable across runs.
        rendered = ", ".join(f"{name}={rhythm[name]}" for name in sorted(rhythm))
        lines.append(f"rhythm: {rendered}")

    beat = _as_str(scene.get("story_beat"))
    if beat:
        lines.append(f"story_beat: {beat}")

    return "\n".join(lines).strip()
|
|
|
|
|
|
def _normalize_fancy_text(scene: Dict[str, Any], default_duration: float) -> Dict[str, Any]:
|
|
ft = scene.get("fancy_text")
|
|
if isinstance(ft, dict):
|
|
# Ensure required keys exist
|
|
out = dict(ft)
|
|
out.setdefault("text", "")
|
|
out.setdefault("style", "highlight")
|
|
# support either position dict or string
|
|
if "position" not in out:
|
|
out["position"] = "center"
|
|
out.setdefault("start_time", 0.0)
|
|
out.setdefault("duration", default_duration)
|
|
return out
|
|
|
|
# legacy doesn't have fancy_text
|
|
return {
|
|
"text": "",
|
|
"style": "highlight",
|
|
"position": "center",
|
|
"start_time": 0.0,
|
|
"duration": default_duration,
|
|
}
|
|
|
|
|
|
def _build_voiceover_timeline_from_scenes(normalized_scenes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Derive a voiceover timeline from per-scene voiceover text.

    Scenes are laid end to end: each entry starts at the sum of the positive
    durations of all preceding scenes. The voiceover text is taken from
    ``scene["_legacy"]["voiceover"]`` first, then ``scene["voiceover"]``;
    scenes whose text is missing, non-string or blank produce no entry but
    still advance the clock.

    Entry ids are the 1-based scene positions, so they may have gaps. A
    scene with a non-positive duration gets an entry duration of 3.0 but
    does not advance the clock.
    """
    entries: List[Dict[str, Any]] = []
    cursor = 0.0

    for position, scene in enumerate(normalized_scenes, start=1):
        length = float(scene.get("duration") or 0)
        has_length = length > 0

        snapshot = _as_dict(scene.get("_legacy"))
        narration = _as_str(snapshot.get("voiceover") or scene.get("voiceover") or "")

        if narration.strip():
            entry = {
                "id": position,
                "text": narration,
                "subtitle": narration,
                "start_time": cursor,
                "duration": length if has_length else 3.0,
            }
            entries.append(entry)

        if has_length:
            cursor += length

    return entries
|
|
|
|
|
|
# Duration (seconds) used whenever a legacy scene or voiceover entry has no
# usable duration of its own.
_FALLBACK_DURATION = 3.0


def _coerce_float(value: Any, default: float) -> float:
    """Return ``float(value)``, or *default* when value is falsy or not numeric."""
    if not value:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        # e.g. "3s", a list, a dict: legacy JSON is not trusted to be clean.
        return default


def _coerce_int(value: Any, default: int) -> int:
    """Return ``int(value)``, or *default* when value is falsy or not convertible."""
    if not value:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # e.g. "scene_1", a list, or a float infinity.
        return default


def _normalize_legacy_scene(s: Dict[str, Any], schema: str, fallback_id: int) -> Dict[str, Any]:
    """Convert one legacy scene dict into the normalized scene shape."""
    scene_id = _coerce_int(s.get("id"), fallback_id)

    duration = _coerce_float(s.get("duration"), 0.0)
    if not duration > 0:
        # Written as "not > 0" so that NaN is replaced as well as <= 0.
        duration = _FALLBACK_DURATION

    # Both spellings are accepted, matching the keyframe-derivation helper.
    keyframe = s.get("keyframe") or s.get("keyframes")

    if schema == "Schema_B":
        visual_prompt = _derive_visual_prompt_from_keyframe(s)
    else:
        # The legacy key is usually image_prompt. Each alias is validated on
        # its own so a non-string visual_prompt cannot mask image_prompt.
        visual_prompt = _as_str(s.get("visual_prompt")) or _as_str(s.get("image_prompt"))
        if not visual_prompt and keyframe:
            visual_prompt = _derive_visual_prompt_from_keyframe(s)

    video_prompt = _as_str(s.get("video_prompt"))
    if not video_prompt:
        video_prompt = _derive_video_prompt_from_motion(s)

    return {
        "id": scene_id,
        "duration": duration,
        "visual_prompt": visual_prompt,
        "video_prompt": video_prompt,
        "fancy_text": _normalize_fancy_text(s, default_duration=duration),
        # keep optional fields if present
        "timeline": s.get("timeline", ""),
        # Per-scene legacy snapshot; built from reads only, the original
        # scene dict is not mutated.
        "_legacy": {
            "schema": schema,
            "image_url": s.get("image_url"),
            "keyframe": keyframe,
            "camera_movement": s.get("camera_movement"),
            "story_beat": s.get("story_beat"),
            "rhythm": s.get("rhythm"),
            "sound_design": s.get("sound_design"),
            "voiceover": s.get("voiceover"),
        },
    }


def _normalize_voiceover_entries(items: List[Any]) -> List[Dict[str, Any]]:
    """Unify field names of an existing voiceover timeline; non-dict items are skipped."""
    entries: List[Dict[str, Any]] = []
    for idx, it in enumerate(items):
        if not isinstance(it, dict):
            continue
        text = _as_str(it.get("text")) or _as_str(it.get("voiceover"))
        entries.append(
            {
                "id": _coerce_int(it.get("id"), idx + 1),
                "text": text,
                "subtitle": _as_str(it.get("subtitle")) or text,
                "start_time": _coerce_float(it.get("start_time"), 0.0),
                "duration": _coerce_float(it.get("duration"), _FALLBACK_DURATION),
            }
        )
    return entries


def normalize_legacy_project(doc: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a script_data dict compatible with current UI.

    Args:
        doc: Parsed legacy project JSON. A value that is not a dict is
            treated as an empty document (and still preserved verbatim
            under ``"_legacy"``).

    Returns:
        A dict with the keys ``hook``, ``selling_points``,
        ``target_audience``, ``video_style``, ``bgm_style``,
        ``voiceover_timeline``, ``scenes``, ``cta``, ``analysis``,
        ``_legacy`` (the original ``doc`` object, not a copy) and
        ``_legacy_schema`` (the detected schema variant).

    Malformed values never abort the conversion: non-dict scenes are
    skipped, and ids, durations and start times that cannot be converted
    to numbers fall back to positional ids, 3.0 seconds and 0.0
    respectively. The voiceover timeline is taken from
    ``doc["voiceover_timeline"]`` when that yields at least one entry and
    is otherwise derived from the scenes' voiceover text.
    """
    source = _as_dict(doc)
    schema = _detect_schema_variant(source)

    normalized_scenes: List[Dict[str, Any]] = []
    for s in _as_list(source.get("scenes")):
        if not isinstance(s, dict):
            continue
        # The fallback id is the position among the scenes actually kept.
        normalized_scenes.append(
            _normalize_legacy_scene(s, schema, fallback_id=len(normalized_scenes) + 1)
        )

    # voiceover timeline: normalize existing if present, else derive from
    # scenes voiceover. A list holding no usable entries counts as absent.
    voiceover_timeline = _normalize_voiceover_entries(_as_list(source.get("voiceover_timeline")))
    if not voiceover_timeline:
        voiceover_timeline = _build_voiceover_timeline_from_scenes(normalized_scenes)

    # script_data expected by UI
    return {
        "hook": source.get("hook") or "",
        "selling_points": source.get("selling_points") or [],
        "target_audience": source.get("target_audience") or "",
        "video_style": source.get("video_style") or "",
        "bgm_style": source.get("bgm_style") or "",
        "voiceover_timeline": voiceover_timeline,
        "scenes": normalized_scenes,
        "cta": source.get("cta") or "",
        # Keep analysis for UI fallback display
        "analysis": source.get("analysis", ""),
        # Preserve original
        "_legacy": doc,
        "_legacy_schema": schema,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|