From 76cb8cc785d5b105c5dff4e7959d8d2ad241ca5e Mon Sep 17 00:00:00 2001 From: "Khoa.vo" Date: Fri, 19 Dec 2025 19:54:51 +0700 Subject: [PATCH] Replace Playwright search/user videos with fast HTTP API calls --- backend/api/routes/user.py | 136 +++++++++++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 6 deletions(-) diff --git a/backend/api/routes/user.py b/backend/api/routes/user.py index 6a57783..d576cd8 100644 --- a/backend/api/routes/user.py +++ b/backend/api/routes/user.py @@ -112,7 +112,7 @@ async def get_user_videos( ): """ Fetch videos from a TikTok user's profile. - Uses Playwright to intercept the user's video list API. + Uses TikTok's internal API for fast results. """ username = username.replace("@", "") @@ -124,9 +124,74 @@ async def get_user_videos( print(f"Fetching videos for @{username}...") + # Build cookie header + cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies]) + + headers = { + "User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT, + "Referer": f"https://www.tiktok.com/@{username}", + "Cookie": cookie_str, + "Accept": "application/json", + } + try: - videos = await PlaywrightManager.fetch_user_videos(username, cookies, user_agent, limit) - return {"username": username, "videos": videos, "count": len(videos)} + # First get user's secUid from profile API + profile_url = f"https://www.tiktok.com/api/user/detail/?uniqueId={username}" + + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + profile_res = await client.get(profile_url, headers=headers) + + if profile_res.status_code != 200: + print(f"Profile API returned {profile_res.status_code}") + return {"username": username, "videos": [], "count": 0} + + profile_data = profile_res.json() + user_info = profile_data.get("userInfo", {}).get("user", {}) + sec_uid = user_info.get("secUid", "") + + if not sec_uid: + print(f"Could not get secUid for {username}") + return {"username": username, "videos": [], "count": 0} + + # Fetch user's videos + videos_url = f"https://www.tiktok.com/api/post/item_list/?secUid={sec_uid}&count={limit}&cursor=0" + + videos_res = await client.get(videos_url, headers=headers) + + videos = [] + + if videos_res.status_code == 200: + try: + data = videos_res.json() + items = data.get("itemList", []) + + for item in items[:limit]: + video_id = item.get("id", "") + author_info = item.get("author", {}) + video_data = item.get("video", {}) + + play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "") + + videos.append({ + "id": video_id, + "url": f"https://www.tiktok.com/@{username}/video/{video_id}", + "cdn_url": play_addr, + "author": username, + "description": item.get("desc", ""), + "thumbnail": video_data.get("cover") or video_data.get("dynamicCover", ""), + "views": item.get("stats", {}).get("playCount", 0), + "likes": item.get("stats", {}).get("diggCount", 0), + }) + + print(f"Found {len(videos)} videos for @{username}") + + except Exception as parse_error: + print(f"Error parsing videos response: {parse_error}") + else: + print(f"Videos API returned status {videos_res.status_code}") + + return {"username": username, "videos": videos, "count": len(videos)} + except Exception as e: print(f"Error fetching videos for {username}: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -139,8 +204,10 @@ async def search_videos( ): """ Search for videos by keyword or hashtag. - Uses Playwright to intercept TikTok search results. + Uses TikTok's internal search API for fast, reliable results. """ + from urllib.parse import quote + # Load stored credentials cookies, user_agent = PlaywrightManager.load_stored_credentials() @@ -149,9 +216,66 @@ async def search_videos( print(f"Searching for: {query}...") + # Build cookie header + cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies]) + + headers = { + "User-Agent": user_agent or PlaywrightManager.DEFAULT_USER_AGENT, + "Referer": "https://www.tiktok.com/", + "Cookie": cookie_str, + "Accept": "application/json", + } + try: - videos = await PlaywrightManager.search_videos(query, cookies, user_agent, limit) - return {"query": query, "videos": videos, "count": len(videos)} + # TikTok search API endpoint + search_url = f"https://www.tiktok.com/api/search/general/full/?keyword={quote(query)}&offset=0&search_id=&from_page=search&web_search_code=%7B%22tiktok%22%3A%7B%22client_params_x%22%3A%7B%22search_engine%22%3A%7B%22ies_mt_user_live_video_card_use_b%22%3A1%2C%22mt_search_general_user_live_card%22%3A1%7D%7D%2C%22search_server%22%3A%7B%7D%7D%7D" + + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + response = await client.get(search_url, headers=headers) + + videos = [] + + if response.status_code == 200: + try: + data = response.json() + items = data.get("data", []) + + for item in items[:limit]: + # Extract video data from search result + item_type = item.get("type") + + # Type 1 = video + if item_type == 1: + video_item = item.get("item", {}) + if video_item: + video_id = video_item.get("id", "") + author_info = video_item.get("author", {}) + video_data = video_item.get("video", {}) + + # Get playable URL + play_addr = video_data.get("playAddr") or video_data.get("downloadAddr", "") + + videos.append({ + "id": video_id, + "url": f"https://www.tiktok.com/@{author_info.get('uniqueId', 'user')}/video/{video_id}", + "cdn_url": play_addr, + "author": author_info.get("uniqueId", "unknown"), + "description": video_item.get("desc", ""), + "thumbnail": video_data.get("cover") or video_data.get("dynamicCover", ""), + "views": video_item.get("stats", {}).get("playCount", 0), + "likes": video_item.get("stats", {}).get("diggCount", 0), + }) + + print(f"Found {len(videos)} videos for '{query}'") + + except Exception as parse_error: + print(f"Error parsing search response: {parse_error}") + else: + print(f"Search API returned status {response.status_code}") + + return {"query": query, "videos": videos, "count": len(videos)} + except Exception as e: print(f"Error searching for {query}: {e}") raise HTTPException(status_code=500, detail=str(e)) +