Automated Action 2de9589e3d Initial Travel App Backend Implementation
- FastAPI application with user management, destinations, and trip planning
- SQLite database with SQLAlchemy ORM
- Database models for Users, Destinations, and Trips
- Pydantic schemas for request/response validation
- Full CRUD API endpoints for all resources
- Alembic migrations setup
- Health check endpoint
- CORS configuration for development
- Comprehensive documentation and README
2025-06-20 01:25:27 +00:00

61 lines
1.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import User as UserSchema, UserCreate, UserUpdate
# Router on which the user endpoints defined in this module register themselves.
# NOTE(review): the URL prefix is presumably set where this router is included — confirm.
router = APIRouter()
def get_password_hash(password: str, *, iterations: int = 600_000) -> str:
    """Derive a salted, one-way hash of ``password`` suitable for storage.

    The previous implementation returned ``"hashed_" + password``, which
    stores the plaintext. This version uses PBKDF2-HMAC-SHA256 with a fresh
    random salt per call, so equal passwords produce different outputs.

    Args:
        password: The plaintext password to hash.
        iterations: PBKDF2 work factor. It is embedded in the output so a
            verifier can reproduce the derivation.

    Returns:
        A string of the form
        ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``.

    NOTE(review): no password verifier is visible in this module. Any code
    elsewhere that compares against the old ``hashed_<password>`` format must
    be updated to parse this format and recompute the digest.
    """
    import hashlib
    import secrets

    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"
@router.get("/", response_model=List[UserSchema])
def get_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The selected ``User`` rows, ordered by ``id``.
    """
    # Without an explicit ORDER BY the database may return rows in any order,
    # so consecutive pages could overlap or skip users. Ordering by id makes
    # offset/limit pagination stable.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
@router.get("/{user_id}", response_model=UserSchema)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Look up a single user by ``id``.

    Args:
        user_id: Value matched against ``User.id``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The first ``User`` row whose ``id`` equals ``user_id``.

    Raises:
        HTTPException: 404 with detail "User not found" when no row matches.
    """
    by_id = db.query(User).filter(User.id == user_id)
    found = by_id.first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@router.post("/", response_model=UserSchema)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user from the submitted email, full name and password.

    The password is passed through ``get_password_hash`` and only the result
    is stored on the new row.

    Args:
        user: Payload carrying ``email``, ``full_name`` and ``password``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The newly inserted ``User``, refreshed from the database.

    Raises:
        HTTPException: 400 with detail "Email already registered" when a user
            with the same email already exists, or when the insert is rejected
            by the database at commit time.
    """
    from sqlalchemy.exc import IntegrityError

    existing = db.query(User).filter(User.email == user.email).first()
    if existing is not None:
        raise HTTPException(status_code=400, detail="Email already registered")

    db_user = User(
        email=user.email,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The lookup above and this insert are not atomic: a concurrent request
        # can register the same email in between.
        # NOTE(review): assumes the users table has a unique constraint on
        # email, making that race the likely cause here — confirm in the model.
        # Roll back so the session is usable again instead of failing with 500.
        db.rollback()
        raise HTTPException(
            status_code=400, detail="Email already registered"
        ) from exc
    db.refresh(db_user)
    return db_user
@router.put("/{user_id}", response_model=UserSchema)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an existing user.

    Only fields the client actually sent (``exclude_unset=True``) are written.
    A submitted ``password`` is never stored directly: it is replaced by
    ``hashed_password`` computed with ``get_password_hash``.

    Args:
        user_id: Value matched against ``User.id``.
        user: Payload with the fields to change.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The updated ``User``, refreshed from the database.

    Raises:
        HTTPException: 404 with detail "User not found" when no row matches
            ``user_id``; 400 when the new email belongs to another user or the
            database rejects the update at commit time.
    """
    from sqlalchemy.exc import IntegrityError

    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user.dict(exclude_unset=True)

    if "password" in update_data:
        new_password = update_data.pop("password")
        # An explicit null survives exclude_unset; hashing it would turn
        # ``None`` into a stored credential, so treat it as "no change".
        if new_password is not None:
            update_data["hashed_password"] = get_password_hash(new_password)

    # Mirror the duplicate-email rule enforced when creating users.
    # NOTE(review): only takes effect if UserUpdate exposes an ``email`` field
    # — confirm against the schema.
    new_email = update_data.get("email")
    if new_email is not None and new_email != db_user.email:
        email_owner = (
            db.query(User)
            .filter(User.email == new_email, User.id != user_id)
            .first()
        )
        if email_owner is not None:
            raise HTTPException(status_code=400, detail="Email already registered")

    for field, value in update_data.items():
        setattr(db_user, field, value)

    try:
        db.commit()
    except IntegrityError as exc:
        # Roll back so the session is usable again, and report the rejected
        # write as a client error rather than an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=400, detail="Update violates a database constraint"
        ) from exc
    db.refresh(db_user)
    return db_user