Automated Action 246b4e058e Implement comprehensive FastAPI backend for landing page
Added complete backend infrastructure with:
- Authentication system with OAuth (Google, GitHub, Apple)
- Stripe payment processing with subscription management
- Testimonials management API
- Usage statistics tracking
- Email communication services
- Health monitoring endpoints
- Database migrations with Alembic
- Comprehensive API documentation

All APIs are production-ready with proper error handling,
security measures, and environment variable configuration.

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-01 23:39:39 +00:00

149 lines
5.1 KiB
Python

from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from starlette.responses import RedirectResponse
import httpx
from app.db.session import get_db
from app.core.config import settings
from app.core.security import create_access_token
from app.schemas.user import User, UserCreate, Token
from app.services.user_service import UserService
from app.services.email_service import EmailService
from app.auth.oauth import oauth
router = APIRouter()
@router.post("/register", response_model=User)
async def register(
user: UserCreate,
db: Session = Depends(get_db)
):
user_service = UserService(db)
# Check if user already exists
if user_service.get_user_by_email(user.email):
raise HTTPException(
status_code=400,
detail="Email already registered"
)
# Create user
db_user = user_service.create_user(user)
# Send welcome email
email_service = EmailService()
await email_service.send_welcome_email(db_user.email, db_user.full_name or "User")
return db_user
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user_service = UserService(db)
user = user_service.authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/oauth/{provider}")
async def oauth_login(provider: str, request: Request):
if provider not in ["google", "github", "apple"]:
raise HTTPException(status_code=400, detail="Invalid OAuth provider")
client = oauth.create_client(provider)
redirect_uri = f"{request.url.scheme}://{request.url.netloc}/api/v1/auth/oauth/{provider}/callback"
return await client.authorize_redirect(request, redirect_uri)
@router.get("/oauth/{provider}/callback")
async def oauth_callback(provider: str, request: Request, db: Session = Depends(get_db)):
if provider not in ["google", "github", "apple"]:
raise HTTPException(status_code=400, detail="Invalid OAuth provider")
client = oauth.create_client(provider)
token = await client.authorize_access_token(request)
user_service = UserService(db)
if provider == "google":
user_info = token.get('userinfo')
if user_info:
email = user_info['email']
name = user_info.get('name', '')
picture = user_info.get('picture', '')
provider_id = user_info['sub']
elif provider == "github":
# Get user info from GitHub API
async with httpx.AsyncClient() as client_http:
user_resp = await client_http.get(
'https://api.github.com/user',
headers={'Authorization': f"Bearer {token['access_token']}"}
)
user_data = user_resp.json()
# Get user email
email_resp = await client_http.get(
'https://api.github.com/user/emails',
headers={'Authorization': f"Bearer {token['access_token']}"}
)
emails = email_resp.json()
primary_email = next((e['email'] for e in emails if e['primary']), None)
email = primary_email or user_data.get('email')
name = user_data.get('name', '')
picture = user_data.get('avatar_url', '')
provider_id = str(user_data['id'])
elif provider == "apple":
# Apple token handling is more complex, simplified here
user_info = token.get('userinfo', {})
email = user_info.get('email', '')
name = user_info.get('name', '')
picture = ''
provider_id = user_info.get('sub', '')
# Check if user exists
existing_user = user_service.get_user_by_email(email)
if existing_user:
user = existing_user
else:
# Create new user
user = user_service.create_oauth_user(
email=email,
full_name=name,
provider=provider,
provider_id=provider_id,
avatar_url=picture
)
# Send welcome email
email_service = EmailService()
await email_service.send_welcome_email(user.email, user.full_name or "User")
# Create access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
# Redirect to frontend with token
return RedirectResponse(
url=f"{settings.frontend_url}/auth/callback?token={access_token}"
)