Restore Playwright-based search and user videos for reliable crawling

This commit is contained in:
Khoa.vo 2025-12-19 20:19:59 +07:00
parent 8dbf3fc4bf
commit 732dc97756

View file

@@ -112,7 +112,7 @@ async def get_user_videos(
): ):
""" """
Fetch videos from a TikTok user's profile. Fetch videos from a TikTok user's profile.
Uses TikTok's internal API for fast results. Uses Playwright to crawl the user's page for reliable results.
""" """
username = username.replace("@", "") username = username.replace("@", "")
@@ -124,74 +124,9 @@ async def get_user_videos(
print(f"Fetching videos for @{username}...") print(f"Fetching videos for @{username}...")
# Build cookie header
cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies])
headers = {
"User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT,
"Referer": f"https://www.tiktok.com/@{username}",
"Cookie": cookie_str,
"Accept": "application/json",
}
try: try:
# First get user's secUid from profile API videos = await PlaywrightManager.fetch_user_videos(username, cookies, user_agent, limit)
profile_url = f"https://www.tiktok.com/api/user/detail/?uniqueId={username}" return {"username": username, "videos": videos, "count": len(videos)}
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
profile_res = await client.get(profile_url, headers=headers)
if profile_res.status_code != 200:
print(f"Profile API returned {profile_res.status_code}")
return {"username": username, "videos": [], "count": 0}
profile_data = profile_res.json()
user_info = profile_data.get("userInfo", {}).get("user", {})
sec_uid = user_info.get("secUid", "")
if not sec_uid:
print(f"Could not get secUid for {username}")
return {"username": username, "videos": [], "count": 0}
# Fetch user's videos
videos_url = f"https://www.tiktok.com/api/post/item_list/?secUid={sec_uid}&count={limit}&cursor=0"
videos_res = await client.get(videos_url, headers=headers)
videos = []
if videos_res.status_code == 200:
try:
data = videos_res.json()
items = data.get("itemList", [])
for item in items[:limit]:
video_id = item.get("id", "")
author_info = item.get("author", {})
video_data = item.get("video", {})
play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "")
videos.append({
"id": video_id,
"url": f"https://www.tiktok.com/@{username}/video/{video_id}",
"cdn_url": play_addr,
"author": username,
"description": item.get("desc", ""),
"thumbnail": video_data.get("cover") or video_data.get("dynamicCover", ""),
"views": item.get("stats", {}).get("playCount", 0),
"likes": item.get("stats", {}).get("diggCount", 0),
})
print(f"Found {len(videos)} videos for @{username}")
except Exception as parse_error:
print(f"Error parsing videos response: {parse_error}")
else:
print(f"Videos API returned status {videos_res.status_code}")
return {"username": username, "videos": videos, "count": len(videos)}
except Exception as e: except Exception as e:
print(f"Error fetching videos for {username}: {e}") print(f"Error fetching videos for {username}: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -204,10 +139,8 @@ async def search_videos(
): ):
""" """
Search for videos by keyword or hashtag. Search for videos by keyword or hashtag.
Uses TikTok's video search API. Uses Playwright to crawl TikTok search results for reliable data.
""" """
from urllib.parse import quote
# Load stored credentials # Load stored credentials
cookies, user_agent = PlaywrightManager.load_stored_credentials() cookies, user_agent = PlaywrightManager.load_stored_credentials()
@@ -216,70 +149,10 @@ async def search_videos(
print(f"Searching for: {query}...") print(f"Searching for: {query}...")
# Build cookie header
cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies])
headers = {
"User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT,
"Referer": f"https://www.tiktok.com/search?q={quote(query)}",
"Cookie": cookie_str,
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
}
try: try:
# TikTok video search API - simpler endpoint videos = await PlaywrightManager.search_videos(query, cookies, user_agent, limit)
search_url = f"https://www.tiktok.com/api/search/item/full/?keyword={quote(query)}&offset=0&count={limit}" return {"query": query, "videos": videos, "count": len(videos)}
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(search_url, headers=headers)
videos = []
print(f"Search API status: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
items = data.get("item_list", []) or data.get("itemList", []) or data.get("data", [])
print(f"Found {len(items)} raw items")
for item in items[:limit]:
# Handle different response formats
if isinstance(item, dict):
video_id = item.get("id", "")
author_info = item.get("author", {})
video_data = item.get("video", {})
# Get playable URL
play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "")
author_name = author_info.get("uniqueId") or author_info.get("unique_id", "unknown")
videos.append({
"id": video_id,
"url": f"https://www.tiktok.com/@{author_name}/video/{video_id}",
"cdn_url": play_addr,
"author": author_name,
"description": item.get("desc", ""),
"thumbnail": video_data.get("cover") or video_data.get("dynamicCover") or video_data.get("originCover", ""),
"views": item.get("stats", {}).get("playCount", 0),
"likes": item.get("stats", {}).get("diggCount", 0),
})
print(f"Processed {len(videos)} videos for '{query}'")
except Exception as parse_error:
print(f"Error parsing search response: {parse_error}")
# Try to print raw response for debugging
print(f"Raw response: {response.text[:500] if response.text else 'empty'}")
else:
print(f"Search API returned status {response.status_code}")
print(f"Response: {response.text[:300] if response.text else 'empty'}")
return {"query": query, "videos": videos, "count": len(videos)}
except Exception as e: except Exception as e:
print(f"Error searching for {query}: {e}") print(f"Error searching for {query}: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))