107 lines
2.5 KiB
Python
107 lines
2.5 KiB
Python
from datetime import datetime
|
|
from typing import Optional, Tuple
|
|
from sqlalchemy.orm import Session
|
|
from app.models.secret import Secret
|
|
from app.services.encryption import encrypt_secret, decrypt_secret
|
|
|
|
|
|
def create_secret(db: Session, content: str, ttl_hours: Optional[int] = None) -> Secret:
|
|
"""
|
|
Create a new secret in the database.
|
|
|
|
Args:
|
|
db: Database session
|
|
content: Secret content to store
|
|
ttl_hours: Time-to-live in hours
|
|
|
|
Returns:
|
|
The created Secret instance
|
|
"""
|
|
# Encrypt the content
|
|
encrypted_content = encrypt_secret(content)
|
|
|
|
# Create the secret
|
|
secret = Secret.create_with_ttl(content=encrypted_content, ttl_hours=ttl_hours)
|
|
|
|
# Add to database
|
|
db.add(secret)
|
|
db.commit()
|
|
db.refresh(secret)
|
|
|
|
return secret
|
|
|
|
|
|
def get_secret_by_id(db: Session, secret_id: str) -> Optional[Secret]:
|
|
"""
|
|
Get a secret by its ID.
|
|
|
|
Args:
|
|
db: Database session
|
|
secret_id: Secret ID
|
|
|
|
Returns:
|
|
The Secret instance if found, None otherwise
|
|
"""
|
|
return db.query(Secret).filter(Secret.id == secret_id).first()
|
|
|
|
|
|
def retrieve_and_delete_secret(db: Session, secret_id: str) -> Tuple[Optional[str], str]:
|
|
"""
|
|
Retrieve a secret and then delete it.
|
|
|
|
Args:
|
|
db: Database session
|
|
secret_id: Secret ID
|
|
|
|
Returns:
|
|
Tuple of (decrypted_content, message)
|
|
"""
|
|
# Get the secret
|
|
secret = get_secret_by_id(db, secret_id)
|
|
|
|
# Check if it exists
|
|
if not secret:
|
|
return None, "Secret not found"
|
|
|
|
# Check if it's expired
|
|
if secret.is_expired():
|
|
# Delete the expired secret
|
|
db.delete(secret)
|
|
db.commit()
|
|
return None, "Secret has expired"
|
|
|
|
# Check if it's already been viewed
|
|
if secret.is_viewed:
|
|
return None, "Secret has already been viewed"
|
|
|
|
# Decrypt the content
|
|
decrypted_content = decrypt_secret(secret.content)
|
|
|
|
# Delete the secret
|
|
db.delete(secret)
|
|
db.commit()
|
|
|
|
return decrypted_content, "Secret retrieved successfully"
|
|
|
|
|
|
def cleanup_expired_secrets(db: Session) -> int:
|
|
"""
|
|
Delete all expired secrets from the database.
|
|
|
|
Args:
|
|
db: Database session
|
|
|
|
Returns:
|
|
Number of secrets deleted
|
|
"""
|
|
now = datetime.utcnow()
|
|
expired_secrets = db.query(Secret).filter(Secret.expires_at < now).all()
|
|
|
|
count = len(expired_secrets)
|
|
|
|
for secret in expired_secrets:
|
|
db.delete(secret)
|
|
|
|
db.commit()
|
|
|
|
return count |