59 lines
1.9 KiB
Python
59 lines
1.9 KiB
Python
import hmac
|
|
import hashlib
|
|
import time
|
|
import os
|
|
from fastapi import Request, HTTPException, Security
|
|
from fastapi.security import APIKeyHeader
|
|
|
|
# In production, this should be an environment variable
|
|
# For now, we'll use a placeholder that the user can set
|
|
SECRET_KEY = os.getenv("STREAMFLIX_SECRET_KEY", "your-super-secret-key-change-this")
|
|
|
|
signature_header = APIKeyHeader(name="X-Signature", auto_error=False)
|
|
timestamp_header = APIKeyHeader(name="X-Timestamp", auto_error=False)
|
|
|
|
def verify_hmac(
|
|
request: Request,
|
|
signature: str = Security(signature_header),
|
|
timestamp: str = Security(timestamp_header)
|
|
):
|
|
"""
|
|
Verify HMAC signature of the request.
|
|
Signature = HMAC_SHA256(secret, timestamp + path + method + body)
|
|
"""
|
|
if not signature or not timestamp:
|
|
raise HTTPException(status_code=401, detail="Authentication headers missing")
|
|
|
|
# 1. Check timestamp (prevents replay attacks, 5 minute window)
|
|
try:
|
|
request_time = int(timestamp)
|
|
current_time = int(time.time())
|
|
if abs(current_time - request_time) > 300: # 5 minutes
|
|
raise HTTPException(status_code=401, detail="Request expired")
|
|
except ValueError:
|
|
raise HTTPException(status_code=401, detail="Invalid timestamp")
|
|
|
|
# 2. Reconstruct payload
|
|
# Note: For GET requests, body is empty
|
|
body = b""
|
|
if request.method in ["POST", "PUT", "PATCH"]:
|
|
# This is tricky in FastAPI as reading body consumes it
|
|
# We'll need to handle this carefully if we want to sign the body
|
|
pass
|
|
|
|
path = request.url.path
|
|
method = request.method
|
|
|
|
payload = f"{timestamp}{path}{method}".encode()
|
|
|
|
# 3. Calculate signature
|
|
expected_signature = hmac.new(
|
|
SECRET_KEY.encode(),
|
|
payload,
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
if not hmac.compare_digest(signature, expected_signature):
|
|
raise HTTPException(status_code=401, detail="Invalid signature")
|
|
|
|
return True
|