Fix search API and remove duplicate code

This commit is contained in:
Khoa.vo 2025-12-19 20:01:21 +07:00
parent 76cb8cc785
commit 6c6e5880f6

View file

@ -204,7 +204,7 @@ async def search_videos(
): ):
""" """
Search for videos by keyword or hashtag. Search for videos by keyword or hashtag.
Uses TikTok's internal search API for fast, reliable results. Uses TikTok's video search API.
""" """
from urllib.parse import quote from urllib.parse import quote
@ -221,61 +221,65 @@ async def search_videos(
headers = { headers = {
"User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT, "User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT,
"Referer": "https://www.tiktok.com/", "Referer": f"https://www.tiktok.com/search?q={quote(query)}",
"Cookie": cookie_str, "Cookie": cookie_str,
"Accept": "application/json", "Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
} }
try: try:
# TikTok search API endpoint # TikTok video search API - simpler endpoint
search_url = f"https://www.tiktok.com/api/search/general/full/?keyword={quote(query)}&offset=0&search_id=&from_page=search&web_search_code=%7B%22tiktok%22%3A%7B%22client_params_x%22%3A%7B%22search_engine%22%3A%7B%22ies_mt_user_live_video_card_use_b%22%3A1%2C%22mt_search_general_user_live_card%22%3A1%7D%7D%2C%22search_server%22%3A%7B%7D%7D%7D" search_url = f"https://www.tiktok.com/api/search/item/full/?keyword={quote(query)}&offset=0&count={limit}"
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(search_url, headers=headers) response = await client.get(search_url, headers=headers)
videos = [] videos = []
print(f"Search API status: {response.status_code}")
if response.status_code == 200: if response.status_code == 200:
try: try:
data = response.json() data = response.json()
items = data.get("data", []) items = data.get("item_list", []) or data.get("itemList", []) or data.get("data", [])
print(f"Found {len(items)} raw items")
for item in items[:limit]: for item in items[:limit]:
# Extract video data from search result # Handle different response formats
item_type = item.get("type") if isinstance(item, dict):
video_id = item.get("id", "")
author_info = item.get("author", {})
video_data = item.get("video", {})
# Type 1 = video # Get playable URL
if item_type == 1: play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "")
video_item = item.get("item", {})
if video_item:
video_id = video_item.get("id", "")
author_info = video_item.get("author", {})
video_data = video_item.get("video", {})
# Get playable URL author_name = author_info.get("uniqueId") or author_info.get("unique_id", "unknown")
play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "")
videos.append({ videos.append({
"id": video_id, "id": video_id,
"url": f"https://www.tiktok.com/@{author_info.get('uniqueId', 'user')}/video/{video_id}", "url": f"https://www.tiktok.com/@{author_name}/video/{video_id}",
"cdn_url": play_addr, "cdn_url": play_addr,
"author": author_info.get("uniqueId", "unknown"), "author": author_name,
"description": video_item.get("desc", ""), "description": item.get("desc", ""),
"thumbnail": video_data.get("cover") or video_data.get("dynamicCover", ""), "thumbnail": video_data.get("cover") or video_data.get("dynamicCover") or video_data.get("originCover", ""),
"views": video_item.get("stats", {}).get("playCount", 0), "views": item.get("stats", {}).get("playCount", 0),
"likes": video_item.get("stats", {}).get("diggCount", 0), "likes": item.get("stats", {}).get("diggCount", 0),
}) })
print(f"Found {len(videos)} videos for '{query}'") print(f"Processed {len(videos)} videos for '{query}'")
except Exception as parse_error: except Exception as parse_error:
print(f"Error parsing search response: {parse_error}") print(f"Error parsing search response: {parse_error}")
# Try to print raw response for debugging
print(f"Raw response: {response.text[:500] if response.text else 'empty'}")
else: else:
print(f"Search API returned status {response.status_code}") print(f"Search API returned status {response.status_code}")
print(f"Response: {response.text[:300] if response.text else 'empty'}")
return {"query": query, "videos": videos, "count": len(videos)} return {"query": query, "videos": videos, "count": len(videos)}
except Exception as e: except Exception as e:
print(f"Error searching for {query}: {e}") print(f"Error searching for {query}: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))