from dataclasses import dataclass

import jwt
from fastapi import Depends, HTTPException, Request, status

from app.core.config import settings


@dataclass
class AuthUser:
    """Identity of the caller, built from the claims of a verified JWT."""

    # Supabase user UUID; populated from the token's "sub" claim.
    id: str
    # Populated from the token's "email" claim; None when the claim is absent.
    email: str | None = None
    # Populated from the token's "role" claim; None when the claim is absent.
    role: str | None = None


def _extract_token(request: Request) -> str | None:
    """Return the credential part of an ``Authorization: Bearer <token>`` header.

    Returns None when the header is missing or empty, when it does not split
    into exactly two whitespace-separated parts, or when the scheme is not
    "bearer" (compared case-insensitively).
    """
    header_value = request.headers.get("Authorization")
    if not header_value:
        return None

    pieces = header_value.split()
    if len(pieces) == 2 and pieces[0].lower() == "bearer":
        return pieces[1]
    return None


def _verify_jwt(token: str) -> dict | None:
    """Decode and validate ``token`` with the configured Supabase JWT secret.

    The token must be signed with HS256 and carry the "authenticated"
    audience. Returns the decoded payload, or None when no secret is
    configured or the token is expired or otherwise invalid.
    """
    secret = settings.supabase_jwt_secret
    if not secret:
        # Without a configured secret nothing can be verified.
        return None

    try:
        return jwt.decode(
            token,
            secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        # Expired and invalid tokens are both reported as "no payload".
        return None


def get_current_user(request: Request) -> AuthUser | None:
    """Resolve the authenticated user for ``request``, if there is one.

    Returns an AuthUser when the request carries a Bearer token that verifies
    and whose payload has a non-empty "sub" claim; returns None in every
    other case. This function never raises for a missing or bad token.
    """
    bearer = _extract_token(request)
    claims = _verify_jwt(bearer) if bearer else None
    if not claims:
        return None

    # Supabase stores the user ID in the standard "sub" claim.
    subject = claims.get("sub")
    if not subject:
        return None

    return AuthUser(
        id=subject,
        email=claims.get("email"),
        role=claims.get("role"),
    )


def require_auth(user: AuthUser | None = Depends(get_current_user)) -> AuthUser:
    """FastAPI dependency that only lets authenticated requests through.

    Returns the AuthUser resolved by ``get_current_user``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            valid token was presented.
    """
    if user is not None:
        return user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )