apix/services/crawl4ai/app/main.py
Khoa.vo bae4c487da
Some checks are pending
CI / build (18.x) (push) Waiting to run
CI / build (20.x) (push) Waiting to run
feat: add image-to-video support for Meta AI
- Add Litterbox temporary image hosting for image URLs
- Update backend to accept image_base64 parameter
- Update TypeScript client and API route
- Subject button now enabled for Meta AI (for image-to-video)
- Button changes from 'Video' to 'Animate' when subject is set
- Pink/purple gradient for image-to-video, blue/cyan for text-to-video
2026-01-06 14:11:26 +07:00

355 lines
11 KiB
Python

"""
Meta AI FastAPI Service (v2.0)
Uses metaai-api library for Meta AI image generation.
See: https://github.com/mir-ashiq/metaai-api
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import uuid
from .models import (
GenerateRequest,
GenerateResponse,
ImageResult,
TaskStatusResponse,
HealthResponse,
GrokChatRequest,
GrokChatResponse,
VideoGenerateRequest,
VideoGenerateResponse,
VideoResult
)
from .grok_client import GrokChatClient
from .meta_crawler import meta_crawler
# Initialize Grok client
grok_client = GrokChatClient()
# Task storage (in-memory for simplicity)
tasks: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown events"""
print("[MetaAI] Starting Meta AI service...")
yield
print("[MetaAI] Shutting down...")
app = FastAPI(
title="Meta AI Image Generation Service",
description="FastAPI wrapper for Meta AI image generation using metaai-api",
version="2.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint"""
return HealthResponse(
status="healthy",
version="2.0.0",
browser_ready=True # metaai-api handles this internally
)
@app.get("/rate-limit")
async def get_rate_limit():
"""Get current rate limiting status"""
return meta_crawler.get_rate_limit_status()
@app.post("/generate/sync", response_model=GenerateResponse)
async def generate_sync(request: GenerateRequest):
"""
Synchronous image generation - returns when complete.
Requires:
- prompt: The image generation prompt
- cookies: Facebook/Meta cookies (JSON array or string format)
"""
try:
images = await meta_crawler.generate_images(
prompt=request.prompt,
cookies=request.cookies,
num_images=request.num_images
)
return GenerateResponse(
success=True,
images=images,
error=None
)
except Exception as e:
return GenerateResponse(
success=False,
images=[],
error=str(e)
)
@app.post("/generate", response_model=GenerateResponse)
async def generate_async(request: GenerateRequest, background_tasks: BackgroundTasks):
"""
Async image generation - returns immediately with task_id.
Poll /status/{task_id} for results.
"""
task_id = str(uuid.uuid4())
tasks[task_id] = {
"status": "pending",
"images": [],
"error": None
}
async def run_generation():
try:
images = await meta_crawler.generate_images(
prompt=request.prompt,
cookies=request.cookies,
num_images=request.num_images
)
tasks[task_id] = {
"status": "completed",
"images": images,
"error": None
}
except Exception as e:
tasks[task_id] = {
"status": "failed",
"images": [],
"error": str(e)
}
# Run in background
background_tasks.add_task(asyncio.create_task, run_generation())
return GenerateResponse(
success=True,
images=[],
error=None,
task_id=task_id
)
@app.get("/status/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
"""Get status of async generation task"""
if task_id not in tasks:
raise HTTPException(status_code=404, detail="Task not found")
task = tasks[task_id]
return TaskStatusResponse(
task_id=task_id,
status=task["status"],
images=task["images"],
error=task["error"]
)
@app.delete("/status/{task_id}")
async def delete_task(task_id: str):
"""Clean up completed task"""
if task_id in tasks:
del tasks[task_id]
return {"deleted": True}
raise HTTPException(status_code=404, detail="Task not found")
@app.post("/grok/chat", response_model=GrokChatResponse)
async def grok_chat(request: GrokChatRequest):
"""
Chat with Grok AI
"""
try:
response = await grok_client.chat(request.message, request.history, request.cookies, request.user_agent)
return GrokChatResponse(response=response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/video/generate", response_model=VideoGenerateResponse)
async def generate_video(request: VideoGenerateRequest):
"""
Generate a video from a text prompt (and optionally an image) using Meta AI.
This uses the metaai_api library's video generation feature.
- Text-to-Video: Just provide a prompt
- Image-to-Video: Provide a prompt + image_base64
Video generation takes longer than image generation (30-60+ seconds).
Requires:
- prompt: The video generation prompt
- cookies: Facebook/Meta cookies (JSON array or string format)
- image_base64: Optional base64 image data for image-to-video
"""
import json
import asyncio
import base64
import requests as sync_requests
from concurrent.futures import ThreadPoolExecutor
try:
# Parse cookies to dict format for MetaAI
cookies_str = request.cookies.strip()
cookies_dict = {}
if cookies_str.startswith('['):
# JSON array format from Cookie Editor
parsed = json.loads(cookies_str)
if isinstance(parsed, list):
cookies_dict = {c['name']: c['value'] for c in parsed if 'name' in c and 'value' in c}
else:
# String format: "name1=value1; name2=value2"
for pair in cookies_str.split(';'):
pair = pair.strip()
if '=' in pair:
name, value = pair.split('=', 1)
cookies_dict[name.strip()] = value.strip()
if not cookies_dict:
return VideoGenerateResponse(
success=False,
videos=[],
error="No valid cookies provided"
)
# Handle image upload to Litterbox if image_base64 is provided
image_url = None
if request.image_base64:
print(f"[VideoGen] Uploading image to Litterbox for image-to-video...")
try:
# Extract base64 data (remove data:image/...;base64, prefix if present)
image_data = request.image_base64
if ',' in image_data:
image_data = image_data.split(',')[1]
# Decode base64 to bytes
image_bytes = base64.b64decode(image_data)
# Upload to Litterbox (temporary hosting, 1 hour expiry)
litterbox_url = "https://litterbox.catbox.moe/resources/internals/api.php"
files = {
'fileToUpload': ('image.png', image_bytes, 'image/png')
}
data = {
'reqtype': 'fileupload',
'time': '1h' # 1 hour expiry
}
upload_response = sync_requests.post(litterbox_url, files=files, data=data)
if upload_response.status_code == 200 and upload_response.text.startswith('http'):
image_url = upload_response.text.strip()
print(f"[VideoGen] Image uploaded to: {image_url}")
else:
print(f"[VideoGen] Litterbox upload failed: {upload_response.text}")
return VideoGenerateResponse(
success=False,
videos=[],
error=f"Failed to upload image: {upload_response.text[:200]}"
)
except Exception as upload_error:
print(f"[VideoGen] Image upload error: {str(upload_error)}")
return VideoGenerateResponse(
success=False,
videos=[],
error=f"Failed to upload image: {str(upload_error)}"
)
mode = "image-to-video" if image_url else "text-to-video"
print(f"[VideoGen] Starting {mode} for: '{request.prompt[:50]}...'")
# Import MetaAI and run video generation in thread pool (it's synchronous)
from metaai_api import MetaAI
def run_video_gen():
ai = MetaAI(cookies=cookies_dict)
if image_url:
# Image-to-video: Use prompt() with images parameter
result = ai.prompt(
message=request.prompt,
images=[image_url]
)
# Extract video URLs from media
video_urls = []
for media in result.get('media', []):
if media.get('type') == 'VIDEO' and media.get('url'):
video_urls.append(media['url'])
return {
'success': len(video_urls) > 0,
'video_urls': video_urls,
'message': result.get('message', '')
}
else:
# Text-to-video: Use generate_video()
return ai.generate_video(
prompt=request.prompt,
wait_before_poll=10,
max_attempts=60, # Up to 5 minutes of polling
wait_seconds=5,
verbose=True
)
# Run in thread pool since metaai_api is synchronous
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
result = await loop.run_in_executor(executor, run_video_gen)
if not result.get('success', False):
error_msg = result.get('error') or result.get('message') or 'Video generation failed'
print(f"[VideoGen] Failed: {error_msg}")
return VideoGenerateResponse(
success=False,
videos=[],
error=error_msg
)
video_urls = result.get('video_urls', [])
print(f"[VideoGen] Success! Got {len(video_urls)} video(s)")
videos = [
VideoResult(
url=url,
prompt=request.prompt,
model="meta_video"
)
for url in video_urls
]
return VideoGenerateResponse(
success=True,
videos=videos,
conversation_id=result.get('conversation_id')
)
except Exception as e:
import traceback
traceback.print_exc()
print(f"[VideoGen] Error: {str(e)}")
return VideoGenerateResponse(
success=False,
videos=[],
error=str(e)
)