Files
video-flow/scripts/scan_legacy_schema.py
2026-01-09 14:09:16 +08:00

233 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
Scan legacy project JSON schemas under temp dir.
Purpose:
- Identify schema variants for /opt/gloda-factory/temp/project_*.json
- Produce a machine-readable summary + a markdown report
This script is READ-ONLY.
"""
from __future__ import annotations
import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple
def _safe_load_json(path: Path) -> Dict[str, Any] | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def _type_name(v: Any) -> str:
if v is None:
return "null"
if isinstance(v, bool):
return "bool"
if isinstance(v, int):
return "int"
if isinstance(v, float):
return "float"
if isinstance(v, str):
return "str"
if isinstance(v, list):
return "list"
if isinstance(v, dict):
return "dict"
return type(v).__name__
def _detect_schema_variant(doc: Dict[str, Any]) -> str:
"""
Heuristic:
- Schema_A: scenes contain prompt-like fields (image_prompt/visual_prompt/video_prompt)
- Schema_B: scenes do NOT contain these, but contain keyframe/story_beat/camera_movement/image_url
"""
scenes = doc.get("scenes") or []
if not isinstance(scenes, list):
return "Unknown"
prompt_keys = {"image_prompt", "visual_prompt", "video_prompt"}
seen_prompt = False
for s in scenes:
if isinstance(s, dict) and (set(s.keys()) & prompt_keys):
seen_prompt = True
break
if seen_prompt:
return "Schema_A"
# If no prompt keys, but has typical B keys, call Schema_B
typical_b = {"keyframe", "story_beat", "camera_movement", "image_url"}
seen_b = False
for s in scenes:
if isinstance(s, dict) and (set(s.keys()) & typical_b):
seen_b = True
break
return "Schema_B" if seen_b else "Unknown"
@dataclass
class ScanResult:
total_files: int
parsed_files: int
failed_files: int
schema_counts: Counter
top_level_key_counts: Counter
scene_key_counts: Counter
cta_type_counts: Counter
sample_by_schema: Dict[str, List[str]]
def scan_dir(temp_dir: Path) -> ScanResult:
files = sorted(temp_dir.glob("project_*.json"))
schema_counts: Counter = Counter()
top_level_key_counts: Counter = Counter()
scene_key_counts: Counter = Counter()
cta_type_counts: Counter = Counter()
sample_by_schema: Dict[str, List[str]] = defaultdict(list)
parsed = 0
failed = 0
for f in files:
doc = _safe_load_json(f)
if not isinstance(doc, dict):
failed += 1
continue
parsed += 1
schema = _detect_schema_variant(doc)
schema_counts[schema] += 1
if len(sample_by_schema[schema]) < 5:
pid = str(doc.get("id") or f.stem.replace("project_", ""))
sample_by_schema[schema].append(pid)
# top-level keys
for k in doc.keys():
top_level_key_counts[k] += 1
# scenes keys
scenes = doc.get("scenes") or []
if isinstance(scenes, list):
for s in scenes:
if isinstance(s, dict):
for k in s.keys():
scene_key_counts[k] += 1
# cta type
cta_type_counts[_type_name(doc.get("cta"))] += 1
return ScanResult(
total_files=len(files),
parsed_files=parsed,
failed_files=failed,
schema_counts=schema_counts,
top_level_key_counts=top_level_key_counts,
scene_key_counts=scene_key_counts,
cta_type_counts=cta_type_counts,
sample_by_schema=dict(sample_by_schema),
)
def _to_jsonable(sr: ScanResult) -> Dict[str, Any]:
return {
"total_files": sr.total_files,
"parsed_files": sr.parsed_files,
"failed_files": sr.failed_files,
"schema_counts": dict(sr.schema_counts),
"cta_type_counts": dict(sr.cta_type_counts),
"top_level_key_counts": dict(sr.top_level_key_counts),
"scene_key_counts": dict(sr.scene_key_counts),
"sample_by_schema": sr.sample_by_schema,
}
def _render_markdown(sr: ScanResult, temp_dir: Path) -> str:
lines: List[str] = []
lines.append("# Legacy Project JSON Schema Scan Report\n")
lines.append(f"- temp_dir: `{temp_dir}`")
lines.append(f"- total_files: {sr.total_files}")
lines.append(f"- parsed_files: {sr.parsed_files}")
lines.append(f"- failed_files: {sr.failed_files}\n")
lines.append("## Schema variants\n")
for k, v in sr.schema_counts.most_common():
samples = ", ".join(sr.sample_by_schema.get(k, [])[:5])
lines.append(f"- {k}: {v} (samples: {samples})")
lines.append("")
lines.append("## CTA type distribution\n")
for k, v in sr.cta_type_counts.most_common():
lines.append(f"- {k}: {v}")
lines.append("")
def _topn(counter: Counter, n: int = 30) -> List[Tuple[str, int]]:
return counter.most_common(n)
lines.append("## Top-level keys (top 30)\n")
for k, v in _topn(sr.top_level_key_counts, 30):
lines.append(f"- {k}: {v}/{sr.parsed_files}")
lines.append("")
lines.append("## Scene keys (top 40)\n")
for k, v in _topn(sr.scene_key_counts, 40):
lines.append(f"- {k}: {v}")
lines.append("")
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description="Scan legacy project JSON schemas")
parser.add_argument("--temp-dir", required=True, help="Directory containing project_*.json")
parser.add_argument("--out-json", required=False, help="Write summary json to path")
parser.add_argument("--out-md", required=False, help="Write markdown report to path")
args = parser.parse_args()
temp_dir = Path(args.temp_dir)
if not temp_dir.exists():
raise SystemExit(f"temp dir not found: {temp_dir}")
sr = scan_dir(temp_dir)
payload = _to_jsonable(sr)
print(json.dumps(payload, ensure_ascii=False, indent=2))
if args.out_json:
out_json = Path(args.out_json)
out_json.parent.mkdir(parents=True, exist_ok=True)
out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
if args.out_md:
out_md = Path(args.out_md)
out_md.parent.mkdir(parents=True, exist_ok=True)
out_md.write_text(_render_markdown(sr, temp_dir), encoding="utf-8")
return 0
if __name__ == "__main__":
raise SystemExit(main())