872 lines
34 KiB
Python
872 lines
34 KiB
Python
"""
|
|
PlaywrightManager - Core class for TikTok network interception.
|
|
|
|
Uses Playwright to:
|
|
1. Parse cookies from JSON format
|
|
2. Handle browser-based SSL login
|
|
3. Intercept /item_list API responses (instead of scraping HTML)
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
from typing import List, Dict, Optional
|
|
from playwright.async_api import async_playwright, Response, Browser, BrowserContext
|
|
|
|
try:
|
|
from playwright_stealth import stealth_async
|
|
except ImportError:
|
|
print("WARNING: playwright_stealth not found, disabling stealth mode.")
|
|
async def stealth_async(page):
|
|
pass
|
|
|
|
|
|
COOKIES_FILE = "cookies.json"
|
|
USER_AGENT_FILE = "user_agent.json"
|
|
|
|
|
|
class PlaywrightManager:
|
|
"""Manages Playwright browser for TikTok feed interception."""
|
|
|
|
# Anti-detection browser args
|
|
BROWSER_ARGS = [
|
|
"--disable-blink-features=AutomationControlled",
|
|
"--no-sandbox",
|
|
"--disable-setuid-sandbox",
|
|
"--disable-dev-shm-usage",
|
|
"--disable-accelerated-2d-canvas",
|
|
"--no-first-run",
|
|
"--no-zygote",
|
|
"--disable-gpu",
|
|
"--kiosk", # Force full screen mode
|
|
"--start-maximized"
|
|
]
|
|
|
|
DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
|
|
|
# VNC login state (class-level to persist across requests)
|
|
_vnc_playwright = None
|
|
_vnc_browser = None
|
|
_vnc_context = None
|
|
_vnc_page = None
|
|
_vnc_active = False
|
|
|
|
@staticmethod
|
|
def parse_json_credentials(json_creds: dict) -> tuple[List[dict], str]:
|
|
"""
|
|
Parse JSON credentials in the format:
|
|
{
|
|
"http": {
|
|
"headers": {"User-Agent": "...", "Cookie": "..."},
|
|
"cookies": {"sessionid": "...", "ttwid": "..."}
|
|
}
|
|
}
|
|
|
|
Returns: (cookies_list, user_agent)
|
|
"""
|
|
cookies = []
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
http_data = json_creds.get("http", {})
|
|
headers = http_data.get("headers", {})
|
|
cookies_dict = http_data.get("cookies", {})
|
|
|
|
# Get User-Agent from headers
|
|
if "User-Agent" in headers:
|
|
user_agent = headers["User-Agent"]
|
|
|
|
# Parse cookies from the cookies dict (preferred)
|
|
if cookies_dict:
|
|
for name, value in cookies_dict.items():
|
|
cookies.append({
|
|
"name": name,
|
|
"value": str(value),
|
|
"domain": ".tiktok.com",
|
|
"path": "/"
|
|
})
|
|
# Fallback: parse from Cookie header string
|
|
elif "Cookie" in headers:
|
|
cookie_str = headers["Cookie"]
|
|
for part in cookie_str.split(";"):
|
|
part = part.strip()
|
|
if "=" in part:
|
|
name, value = part.split("=", 1)
|
|
cookies.append({
|
|
"name": name.strip(),
|
|
"value": value.strip(),
|
|
"domain": ".tiktok.com",
|
|
"path": "/"
|
|
})
|
|
|
|
return cookies, user_agent
|
|
|
|
@staticmethod
|
|
def load_stored_credentials() -> tuple[List[dict], str]:
|
|
"""Load cookies and user agent from stored files."""
|
|
cookies = []
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
if os.path.exists(COOKIES_FILE):
|
|
try:
|
|
with open(COOKIES_FILE, "r") as f:
|
|
cookie_dict = json.load(f)
|
|
for name, value in cookie_dict.items():
|
|
cookies.append({
|
|
"name": name,
|
|
"value": str(value),
|
|
"domain": ".tiktok.com",
|
|
"path": "/"
|
|
})
|
|
except Exception as e:
|
|
print(f"Error loading cookies: {e}")
|
|
|
|
if os.path.exists(USER_AGENT_FILE):
|
|
try:
|
|
with open(USER_AGENT_FILE, "r") as f:
|
|
data = json.load(f)
|
|
user_agent = data.get("user_agent", user_agent)
|
|
except:
|
|
pass
|
|
|
|
return cookies, user_agent
|
|
|
|
@staticmethod
|
|
def save_credentials(cookies: dict, user_agent: str):
|
|
"""Save cookies and user agent to files."""
|
|
with open(COOKIES_FILE, "w") as f:
|
|
json.dump(cookies, f, indent=2)
|
|
|
|
with open(USER_AGENT_FILE, "w") as f:
|
|
json.dump({"user_agent": user_agent}, f)
|
|
|
|
@classmethod
|
|
async def start_vnc_login(cls) -> dict:
|
|
"""
|
|
Start a visible browser for VNC login.
|
|
The browser displays on DISPLAY=:99 which is streamed via noVNC.
|
|
Returns immediately - browser stays open for user interaction.
|
|
"""
|
|
# Close any existing VNC session
|
|
if cls._vnc_active:
|
|
await cls.stop_vnc_login()
|
|
|
|
print("DEBUG: Starting VNC login browser...")
|
|
|
|
try:
|
|
cls._vnc_playwright = await async_playwright().start()
|
|
cls._vnc_browser = await cls._vnc_playwright.chromium.launch(
|
|
headless=False, # Visible browser
|
|
args=cls.BROWSER_ARGS
|
|
)
|
|
|
|
cls._vnc_context = await cls._vnc_browser.new_context(
|
|
user_agent=cls.DEFAULT_USER_AGENT,
|
|
viewport={"width": 1920, "height": 1000}
|
|
)
|
|
|
|
cls._vnc_page = await cls._vnc_context.new_page()
|
|
await stealth_async(cls._vnc_page)
|
|
await cls._vnc_page.goto("https://www.tiktok.com/login", wait_until="domcontentloaded")
|
|
|
|
cls._vnc_active = True
|
|
print("DEBUG: VNC browser opened with TikTok login page")
|
|
|
|
return {
|
|
"status": "started",
|
|
"message": "Browser opened. Please login via the VNC stream."
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: VNC login start error: {e}")
|
|
cls._vnc_active = False
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to start browser: {str(e)}"
|
|
}
|
|
|
|
@classmethod
|
|
async def check_vnc_login(cls) -> dict:
|
|
"""
|
|
Check if user has logged in by looking for sessionid cookie.
|
|
Called by frontend via polling.
|
|
"""
|
|
if not cls._vnc_active or not cls._vnc_context:
|
|
return {"status": "not_active", "logged_in": False}
|
|
|
|
try:
|
|
all_cookies = await cls._vnc_context.cookies()
|
|
cookies_found = {}
|
|
|
|
for cookie in all_cookies:
|
|
if cookie.get("domain", "").endswith("tiktok.com"):
|
|
cookies_found[cookie["name"]] = cookie["value"]
|
|
|
|
if "sessionid" in cookies_found:
|
|
# Save cookies and close browser
|
|
cls.save_credentials(cookies_found, cls.DEFAULT_USER_AGENT)
|
|
await cls.stop_vnc_login()
|
|
|
|
return {
|
|
"status": "success",
|
|
"logged_in": True,
|
|
"message": "Login successful!",
|
|
"cookie_count": len(cookies_found)
|
|
}
|
|
|
|
return {"status": "waiting", "logged_in": False}
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: VNC check error: {e}")
|
|
return {"status": "error", "logged_in": False, "message": str(e)}
|
|
|
|
@classmethod
|
|
async def stop_vnc_login(cls) -> dict:
|
|
"""Close the VNC browser session."""
|
|
print("DEBUG: Stopping VNC login browser...")
|
|
|
|
try:
|
|
if cls._vnc_browser:
|
|
await cls._vnc_browser.close()
|
|
if cls._vnc_playwright:
|
|
await cls._vnc_playwright.stop()
|
|
except Exception as e:
|
|
print(f"DEBUG: Error closing VNC browser: {e}")
|
|
|
|
cls._vnc_browser = None
|
|
cls._vnc_context = None
|
|
cls._vnc_page = None
|
|
cls._vnc_playwright = None
|
|
cls._vnc_active = False
|
|
|
|
return {"status": "stopped"}
|
|
|
|
@staticmethod
|
|
async def credential_login(username: str, password: str, timeout_seconds: int = 60) -> dict:
|
|
"""
|
|
Headless login using username/password.
|
|
Works on Docker/NAS deployments without a display.
|
|
|
|
Args:
|
|
username: TikTok username, email, or phone
|
|
password: TikTok password
|
|
timeout_seconds: Max time to wait for login
|
|
|
|
Returns: {"status": "success/error", "message": "...", "cookie_count": N}
|
|
"""
|
|
print(f"DEBUG: Starting headless credential login for: {username}")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=True,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(
|
|
user_agent=PlaywrightManager.DEFAULT_USER_AGENT
|
|
)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
|
|
try:
|
|
# Navigate to TikTok login page
|
|
await page.goto("https://www.tiktok.com/login/phone-or-email/email", wait_until="domcontentloaded")
|
|
await asyncio.sleep(2)
|
|
|
|
print("DEBUG: Looking for login form...")
|
|
|
|
# Wait for and fill username/email field
|
|
username_selector = 'input[name="username"], input[placeholder*="Email"], input[placeholder*="email"], input[type="text"]'
|
|
await page.wait_for_selector(username_selector, timeout=10000)
|
|
await page.fill(username_selector, username)
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Fill password field
|
|
password_selector = 'input[type="password"]'
|
|
await page.wait_for_selector(password_selector, timeout=5000)
|
|
await page.fill(password_selector, password)
|
|
await asyncio.sleep(0.5)
|
|
|
|
print("DEBUG: Credentials filled, clicking login...")
|
|
|
|
# Click login button
|
|
login_button = 'button[type="submit"], button[data-e2e="login-button"]'
|
|
await page.click(login_button)
|
|
|
|
# Wait for login to complete - poll for sessionid cookie
|
|
print("DEBUG: Waiting for login to complete...")
|
|
elapsed = 0
|
|
check_interval = 2
|
|
cookies_found = {}
|
|
|
|
while elapsed < timeout_seconds:
|
|
await asyncio.sleep(check_interval)
|
|
elapsed += check_interval
|
|
|
|
# Check for error messages
|
|
error_el = await page.query_selector('[class*="error"], [class*="Error"]')
|
|
if error_el:
|
|
error_text = await error_el.inner_text()
|
|
if error_text and len(error_text) > 0:
|
|
await browser.close()
|
|
return {
|
|
"status": "error",
|
|
"message": f"Login failed: {error_text[:100]}",
|
|
"cookie_count": 0
|
|
}
|
|
|
|
# Check cookies
|
|
all_cookies = await context.cookies()
|
|
for cookie in all_cookies:
|
|
if cookie.get("domain", "").endswith("tiktok.com"):
|
|
cookies_found[cookie["name"]] = cookie["value"]
|
|
|
|
if "sessionid" in cookies_found:
|
|
print(f"DEBUG: Login successful! Found {len(cookies_found)} cookies.")
|
|
break
|
|
|
|
# Check if CAPTCHA or verification needed
|
|
captcha = await page.query_selector('[class*="captcha"], [class*="Captcha"], [class*="verify"]')
|
|
if captcha:
|
|
await browser.close()
|
|
return {
|
|
"status": "error",
|
|
"message": "TikTok requires verification (CAPTCHA). Please try the cookie method.",
|
|
"cookie_count": 0
|
|
}
|
|
|
|
print(f"DEBUG: Waiting for login... ({elapsed}s)")
|
|
|
|
await browser.close()
|
|
|
|
if "sessionid" not in cookies_found:
|
|
return {
|
|
"status": "error",
|
|
"message": "Login timed out. Check your credentials or try the cookie method.",
|
|
"cookie_count": 0
|
|
}
|
|
|
|
# Save credentials
|
|
PlaywrightManager.save_credentials(cookies_found, PlaywrightManager.DEFAULT_USER_AGENT)
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "Successfully logged in!",
|
|
"cookie_count": len(cookies_found)
|
|
}
|
|
|
|
except Exception as e:
|
|
await browser.close()
|
|
print(f"DEBUG: Login error: {e}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Login failed: {str(e)[:100]}",
|
|
"cookie_count": 0
|
|
}
|
|
|
|
@staticmethod
|
|
async def browser_login(timeout_seconds: int = 180) -> dict:
|
|
"""
|
|
Open visible browser for user to login via TikTok's SSL login.
|
|
Waits for sessionid cookie to be set.
|
|
|
|
Returns: {"status": "success/timeout", "cookies": {...}, "cookie_count": N}
|
|
"""
|
|
print("DEBUG: Opening browser for TikTok login...")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=False,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(
|
|
user_agent=PlaywrightManager.DEFAULT_USER_AGENT
|
|
)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
|
|
# Navigate to TikTok login
|
|
await page.goto("https://www.tiktok.com/login", wait_until="domcontentloaded")
|
|
print("DEBUG: Login page opened. Waiting for user to complete login...")
|
|
|
|
# Poll for sessionid cookie
|
|
elapsed = 0
|
|
check_interval = 2
|
|
cookies_found = {}
|
|
|
|
while elapsed < timeout_seconds:
|
|
await asyncio.sleep(check_interval)
|
|
elapsed += check_interval
|
|
|
|
all_cookies = await context.cookies()
|
|
|
|
for cookie in all_cookies:
|
|
if cookie.get("domain", "").endswith("tiktok.com"):
|
|
cookies_found[cookie["name"]] = cookie["value"]
|
|
|
|
if "sessionid" in cookies_found:
|
|
print(f"DEBUG: Login detected! Found {len(cookies_found)} cookies.")
|
|
break
|
|
|
|
print(f"DEBUG: Waiting for login... ({elapsed}s)")
|
|
|
|
await browser.close()
|
|
|
|
if "sessionid" not in cookies_found:
|
|
return {
|
|
"status": "timeout",
|
|
"message": "Login timed out. Please try again.",
|
|
"cookie_count": 0
|
|
}
|
|
|
|
# Save credentials
|
|
PlaywrightManager.save_credentials(cookies_found, PlaywrightManager.DEFAULT_USER_AGENT)
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "Successfully connected to TikTok!",
|
|
"cookie_count": len(cookies_found)
|
|
}
|
|
|
|
@staticmethod
|
|
async def intercept_feed(cookies: List[dict] = None, user_agent: str = None, scroll_count: int = 5) -> List[dict]:
|
|
"""
|
|
Navigate to TikTok For You page and intercept the /item_list API response.
|
|
|
|
Args:
|
|
cookies: Optional list of cookies
|
|
user_agent: Optional user agent
|
|
scroll_count: Number of times to scroll to fetch more videos (0 = initial load only)
|
|
|
|
Returns: List of video objects
|
|
"""
|
|
if not cookies:
|
|
cookies, user_agent = PlaywrightManager.load_stored_credentials()
|
|
|
|
if not user_agent:
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
if not cookies:
|
|
print("DEBUG: No cookies available")
|
|
return []
|
|
|
|
print(f"DEBUG: Starting network interception with {len(cookies)} cookies (scrolls={scroll_count})")
|
|
|
|
captured_videos = []
|
|
|
|
async def handle_response(response: Response):
|
|
"""Capture /item_list API responses."""
|
|
nonlocal captured_videos
|
|
|
|
url = response.url
|
|
|
|
# Look for TikTok's feed API
|
|
if "item_list" in url or "recommend/item" in url:
|
|
try:
|
|
data = await response.json()
|
|
|
|
# TikTok returns videos in "itemList" or "aweme_list"
|
|
items = data.get("itemList", []) or data.get("aweme_list", [])
|
|
|
|
for item in items:
|
|
video_data = PlaywrightManager._extract_video_data(item)
|
|
if video_data:
|
|
captured_videos.append(video_data)
|
|
|
|
print(f"DEBUG: Captured {len(items)} videos from API")
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error parsing API response: {e}")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=True,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(user_agent=user_agent)
|
|
await context.add_cookies(cookies)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
|
|
# Set up response listener
|
|
page.on("response", handle_response)
|
|
|
|
try:
|
|
# Navigate to For You page
|
|
await page.goto(
|
|
"https://www.tiktok.com/foryou",
|
|
wait_until="domcontentloaded",
|
|
timeout=30000
|
|
)
|
|
|
|
# Wait for initial load - ensure we capture at least one batch
|
|
# Poll for videos if in fast mode
|
|
for _ in range(10): # Max 10 seconds wait
|
|
if len(captured_videos) > 0:
|
|
break
|
|
await asyncio.sleep(1)
|
|
|
|
# If still no videos, maybe scroll once to trigger
|
|
if len(captured_videos) == 0:
|
|
print("DEBUG: No videos after initial load, scrolling once...")
|
|
await page.evaluate("window.scrollBy(0, 800)")
|
|
await asyncio.sleep(2)
|
|
|
|
# Scroll loop
|
|
for i in range(scroll_count):
|
|
await page.evaluate("window.scrollBy(0, 800)")
|
|
await asyncio.sleep(1)
|
|
|
|
# Give time for API responses to be captured
|
|
await asyncio.sleep(2)
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Navigation error: {e}")
|
|
|
|
await browser.close()
|
|
|
|
print(f"DEBUG: Total captured videos: {len(captured_videos)}")
|
|
return captured_videos
|
|
|
|
@staticmethod
|
|
def _extract_video_data(item: dict) -> Optional[dict]:
|
|
"""Extract video data from TikTok API item, including product/shop videos."""
|
|
try:
|
|
# Handle different API response formats
|
|
video_id = item.get("id") or item.get("aweme_id")
|
|
|
|
# Get author info
|
|
author_data = item.get("author", {})
|
|
author = author_data.get("uniqueId") or author_data.get("unique_id") or "unknown"
|
|
|
|
# Get description
|
|
desc = item.get("desc") or item.get("description") or ""
|
|
|
|
# Check if this is a product/shop video
|
|
is_shop_video = bool(item.get("products") or item.get("commerce_info") or item.get("poi_info"))
|
|
|
|
# Get thumbnail/cover image
|
|
thumbnail = None
|
|
video_data = item.get("video", {})
|
|
|
|
# Try different thumbnail sources
|
|
thumbnail_sources = [
|
|
video_data.get("cover"),
|
|
video_data.get("dynamicCover"),
|
|
video_data.get("originCover"),
|
|
video_data.get("ai_dynamic_cover", {}).get("url_list", [None])[0] if isinstance(video_data.get("ai_dynamic_cover"), dict) else None,
|
|
]
|
|
for src in thumbnail_sources:
|
|
if src:
|
|
thumbnail = src
|
|
break
|
|
|
|
# Get direct CDN URL - try multiple sources (including for shop videos)
|
|
cdn_url = None
|
|
cdn_sources = [
|
|
# Standard sources
|
|
video_data.get("playAddr"),
|
|
video_data.get("downloadAddr"),
|
|
# Bit rate sources (often works for shop videos)
|
|
video_data.get("bitrateInfo", [{}])[0].get("PlayAddr", {}).get("UrlList", [None])[0] if video_data.get("bitrateInfo") else None,
|
|
# Play URL list
|
|
video_data.get("play_addr", {}).get("url_list", [None])[0] if isinstance(video_data.get("play_addr"), dict) else None,
|
|
# Download URL list
|
|
video_data.get("download_addr", {}).get("url_list", [None])[0] if isinstance(video_data.get("download_addr"), dict) else None,
|
|
]
|
|
|
|
for src in cdn_sources:
|
|
if src:
|
|
cdn_url = src
|
|
break
|
|
|
|
# Use TikTok page URL as fallback (yt-dlp resolves this)
|
|
video_url = f"https://www.tiktok.com/@{author}/video/{video_id}"
|
|
|
|
# Get stats (views, likes)
|
|
stats = item.get("stats", {}) or item.get("statistics", {})
|
|
views = stats.get("playCount") or stats.get("play_count") or 0
|
|
likes = stats.get("diggCount") or stats.get("digg_count") or 0
|
|
|
|
if video_id and author:
|
|
result = {
|
|
"id": str(video_id),
|
|
"url": video_url,
|
|
"author": author,
|
|
"description": desc[:200] if desc else f"Video by @{author}"
|
|
}
|
|
if thumbnail:
|
|
result["thumbnail"] = thumbnail
|
|
if cdn_url:
|
|
result["cdn_url"] = cdn_url # Direct CDN URL for thin proxy
|
|
if views:
|
|
result["views"] = views
|
|
if likes:
|
|
result["likes"] = likes
|
|
if is_shop_video:
|
|
result["has_product"] = True # Flag for product videos
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error extracting video data: {e}")
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
async def fetch_user_videos(username: str, cookies: list, user_agent: str = None, limit: int = 10) -> list:
|
|
"""
|
|
Fetch videos from a specific user's profile page.
|
|
Uses Playwright to intercept the user's video list API.
|
|
"""
|
|
from playwright.async_api import async_playwright, Response
|
|
|
|
if not user_agent:
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
if not cookies:
|
|
print("DEBUG: No cookies available for user videos")
|
|
return []
|
|
|
|
print(f"DEBUG: Fetching videos for @{username}...")
|
|
|
|
captured_videos = []
|
|
|
|
async def handle_response(response: Response):
|
|
"""Capture user's video list API responses."""
|
|
nonlocal captured_videos
|
|
|
|
url = response.url
|
|
|
|
# Look for user's video list API
|
|
if "item_list" in url or "post/item_list" in url:
|
|
try:
|
|
data = await response.json()
|
|
|
|
items = data.get("itemList", []) or data.get("aweme_list", [])
|
|
|
|
for item in items:
|
|
if len(captured_videos) >= limit:
|
|
break
|
|
video_data = PlaywrightManager._extract_video_data(item)
|
|
if video_data:
|
|
captured_videos.append(video_data)
|
|
|
|
print(f"DEBUG: Captured {len(items)} videos from user API")
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error parsing user API response: {e}")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=True,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(user_agent=user_agent)
|
|
await context.add_cookies(cookies)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
page.on("response", handle_response)
|
|
|
|
try:
|
|
# Navigate to user's profile page
|
|
profile_url = f"https://www.tiktok.com/@{username}"
|
|
await page.goto(profile_url, wait_until="networkidle", timeout=30000)
|
|
|
|
# Wait for videos to load
|
|
await asyncio.sleep(2)
|
|
|
|
# Scroll a bit to trigger more video loading
|
|
await page.evaluate("window.scrollBy(0, 500)")
|
|
await asyncio.sleep(1)
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error navigating to profile: {e}")
|
|
|
|
await browser.close()
|
|
|
|
print(f"DEBUG: Total captured user videos: {len(captured_videos)}")
|
|
return captured_videos
|
|
|
|
@staticmethod
|
|
async def search_videos(query: str, cookies: list, user_agent: str = None, limit: int = 12) -> list:
|
|
"""
|
|
Search for videos by keyword or hashtag.
|
|
Uses Playwright to intercept TikTok search results API.
|
|
"""
|
|
from playwright.async_api import async_playwright, Response
|
|
from urllib.parse import quote
|
|
|
|
if not user_agent:
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
if not cookies:
|
|
print("DEBUG: No cookies available for search")
|
|
return []
|
|
|
|
print(f"DEBUG: Searching for '{query}'...")
|
|
|
|
captured_videos = []
|
|
|
|
async def handle_response(response: Response):
|
|
"""Capture search results API responses."""
|
|
nonlocal captured_videos
|
|
|
|
url = response.url
|
|
|
|
# Look for search results API
|
|
if "search" in url and ("item_list" in url or "video" in url or "general" in url):
|
|
try:
|
|
data = await response.json()
|
|
|
|
# Try different response formats
|
|
items = data.get("itemList", []) or data.get("data", []) or data.get("item_list", [])
|
|
|
|
for item in items:
|
|
if len(captured_videos) >= limit:
|
|
break
|
|
video_data = PlaywrightManager._extract_video_data(item)
|
|
if video_data:
|
|
captured_videos.append(video_data)
|
|
|
|
print(f"DEBUG: Captured {len(items)} videos from search API")
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error parsing search API response: {e}")
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=True,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(user_agent=user_agent)
|
|
await context.add_cookies(cookies)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
page.on("response", handle_response)
|
|
|
|
try:
|
|
# Navigate to TikTok search page
|
|
search_url = f"https://www.tiktok.com/search/video?q={quote(query)}"
|
|
await page.goto(search_url, wait_until="networkidle", timeout=30000)
|
|
|
|
# Wait for videos to load
|
|
await asyncio.sleep(3)
|
|
|
|
# Scroll to trigger more loading
|
|
for _ in range(2):
|
|
await page.evaluate("window.scrollBy(0, 800)")
|
|
await asyncio.sleep(1)
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error during search: {e}")
|
|
|
|
await browser.close()
|
|
|
|
print(f"DEBUG: Total captured search videos: {len(captured_videos)}")
|
|
return captured_videos
|
|
|
|
@staticmethod
|
|
async def fetch_suggested_accounts(cookies: list, user_agent: str = None, limit: int = 50) -> list:
|
|
"""
|
|
Fetch trending/suggested accounts from TikTok Vietnam.
|
|
Uses the discover/creators API.
|
|
"""
|
|
from playwright.async_api import async_playwright, Response
|
|
|
|
if not user_agent:
|
|
user_agent = PlaywrightManager.DEFAULT_USER_AGENT
|
|
|
|
captured_accounts = []
|
|
|
|
async def handle_response(response: Response):
|
|
"""Capture suggested accounts from API responses."""
|
|
nonlocal captured_accounts
|
|
|
|
url = response.url
|
|
|
|
# Look for suggest/discover APIs
|
|
if any(x in url for x in ["suggest", "discover", "recommend/user", "creator"]):
|
|
try:
|
|
data = await response.json()
|
|
|
|
# Different API formats
|
|
users = data.get("userList", []) or data.get("users", []) or data.get("data", [])
|
|
|
|
for item in users:
|
|
user_data = item.get("user", item) if isinstance(item, dict) else item
|
|
if isinstance(user_data, dict):
|
|
username = user_data.get("uniqueId") or user_data.get("unique_id")
|
|
if username:
|
|
captured_accounts.append({
|
|
"username": username,
|
|
"nickname": user_data.get("nickname", username),
|
|
"avatar": user_data.get("avatarThumb") or user_data.get("avatar"),
|
|
"followers": user_data.get("followerCount", 0),
|
|
"verified": user_data.get("verified", False),
|
|
"region": "VN"
|
|
})
|
|
|
|
if users:
|
|
print(f"DEBUG: Captured {len(users)} suggested accounts")
|
|
|
|
except Exception as e:
|
|
pass # Ignore parse errors
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(
|
|
headless=True,
|
|
args=PlaywrightManager.BROWSER_ARGS
|
|
)
|
|
|
|
context = await browser.new_context(
|
|
user_agent=user_agent,
|
|
locale="vi-VN", # Vietnamese locale
|
|
timezone_id="Asia/Ho_Chi_Minh"
|
|
)
|
|
await context.add_cookies(cookies)
|
|
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
page.on("response", handle_response)
|
|
|
|
try:
|
|
# Navigate to TikTok explore/discover page (Vietnam)
|
|
await page.goto("https://www.tiktok.com/explore?lang=vi-VN", wait_until="networkidle", timeout=30000)
|
|
await asyncio.sleep(3)
|
|
|
|
# Also try the For You page to capture suggested
|
|
await page.goto("https://www.tiktok.com/foryou?lang=vi-VN", wait_until="domcontentloaded", timeout=15000)
|
|
await asyncio.sleep(2)
|
|
|
|
# Scroll to trigger more suggestions
|
|
for _ in range(3):
|
|
await page.evaluate("window.scrollBy(0, 800)")
|
|
await asyncio.sleep(1)
|
|
|
|
except Exception as e:
|
|
print(f"DEBUG: Error fetching suggested accounts: {e}")
|
|
|
|
await browser.close()
|
|
|
|
# Remove duplicates by username
|
|
seen = set()
|
|
unique_accounts = []
|
|
for acc in captured_accounts:
|
|
if acc["username"] not in seen:
|
|
seen.add(acc["username"])
|
|
unique_accounts.append(acc)
|
|
|
|
print(f"DEBUG: Total unique suggested accounts: {len(unique_accounts)}")
|
|
return unique_accounts[:limit]
|
|
|
|
|
|
# Singleton instance
|
|
playwright_manager = PlaywrightManager()
|