""" Meta AI FastAPI Service (v2.0) Uses metaai-api library for Meta AI image generation. See: https://github.com/mir-ashiq/metaai-api """ from contextlib import asynccontextmanager from fastapi import FastAPI, BackgroundTasks, HTTPException from fastapi.middleware.cors import CORSMiddleware import asyncio import uuid from .models import ( GenerateRequest, GenerateResponse, ImageResult, TaskStatusResponse, HealthResponse, GrokChatRequest, GrokChatResponse, VideoGenerateRequest, VideoGenerateResponse, VideoResult ) from .grok_client import GrokChatClient from .meta_crawler import meta_crawler # Initialize Grok client grok_client = GrokChatClient() # Task storage (in-memory for simplicity) tasks: dict = {} @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown events""" print("[MetaAI] Starting Meta AI service...") yield print("[MetaAI] Shutting down...") app = FastAPI( title="Meta AI Image Generation Service", description="FastAPI wrapper for Meta AI image generation using metaai-api", version="2.0.0", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/health", response_model=HealthResponse) async def health_check(): """Health check endpoint""" return HealthResponse( status="healthy", version="2.0.0", browser_ready=True # metaai-api handles this internally ) @app.get("/rate-limit") async def get_rate_limit(): """Get current rate limiting status""" return meta_crawler.get_rate_limit_status() @app.post("/generate/sync", response_model=GenerateResponse) async def generate_sync(request: GenerateRequest): """ Synchronous image generation - returns when complete. Requires: - prompt: The image generation prompt - cookies: Facebook/Meta cookies (JSON array or string format) """ try: images = await meta_crawler.generate_images( prompt=request.prompt, cookies=request.cookies, num_images=request.num_images ) return GenerateResponse( success=True, images=images, error=None ) except Exception as e: return GenerateResponse( success=False, images=[], error=str(e) ) @app.post("/generate", response_model=GenerateResponse) async def generate_async(request: GenerateRequest, background_tasks: BackgroundTasks): """ Async image generation - returns immediately with task_id. Poll /status/{task_id} for results. """ task_id = str(uuid.uuid4()) tasks[task_id] = { "status": "pending", "images": [], "error": None } async def run_generation(): try: images = await meta_crawler.generate_images( prompt=request.prompt, cookies=request.cookies, num_images=request.num_images ) tasks[task_id] = { "status": "completed", "images": images, "error": None } except Exception as e: tasks[task_id] = { "status": "failed", "images": [], "error": str(e) } # Run in background background_tasks.add_task(asyncio.create_task, run_generation()) return GenerateResponse( success=True, images=[], error=None, task_id=task_id ) @app.get("/status/{task_id}", response_model=TaskStatusResponse) async def get_task_status(task_id: str): """Get status of async generation task""" if task_id not in tasks: raise HTTPException(status_code=404, detail="Task not found") task = tasks[task_id] return TaskStatusResponse( task_id=task_id, status=task["status"], images=task["images"], error=task["error"] ) @app.delete("/status/{task_id}") async def delete_task(task_id: str): """Clean up completed task""" if task_id in tasks: del tasks[task_id] return {"deleted": True} raise HTTPException(status_code=404, detail="Task not found") @app.post("/grok/chat", response_model=GrokChatResponse) async def grok_chat(request: GrokChatRequest): """ Chat with Grok AI """ try: response = await grok_client.chat(request.message, request.history, request.cookies, request.user_agent) return GrokChatResponse(response=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/video/generate", response_model=VideoGenerateResponse) async def generate_video(request: VideoGenerateRequest): """ Generate a video from a text prompt using Meta AI. This uses the metaai_api library's video generation feature. Video generation takes longer than image generation (30-60+ seconds). Requires: - prompt: The video generation prompt - cookies: Facebook/Meta cookies (JSON array or string format) """ import json import asyncio from concurrent.futures import ThreadPoolExecutor try: # Parse cookies to dict format for MetaAI cookies_str = request.cookies.strip() cookies_dict = {} if cookies_str.startswith('['): # JSON array format from Cookie Editor parsed = json.loads(cookies_str) if isinstance(parsed, list): cookies_dict = {c['name']: c['value'] for c in parsed if 'name' in c and 'value' in c} else: # String format: "name1=value1; name2=value2" for pair in cookies_str.split(';'): pair = pair.strip() if '=' in pair: name, value = pair.split('=', 1) cookies_dict[name.strip()] = value.strip() if not cookies_dict: return VideoGenerateResponse( success=False, videos=[], error="No valid cookies provided" ) print(f"[VideoGen] Starting video generation for: '{request.prompt[:50]}...'") # Import MetaAI and run video generation in thread pool (it's synchronous) from metaai_api import MetaAI def run_video_gen(): ai = MetaAI(cookies=cookies_dict) return ai.generate_video( prompt=request.prompt, wait_before_poll=10, max_attempts=60, # Up to 5 minutes of polling wait_seconds=5, verbose=True ) # Run in thread pool since metaai_api is synchronous loop = asyncio.get_event_loop() with ThreadPoolExecutor(max_workers=1) as executor: result = await loop.run_in_executor(executor, run_video_gen) if not result.get('success', False): error_msg = result.get('error', 'Video generation failed') print(f"[VideoGen] Failed: {error_msg}") return VideoGenerateResponse( success=False, videos=[], error=error_msg ) video_urls = result.get('video_urls', []) print(f"[VideoGen] Success! Got {len(video_urls)} video(s)") videos = [ VideoResult( url=url, prompt=request.prompt, model="meta_video" ) for url in video_urls ] return VideoGenerateResponse( success=True, videos=videos, conversation_id=result.get('conversation_id') ) except Exception as e: import traceback traceback.print_exc() print(f"[VideoGen] Error: {str(e)}") return VideoGenerateResponse( success=False, videos=[], error=str(e) )