from __future__ import annotations from fastapi import HTTPException, status from sqlalchemy import text from sqlalchemy.engine import Connection from core.auth.utils import create_access_token, hash_password, verify_password from config import settings class AuthService: def __init__(self, db: Connection): self._db = db def login(self, username: str, password: str) -> dict: row = self._db.execute( text("SELECT * FROM users WHERE username = :u"), {"u": username} ).fetchone() if row is None or not verify_password(password, row.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"code": "UNAUTHORIZED", "message": "Invalid credentials"}, ) user = dict(row._mapping) self._db.execute( text("UPDATE users SET last_login = datetime('now') WHERE id = :id"), {"id": user["id"]}, ) token = create_access_token(user["id"], user["username"], user["role"]) return { "access_token": token, "token_type": "bearer", "expires_in": settings.jwt_expire_hours * 3600, "user": {"id": user["id"], "username": user["username"], "role": user["role"]}, } def create_user(self, username: str, password: str, role: str = "viewer") -> dict: existing = self._db.execute( text("SELECT id FROM users WHERE username = :u"), {"u": username} ).fetchone() if existing: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={"code": "CONFLICT", "message": f"Username '{username}' already taken"}, ) self._db.execute( text( "INSERT INTO users (username, password_hash, role) VALUES (:u, :ph, :r)" ), {"u": username, "ph": hash_password(password), "r": role}, ) row = self._db.execute( text("SELECT id, username, role, created_at FROM users WHERE username = :u"), {"u": username}, ).fetchone() return dict(row._mapping) def change_password(self, user_id: int, current_password: str, new_password: str) -> None: row = self._db.execute( text("SELECT password_hash FROM users WHERE id = :id"), {"id": user_id}, ).fetchone() if row is None or not verify_password(current_password, row.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"code": "UNAUTHORIZED", "message": "Current password is incorrect"}, ) self._db.execute( text("UPDATE users SET password_hash = :ph WHERE id = :id"), {"ph": hash_password(new_password), "id": user_id}, ) def list_users(self) -> list[dict]: rows = self._db.execute( text("SELECT id, username, role, created_at, last_login FROM users ORDER BY id") ).fetchall() return [dict(r._mapping) for r in rows] def delete_user(self, user_id: int, requesting_user_id: int) -> None: if user_id == requesting_user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail={"code": "VALIDATION_ERROR", "message": "Cannot delete yourself"}, ) self._db.execute( text("DELETE FROM users WHERE id = :id"), {"id": user_id}, ) def seed_admin_if_empty(self) -> str | None: """ Create a default admin user if no users exist. Returns the generated password (printed to stdout on startup). """ count = self._db.execute(text("SELECT COUNT(*) FROM users")).fetchone()[0] if count > 0: return None import secrets password = secrets.token_urlsafe(16) self.create_user("admin", password, "admin") return password