kv-netflix/backend/security.py

141 lines
4.5 KiB
Python

"""
StreamFlow Security - HMAC Authentication
Implements request signing to prevent unauthorized API access
"""
import hmac
import hashlib
import time
from fastapi import Request, HTTPException, Security
from fastapi.security import APIKeyHeader
from starlette.middleware.base import BaseHTTPMiddleware
from config import settings
from logging_config import get_logger
logger = get_logger("security")
# Header definitions
signature_header = APIKeyHeader(name="X-Signature", auto_error=False)
timestamp_header = APIKeyHeader(name="X-Timestamp", auto_error=False)
# Request body cache key
REQUEST_BODY_KEY = "_cached_body"
class BodyCacheMiddleware(BaseHTTPMiddleware):
"""
Middleware to cache request body for HMAC verification.
This solves the issue of body consumption in FastAPI.
"""
async def dispatch(self, request: Request, call_next):
# Only cache body for methods that have a body
if request.method in ("POST", "PUT", "PATCH"):
body = await request.body()
# Store in request state for later access
request.state._cached_body = body
response = await call_next(request)
return response
def get_cached_body(request: Request) -> bytes:
"""Get the cached request body, or empty bytes if not cached."""
return getattr(request.state, REQUEST_BODY_KEY, b"")
async def verify_hmac(
request: Request,
signature: str = Security(signature_header),
timestamp: str = Security(timestamp_header)
) -> bool:
"""
Verify HMAC signature of the request.
Signature = HMAC_SHA256(secret, timestamp + path + method + body)
This provides:
- Authentication: Only clients with the secret can sign requests
- Integrity: Request content cannot be tampered with
- Replay protection: Timestamp prevents request reuse (5 min window)
Returns:
True if signature is valid
Raises:
HTTPException: 401 if authentication fails
"""
if not signature or not timestamp:
logger.warning("Authentication headers missing", extra={
"path": request.url.path,
"method": request.method
})
raise HTTPException(status_code=401, detail="Authentication headers missing")
# 1. Check timestamp (prevents replay attacks, 5 minute window)
try:
request_time = int(timestamp)
current_time = int(time.time())
time_diff = abs(current_time - request_time)
if time_diff > 300: # 5 minutes
logger.warning(f"Request expired: time_diff={time_diff}s", extra={
"path": request.url.path,
"request_time": request_time,
"current_time": current_time
})
raise HTTPException(status_code=401, detail="Request expired")
except ValueError:
logger.warning("Invalid timestamp format", extra={"timestamp": timestamp})
raise HTTPException(status_code=401, detail="Invalid timestamp")
# 2. Reconstruct payload including body for POST/PUT/PATCH
body = b""
if request.method in ("POST", "PUT", "PATCH"):
body = get_cached_body(request)
path = request.url.path
method = request.method
# Payload format: timestamp + path + method + body
payload = f"{timestamp}{path}{method}".encode() + body
# 3. Calculate expected signature
expected_signature = hmac.new(
settings.secret_key.encode(),
payload,
hashlib.sha256
).hexdigest()
# 4. Constant-time comparison (prevents timing attacks)
if not hmac.compare_digest(signature, expected_signature):
logger.warning("Invalid signature", extra={
"path": request.url.path,
"method": request.method
})
raise HTTPException(status_code=401, detail="Invalid signature")
return True
def generate_signature(timestamp: str, path: str, method: str, body: bytes = b"") -> str:
"""
Generate HMAC signature for a request.
This is useful for testing or client-side signature generation.
Args:
timestamp: Unix timestamp as string
path: Request path (e.g., "/api/catalog")
method: HTTP method (e.g., "GET", "POST")
body: Request body bytes (empty for GET)
Returns:
Hex-encoded HMAC-SHA256 signature
"""
payload = f"{timestamp}{path}{method}".encode() + body
return hmac.new(
settings.secret_key.encode(),
payload,
hashlib.sha256
).hexdigest()