"""Initialise the SQLite database and import existing JSON state. Creates the tables defined in web.models (User, Job, DailyActivity, TokenUsage), then walks the three legacy JSON files under web/data/ and imports each into the matching table. Safe to re-run: if a table already has rows, the import is skipped for that table (a message is printed). Use ``--force`` to wipe and re-import all tables. Usage: python scripts/init_db.py # create + import (if empty) python scripts/init_db.py --force # wipe + reimport everything Does NOT touch the JSON files. They remain on disk until you choose to move them to a backup folder (see DEPLOYMENT_STEPS.md Phase 1, step 9). """ from __future__ import annotations import argparse import json import sys from datetime import date, datetime, timedelta from pathlib import Path # Make 'web' importable when this script is launched from anywhere ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from web.db import Base, engine, SessionLocal # noqa: E402 from web import models # noqa: E402, F401 ← registers tables on Base.metadata DATA_DIR = ROOT / "web" / "data" HISTORY_JSON = DATA_DIR / "history.json" ACTIVITY_JSON = DATA_DIR / "activity.json" TOKENS_JSON = DATA_DIR / "tokens.json" def _parse_ts(s: str | None) -> datetime | None: """Parse an ISO-8601 string; tolerate missing/empty values.""" if not s: return None try: return datetime.fromisoformat(s) except ValueError: return None def import_history(db) -> int: if not HISTORY_JSON.exists(): print(f" [skip] {HISTORY_JSON.name} not found") return 0 raw = json.loads(HISTORY_JSON.read_text(encoding="utf-8")) if not isinstance(raw, list): print(f" [warn] {HISTORY_JSON.name} is not a list; skipping") return 0 inserted = 0 for entry in raw: job_id = entry.get("job_id") if not job_id: continue created_at = _parse_ts(entry.get("timestamp")) or datetime.utcnow() duration_sec = entry.get("duration_sec") completed_at = (created_at + timedelta(seconds=duration_sec)) if duration_sec else None job = models.Job( id = job_id[:8], # column is String(8) user_id = None, course_title = entry.get("course_title") or "", template_name = entry.get("template") or None, duration_days = entry.get("duration"), language = entry.get("language"), status = entry.get("status") or "complete", progress = 100, log_message = None, slide_count = entry.get("slides"), qa_score = entry.get("qa_score"), qa_verdict = entry.get("qa_verdict"), duration_sec = duration_sec, output_local = entry.get("output") or None, output_s3_key = None, created_at = created_at, completed_at = completed_at, ) db.add(job) inserted += 1 db.commit() return inserted def import_activity(db) -> int: if not ACTIVITY_JSON.exists(): print(f" [skip] {ACTIVITY_JSON.name} not found") return 0 raw = json.loads(ACTIVITY_JSON.read_text(encoding="utf-8")) days = raw.get("days", {}) if isinstance(raw, dict) else {} inserted = 0 for day_str, vals in days.items(): try: d = date.fromisoformat(day_str) except ValueError: continue db.add(models.DailyActivity( date = d, courses = int(vals.get("courses", 0) or 0), slides = int(vals.get("slides", 0) or 0), )) inserted += 1 db.commit() return inserted def import_tokens(db) -> int: """Legacy tokens.json holds DAILY aggregates only — no per-call records. Import each day as one synthetic TokenUsage row tagged model='aggregate-legacy' so we can distinguish them from per-call data written by the new token tracker going forward. """ if not TOKENS_JSON.exists(): print(f" [skip] {TOKENS_JSON.name} not found") return 0 raw = json.loads(TOKENS_JSON.read_text(encoding="utf-8")) days = raw.get("days", {}) if isinstance(raw, dict) else {} inserted = 0 for day_str, vals in days.items(): try: d = date.fromisoformat(day_str) except ValueError: continue # Anchor synthetic record at start of day so per-day grouping works. ts = datetime(d.year, d.month, d.day, 0, 0, 0) db.add(models.TokenUsage( job_id = None, timestamp = ts, model = "aggregate-legacy", input_tokens = int(vals.get("input", 0) or 0), output_tokens = int(vals.get("output", 0) or 0), cache_creation_tokens = 0, cache_read_tokens = 0, )) inserted += 1 db.commit() return inserted def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--force", action="store_true", help="Wipe existing tables and re-import everything.") args = parser.parse_args() print(f"DATABASE_URL: {engine.url}") print() if args.force: print("Dropping all tables (--force)...") Base.metadata.drop_all(engine) print("Creating tables...") Base.metadata.create_all(engine) db = SessionLocal() try: results: dict[str, int | str] = {} # Jobs existing = db.query(models.Job).count() if existing and not args.force: results["jobs"] = f"skipped ({existing} rows already present)" else: results["jobs"] = import_history(db) # Activity existing = db.query(models.DailyActivity).count() if existing and not args.force: results["daily_activity"] = f"skipped ({existing} rows already present)" else: results["daily_activity"] = import_activity(db) # Tokens existing = db.query(models.TokenUsage).count() if existing and not args.force: results["token_usage"] = f"skipped ({existing} rows already present)" else: results["token_usage"] = import_tokens(db) # Users results["users"] = db.query(models.User).count() print() print("Final row counts:") for table in ("users", "jobs", "daily_activity", "token_usage"): count = db.query(getattr(models, { "users": "User", "jobs": "Job", "daily_activity": "DailyActivity", "token_usage": "TokenUsage", }[table])).count() note = f" ({results[table]})" if isinstance(results[table], str) else "" print(f" {table:<18} {count:>5}{note}") print() print("Done.") return 0 finally: db.close() if __name__ == "__main__": raise SystemExit(main())