""" KV-Tube API Blueprint All JSON API endpoints for the frontend """ from flask import Blueprint, request, jsonify, Response import os import sys import subprocess import json import sqlite3 import re import heapq import logging import time import random import concurrent.futures import yt_dlp import tempfile import threading logger = logging.getLogger(__name__) api_bp = Blueprint('api', __name__, url_prefix='/api') # Database path DATA_DIR = os.environ.get("KVTUBE_DATA_DIR", "data") DB_NAME = os.path.join(DATA_DIR, "kvtube.db") # Caching API_CACHE = {} CACHE_TIMEOUT = 600 # 10 minutes # AI Models WHISPER_MODEL = None WHISPER_LOCK = threading.Lock() def get_db_connection(): """Get database connection with row factory.""" conn = sqlite3.connect(DB_NAME) conn.row_factory = sqlite3.Row return conn # --- Helper Functions --- def extractive_summary(text, num_sentences=5): """Extract key sentences from text using word frequency.""" # Clean text clean_text = re.sub(r"\[.*?\]", "", text) clean_text = clean_text.replace("\n", " ") # Split into sentences sentences = re.split(r"(? 
# ---------------------------------------------------------------------------
# NOTE(review): the opening of this endpoint (its route decorator, the yt-dlp
# info extraction, and the start of the per-format loop) was lost in the
# corrupted source.  Everything above the size_str computation below is a
# best-effort reconstruction from the visible body and the "Download formats
# error" log message — confirm against version control.
# ---------------------------------------------------------------------------
@api_bp.route("/download_formats")
def download_formats():
    """List downloadable video/audio formats for a video.

    Query params:
        v: YouTube video ID.

    Returns JSON with the top 10 video and top 5 audio formats, sorted by
    resolution (best first).
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"success": False, "error": "No video ID"}), 400
    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {"noplaylist": True, "quiet": True, "skip_download": True}
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        title = info.get("title", "Unknown")
        duration = info.get("duration")
        thumbnail = info.get("thumbnail")
        video_formats = []
        audio_formats = []

        for f in info.get("formats", []):
            f_url = f.get("url")
            if not f_url:
                continue
            f_ext = f.get("ext")
            f_filesize = f.get("filesize") or f.get("filesize_approx") or 0
            quality = f.get("format_note") or f.get("resolution") or ""

            # Human-readable size.  (From this if/elif chain onward the
            # original source was intact.)
            size_str = "Unknown"
            if f_filesize > 1024**3:
                size_str = f"{f_filesize / 1024**3:.1f} GB"
            elif f_filesize > 1024**2:
                size_str = f"{f_filesize / 1024**2:.1f} MB"
            elif f_filesize > 1024:
                size_str = f"{f_filesize / 1024:.1f} KB"

            # Only surface browser-friendly containers.
            if f_ext in ["mp4", "webm"]:
                vcodec = f.get("vcodec", "none")
                acodec = f.get("acodec", "none")
                if vcodec != "none" and acodec != "none":
                    # Muxed (video + audio) stream.
                    video_formats.append({
                        "quality": f"{quality} (with audio)",
                        "ext": f_ext,
                        "size": size_str,
                        "url": f_url,
                        "type": "combined",
                        "has_audio": True,
                    })
                elif vcodec != "none":
                    video_formats.append({
                        "quality": quality,
                        "ext": f_ext,
                        "size": size_str,
                        "url": f_url,
                        "type": "video",
                        "has_audio": False,
                    })
                elif acodec != "none":
                    audio_formats.append({
                        "quality": quality,
                        "ext": f_ext,
                        "size": size_str,
                        "url": f_url,
                        "type": "audio",
                    })

        def parse_quality(f):
            # Rank by resolution keyword; unknown qualities sort last (99).
            q = f["quality"].lower()
            for i, res in enumerate(["4k", "2160", "1080", "720", "480", "360", "240", "144"]):
                if res in q:
                    return i
            return 99

        video_formats.sort(key=parse_quality)
        audio_formats.sort(key=parse_quality)

        return jsonify({
            "success": True,
            "video_id": video_id,
            "title": title,
            "duration": duration,
            "thumbnail": thumbnail,
            "formats": {"video": video_formats[:10], "audio": audio_formats[:5]},
        })
    except Exception as e:
        logger.error(f"Download formats error: {e}")
        return jsonify({"success": False, "error": str(e)}), 500


@api_bp.route("/get_stream_info")
def get_stream_info():
    """Get stream info for a video, with a 1-hour sqlite-backed cache.

    Query params:
        v: YouTube video ID.

    Returns JSON with proxied stream/audio URLs, metadata, and a subtitle
    URL when available.  Sets an X-Cache: HIT/MISS response header.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        conn = get_db_connection()
        # FIX: the original only closed the connection on the two happy
        # paths, leaking it whenever extraction failed or an exception was
        # raised.  try/finally guarantees closure on every exit.
        try:
            cached = conn.execute(
                "SELECT data, expires_at FROM video_cache WHERE video_id = ?", (video_id,)
            ).fetchone()
            current_time = time.time()
            if cached:
                try:
                    expires_at = float(cached["expires_at"])
                    if current_time < expires_at:
                        data = json.loads(cached["data"])
                        from urllib.parse import quote
                        # Re-proxy the cached upstream URL through our proxy route.
                        proxied_url = f"/video_proxy?url={quote(data['original_url'], safe='')}"
                        data["stream_url"] = proxied_url
                        response = jsonify(data)
                        response.headers["X-Cache"] = "HIT"
                        return response
                except (ValueError, KeyError):
                    pass  # Corrupt cache row — fall through and re-extract.

            url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                "format": "best[ext=mp4]/best",
                "noplaylist": True,
                "quiet": True,
                "skip_download": True,
                "socket_timeout": 10,
                "force_ipv4": True,
                "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                try:
                    info = ydl.extract_info(url, download=False)
                except Exception as e:
                    logger.warning(f"yt-dlp error for {video_id}: {str(e)}")
                    return jsonify({"error": f"Stream extraction failed: {str(e)}"}), 500

            stream_url = info.get("url")
            if not stream_url:
                return jsonify({"error": "No stream URL found"}), 500

            # Log the headers yt-dlp expects us to use (the proxy may need them).
            expected_headers = info.get("http_headers", {})
            logger.info(f"YT-DLP Expected Headers: {expected_headers}")

            # Pick a subtitle track: per language, manual subs beat
            # auto-captions; "en" is tried before "vi".
            subtitle_url = None
            subs = info.get("subtitles") or {}
            auto_subs = info.get("automatic_captions") or {}
            for lang in ["en", "vi"]:
                if lang in subs and subs[lang]:
                    subtitle_url = subs[lang][0]["url"]
                    break
                if lang in auto_subs and auto_subs[lang]:
                    subtitle_url = auto_subs[lang][0]["url"]
                    break

            # Extract best audio-only URL for AI transcription.
            audio_url = None
            try:
                audio_only = [
                    f for f in info.get("formats", [])
                    # Audio-only: no video codec, but some audio codec.
                    if (f.get("vcodec") == "none" or f.get("vcodec") is None)
                    and (f.get("acodec") != "none" and f.get("acodec") is not None)
                ]
                if audio_only:
                    # Prefer m4a for best compatibility, else first available.
                    chosen_audio = next((f for f in audio_only if f.get("ext") == "m4a"), audio_only[0])
                    audio_url = chosen_audio.get("url")
                    logger.info(f"Found audio-only URL: {audio_url[:30]}...")
                else:
                    logger.warning("No audio-only formats found in valid stream info.")
            except Exception as e:
                logger.error(f"Failed to extract audio url: {e}")

            response_data = {
                "original_url": stream_url,
                "title": info.get("title", "Unknown"),
                "description": info.get("description", ""),
                "uploader": info.get("uploader", ""),
                "uploader_id": info.get("uploader_id", ""),
                "channel_id": info.get("channel_id", ""),
                "upload_date": info.get("upload_date", ""),
                "view_count": info.get("view_count", 0),
                "related": [],
                "subtitle_url": subtitle_url,
                "audio_url": None,  # Placeholder, filled below
            }
            from urllib.parse import quote
            proxied_url = f"/video_proxy?url={quote(stream_url, safe='')}"
            response_data["stream_url"] = proxied_url
            if audio_url:
                response_data["audio_url"] = f"/video_proxy?url={quote(audio_url, safe='')}"

            # Cache for one hour.
            expiry = current_time + 3600
            conn.execute(
                "INSERT OR REPLACE INTO video_cache (video_id, data, expires_at) VALUES (?, ?, ?)",
                (video_id, json.dumps(response_data), expiry),
            )
            conn.commit()

            response = jsonify(response_data)
            response.headers["X-Cache"] = "MISS"
            return response
        finally:
            conn.close()
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@api_bp.route("/search")
def search():
    """Search for videos, or resolve a pasted YouTube URL to one result.

    Query params:
        q: free-text query or a youtube.com/watch / youtu.be URL.
    """
    query = request.args.get("q")
    if not query:
        return jsonify({"error": "No query provided"}), 400
    try:
        # Direct-URL fast path: extract the 11-char video ID if present.
        url_match = re.match(r"(?:https?://)?(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})", query)
        if url_match:
            video_id = url_match.group(1)
            # Fetch single video info
            ydl_opts = {
                "quiet": True,
                "no_warnings": True,
                "noplaylist": True,
                "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(f"https://www.youtube.com/watch?v={video_id}", download=False)
            return jsonify([{
                "id": video_id,
                "title": info.get("title", "Unknown"),
                "uploader": info.get("uploader", "Unknown"),
                "thumbnail": f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
                "view_count": info.get("view_count", 0),
                "upload_date": info.get("upload_date", ""),
                "duration": None,
            }])
        # Standard search
        results = fetch_videos(query, limit=20, filter_type="video")
        return jsonify(results)
    except Exception as e:
        logger.error(f"Search Error: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route("/channel")
def get_channel_videos_simple():
    """Get up to 20 recent videos (or shorts) from a channel.

    Query params:
        id: channel ID (UC...) or @handle.
        filter_type: "video" (default) or "shorts".
    """
    channel_id = request.args.get("id")
    filter_type = request.args.get("filter_type", "video")
    if not channel_id:
        return jsonify({"error": "No channel ID provided"}), 400
    try:
        suffix = "shorts" if filter_type == "shorts" else "videos"
        # FIX: the original had identical bodies for startswith("UC") and the
        # final else — collapsed to one branch.  @handles use the bare path.
        if channel_id.startswith("@"):
            url = f"https://www.youtube.com/{channel_id}/{suffix}"
        else:
            url = f"https://www.youtube.com/channel/{channel_id}/{suffix}"
        cmd = [
            sys.executable, "-m", "yt_dlp", url,
            "--dump-json", "--flat-playlist",
            "--playlist-end", "20", "--no-warnings",
        ]
        # FIX: the original Popen/communicate() had no timeout and could hang
        # a worker forever; a timeout surfaces as a 500 via the outer except.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        videos = []
        for line in result.stdout.splitlines():
            try:
                v = json.loads(line)
                dur_str = None
                if v.get("duration"):
                    m, s = divmod(int(v["duration"]), 60)
                    h, m = divmod(m, 60)
                    dur_str = f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
                videos.append({
                    "id": v.get("id"),
                    "title": v.get("title"),
                    "thumbnail": f"https://i.ytimg.com/vi/{v.get('id')}/mqdefault.jpg",
                    "view_count": v.get("view_count") or 0,
                    "duration": dur_str,
                    "upload_date": v.get("upload_date"),
                    "uploader": v.get("uploader") or v.get("channel") or "",
                })
            except json.JSONDecodeError:
                continue  # Skip any non-JSON noise on stdout.
        return jsonify(videos)
    except Exception as e:
        logger.error(f"Channel Fetch Error: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route("/trending")
def trending():
    """Get trending videos, cached in-process for CACHE_TIMEOUT seconds.

    Query params:
        category: one of all/music/gaming/news/tech/movies/sports.
        page:     1-based page number.
        sort:     kept only as part of the cache key.
        region:   "vietnam" (default) appends a region suffix to queries.
    """
    category = request.args.get("category", "all")
    # FIX: a non-numeric ?page= used to raise ValueError -> 500; parse
    # defensively and clamp to >= 1.
    try:
        page = max(1, int(request.args.get("page", 1)))
    except ValueError:
        page = 1
    sort = request.args.get("sort", "newest")
    region = request.args.get("region", "vietnam")
    cache_key = f"trending_{category}_{page}_{sort}_{region}"

    # Check cache
    if cache_key in API_CACHE:
        cached_time, cached_data = API_CACHE[cache_key]
        if time.time() - cached_time < CACHE_TIMEOUT:
            return jsonify(cached_data)
    try:
        # Category search queries
        queries = {
            "all": "trending videos 2024",
            "music": "music trending",
            "gaming": "gaming trending",
            "news": "news today",
            "tech": "technology reviews 2024",
            "movies": "movie trailers 2024",
            "sports": "sports highlights",
        }
        # For 'all' category, always fetch from multiple categories for
        # diverse content.
        if category == "all":
            region_suffix = " vietnam" if region == "vietnam" else ""
            # Rotate through different queries based on page for variety
            query_sets = [
                [f"trending videos 2024{region_suffix}", f"music trending{region_suffix}", f"tech reviews 2024{region_suffix}"],
                [f"movie trailers 2024{region_suffix}", f"gaming trending{region_suffix}", f"sports highlights{region_suffix}"],
                [f"trending music 2024{region_suffix}", f"viral videos{region_suffix}", f"entertainment news{region_suffix}"],
                [f"tech gadgets{region_suffix}", f"comedy videos{region_suffix}", f"documentary{region_suffix}"],
            ]
            # Use different query set based on page to get variety
            query_index = (page - 1) % len(query_sets)
            current_queries = query_sets[query_index]
            # Offset within the rotating query set.
            start_offset = ((page - 1) // len(query_sets)) * 7 + 1
            # Fetch from multiple categories in parallel
            with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
                futures = [
                    executor.submit(fetch_videos, q, limit=7, filter_type="video", playlist_start=start_offset)
                    for q in current_queries
                ]
                results = [f.result() for f in futures]
            # Combine all videos and deduplicate by id.
            all_videos = []
            seen_ids = set()
            for video_list in results:
                for vid in video_list:
                    if vid['id'] not in seen_ids:
                        seen_ids.add(vid['id'])
                        all_videos.append(vid)
            # Shuffle for variety
            random.shuffle(all_videos)
            API_CACHE[cache_key] = (time.time(), all_videos)
            return jsonify(all_videos)

        # Single category - support proper pagination
        query = queries.get(category, queries["all"])
        if region == "vietnam":
            query += " vietnam"
        videos = fetch_videos(query, limit=20, filter_type="video", playlist_start=(page - 1) * 20 + 1)
        API_CACHE[cache_key] = (time.time(), videos)
        return jsonify(videos)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@api_bp.route("/summarize")
def summarize_video():
    """Summarize a video's transcript via extractive_summary (7 sentences)."""
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        # Lazy import: youtube_transcript_api is only needed here.
        from youtube_transcript_api import YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            # Prefer a manually created transcript; fall back to auto-generated.
            transcript = transcript_list.find_transcript(["en", "vi"])
        except Exception:
            transcript = transcript_list.find_generated_transcript(["en", "vi"])
        transcript_data = transcript.fetch()
        full_text = " ".join([entry["text"] for entry in transcript_data])
        summary = extractive_summary(full_text, num_sentences=7)
        return jsonify({"success": True, "summary": summary})
    except Exception as e:
        return jsonify({"success": False, "message": f"Could not summarize: {str(e)}"})


@api_bp.route("/transcript")
def get_transcript():
    """Get a video's transcript segments plus the first 10k chars of text."""
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"success": False, "error": "No video ID provided"}), 400
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript(["en", "vi"])
        except Exception:
            transcript = transcript_list.find_generated_transcript(["en", "vi"])
        transcript_data = transcript.fetch()
        full_text = " ".join([entry["text"] for entry in transcript_data])
        return jsonify({
            "success": True,
            "video_id": video_id,
            "transcript": transcript_data,
            # NOTE(review): language/is_generated are hard-coded and may not
            # reflect the transcript actually found — confirm intent.
            "language": "en",
            "is_generated": True,
            "full_text": full_text[:10000],
        })
    except Exception as e:
        return jsonify({"success": False, "error": f"Could not load transcript: {str(e)}"})


@api_bp.route("/generate_subtitles", methods=["POST"])
def generate_subtitles():
    """Generate WebVTT subtitles using server-side Whisper (tiny model).

    Expects a JSON body: {"video_id": "..."}.
    """
    global WHISPER_MODEL
    # FIX: a missing or non-JSON body made get_json() fail before the 400
    # guard could run; silent=True lets us return the intended 400 instead.
    data = request.get_json(silent=True) or {}
    video_id = data.get("video_id")
    if not video_id:
        return jsonify({"error": "No video ID provided"}), 400
    temp_path = None
    try:
        # Lazy-load the model once, guarded against concurrent requests.
        with WHISPER_LOCK:
            if WHISPER_MODEL is None:
                import whisper
                logger.info("Loading Whisper model (tiny)...")
                WHISPER_MODEL = whisper.load_model("tiny")

        # Extract Audio URL
        url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            "format": "bestaudio[ext=m4a]/bestaudio/best",
            "noplaylist": True,
            "quiet": True,
            "force_ipv4": True,
        }
        audio_url = None
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            audio_url = info.get("url")
        if not audio_url:
            return jsonify({"error": "Could not extract audio URL"}), 500

        # Download audio to temp file
        import requests
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
        }
        logger.info(f"Downloading audio for transcription: {audio_url[:30]}...")
        with requests.get(audio_url, headers=headers, stream=True) as r:
            r.raise_for_status()
            with tempfile.NamedTemporaryFile(suffix=".m4a", delete=False) as f:
                temp_path = f.name
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Transcribe
        logger.info("Transcribing...")
        result = WHISPER_MODEL.transcribe(temp_path)

        def format_timestamp(seconds):
            # WebVTT timestamps: HH:MM:SS.mmm
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            seconds = seconds % 60
            return f"{hours:02d}:{minutes:02d}:{seconds:06.3f}"

        vtt_output = "WEBVTT\n\n"
        for segment in result["segments"]:
            start = format_timestamp(segment["start"])
            end = format_timestamp(segment["end"])
            text = segment["text"].strip()
            vtt_output += f"{start} --> {end}\n{text}\n\n"
        return jsonify({"success": True, "vtt": vtt_output})
    except Exception as e:
        logger.error(f"Subtitle generation failed: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        # Always clean up the downloaded audio file.
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


@api_bp.route("/update_ytdlp", methods=["POST"])
def update_ytdlp():
    """Upgrade yt-dlp in-place via pip and report the installed version."""
    try:
        cmd = [sys.executable, "-m", "pip", "install", "-U", "yt-dlp"]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            ver_cmd = [sys.executable, "-m", "yt_dlp", "--version"]
            ver_result = subprocess.run(ver_cmd, capture_output=True, text=True)
            version = ver_result.stdout.strip()
            return jsonify({"success": True, "message": f"Updated to {version}"})
        else:
            return jsonify({"success": False, "message": f"Update failed: {result.stderr}"}), 500
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500


@api_bp.route("/comments")
def get_comments():
    """Get up to 50 comments for a video via yt-dlp.

    Best-effort: every failure mode returns a 200 with an empty list and an
    "error" field rather than a 5xx, so the frontend degrades gracefully.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        cmd = [
            sys.executable, "-m", "yt_dlp", url,
            "--write-comments", "--skip-download", "--dump-json",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            comments_data = data.get("comments", [])
            comments = []
            for c in comments_data[:50]:
                comments.append({
                    "author": c.get("author", "Unknown"),
                    "author_thumbnail": c.get("author_thumbnail", ""),
                    "text": c.get("text", ""),
                    "likes": c.get("like_count", 0),
                    "time": c.get("time_text", ""),
                    "is_pinned": c.get("is_pinned", False),
                })
            return jsonify({"comments": comments, "count": data.get("comment_count", len(comments))})
        else:
            return jsonify({"comments": [], "count": 0, "error": "Could not load comments"})
    except subprocess.TimeoutExpired:
        return jsonify({"comments": [], "count": 0, "error": "Comments loading timed out"})
    except Exception as e:
        return jsonify({"comments": [], "count": 0, "error": str(e)})


@api_bp.route("/captions.vtt")
def get_captions_vtt():
    """Get captions in WebVTT format.

    On any failure, returns an empty-but-valid WEBVTT body with a 200 so the
    <track> element in the player never hard-errors.
    """
    video_id = request.args.get("v")
    if not video_id:
        return "WEBVTT\n\n", 400, {'Content-Type': 'text/vtt'}
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
        from youtube_transcript_api.formatters import WebVTTFormatter

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript(["en", "vi"])
        except Exception:
            transcript = transcript_list.find_generated_transcript(["en", "vi"])
        transcript_data = transcript.fetch()
        formatter = WebVTTFormatter()
        vtt_formatted = formatter.format_transcript(transcript_data)
        return Response(vtt_formatted, mimetype='text/vtt')
    except Exception as e:
        logger.warning(f"Caption Error: {e}")
        return "WEBVTT\n\n", 200, {'Content-Type': 'text/vtt'}