264 lines
9.4 KiB
Python
264 lines
9.4 KiB
Python
import requests
|
|
import json
|
|
import time
|
|
import base64
|
|
import os
|
|
import uuid
|
|
import logging
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s')
|
|
logger = logging.getLogger("whisk_client")
|
|
|
|
# Constants from reverse engineering
|
|
AUTH_ENDPOINT = "https://labs.google/fx/api/auth/session"
|
|
UPLOAD_ENDPOINT = "https://labs.google/fx/api/trpc/backbone.uploadImage"
|
|
|
|
# Endpoint 1: Text-to-Image
|
|
# (Captured in Step 405)
|
|
GENERATE_ENDPOINT = "https://aisandbox-pa.googleapis.com/v1/whisk:generateImage"
|
|
|
|
# Endpoint 2: Reference Image (Recipe)
|
|
# (Captured in Step 424)
|
|
RECIPE_ENDPOINT = "https://aisandbox-pa.googleapis.com/v1/whisk:runImageRecipe"
|
|
|
|
DEFAULT_HEADERS = {
|
|
"Origin": "https://labs.google",
|
|
"Content-Type": "application/json",
|
|
"Referer": "https://labs.google/fx/tools/image-fx",
|
|
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
|
}
|
|
|
|
class WhiskClientError(Exception):
|
|
pass
|
|
|
|
def parse_cookies(cookie_input):
|
|
if not cookie_input:
|
|
return {}
|
|
|
|
cookies = {}
|
|
cookie_input = cookie_input.strip()
|
|
|
|
if cookie_input.startswith('[') and cookie_input.endswith(']'):
|
|
try:
|
|
cookie_list = json.loads(cookie_input)
|
|
for c in cookie_list:
|
|
name = c.get('name')
|
|
value = c.get('value')
|
|
if name and value:
|
|
cookies[name] = value
|
|
return cookies
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
for item in cookie_input.split(';'):
|
|
if '=' in item:
|
|
name, value = item.split('=', 1)
|
|
cookies[name.strip()] = value.strip()
|
|
return cookies
|
|
|
|
def get_session_token(cookies):
|
|
logger.info("Fetching session token from labs.google...")
|
|
try:
|
|
response = requests.get(
|
|
AUTH_ENDPOINT,
|
|
headers={**DEFAULT_HEADERS},
|
|
cookies=cookies,
|
|
timeout=30
|
|
)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if not data.get('access_token'):
|
|
raise WhiskClientError("Session response missing access_token")
|
|
|
|
return data['access_token']
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch session token: {e}")
|
|
raise WhiskClientError(f"Authentication failed: {str(e)}")
|
|
|
|
def upload_reference_image(image_path, cookies):
|
|
if not image_path or not os.path.exists(image_path):
|
|
return None
|
|
|
|
logger.info(f"Uploading reference image: {image_path}")
|
|
|
|
try:
|
|
with open(image_path, "rb") as img_file:
|
|
import mimetypes
|
|
mime_type, _ = mimetypes.guess_type(image_path)
|
|
if not mime_type: mime_type = "image/png"
|
|
|
|
b64_data = base64.b64encode(img_file.read()).decode('utf-8')
|
|
data_uri = f"data:{mime_type};base64,{b64_data}"
|
|
|
|
payload = {
|
|
"json": {
|
|
"clientContext": {
|
|
"workflowId": str(uuid.uuid4()),
|
|
"sessionId": str(int(time.time() * 1000))
|
|
},
|
|
"uploadMediaInput": {
|
|
"mediaCategory": "MEDIA_CATEGORY_SUBJECT",
|
|
"rawBytes": data_uri,
|
|
"caption": ""
|
|
}
|
|
}
|
|
}
|
|
|
|
response = requests.post(
|
|
UPLOAD_ENDPOINT,
|
|
headers=DEFAULT_HEADERS,
|
|
cookies=cookies,
|
|
json=payload,
|
|
timeout=60
|
|
)
|
|
|
|
if not response.ok:
|
|
raise WhiskClientError(f"Image upload failed: {response.text}")
|
|
|
|
data = response.json()
|
|
try:
|
|
media_id = data['result']['data']['json']['result']['uploadMediaGenerationId']
|
|
except (KeyError, TypeError):
|
|
raise WhiskClientError("Failed to retrieve uploadMediaGenerationId")
|
|
|
|
logger.info(f"Image uploaded successfully. ID: {media_id}")
|
|
return media_id
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error uploading image: {e}")
|
|
raise e
|
|
|
|
def generate_image_whisk(prompt, cookie_str, **kwargs):
|
|
cookies = parse_cookies(cookie_str)
|
|
if not cookies:
|
|
raise WhiskClientError("No valid cookies found")
|
|
|
|
access_token = get_session_token(cookies)
|
|
|
|
def load_template_favorites():
|
|
path = get_config_path('gallery_favorites.json') # Typo in original? No, it uses gallery_favorites for both? Let's check logic.
|
|
# Ah, original code had a separate logic or file?
|
|
# Let me check the original code via view first to be safe.
|
|
# It seems there was no load_template_favorites definition in the previous view.
|
|
# I will replace the top of the file definitions if they exist there.
|
|
pass
|
|
|
|
# Retrying with correct strategy: replace the specific lines if I can find them.
|
|
# I'll view the file top to locate these helpers first.
|
|
|
|
aspect_ratio_map = {
|
|
"1:1": "IMAGE_ASPECT_RATIO_SQUARE",
|
|
"9:16": "IMAGE_ASPECT_RATIO_PORTRAIT",
|
|
"16:9": "IMAGE_ASPECT_RATIO_LANDSCAPE",
|
|
"4:3": "IMAGE_ASPECT_RATIO_LANDSCAPE_FOUR_THREE",
|
|
"3:4": "IMAGE_ASPECT_RATIO_PORTRAIT",
|
|
"Auto": "IMAGE_ASPECT_RATIO_SQUARE"
|
|
}
|
|
aspect_ratio_key = kwargs.get('aspect_ratio', 'Auto')
|
|
aspect_ratio_enum = aspect_ratio_map.get(aspect_ratio_key, "IMAGE_ASPECT_RATIO_SQUARE")
|
|
|
|
seed = kwargs.get('seed', int(time.time()))
|
|
headers = {
|
|
**DEFAULT_HEADERS,
|
|
"Authorization": f"Bearer {access_token}"
|
|
}
|
|
|
|
# BRANCH: Use Recipe Endpoint if Reference Image exists
|
|
if media_generation_id:
|
|
target_endpoint = RECIPE_ENDPOINT
|
|
payload = {
|
|
"clientContext": {
|
|
"workflowId": str(uuid.uuid4()),
|
|
"tool": "BACKBONE",
|
|
"sessionId": str(int(time.time() * 1000))
|
|
},
|
|
"seed": seed,
|
|
"imageModelSettings": {
|
|
"imageModel": "GEM_PIX",
|
|
"aspectRatio": aspect_ratio_enum
|
|
},
|
|
"userInstruction": prompt,
|
|
"recipeMediaInputs": [{
|
|
"mediaInput": {
|
|
"mediaCategory": "MEDIA_CATEGORY_SUBJECT",
|
|
"mediaGenerationId": media_generation_id
|
|
}
|
|
}]
|
|
}
|
|
else:
|
|
# BRANCH: Use Generate Endpoint for Text-to-Image
|
|
# NOTE: Payload for generateImage is inferred to be userInput based.
|
|
# If this fails, we might need further inspection, but Recipe flow is the priority.
|
|
target_endpoint = GENERATE_ENDPOINT
|
|
payload = {
|
|
"userInput": {
|
|
"candidatesCount": 2,
|
|
"prompts": [prompt],
|
|
"seed": seed
|
|
},
|
|
"clientContext": {
|
|
"workflowId": str(uuid.uuid4()),
|
|
"tool": "IMAGE_FX", # Usually ImageFX for T2I
|
|
"sessionId": str(int(time.time() * 1000))
|
|
},
|
|
"modelInput": {
|
|
"modelNameType": "IMAGEN_3_5", # Usually Imagen 3 for ImageFX
|
|
"aspectRatio": aspect_ratio_enum
|
|
}
|
|
}
|
|
|
|
logger.info(f"Generating image. Endpoint: {target_endpoint}, Prompt: {prompt}")
|
|
|
|
try:
|
|
response = requests.post(
|
|
target_endpoint,
|
|
headers=headers,
|
|
json=payload,
|
|
timeout=120
|
|
)
|
|
|
|
if not response.ok:
|
|
error_text = response.text
|
|
try:
|
|
err_json = response.json()
|
|
details = err_json.get('error', {}).get('details', [])
|
|
if any(d.get('reason') in ['PUBLIC_ERROR_UNSAFE_GENERATION', 'PUBLIC_ERROR_SEXUAL'] for d in details):
|
|
raise WhiskClientError("⚠️ Google Safety Filter Triggered. Prompt bị từ chối do nội dung không an toàn.")
|
|
except (json.JSONDecodeError, WhiskClientError) as e:
|
|
if isinstance(e, WhiskClientError): raise e
|
|
|
|
# Additional T2I Fallback: If generateImage fails 400, try Recipe with empty media?
|
|
# Not implementing strictly to avoid loops, but helpful mental note.
|
|
raise WhiskClientError(f"Generation failed ({response.status_code}): {error_text}")
|
|
|
|
# Parse Response
|
|
json_resp = response.json()
|
|
|
|
images = []
|
|
if 'imagePanels' in json_resp:
|
|
for panel in json_resp['imagePanels']:
|
|
for img in panel.get('generatedImages', []):
|
|
if 'encodedImage' in img:
|
|
images.append(img['encodedImage'])
|
|
|
|
if not images:
|
|
import json
|
|
logger.error(f"WHISK DEBUG - Full Response: {json.dumps(json_resp)}")
|
|
debug_info = json.dumps(json_resp)
|
|
# check for common non-standard errors
|
|
if 'error' in json_resp:
|
|
err_msg = json_resp['error']
|
|
raise WhiskClientError(f"Whisk Error: {err_msg} | Valid Cookies? Check logs.")
|
|
|
|
# Return the full structure in the error so user can see it in UI
|
|
raise WhiskClientError(f"Whisk API returned NO IMAGES. Google says: {debug_info}")
|
|
|
|
return base64.b64decode(images[0])
|
|
|
|
except requests.exceptions.Timeout:
|
|
raise WhiskClientError("Timout connecting to Google Whisk.")
|
|
except Exception as e:
|
|
logger.error(f"Whisk Generation Error: {e}")
|
|
raise WhiskClientError(str(e))
|