Automated Action 48f0debd7b Implement user authentication flow with FastAPI
- Set up FastAPI application with SQLite database
- Create User model with email and password fields
- Implement JWT token-based authentication
- Add user registration and login endpoints
- Create protected user profile endpoints
- Configure Alembic for database migrations
- Add password hashing with bcrypt
- Include CORS middleware and health endpoint
- Update README with setup and usage instructions

Environment variables required:
- SECRET_KEY: JWT secret key for token signing
2025-06-27 09:18:50 +00:00


from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from app.core.security import (
ALGORITHM,
SECRET_KEY,
create_access_token,
get_password_hash,
verify_password,
)
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import Token, TokenData, UserCreate, User as UserSchema
# Router holding the auth endpoints; the application mounts it elsewhere
# (the mount prefix is not visible in this file).
router = APIRouter()
# Extracts the bearer token from the Authorization header for the
# dependencies below. tokenUrl is advertised to OpenAPI clients and must
# match where the "/login" route of this router is actually mounted.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/auth/login")
def get_user_by_email(db: Session, email: str):
    """Look up a single ``User`` row whose ``email`` column equals *email*.

    Args:
        db: open SQLAlchemy session.
        email: address compared with ``User.email`` using SQL equality
            (case sensitivity therefore follows the database collation).

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    matching = db.query(User).filter(User.email == email)
    return matching.first()
def authenticate_user(db: Session, email: str, password: str):
    """Check *password* against the account registered under *email*.

    Args:
        db: open SQLAlchemy session.
        email: address used to locate the account.
        password: plaintext candidate, compared via ``verify_password``
            against the stored ``hashed_password``.

    Returns:
        The matching ``User`` on success; ``False`` when the account does not
        exist or the password does not verify. Both failures yield the same
        value, so callers cannot tell them apart from the result alone.
    """
    account = get_user_by_email(db, email)
    # NOTE(review): an unknown e-mail returns before any hash comparison
    # runs, so the two failure paths differ in elapsed time — a timing
    # side channel that could reveal whether an address is registered.
    # Confirm whether that matters for this deployment before relying on
    # the generic 401 message alone.
    if not account or not verify_password(password, account.hashed_password):
        return False
    return account
def create_user(db: Session, user: UserCreate):
    """Persist a new ``User`` built from a registration payload.

    The plaintext ``user.password`` is passed through ``get_password_hash``
    before the row is constructed; only ``hashed_password`` is stored. The
    session is committed immediately and the instance refreshed so
    database-generated fields are populated on the returned object.

    Args:
        db: open SQLAlchemy session.
        user: registration payload with ``email`` and ``password``.

    Returns:
        The persisted ``User`` instance.
    """
    new_account = User(
        email=user.email,
        hashed_password=get_password_hash(user.password),
    )
    db.add(new_account)
    db.commit()
    db.refresh(new_account)
    return new_account
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token into the ``User`` it was issued for.

    The JWT is decoded with ``SECRET_KEY`` and the algorithm list pinned to
    ``[ALGORITHM]``, so a token cannot choose its own algorithm. The ``sub``
    claim is taken as the e-mail, wrapped in ``TokenData``, and used to load
    the user. Expiry checking is left to ``jwt.decode``'s default options —
    TODO confirm that is the intended behaviour.

    Raises:
        HTTPException: 401 with ``WWW-Authenticate: Bearer`` when the token
            fails to decode, has no ``sub`` claim, or names an unknown user.
            The same response is used for all three cases.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(email=subject)
    account = get_user_by_email(db, email=token_data.email)
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency layered on ``get_current_user`` that rejects deactivated accounts.

    Returns:
        *current_user* unchanged when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 ``Inactive user`` otherwise.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
@router.post("/register", response_model=UserSchema)
def register(user: UserCreate, db: Session = Depends(get_db)):
    """Create an account for an e-mail that is not yet registered.

    The lookup and the insert are two separate statements, so two concurrent
    registrations for the same address can both pass the check; the outcome
    then depends on a unique constraint on ``User.email`` that is not
    visible in this file. The 400 response also tells an unauthenticated
    caller that the address already has an account.

    Raises:
        HTTPException: 400 ``Email already registered`` when a row with
            that e-mail exists.

    Returns:
        The new user, serialised through ``UserSchema``.
    """
    existing = get_user_by_email(db, email=user.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")
    return create_user(db=db, user=user)
@router.post("/login", response_model=Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Exchange OAuth2 password-form credentials for a bearer JWT.

    The form's ``username`` field carries the e-mail. On success the access
    token is minted by ``create_access_token`` with the user's e-mail as
    subject and a fixed 30-minute lifetime; no refresh token is issued.

    Raises:
        HTTPException: 401 with ``WWW-Authenticate: Bearer`` and one generic
            message for both unknown e-mail and wrong password, so the
            response body does not reveal which check failed.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    lifetime = timedelta(minutes=30)
    token = create_access_token(subject=account.email, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}