#!/usr/bin/env python3
"""
Scan legacy project JSON schemas under temp dir.

Purpose:
- Identify schema variants for /opt/gloda-factory/temp/project_*.json
- Produce a machine-readable summary + a markdown report

This script is READ-ONLY.
"""
from __future__ import annotations

import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple
def _safe_load_json(path: Path) -> Dict[str, Any] | None:
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _type_name(v: Any) -> str:
|
|
if v is None:
|
|
return "null"
|
|
if isinstance(v, bool):
|
|
return "bool"
|
|
if isinstance(v, int):
|
|
return "int"
|
|
if isinstance(v, float):
|
|
return "float"
|
|
if isinstance(v, str):
|
|
return "str"
|
|
if isinstance(v, list):
|
|
return "list"
|
|
if isinstance(v, dict):
|
|
return "dict"
|
|
return type(v).__name__
|
|
|
|
|
|
def _detect_schema_variant(doc: Dict[str, Any]) -> str:
|
|
"""
|
|
Heuristic:
|
|
- Schema_A: scenes contain prompt-like fields (image_prompt/visual_prompt/video_prompt)
|
|
- Schema_B: scenes do NOT contain these, but contain keyframe/story_beat/camera_movement/image_url
|
|
"""
|
|
scenes = doc.get("scenes") or []
|
|
if not isinstance(scenes, list):
|
|
return "Unknown"
|
|
|
|
prompt_keys = {"image_prompt", "visual_prompt", "video_prompt"}
|
|
seen_prompt = False
|
|
for s in scenes:
|
|
if isinstance(s, dict) and (set(s.keys()) & prompt_keys):
|
|
seen_prompt = True
|
|
break
|
|
|
|
if seen_prompt:
|
|
return "Schema_A"
|
|
|
|
# If no prompt keys, but has typical B keys, call Schema_B
|
|
typical_b = {"keyframe", "story_beat", "camera_movement", "image_url"}
|
|
seen_b = False
|
|
for s in scenes:
|
|
if isinstance(s, dict) and (set(s.keys()) & typical_b):
|
|
seen_b = True
|
|
break
|
|
|
|
return "Schema_B" if seen_b else "Unknown"
|
|
|
|
|
|
@dataclass
class ScanResult:
    """Aggregated statistics from scanning a directory of project_*.json files."""

    total_files: int  # number of project_*.json files found (parsed or not)
    parsed_files: int  # files that parsed successfully into a JSON object (dict)
    failed_files: int  # files that failed to parse or were not dict-shaped
    schema_counts: Counter  # schema variant name -> number of files
    top_level_key_counts: Counter  # top-level key -> number of files containing it
    scene_key_counts: Counter  # scene key -> total occurrences across all scenes
    cta_type_counts: Counter  # JSON type label of the "cta" field -> number of files
    sample_by_schema: Dict[str, List[str]]  # schema variant -> up to 5 sample project ids
def scan_dir(temp_dir: Path) -> ScanResult:
    """Scan project_*.json files under *temp_dir* and aggregate schema stats.

    Read-only: parses each file, classifies its schema variant, and tallies
    key/type frequencies. Unparseable or non-dict files count as failures.
    """
    files = sorted(temp_dir.glob("project_*.json"))

    schema_counts: Counter = Counter()
    top_level_key_counts: Counter = Counter()
    scene_key_counts: Counter = Counter()
    cta_type_counts: Counter = Counter()
    sample_by_schema: Dict[str, List[str]] = defaultdict(list)
    parsed = 0
    failed = 0

    for path in files:
        doc = _safe_load_json(path)
        if not isinstance(doc, dict):
            failed += 1
            continue
        parsed += 1

        variant = _detect_schema_variant(doc)
        schema_counts[variant] += 1
        samples = sample_by_schema[variant]
        if len(samples) < 5:
            # Prefer the document's own id; fall back to the filename stem.
            samples.append(str(doc.get("id") or path.stem.replace("project_", "")))

        # Tally each top-level key once per file.
        top_level_key_counts.update(doc.keys())

        # Tally scene keys across every dict-shaped scene.
        scenes = doc.get("scenes") or []
        if isinstance(scenes, list):
            for scene in scenes:
                if isinstance(scene, dict):
                    scene_key_counts.update(scene.keys())

        # Record the JSON type of the "cta" field (including "null" when absent).
        cta_type_counts[_type_name(doc.get("cta"))] += 1

    return ScanResult(
        total_files=len(files),
        parsed_files=parsed,
        failed_files=failed,
        schema_counts=schema_counts,
        top_level_key_counts=top_level_key_counts,
        scene_key_counts=scene_key_counts,
        cta_type_counts=cta_type_counts,
        sample_by_schema=dict(sample_by_schema),
    )
def _to_jsonable(sr: ScanResult) -> Dict[str, Any]:
|
|
return {
|
|
"total_files": sr.total_files,
|
|
"parsed_files": sr.parsed_files,
|
|
"failed_files": sr.failed_files,
|
|
"schema_counts": dict(sr.schema_counts),
|
|
"cta_type_counts": dict(sr.cta_type_counts),
|
|
"top_level_key_counts": dict(sr.top_level_key_counts),
|
|
"scene_key_counts": dict(sr.scene_key_counts),
|
|
"sample_by_schema": sr.sample_by_schema,
|
|
}
|
|
|
|
|
|
def _render_markdown(sr: ScanResult, temp_dir: Path) -> str:
|
|
lines: List[str] = []
|
|
lines.append("# Legacy Project JSON Schema Scan Report\n")
|
|
lines.append(f"- temp_dir: `{temp_dir}`")
|
|
lines.append(f"- total_files: {sr.total_files}")
|
|
lines.append(f"- parsed_files: {sr.parsed_files}")
|
|
lines.append(f"- failed_files: {sr.failed_files}\n")
|
|
|
|
lines.append("## Schema variants\n")
|
|
for k, v in sr.schema_counts.most_common():
|
|
samples = ", ".join(sr.sample_by_schema.get(k, [])[:5])
|
|
lines.append(f"- {k}: {v} (samples: {samples})")
|
|
lines.append("")
|
|
|
|
lines.append("## CTA type distribution\n")
|
|
for k, v in sr.cta_type_counts.most_common():
|
|
lines.append(f"- {k}: {v}")
|
|
lines.append("")
|
|
|
|
def _topn(counter: Counter, n: int = 30) -> List[Tuple[str, int]]:
|
|
return counter.most_common(n)
|
|
|
|
lines.append("## Top-level keys (top 30)\n")
|
|
for k, v in _topn(sr.top_level_key_counts, 30):
|
|
lines.append(f"- {k}: {v}/{sr.parsed_files}")
|
|
lines.append("")
|
|
|
|
lines.append("## Scene keys (top 40)\n")
|
|
for k, v in _topn(sr.scene_key_counts, 40):
|
|
lines.append(f"- {k}: {v}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: scan, print the JSON summary, optionally write reports.

    Returns 0 on success; exits via SystemExit when --temp-dir is missing.
    """
    parser = argparse.ArgumentParser(description="Scan legacy project JSON schemas")
    parser.add_argument("--temp-dir", required=True, help="Directory containing project_*.json")
    parser.add_argument("--out-json", required=False, help="Write summary json to path")
    parser.add_argument("--out-md", required=False, help="Write markdown report to path")
    args = parser.parse_args()

    temp_dir = Path(args.temp_dir)
    if not temp_dir.exists():
        raise SystemExit(f"temp dir not found: {temp_dir}")

    result = scan_dir(temp_dir)
    summary_text = json.dumps(_to_jsonable(result), ensure_ascii=False, indent=2)

    # Always echo the machine-readable summary to stdout.
    print(summary_text)

    def _write(dest, render) -> None:
        # Write render() to *dest*, creating parent dirs; no-op when dest unset.
        if not dest:
            return
        out_path = Path(dest)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(render(), encoding="utf-8")

    _write(args.out_json, lambda: summary_text)
    _write(args.out_md, lambda: _render_markdown(result, temp_dir))

    return 0
if __name__ == "__main__":
    # Run the CLI and propagate its integer exit code to the shell.
    raise SystemExit(main())