Implement SkillSync AI-Powered Resume & Job Match Hub backend

- Complete FastAPI backend with SQLite database
- AI-powered resume parsing and job matching using OpenAI
- JWT authentication with role-based access control
- Resume upload, job management, and matching endpoints
- Recruiter dashboard with candidate ranking
- Analytics and skill gap analysis features
- Comprehensive API documentation with OpenAPI
- Alembic database migrations
- File upload support for PDF, DOCX, and TXT resumes
- CORS enabled for frontend integration

🤖 Generated with BackendIM

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Automated Action 2025-06-27 14:58:12 +00:00
parent 40204019ad
commit 3d6b44a6e6
39 changed files with 2360 additions and 2 deletions

213
README.md
View File

@ -1,3 +1,212 @@
# FastAPI Application # SkillSync - AI Resume & Job Match Hub
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform. SkillSync is an AI-powered backend API that helps job seekers match their resumes with job descriptions, provides skill gap analysis, resume improvement suggestions, and generates personalized cover letters. It also includes a recruiter dashboard for candidate ranking and management.
## Features
### Core Features
- **Resume Upload & Parsing**: Upload PDF, DOCX, or TXT resume files with AI-powered text extraction
- **Job Description Analysis**: AI analysis of job postings to extract required skills and requirements
- **AI Matching & Scoring**: Intelligent matching between resumes and jobs with detailed scoring
- **Skill Gap Analysis**: Identify missing skills and get suggestions for improvement
- **Resume Suggestions**: AI-powered recommendations to improve resume content
- **Cover Letter Generation**: Personalized cover letter creation based on resume and job match
- **Recruiter Dashboard**: Advanced candidate ranking and job management for recruiters
- **Analytics & Tracking**: User improvement tracking and engagement analytics
### Technical Features
- **FastAPI Framework**: Modern, fast web framework for building APIs
- **SQLite Database**: Lightweight, serverless database with SQLAlchemy ORM
- **JWT Authentication**: Secure user authentication with role-based access control
- **OpenAI Integration**: Powered by GPT models for intelligent text analysis
- **File Upload Support**: Secure file handling with validation
- **CORS Enabled**: Cross-origin resource sharing for frontend integration
- **Comprehensive API Documentation**: Auto-generated OpenAPI/Swagger docs
## Tech Stack
- **Backend**: Python 3.9+ with FastAPI
- **Database**: SQLite with SQLAlchemy ORM
- **AI/ML**: OpenAI GPT-3.5/4 for text analysis and generation
- **Authentication**: JWT tokens with bcrypt password hashing
- **File Processing**: PyPDF2, python-docx for document parsing
- **API Documentation**: Automatic OpenAPI/Swagger documentation
## Installation & Setup
### Prerequisites
- Python 3.9 or higher
- pip package manager
### Local Development Setup
1. **Clone the repository**
```bash
git clone <repository-url>
cd airesumejobmatchingapi-5g56kf
```
2. **Install dependencies**
```bash
pip install -r requirements.txt
```
3. **Set up environment variables**
Create a `.env` file in the root directory:
```env
SECRET_KEY=your-super-secret-key-change-in-production
OPENAI_API_KEY=your-openai-api-key
```
4. **Run database migrations**
```bash
alembic upgrade head
```
5. **Start the development server**
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### Production Deployment
For production deployment, ensure you set the following environment variables:
- `SECRET_KEY`: A strong secret key for JWT token signing
- `OPENAI_API_KEY`: Your OpenAI API key for AI functionality
## API Documentation
Once the server is running, you can access:
- **Interactive API Documentation**: http://localhost:8000/docs
- **Alternative API Documentation**: http://localhost:8000/redoc
- **OpenAPI Schema**: http://localhost:8000/openapi.json
- **Health Check**: http://localhost:8000/health
## API Endpoints
### Authentication
- `POST /api/v1/auth/register` - User registration
- `POST /api/v1/auth/login` - User login
### Resume Management
- `POST /api/v1/resumes/upload` - Upload and parse resume
- `GET /api/v1/resumes/` - Get user's resumes
- `GET /api/v1/resumes/{resume_id}` - Get specific resume
- `PUT /api/v1/resumes/{resume_id}` - Update resume
- `DELETE /api/v1/resumes/{resume_id}` - Delete resume
### Job Management
- `POST /api/v1/jobs/` - Create job posting (recruiters only)
- `GET /api/v1/jobs/` - Get all active jobs with filters
- `GET /api/v1/jobs/my-jobs` - Get recruiter's jobs
- `GET /api/v1/jobs/{job_id}` - Get specific job
- `PUT /api/v1/jobs/{job_id}` - Update job posting
- `DELETE /api/v1/jobs/{job_id}` - Delete job posting
### AI Matching
- `POST /api/v1/matching/analyze` - Analyze resume-job match
- `GET /api/v1/matching/` - Get user's matches
- `GET /api/v1/matching/{match_id}` - Get specific match
- `POST /api/v1/matching/{match_id}/cover-letter` - Generate cover letter
### Recruiter Dashboard
- `GET /api/v1/dashboard/candidates/{job_id}` - Get ranked candidates for job
- `GET /api/v1/dashboard/jobs/stats` - Get job statistics
- `GET /api/v1/dashboard/overview` - Get dashboard overview
### Analytics
- `GET /api/v1/analytics/user-stats` - Get user analytics
- `GET /api/v1/analytics/skill-gaps` - Get skill gap analysis
- `GET /api/v1/analytics/improvement-suggestions` - Get improvement suggestions
- `POST /api/v1/analytics/track-event` - Track custom events
## User Roles
### Applicant (Default)
- Upload and manage resumes
- Search and view job postings
- Get AI-powered job matching and scoring
- Receive skill gap analysis and resume suggestions
- Generate personalized cover letters
- Track improvement analytics
### Recruiter
- All applicant features
- Create and manage job postings
- Access recruiter dashboard
- View ranked candidates for jobs
- Get job performance statistics
### Admin
- All recruiter and applicant features
- System administration capabilities
## Database Schema
The application uses SQLite with the following main tables:
- `users` - User accounts and authentication
- `resumes` - Resume data and metadata
- `jobs` - Job postings and requirements
- `matches` - Resume-job matches with AI scoring
- `skill_gaps` - Identified skill gaps from matches
- `analytics` - User activity and improvement tracking
## Environment Variables
### Required Variables
- `OPENAI_API_KEY`: Your OpenAI API key for AI functionality
### Optional Variables (with defaults)
- `SECRET_KEY`: JWT secret key (default: change in production)
- `ACCESS_TOKEN_EXPIRE_MINUTES`: Token expiration time (default: 30)
- `MAX_FILE_SIZE`: Maximum upload file size in bytes (default: 10MB)
## File Storage
The application stores uploaded files in `/app/storage/` with the following structure:
- `/app/storage/db/` - SQLite database file
- `/app/storage/uploads/` - Uploaded resume files
## Development
### Code Style
The project uses Ruff for linting and code formatting:
```bash
ruff check .
ruff format .
```
### Testing
Run tests with pytest:
```bash
pytest
```
### Database Migrations
Create new migrations:
```bash
alembic revision --autogenerate -m "Description"
```
Apply migrations:
```bash
alembic upgrade head
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Run tests and linting
5. Submit a pull request
## License
This project is licensed under the MIT License.
## Support
For support and questions, please create an issue in the repository.

108
alembic.ini Normal file
View File

@ -0,0 +1,108 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses
# os.pathsep. If this key is omitted entirely, it falls back to the legacy
# behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

58
alembic/env.py Normal file
View File

@ -0,0 +1,58 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import os
import sys
# Add the project root to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import your models here
from app.db.base import Base
# this is the Alembic Config object
config = context.config
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,140 @@
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2024-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(), nullable=False),
sa.Column('hashed_password', sa.String(), nullable=False),
sa.Column('full_name', sa.String(), nullable=False),
sa.Column('role', sa.Enum('APPLICANT', 'RECRUITER', 'ADMIN', name='userrole'), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
# Create jobs table
op.create_table('jobs',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('recruiter_id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('company', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=False),
sa.Column('requirements', sa.Text(), nullable=True),
sa.Column('location', sa.String(), nullable=True),
sa.Column('job_type', sa.String(), nullable=True),
sa.Column('salary_range', sa.String(), nullable=True),
sa.Column('required_skills', sa.JSON(), nullable=True),
sa.Column('preferred_skills', sa.JSON(), nullable=True),
sa.Column('experience_level', sa.String(), nullable=True),
sa.Column('education_requirement', sa.String(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['recruiter_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_jobs_id'), 'jobs', ['id'], unique=False)
# Create resumes table
op.create_table('resumes',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('file_path', sa.String(), nullable=True),
sa.Column('original_filename', sa.String(), nullable=True),
sa.Column('extracted_text', sa.Text(), nullable=True),
sa.Column('parsed_data', sa.JSON(), nullable=True),
sa.Column('skills', sa.JSON(), nullable=True),
sa.Column('experience_years', sa.Integer(), nullable=True),
sa.Column('education_level', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_resumes_id'), 'resumes', ['id'], unique=False)
# Create analytics table
op.create_table('analytics',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('event_type', sa.String(), nullable=False),
sa.Column('event_data', sa.JSON(), nullable=True),
sa.Column('improvement_score', sa.Float(), nullable=True),
sa.Column('session_id', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_analytics_id'), 'analytics', ['id'], unique=False)
# Create matches table
op.create_table('matches',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('resume_id', sa.Integer(), nullable=False),
sa.Column('job_id', sa.Integer(), nullable=False),
sa.Column('match_score', sa.Float(), nullable=False),
sa.Column('skill_match_score', sa.Float(), nullable=True),
sa.Column('experience_match_score', sa.Float(), nullable=True),
sa.Column('education_match_score', sa.Float(), nullable=True),
sa.Column('overall_feedback', sa.Text(), nullable=True),
sa.Column('resume_suggestions', sa.JSON(), nullable=True),
sa.Column('cover_letter', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['jobs.id'], ),
sa.ForeignKeyConstraint(['resume_id'], ['resumes.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_matches_id'), 'matches', ['id'], unique=False)
# Create skill_gaps table
op.create_table('skill_gaps',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('match_id', sa.Integer(), nullable=False),
sa.Column('missing_skill', sa.String(), nullable=False),
sa.Column('importance', sa.String(), nullable=True),
sa.Column('suggestion', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.ForeignKeyConstraint(['match_id'], ['matches.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_skill_gaps_id'), 'skill_gaps', ['id'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_skill_gaps_id'), table_name='skill_gaps')
op.drop_table('skill_gaps')
op.drop_index(op.f('ix_matches_id'), table_name='matches')
op.drop_table('matches')
op.drop_index(op.f('ix_analytics_id'), table_name='analytics')
op.drop_table('analytics')
op.drop_index(op.f('ix_resumes_id'), table_name='resumes')
op.drop_table('resumes')
op.drop_index(op.f('ix_jobs_id'), table_name='jobs')
op.drop_table('jobs')
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
op.drop_table('users')

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

0
app/api/v1/__init__.py Normal file
View File

199
app/api/v1/analytics.py Normal file
View File

@ -0,0 +1,199 @@
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from app.core.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.analytics import Analytics
from app.models.match import Match
router = APIRouter()
@router.get("/user-stats")
def get_user_analytics(
days: int = Query(30, ge=1, le=365),
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get user analytics and improvement tracking"""
start_date = datetime.utcnow() - timedelta(days=days)
# Activity stats
activities = db.query(
Analytics.event_type,
func.count(Analytics.id).label("count")
).filter(
Analytics.user_id == current_user.id,
Analytics.created_at >= start_date
).group_by(Analytics.event_type).all()
activity_stats = {activity.event_type: activity.count for activity in activities}
# Match score improvement over time
matches = db.query(Match).filter(
Match.user_id == current_user.id,
Match.created_at >= start_date
).order_by(Match.created_at).all()
improvement_trend = []
if matches:
# Group matches by week
weekly_scores = {}
for match in matches:
week_key = match.created_at.strftime("%Y-W%U")
if week_key not in weekly_scores:
weekly_scores[week_key] = []
weekly_scores[week_key].append(match.match_score)
# Calculate average score per week
for week, scores in weekly_scores.items():
improvement_trend.append({
"week": week,
"avg_score": round(sum(scores) / len(scores), 2),
"match_count": len(scores)
})
# Best and recent matches
best_matches = db.query(Match).filter(
Match.user_id == current_user.id
).order_by(desc(Match.match_score)).limit(5).all()
recent_matches = db.query(Match).filter(
Match.user_id == current_user.id
).order_by(desc(Match.created_at)).limit(5).all()
return {
"period_days": days,
"activity_stats": activity_stats,
"improvement_trend": improvement_trend,
"total_matches": len(matches),
"avg_match_score": round(sum(m.match_score for m in matches) / len(matches), 2) if matches else 0,
"best_matches": [
{
"match_id": m.id,
"score": m.match_score,
"created_at": m.created_at
} for m in best_matches
],
"recent_matches": [
{
"match_id": m.id,
"score": m.match_score,
"created_at": m.created_at
} for m in recent_matches
]
}
@router.get("/skill-gaps")
def get_skill_gap_analysis(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get skill gap analysis across all matches"""
from app.models.match import SkillGap
# Get all skill gaps for user
skill_gaps = db.query(
SkillGap.missing_skill,
SkillGap.importance,
func.count(SkillGap.id).label("frequency")
).join(Match).filter(
Match.user_id == current_user.id
).group_by(
SkillGap.missing_skill,
SkillGap.importance
).order_by(desc(func.count(SkillGap.id))).all()
# Group by importance
skill_analysis = {
"required": [],
"preferred": [],
"other": []
}
for gap in skill_gaps:
category = gap.importance if gap.importance in ["required", "preferred"] else "other"
skill_analysis[category].append({
"skill": gap.missing_skill,
"frequency": gap.frequency
})
# Overall skill recommendations
top_skills = [gap.missing_skill for gap in skill_gaps[:10]]
return {
"skill_gaps_by_importance": skill_analysis,
"top_missing_skills": top_skills,
"total_unique_gaps": len(skill_gaps)
}
@router.get("/improvement-suggestions")
def get_improvement_suggestions(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get personalized improvement suggestions"""
# Get recent matches with low scores
low_score_matches = db.query(Match).filter(
Match.user_id == current_user.id,
Match.match_score < 70
).order_by(desc(Match.created_at)).limit(10).all()
# Analyze common issues
common_suggestions = {}
for match in low_score_matches:
if match.resume_suggestions:
for suggestion in match.resume_suggestions:
if isinstance(suggestion, dict):
section = suggestion.get("section", "general")
if section not in common_suggestions:
common_suggestions[section] = []
common_suggestions[section].append(suggestion.get("suggestion", ""))
# Get skill gap patterns
from app.models.match import SkillGap
frequent_gaps = db.query(
SkillGap.missing_skill,
func.count(SkillGap.id).label("count")
).join(Match).filter(
Match.user_id == current_user.id
).group_by(SkillGap.missing_skill).order_by(
desc(func.count(SkillGap.id))
).limit(5).all()
return {
"priority_improvements": [
{
"area": "Skills",
"suggestion": f"Focus on learning {gap.missing_skill}",
"frequency": gap.count,
"impact": "high" if gap.count >= 3 else "medium"
} for gap in frequent_gaps
],
"resume_improvements": common_suggestions,
"overall_recommendation": "Focus on the most frequently missing skills to improve your match scores."
}
@router.post("/track-event")
def track_analytics_event(
event_type: str,
event_data: Optional[Dict[str, Any]] = None,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Track custom analytics event"""
analytics = Analytics(
user_id=current_user.id,
event_type=event_type,
event_data=event_data or {}
)
db.add(analytics)
db.commit()
return {"message": "Event tracked successfully"}

56
app/api/v1/auth.py Normal file
View File

@ -0,0 +1,56 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.security import create_access_token, verify_password, get_password_hash
from app.core.deps import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse, Token
router = APIRouter()
@router.post("/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
"""Register a new user"""
# Check if user already exists
existing_user = db.query(User).filter(User.email == user.email).first()
if existing_user:
raise HTTPException(
status_code=400,
detail="Email already registered"
)
# Create new user
hashed_password = get_password_hash(user.password)
db_user = User(
email=user.email,
hashed_password=hashed_password,
full_name=user.full_name,
role=user.role
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.post("/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""Login and get access token"""
user = db.query(User).filter(User.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
subject=user.email, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

154
app/api/v1/dashboard.py Normal file
View File

@ -0,0 +1,154 @@
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from app.core.deps import get_db, get_current_recruiter
from app.models.user import User
from app.models.job import Job
from app.models.match import Match
from app.models.resume import Resume
router = APIRouter()
@router.get("/candidates/{job_id}")
def get_candidates_for_job(
job_id: int,
min_score: float = Query(0, ge=0, le=100),
limit: int = Query(50, ge=1, le=100),
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Get ranked candidates for a specific job"""
# Verify job belongs to recruiter
job = db.query(Job).filter(
Job.id == job_id,
Job.recruiter_id == current_user.id
).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
# Get matches for this job, ordered by score
matches = db.query(Match).join(Resume).join(User).filter(
Match.job_id == job_id,
Match.match_score >= min_score
).order_by(desc(Match.match_score)).limit(limit).all()
# Format response with candidate info
candidates = []
for match in matches:
resume = db.query(Resume).filter(Resume.id == match.resume_id).first()
user = db.query(User).filter(User.id == match.user_id).first()
candidates.append({
"match_id": match.id,
"candidate_name": user.full_name,
"candidate_email": user.email,
"resume_title": resume.title,
"match_score": match.match_score,
"skill_match_score": match.skill_match_score,
"experience_match_score": match.experience_match_score,
"education_match_score": match.education_match_score,
"skills": resume.skills,
"experience_years": resume.experience_years,
"education_level": resume.education_level,
"created_at": match.created_at
})
return {
"job_title": job.title,
"job_company": job.company,
"total_candidates": len(candidates),
"candidates": candidates
}
@router.get("/jobs/stats")
def get_job_stats(
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Get statistics for recruiter's jobs"""
# Get job stats
job_stats = db.query(
Job.id,
Job.title,
Job.company,
Job.created_at,
func.count(Match.id).label("total_matches"),
func.avg(Match.match_score).label("avg_match_score"),
func.max(Match.match_score).label("best_match_score")
).outerjoin(Match).filter(
Job.recruiter_id == current_user.id
).group_by(Job.id).all()
return [
{
"job_id": stat.id,
"job_title": stat.title,
"company": stat.company,
"created_at": stat.created_at,
"total_matches": stat.total_matches or 0,
"avg_match_score": round(stat.avg_match_score or 0, 2),
"best_match_score": stat.best_match_score or 0
}
for stat in job_stats
]
@router.get("/overview")
def get_dashboard_overview(
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Get overview statistics for recruiter dashboard"""
# Total jobs
total_jobs = db.query(Job).filter(Job.recruiter_id == current_user.id).count()
# Active jobs
active_jobs = db.query(Job).filter(
Job.recruiter_id == current_user.id,
Job.is_active
).count()
# Total matches across all jobs
total_matches = db.query(Match).join(Job).filter(
Job.recruiter_id == current_user.id
).count()
# High-quality matches (score >= 80)
high_quality_matches = db.query(Match).join(Job).filter(
Job.recruiter_id == current_user.id,
Match.match_score >= 80
).count()
# Recent matches (last 7 days)
from datetime import datetime, timedelta
recent_matches = db.query(Match).join(Job).filter(
Job.recruiter_id == current_user.id,
Match.created_at >= datetime.utcnow() - timedelta(days=7)
).count()
# Top performing job
top_job = db.query(
Job.title,
func.count(Match.id).label("match_count"),
func.avg(Match.match_score).label("avg_score")
).outerjoin(Match).filter(
Job.recruiter_id == current_user.id
).group_by(Job.id).order_by(
desc(func.count(Match.id))
).first()
return {
"total_jobs": total_jobs,
"active_jobs": active_jobs,
"total_matches": total_matches,
"high_quality_matches": high_quality_matches,
"recent_matches": recent_matches,
"top_performing_job": {
"title": top_job.title if top_job else None,
"match_count": top_job.match_count if top_job else 0,
"avg_score": round(top_job.avg_score or 0, 2) if top_job else 0
}
}

141
app/api/v1/jobs.py Normal file
View File

@ -0,0 +1,141 @@
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.core.deps import get_db, get_current_recruiter
from app.models.user import User
from app.models.job import Job
from app.schemas.job import JobResponse, JobCreate, JobUpdate
from app.services.ai_service import AIService
router = APIRouter()
ai_service = AIService()
@router.post("/", response_model=JobResponse)
async def create_job(
job: JobCreate,
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Create a new job posting"""
# Analyze job description with AI
job_analysis = await ai_service.analyze_job_description(job.description)
# Create job record
db_job = Job(
recruiter_id=current_user.id,
title=job.title,
company=job.company,
description=job.description,
requirements=job.requirements,
location=job.location,
job_type=job.job_type,
salary_range=job.salary_range,
required_skills=job_analysis.get("required_skills", job.required_skills),
preferred_skills=job_analysis.get("preferred_skills", job.preferred_skills),
experience_level=job_analysis.get("experience_level", job.experience_level),
education_requirement=job_analysis.get("education_requirement", job.education_requirement)
)
db.add(db_job)
db.commit()
db.refresh(db_job)
return db_job
@router.get("/", response_model=List[JobResponse])
def get_jobs(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
search: Optional[str] = Query(None),
location: Optional[str] = Query(None),
job_type: Optional[str] = Query(None),
db: Session = Depends(get_db)
):
"""Get all active job postings with filtering"""
query = db.query(Job).filter(Job.is_active)
if search:
query = query.filter(
Job.title.contains(search) |
Job.description.contains(search) |
Job.company.contains(search)
)
if location:
query = query.filter(Job.location.contains(location))
if job_type:
query = query.filter(Job.job_type == job_type)
return query.offset(skip).limit(limit).all()
@router.get("/my-jobs", response_model=List[JobResponse])
def get_my_jobs(
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Get jobs created by current recruiter"""
return db.query(Job).filter(Job.recruiter_id == current_user.id).all()
@router.get("/{job_id}", response_model=JobResponse)
def get_job(
job_id: int,
db: Session = Depends(get_db)
):
"""Get specific job"""
job = db.query(Job).filter(Job.id == job_id, Job.is_active).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
@router.put("/{job_id}", response_model=JobResponse)
def update_job(
job_id: int,
job_update: JobUpdate,
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Update job posting"""
job = db.query(Job).filter(
Job.id == job_id,
Job.recruiter_id == current_user.id
).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
for field, value in job_update.dict(exclude_unset=True).items():
setattr(job, field, value)
db.commit()
db.refresh(job)
return job
@router.delete("/{job_id}")
def delete_job(
job_id: int,
current_user: User = Depends(get_current_recruiter),
db: Session = Depends(get_db)
):
"""Delete job posting"""
job = db.query(Job).filter(
Job.id == job_id,
Job.recruiter_id == current_user.id
).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
db.delete(job)
db.commit()
return {"message": "Job deleted successfully"}

208
app/api/v1/matching.py Normal file
View File

@ -0,0 +1,208 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.resume import Resume
from app.models.job import Job
from app.models.match import Match, SkillGap
from app.schemas.match import MatchResponse, MatchRequest
from app.services.ai_service import AIService
from app.models.analytics import Analytics
router = APIRouter()
ai_service = AIService()
@router.post("/analyze", response_model=MatchResponse)
async def analyze_match(
match_request: MatchRequest,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Analyze match between resume and job description"""
# Verify resume belongs to user
resume = db.query(Resume).filter(
Resume.id == match_request.resume_id,
Resume.user_id == current_user.id
).first()
if not resume:
raise HTTPException(status_code=404, detail="Resume not found")
# Get job
job = db.query(Job).filter(
Job.id == match_request.job_id,
Job.is_active
).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
# Check if match already exists
existing_match = db.query(Match).filter(
Match.user_id == current_user.id,
Match.resume_id == match_request.resume_id,
Match.job_id == match_request.job_id
).first()
if existing_match:
return existing_match
# Prepare data for AI analysis
resume_data = {
"skills": resume.skills or [],
"experience_years": resume.experience_years,
"education_level": resume.education_level,
"parsed_data": resume.parsed_data or {}
}
job_data = {
"required_skills": job.required_skills or [],
"preferred_skills": job.preferred_skills or [],
"experience_level": job.experience_level,
"education_requirement": job.education_requirement,
"description": job.description
}
# Calculate match score using AI
match_analysis = await ai_service.calculate_match_score(resume_data, job_data)
# Generate resume suggestions
suggestions = await ai_service.generate_resume_suggestions(
resume_data, job_data, match_analysis
)
# Create match record
match = Match(
user_id=current_user.id,
resume_id=match_request.resume_id,
job_id=match_request.job_id,
match_score=match_analysis.get("overall_score", 0),
skill_match_score=match_analysis.get("skill_match_score", 0),
experience_match_score=match_analysis.get("experience_match_score", 0),
education_match_score=match_analysis.get("education_match_score", 0),
overall_feedback=match_analysis.get("overall_feedback", ""),
resume_suggestions=suggestions
)
db.add(match)
db.commit()
db.refresh(match)
# Create skill gap records
missing_skills = match_analysis.get("missing_skills", [])
for skill_data in missing_skills:
skill_gap = SkillGap(
match_id=match.id,
missing_skill=skill_data.get("skill", ""),
importance=skill_data.get("importance", ""),
suggestion=skill_data.get("suggestion", "")
)
db.add(skill_gap)
db.commit()
# Log analytics
analytics = Analytics(
user_id=current_user.id,
event_type="job_match",
event_data={
"resume_id": match_request.resume_id,
"job_id": match_request.job_id,
"match_score": match.match_score
},
improvement_score=match.match_score
)
db.add(analytics)
db.commit()
# Refresh to get skill gaps
db.refresh(match)
return match
@router.get("/", response_model=List[MatchResponse])
def get_matches(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get all matches for current user"""
return db.query(Match).filter(Match.user_id == current_user.id).all()
@router.get("/{match_id}", response_model=MatchResponse)
def get_match(
match_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get specific match"""
match = db.query(Match).filter(
Match.id == match_id,
Match.user_id == current_user.id
).first()
if not match:
raise HTTPException(status_code=404, detail="Match not found")
return match
@router.post("/{match_id}/cover-letter")
async def generate_cover_letter(
match_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Generate cover letter for specific match"""
match = db.query(Match).filter(
Match.id == match_id,
Match.user_id == current_user.id
).first()
if not match:
raise HTTPException(status_code=404, detail="Match not found")
# Get resume and job data
resume = db.query(Resume).filter(Resume.id == match.resume_id).first()
job = db.query(Job).filter(Job.id == match.job_id).first()
resume_data = {
"skills": resume.skills or [],
"experience_years": resume.experience_years,
"education_level": resume.education_level,
"parsed_data": resume.parsed_data or {}
}
job_data = {
"title": job.title,
"company": job.company,
"description": job.description,
"required_skills": job.required_skills or [],
"preferred_skills": job.preferred_skills or []
}
# Generate cover letter
cover_letter = await ai_service.generate_cover_letter(
resume_data, job_data, current_user.full_name
)
# Update match with cover letter
match.cover_letter = cover_letter
db.commit()
# Log analytics
analytics = Analytics(
user_id=current_user.id,
event_type="cover_letter_generate",
event_data={
"match_id": match_id,
"job_id": match.job_id
}
)
db.add(analytics)
db.commit()
return {"cover_letter": cover_letter}

136
app/api/v1/resumes.py Normal file
View File

@ -0,0 +1,136 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from app.core.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.resume import Resume
from app.schemas.resume import ResumeResponse, ResumeUpdate
from app.services.file_service import FileService
from app.services.resume_parser import ResumeParser
from app.services.ai_service import AIService
router = APIRouter()
file_service = FileService()
resume_parser = ResumeParser()
ai_service = AIService()
@router.post("/upload", response_model=ResumeResponse)
async def upload_resume(
title: str,
file: UploadFile = File(...),
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Upload and parse resume file"""
# Save file
file_path, original_filename = await file_service.save_file(file, current_user.id)
# Extract text from file
extracted_text = resume_parser.extract_text(file_path)
if not extracted_text:
# Clean up file if parsing failed
file_service.delete_file(file_path)
raise HTTPException(
status_code=400,
detail="Failed to extract text from file"
)
# Analyze resume with AI
parsed_data = await ai_service.analyze_resume(extracted_text)
# Create resume record
resume = Resume(
user_id=current_user.id,
title=title,
file_path=file_path,
original_filename=original_filename,
extracted_text=extracted_text,
parsed_data=parsed_data,
skills=parsed_data.get("skills", []),
experience_years=parsed_data.get("experience_years"),
education_level=parsed_data.get("education_level")
)
db.add(resume)
db.commit()
db.refresh(resume)
return resume
@router.get("/", response_model=List[ResumeResponse])
def get_resumes(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get all resumes for current user"""
return db.query(Resume).filter(Resume.user_id == current_user.id).all()
@router.get("/{resume_id}", response_model=ResumeResponse)
def get_resume(
resume_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get specific resume"""
resume = db.query(Resume).filter(
Resume.id == resume_id,
Resume.user_id == current_user.id
).first()
if not resume:
raise HTTPException(status_code=404, detail="Resume not found")
return resume
@router.put("/{resume_id}", response_model=ResumeResponse)
def update_resume(
resume_id: int,
resume_update: ResumeUpdate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Update resume"""
resume = db.query(Resume).filter(
Resume.id == resume_id,
Resume.user_id == current_user.id
).first()
if not resume:
raise HTTPException(status_code=404, detail="Resume not found")
for field, value in resume_update.dict(exclude_unset=True).items():
setattr(resume, field, value)
db.commit()
db.refresh(resume)
return resume
@router.delete("/{resume_id}")
def delete_resume(
resume_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Delete resume"""
resume = db.query(Resume).filter(
Resume.id == resume_id,
Resume.user_id == current_user.id
).first()
if not resume:
raise HTTPException(status_code=404, detail="Resume not found")
# Delete file
if resume.file_path:
file_service.delete_file(resume.file_path)
db.delete(resume)
db.commit()
return {"message": "Resume deleted successfully"}

11
app/api/v1/router.py Normal file
View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from app.api.v1 import auth, resumes, jobs, matching, dashboard, analytics
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
api_router.include_router(resumes.router, prefix="/resumes", tags=["Resumes"])
api_router.include_router(jobs.router, prefix="/jobs", tags=["Jobs"])
api_router.include_router(matching.router, prefix="/matching", tags=["Matching"])
api_router.include_router(dashboard.router, prefix="/dashboard", tags=["Dashboard"])
api_router.include_router(analytics.router, prefix="/analytics", tags=["Analytics"])

26
app/core/config.py Normal file
View File

@ -0,0 +1,26 @@
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "SkillSync - AI Resume & Job Match Hub"
APP_VERSION: str = "1.0.0"
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
ALGORITHM: str = "HS256"
# OpenAI Configuration
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
# CORS settings
CORS_ORIGINS: list = ["*"]
# File upload settings
MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB
ALLOWED_FILE_EXTENSIONS: list = [".pdf", ".docx", ".txt"]
class Config:
env_file = ".env"
settings = Settings()

52
app/core/deps.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
from app.schemas.user import TokenData
security = HTTPBearer()
def get_current_user(
db: Session = Depends(get_db),
token: str = Depends(security)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token.credentials, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.email == token_data.email).first()
if user is None:
raise credentials_exception
return user
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_recruiter(current_user: User = Depends(get_current_active_user)) -> User:
from app.models.user import UserRole
if current_user.role != UserRole.RECRUITER and current_user.role != UserRole.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

29
app/core/security.py Normal file
View File

@ -0,0 +1,29 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

0
app/db/__init__.py Normal file
View File

3
app/db/base.py Normal file
View File

@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

23
app/db/session.py Normal file
View File

@ -0,0 +1,23 @@
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

5
app/models/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .user import User
from .resume import Resume
from .job import Job
from .match import Match, SkillGap
from .analytics import Analytics

19
app/models/analytics.py Normal file
View File

@ -0,0 +1,19 @@
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, JSON, Float
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Analytics(Base):
__tablename__ = "analytics"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
event_type = Column(String, nullable=False) # resume_upload, job_match, cover_letter_generate
event_data = Column(JSON, nullable=True) # Additional event details
improvement_score = Column(Float, nullable=True) # Track improvement over time
session_id = Column(String, nullable=True) # Track user sessions
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
user = relationship("User", back_populates="analytics")

29
app/models/job.py Normal file
View File

@ -0,0 +1,29 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Job(Base):
__tablename__ = "jobs"
id = Column(Integer, primary_key=True, index=True)
recruiter_id = Column(Integer, ForeignKey("users.id"), nullable=False)
title = Column(String, nullable=False)
company = Column(String, nullable=False)
description = Column(Text, nullable=False)
requirements = Column(Text, nullable=True)
location = Column(String, nullable=True)
job_type = Column(String, nullable=True) # full-time, part-time, contract
salary_range = Column(String, nullable=True)
required_skills = Column(JSON, nullable=True) # Extracted required skills
preferred_skills = Column(JSON, nullable=True) # Extracted preferred skills
experience_level = Column(String, nullable=True) # entry, mid, senior
education_requirement = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
recruiter = relationship("User", back_populates="jobs")
matches = relationship("Match", back_populates="job")

42
app/models/match.py Normal file
View File

@ -0,0 +1,42 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Float
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Match(Base):
__tablename__ = "matches"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
resume_id = Column(Integer, ForeignKey("resumes.id"), nullable=False)
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False)
match_score = Column(Float, nullable=False) # 0-100 percentage
skill_match_score = Column(Float, nullable=True)
experience_match_score = Column(Float, nullable=True)
education_match_score = Column(Float, nullable=True)
overall_feedback = Column(Text, nullable=True)
resume_suggestions = Column(JSON, nullable=True) # AI suggestions for resume improvement
cover_letter = Column(Text, nullable=True) # Generated cover letter
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
user = relationship("User", back_populates="matches")
resume = relationship("Resume", back_populates="matches")
job = relationship("Job", back_populates="matches")
skill_gaps = relationship("SkillGap", back_populates="match")
class SkillGap(Base):
__tablename__ = "skill_gaps"
id = Column(Integer, primary_key=True, index=True)
match_id = Column(Integer, ForeignKey("matches.id"), nullable=False)
missing_skill = Column(String, nullable=False)
importance = Column(String, nullable=True) # required, preferred, nice-to-have
suggestion = Column(Text, nullable=True) # How to acquire this skill
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
match = relationship("Match", back_populates="skill_gaps")

25
app/models/resume.py Normal file
View File

@ -0,0 +1,25 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class Resume(Base):
__tablename__ = "resumes"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
title = Column(String, nullable=False)
file_path = Column(String, nullable=True)
original_filename = Column(String, nullable=True)
extracted_text = Column(Text, nullable=True)
parsed_data = Column(JSON, nullable=True) # Structured resume data
skills = Column(JSON, nullable=True) # Extracted skills
experience_years = Column(Integer, nullable=True)
education_level = Column(String, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
user = relationship("User", back_populates="resumes")
matches = relationship("Match", back_populates="resume")

30
app/models/user.py Normal file
View File

@ -0,0 +1,30 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Enum
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
import enum
class UserRole(enum.Enum):
APPLICANT = "applicant"
RECRUITER = "recruiter"
ADMIN = "admin"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String, nullable=False)
role = Column(Enum(UserRole), default=UserRole.APPLICANT)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
resumes = relationship("Resume", back_populates="user")
jobs = relationship("Job", back_populates="recruiter")
matches = relationship("Match", back_populates="user")
analytics = relationship("Analytics", back_populates="user")

5
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .user import User, UserCreate, UserUpdate, UserResponse, Token
from .resume import Resume, ResumeCreate, ResumeUpdate, ResumeResponse
from .job import Job, JobCreate, JobUpdate, JobResponse
from .match import Match, MatchResponse, SkillGap, SkillGapResponse
from .analytics import Analytics, AnalyticsCreate, AnalyticsResponse

27
app/schemas/analytics.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Optional, Dict, Any
from pydantic import BaseModel
from datetime import datetime
class AnalyticsBase(BaseModel):
event_type: str
event_data: Optional[Dict[str, Any]] = None
improvement_score: Optional[float] = None
session_id: Optional[str] = None
class AnalyticsCreate(AnalyticsBase):
pass
class AnalyticsResponse(AnalyticsBase):
id: int
user_id: int
created_at: datetime
class Config:
from_attributes = True
class Analytics(AnalyticsResponse):
pass

51
app/schemas/job.py Normal file
View File

@ -0,0 +1,51 @@
from typing import Optional, List
from pydantic import BaseModel
from datetime import datetime
class JobBase(BaseModel):
title: str
company: str
description: str
requirements: Optional[str] = None
location: Optional[str] = None
job_type: Optional[str] = None
salary_range: Optional[str] = None
required_skills: Optional[List[str]] = None
preferred_skills: Optional[List[str]] = None
experience_level: Optional[str] = None
education_requirement: Optional[str] = None
class JobCreate(JobBase):
pass
class JobUpdate(BaseModel):
title: Optional[str] = None
company: Optional[str] = None
description: Optional[str] = None
requirements: Optional[str] = None
location: Optional[str] = None
job_type: Optional[str] = None
salary_range: Optional[str] = None
required_skills: Optional[List[str]] = None
preferred_skills: Optional[List[str]] = None
experience_level: Optional[str] = None
education_requirement: Optional[str] = None
is_active: Optional[bool] = None
class JobResponse(JobBase):
id: int
recruiter_id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Job(JobResponse):
pass

54
app/schemas/match.py Normal file
View File

@ -0,0 +1,54 @@
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from datetime import datetime
class SkillGapBase(BaseModel):
missing_skill: str
importance: Optional[str] = None
suggestion: Optional[str] = None
class SkillGapResponse(SkillGapBase):
id: int
match_id: int
created_at: datetime
class Config:
from_attributes = True
class SkillGap(SkillGapResponse):
pass
class MatchBase(BaseModel):
match_score: float
skill_match_score: Optional[float] = None
experience_match_score: Optional[float] = None
education_match_score: Optional[float] = None
overall_feedback: Optional[str] = None
resume_suggestions: Optional[List[Dict[str, Any]]] = None
cover_letter: Optional[str] = None
class MatchResponse(MatchBase):
id: int
user_id: int
resume_id: int
job_id: int
skill_gaps: Optional[List[SkillGapResponse]] = None
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Match(MatchResponse):
pass
class MatchRequest(BaseModel):
resume_id: int
job_id: int

41
app/schemas/resume.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from datetime import datetime
class ResumeBase(BaseModel):
title: str
extracted_text: Optional[str] = None
parsed_data: Optional[Dict[str, Any]] = None
skills: Optional[List[str]] = None
experience_years: Optional[int] = None
education_level: Optional[str] = None
class ResumeCreate(ResumeBase):
pass
class ResumeUpdate(BaseModel):
title: Optional[str] = None
extracted_text: Optional[str] = None
parsed_data: Optional[Dict[str, Any]] = None
skills: Optional[List[str]] = None
experience_years: Optional[int] = None
education_level: Optional[str] = None
class ResumeResponse(ResumeBase):
id: int
user_id: int
file_path: Optional[str] = None
original_filename: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class Resume(ResumeResponse):
pass

43
app/schemas/user.py Normal file
View File

@ -0,0 +1,43 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
from datetime import datetime
from app.models.user import UserRole
class UserBase(BaseModel):
email: EmailStr
full_name: str
role: UserRole = UserRole.APPLICANT
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
password: Optional[str] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class User(UserResponse):
pass
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None

3
app/services/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .ai_service import AIService
from .resume_parser import ResumeParser
from .file_service import FileService

218
app/services/ai_service.py Normal file
View File

@ -0,0 +1,218 @@
import openai
from typing import Dict, List, Any
from app.core.config import settings
import json
openai.api_key = settings.OPENAI_API_KEY
class AIService:
def __init__(self):
self.client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
async def analyze_resume(self, resume_text: str) -> Dict[str, Any]:
"""Extract structured data from resume text using AI"""
prompt = f"""
Analyze the following resume text and extract structured information:
{resume_text}
Please return a JSON object with the following structure:
{{
"skills": ["skill1", "skill2", ...],
"experience_years": number,
"education_level": "string",
"work_experience": [
{{
"company": "string",
"position": "string",
"duration": "string",
"description": "string"
}}
],
"education": [
{{
"institution": "string",
"degree": "string",
"field": "string",
"year": "string"
}}
],
"contact_info": {{
"email": "string",
"phone": "string",
"location": "string"
}}
}}
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an expert resume analyzer. Return only valid JSON."},
{"role": "user", "content": prompt}
],
temperature=0.1
)
result = response.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"Error analyzing resume: {e}")
return {}
async def analyze_job_description(self, job_description: str) -> Dict[str, Any]:
"""Extract structured data from job description using AI"""
prompt = f"""
Analyze the following job description and extract structured information:
{job_description}
Please return a JSON object with the following structure:
{{
"required_skills": ["skill1", "skill2", ...],
"preferred_skills": ["skill1", "skill2", ...],
"experience_level": "entry/mid/senior",
"education_requirement": "string",
"key_responsibilities": ["resp1", "resp2", ...],
"company_benefits": ["benefit1", "benefit2", ...],
"job_type": "full-time/part-time/contract",
"remote_option": "yes/no/hybrid"
}}
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an expert job description analyzer. Return only valid JSON."},
{"role": "user", "content": prompt}
],
temperature=0.1
)
result = response.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"Error analyzing job description: {e}")
return {}
async def calculate_match_score(
self, resume_data: Dict[str, Any], job_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Calculate match score between resume and job description"""
prompt = f"""
Calculate a match score between this resume and job description:
RESUME DATA:
{json.dumps(resume_data, indent=2)}
JOB DATA:
{json.dumps(job_data, indent=2)}
Please return a JSON object with the following structure:
{{
"overall_score": number (0-100),
"skill_match_score": number (0-100),
"experience_match_score": number (0-100),
"education_match_score": number (0-100),
"missing_skills": [
{{
"skill": "string",
"importance": "required/preferred",
"suggestion": "string"
}}
],
"strengths": ["strength1", "strength2", ...],
"weaknesses": ["weakness1", "weakness2", ...],
"overall_feedback": "detailed feedback string"
}}
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an expert HR analyst. Provide accurate match scoring."},
{"role": "user", "content": prompt}
],
temperature=0.2
)
result = response.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"Error calculating match score: {e}")
return {"overall_score": 0, "skill_match_score": 0, "experience_match_score": 0, "education_match_score": 0}
async def generate_resume_suggestions(
self, resume_data: Dict[str, Any], job_data: Dict[str, Any], match_analysis: Dict[str, Any]
) -> List[Dict[str, str]]:
"""Generate suggestions for improving resume based on job requirements"""
prompt = f"""
Based on this resume and job analysis, provide specific suggestions for improving the resume:
RESUME: {json.dumps(resume_data, indent=2)}
JOB: {json.dumps(job_data, indent=2)}
MATCH ANALYSIS: {json.dumps(match_analysis, indent=2)}
Please return a JSON array of suggestions with this structure:
[
{{
"section": "skills/experience/education/summary",
"suggestion": "specific improvement suggestion",
"priority": "high/medium/low",
"impact": "explanation of how this helps"
}}
]
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an expert resume coach. Provide actionable suggestions."},
{"role": "user", "content": prompt}
],
temperature=0.3
)
result = response.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"Error generating resume suggestions: {e}")
return []
async def generate_cover_letter(
self, resume_data: Dict[str, Any], job_data: Dict[str, Any], user_name: str
) -> str:
"""Generate a personalized cover letter"""
prompt = f"""
Generate a professional cover letter for {user_name} based on their resume and the job description:
RESUME: {json.dumps(resume_data, indent=2)}
JOB: {json.dumps(job_data, indent=2)}
The cover letter should:
- Be professional and engaging
- Highlight relevant skills and experiences
- Show enthusiasm for the role
- Be 3-4 paragraphs long
- Include a proper greeting and closing
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an expert cover letter writer. Write compelling, professional cover letters."},
{"role": "user", "content": prompt}
],
temperature=0.4
)
return response.choices[0].message.content
except Exception as e:
print(f"Error generating cover letter: {e}")
return "Unable to generate cover letter at this time."

View File

@ -0,0 +1,63 @@
import os
import uuid
from pathlib import Path
from fastapi import UploadFile, HTTPException
from app.core.config import settings
class FileService:
def __init__(self):
self.upload_dir = Path("/app/storage/uploads")
self.upload_dir.mkdir(parents=True, exist_ok=True)
def validate_file(self, file: UploadFile) -> bool:
"""Validate uploaded file"""
# Check file size
if file.size and file.size > settings.MAX_FILE_SIZE:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size is {settings.MAX_FILE_SIZE / (1024*1024)}MB"
)
# Check file extension
if file.filename:
file_extension = Path(file.filename).suffix.lower()
if file_extension not in settings.ALLOWED_FILE_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"File type not allowed. Allowed types: {', '.join(settings.ALLOWED_FILE_EXTENSIONS)}"
)
return True
async def save_file(self, file: UploadFile, user_id: int) -> tuple[str, str]:
"""Save uploaded file and return file path and filename"""
self.validate_file(file)
# Generate unique filename
file_extension = Path(file.filename).suffix.lower() if file.filename else ""
unique_filename = f"{user_id}_{uuid.uuid4().hex}{file_extension}"
file_path = self.upload_dir / unique_filename
# Save file
try:
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
return str(file_path), file.filename or unique_filename
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to save file: {str(e)}"
)
def delete_file(self, file_path: str) -> bool:
"""Delete file from storage"""
try:
if os.path.exists(file_path):
os.remove(file_path)
return True
return False
except Exception:
return False

View File

@ -0,0 +1,58 @@
import PyPDF2
import docx
from typing import Optional
from pathlib import Path
class ResumeParser:
@staticmethod
def extract_text_from_pdf(file_path: str) -> Optional[str]:
"""Extract text from PDF file"""
try:
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
text = ""
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text.strip()
except Exception as e:
print(f"Error extracting PDF text: {e}")
return None
@staticmethod
def extract_text_from_docx(file_path: str) -> Optional[str]:
"""Extract text from DOCX file"""
try:
doc = docx.Document(file_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text.strip()
except Exception as e:
print(f"Error extracting DOCX text: {e}")
return None
@staticmethod
def extract_text_from_txt(file_path: str) -> Optional[str]:
"""Extract text from TXT file"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read().strip()
except Exception as e:
print(f"Error extracting TXT text: {e}")
return None
@classmethod
def extract_text(cls, file_path: str) -> Optional[str]:
"""Extract text from file based on extension"""
file_extension = Path(file_path).suffix.lower()
if file_extension == '.pdf':
return cls.extract_text_from_pdf(file_path)
elif file_extension == '.docx':
return cls.extract_text_from_docx(file_path)
elif file_extension == '.txt':
return cls.extract_text_from_txt(file_path)
else:
print(f"Unsupported file format: {file_extension}")
return None

50
main.py Normal file
View File

@ -0,0 +1,50 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.api.v1.router import api_router
from app.db.session import engine
from app.db.base import Base
# Create database tables
Base.metadata.create_all(bind=engine)
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="AI-Powered Resume & Job Match Hub - Helping job seekers find the perfect match",
openapi_url="/openapi.json"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API routes
app.include_router(api_router, prefix="/api/v1")
@app.get("/")
async def root():
"""Root endpoint providing service information"""
return {
"service": settings.APP_NAME,
"version": settings.APP_VERSION,
"description": "AI-Powered Resume & Job Match Hub",
"documentation": "/docs",
"health_check": "/health"
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"service": settings.APP_NAME,
"version": settings.APP_VERSION
}

18
requirements.txt Normal file
View File

@ -0,0 +1,18 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
alembic==1.12.1
pydantic==2.5.0
pydantic-settings==2.1.0
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
httpx==0.25.2
openai==1.3.5
langchain==0.0.350
langchain-openai==0.0.2
PyPDF2==3.0.1
python-docx==1.1.0
ruff==0.1.6
pytest==7.4.3
pytest-asyncio==0.21.1