"""
web/auth.py — authentication routes, decorators, and current-user helper.

Phase 2: backend auth foundation.

Routes registered under the ``auth`` blueprint:
    GET  /login   — render the login form
    POST /login   — verify credentials, set session
    POST /logout  — clear session
    GET  /api/me  — return the currently logged-in user (or 401)

Decorators exposed for use in web/app.py:
    @login_required — redirects to /login (or 401 for /api/* paths)
    @admin_required — login_required + must have ``is_admin``

Sessions are server-signed cookies (Flask default). The signing key is set in
web/app.py from the SESSION_SECRET env var (or an ephemeral key in dev with a
warning).
"""
from __future__ import annotations

import functools
from datetime import datetime, timezone
from typing import Callable

from flask import (
    Blueprint,
    g,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)

from web import models
from web.db import SessionLocal

auth_bp = Blueprint("auth", __name__)


# ── internal helpers ──────────────────────────────────────────────────────────

def _safe_next_url(target: object) -> str:
    """Return ``target`` if it is a same-site absolute path, else ``"/"``.

    ``next`` comes straight from the query string or request body, so it is
    untrusted. Only plain paths such as ``/reports?page=2`` are accepted;
    anything that could be read as pointing at another host is replaced by
    the site root.
    """
    if not isinstance(target, str) or not target.startswith("/"):
        return "/"
    # "//host" is a scheme-relative URL; a backslash can be read as "/".
    if target.startswith("//") or "\\" in target:
        return "/"
    # Reject control characters (tab, CR, LF, ...) rather than guess how a
    # client would interpret a path that contains them.
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in target):
        return "/"
    return target


def _user_payload(user: models.User) -> dict:
    """Build the JSON-serialisable view of ``user`` returned by the API."""
    return {
        "id": user.id,
        "email": user.email,
        "name": user.name,
        "is_admin": user.is_admin,
    }


def _unauthenticated_response():
    """Response for a request without a valid login.

    ``/api/*`` paths get a 401 JSON body; every other path is redirected to
    the login form with ``?next=`` set to the requested path.
    """
    if request.path.startswith("/api/"):
        return jsonify({"error": "unauthorized"}), 401
    return redirect(url_for("auth.login_get", next=request.path))


# ── current-user helpers ──────────────────────────────────────────────────────

def load_current_user() -> models.User | None:
    """Return the User attached to ``session['user_id']``, or None.

    None is returned when there is no ``user_id`` in the cookie session, when
    no such user exists, or when the user is not active.

    This helper opens and closes its own short-lived DB session; the returned
    User is detached from it.
    """
    uid = session.get("user_id")
    if not uid:
        return None

    db = SessionLocal()
    try:
        user = db.get(models.User, uid)
        if user is None or not user.is_active:
            return None
        # Detach so the returned User survives the closed session.
        db.expunge(user)
        return user
    finally:
        db.close()


# ── decorators ────────────────────────────────────────────────────────────────

def login_required(fn: Callable) -> Callable:
    """Require a logged-in, active user for the wrapped view.

    For HTML routes: redirect to /login if not authed.
    For /api/* paths: return 401 JSON.

    On success, stashes the user on ``flask.g.user`` so handlers can read it.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        user = load_current_user()
        if user is None:
            return _unauthenticated_response()
        g.user = user
        return fn(*args, **kwargs)

    return wrapper


def admin_required(fn: Callable) -> Callable:
    """login_required + must be admin.

    Unauthenticated requests are handled exactly as in ``login_required``
    (redirect for HTML routes, 401 JSON for /api/* paths). Authenticated
    users without ``is_admin`` get a 403 JSON response.

    On success, stashes the user on ``flask.g.user``.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        user = load_current_user()
        if user is None:
            return _unauthenticated_response()
        if not user.is_admin:
            return jsonify({"error": "forbidden"}), 403
        g.user = user
        return fn(*args, **kwargs)

    return wrapper


# ── routes ────────────────────────────────────────────────────────────────────

@auth_bp.route("/login", methods=["GET"])
def login_get():
    """Render the login form. ``?next=`` preserves the originally-requested URL.

    The ``next`` value is restricted to same-site paths before it is handed
    to the template.
    """
    return render_template(
        "login.html",
        next=_safe_next_url(request.args.get("next", "/")),
    )


@auth_bp.route("/login", methods=["POST"])
def login_post():
    """Verify credentials and start a session.

    Accepts JSON ``{email, password}`` or form-encoded body.

    Returns 200 + user JSON (plus a same-site ``next`` path) on success,
    400 when email or password is missing or not a string, and 401 on bad
    credentials or an inactive account.
    """
    payload = request.get_json(silent=True)
    # A JSON body that is not an object (list, string, number) has no fields
    # to read; fall back to the form data instead of crashing on ``.get``.
    data = payload if isinstance(payload, dict) and payload else request.form.to_dict()

    raw_email = data.get("email")
    raw_password = data.get("password")
    # JSON clients can send non-string values; treat those as missing.
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""
    if not email or not password:
        return jsonify({"error": "Email and password are required."}), 400

    db = SessionLocal()
    try:
        user = db.query(models.User).filter_by(email=email).first()
        if user is None or not user.is_active or not user.check_password(password):
            return jsonify({"error": "Invalid email or password."}), 401

        # Naive UTC timestamp, same value ``datetime.utcnow()`` produced;
        # ``utcnow`` is deprecated since Python 3.12.
        user.last_login_at = datetime.now(timezone.utc).replace(tzinfo=None)
        # Commit before touching the cookie session so a failed commit does
        # not leave the session populated.
        db.commit()

        # Drop any pre-existing session data before establishing the login.
        session.clear()
        session["user_id"] = user.id
        session.permanent = True

        return jsonify({
            "user": _user_payload(user),
            "next": _safe_next_url(
                data.get("next") or request.args.get("next") or "/"
            ),
        })
    finally:
        db.close()


@auth_bp.route("/logout", methods=["POST"])
def logout():
    """Clear the cookie session and report success."""
    session.clear()
    return jsonify({"success": True})


@auth_bp.route("/api/me")
def me():
    """Return the currently logged-in user, or 401."""
    user = load_current_user()
    if user is None:
        return jsonify({"error": "unauthorized"}), 401
    return jsonify(_user_payload(user))