Automated Action 9ca9d35a1a Implement complete Urban Real Estate API backend
- Set up FastAPI project structure with modular architecture
- Create comprehensive database models for users, properties, messages, notifications, and payments
- Implement JWT-based authentication with role-based access control (Seeker, Agent, Landlord, Admin)
- Build property listings CRUD with advanced search and filtering capabilities
- Add dedicated affordable-housing endpoints focused on the Nigerian market
- Create real-time messaging system between users
- Implement admin dashboard with property approval workflow and analytics
- Add notification system for user alerts
- Integrate Paystack payment gateway for transactions
- Set up SQLite database with Alembic migrations
- Include comprehensive health check and API documentation
- Add proper error handling and validation throughout
- Follow FastAPI best practices with Pydantic schemas and dependency injection
2025-06-27 12:24:06 +00:00

64 lines
2.0 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.auth.dependencies import get_db, get_current_active_user
from app.models.user import User
from app.models.notification import Notification
from app.schemas.notification import NotificationResponse
# Router for the notification endpoints: every route registered on it shares
# the "/api/notifications" path prefix and the "Notifications" tag.
router = APIRouter(prefix="/api/notifications", tags=["Notifications"])
@router.get("/", response_model=List[NotificationResponse])
def get_user_notifications(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
    unread_only: bool = Query(False),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Return one page of the current user's notifications, newest first.

    Args:
        skip: Number of leading rows to skip (query parameter, >= 0).
        limit: Maximum number of rows to return (query parameter, 1-100).
        unread_only: When true, only notifications whose ``is_read`` flag is
            not set are returned.
        current_user: User resolved by ``get_current_active_user``; only
            notifications whose ``user_id`` equals this user's id are listed.
        db: Session supplied by ``get_db``.

    Returns:
        The matching ``Notification`` rows; the route declares
        ``List[NotificationResponse]`` as its response model.
    """
    query = db.query(Notification).filter(Notification.user_id == current_user.id)
    if unread_only:
        query = query.filter(~Notification.is_read)
    # ``created_at`` alone is not a unique sort key: rows sharing a timestamp
    # have no guaranteed relative order, so offset/limit paging could repeat
    # or skip them between requests. ``id`` breaks the ties deterministically.
    return (
        query.order_by(Notification.created_at.desc(), Notification.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.put("/{notification_id}/read")
def mark_notification_as_read(
    notification_id: int,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Set the read flag on a single notification owned by the current user.

    Args:
        notification_id: Id of the notification, taken from the URL path.
        current_user: User resolved by ``get_current_active_user``.
        db: Session supplied by ``get_db``.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when no notification matches both the given id and
            the current user's id.
    """
    # Filtering on the owner as well as the id means another user's
    # notification is reported as "not found" rather than modified.
    owned_match = (
        db.query(Notification)
        .filter(Notification.id == notification_id)
        .filter(Notification.user_id == current_user.id)
        .first()
    )
    if owned_match:
        owned_match.is_read = True
        db.commit()
        return {"message": "Notification marked as read"}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Notification not found",
    )
@router.put("/mark-all-read")
def mark_all_notifications_as_read(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Set the read flag on every unread notification of the current user.

    Args:
        current_user: User resolved by ``get_current_active_user``; only rows
            whose ``user_id`` equals this user's id are updated.
        db: Session supplied by ``get_db``.

    Returns:
        A dict with a confirmation ``message``.
    """
    unread = db.query(Notification).filter(
        Notification.user_id == current_user.id,
        ~Notification.is_read,
    )
    # The session is committed right away and no loaded Notification objects
    # are used afterwards, so in-session synchronization is unnecessary.
    # Skipping it also avoids relying on the ORM evaluating the filter
    # criteria in Python for this bulk UPDATE.
    unread.update({"is_read": True}, synchronize_session=False)
    db.commit()
    return {"message": "All notifications marked as read"}