# File-viewer metadata (not part of the module): 265 lines, 8.6 KiB, Python

from datetime import timedelta
from typing import Any, Dict
from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.deps import authenticate_user, get_current_user
from app.core.config import settings
from app.core.email import send_password_reset_email, send_verification_email
from app.core.security import create_access_token
from app.crud import user
from app.db.utils import get_db
from app.models.user import User
from app.schemas.token import (
EmailVerificationToken,
PasswordReset,
PasswordResetRequest,
PasswordResetToken,
Token,
)
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate
router = APIRouter()
@router.post("/register", response_model=UserSchema)
def register_user(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """
    Create a new user account.

    Request body:
    - email: address for the new account; it must not already be registered
    - password: the account password
    - full_name: the user's full name (optional)

    Behaviour:
    - The email is looked up first; if an account already uses it, the
      request is rejected with 400 Bad Request
    - Otherwise the account is created through the user CRUD layer
    - When settings.EMAILS_ENABLED is true, a verification email carrying
      the new account's verification token is sent to the registered address

    Returns:
    - The newly created user

    Notes:
    - Password hashing and the initial is_active / is_verified flags are
      the responsibility of the CRUD layer, not of this endpoint
    """
    # Reject duplicates up front so the caller gets a clear 400 instead of a
    # storage-level failure.
    existing_account = user.get_by_email(db, email=user_in.email)
    if existing_account:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="The user with this email already exists in the system",
        )

    created_account = user.create(db, obj_in=user_in)

    # Nothing more to do when outgoing email is switched off.
    if not settings.EMAILS_ENABLED:
        return created_account

    send_verification_email(
        email_to=created_account.email,
        token=created_account.verification_token,
    )
    return created_account
@router.post("/verify-email", response_model=Dict[str, str])
def verify_email(
    *,
    db: Session = Depends(get_db),
    token_in: EmailVerificationToken,
) -> Any:
    """
    Confirm a user's email address.

    Intended to be called when the user follows the verification link from
    the email sent at registration.

    Request body:
    - token: the verification token taken from that email

    Behaviour:
    - The token is handed to the user CRUD layer for verification
    - If no user is returned for the token, the request is rejected with
      400 Bad Request ("Invalid or expired verification token")

    Returns:
    - msg: confirmation that the email was verified

    Notes:
    - Marking the account as verified, clearing the token and enforcing the
      expiry window (settings.VERIFICATION_TOKEN_EXPIRE_HOURS) are handled by
      the CRUD layer
    """
    # The CRUD helper returns a falsy value when the token cannot be honoured.
    if user.verify_email(db, token=token_in.token):
        return {"msg": "Email verified successfully"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid or expired verification token",
    )
@router.post("/login", response_model=Token)
def login_access_token(
    db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
    """
    Log in with the OAuth2 password flow and obtain an access token.

    Form fields:
    - username: the user's email address (OAuth2 names this field 'username')
    - password: the user's password

    Behaviour:
    - Credentials that do not authenticate are rejected with 401 Unauthorized
      and a "WWW-Authenticate: Bearer" header
    - An authenticated but inactive account (is_active is false) is rejected
      with 400 Bad Request
    - Email verification is not enforced; it can be required by enabling the
      commented-out check inside this handler

    Returns:
    - access_token: token issued for the authenticated user's id
    - token_type: always "bearer"

    Notes:
    - The token lifetime is settings.ACCESS_TOKEN_EXPIRE_MINUTES minutes
    - Send the token on later requests in the Authorization header as
      "Bearer {token}"
    """
    account = authenticate_user(db, form_data.username, form_data.password)

    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # Optional gate: uncomment to refuse logins until the email is verified.
    # if not account.is_verified:
    #     raise HTTPException(
    #         status_code=status.HTTP_400_BAD_REQUEST,
    #         detail="Email not verified. Please check your email for verification instructions.",
    #     )

    token_lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    issued_token = create_access_token(account.id, expires_delta=token_lifetime)
    return {"access_token": issued_token, "token_type": "bearer"}
@router.post("/password-reset-request", response_model=Dict[str, str])
def request_password_reset(
    *,
    db: Session = Depends(get_db),
    reset_request: PasswordResetRequest,
) -> Any:
    """
    Start the password reset flow for an email address.

    Request body:
    - email: the address of the account whose password should be reset

    Behaviour:
    - The user CRUD layer is asked to create a reset token for the address
    - If a token was produced and settings.EMAILS_ENABLED is true, a reset
      email containing that token is sent to the address
    - The token is later consumed by the reset-password endpoint

    Returns:
    - msg: the same neutral message in every case

    Security:
    - The response never reveals whether the address is registered, which
      prevents email enumeration
    - Token validity (settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS) is enforced
      by the CRUD layer
    """
    token = user.create_password_reset_token(db, email=reset_request.email)

    # Only consult the email setting once a token actually exists.
    should_send = bool(token) and settings.EMAILS_ENABLED
    if should_send:
        send_password_reset_email(
            email_to=reset_request.email,
            token=token,
        )

    # Identical reply for known and unknown addresses, by design.
    return {"msg": "If your email is registered, you will receive a password reset link"}
@router.post("/reset-password", response_model=Dict[str, str])
def reset_password(
    *,
    db: Session = Depends(get_db),
    reset_data: PasswordReset,
) -> Any:
    """
    Finish the password reset flow.

    Request body:
    - token: the password reset token received via email
    - new_password: the password to set on the account

    Behaviour:
    - The token and new password are handed to the user CRUD layer
    - If no user is returned, the request is rejected with 400 Bad Request
      ("Invalid or expired password reset token")

    Returns:
    - msg: confirmation that the password was updated

    Notes:
    - Hashing the new password, clearing the token so it cannot be reused,
      and enforcing settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS are handled by
      the CRUD layer
    """
    account = user.reset_password(
        db,
        token=reset_data.token,
        new_password=reset_data.new_password,
    )

    # A falsy result means the token could not be honoured.
    if account:
        return {"msg": "Password updated successfully"}

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Invalid or expired password reset token",
    )
@router.post("/test-token", response_model=Dict[str, Any])
def test_token(current_user: User = Depends(get_current_user)) -> Any:
    """
    Check that an access token is valid.

    The request must be authenticated. The handler only runs once the
    get_current_user dependency has resolved the caller.

    Returns:
    - msg: confirmation message
    - user_id: the id of the authenticated user
    - email: the email of the authenticated user
    - is_verified: whether the user's email is verified

    Usage:
    - Send the token in the Authorization header as "Bearer {token}"
    - A successful response means the token is valid
    - Invalid or expired tokens are rejected by the get_current_user
      dependency (documented as 403 Forbidden)
    """
    summary: Dict[str, Any] = {"msg": "Token is valid"}
    summary["user_id"] = current_user.id
    summary["email"] = current_user.email
    summary["is_verified"] = current_user.is_verified
    return summary