- Add Litterbox temporary image hosting for image URLs - Update backend to accept image_base64 parameter - Update TypeScript client and API route - Subject button now enabled for Meta AI (for image-to-video) - Button changes from 'Video' to 'Animate' when subject is set - Pink/purple gradient for image-to-video, blue/cyan for text-to-video
355 lines
11 KiB
Python
355 lines
11 KiB
Python
"""
|
|
Meta AI FastAPI Service (v2.0)

Uses the metaai-api library for Meta AI image and video generation.
Also exposes a Grok chat endpoint.
See: https://github.com/mir-ashiq/metaai-api
|
|
"""
|
|
from contextlib import asynccontextmanager
|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
import asyncio
|
|
import uuid
|
|
|
|
|
|
|
|
from .models import (
|
|
GenerateRequest,
|
|
GenerateResponse,
|
|
ImageResult,
|
|
TaskStatusResponse,
|
|
HealthResponse,
|
|
GrokChatRequest,
|
|
GrokChatResponse,
|
|
VideoGenerateRequest,
|
|
VideoGenerateResponse,
|
|
VideoResult
|
|
)
|
|
from .grok_client import GrokChatClient
|
|
from .meta_crawler import meta_crawler
|
|
|
|
# Single Grok chat client, created at import time and shared by the
# /grok/chat endpoint.
grok_client = GrokChatClient()

# Registry of async generation tasks, keyed by task id. It lives only in
# process memory, so entries do not survive a restart.
tasks: dict = dict()
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log a message when the service starts and another when it stops."""
    # Code before the yield runs on entry to the context (startup).
    print("[MetaAI] Starting Meta AI service...")

    yield

    # Code after the yield runs on exit from the context (shutdown).
    print("[MetaAI] Shutting down...")
|
|
|
|
|
|
# Application instance; `lifespan` provides the startup/shutdown logging.
app = FastAPI(
    lifespan=lifespan,
    version="2.0.0",
    title="Meta AI Image Generation Service",
    description="FastAPI wrapper for Meta AI image generation using metaai-api",
)

# Fully permissive CORS: every origin, method and header, with credentials.
# NOTE(review): wildcard origins together with allow_credentials=True is
# broader than most deployments need -- confirm this is intentional.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Report a static "healthy" status together with the service version."""
    payload = HealthResponse(
        browser_ready=True,  # always True: metaai-api handles this internally
        version="2.0.0",
        status="healthy",
    )
    return payload
|
|
|
|
|
|
@app.get("/rate-limit")
async def get_rate_limit():
    """Return whatever rate-limit status the crawler currently reports."""
    rate_limit_status = meta_crawler.get_rate_limit_status()
    return rate_limit_status
|
|
|
|
|
|
@app.post("/generate/sync", response_model=GenerateResponse)
async def generate_sync(request: GenerateRequest):
    """
    Generate images and respond only once generation has finished.

    Request fields used:
    - prompt: The image generation prompt
    - cookies: Facebook/Meta cookies (JSON array or string format)
    - num_images: Forwarded to the crawler unchanged

    Failures are not raised as HTTP errors: the response body carries
    success=False, an empty image list and the error text instead.
    """
    try:
        generated = await meta_crawler.generate_images(
            num_images=request.num_images,
            cookies=request.cookies,
            prompt=request.prompt,
        )
        outcome = GenerateResponse(success=True, images=generated, error=None)
    except Exception as exc:
        outcome = GenerateResponse(success=False, images=[], error=str(exc))

    return outcome
|
|
|
|
|
|
@app.post("/generate", response_model=GenerateResponse)
async def generate_async(request: GenerateRequest, background_tasks: BackgroundTasks):
    """
    Start image generation in the background and return a task_id right away.

    The task is registered in the in-memory ``tasks`` dict with status
    "pending" and is later overwritten with "completed" (plus the images)
    or "failed" (plus the error text). Poll /status/{task_id} for results.

    Args:
        request: Carries prompt, cookies and num_images for the crawler.
        background_tasks: FastAPI hook used to run the work after the
            response has been sent.

    Returns:
        GenerateResponse with success=True, no images yet, and the task_id.
    """
    task_id = str(uuid.uuid4())

    tasks[task_id] = {
        "status": "pending",
        "images": [],
        "error": None,
    }

    async def run_generation():
        """Run the generation and store its outcome under ``task_id``."""
        try:
            images = await meta_crawler.generate_images(
                prompt=request.prompt,
                cookies=request.cookies,
                num_images=request.num_images,
            )
            tasks[task_id] = {
                "status": "completed",
                "images": images,
                "error": None,
            }
        except Exception as e:
            tasks[task_id] = {
                "status": "failed",
                "images": [],
                "error": str(e),
            }

    # Register the coroutine *function* so the framework awaits it on the
    # event loop. A plain callable such as asyncio.create_task would be run
    # in a worker thread, where there is no running loop to schedule on,
    # leaving the task stuck in "pending".
    background_tasks.add_task(run_generation)

    return GenerateResponse(
        success=True,
        images=[],
        error=None,
        task_id=task_id,
    )
|
|
|
|
|
|
@app.get("/status/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
    """
    Report the stored state of an async generation task.

    Raises:
        HTTPException: 404 when no task is registered under ``task_id``.
    """
    try:
        record = tasks[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task not found") from None

    return TaskStatusResponse(
        task_id=task_id,
        status=record["status"],
        images=record["images"],
        error=record["error"],
    )
|
|
|
|
|
|
@app.delete("/status/{task_id}")
async def delete_task(task_id: str):
    """
    Remove a task from the in-memory registry.

    The task's status is not checked, so a still-pending task can be
    removed as well.

    Raises:
        HTTPException: 404 when no task is registered under ``task_id``.
    """
    # Guard clause first; the happy path follows unindented.
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    del tasks[task_id]
    return {"deleted": True}
|
|
|
|
|
|
@app.post("/grok/chat", response_model=GrokChatResponse)
async def grok_chat(request: GrokChatRequest):
    """
    Forward a chat message to the shared Grok client and return its reply.

    Raises:
        HTTPException: 500 carrying the error text if the call fails.
    """
    try:
        reply = await grok_client.chat(
            request.message,
            request.history,
            request.cookies,
            request.user_agent,
        )
        return GrokChatResponse(response=reply)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
|
|
# Upper bound (seconds) on the Litterbox upload so that a stalled host
# cannot keep a request hanging indefinitely.
_LITTERBOX_TIMEOUT_SECONDS = 60


def _parse_cookies(cookies_str: str) -> dict:
    """Convert a cookie payload into a ``{name: value}`` dict.

    Accepts either a JSON array of objects with "name"/"value" keys (the
    Cookie Editor export format) or a "name1=value1; name2=value2" string.
    Entries without both a name and a value are skipped.

    Raises:
        json.JSONDecodeError: if the input starts with '[' but is not valid JSON.
    """
    import json

    cookies_str = cookies_str.strip()

    if cookies_str.startswith('['):
        # JSON array format from Cookie Editor
        parsed = json.loads(cookies_str)
        if not isinstance(parsed, list):
            return {}
        return {
            c['name']: c['value']
            for c in parsed
            # Skip stray non-object entries instead of failing on them.
            if isinstance(c, dict) and 'name' in c and 'value' in c
        }

    # String format: "name1=value1; name2=value2"
    cookies = {}
    for pair in cookies_str.split(';'):
        name, sep, value = pair.strip().partition('=')
        if sep:
            cookies[name.strip()] = value.strip()
    return cookies


def _upload_to_litterbox(image_base64: str):
    """Decode a base64 image and upload it to Litterbox (1 hour expiry).

    This performs a blocking HTTP POST, so call it from a worker thread
    rather than directly on the event loop.

    Returns:
        The ``requests`` response. The caller treats a 200 status whose
        body starts with "http" as the hosted image URL.
    """
    import base64
    import requests as sync_requests

    # Extract base64 data (remove data:image/...;base64, prefix if present)
    image_data = image_base64
    if ',' in image_data:
        image_data = image_data.split(',')[1]

    image_bytes = base64.b64decode(image_data)

    litterbox_url = "https://litterbox.catbox.moe/resources/internals/api.php"
    files = {
        'fileToUpload': ('image.png', image_bytes, 'image/png'),
    }
    data = {
        'reqtype': 'fileupload',
        'time': '1h',  # 1 hour expiry
    }
    return sync_requests.post(
        litterbox_url,
        files=files,
        data=data,
        timeout=_LITTERBOX_TIMEOUT_SECONDS,
    )


@app.post("/video/generate", response_model=VideoGenerateResponse)
async def generate_video(request: VideoGenerateRequest):
    """
    Generate a video from a text prompt (and optionally an image) using Meta AI.

    This uses the metaai_api library's video generation feature.
    - Text-to-Video: Just provide a prompt
    - Image-to-Video: Provide a prompt + image_base64

    Video generation takes longer than image generation (30-60+ seconds).

    Requires:
    - prompt: The video generation prompt
    - cookies: Facebook/Meta cookies (JSON array or string format)
    - image_base64: Optional base64 image data for image-to-video

    Returns:
        VideoGenerateResponse. Failures are reported in the body
        (success=False plus an error message), not as HTTP errors.
    """
    import traceback

    try:
        # Parse cookies to dict format for MetaAI
        cookies_dict = _parse_cookies(request.cookies)

        if not cookies_dict:
            return VideoGenerateResponse(
                success=False,
                videos=[],
                error="No valid cookies provided",
            )

        loop = asyncio.get_running_loop()

        # Handle image upload to Litterbox if image_base64 is provided
        image_url = None
        if request.image_base64:
            print("[VideoGen] Uploading image to Litterbox for image-to-video...")
            try:
                # The upload is a blocking HTTP call; run it off the event
                # loop so other requests keep being served meanwhile.
                upload_response = await loop.run_in_executor(
                    None, _upload_to_litterbox, request.image_base64
                )

                # Strip before checking so leading whitespace or newlines
                # cannot make a valid URL look like a failure.
                uploaded_text = upload_response.text.strip()
                if upload_response.status_code == 200 and uploaded_text.startswith('http'):
                    image_url = uploaded_text
                    print(f"[VideoGen] Image uploaded to: {image_url}")
                else:
                    print(f"[VideoGen] Litterbox upload failed: {upload_response.text}")
                    return VideoGenerateResponse(
                        success=False,
                        videos=[],
                        error=f"Failed to upload image: {upload_response.text[:200]}",
                    )
            except Exception as upload_error:
                print(f"[VideoGen] Image upload error: {str(upload_error)}")
                return VideoGenerateResponse(
                    success=False,
                    videos=[],
                    error=f"Failed to upload image: {str(upload_error)}",
                )

        mode = "image-to-video" if image_url else "text-to-video"
        print(f"[VideoGen] Starting {mode} for: '{request.prompt[:50]}...'")

        # Imported lazily inside the try so a missing library is reported
        # as an error response rather than breaking module import.
        from metaai_api import MetaAI

        def run_video_gen():
            """Blocking MetaAI call; executed in a worker thread."""
            ai = MetaAI(cookies=cookies_dict)

            if image_url:
                # Image-to-video: Use prompt() with images parameter
                result = ai.prompt(
                    message=request.prompt,
                    images=[image_url],
                )
                # Extract video URLs from media
                video_urls = [
                    media['url']
                    for media in result.get('media', [])
                    if media.get('type') == 'VIDEO' and media.get('url')
                ]
                return {
                    'success': len(video_urls) > 0,
                    'video_urls': video_urls,
                    'message': result.get('message', ''),
                }

            # Text-to-video: Use generate_video()
            return ai.generate_video(
                prompt=request.prompt,
                wait_before_poll=10,
                max_attempts=60,  # Up to 5 minutes of polling
                wait_seconds=5,
                verbose=True,
            )

        # metaai_api is synchronous, so run it in the loop's default
        # executor. A per-request ThreadPoolExecutor used as a context
        # manager would block the event loop in shutdown(wait=True) if the
        # request were cancelled while generation is still running.
        result = await loop.run_in_executor(None, run_video_gen)

        if not result.get('success', False):
            error_msg = result.get('error') or result.get('message') or 'Video generation failed'
            print(f"[VideoGen] Failed: {error_msg}")
            return VideoGenerateResponse(
                success=False,
                videos=[],
                error=error_msg,
            )

        video_urls = result.get('video_urls', [])
        print(f"[VideoGen] Success! Got {len(video_urls)} video(s)")

        videos = [
            VideoResult(
                url=url,
                prompt=request.prompt,
                model="meta_video",
            )
            for url in video_urls
        ]

        return VideoGenerateResponse(
            success=True,
            videos=videos,
            conversation_id=result.get('conversation_id'),
        )

    except Exception as e:
        traceback.print_exc()
        print(f"[VideoGen] Error: {str(e)}")
        return VideoGenerateResponse(
            success=False,
            videos=[],
            error=str(e),
        )
|