Implement user authentication system with JWT tokens

This commit is contained in:
Automated Action 2025-06-05 05:51:54 +00:00
parent 9ea1258a12
commit a07f19f5cc
9 changed files with 440 additions and 4 deletions

63
app/api/deps.py Normal file
View File

@ -0,0 +1,63 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import SessionLocal
from app.models.user import User
from app.schemas.token import TokenPayload
from app.services.user import get_user
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=["HS256"]
)
token_data = TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = get_user(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
return current_user

View File

@ -1,13 +1,14 @@
from fastapi import APIRouter
# Import individual routers here
# from app.api.v1.endpoints import users, auth, songs, albums, artists, playlists
from app.api.v1.endpoints import users, auth
# from app.api.v1.endpoints import songs, albums, artists, playlists
api_router = APIRouter()
# Include all routers
# api_router.include_router(users.router, prefix="/users", tags=["users"])
# api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
# api_router.include_router(songs.router, prefix="/songs", tags=["songs"])
# api_router.include_router(albums.router, prefix="/albums", tags=["albums"])
# api_router.include_router(artists.router, prefix="/artists", tags=["artists"])

View File

@ -0,0 +1,73 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.deps import get_current_user, get_db
from app.core.config import settings
from app.core.security import create_access_token
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema
from app.services.user import authenticate_user, get_user_by_email, create_user
from app.schemas.user import UserCreate
router = APIRouter()
@router.post("/login", response_model=Token)
def login_access_token(
db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests.
"""
user = authenticate_user(db, email=form_data.username, password=form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
elif not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=UserSchema)
def register_user(
*,
db: Session = Depends(get_db),
user_in: UserCreate,
) -> Any:
"""
Register a new user.
"""
user = get_user_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists.",
)
user = create_user(db, obj_in=user_in)
return user
@router.get("/me", response_model=UserSchema)
def read_users_me(
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get current user.
"""
return current_user

View File

@ -0,0 +1,143 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.deps import get_current_active_superuser, get_current_active_user, get_db
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate
from app.services.user import (
create_user,
delete_user,
get_user,
get_user_by_email,
get_users,
update_user,
)
router = APIRouter()
@router.get("/", response_model=List[UserSchema])
def read_users(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Retrieve users. Only superusers can retrieve all users.
"""
users = get_users(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=UserSchema)
def create_new_user(
*,
db: Session = Depends(get_db),
user_in: UserCreate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new user. Only superusers can create new users.
"""
user = get_user_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists.",
)
user = create_user(db, obj_in=user_in)
return user
@router.get("/me", response_model=UserSchema)
def read_user_me(
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.put("/me", response_model=UserSchema)
def update_user_me(
*,
db: Session = Depends(get_db),
user_in: UserUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update own user.
"""
user = update_user(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=UserSchema)
def read_user_by_id(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Get a specific user by id.
"""
user = get_user(db, id=user_id)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return user
@router.put("/{user_id}", response_model=UserSchema)
def update_specific_user(
*,
db: Session = Depends(get_db),
user_id: int,
user_in: UserUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update a user. Only superusers can update other users.
"""
user = get_user(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
user = update_user(db, db_obj=user, obj_in=user_in)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_specific_user(
*,
db: Session = Depends(get_db),
user_id: int,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete a user. Only superusers can delete users.
"""
user = get_user(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
delete_user(db, id=user_id)
return None

22
app/db/init_db.py Normal file
View File

@ -0,0 +1,22 @@
from sqlalchemy.orm import Session
from app.schemas.user import UserCreate
from app.services.user import get_user_by_email, create_user
# make sure all SQL Alchemy models are imported (app.db.base) before initializing DB
# otherwise, SQL Alchemy might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
def init_db(db: Session) -> None:
# Create super user if it doesn't exist
user = get_user_by_email(db, email="admin@example.com")
if not user:
user_in = UserCreate(
email="admin@example.com",
username="admin",
password="adminpassword",
is_superuser=True,
)
create_user(db, obj_in=user_in)

11
app/schemas/token.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

40
app/schemas/user.py Normal file
View File

@ -0,0 +1,40 @@
from typing import Optional
from pydantic import BaseModel, EmailStr, Field
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
username: Optional[str] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
password: str = Field(..., min_length=8)
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
from_attributes = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

74
app/services/user.py Normal file
View File

@ -0,0 +1,74 @@
from typing import Any, Dict, Optional, Union, List
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
def get_user(db: Session, id: int) -> Optional[User]:
return db.query(User).filter(User.id == id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_active=obj_in.is_active,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_user(
db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
for field in update_data:
if hasattr(db_obj, field):
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_user(db: Session, *, id: int) -> None:
user = db.query(User).get(id)
if user:
db.delete(user)
db.commit()
def authenticate_user(db: Session, *, email: str, password: str) -> Optional[User]:
user = get_user_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user

11
main.py
View File

@ -1,8 +1,11 @@
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from app.api.v1.api import api_router
from app.core.config import settings
from app.api.deps import get_db
from app.db.init_db import init_db
app = FastAPI(
title=settings.PROJECT_NAME,
@ -30,6 +33,12 @@ app.include_router(api_router, prefix=settings.API_V1_STR)
async def health_check():
return {"status": "healthy"}
# Initialize database endpoint (normally would be in a separate command)
@app.get("/init-db", tags=["admin"])
def initialize_db(db: Session = Depends(get_db)):
init_db(db)
return {"message": "Database initialized successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)