60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
import datetime
|
|
from typing import Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.secret import Secret
|
|
from app.schemas.secret import SecretCreate
|
|
from app.utils.security import encrypt_content, decrypt_content, generate_random_key
|
|
|
|
|
|
def create_secret(db: Session, secret: SecretCreate) -> Secret:
|
|
"""Create a new secret in the database."""
|
|
# Generate a unique access key
|
|
access_key = generate_random_key(32)
|
|
|
|
# Encrypt the content
|
|
encrypted_content, salt = encrypt_content(secret.content)
|
|
|
|
# Calculate expiration time
|
|
ttl_hours = secret.ttl_hours or 24 # Default to 24 hours
|
|
expires_at = datetime.datetime.utcnow() + datetime.timedelta(hours=ttl_hours)
|
|
|
|
# Create the secret
|
|
db_secret = Secret(
|
|
content=encrypted_content,
|
|
salt=salt,
|
|
access_key=access_key,
|
|
expires_at=expires_at,
|
|
)
|
|
|
|
db.add(db_secret)
|
|
db.commit()
|
|
db.refresh(db_secret)
|
|
|
|
return db_secret
|
|
|
|
|
|
def get_secret_by_access_key(db: Session, access_key: str) -> Optional[Secret]:
|
|
"""Get a secret by its access key."""
|
|
return db.query(Secret).filter(Secret.access_key == access_key).first()
|
|
|
|
|
|
def read_and_delete_secret(db: Session, access_key: str) -> Optional[str]:
|
|
"""
|
|
Read a secret by its access key, then delete it.
|
|
Returns the decrypted content or None if the secret doesn't exist or has been accessed.
|
|
"""
|
|
secret = get_secret_by_access_key(db, access_key)
|
|
|
|
if not secret or secret.is_accessed or secret.is_expired:
|
|
return None
|
|
|
|
# Decrypt the content
|
|
decrypted_content = decrypt_content(secret.content, secret.salt)
|
|
|
|
# Mark as accessed and delete the secret
|
|
db.delete(secret)
|
|
db.commit()
|
|
|
|
return decrypted_content |