""" Meta AI Wrapper - Lightweight wrapper around metaai-api library Uses the mir-ashiq/metaai-api library for actual Meta AI interaction. This wrapper adds rate limiting and adapts the response format for our API. To update the library: pip install -U git+https://github.com/mir-ashiq/metaai-api.git """ import asyncio import json import time import random from concurrent.futures import ThreadPoolExecutor from typing import Optional from metaai_api import MetaAI from .config import settings from .models import ImageResult class RateLimiter: """Simple rate limiter to prevent shadowban""" def __init__(self): self.last_request_time: float = 0 self.request_count_hour: int = 0 self.hour_start: float = time.time() async def wait_if_needed(self): """Wait if rate limit would be exceeded""" current_time = time.time() # Reset hourly counter if current_time - self.hour_start > 3600: self.request_count_hour = 0 self.hour_start = current_time # Check hourly limit if self.request_count_hour >= settings.max_requests_per_hour: wait_time = 3600 - (current_time - self.hour_start) if wait_time > 0: raise Exception(f"Hourly rate limit reached. Try again in {int(wait_time)} seconds.") # Enforce minimum delay between requests (with jitter) elapsed = current_time - self.last_request_time min_delay = settings.rate_limit_delay + random.uniform(0, 5) if elapsed < min_delay: await asyncio.sleep(min_delay - elapsed) self.last_request_time = time.time() self.request_count_hour += 1 def get_status(self) -> dict: """Get current rate limit status""" current_time = time.time() time_since_last = current_time - self.last_request_time if self.last_request_time else 0 time_until_reset = max(0, 3600 - (current_time - self.hour_start)) return { "requests_this_hour": self.request_count_hour, "max_requests_per_hour": settings.max_requests_per_hour, "seconds_since_last_request": int(time_since_last), "seconds_until_hour_reset": int(time_until_reset), "can_request_now": self.request_count_hour < settings.max_requests_per_hour } class MetaAICrawler: """ Thin wrapper around metaai-api library. Handles: - Cookie format conversion (JSON array to dict) - Rate limiting - Response format adaptation The actual Meta AI interaction is delegated to metaai-api. """ def __init__(self): self.rate_limiter = RateLimiter() self._executor = ThreadPoolExecutor(max_workers=2) def _parse_cookies(self, cookies: str) -> dict: """Convert cookies from various formats to dict""" if not cookies: return {} # Try JSON array format first try: cookies_str = cookies.strip() if cookies_str.startswith('['): parsed = json.loads(cookies_str) if isinstance(parsed, list): return {c['name']: c['value'] for c in parsed if 'name' in c and 'value' in c} except json.JSONDecodeError: pass # Try cookie string format: "name1=value1; name2=value2" result = {} for pair in cookies.split(';'): pair = pair.strip() if '=' in pair: name, value = pair.split('=', 1) result[name.strip()] = value.strip() return result def _generate_sync(self, prompt: str, cookies_dict: dict) -> dict: """Synchronous generation using metaai-api""" ai = MetaAI(cookies=cookies_dict) return ai.prompt(prompt) async def generate_images(self, prompt: str, cookies: str, num_images: int = 4) -> list[ImageResult]: """ Generate images using Meta AI's Imagine model. Args: prompt: The image generation prompt cookies: Meta AI/Facebook cookies (JSON array or string format) num_images: Number of images (metaai-api returns 4 by default) Returns: List of ImageResult objects with generated image URLs """ # Rate limiting await self.rate_limiter.wait_if_needed() print(f"[MetaCrawler] Generating images for: '{prompt[:50]}...'") # Parse cookies cookies_dict = self._parse_cookies(cookies) if not cookies_dict: raise Exception("No valid cookies provided") # Check for essential cookies if 'c_user' not in cookies_dict and 'xs' not in cookies_dict: print("[MetaCrawler] Warning: Missing Facebook auth cookies (c_user, xs)") # Prepare prompt (add "Imagine" prefix if not present) image_prompt = prompt if prompt.lower().startswith('imagine') else f"Imagine {prompt}" # Run in thread pool since metaai_api is synchronous loop = asyncio.get_event_loop() try: result = await loop.run_in_executor( self._executor, self._generate_sync, image_prompt, cookies_dict ) except Exception as e: print(f"[MetaCrawler] Error: {str(e)}") raise # Extract media from response media = result.get('media', []) if not media: message = result.get('message', '') if message: raise Exception(f"Meta AI response: {message[:200]}") raise Exception("No images generated") print(f"[MetaCrawler] Got {len(media)} images!") # Convert to ImageResult format images = [] for item in media: if item.get('type') == 'IMAGE' and item.get('url'): images.append(ImageResult( url=item['url'], prompt=item.get('prompt', prompt), model="imagine" )) return images[:num_images] # Limit to requested count def get_rate_limit_status(self) -> dict: """Get current rate limiting status""" return self.rate_limiter.get_status() # Singleton instance meta_crawler = MetaAICrawler()