
- Phone number authentication with OTP verification - Email/password authentication with secure bcrypt hashing - Third-party OAuth login support for Google and Apple - JWT token-based authentication system - Rate limiting for OTP requests (5/minute) - SQLite database with SQLAlchemy ORM - Comprehensive user model with multiple auth providers - Alembic database migrations setup - API documentation with Swagger/OpenAPI - Health check and system endpoints - Environment configuration with security best practices - Code quality with Ruff linting and formatting Features: - POST /auth/request-otp - Request OTP for phone authentication - POST /auth/verify-otp - Verify OTP and get access token - POST /auth/signup-email - Email signup with password - POST /auth/login-email - Email login authentication - POST /auth/login-google - Google OAuth integration - POST /auth/login-apple - Apple OAuth integration - GET /user/me - Get current authenticated user info - GET / - API information and documentation links - GET /health - Application health check
58 lines
1.6 KiB
Python
58 lines
1.6 KiB
Python
from sqlalchemy.orm import Session
|
|
from typing import Optional
|
|
from app.models.user import User, AuthProvider
|
|
from app.schemas.user import UserCreate
|
|
from app.utils.auth import get_password_hash
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
|
|
return db.query(User).filter(User.email == email).first()
|
|
|
|
|
|
def get_user_by_phone(db: Session, phone_number: str) -> Optional[User]:
|
|
return db.query(User).filter(User.phone_number == phone_number).first()
|
|
|
|
|
|
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
|
|
return db.query(User).filter(User.id == user_id).first()
|
|
|
|
|
|
def create_user(db: Session, user: UserCreate) -> User:
|
|
db_user = User(
|
|
email=user.email,
|
|
phone_number=user.phone_number,
|
|
name=user.name,
|
|
auth_provider=user.auth_provider,
|
|
is_verified=True
|
|
if user.auth_provider in [AuthProvider.GOOGLE, AuthProvider.APPLE]
|
|
else False,
|
|
)
|
|
|
|
if user.password:
|
|
db_user.password_hash = get_password_hash(user.password)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
|
|
from app.utils.auth import verify_password
|
|
|
|
user = get_user_by_email(db, email)
|
|
if not user or not user.password_hash:
|
|
return None
|
|
if not verify_password(password, user.password_hash):
|
|
return None
|
|
return user
|
|
|
|
|
|
def verify_user_phone(db: Session, phone_number: str) -> Optional[User]:
|
|
user = get_user_by_phone(db, phone_number)
|
|
if user:
|
|
user.is_verified = True
|
|
db.commit()
|
|
db.refresh(user)
|
|
return user
|