""" web/app.py ---------- Flask backend for the ProjectX courseware dashboard. Routes: GET / - Dashboard UI POST /api/upload/template - Upload PPTX template POST /api/upload/outline - Upload PDF outline GET /api/files - List uploaded files POST /api/generate - Start pipeline (background thread) GET /api/job/ - Poll job status GET /api/download/ - Download generated PPTX GET /api/stats - Daily activity for charts GET /api/history - Past generation records """ from __future__ import annotations import io import json import os import secrets import sys import threading import time import uuid from datetime import date, datetime, timedelta from pathlib import Path from flask import Flask, jsonify, render_template, request, send_file from sqlalchemy import func # Force UTF-8 output on Windows consoles that default to cp1252 so that # Unicode characters in log messages (≥ — …) don't raise UnicodeEncodeError. if hasattr(sys.stdout, "reconfigure"): try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") sys.stderr.reconfigure(encoding="utf-8", errors="replace") except Exception: pass # ── Path setup ──────────────────────────────────────────────────────────────── ROOT = Path(__file__).parent.parent WEB_DIR = Path(__file__).parent UPLOAD = WEB_DIR / "uploads" DATA_DIR = WEB_DIR / "data" # Legacy JSON state files (kept for safety; new writes go to the DB instead). ACTIVITY = DATA_DIR / "activity.json" HISTORY = DATA_DIR / "history.json" # ── Database ────────────────────────────────────────────────────────────────── # Phase 1: replace JSON state files (history.json, activity.json, tokens.json) # with SQLite. Schema lives in web/models.py; the SessionLocal factory comes # from web/db.py. Both files are imported here so SQLAlchemy registers all # tables on Base.metadata at process start. from web.db import SessionLocal from web import models # ── Tagged-infographic bank: every shape carries an "Item N" placeholder tag # so the slide builder can fill slots deterministically (no spatial heuristics). BANK_DIR = ROOT / "tagged infographics" / "new labeled grouped infographics" CATALOGUE = ROOT / "tagged_bank" / "tagged_catalogue.json" # Make ppt_generator importable sys.path.insert(0, str(ROOT / "ppt_generator")) # Activate process-wide Anthropic token tracking (monkey-patches anthropic.Anthropic # so every messages.create() call updates web/data/tokens.json automatically). try: from engine import token_tracker # noqa: F401 except Exception as _e: print(f"[warn] token_tracker not loaded: {_e!r}") app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = 100 * 1024 * 1024 # 100 MB max upload # Disable Flask's default 12-hour static-file cache header so edits load on refresh. app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0 # Force Jinja2 to re-read templates from disk on every request. Without this, # Flask caches the parsed template even though it re-reads the file — meaning # HTML edits silently don't take effect until restart. With this on, every # refresh of /login or / picks up the latest template content. app.config["TEMPLATES_AUTO_RELOAD"] = True app.jinja_env.auto_reload = True # ── Session config ──────────────────────────────────────────────────────────── # Cookie signing key — set SESSION_SECRET in .env for stable sessions across # server restarts. In dev (no env var) we generate an ephemeral key and warn; # this means sessions reset on every restart, which is fine for development. app.secret_key = os.environ.get("SESSION_SECRET") or secrets.token_hex(32) if not os.environ.get("SESSION_SECRET"): print("[warn] SESSION_SECRET not set — using ephemeral key. " "Sessions reset on every restart. " "Set SESSION_SECRET in .env for stable sessions.") # Long enough to outlast a pipeline run (~10–12 min) plus headroom. app.permanent_session_lifetime = timedelta(hours=8) # Register the auth blueprint (login / logout / /api/me + decorators). from web.auth import auth_bp, login_required, admin_required # noqa: E402 from flask import g # noqa: E402 app.register_blueprint(auth_bp) # Cache-bust token: regenerated each server start AND surfaced as ?v= # on /static asset URLs so browsers cannot serve a stale copy. _STATIC_TOKEN = str(int(time.time())) @app.context_processor def _inject_static_token(): return {"static_token": _STATIC_TOKEN} @app.after_request def _no_cache_static(resp): """Dev convenience: never let the browser cache our HTML/CSS/JS so edits show up on plain refresh without DevTools tricks.""" if (request.path in ("/", "/login") or request.path.startswith("/static/")): resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" resp.headers["Expires"] = "0" return resp # In-memory job store {job_id: {status, progress, message, output_path, error}} _jobs: dict[str, dict] = {} _jobs_lock = threading.Lock() # ═══════════════════════════════════════════════════════════════════════════════ # Helpers # ═══════════════════════════════════════════════════════════════════════════════ def _load_json(path: Path, default) -> dict | list: if path.exists(): try: return json.loads(path.read_text(encoding="utf-8")) except Exception: pass return default def _save_json(path: Path, data): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") def _today() -> str: return date.today().isoformat() # ── DB helpers (replaces _record_activity / _append_history) ────────────────── def _create_job_row(job_id: str, params: dict, user_id: int | None = None) -> None: """Insert a new Job row at queue time, stamped with the issuing user.""" db = SessionLocal() try: db.add(models.Job( id = job_id, user_id = user_id, course_title = params.get("course_title", "") or "", template_name = Path(params.get("template_path", "")).name or None, duration_days = int(params.get("duration_days", 1)), language = params.get("language", "uk"), status = "queued", progress = 0, )) db.commit() finally: db.close() def _update_job(job_id: str, **fields) -> None: """Update a Job row in place. Silently ignores unknown job_ids.""" if not fields: return db = SessionLocal() try: job = db.get(models.Job, job_id) if job is None: return for k, v in fields.items(): setattr(job, k, v) db.commit() finally: db.close() def _bump_daily_activity(slides: int, courses: int = 1) -> None: """Increment today's row in daily_activity, creating it if missing.""" today = date.today() db = SessionLocal() try: row = db.get(models.DailyActivity, today) if row is None: db.add(models.DailyActivity( date=today, courses=courses, slides=slides, )) else: row.courses += courses row.slides += slides db.commit() finally: db.close() def _job_to_history_dict(j: "models.Job") -> dict: """Shape a Job row for the /api/history payload (matches legacy JSON).""" final_ts = j.completed_at or j.created_at return { "job_id": j.id, "course_title": j.course_title, "slides": j.slide_count, "duration": j.duration_days, "date": final_ts.strftime("%Y-%m-%d %H:%M"), "timestamp": final_ts.isoformat(), "duration_sec": j.duration_sec, "output": j.output_local, "qa_score": j.qa_score, "qa_verdict": j.qa_verdict, } def _list_uploads(subfolder: str) -> list[dict]: folder = UPLOAD / subfolder folder.mkdir(parents=True, exist_ok=True) files = [] for p in sorted(folder.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if not p.is_file(): continue entry = { "name": p.name, "path": str(p), "size_kb": round(p.stat().st_size / 1024, 1), "modified": datetime.fromtimestamp(p.stat().st_mtime).strftime("%Y-%m-%d %H:%M"), } if subfolder == "templates" and p.suffix.lower() == ".pptx": cache_path = _inspection_cache_path(p.name) if cache_path.exists(): report = _load_json(cache_path, {}) entry["inspection_status"] = report.get("status", "warn") entry["inspection_summary"] = report.get("summary", "") else: # Lazily inspect existing templates so the badge appears # without requiring a re-upload. report = _inspect_and_cache(p) entry["inspection_status"] = report.get("status", "warn") entry["inspection_summary"] = report.get("summary", "") files.append(entry) return files # ═══════════════════════════════════════════════════════════════════════════════ # Background pipeline worker # ═══════════════════════════════════════════════════════════════════════════════ def _run_pipeline_job(job_id: str, params: dict): def _set_job(status: str, progress: int, message: str): with _jobs_lock: job = _jobs[job_id] job.update({ "status": status, "progress": progress, "message": message, }) # Append to live build log (dedupe identical consecutive messages). log = job.setdefault("log", []) if not log or log[-1].get("msg") != message: log.append({ "ts": datetime.now().strftime("%H:%M:%S"), "pct": progress, "msg": message, }) # Keep the log bounded — only the last 200 entries if len(log) > 200: del log[: len(log) - 200] def progress_cb(stage: int, pct: int, msg: str): _set_job("running", pct, msg) def plan_cb(slide_plan): """Once planning finishes, expose a compact list of slide titles + types so the frontend can show the live slide-plan preview underneath the status box during build.""" summary = [ {"title": (s.get("title") or s.get("step_label") or "—"), "type": s.get("type", "content")} for s in (slide_plan or []) ] with _jobs_lock: _jobs[job_id]["slide_plan"] = summary _jobs[job_id]["plan_len"] = len(summary) job_start = datetime.now() # Stamp every Anthropic API call made on this thread with this job_id # so the per-call TokenUsage rows link back to the build that issued them. from engine.token_tracker import set_active_job set_active_job(job_id) try: _set_job("running", 5, "Reading outline…") _update_job(job_id, status="running", progress=5) import pdfplumber outline_path = params["outline_path"] with pdfplumber.open(outline_path) as pdf: outline_text = "\n".join(p.extract_text() or "" for p in pdf.pages) outline_text = "\n".join( ln for ln in outline_text.splitlines() if not any(x in ln for x in ["theknowledgeacademy", "T: 01344", "Page "]) ) api_key = params.get("api_key") or os.environ.get("ANTHROPIC_API_KEY", "") output_name = f"{job_id}.pptx" output_path = str(UPLOAD / "outputs" / output_name) (UPLOAD / "outputs").mkdir(parents=True, exist_ok=True) from pipeline import run_pipeline result = run_pipeline( api_key = api_key, outline = outline_text, course_title = params.get("course_title", ""), duration_days = int(params.get("duration_days", 1)), output_path = output_path, template_path = params.get("template_path", ""), gold_path = params.get("gold_path", ""), bank_dir = params.get("bank_dir", ""), catalogue_path = params.get("catalogue_path", ""), images_dir = str(ROOT / "images"), language = params.get("language", "uk"), progress_callback = progress_cb, plan_callback = plan_cb, ) slide_plan = result.get("slide_plan", []) qa_verdict = result.get("qa_verdict", "PASS") qa_issues = result.get("qa_issues", []) result_path = result.get("output_path", "") # Real slide count = slides in the built PPTX, NOT len(slide_plan). # The plan is high-level (one "groups" entry expands into a groups slide # + one drill_down per item), so plan length under-counts actual slides. slide_count = len(slide_plan) try: from pptx import Presentation if result_path and Path(result_path).exists(): slide_count = len(Presentation(result_path).slides) except Exception as exc: print(f" [warn] could not count built slides ({exc!r}) — using plan length") # Persist plan regardless of build outcome plan_path = UPLOAD / "outputs" / f"{job_id}_plan.json" plan_path.write_text(json.dumps(slide_plan, indent=2, ensure_ascii=False), encoding="utf-8") # Pipeline errors (plan failed, QA blocked build, etc.) pipeline_errors = result.get("errors", []) if pipeline_errors: msg = "\n".join(pipeline_errors) with _jobs_lock: _jobs[job_id].update({ "status": "error", "progress": 0, "message": msg, "error": msg, "qa_verdict": qa_verdict, "qa_issues": qa_issues, }) _update_job(job_id, status="failed", progress=0, log_message=msg, completed_at=datetime.utcnow(), qa_verdict=qa_verdict) return # QA blocked the build if qa_verdict == "FAIL" or not result_path or not Path(result_path).exists(): issue_lines = [i.get("detail", i.get("code", "?")) for i in qa_issues] qa_score = result.get("qa_score", 0) msg = f"QA score {qa_score}/100 — below passing threshold after all fix attempts.\n\n" + "\n".join(issue_lines) with _jobs_lock: _jobs[job_id].update({ "status": "error", "progress": 0, "message": msg, "error": msg, "qa_verdict": qa_verdict, "qa_issues": qa_issues, }) _update_job(job_id, status="failed", progress=0, log_message=msg, completed_at=datetime.utcnow(), qa_score=qa_score, qa_verdict=qa_verdict) return _set_job("running", 92, "Saving PPTX…") _bump_daily_activity(slides=slide_count, courses=1) job_end = datetime.now() duration_sec = max(0, int((job_end - job_start).total_seconds())) # Upload the generated PPTX to S3 and remove the local file. We keep # output_local set ONLY if the upload fails — that way the download # endpoint can fall back to streaming from disk. s3_key: str | None = None try: from web.s3 import upload_output s3_key = upload_output(result_path, job_id) try: Path(result_path).unlink() except Exception as exc: print(f" [warn] local cleanup failed ({exc!r}) — file remains at {result_path}") except Exception as exc: print(f" [warn] S3 upload failed ({exc!r}) — keeping local copy at {result_path}") _update_job( job_id, status = "complete", progress = 100, slide_count = slide_count, qa_score = result.get("qa_score", 100), qa_verdict = qa_verdict, duration_sec = duration_sec, output_local = output_name if s3_key is None else None, output_s3_key = s3_key, completed_at = job_end, log_message = None, ) with _jobs_lock: _jobs[job_id].update({ "status": "done", "progress": 100, "message": f"Done — {slide_count} slides generated.", "output_path": result_path, "output_name": output_name, "slide_count": slide_count, "qa_verdict": qa_verdict, "qa_issues": qa_issues, "qa_score": result.get("qa_score", 100), }) except Exception as e: import traceback full_tb = traceback.format_exc() try: (WEB_DIR / "error_log.txt").write_text(full_tb, encoding="utf-8") except Exception: pass with _jobs_lock: _jobs[job_id].update({ "status": "error", "progress": 0, "message": full_tb, "error": full_tb, }) _update_job(job_id, status="failed", progress=0, log_message=full_tb, completed_at=datetime.utcnow()) finally: # Clear job-context so subsequent Anthropic calls (none expected on # this thread after the build, but defensive) are not mislabelled. set_active_job(None) # ═══════════════════════════════════════════════════════════════════════════════ # Routes # ═══════════════════════════════════════════════════════════════════════════════ @app.route("/") @login_required def index(): return render_template("index.html") # ── Uploads ─────────────────────────────────────────────────────────────────── @app.route("/api/upload/template", methods=["POST"]) @login_required def upload_template(): """Upload-with-validation. 1. Save the file to a temp name inside the templates dir. 2. Run the inspector against the temp file. 3. If status == 'fail' → delete the temp, return 400 with the reasons. If status == 'pass' or 'warn' → atomically rename the temp to the final filename, cache the inspection report, return 200. This way the templates/ folder never accumulates files that don't meet the 7-slide contract. """ import tempfile import os f = request.files.get("file") if not f or not f.filename: return jsonify({"error": "No file provided"}), 400 if not f.filename.lower().endswith(".pptx"): return jsonify({"error": "Only .pptx files accepted"}), 400 templates_dir = UPLOAD / "templates" templates_dir.mkdir(parents=True, exist_ok=True) # Save into a temp file in the SAME directory so the eventual rename is # atomic (cross-volume renames may fail). tmp_fd, tmp_name = tempfile.mkstemp( suffix=".pptx", prefix=".upload_", dir=str(templates_dir), ) os.close(tmp_fd) tmp_path = Path(tmp_name) f.save(str(tmp_path)) # Inspect the file BEFORE deciding whether to keep it. try: from engine.template_inspector import inspect_template report = inspect_template(tmp_path) except Exception as exc: tmp_path.unlink(missing_ok=True) return jsonify({ "error": f"Could not inspect template: {exc!r}", }), 500 # ── Reject hard fails ──────────────────────────────────────────────── if report.get("status") == "fail": tmp_path.unlink(missing_ok=True) # Pull human-readable reasons from each failing slide. fail_lines: list[str] = [] for s in report.get("slides", []): if s.get("status") != "fail": continue num = s.get("num", "?") role = s.get("role", "?") tag = f"Slide {num}" if num else "Template" for r in s.get("reasons", []): fail_lines.append(f"{tag} ({role}): {r}") return jsonify({ "error": "Template did not pass inspection.", "summary": report.get("summary", ""), "fail_reasons": fail_lines, "inspection": report, }), 400 # ── Accept pass / warn ─────────────────────────────────────────────── dest = templates_dir / f.filename try: os.replace(str(tmp_path), str(dest)) # atomic on Windows + POSIX except OSError as exc: tmp_path.unlink(missing_ok=True) return jsonify({ "error": f"Could not save template: {exc!r}", }), 500 cache_path = _inspection_cache_path(f.filename) _save_json(cache_path, report) return jsonify({ "success": True, "name": f.filename, "path": str(dest), "inspection": report, }) def _inspection_cache_path(template_name: str) -> Path: safe = Path(template_name).stem return DATA_DIR / "template_inspections" / f"{safe}.json" def _inspect_and_cache(pptx_path: Path) -> dict: """Run the inspector and persist the JSON report alongside other data.""" try: from engine.template_inspector import inspect_template report = inspect_template(pptx_path) except Exception as exc: report = { "status": "fail", "summary": f"Inspection error: {exc!r}", "meta": {}, "slides": [], } cache_path = _inspection_cache_path(pptx_path.name) _save_json(cache_path, report) return report @app.route("/api/upload/outline", methods=["POST"]) @login_required def upload_outline(): f = request.files.get("file") if not f or not f.filename: return jsonify({"error": "No file provided"}), 400 if not f.filename.lower().endswith(".pdf"): return jsonify({"error": "Only .pdf files accepted"}), 400 dest = UPLOAD / "outlines" / f.filename dest.parent.mkdir(parents=True, exist_ok=True) f.save(str(dest)) return jsonify({"success": True, "name": f.filename, "path": str(dest)}) @app.route("/api/delete//", methods=["DELETE"]) @login_required def delete_file(subfolder: str, filename: str): if subfolder not in ("templates", "outlines"): return jsonify({"error": "Invalid folder"}), 400 path = UPLOAD / subfolder / filename if not path.exists(): return jsonify({"error": "File not found"}), 404 path.unlink() return jsonify({"success": True}) @app.route("/api/files") @login_required def list_files(): return jsonify({ "templates": _list_uploads("templates"), "outlines": _list_uploads("outlines"), }) # ── Generation ──────────────────────────────────────────────────────────────── @app.route("/api/generate", methods=["POST"]) @login_required def generate(): data = request.get_json(force=True) job_id = str(uuid.uuid4())[:8] # Course title is required — defense-in-depth alongside the frontend check. # Without a title, builds appear in history as anonymous blobs that the # user can't tell apart later. course_title = (data.get("course_title") or "").strip() if not course_title: return jsonify({ "error": "Course title is required. Please name the course before generating.", }), 400 # Resolve template path — fall back to first uploaded template template_name = data.get("template") if template_name: template_path = str(UPLOAD / "templates" / template_name) else: uploaded = sorted((UPLOAD / "templates").glob("*.pptx")) if uploaded: template_path = str(uploaded[0]) else: return jsonify({"error": "No template found — please upload a .pptx template first"}), 400 if not Path(template_path).exists(): return jsonify({"error": f"Template not found: {template_name}"}), 400 # Resolve outline path outline_name = data.get("outline") if not outline_name: return jsonify({"error": "Select an outline file first"}), 400 outline_path = str(UPLOAD / "outlines" / outline_name) if not Path(outline_path).exists(): return jsonify({"error": f"Outline not found: {outline_name}"}), 400 # Gold standard for drill-down templates — fixed file takes priority _GOLD_FIXED = DATA_DIR / "gold_standard.pptx" gold_name = data.get("gold") if gold_name: gold_path = str(UPLOAD / "templates" / gold_name) elif _GOLD_FIXED.exists(): gold_path = str(_GOLD_FIXED) else: uploaded_golds = sorted((UPLOAD / "templates").glob("*.pptx")) gold_path = str(uploaded_golds[0]) if uploaded_golds else "" params = { "api_key": data.get("api_key", os.environ.get("ANTHROPIC_API_KEY", "")), "course_title": course_title, "duration_days": int(data.get("duration_days", 1)), "language": (data.get("language", "uk") or "uk").lower(), "template_path": template_path, "outline_path": outline_path, "gold_path": gold_path, "bank_dir": str(BANK_DIR), "catalogue_path": str(CATALOGUE), } with _jobs_lock: _jobs[job_id] = {"status": "queued", "progress": 0, "message": "Queued…"} # Persist a queued Job row immediately so the DB has a record even if # the worker thread crashes before it gets a chance to update status. # ``g.user`` is populated by @login_required. _create_job_row(job_id, params, user_id=g.user.id) t = threading.Thread(target=_run_pipeline_job, args=(job_id, params), daemon=True) t.start() return jsonify({"job_id": job_id}) @app.route("/api/job/") @login_required def job_status(job_id: str): with _jobs_lock: job = _jobs.get(job_id) if job is None: return jsonify({"error": "Job not found"}), 404 return jsonify(job) @app.route("/api/download/") @login_required def download(job_id: str): """Serve a generated PPTX. Preferred path: the Job row's S3 key → 302 redirect to a 1-hour presigned URL (browser fetches directly from S3, no Flask bandwidth). Fallback: the Job row's local path → stream from disk. Used only for legacy jobs uploaded before Phase 3b, or jobs whose S3 upload failed at completion time. """ # Look up persistent state from DB (in-memory _jobs only has # currently/recently-active jobs). db = SessionLocal() try: job = db.get(models.Job, job_id) finally: db.close() if job is None or job.status != "complete": return jsonify({"error": "Not ready"}), 404 download_name = (job.course_title or "course").strip() or "course" download_name = f"{download_name}.pptx".replace("/", "-").replace("\\", "-") if job.output_s3_key: try: from web.s3 import presigned_get_url from flask import redirect url = presigned_get_url(job.output_s3_key, download_filename=download_name) return redirect(url, code=302) except Exception as exc: return jsonify({"error": f"S3 download URL failed: {exc!r}"}), 500 if job.output_local: path = UPLOAD / "outputs" / job.output_local if path.exists(): return send_file(str(path), as_attachment=True, download_name=download_name) return jsonify({"error": "Output not found"}), 404 # ── Stats & history ─────────────────────────────────────────────────────────── @app.route("/api/stats") @login_required def stats(): """ Query params: range = 7d (default) | 30d | all | custom start, end = ISO dates, required when range=custom """ range_type = request.args.get("range", "7d") today = date.today() db = SessionLocal() try: # Pull every DailyActivity row once. Cheap (typically <1k rows). all_rows = db.query(models.DailyActivity).all() finally: db.close() days_data = { r.date.isoformat(): {"slides": r.slides, "courses": r.courses} for r in all_rows } if range_type == "30d": start_date, end_date = today - timedelta(days=29), today elif range_type == "all": all_dates = sorted(days_data.keys()) start_date = date.fromisoformat(all_dates[0]) if all_dates else today - timedelta(days=6) end_date = today elif range_type == "custom": try: start_date = date.fromisoformat(request.args.get("start", "")) end_date = date.fromisoformat(request.args.get("end", "")) if end_date < start_date: start_date, end_date = end_date, start_date except ValueError: start_date, end_date = today - timedelta(days=6), today else: # 7d start_date, end_date = today - timedelta(days=6), today span = (end_date - start_date).days + 1 labels, slides_vals, courses_vals = [], [], [] for i in range(span): day = (start_date + timedelta(days=i)).isoformat() d = days_data.get(day, {}) labels.append(day) slides_vals.append(d.get("slides", 0)) courses_vals.append(d.get("courses", 0)) today_d = days_data.get(_today(), {}) return jsonify({ "labels": labels, "slides": slides_vals, "courses": courses_vals, "range": range_type, "today": { "slides": today_d.get("slides", 0), "courses": today_d.get("courses", 0), }, "total": { "slides": sum(d.get("slides", 0) for d in days_data.values()), "courses": sum(d.get("courses", 0) for d in days_data.values()), }, }) @app.route("/api/history") @login_required def history(): """Last 100 completed jobs, newest first.""" db = SessionLocal() try: rows = ( db.query(models.Job) .filter(models.Job.status == "complete") .order_by(models.Job.created_at.desc()) .limit(100) .all() ) return jsonify([_job_to_history_dict(j) for j in rows]) finally: db.close() @app.route("/api/tokens") @login_required def tokens(): """Total + per-day Anthropic token usage. Aggregates the per-call TokenUsage rows (new) AND the synthetic 'aggregate-legacy' rows imported from the old tokens.json (so historical totals survive the migration). """ db = SessionLocal() try: rows = ( db.query( func.date(models.TokenUsage.timestamp).label("day"), func.sum(models.TokenUsage.input_tokens).label("inp"), func.sum(models.TokenUsage.output_tokens).label("out"), ) .group_by("day") .order_by("day") .all() ) days = { r.day: {"input": int(r.inp or 0), "output": int(r.out or 0)} for r in rows } finally: db.close() total_input = sum(d["input"] for d in days.values()) total_output = sum(d["output"] for d in days.values()) return jsonify({ "total_input": total_input, "total_output": total_output, "days": days, }) @app.route("/api/template-inspect/") @login_required def template_inspect(name): """ Return the cached inspection report for a template, re-running the inspector if no cache exists yet (handles templates uploaded before the inspector existed). """ safe = Path(name).name pptx_path = UPLOAD / "templates" / safe if not pptx_path.exists(): return jsonify({"status": "fail", "summary": "Template not found", "meta": {}, "slides": []}), 404 cache_path = _inspection_cache_path(safe) if cache_path.exists(): return jsonify(_load_json(cache_path, {})) report = _inspect_and_cache(pptx_path) return jsonify(report) @app.route("/api/template-thumb/") @login_required def template_thumb(name): """ Extract the slide-1 thumbnail baked into a PPTX file by PowerPoint (`docProps/thumbnail.jpeg/.jpg/.png`) and serve it as an image. Returns 404 if the PPTX has no embedded thumbnail (rare — PowerPoint saves one by default; only files produced by raw python-pptx or similar tools may be missing it). """ import io import zipfile safe = Path(name).name # strip any directory-traversal attempt pptx_path = UPLOAD / "templates" / safe if not pptx_path.exists() or pptx_path.suffix.lower() != ".pptx": return ("", 404) try: with zipfile.ZipFile(pptx_path) as z: for candidate in ( "docProps/thumbnail.jpeg", "docProps/thumbnail.jpg", "docProps/thumbnail.png", ): if candidate in z.namelist(): data = z.read(candidate) mime = "image/png" if candidate.endswith(".png") else "image/jpeg" return send_file(io.BytesIO(data), mimetype=mime, download_name=Path(candidate).name) return ("", 404) except Exception: return ("", 500) # ── Admin: user management ──────────────────────────────────────────────────── @app.route("/api/users", methods=["GET"]) @admin_required def list_users(): db = SessionLocal() try: rows = db.query(models.User).order_by(models.User.created_at.desc()).all() return jsonify([{ "id": u.id, "email": u.email, "name": u.name, "is_admin": u.is_admin, "is_active": u.is_active, "created_at": u.created_at.isoformat() if u.created_at else None, "last_login_at": u.last_login_at.isoformat() if u.last_login_at else None, } for u in rows]) finally: db.close() @app.route("/api/users", methods=["POST"]) @admin_required def create_user(): data = request.get_json(force=True) or {} email = (data.get("email") or "").strip().lower() name = (data.get("name") or "").strip() password = data.get("password") or "" is_admin = bool(data.get("is_admin", False)) if not email or "@" not in email: return jsonify({"error": "A valid email is required."}), 400 if not name: return jsonify({"error": "Name is required."}), 400 if len(password) < 8: return jsonify({"error": "Password must be at least 8 characters."}), 400 db = SessionLocal() try: if db.query(models.User).filter_by(email=email).first(): return jsonify({"error": f"User {email!r} already exists."}), 409 u = models.User(email=email, name=name, is_admin=is_admin, is_active=True) u.set_password(password) db.add(u) db.commit() return jsonify({ "id": u.id, "email": u.email, "name": u.name, "is_admin": u.is_admin, }), 201 finally: db.close() @app.route("/api/users//password", methods=["POST"]) @admin_required def reset_user_password(uid: int): data = request.get_json(force=True) or {} new_password = data.get("password") or "" if len(new_password) < 8: return jsonify({"error": "Password must be at least 8 characters."}), 400 db = SessionLocal() try: u = db.get(models.User, uid) if u is None: return jsonify({"error": "User not found."}), 404 u.set_password(new_password) db.commit() return jsonify({"success": True}) finally: db.close() @app.route("/api/users/", methods=["DELETE"]) @admin_required def delete_user(uid: int): # Refuse to delete the calling admin themselves. if g.user.id == uid: return jsonify({"error": "You cannot delete your own account."}), 400 db = SessionLocal() try: u = db.get(models.User, uid) if u is None: return jsonify({"error": "User not found."}), 404 db.delete(u) db.commit() return jsonify({"success": True}) finally: db.close() # ═══════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": # Load .env for API key try: from dotenv import load_dotenv load_dotenv(ROOT / ".env") except ImportError: pass import socket host_ip = "127.0.0.1" try: # Best-effort local IP detection so teammates can find the URL s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) host_ip = s.getsockname()[0] s.close() except Exception: pass print("ProjectX Dashboard") print(f" Local: http://127.0.0.1:5000") print(f" LAN: http://{host_ip}:5000 <-- share this with teammates") print(f"[boot] _run_pipeline_job uses: _set_job") # confirms new code loaded # Local-dev convenience: auto-restart the server whenever a .py file # changes. Saves you from Ctrl+C / re-run after every Python edit. # Templates and static files already reload per-request, so they don't # need this. Production runs via systemd (not this `app.run` block), so # this flag has no effect on the production server. app.run(debug=False, host="0.0.0.0", port=5000, use_reloader=True, threaded=True)