""" PlaywrightManager - Core class for TikTok network interception. Uses Playwright to: 1. Parse cookies from JSON format 2. Handle browser-based SSL login 3. Intercept /item_list API responses (instead of scraping HTML) """ import os import json import asyncio import traceback from typing import List, Dict, Optional, Any from playwright.async_api import async_playwright, Response, Browser, BrowserContext try: from playwright_stealth import stealth_async except ImportError: try: from playwright_stealth import Stealth async def stealth_async(page): await Stealth().apply_stealth_async(page) except ImportError: print("WARNING: playwright_stealth not found, disabling stealth mode.") async def stealth_async(page): pass COOKIES_FILE = "cookies.json" USER_AGENT_FILE = "user_agent.json" class PlaywrightManager: """Manages Playwright browser for TikTok feed interception.""" # Anti-detection browser args BROWSER_ARGS = [ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-accelerated-2d-canvas", "--no-first-run", "--no-zygote", "--disable-gpu", "--kiosk", # Force full screen mode "--start-maximized" ] DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" # Use installed Chrome instead of Playwright's Chromium (avoids slow download) import platform import os # Check if running on macOS if platform.system() == "Darwin" and os.path.exists("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"): CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" else: # On Linux/Docker, use Playwright's bundled Chromium (None lets Playwright decide) CHROME_PATH = None # VNC login state (class-level to persist across requests) _vnc_playwright = None _vnc_browser = None _vnc_context = None _vnc_page = None _vnc_active = False @staticmethod def parse_json_credentials(json_creds: Any) -> tuple[List[dict], str]: """ Parse JSON credentials. Supports: 1. Array format: [{"name": "...", "value": "..."}, ...] 2. http object format: {"http": {"headers": {...}, "cookies": {...}}} Returns: (cookies_list, user_agent) """ cookies = [] user_agent = PlaywrightManager.DEFAULT_USER_AGENT # Handle array format (Cookie-Editor) if isinstance(json_creds, list): for c in json_creds: if isinstance(c, dict) and "name" in c and "value" in c: cookie = { "name": c["name"], "value": str(c["value"]), "domain": c.get("domain") or ".tiktok.com", "path": c.get("path") or "/", "secure": c.get("secure", True), "httpOnly": c.get("httpOnly", False) } if "sameSite" in c and c["sameSite"]: # Playwright expects "Strict", "Lax", or "None" ss = str(c["sameSite"]).capitalize() if ss in ["Strict", "Lax", "None"]: cookie["sameSite"] = ss cookies.append(cookie) return cookies, user_agent # Handle object format if isinstance(json_creds, dict): http_data = json_creds.get("http", {}) headers = http_data.get("headers", {}) cookies_dict = http_data.get("cookies", {}) # Get User-Agent from headers if "User-Agent" in headers: user_agent = headers["User-Agent"] # Parse cookies from the cookies dict (preferred) if cookies_dict: for name, value in cookies_dict.items(): cookies.append({ "name": name, "value": str(value), "domain": ".tiktok.com", "path": "/" }) # Fallback: parse from Cookie header string elif "Cookie" in headers: cookie_str = headers["Cookie"] for part in cookie_str.split(";"): part = part.strip() if "=" in part: name, value = part.split("=", 1) cookies.append({ "name": name.strip(), "value": value.strip(), "domain": ".tiktok.com", "path": "/" }) return cookies, user_agent @staticmethod def load_stored_credentials() -> tuple[List[dict], str]: """Load cookies and user agent from stored files.""" cookies = [] user_agent = PlaywrightManager.DEFAULT_USER_AGENT if os.path.exists(COOKIES_FILE): try: with open(COOKIES_FILE, "r") as f: data = json.load(f) if isinstance(data, list): # Sanitize each cookie for Playwright compatibility for c in data: if isinstance(c, dict) and "name" in c and "value" in c: cookie = { "name": c["name"], "value": str(c["value"]), "domain": c.get("domain") or ".tiktok.com", "path": c.get("path") or "/", } # Only add optional fields if they have valid values if c.get("secure") is not None: cookie["secure"] = bool(c["secure"]) if c.get("httpOnly") is not None: cookie["httpOnly"] = bool(c["httpOnly"]) # Sanitize sameSite - Playwright only accepts Strict|Lax|None if c.get("sameSite"): ss = str(c["sameSite"]).capitalize() if ss in ["Strict", "Lax", "None"]: cookie["sameSite"] = ss # If invalid, just omit it cookies.append(cookie) elif isinstance(data, dict): # Backward compatibility or simple dict format for name, value in data.items(): cookies.append({ "name": name, "value": str(value), "domain": ".tiktok.com", "path": "/" }) except Exception as e: print(f"Error loading cookies: {e}") if os.path.exists(USER_AGENT_FILE): try: with open(USER_AGENT_FILE, "r") as f: data = json.load(f) user_agent = data.get("user_agent", user_agent) except: pass return cookies, user_agent @staticmethod def save_credentials(cookies: List[dict] | dict, user_agent: str = None): """Save cookies and user agent to files.""" with open(COOKIES_FILE, "w") as f: json.dump(cookies, f, indent=2) if user_agent: with open(USER_AGENT_FILE, "w") as f: json.dump({"user_agent": user_agent}, f) @classmethod async def start_vnc_login(cls) -> dict: """ Start a visible browser for VNC login. The browser displays on DISPLAY=:99 which is streamed via noVNC. Returns immediately - browser stays open for user interaction. """ # Close any existing VNC session if cls._vnc_active: await cls.stop_vnc_login() print("DEBUG: Starting VNC login browser...") try: cls._vnc_playwright = await async_playwright().start() cls._vnc_browser = await cls._vnc_playwright.chromium.launch( headless=False, # Visible browser args=cls.BROWSER_ARGS ) cls._vnc_context = await cls._vnc_browser.new_context( user_agent=cls.DEFAULT_USER_AGENT, viewport={"width": 1920, "height": 1000} ) cls._vnc_page = await cls._vnc_context.new_page() await stealth_async(cls._vnc_page) await cls._vnc_page.goto("https://www.tiktok.com/login", wait_until="domcontentloaded") cls._vnc_active = True print("DEBUG: VNC browser opened with TikTok login page") return { "status": "started", "message": "Browser opened. Please login via the VNC stream." } except Exception as e: print(f"DEBUG: VNC login start error: {e}") cls._vnc_active = False return { "status": "error", "message": f"Failed to start browser: {str(e)}" } @classmethod async def check_vnc_login(cls) -> dict: """ Check if user has logged in by looking for sessionid cookie. Called by frontend via polling. """ if not cls._vnc_active or not cls._vnc_context: return {"status": "not_active", "logged_in": False} try: all_cookies = await cls._vnc_context.cookies() cookies_found = {} for cookie in all_cookies: if cookie.get("domain", "").endswith("tiktok.com"): cookies_found[cookie["name"]] = cookie["value"] if "sessionid" in cookies_found: # Save cookies and close browser cls.save_credentials(cookies_found, cls.DEFAULT_USER_AGENT) await cls.stop_vnc_login() return { "status": "success", "logged_in": True, "message": "Login successful!", "cookie_count": len(cookies_found) } return {"status": "waiting", "logged_in": False} except Exception as e: print(f"DEBUG: VNC check error: {e}") return {"status": "error", "logged_in": False, "message": str(e)} @classmethod async def stop_vnc_login(cls) -> dict: """Close the VNC browser session.""" print("DEBUG: Stopping VNC login browser...") try: if cls._vnc_browser: await cls._vnc_browser.close() if cls._vnc_playwright: await cls._vnc_playwright.stop() except Exception as e: print(f"DEBUG: Error closing VNC browser: {e}") cls._vnc_browser = None cls._vnc_context = None cls._vnc_page = None cls._vnc_playwright = None cls._vnc_active = False return {"status": "stopped"} @staticmethod async def credential_login(username: str, password: str, timeout_seconds: int = 60) -> dict: """ Headless login using username/password. Works on Docker/NAS deployments without a display. Args: username: TikTok username, email, or phone password: TikTok password timeout_seconds: Max time to wait for login Returns: {"status": "success/error", "message": "...", "cookie_count": N} """ print(f"DEBUG: Starting headless credential login for: {username}") async with async_playwright() as p: browser = await p.chromium.launch( headless=True, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context( user_agent=PlaywrightManager.DEFAULT_USER_AGENT ) page = await context.new_page() await stealth_async(page) try: # Navigate to TikTok login page await page.goto("https://www.tiktok.com/login/phone-or-email/email", wait_until="domcontentloaded") await asyncio.sleep(2) print("DEBUG: Looking for login form...") # Wait for and fill username/email field username_selector = 'input[name="username"], input[placeholder*="Email"], input[placeholder*="email"], input[type="text"]' await page.wait_for_selector(username_selector, timeout=10000) await page.fill(username_selector, username) await asyncio.sleep(0.5) # Fill password field password_selector = 'input[type="password"]' await page.wait_for_selector(password_selector, timeout=5000) await page.fill(password_selector, password) await asyncio.sleep(0.5) print("DEBUG: Credentials filled, clicking login...") # Click login button login_button = 'button[type="submit"], button[data-e2e="login-button"]' await page.click(login_button) # Wait for login to complete - poll for sessionid cookie print("DEBUG: Waiting for login to complete...") elapsed = 0 check_interval = 2 cookies_found = {} while elapsed < timeout_seconds: await asyncio.sleep(check_interval) elapsed += check_interval # Check for error messages error_el = await page.query_selector('[class*="error"], [class*="Error"]') if error_el: error_text = await error_el.inner_text() if error_text and len(error_text) > 0: await browser.close() return { "status": "error", "message": f"Login failed: {error_text[:100]}", "cookie_count": 0 } # Check cookies all_cookies = await context.cookies() for cookie in all_cookies: if cookie.get("domain", "").endswith("tiktok.com"): cookies_found[cookie["name"]] = cookie["value"] if "sessionid" in cookies_found: print(f"DEBUG: Login successful! Found {len(cookies_found)} cookies.") break # Check if CAPTCHA or verification needed captcha = await page.query_selector('[class*="captcha"], [class*="Captcha"], [class*="verify"]') if captcha: await browser.close() return { "status": "error", "message": "TikTok requires verification (CAPTCHA). Please try the cookie method.", "cookie_count": 0 } print(f"DEBUG: Waiting for login... ({elapsed}s)") await browser.close() if "sessionid" not in cookies_found: return { "status": "error", "message": "Login timed out. Check your credentials or try the cookie method.", "cookie_count": 0 } # Save credentials PlaywrightManager.save_credentials(cookies_found, PlaywrightManager.DEFAULT_USER_AGENT) return { "status": "success", "message": "Successfully logged in!", "cookie_count": len(cookies_found) } except Exception as e: await browser.close() print(f"DEBUG: Login error: {e}") return { "status": "error", "message": f"Login failed: {str(e)[:100]}", "cookie_count": 0 } @staticmethod async def browser_login(timeout_seconds: int = 180) -> dict: """ Open visible browser for user to login via TikTok's SSL login. Waits for sessionid cookie to be set. Returns: {"status": "success/timeout", "cookies": {...}, "cookie_count": N} """ print("DEBUG: Opening browser for TikTok login...") async with async_playwright() as p: browser = await p.chromium.launch( headless=False, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context( user_agent=PlaywrightManager.DEFAULT_USER_AGENT ) page = await context.new_page() await stealth_async(page) # Navigate to TikTok login await page.goto("https://www.tiktok.com/login", wait_until="domcontentloaded") print("DEBUG: Login page opened. Waiting for user to complete login...") # Poll for sessionid cookie elapsed = 0 check_interval = 2 cookies_found = {} while elapsed < timeout_seconds: await asyncio.sleep(check_interval) elapsed += check_interval all_cookies = await context.cookies() for cookie in all_cookies: if cookie.get("domain", "").endswith("tiktok.com"): cookies_found[cookie["name"]] = cookie["value"] if "sessionid" in cookies_found: print(f"DEBUG: Login detected! Found {len(cookies_found)} cookies.") break print(f"DEBUG: Waiting for login... ({elapsed}s)") await browser.close() if "sessionid" not in cookies_found: return { "status": "timeout", "message": "Login timed out. Please try again.", "cookie_count": 0 } # Save credentials PlaywrightManager.save_credentials(cookies_found, PlaywrightManager.DEFAULT_USER_AGENT) return { "status": "success", "message": "Successfully connected to TikTok!", "cookie_count": len(cookies_found) } @staticmethod async def intercept_feed(cookies: List[dict] = None, user_agent: str = None, scroll_count: int = 5) -> List[dict]: """Navigate to TikTok feed and intercept API responses.""" try: return await PlaywrightManager._intercept_feed_impl(cookies, user_agent, scroll_count) except Exception as e: print(f"DEBUG: Error in intercept_feed: {e}") print(traceback.format_exc()) raise e @staticmethod async def _intercept_feed_impl(cookies: List[dict] = None, user_agent: str = None, scroll_count: int = 5) -> List[dict]: if not cookies: cookies, user_agent = PlaywrightManager.load_stored_credentials() if not user_agent: user_agent = PlaywrightManager.DEFAULT_USER_AGENT if not cookies: print("DEBUG: No cookies available") return [] print(f"DEBUG: Starting network interception with {len(cookies)} cookies (scrolls={scroll_count})") captured_videos = [] async def handle_response(response: Response): """Capture /item_list API responses.""" nonlocal captured_videos url = response.url # Look for TikTok's feed API if "item_list" in url or "recommend/item" in url: try: data = await response.json() # TikTok returns videos in various nested formats items = [] # Try direct itemList first if data.get("itemList") and isinstance(data["itemList"], list): items = data["itemList"] elif data.get("aweme_list") and isinstance(data["aweme_list"], list): items = data["aweme_list"] # Try nested data structure elif data.get("data"): nested_data = data["data"] if isinstance(nested_data, list): for item in nested_data: if isinstance(item, dict): if "item" in item and isinstance(item["item"], dict): items.append(item["item"]) else: items.append(item) elif isinstance(nested_data, dict): items = nested_data.get("itemList", []) or nested_data.get("aweme_list", []) for item in items: video_data = PlaywrightManager._extract_video_data(item) if video_data: captured_videos.append(video_data) print(f"DEBUG: Captured {len(items)} videos from API") except Exception as e: print(f"DEBUG: Error parsing API response: {e}") async with async_playwright() as p: browser = await p.chromium.launch( headless=True, executable_path=PlaywrightManager.CHROME_PATH, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context(user_agent=user_agent) if cookies: try: await context.add_cookies(cookies) print(f"DEBUG: Applied {len(cookies)} cookies to browser context") except Exception as e: print(f"DEBUG: Error applying cookies: {e}") if len(cookies) > 0: print(f"DEBUG: Sample cookie: {cookies[0]}") raise e page = await context.new_page() await stealth_async(page) # Set up response listener page.on("response", handle_response) try: # Navigate to For You page await page.goto( "https://www.tiktok.com/foryou", wait_until="domcontentloaded", timeout=30000 ) # Wait for initial load - ensure we capture at least one batch # Poll for videos if in fast mode for _ in range(10): # Max 10 seconds wait if len(captured_videos) > 0: break await asyncio.sleep(1) # If still no videos, maybe scroll once to trigger if len(captured_videos) == 0: print("DEBUG: No videos after initial load, scrolling once...") await page.evaluate("window.scrollBy(0, 800)") await asyncio.sleep(2) # Scroll loop for i in range(scroll_count): await page.evaluate("window.scrollBy(0, 800)") await asyncio.sleep(1) # Give time for API responses to be captured await asyncio.sleep(2) except Exception as e: print(f"DEBUG: Navigation error: {e}") await browser.close() print(f"DEBUG: Total captured videos: {len(captured_videos)}") return captured_videos @staticmethod def _extract_video_data(item: dict) -> Optional[dict]: """Extract video data from TikTok API item, including product/shop videos.""" try: if not isinstance(item, dict): print(f"DEBUG: Skipping invalid item (type: {type(item)})") return None # Handle different API response formats video_id = item.get("id") or item.get("aweme_id") # Get author info author_data = item.get("author", {}) author = author_data.get("uniqueId") or author_data.get("unique_id") or "unknown" # Get description desc = item.get("desc") or item.get("description") or "" # Check if this is a product/shop video is_shop_video = bool(item.get("products") or item.get("commerce_info") or item.get("poi_info")) # Get thumbnail/cover image thumbnail = None video_data = item.get("video", {}) # Try different thumbnail sources thumbnail_sources = [ video_data.get("cover"), video_data.get("dynamicCover"), video_data.get("originCover"), video_data.get("ai_dynamic_cover", {}).get("url_list", [None])[0] if isinstance(video_data.get("ai_dynamic_cover"), dict) else None, ] for src in thumbnail_sources: if src: thumbnail = src break # Get direct CDN URL - try multiple sources (including for shop videos) cdn_url = None cdn_sources = [ # Standard sources video_data.get("playAddr"), video_data.get("downloadAddr"), # Bit rate sources (often works for shop videos) video_data.get("bitrateInfo", [{}])[0].get("PlayAddr", {}).get("UrlList", [None])[0] if video_data.get("bitrateInfo") else None, # Play URL list video_data.get("play_addr", {}).get("url_list", [None])[0] if isinstance(video_data.get("play_addr"), dict) else None, # Download URL list video_data.get("download_addr", {}).get("url_list", [None])[0] if isinstance(video_data.get("download_addr"), dict) else None, ] for src in cdn_sources: if src: cdn_url = src break # Use TikTok page URL as fallback (yt-dlp resolves this) video_url = f"https://www.tiktok.com/@{author}/video/{video_id}" # Get stats (views, likes) stats = item.get("stats", {}) or item.get("statistics", {}) views = stats.get("playCount") or stats.get("play_count") or 0 likes = stats.get("diggCount") or stats.get("digg_count") or 0 if video_id and author: result = { "id": str(video_id), "url": video_url, "author": author, "description": desc[:200] if desc else f"Video by @{author}" } if thumbnail: result["thumbnail"] = thumbnail if cdn_url: result["cdn_url"] = cdn_url # Direct CDN URL for thin proxy if views: result["views"] = views if likes: result["likes"] = likes if is_shop_video: result["has_product"] = True # Flag for product videos return result except Exception as e: print(f"DEBUG: Error extracting video data: {e}") return None @staticmethod async def fetch_user_videos(username: str, cookies: list, user_agent: str = None, limit: int = 10) -> list: """ Fetch videos from a specific user's profile page. Uses Playwright to intercept the user's video list API. """ from playwright.async_api import async_playwright, Response if not user_agent: user_agent = PlaywrightManager.DEFAULT_USER_AGENT if not cookies: print("DEBUG: No cookies available for user videos") return [] print(f"DEBUG: Fetching videos for @{username}...") captured_videos = [] async def handle_response(response: Response): """Capture user's video list API responses.""" nonlocal captured_videos url = response.url # Look for user's video list API if "item_list" in url or "post/item_list" in url: try: data = await response.json() items = data.get("itemList", []) or data.get("aweme_list", []) for item in items: if len(captured_videos) >= limit: break video_data = PlaywrightManager._extract_video_data(item) if video_data: captured_videos.append(video_data) print(f"DEBUG: Captured {len(items)} videos from user API") except Exception as e: print(f"DEBUG: Error parsing user API response: {e}") async with async_playwright() as p: browser = await p.chromium.launch( headless=True, executable_path=PlaywrightManager.CHROME_PATH, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context(user_agent=user_agent) await context.add_cookies(cookies) page = await context.new_page() await stealth_async(page) page.on("response", handle_response) try: # Navigate to user's profile page profile_url = f"https://www.tiktok.com/@{username}" await page.goto(profile_url, wait_until="networkidle", timeout=30000) # Wait for videos to load await asyncio.sleep(2) # Scroll loop to ensure we get enough videos scroll_attempts = 0 last_count = 0 max_scrolls = 20 # Prevent infinite loops while len(captured_videos) < limit and scroll_attempts < max_scrolls: print(f"DEBUG: Scrolling profile (Current: {len(captured_videos)}/{limit})...") await page.evaluate("window.scrollBy(0, 800)") await asyncio.sleep(1.5) # Wait for network/DOM # DOM Fallback check inside loop (for hybrid loading) if len(captured_videos) == last_count: # If count didn't increase via network, try scraping DOM again # This handles cases where TikTok renders new items in DOM without standard API # (Unlikely for infinite scroll, but good safety) pass last_count = len(captured_videos) scroll_attempts += 1 # DOM Fallback: If no API captured (SSR case), scrape from DOM if len(captured_videos) == 0: print("DEBUG: No API response for user videos, trying DOM scrape (SSR)...") video_elements = await page.locator('div[data-e2e="user-post-item"]').all() for el in video_elements: if len(captured_videos) >= limit: break try: # Extract data from DOM attributes/links url = await el.locator("a").get_attribute("href") desc = await el.locator("img").get_attribute("alt") # Try to find specific img for cover # Often the img alt is the description if url: # Parse video ID and author from URL # Format: https://www.tiktok.com/@user/video/123456... if "/video/" in url: parts = url.split("/video/") vid_id = parts[1].split("?")[0] if len(parts) > 1 else "" # We already know the author from the function arg, but can verify # Construct basic video object dom_video = { "id": vid_id, "url": url, "author": username, "description": desc or f"Video by @{username}", "views": 0, # Cannot easily get from list view DOM "likes": 0 } # Try to get thumbnail info thumb = await el.locator("img").get_attribute("src") if thumb: dom_video["thumbnail"] = thumb captured_videos.append(dom_video) except Exception as el_err: print(f"DEBUG: Error extracting DOM item: {el_err}") except Exception as e: print(f"DEBUG: Error navigating to profile: {e}") await browser.close() print(f"DEBUG: Total captured user videos: {len(captured_videos)}") return captured_videos @staticmethod async def search_videos(query: str, cookies: list, user_agent: str = None, limit: int = 20, cursor: int = 0) -> list: """ Search for videos by keyword or hashtag. Optimized: Uses page.evaluate to fetch specific offsets via internal API. """ from playwright.async_api import async_playwright from urllib.parse import quote import json if not user_agent: user_agent = PlaywrightManager.DEFAULT_USER_AGENT print(f"DEBUG: Searching for '{query}' (limit={limit}, cursor={cursor})...") captured_videos = [] async with async_playwright() as p: browser = await p.chromium.launch( headless=True, executable_path=PlaywrightManager.CHROME_PATH, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context( user_agent=user_agent, viewport={"width": 1280, "height": 720} ) if cookies: await context.add_cookies(cookies) page = await context.new_page() await stealth_async(page) try: # 1. Navigate to Search Page to initialize session/state # We don't need to wait for full load if we are just going to fetch search_url = f"https://www.tiktok.com/search?q={quote(query)}" await page.goto(search_url, wait_until="domcontentloaded", timeout=20000) # 2. If cursor > 0 (or always), Try to fetch API directly from browser context # This leverages the browser's valid session/signature generation print(f"DEBUG: Executing internal API fetch for offset {cursor}...") api_script = f""" async () => {{ const url = "https://www.tiktok.com/api/search/general/full/?keyword={quote(query)}&offset={cursor}&count={limit}&search_source=normal_search&is_filter_search=0"; try {{ const res = await fetch(url); return await res.json(); }} catch (e) {{ return {{ error: e.toString() }}; }} }} """ data = await page.evaluate(api_script) # 3. Parse Results if data and "error" not in data: items = [] # Try data list directly (general search) if data.get("data") and isinstance(data["data"], list): for item in data["data"]: if isinstance(item, dict): if "item" in item: items.append(item["item"]) elif "aweme" in item: items.append(item["aweme"]) elif "type" in item and item["type"] == 1: # Video type items.append(item) # Try itemList (item search) elif data.get("itemList"): items = data["itemList"] elif data.get("item_list"): items = data["item_list"] except Exception as e: print(f"DEBUG: Search navigation error: {e}") await browser.close() print(f"DEBUG: Total captured search videos: {len(captured_videos)}") return captured_videos @staticmethod async def fetch_suggested_accounts(cookies: list, user_agent: str = None, limit: int = 50) -> list: """ Fetch trending/suggested accounts from TikTok Vietnam. Uses the discover/creators API. """ from playwright.async_api import async_playwright, Response if not user_agent: user_agent = PlaywrightManager.DEFAULT_USER_AGENT captured_accounts = [] async def handle_response(response: Response): """Capture suggested accounts from API responses.""" nonlocal captured_accounts url = response.url # Look for suggest/discover APIs if any(x in url for x in ["suggest", "discover", "recommend/user", "creator"]): try: data = await response.json() # Different API formats users = data.get("userList", []) or data.get("users", []) or data.get("data", []) for item in users: user_data = item.get("user", item) if isinstance(item, dict) else item if isinstance(user_data, dict): username = user_data.get("uniqueId") or user_data.get("unique_id") if username: captured_accounts.append({ "username": username, "nickname": user_data.get("nickname", username), "avatar": user_data.get("avatarThumb") or user_data.get("avatar"), "followers": user_data.get("followerCount", 0), "verified": user_data.get("verified", False), "region": "VN" }) if users: print(f"DEBUG: Captured {len(users)} suggested accounts") except Exception as e: pass # Ignore parse errors async with async_playwright() as p: browser = await p.chromium.launch( headless=True, executable_path=PlaywrightManager.CHROME_PATH, args=PlaywrightManager.BROWSER_ARGS ) context = await browser.new_context( user_agent=user_agent, locale="vi-VN", # Vietnamese locale timezone_id="Asia/Ho_Chi_Minh" ) await context.add_cookies(cookies) page = await context.new_page() await stealth_async(page) page.on("response", handle_response) try: # Navigate to TikTok explore/discover page (Vietnam) await page.goto("https://www.tiktok.com/explore?lang=vi-VN", wait_until="networkidle", timeout=30000) await asyncio.sleep(3) # Also try the For You page to capture suggested await page.goto("https://www.tiktok.com/foryou?lang=vi-VN", wait_until="domcontentloaded", timeout=15000) await asyncio.sleep(2) # Scroll to trigger more suggestions for _ in range(3): await page.evaluate("window.scrollBy(0, 800)") await asyncio.sleep(1) except Exception as e: print(f"DEBUG: Error fetching suggested accounts: {e}") await browser.close() # Remove duplicates by username seen = set() unique_accounts = [] for acc in captured_accounts: if acc["username"] not in seen: seen.add(acc["username"]) unique_accounts.append(acc) print(f"DEBUG: Total unique suggested accounts: {len(unique_accounts)}") return unique_accounts[:limit] # Singleton instance playwright_manager = PlaywrightManager()