notesapi-q3b8fl/app/api/v1/collections.py
Automated Action 741f301b11 Implement Notes API with FastAPI and SQLite
- Set up project structure with FastAPI
- Implement user authentication system with JWT tokens
- Create database models for users, notes, and collections
- Set up SQLAlchemy ORM and Alembic migrations
- Implement CRUD operations for notes and collections
- Add filtering and sorting capabilities for notes
- Implement health check endpoint
- Update project documentation
2025-05-31 14:54:14 +00:00

130 lines
3.8 KiB
Python

from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from app import crud
from app.api import deps
from app.models.user import User
from app.schemas.collection import (
    Collection,
    CollectionCreate,
    CollectionUpdate,
    CollectionWithNotes,
)
from app.schemas.note import Note

# Router on which the collection endpoints defined in this module are registered.
router = APIRouter()
@router.get("/", response_model=list[Collection])
def read_collections(
    db: Session = Depends(deps.get_db),
    # Negative paging values are rejected at the API boundary (HTTP 422)
    # instead of being forwarded to the query layer.
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=0)] = 100,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Retrieve collections for the current user.

    `skip` and `limit` are forwarded to the CRUD query and must not be
    negative.
    """
    return crud.get_collections_by_owner(
        db=db, owner_id=current_user.id, skip=skip, limit=limit
    )
@router.post("/", response_model=Collection)
def create_collection(
    *,
    db: Session = Depends(deps.get_db),
    collection_in: CollectionCreate,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Create a new collection owned by the current user.

    The request body is handed to the CRUD layer together with the
    authenticated user's ID as the owner.
    """
    owner_id = current_user.id
    return crud.create_collection(
        db=db,
        obj_in=collection_in,
        owner_id=owner_id,
    )
@router.put("/{id}", response_model=Collection)
def update_collection(
    *,
    db: Session = Depends(deps.get_db),
    id: int,
    collection_in: CollectionUpdate,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Update a collection owned by the current user.

    Responds with 404 if the lookup finds no collection for `id`, and with
    403 if the collection's owner is not the current user.
    """
    existing = crud.get_collection(db=db, collection_id=id)
    if not existing:
        raise HTTPException(status_code=404, detail="Collection not found")

    owned_by_other = existing.owner_id != current_user.id
    if owned_by_other:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    return crud.update_collection(
        db=db,
        db_obj=existing,
        obj_in=collection_in,
    )
@router.get("/{id}", response_model=CollectionWithNotes)
def read_collection(
    *,
    db: Session = Depends(deps.get_db),
    id: int,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Get a single collection by ID, including its notes.

    Responds with 404 if the lookup finds no collection for `id`, and with
    403 if the collection's owner is not the current user.
    """
    found = crud.get_collection(db=db, collection_id=id)

    # Existence is checked before ownership, so a missing collection always
    # yields 404 rather than 403.
    if not found:
        raise HTTPException(status_code=404, detail="Collection not found")
    if found.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    return found
@router.get("/{id}/notes", response_model=list[Note])
def read_collection_notes(
    *,
    db: Session = Depends(deps.get_db),
    id: int,
    # Negative paging values are rejected at the API boundary (HTTP 422)
    # instead of being forwarded to the query layer.
    skip: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=0)] = 100,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Get notes from a specific collection.

    Responds with 404 if the lookup finds no collection for `id`, and with
    403 if the collection's owner is not the current user. `skip` and
    `limit` are forwarded to the CRUD query and must not be negative.
    """
    collection = crud.get_collection(db=db, collection_id=id)
    if not collection:
        raise HTTPException(status_code=404, detail="Collection not found")
    if collection.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    return crud.get_multi_by_collection(
        db=db, owner_id=current_user.id, collection_id=id, skip=skip, limit=limit
    )
@router.delete("/{id}", status_code=204, response_model=None)
def delete_collection(
    *,
    db: Session = Depends(deps.get_db),
    id: int,
    current_user: User = Depends(deps.get_current_active_user),
) -> Any:
    """
    Delete a collection owned by the current user.

    Responds with 404 if the lookup finds no collection for `id`, and with
    403 if the collection's owner is not the current user. On success
    nothing is returned (status 204).
    """
    target = crud.get_collection(db=db, collection_id=id)
    if not target:
        raise HTTPException(status_code=404, detail="Collection not found")

    requester_id = current_user.id
    if target.owner_id != requester_id:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    # The endpoint has no response body, so the function ends without an
    # explicit return value.
    crud.remove_collection(db=db, collection_id=id, owner_id=requester_id)