""" Meta AI Client for Python/FastAPI Port of lib/providers/meta-client.ts Handles: - Session initialization from meta.ai - GraphQL mutation for image generation (Abra) - Streaming response parsing - Polling for async results - Free wrapper fallback support """ import httpx import json import uuid import re import asyncio from typing import Optional, Dict, List, Any META_AI_BASE = "https://www.meta.ai" GRAPHQL_ENDPOINT = f"{META_AI_BASE}/api/graphql/" # Orientation mapping ORIENTATION_MAP = { "portrait": "VERTICAL", "landscape": "HORIZONTAL", "square": "SQUARE" } class MetaImageResult: def __init__(self, url: str, data: Optional[str], prompt: str, model: str): self.url = url self.data = data self.prompt = prompt self.model = model def to_dict(self) -> Dict[str, Any]: return { "url": self.url, "data": self.data, "prompt": self.prompt, "model": self.model, "aspectRatio": "1:1" } class MetaSession: def __init__(self): self.lsd: Optional[str] = None self.fb_dtsg: Optional[str] = None self.access_token: Optional[str] = None self.external_conversation_id: Optional[str] = None class MetaAIClient: def __init__( self, cookies: str, use_free_wrapper: bool = True, free_wrapper_url: str = "http://localhost:8000" ): self.cookies = self._normalize_cookies(cookies) self.session = MetaSession() self.use_free_wrapper = use_free_wrapper self.free_wrapper_url = free_wrapper_url if self.cookies: self._parse_session_from_cookies() def _normalize_cookies(self, cookies: str) -> str: """Normalize cookies from JSON array to string format""" if not cookies: return "" try: trimmed = cookies.strip() if trimmed.startswith('['): parsed = json.loads(trimmed) if isinstance(parsed, list): return "; ".join( f"{c['name']}={c['value']}" for c in parsed if isinstance(c, dict) and 'name' in c and 'value' in c ) except (json.JSONDecodeError, KeyError): pass return cookies def _parse_session_from_cookies(self) -> None: """Extract session tokens from cookies""" lsd_match = re.search(r'lsd=([^;]+)', self.cookies) if lsd_match: self.session.lsd = lsd_match.group(1) dtsg_match = re.search(r'fb_dtsg=([^;]+)', self.cookies) if dtsg_match: self.session.fb_dtsg = dtsg_match.group(1) def _parse_cookies_to_dict(self, cookie_str: str) -> Dict[str, str]: """Parse cookie string to dictionary""" result = {} if not cookie_str: return result for pair in cookie_str.split(';'): pair = pair.strip() if '=' in pair: key, _, value = pair.partition('=') result[key.strip()] = value.strip() return result async def get_session(self) -> MetaSession: """Get initialized session tokens""" if not self.use_free_wrapper and not self.session.lsd and not self.session.fb_dtsg: await self._init_session() return self.session def get_cookies(self) -> str: return self.cookies async def generate( self, prompt: str, num_images: int = 4, aspect_ratio: str = "portrait" ) -> List[MetaImageResult]: """Generate images using Meta AI's Imagine model""" print(f"[Meta AI] Generating images for: \"{prompt[:50]}...\" ({aspect_ratio})") if self.use_free_wrapper: return await self._generate_with_free_wrapper(prompt, num_images) # Initialize session if needed if not self.session.access_token: await self._init_session() # Use "Imagine" prefix for image generation image_prompt = prompt if prompt.lower().startswith('imagine') else f"Imagine {prompt}" # Send the prompt via GraphQL response = await self._send_prompt(image_prompt, aspect_ratio) # Extract images images = self._extract_images(response, prompt) if not images: print("[Meta AI] No images in initial response, polling...") images = await self._poll_for_images(response, prompt) return images async def _generate_with_free_wrapper( self, prompt: str, num_images: int ) -> List[MetaImageResult]: """Generate using free API wrapper""" print(f"[Meta Wrapper] Generating for: \"{prompt[:50]}...\" via {self.free_wrapper_url}") cookie_dict = self._parse_cookies_to_dict(self.cookies) async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( f"{self.free_wrapper_url}/chat", headers={"Content-Type": "application/json"}, json={ "message": f"Imagine {prompt}", "stream": False, "cookies": cookie_dict } ) if response.status_code != 200: error_text = response.text[:200] raise Exception(f"Meta Wrapper Error: {response.status_code} - {error_text}") data = response.json() images: List[MetaImageResult] = [] # Check for media in response if data.get("media") and isinstance(data["media"], list): for m in data["media"]: if m.get("url"): images.append(MetaImageResult( url=m["url"], data=None, prompt=prompt, model="meta-wrapper" )) # Fallback checks if not images and data.get("images") and isinstance(data["images"], list): for url in data["images"]: images.append(MetaImageResult( url=url, data=None, prompt=prompt, model="meta-wrapper" )) if not images: raise Exception("Meta Wrapper returned no images") return images async def _init_session(self) -> None: """Initialize session - get access token from meta.ai page""" print("[Meta AI] Initializing session...") async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( META_AI_BASE, headers={ "Cookie": self.cookies, "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", "Accept-Language": "en-US,en;q=0.9" } ) html = response.text # Extract access token token_match = re.search(r'"accessToken":"([^"]+)"', html) if not token_match: token_match = re.search(r'accessToken["\']\\s*:\\s*["\']([^"\']+)["\']', html) # Extract LSD token lsd_match = ( re.search(r'"LSD",\[\],\{"token":"([^"]+)"', html) or re.search(r'"lsd":"([^"]+)"', html) or re.search(r'name="lsd" value="([^"]+)"', html) ) if lsd_match: self.session.lsd = lsd_match.group(1) # Extract DTSG token dtsg_match = ( re.search(r'"DTSGInitialData".*?"token":"([^"]+)"', html) or re.search(r'"token":"([^"]+)"', html) ) if dtsg_match: self.session.fb_dtsg = dtsg_match.group(1) if token_match: self.session.access_token = token_match.group(1) print("[Meta AI] Got access token") elif 'login_form' in html or 'login_page' in html: raise Exception("Meta AI: Cookies expired or invalid") else: print("[Meta AI] Warning: Failed to extract access token") async def _send_prompt(self, prompt: str, aspect_ratio: str = "portrait") -> Any: """Send prompt via GraphQL mutation""" external_conversation_id = str(uuid.uuid4()) timestamp = int(asyncio.get_event_loop().time() * 1000) random_part = int(str(uuid.uuid4().int)[:7]) offline_threading_id = str((timestamp << 22) | random_part) self.session.external_conversation_id = external_conversation_id orientation = ORIENTATION_MAP.get(aspect_ratio, "VERTICAL") variables = { "message": { "sensitive_string_value": prompt }, "externalConversationId": external_conversation_id, "offlineThreadingId": offline_threading_id, "suggestedPromptIndex": None, "flashVideoRecapInput": {"images": []}, "flashPreviewInput": None, "promptPrefix": None, "entrypoint": "ABRA__CHAT__TEXT", "icebreaker_type": "TEXT", "imagineClientOptions": {"orientation": orientation}, "__relay_internal__pv__AbraDebugDevOnlyrelayprovider": False, "__relay_internal__pv__WebPixelRatiorelayprovider": 1 } body = { "fb_api_caller_class": "RelayModern", "fb_api_req_friendly_name": "useAbraSendMessageMutation", "variables": json.dumps(variables), "server_timestamps": "true", "doc_id": "7783822248314888" } if self.session.lsd: body["lsd"] = self.session.lsd if self.session.fb_dtsg: body["fb_dtsg"] = self.session.fb_dtsg headers = { "Content-Type": "application/x-www-form-urlencoded", "Cookie": self.cookies, "Origin": META_AI_BASE, "Referer": f"{META_AI_BASE}/", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36" } if self.session.access_token: headers["Authorization"] = f"OAuth {self.session.access_token}" async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( GRAPHQL_ENDPOINT, headers=headers, data=body ) raw_text = response.text if response.status_code != 200: raise Exception(f"Meta AI Error: {response.status_code} - {raw_text[:500]}") # Parse streaming response last_valid_response = None for line in raw_text.split('\n'): if not line.strip(): continue try: parsed = json.loads(line) streaming_state = ( parsed.get("data", {}) .get("xfb_abra_send_message", {}) .get("bot_response_message", {}) .get("streaming_state") ) if streaming_state == "OVERALL_DONE": last_valid_response = parsed break # Check for imagine_card imagine_card = ( parsed.get("data", {}) .get("xfb_abra_send_message", {}) .get("bot_response_message", {}) .get("imagine_card") ) if imagine_card and imagine_card.get("session", {}).get("media_sets"): last_valid_response = parsed except json.JSONDecodeError: continue if not last_valid_response: if "login_form" in raw_text or "facebook.com/login" in raw_text: raise Exception("Meta AI: Session expired. Please refresh cookies.") raise Exception("Meta AI: No valid response found") return last_valid_response def _extract_images(self, response: Any, original_prompt: str) -> List[MetaImageResult]: """Extract image URLs from Meta AI response""" images: List[MetaImageResult] = [] message_data = ( response.get("data", {}) .get("xfb_abra_send_message", {}) .get("bot_response_message") ) if message_data: images.extend(self._extract_images_from_message(message_data, original_prompt)) # Recursive search fallback if not images and response.get("data"): print("[Meta AI] Structured extraction failed, doing recursive search...") found_urls = self._recursive_search_for_images(response["data"]) for url in found_urls: images.append(MetaImageResult( url=url, data=None, prompt=original_prompt, model="meta" )) return images def _extract_images_from_message( self, message_data: Dict, original_prompt: str ) -> List[MetaImageResult]: """Helper to extract images from a single message node""" images: List[MetaImageResult] = [] imagine_card = message_data.get("imagine_card") if imagine_card and imagine_card.get("session", {}).get("media_sets"): for media_set in imagine_card["session"]["media_sets"]: imagine_media = media_set.get("imagine_media", []) for media in imagine_media: url = media.get("uri") or media.get("image_uri") if url: images.append(MetaImageResult( url=url, data=None, prompt=original_prompt, model="meta" )) # Check attachments attachments = message_data.get("attachments", []) for attachment in attachments: media = attachment.get("media", {}) url = media.get("image_uri") or media.get("uri") if url: images.append(MetaImageResult( url=url, data=None, prompt=original_prompt, model="meta" )) return images def _recursive_search_for_images( self, obj: Any, found: Optional[set] = None ) -> List[str]: """Recursive search for image-like URLs""" if found is None: found = set() if not obj or not isinstance(obj, (dict, list)): return [] if isinstance(obj, dict): for key, val in obj.items(): if isinstance(val, str): if ('fbcdn.net' in val or 'meta.ai' in val) and \ any(ext in val for ext in ['.jpg', '.png', '.webp', 'image_uri=', '/imagine/']): found.add(val) elif isinstance(val, (dict, list)): self._recursive_search_for_images(val, found) elif isinstance(obj, list): for item in obj: self._recursive_search_for_images(item, found) return list(found) async def _poll_for_images( self, initial_response: Any, prompt: str ) -> List[MetaImageResult]: """Poll for image generation completion""" conversation_id = ( initial_response.get("data", {}) .get("node", {}) .get("external_conversation_id") ) if not conversation_id: return [] max_attempts = 30 poll_interval = 2 for attempt in range(max_attempts): print(f"[Meta AI] Polling attempt {attempt + 1}/{max_attempts}...") await asyncio.sleep(poll_interval) variables = {"external_conversation_id": conversation_id} body = { "fb_api_caller_class": "RelayModern", "fb_api_req_friendly_name": "KadabraPromptRootQuery", "variables": json.dumps(variables), "doc_id": "25290569913909283" } if self.session.lsd: body["lsd"] = self.session.lsd if self.session.fb_dtsg: body["fb_dtsg"] = self.session.fb_dtsg headers = { "Content-Type": "application/x-www-form-urlencoded", "Cookie": self.cookies, "Origin": META_AI_BASE } if self.session.access_token: headers["Authorization"] = f"OAuth {self.session.access_token}" try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( GRAPHQL_ENDPOINT, headers=headers, data=body ) data = response.json() images = self._extract_images(data, prompt) if images: print(f"[Meta AI] Got {len(images)} image(s) after polling!") return images status = data.get("data", {}).get("kadabra_prompt", {}).get("status") if status in ["FAILED", "ERROR"]: break except Exception as e: print(f"[Meta AI] Poll error: {e}") return [] async def download_as_base64(self, url: str) -> str: """Download image from URL and convert to base64""" import base64 async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( url, headers={ "Cookie": self.cookies, "Referer": META_AI_BASE } ) return base64.b64encode(response.content).decode('utf-8')