Automated Action b9798f0eaf Implement comprehensive crypto P2P trading platform
- Complete FastAPI application with JWT authentication
- SQLite database with SQLAlchemy ORM and Alembic migrations
- User registration/login with secure password hashing
- Multi-cryptocurrency wallet system with balance tracking
- Advertisement system for buy/sell listings with fund locking
- Order management with automatic payment integration
- Payment provider API integration with mock fallback
- Automatic crypto release after payment confirmation
- Health monitoring endpoint and CORS configuration
- Comprehensive API documentation with OpenAPI/Swagger
- Database models for users, wallets, ads, orders, and payments
- Complete CRUD operations for all entities
- Security features including fund locking and order expiration
- Detailed README with setup and usage instructions

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-26 14:48:18 +00:00

69 lines
2.2 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core import security
from app.core.config import settings
from app.core.deps import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import UserCreate, User as UserSchema
# Router on which the /login and /register endpoints below are registered.
router = APIRouter()
@router.post("/login", response_model=Token)
def login_access_token(
    db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
    """Exchange OAuth2 password-form credentials for a bearer access token.

    The form's ``username`` field is matched against ``User.email``; the
    submitted password is checked against the stored hash via
    ``security.verify_password``.

    Returns:
        A mapping with ``access_token`` (created for the user's id with a
        lifetime of ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes) and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) when
            no user has that email or the password does not verify; 400 when
            the credentials are valid but the user is not active.
    """
    account = db.query(User).filter(User.email == form_data.username).first()

    # `and` short-circuits, so the password is only verified when a user was
    # actually found; both failure modes share one deliberately vague message.
    if not (
        account
        and security.verify_password(form_data.password, account.hashed_password)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = security.create_access_token(account.id, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
@router.post("/register", response_model=UserSchema)
def register(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """Create a new, active user from the submitted registration payload.

    The email is checked for an existing user first, then the username. The
    password is stored only as the value returned by
    ``security.get_password_hash``.

    Returns:
        The newly persisted ``User`` row, refreshed from the database after
        commit.

    Raises:
        HTTPException: 400 when a user with the same email, or the same
            username, already exists.
    """
    # (column, submitted value, error detail) - evaluated in order, email first.
    uniqueness_checks = (
        (
            User.email,
            user_in.email,
            "The user with this email already exists in the system.",
        ),
        (
            User.username,
            user_in.username,
            "The user with this username already exists in the system.",
        ),
    )
    for column, submitted, message in uniqueness_checks:
        if db.query(User).filter(column == submitted).first():
            raise HTTPException(status_code=400, detail=message)

    # NOTE(review): the lookups above and the insert below are not atomic. If
    # the User model enforces uniqueness at the database level, a concurrent
    # duplicate would surface from commit() instead - confirm how that is handled.
    new_user = User(
        email=user_in.email,
        username=user_in.username,
        hashed_password=security.get_password_hash(user_in.password),
        is_active=True,
    )
    db.add(new_user)
    db.commit()
    # Reload the row after commit before returning it.
    db.refresh(new_user)
    return new_user