Automated Action 50b3fc513b Implement FastAPI user authentication service with MongoDB
- Set up FastAPI application with MongoDB Motor driver
- Implemented user registration, login, and logout with HTTP-only cookies
- Added JWT token authentication and password hashing
- Created user management endpoints for username updates and password changes
- Structured application with proper separation of concerns (models, schemas, services, routes)
- Added CORS configuration and health endpoints
- Documented API endpoints and environment variables in README
2025-06-20 14:19:13 +00:00

104 lines
3.7 KiB
Python

from datetime import datetime
from typing import Optional

from bson import ObjectId
from bson.errors import InvalidId
from pymongo.errors import DuplicateKeyError

from app.db.connection import get_database
from app.models.user import User, UserInDB
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import get_password_hash, verify_password
class UserService:
    """Async CRUD and authentication helpers for the ``users`` collection.

    Every method obtains the collection through ``get_database()`` on each
    call. Lookup failures, uniqueness conflicts and malformed ids are
    reported through the return value (``None`` / ``False``) rather than by
    raising.
    """

    def __init__(self):
        self.collection_name = "users"

    @staticmethod
    def _to_object_id(user_id: str) -> Optional[ObjectId]:
        """Convert *user_id* to an ``ObjectId``, or return ``None`` if invalid.

        ``ObjectId(None)`` would silently generate a brand-new id, so ``None``
        is rejected explicitly instead of being passed through.
        """
        if user_id is None:
            return None
        try:
            return ObjectId(user_id)
        except (InvalidId, TypeError):
            return None

    async def get_collection(self):
        """Return the collection named ``self.collection_name``."""
        db = await get_database()
        return db[self.collection_name]

    async def create_user(self, user_data: UserCreate) -> Optional[UserInDB]:
        """Insert a new active user built from *user_data*.

        Returns:
            The stored user, or ``None`` when the email or the username is
            already present (including when the insert itself raises
            ``DuplicateKeyError``).
        """
        collection = await self.get_collection()

        # One round trip covers both uniqueness checks.
        conflict = await collection.find_one(
            {
                "$or": [
                    {"email": user_data.email},
                    {"username": user_data.username},
                ]
            }
        )
        if conflict:
            return None

        # A single timestamp so created_at == updated_at on a fresh document.
        # Kept as naive UTC, the same form this service has always written.
        now = datetime.utcnow()
        user_dict = {
            "email": user_data.email,
            "username": user_data.username,
            "hashed_password": get_password_hash(user_data.password),
            "is_active": True,
            "created_at": now,
            "updated_at": now,
        }

        try:
            result = await collection.insert_one(user_dict)
        except DuplicateKeyError:
            # A concurrent insert won between the check above and this write.
            return None

        user_dict["_id"] = result.inserted_id
        return UserInDB(**user_dict)

    async def get_user_by_email(self, email: str) -> Optional[UserInDB]:
        """Return the user whose ``email`` field equals *email*, else ``None``."""
        collection = await self.get_collection()
        user_data = await collection.find_one({"email": email})
        if user_data:
            return UserInDB(**user_data)
        return None

    async def get_user_by_id(self, user_id: str) -> Optional[UserInDB]:
        """Return the user with ``_id`` *user_id*.

        Returns ``None`` when *user_id* is not a valid ObjectId or when no
        document matches.
        """
        object_id = self._to_object_id(user_id)
        if object_id is None:
            return None

        collection = await self.get_collection()
        user_data = await collection.find_one({"_id": object_id})
        if user_data:
            return UserInDB(**user_data)
        return None

    async def authenticate_user(self, email: str, password: str) -> Optional[UserInDB]:
        """Return the user for *email* if *password* verifies, else ``None``."""
        user = await self.get_user_by_email(email)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    async def update_username(self, user_id: str, new_username: str) -> Optional[UserInDB]:
        """Set the username of user *user_id* to *new_username*.

        Returns:
            The re-fetched user on success; ``None`` when *user_id* is
            malformed, no such user exists, or another user already holds
            *new_username*.
        """
        object_id = self._to_object_id(user_id)
        if object_id is None:
            return None

        collection = await self.get_collection()
        taken = await collection.find_one(
            {"username": new_username, "_id": {"$ne": object_id}}
        )
        if taken:
            return None

        try:
            result = await collection.update_one(
                {"_id": object_id},
                {"$set": {"username": new_username, "updated_at": datetime.utcnow()}},
            )
        except DuplicateKeyError:
            # Same check-then-write race that create_user guards against.
            return None

        # matched_count, not modified_count: a matched document whose stored
        # values already equal the new ones is still a successful update.
        if result.matched_count:
            return await self.get_user_by_id(user_id)
        return None

    async def change_password(self, user_id: str, current_password: str, new_password: str) -> bool:
        """Replace the stored password hash after verifying the current one.

        Returns:
            ``True`` when the user's document was updated; ``False`` when
            *user_id* is malformed, the user does not exist, or
            *current_password* does not verify.
        """
        object_id = self._to_object_id(user_id)
        if object_id is None:
            return False

        user = await self.get_user_by_id(user_id)
        if not user:
            return False
        if not verify_password(current_password, user.hashed_password):
            return False

        collection = await self.get_collection()
        new_hashed_password = get_password_hash(new_password)
        result = await collection.update_one(
            {"_id": object_id},
            {"$set": {"hashed_password": new_hashed_password, "updated_at": datetime.utcnow()}},
        )
        return result.matched_count > 0
# Module-level UserService instance, created when this module is imported.
user_service: UserService = UserService()