2025-05-15 20:28:43 +00:00

211 lines
6.1 KiB
Python

from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.dependencies.auth import get_current_active_user
from app.models.user import User
from app.schemas.auth import ChangePassword, Login
from app.schemas.password import PasswordReset, PasswordResetConfirm
from app.schemas.token import Token, TokenPayload, TokenRefresh
from app.schemas.user import User as UserSchema, UserCreate
from app.services.auth import (
authenticate_user,
create_tokens_for_user,
generate_password_reset_token,
reset_password,
verify_password_reset_token
)
from app.services.user import create_user
from app.utils.security import get_password_hash, verify_password
router = APIRouter()
@router.post("/login", response_model=Token)
def login(
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends()
):
"""
OAuth2 compatible token login, get an access token for future requests.
"""
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"}
)
tokens = create_tokens_for_user(user.id)
return tokens
@router.post("/login/access-token", response_model=Token)
def login_access_token(
login_data: Login,
db: Session = Depends(get_db)
):
"""
Get an access token for future requests.
"""
user = authenticate_user(db, login_data.email, login_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"}
)
tokens = create_tokens_for_user(user.id)
return tokens
@router.post("/refresh-token", response_model=Token)
def refresh_token(
refresh_token_data: TokenRefresh,
db: Session = Depends(get_db)
):
"""
Get a new access token using refresh token.
"""
try:
payload = jwt.decode(
refresh_token_data.refresh_token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
# Check if it's a refresh token
if not hasattr(token_data, "type") or token_data.type != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
headers={"WWW-Authenticate": "Bearer"}
)
except (JWTError, ValueError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"}
)
user = db.query(User).filter(User.id == token_data.sub).first()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"}
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
tokens = create_tokens_for_user(user.id)
return tokens
@router.post("/password-reset", status_code=status.HTTP_200_OK)
def request_password_reset(
password_reset: PasswordReset,
db: Session = Depends(get_db)
):
"""
Request a password reset token.
"""
token = generate_password_reset_token(db, email=password_reset.email)
# In a real application, you would send an email with the token
# For this example, we'll just return a success message
return {
"message": "Password reset link has been sent to your email"
}
@router.post("/password-reset/confirm", status_code=status.HTTP_200_OK)
def confirm_password_reset(
password_reset_confirm: PasswordResetConfirm,
db: Session = Depends(get_db)
):
"""
Reset password using a token.
"""
user = reset_password(
db,
token=password_reset_confirm.token,
new_password=password_reset_confirm.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired token"
)
return {
"message": "Password has been reset successfully"
}
@router.post("/password-change", status_code=status.HTTP_200_OK)
def change_password(
password_change: ChangePassword,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Change password.
"""
# Verify current password
if not verify_password(password_change.current_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect password"
)
# Update password
current_user.hashed_password = get_password_hash(password_change.new_password)
db.add(current_user)
db.commit()
return {
"message": "Password has been changed successfully"
}
@router.post("/signup", response_model=Token, status_code=status.HTTP_201_CREATED)
def signup(
user_in: UserCreate,
db: Session = Depends(get_db)
):
"""
Create new user account and return access token.
"""
# Check if the user already exists
user = db.query(User).filter(User.email == user_in.email).first()
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
user = create_user(db, user_in=user_in)
if not user:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error creating user account"
)
# Generate tokens for the newly created user
tokens = create_tokens_for_user(user.id)
return tokens