"""Client helpers for Google Labs "Whisk" image generation.

The module authenticates with browser cookies against labs.google, optionally
uploads a reference image, and then calls one of two aisandbox endpoints to
generate an image. Endpoints and payload shapes come from reverse engineering
(see the constants below), not from a documented public API.
"""

import base64
import json
import logging
import mimetypes
import os
import time
import uuid

import requests

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger("whisk_client")

# Constants from reverse engineering
AUTH_ENDPOINT = "https://labs.google/fx/api/auth/session"
UPLOAD_ENDPOINT = "https://labs.google/fx/api/trpc/backbone.uploadImage"

# Endpoint 1: Text-to-Image
# (Captured in Step 405)
GENERATE_ENDPOINT = "https://aisandbox-pa.googleapis.com/v1/whisk:generateImage"

# Endpoint 2: Reference Image (Recipe)
# (Captured in Step 424)
RECIPE_ENDPOINT = "https://aisandbox-pa.googleapis.com/v1/whisk:runImageRecipe"

DEFAULT_HEADERS = {
    "Origin": "https://labs.google",
    "Content-Type": "application/json",
    "Referer": "https://labs.google/fx/tools/image-fx",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}

# User-facing aspect ratio -> API enum. "3:4" shares the portrait enum and
# "Auto" (or any unknown key) falls back to square.
_DEFAULT_ASPECT_RATIO = "IMAGE_ASPECT_RATIO_SQUARE"
_ASPECT_RATIO_MAP = {
    "1:1": "IMAGE_ASPECT_RATIO_SQUARE",
    "9:16": "IMAGE_ASPECT_RATIO_PORTRAIT",
    "16:9": "IMAGE_ASPECT_RATIO_LANDSCAPE",
    "4:3": "IMAGE_ASPECT_RATIO_LANDSCAPE_FOUR_THREE",
    "3:4": "IMAGE_ASPECT_RATIO_PORTRAIT",
    "Auto": _DEFAULT_ASPECT_RATIO,
}

# Error-detail reasons that are reported to the caller as a safety rejection.
# A tuple (not a set) so an unhashable "reason" value cannot raise on lookup.
_SAFETY_REASONS = ('PUBLIC_ERROR_UNSAFE_GENERATION', 'PUBLIC_ERROR_SEXUAL')


class WhiskClientError(Exception):
    """Raised for every failure this client reports to its callers."""


def parse_cookies(cookie_input):
    """Parse a cookie string into a ``{name: value}`` dict.

    Two input formats are accepted:

    * a JSON array of objects carrying ``name`` and ``value`` keys; entries
      that are not objects, or whose name or value is empty, are skipped;
    * a ``Cookie``-header style string, ``"a=1; b=2"``. Only the first ``=``
      in each pair separates name from value.

    A string that looks like a JSON array but fails to decode is parsed as a
    header string instead.

    Args:
        cookie_input: Cookie text in either format; may be empty or ``None``.

    Returns:
        Dict of cookie names to values; empty when nothing could be parsed.
    """
    if not cookie_input:
        return {}

    cookie_input = cookie_input.strip()

    if cookie_input.startswith('[') and cookie_input.endswith(']'):
        try:
            cookie_list = json.loads(cookie_input)
        except json.JSONDecodeError:
            cookie_list = None  # Not JSON after all; try the header format.
        if cookie_list is not None:
            return {
                entry['name']: entry['value']
                for entry in cookie_list
                # Non-object entries used to crash on ``.get``; skip them.
                if isinstance(entry, dict) and entry.get('name') and entry.get('value')
            }

    cookies = {}
    for item in cookie_input.split(';'):
        name, sep, value = item.partition('=')
        name = name.strip()
        # Require a separator and a non-empty name ("=x" is not a cookie).
        if sep and name:
            cookies[name] = value.strip()
    return cookies


def get_session_token(cookies):
    """Fetch the bearer token for the session identified by ``cookies``.

    Args:
        cookies: Cookie dict sent with the request to ``AUTH_ENDPOINT``.

    Returns:
        The ``access_token`` value from the session response.

    Raises:
        WhiskClientError: On any transport error, HTTP error status,
            undecodable body, or a response without ``access_token``.
    """
    logger.info("Fetching session token from labs.google...")
    try:
        response = requests.get(
            AUTH_ENDPOINT,
            headers={**DEFAULT_HEADERS},
            cookies=cookies,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        # Guard against a non-object body so the error names the real problem.
        token = data.get('access_token') if isinstance(data, dict) else None
        if not token:
            raise WhiskClientError("Session response missing access_token")
        return token
    except Exception as e:
        # Boundary handler: callers only ever see WhiskClientError.
        logger.error("Failed to fetch session token: %s", e)
        raise WhiskClientError(f"Authentication failed: {e}") from e


def upload_reference_image(image_path, cookies):
    """Upload a local image as a subject reference and return its media id.

    The file is sent as a base64 ``data:`` URI. The MIME type is guessed from
    the file name and defaults to ``image/png`` when it cannot be guessed.

    Args:
        image_path: Path of the image to upload.
        cookies: Cookie dict sent with the upload request.

    Returns:
        The ``uploadMediaGenerationId`` from the response, or ``None`` when
        ``image_path`` is empty or does not exist.

    Raises:
        WhiskClientError: If the upload is rejected or the response does not
            contain the media id.
        Exception: Any other error (file or transport) is logged and
            re-raised unchanged.
    """
    if not image_path or not os.path.exists(image_path):
        return None

    logger.info("Uploading reference image: %s", image_path)
    try:
        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type:
            mime_type = "image/png"

        with open(image_path, "rb") as img_file:
            b64_data = base64.b64encode(img_file.read()).decode('utf-8')
        data_uri = f"data:{mime_type};base64,{b64_data}"

        payload = {
            "json": {
                "clientContext": {
                    "workflowId": str(uuid.uuid4()),
                    "sessionId": str(int(time.time() * 1000)),
                },
                "uploadMediaInput": {
                    "mediaCategory": "MEDIA_CATEGORY_SUBJECT",
                    "rawBytes": data_uri,
                    "caption": "",
                },
            }
        }

        response = requests.post(
            UPLOAD_ENDPOINT,
            headers=DEFAULT_HEADERS,
            cookies=cookies,
            json=payload,
            timeout=60,
        )
        if not response.ok:
            raise WhiskClientError(f"Image upload failed: {response.text}")

        try:
            data = response.json()
            media_id = data['result']['data']['json']['result']['uploadMediaGenerationId']
        except (KeyError, TypeError, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            raise WhiskClientError("Failed to retrieve uploadMediaGenerationId") from e

        logger.info("Image uploaded successfully. ID: %s", media_id)
        return media_id
    except Exception as e:
        logger.error("Error uploading image: %s", e)
        raise


def _build_generation_request(prompt, seed, aspect_ratio_enum, media_generation_id):
    """Return ``(endpoint, payload)`` for the recipe or text-to-image call."""
    session_id = str(int(time.time() * 1000))

    # BRANCH: Use Recipe Endpoint if Reference Image exists
    if media_generation_id:
        return RECIPE_ENDPOINT, {
            "clientContext": {
                "workflowId": str(uuid.uuid4()),
                "tool": "BACKBONE",
                "sessionId": session_id,
            },
            "seed": seed,
            "imageModelSettings": {
                "imageModel": "GEM_PIX",
                "aspectRatio": aspect_ratio_enum,
            },
            "userInstruction": prompt,
            "recipeMediaInputs": [{
                "mediaInput": {
                    "mediaCategory": "MEDIA_CATEGORY_SUBJECT",
                    "mediaGenerationId": media_generation_id,
                }
            }],
        }

    # BRANCH: Use Generate Endpoint for Text-to-Image
    # NOTE: Payload for generateImage is inferred to be userInput based.
    # If this fails, we might need further inspection, but Recipe flow is
    # the priority.
    return GENERATE_ENDPOINT, {
        "userInput": {
            "candidatesCount": 2,
            "prompts": [prompt],
            "seed": seed,
        },
        "clientContext": {
            "workflowId": str(uuid.uuid4()),
            "tool": "IMAGE_FX",  # Usually ImageFX for T2I
            "sessionId": session_id,
        },
        "modelInput": {
            "modelNameType": "IMAGEN_3_5",  # Usually Imagen 3 for ImageFX
            "aspectRatio": aspect_ratio_enum,
        },
    }


def _is_safety_rejection(response):
    """Return True when an error response carries a known safety reason."""
    try:
        err_json = response.json()
    except ValueError:
        return False  # Body is not JSON; treat as a generic failure.

    error = err_json.get('error') if isinstance(err_json, dict) else None
    details = error.get('details') if isinstance(error, dict) else None
    if not isinstance(details, list):
        return False
    return any(
        isinstance(detail, dict) and detail.get('reason') in _SAFETY_REASONS
        for detail in details
    )


def _extract_encoded_images(json_resp):
    """Collect every ``encodedImage`` under ``imagePanels[].generatedImages[]``."""
    if not isinstance(json_resp, dict):
        return []

    images = []
    for panel in json_resp.get('imagePanels') or []:
        if not isinstance(panel, dict):
            continue
        for img in panel.get('generatedImages') or []:
            if isinstance(img, dict) and 'encodedImage' in img:
                images.append(img['encodedImage'])
    return images


def generate_image_whisk(prompt, cookie_str, **kwargs):
    """Generate an image for ``prompt`` and return its decoded bytes.

    When a reference image is supplied and uploads successfully, the recipe
    endpoint is used; otherwise the text-to-image endpoint is used. A failed
    reference upload is logged and generation continues without it.

    Args:
        prompt: Text prompt / instruction for the image.
        cookie_str: Cookie text in any format ``parse_cookies`` accepts.
        **kwargs: Optional settings:
            reference_image_path: Local image to use as subject reference.
            aspect_ratio: One of "1:1", "9:16", "16:9", "4:3", "3:4",
                "Auto"; unknown values fall back to square.
            seed: Generation seed; defaults to the current Unix time when
                omitted or ``None``.

    Returns:
        Bytes of the first image in the response (base64-decoded).

    Raises:
        WhiskClientError: On missing cookies, authentication failure, a
            safety-filter rejection, any HTTP or transport failure, or a
            response that contains no image.
    """
    cookies = parse_cookies(cookie_str)
    if not cookies:
        raise WhiskClientError("No valid cookies found")

    access_token = get_session_token(cookies)

    media_generation_id = None
    ref_image_path = kwargs.get('reference_image_path')
    if ref_image_path:
        try:
            media_generation_id = upload_reference_image(ref_image_path, cookies)
        except Exception as e:
            # Deliberate best effort: fall back to text-to-image.
            logger.warning("Skipping reference image due to upload error: %s", e)

    aspect_ratio_enum = _ASPECT_RATIO_MAP.get(
        kwargs.get('aspect_ratio', 'Auto'), _DEFAULT_ASPECT_RATIO
    )

    # An explicit seed=None must not be sent to the API as null.
    seed = kwargs.get('seed')
    if seed is None:
        seed = int(time.time())

    headers = {
        **DEFAULT_HEADERS,
        "Authorization": f"Bearer {access_token}",
    }
    target_endpoint, payload = _build_generation_request(
        prompt, seed, aspect_ratio_enum, media_generation_id
    )

    logger.info("Generating image. Endpoint: %s, Prompt: %s", target_endpoint, prompt)

    try:
        response = requests.post(
            target_endpoint,
            headers=headers,
            json=payload,
            timeout=120,
        )

        if not response.ok:
            if _is_safety_rejection(response):
                raise WhiskClientError("⚠️ Google Safety Filter Triggered. Prompt bị từ chối do nội dung không an toàn.")
            # NOTE: A possible T2I fallback (retry via the recipe endpoint
            # with empty media on a 400) is intentionally not implemented,
            # to avoid retry loops.
            raise WhiskClientError(f"Generation failed ({response.status_code}): {response.text}")

        # Parse Response
        json_resp = response.json()
        images = _extract_encoded_images(json_resp)
        if not images:
            shape = list(json_resp) if isinstance(json_resp, dict) else type(json_resp).__name__
            logger.error("Unexpected response structure: %s", shape)
            raise WhiskClientError("No images found in response")

        return base64.b64decode(images[0])

    except WhiskClientError as e:
        # Already the public error type: log and propagate without re-wrapping.
        logger.error("Whisk Generation Error: %s", e)
        raise
    except requests.exceptions.Timeout as e:
        raise WhiskClientError("Timeout connecting to Google Whisk.") from e
    except Exception as e:
        logger.error("Whisk Generation Error: %s", e)
        raise WhiskClientError(str(e)) from e