#!/usr/bin/env python3
"""
Scan legacy project JSON schemas under temp dir.

Purpose:
- Identify schema variants for /opt/gloda-factory/temp/project_*.json
- Produce a machine-readable summary + a markdown report

This script is READ-ONLY.
"""
from __future__ import annotations

import argparse
import json
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple


def _safe_load_json(path: Path) -> Dict[str, Any] | None:
    """Parse *path* as UTF-8 JSON, returning None on any read/decode failure.

    Best-effort loader: legacy files may be unreadable or corrupt, so
    failures are reported as None rather than raised to the caller.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: file unreadable; ValueError: invalid JSON or bad UTF-8
        # (json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses).
        return None


def _type_name(v: Any) -> str:
    """Return a short JSON-ish type label for *v* (e.g. "null", "str", "list").

    bool is tested before int because bool is a subclass of int in Python.
    """
    if v is None:
        return "null"
    if isinstance(v, bool):
        return "bool"
    if isinstance(v, int):
        return "int"
    if isinstance(v, float):
        return "float"
    if isinstance(v, str):
        return "str"
    if isinstance(v, list):
        return "list"
    if isinstance(v, dict):
        return "dict"
    return type(v).__name__


def _detect_schema_variant(doc: Dict[str, Any]) -> str:
    """
    Heuristic:
    - Schema_A: scenes contain prompt-like fields (image_prompt/visual_prompt/video_prompt)
    - Schema_B: scenes do NOT contain these, but contain keyframe/story_beat/camera_movement/image_url
    """
    scenes = doc.get("scenes") or []
    if not isinstance(scenes, list):
        return "Unknown"
    prompt_keys = {"image_prompt", "visual_prompt", "video_prompt"}
    seen_prompt = False
    for s in scenes:
        if isinstance(s, dict) and (set(s.keys()) & prompt_keys):
            seen_prompt = True
            break
    if seen_prompt:
        return "Schema_A"
    # If no prompt keys, but has typical B keys, call Schema_B
    typical_b = {"keyframe", "story_beat", "camera_movement", "image_url"}
    seen_b = False
    for s in scenes:
        if isinstance(s, dict) and (set(s.keys()) & typical_b):
            seen_b = True
            break
    return "Schema_B" if seen_b else "Unknown"


@dataclass
class ScanResult:
    """Aggregated statistics from one directory scan."""

    total_files: int        # every project_*.json file found (parsed or not)
    parsed_files: int       # files that loaded as a JSON object (dict)
    failed_files: int       # unreadable, invalid-JSON, or non-dict files
    schema_counts: Counter  # schema variant name -> number of files
    top_level_key_counts: Counter  # top-level key -> occurrence count across docs
    scene_key_counts: Counter      # scene-dict key -> occurrence count across scenes
    cta_type_counts: Counter       # JSON type label of doc["cta"] -> count
    sample_by_schema: Dict[str, List[str]]  # schema -> up to 5 sample project ids


def scan_dir(temp_dir: Path) -> ScanResult:
    """Scan *temp_dir* for project_*.json files and aggregate schema statistics."""
    files = sorted(temp_dir.glob("project_*.json"))
    schema_counts: Counter = Counter()
    top_level_key_counts: Counter = Counter()
    scene_key_counts: Counter = Counter()
    cta_type_counts: Counter = Counter()
    sample_by_schema: Dict[str, List[str]] = defaultdict(list)
    parsed = 0
    failed = 0
    for f in files:
        doc = _safe_load_json(f)
        if not isinstance(doc, dict):
            failed += 1
            continue
        parsed += 1
        schema = _detect_schema_variant(doc)
        schema_counts[schema] += 1
        if len(sample_by_schema[schema]) < 5:
            # Prefer the embedded id; fall back to the id derived from the filename.
            pid = str(doc.get("id") or f.stem.replace("project_", ""))
            sample_by_schema[schema].append(pid)
        # top-level keys
        for k in doc.keys():
            top_level_key_counts[k] += 1
        # scenes keys
        scenes = doc.get("scenes") or []
        if isinstance(scenes, list):
            for s in scenes:
                if isinstance(s, dict):
                    for k in s.keys():
                        scene_key_counts[k] += 1
        # cta type
        cta_type_counts[_type_name(doc.get("cta"))] += 1
    return ScanResult(
        total_files=len(files),
        parsed_files=parsed,
        failed_files=failed,
        schema_counts=schema_counts,
        top_level_key_counts=top_level_key_counts,
        scene_key_counts=scene_key_counts,
        cta_type_counts=cta_type_counts,
        sample_by_schema=dict(sample_by_schema),
    )


def _to_jsonable(sr: ScanResult) -> Dict[str, Any]:
    """Convert a ScanResult into plain dicts suitable for json.dumps."""
    return {
        "total_files": sr.total_files,
        "parsed_files": sr.parsed_files,
        "failed_files": sr.failed_files,
        "schema_counts": dict(sr.schema_counts),
        "cta_type_counts": dict(sr.cta_type_counts),
        "top_level_key_counts": dict(sr.top_level_key_counts),
        "scene_key_counts": dict(sr.scene_key_counts),
        "sample_by_schema": sr.sample_by_schema,
    }


def _render_markdown(sr: ScanResult, temp_dir: Path) -> str:
    """Render *sr* as a human-readable markdown report (trailing newline included)."""
    lines: List[str] = []
    lines.append("# Legacy Project JSON Schema Scan Report\n")
    lines.append(f"- temp_dir: `{temp_dir}`")
    lines.append(f"- total_files: {sr.total_files}")
    lines.append(f"- parsed_files: {sr.parsed_files}")
    lines.append(f"- failed_files: {sr.failed_files}\n")
    lines.append("## Schema variants\n")
    for k, v in sr.schema_counts.most_common():
        samples = ", ".join(sr.sample_by_schema.get(k, [])[:5])
        lines.append(f"- {k}: {v} (samples: {samples})")
    lines.append("")
    lines.append("## CTA type distribution\n")
    for k, v in sr.cta_type_counts.most_common():
        lines.append(f"- {k}: {v}")
    lines.append("")

    def _topn(counter: Counter, n: int = 30) -> List[Tuple[str, int]]:
        # Small local helper: the n most frequent entries of a counter.
        return counter.most_common(n)

    lines.append("## Top-level keys (top 30)\n")
    for k, v in _topn(sr.top_level_key_counts, 30):
        lines.append(f"- {k}: {v}/{sr.parsed_files}")
    lines.append("")
    lines.append("## Scene keys (top 40)\n")
    for k, v in _topn(sr.scene_key_counts, 40):
        lines.append(f"- {k}: {v}")
    lines.append("")
    return "\n".join(lines) + "\n"


def main() -> int:
    """CLI entry point: scan a directory, print the JSON summary, optionally write reports."""
    parser = argparse.ArgumentParser(description="Scan legacy project JSON schemas")
    parser.add_argument("--temp-dir", required=True, help="Directory containing project_*.json")
    parser.add_argument("--out-json", required=False, help="Write summary json to path")
    parser.add_argument("--out-md", required=False, help="Write markdown report to path")
    args = parser.parse_args()
    temp_dir = Path(args.temp_dir)
    if not temp_dir.exists():
        raise SystemExit(f"temp dir not found: {temp_dir}")
    sr = scan_dir(temp_dir)
    payload = _to_jsonable(sr)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    if args.out_json:
        out_json = Path(args.out_json)
        out_json.parent.mkdir(parents=True, exist_ok=True)
        out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    if args.out_md:
        out_md = Path(args.out_md)
        out_md.parent.mkdir(parents=True, exist_ok=True)
        out_md.write_text(_render_markdown(sr, temp_dir), encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())