apix/backend/services/meta_video_client.py
Khoa.vo 4050f4c853
Some checks are pending
CI / build (18.x) (push) Waiting to run
CI / build (20.x) (push) Waiting to run
v3.1.0: Integrate Meta AI video generation, remove redundant wrapper
- Add backend/services/meta_video_client.py (direct video generation)
- Update routers/meta.py with working /meta/video endpoint
- Delete services/metaai-api/ (~1500 lines removed)
- Simplify docker-compose.yml to single container
- No external service dependencies for Meta AI
2026-01-13 08:02:36 +07:00

432 lines
16 KiB
Python

"""
Meta AI Video Generation Client for FastAPI
Ported from services/metaai-api/src/metaai_api/video_generation.py
Integrated directly into the FastAPI backend to eliminate separate service.
"""
import httpx
import json
import time
import uuid
import re
import asyncio
from typing import Dict, List, Optional, Any
# GraphQL endpoint that both the send-message request and the polling request POST to.
GRAPHQL_URL = "https://www.meta.ai/api/graphql/"
# Site root: fetched to scrape the lsd / fb_dtsg tokens, and reused as the
# Origin and Referer header values of the GraphQL calls.
META_AI_BASE = "https://www.meta.ai"
class MetaVideoResult:
    """A single generated video: its URL plus the prompt and conversation it belongs to."""

    def __init__(self, url: str, prompt: str, conversation_id: str):
        self.url, self.prompt, self.conversation_id = url, prompt, conversation_id

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict keyed ``url``, ``prompt``, ``conversation_id``."""
        field_names = ("url", "prompt", "conversation_id")
        return {name: getattr(self, name) for name in field_names}
class MetaVideoClient:
"""
Async client for Meta AI video generation.
Handles session tokens, video creation requests, and polling for results.
"""
def __init__(self, cookies: str):
"""
Initialize the video client with cookies.
Args:
cookies: Cookie string or JSON array of cookies
"""
self.cookies_str = self._normalize_cookies(cookies)
self.cookies_dict = self._parse_cookies(self.cookies_str)
self.lsd: Optional[str] = None
self.fb_dtsg: Optional[str] = None
def _normalize_cookies(self, cookies: str) -> str:
"""Normalize cookies from JSON array to string format"""
if not cookies:
return ""
try:
trimmed = cookies.strip()
if trimmed.startswith('['):
parsed = json.loads(trimmed)
if isinstance(parsed, list):
return "; ".join(
f"{c['name']}={c['value']}" for c in parsed
if isinstance(c, dict) and 'name' in c and 'value' in c
)
except (json.JSONDecodeError, KeyError):
pass
return cookies
def _parse_cookies(self, cookie_str: str) -> Dict[str, str]:
"""Parse cookie string into dictionary"""
cookies = {}
for item in cookie_str.split('; '):
if '=' in item:
key, value = item.split('=', 1)
cookies[key] = value
return cookies
async def init_session(self) -> None:
"""Fetch lsd and fb_dtsg tokens from Meta AI page"""
print("[Meta Video] Initializing session tokens...")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
META_AI_BASE,
headers={
"Cookie": self.cookies_str,
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
}
)
html = response.text
# Extract LSD token
lsd_match = re.search(r'"LSD",\[\],\{"token":"([^"]+)"', html)
if lsd_match:
self.lsd = lsd_match.group(1)
else:
# Fallback patterns
lsd_match = re.search(r'"lsd":"([^"]+)"', html) or \
re.search(r'name="lsd" value="([^"]+)"', html)
if lsd_match:
self.lsd = lsd_match.group(1)
# Extract FB DTSG token
dtsg_match = re.search(r'DTSGInitData",\[\],\{"token":"([^"]+)"', html)
if dtsg_match:
self.fb_dtsg = dtsg_match.group(1)
else:
dtsg_match = re.search(r'"DTSGInitialData".*?"token":"([^"]+)"', html)
if dtsg_match:
self.fb_dtsg = dtsg_match.group(1)
if not self.lsd or not self.fb_dtsg:
if 'login_form' in html or 'facebook.com/login' in html:
raise Exception("Meta AI: Cookies expired or invalid - please refresh")
raise Exception("Meta AI: Failed to extract session tokens")
print(f"[Meta Video] Got tokens - lsd: {self.lsd[:10]}..., dtsg: {self.fb_dtsg[:10]}...")
async def generate_video(
self,
prompt: str,
wait_before_poll: int = 10,
max_attempts: int = 30,
poll_interval: int = 5
) -> List[MetaVideoResult]:
"""
Generate video from text prompt.
Args:
prompt: Text prompt for video generation
wait_before_poll: Seconds to wait before polling
max_attempts: Maximum polling attempts
poll_interval: Seconds between polls
Returns:
List of MetaVideoResult with video URLs
"""
# Initialize session if needed
if not self.lsd or not self.fb_dtsg:
await self.init_session()
# Step 1: Create video generation request
print(f"[Meta Video] Generating video for: \"{prompt[:50]}...\"")
conversation_id = await self._create_video_request(prompt)
if not conversation_id:
raise Exception("Failed to create video generation request")
print(f"[Meta Video] Got conversation ID: {conversation_id}")
# Step 2: Wait before polling
print(f"[Meta Video] Waiting {wait_before_poll}s before polling...")
await asyncio.sleep(wait_before_poll)
# Step 3: Poll for video URLs
video_urls = await self._poll_for_videos(
conversation_id,
max_attempts=max_attempts,
poll_interval=poll_interval
)
if not video_urls:
raise Exception("No videos generated after polling")
return [
MetaVideoResult(url=url, prompt=prompt, conversation_id=conversation_id)
for url in video_urls
]
async def _create_video_request(self, prompt: str) -> Optional[str]:
"""Send video generation request to Meta AI"""
external_conversation_id = str(uuid.uuid4())
offline_threading_id = str(int(time.time() * 1000000000))[:19]
thread_session_id = str(uuid.uuid4())
bot_offline_threading_id = str(int(time.time() * 1000000000) + 1)[:19]
qpl_join_id = str(uuid.uuid4()).replace('-', '')
spin_t = str(int(time.time()))
# Build variables JSON
variables = json.dumps({
"message": {"sensitive_string_value": prompt},
"externalConversationId": external_conversation_id,
"offlineThreadingId": offline_threading_id,
"threadSessionId": thread_session_id,
"isNewConversation": True,
"suggestedPromptIndex": None,
"promptPrefix": None,
"entrypoint": "KADABRA__CHAT__UNIFIED_INPUT_BAR",
"attachments": [],
"attachmentsV2": [],
"activeMediaSets": [],
"activeCardVersions": [],
"activeArtifactVersion": None,
"userUploadEditModeInput": None,
"reelComposeInput": None,
"qplJoinId": qpl_join_id,
"sourceRemixPostId": None,
"gkPlannerOrReasoningEnabled": True,
"selectedModel": "BASIC_OPTION",
"conversationMode": None,
"selectedAgentType": "PLANNER",
"conversationStarterId": None,
"promptType": None,
"artifactRewriteOptions": None,
"imagineOperationRequest": None,
"imagineClientOptions": {"orientation": "VERTICAL"},
"spaceId": None,
"sparkSnapshotId": None,
"topicPageId": None,
"includeSpace": False,
"storybookId": None,
"messagePersistentInput": {
"attachment_size": None,
"attachment_type": None,
"bot_message_offline_threading_id": bot_offline_threading_id,
"conversation_mode": None,
"external_conversation_id": external_conversation_id,
"is_new_conversation": True,
"meta_ai_entry_point": "KADABRA__CHAT__UNIFIED_INPUT_BAR",
"offline_threading_id": offline_threading_id,
"prompt_id": None,
"prompt_session_id": thread_session_id
},
"alakazam_enabled": True,
"skipInFlightMessageWithParams": None,
"__relay_internal__pv__KadabraSocialSearchEnabledrelayprovider": False,
"__relay_internal__pv__KadabraZeitgeistEnabledrelayprovider": False,
"__relay_internal__pv__alakazam_enabledrelayprovider": True,
"__relay_internal__pv__AbraArtifactsEnabledrelayprovider": True,
"__relay_internal__pv__AbraPlannerEnabledrelayprovider": True,
"__relay_internal__pv__WebPixelRatiorelayprovider": 1,
"__relay_internal__pv__KadabraVideoDeliveryRequestrelayprovider": {
"dash_manifest_requests": [{}],
"progressive_url_requests": [{"quality": "HD"}, {"quality": "SD"}]
}
}, separators=(',', ':'))
# Build multipart form body
boundary = "----WebKitFormBoundaryu59CeaZS4ag939lz"
body = self._build_multipart_body(boundary, variables, spin_t)
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.5',
'content-type': f'multipart/form-data; boundary={boundary}',
'origin': META_AI_BASE,
'referer': f'{META_AI_BASE}/',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'x-fb-lsd': self.lsd,
}
url = f"{GRAPHQL_URL}?fb_dtsg={self.fb_dtsg}&jazoest=25499&lsd={self.lsd}"
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
url,
headers=headers,
cookies=self.cookies_dict,
content=body.encode('utf-8')
)
if response.status_code == 200:
return external_conversation_id
else:
print(f"[Meta Video] Request failed: {response.status_code}")
return None
except Exception as e:
print(f"[Meta Video] Request error: {e}")
return None
def _build_multipart_body(
self,
boundary: str,
variables: str,
spin_t: str
) -> str:
"""Build multipart form body for video request"""
parts = [
('av', '813590375178585'),
('__user', '0'),
('__a', '1'),
('__req', 'q'),
('__hs', '20413.HYP:kadabra_pkg.2.1...0'),
('dpr', '1'),
('__ccg', 'GOOD'),
('__rev', '1030219547'),
('__s', 'q59jx4:9bnqdw:3ats33'),
('__hsi', '7575127759957881428'),
('__dyn', '7xeUjG1mxu1syUqxemh0no6u5U4e2C1vzEdE98K360CEbo1nEhw2nVEtwMw6ywaq221FwpUO0n24oaEnxO0Bo7O2l0Fwqo31w9O1lwlE-U2zxe2GewbS361qw82dUlwhE-15wmo423-0j52oS0Io5d0bS1LBwNwKG0WE8oC1IwGw-wlUcE2-G2O7E5y1rwa211wo84y1iwfe1aw'),
('__csr', ''),
('__comet_req', '72'),
('fb_dtsg', self.fb_dtsg),
('jazoest', '25499'),
('lsd', self.lsd),
('__spin_r', '1030219547'),
('__spin_b', 'trunk'),
('__spin_t', spin_t),
('__jssesw', '1'),
('__crn', 'comet.kadabra.KadabraAssistantRoute'),
('fb_api_caller_class', 'RelayModern'),
('fb_api_req_friendly_name', 'useKadabraSendMessageMutation'),
('server_timestamps', 'true'),
('variables', variables),
('doc_id', '25290947477183545'),
]
body_lines = []
for name, value in parts:
body_lines.append(f'------{boundary[4:]}')
body_lines.append(f'Content-Disposition: form-data; name="{name}"')
body_lines.append('')
body_lines.append(value)
body_lines.append(f'------{boundary[4:]}--')
return '\r\n'.join(body_lines) + '\r\n'
async def _poll_for_videos(
self,
conversation_id: str,
max_attempts: int = 30,
poll_interval: int = 5
) -> List[str]:
"""Poll for video URLs from a conversation"""
print(f"[Meta Video] Polling for videos (max {max_attempts} attempts)...")
variables = {
"prompt_id": conversation_id,
"__relay_internal__pv__AbraIsLoggedOutrelayprovider": False,
"__relay_internal__pv__alakazam_enabledrelayprovider": True,
"__relay_internal__pv__AbraArtifactsEnabledrelayprovider": True,
"__relay_internal__pv__AbraPlannerEnabledrelayprovider": True,
"__relay_internal__pv__WebPixelRatiorelayprovider": 1,
"__relay_internal__pv__KadabraVideoDeliveryRequestrelayprovider": {
"dash_manifest_requests": [{}],
"progressive_url_requests": [{"quality": "HD"}, {"quality": "SD"}]
}
}
data = {
'av': '813590375178585',
'__user': '0',
'__a': '1',
'__req': 's',
'__hs': '20413.HYP:kadabra_pkg.2.1...0',
'dpr': '1',
'__ccg': 'GOOD',
'__rev': '1030219547',
'__comet_req': '72',
'fb_dtsg': self.fb_dtsg,
'jazoest': '25499',
'lsd': self.lsd,
'__spin_r': '1030219547',
'__spin_b': 'trunk',
'__spin_t': str(int(time.time())),
'__jssesw': '1',
'__crn': 'comet.kadabra.KadabraAssistantRoute',
'fb_api_caller_class': 'RelayModern',
'fb_api_req_friendly_name': 'KadabraPromptRootQuery',
'server_timestamps': 'true',
'variables': json.dumps(variables),
'doc_id': '25290569913909283',
}
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.5',
'content-type': 'application/x-www-form-urlencoded',
'origin': META_AI_BASE,
'referer': f'{META_AI_BASE}/',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'x-fb-lsd': self.lsd,
'x-fb-friendly-name': 'KadabraPromptRootQuery'
}
for attempt in range(1, max_attempts + 1):
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
GRAPHQL_URL,
headers=headers,
cookies=self.cookies_dict,
data=data
)
if response.status_code == 200:
video_urls = self._extract_video_urls(response.text)
if video_urls:
print(f"[Meta Video] Found {len(video_urls)} video(s) on attempt {attempt}")
return video_urls
else:
print(f"[Meta Video] Attempt {attempt}/{max_attempts} - no videos yet")
except Exception as e:
print(f"[Meta Video] Poll error: {e}")
await asyncio.sleep(poll_interval)
return []
def _extract_video_urls(self, response_text: str) -> List[str]:
"""Extract video URLs from Meta AI response"""
video_urls = set()
try:
data = json.loads(response_text)
def search_for_urls(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key in ['video_url', 'progressive_url', 'generated_video_uri', 'uri', 'url']:
if isinstance(value, str) and 'fbcdn' in value and '.mp4' in value:
video_urls.add(value)
search_for_urls(value)
elif isinstance(obj, list):
for item in obj:
search_for_urls(item)
elif isinstance(obj, str):
if 'fbcdn' in obj and ('.mp4' in obj or 'video' in obj.lower()):
urls = re.findall(r'https?://[^\s"\'<>]+\.mp4[^\s"\'<>]*', obj)
video_urls.update(urls)
search_for_urls(data)
except json.JSONDecodeError:
# Fallback regex extraction
urls = re.findall(r'https?://[^\s"\'<>]+fbcdn[^\s"\'<>]+\.mp4[^\s"\'<>]*', response_text)
video_urls.update(urls)
return list(video_urls)