kv-tube/app.py
2026-01-10 14:35:08 +07:00

1821 lines
64 KiB
Python

import concurrent.futures
import datetime
import heapq
import json
import os
import re
import sqlite3
import subprocess
import sys
import threading
import time
import uuid
from functools import wraps

import requests
import yt_dlp
from flask import (
    Flask,
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    stream_with_context,
    url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash
# Fix for OMP: Error #15
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

app = Flask(__name__)
# Required for sessions. Prefer a key supplied through the environment; the
# historical hard-coded value is kept only as a development fallback so that
# existing deployments keep working unchanged.
app.secret_key = os.environ.get("KVTUBE_SECRET_KEY", "super_secret_key_change_this")

# Ensure data directory exists for persistence. exist_ok avoids the
# check-then-create race when several workers start at the same time.
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
DB_NAME = os.path.join(DATA_DIR, "kvtube.db")
# --- Database Setup ---
def init_db():
    """Create the SQLite schema in ``DB_NAME`` if it does not exist yet.

    Three tables are created idempotently (``CREATE TABLE IF NOT EXISTS``):
    ``users``, ``user_videos`` (rows typed ``'history'`` or ``'saved'``) and
    ``video_cache`` (cached per-video metadata with an expiry value).

    The connection is always closed, even when a statement fails.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        # Users Table
        c.execute("""CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL
        )""")
        # Saved/History Table
        # type: 'history' or 'saved'
        c.execute("""CREATE TABLE IF NOT EXISTS user_videos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            video_id TEXT,
            title TEXT,
            thumbnail TEXT,
            type TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""")
        # Cache Table for video metadata/streams
        c.execute("""CREATE TABLE IF NOT EXISTS video_cache (
            video_id TEXT PRIMARY KEY,
            data TEXT,
            expires_at DATETIME
        )""")
        conn.commit()
    finally:
        conn.close()
# Run init: create the schema at import time so routes can assume the tables exist.
init_db()
# Transcription Task Status
# Module-level mutable dict shared by the whole process.
# NOTE(review): presumably keyed by task id; confirm against the code that fills it.
transcription_tasks = {}
def get_db_connection():
    """Open a fresh SQLite connection to ``DB_NAME``.

    Rows come back as ``sqlite3.Row`` objects, so callers can read columns by
    name. The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_NAME)
    connection.row_factory = sqlite3.Row
    return connection
# --- Auth Helpers Removed ---
# Use client-side storage for all user data
# --- Auth Routes Removed ---
@app.template_filter("format_views")
def format_views(views):
    """Format a view count for display, e.g. ``1234567`` -> ``"1.2M"``.

    Args:
        views: An int, a numeric string, or any falsy value.

    Returns:
        ``"0"`` for falsy input; ``"<x.y>M"`` for values that round to a
        million or more; ``"<n>K"`` for thousands; a comma-grouped integer
        below 1000. Input that cannot be converted to ``int`` is returned
        as ``str(views)``.
    """
    if not views:
        return "0"
    try:
        num = int(views)
    except (TypeError, ValueError, OverflowError):
        return str(views)
    # 999,500 and above would round to "1000K" in the thousands branch, so
    # promote those values to the millions branch ("1.0M").
    if num >= 999_500:
        return f"{num / 1000000:.1f}M"
    if num >= 1000:
        return f"{num / 1000:.0f}K"
    return f"{num:,}"
@app.template_filter("format_date")
def format_date(value):
    """Render a date-like value as a coarse relative time ("3 days ago").

    Accepted inputs:
      * an 8-digit ``YYYYMMDD`` string or number,
      * a Unix timestamp (``int``/``float``),
      * a ``YYYY-MM-DD`` string.

    Returns:
        ``"Recently"`` for falsy input; a relative phrase in years, months,
        days or hours; ``"Just now"`` for anything an hour old or newer
        (including future dates); or ``str(value)`` unchanged when the value
        cannot be parsed.
    """
    if not value:
        return "Recently"
    # Imported locally (as the original did) so the function does not depend
    # on what the module-level name ``datetime`` is bound to.
    from datetime import datetime

    text = str(value)
    try:
        if len(text) == 8 and text.isdigit():
            # Handle YYYYMMDD
            dt = datetime.strptime(text, "%Y%m%d")
        elif isinstance(value, (int, float)):
            # Handle Timestamp
            dt = datetime.fromtimestamp(value)
        else:
            # Handle already formatted (YYYY-MM-DD)
            dt = datetime.strptime(text, "%Y-%m-%d")
    except (ValueError, OverflowError, OSError):
        return text

    def ago(count, unit):
        # Singular unit for exactly one ("1 day ago", not "1 days ago").
        return f"{count} {unit}{'' if count == 1 else 's'} ago"

    diff = datetime.now() - dt
    if diff.days > 365:
        return ago(diff.days // 365, "year")
    if diff.days > 30:
        return ago(diff.days // 30, "month")
    if diff.days > 0:
        return ago(diff.days, "day")
    if diff.seconds > 3600:
        return ago(diff.seconds // 3600, "hour")
    return "Just now"
# Configuration for local video path - configurable via env var
# (KVTUBE_VIDEO_DIR); falls back to "./videos" when the variable is unset.
VIDEO_DIR = os.environ.get("KVTUBE_VIDEO_DIR", "./videos")
@app.route("/")
def index():
    """Render ``index.html`` with ``page="home"``."""
    return render_template("index.html", page="home")
@app.route("/results")
def results():
    """Render ``index.html`` in results mode for the ``search_query`` argument.

    A missing ``search_query`` is passed to the template as an empty string.
    """
    search_terms = request.args.get("search_query", "")
    template_context = {"page": "results", "query": search_terms}
    return render_template("index.html", **template_context)
@app.route("/my-videos")
def my_videos():
    """Render ``my_videos.html``; no data is passed from the server."""
    # Purely client-side rendering now
    return render_template("my_videos.html")
@app.route("/api/save_video", methods=["POST"])
def save_video():
    """Deprecated POST endpoint: always reports success and stores nothing."""
    # Deprecated endpoint - client-side handled
    return jsonify({"success": True, "message": "Use local storage"})
def save_video():
    """Legacy database-backed save handler (NOT registered as a route).

    This definition carries no ``@app.route`` decorator, so Flask never calls
    it; it only rebinds the module-level name ``save_video``. It is kept for
    reference and made safe to call:

    * the duplicate check uses the same default user id as the insert
      (the original read ``session["user_id"]``, which is never set now that
      the auth routes are removed);
    * a missing JSON body is treated as an empty payload;
    * the connection is closed on every path.

    Returns:
        JSON ``{"status": "already_saved"}`` or ``{"status": "success"}``.
    """
    default_user_id = 1  # Default user_id 1
    data = request.json or {}
    video_id = data.get("id")
    title = data.get("title")
    thumbnail = data.get("thumbnail")
    action_type = data.get("type", "history")  # 'history' or 'saved'

    conn = get_db_connection()
    try:
        # Check if already exists to prevent duplicates (strictly for 'saved')
        if action_type == "saved":
            exists = conn.execute(
                "SELECT id FROM user_videos WHERE user_id = ? AND video_id = ? AND type = ?",
                (default_user_id, video_id, "saved"),
            ).fetchone()
            if exists:
                return jsonify({"status": "already_saved"})
        conn.execute(
            "INSERT INTO user_videos (user_id, video_id, title, thumbnail, type) VALUES (?, ?, ?, ?, ?)",
            (default_user_id, video_id, title, thumbnail, action_type),
        )
        conn.commit()
    finally:
        conn.close()
    return jsonify({"status": "success"})
@app.route("/api/history")
def get_history():
    """Return the 50 most recent ``history`` rows as a JSON list.

    Each item has the keys ``id`` (the stored ``video_id``), ``title`` and
    ``thumbnail``, newest first.
    """
    conn = get_db_connection()
    try:
        # The type is bound as a parameter: the original wrote "history" in
        # double quotes, which SQL treats as an identifier and SQLite only
        # accepts as a string through a legacy compatibility quirk.
        rows = conn.execute(
            "SELECT video_id as id, title, thumbnail FROM user_videos "
            "WHERE type = ? ORDER BY timestamp DESC LIMIT 50",
            ("history",),
        ).fetchall()
    finally:
        conn.close()
    return jsonify([dict(row) for row in rows])
@app.route("/api/suggested")
def get_suggested():
    """Return up to 30 suggested videos derived from recent watch history.

    The titles of the last three ``history`` rows are each searched as
    ``"<title> related"`` (in parallel, 10 results each); results are
    de-duplicated by video id and shuffled. With no history, the result of a
    plain ``"trending"`` search (20 items) is returned instead.
    """
    import random

    conn = get_db_connection()
    try:
        history = conn.execute(
            "SELECT title FROM user_videos WHERE type = ? "
            "ORDER BY timestamp DESC LIMIT 3",
            ("history",),
        ).fetchall()
    finally:
        conn.close()

    if not history:
        return jsonify(fetch_videos("trending", limit=20))

    queries = [f"{row['title']} related" for row in history]
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        batches = list(executor.map(lambda q: fetch_videos(q, limit=10), queries))

    # Remove duplicates (keyed by video id) and shuffle
    unique_vids = {}
    for batch in batches:
        # ``or []`` guards against fetch_videos yielding None on failure
        # (its error path is not visible here — TODO confirm).
        for video in batch or []:
            unique_vids[video["id"]] = video
    final_list = list(unique_vids.values())
    random.shuffle(final_list)
    return jsonify(final_list[:30])
@app.route("/stream/<path:filename>")
def stream_local(filename):
    """Serve ``filename`` from ``VIDEO_DIR`` via ``send_from_directory``."""
    return send_from_directory(VIDEO_DIR, filename)
@app.route("/settings")
def settings():
    """Render ``settings.html`` with ``page="settings"``."""
    return render_template("settings.html", page="settings")
@app.route("/downloads")
def downloads():
    """Render ``downloads.html`` with ``page="downloads"``."""
    return render_template("downloads.html", page="downloads")
@app.route("/video_proxy")
def video_proxy():
    """Proxy a remote media URL, rewriting HLS manifests to route through us.

    Query args:
        url: Absolute ``http``/``https`` URL to fetch.

    Behaviour:
        * The client's ``Range`` header is forwarded so seeking works.
        * If the upstream is an HLS manifest (``.m3u8`` path or an mpegurl
          content type), every segment / sub-playlist URI is made absolute
          and rewritten to ``/video_proxy?url=<quoted>``.
        * Anything else is streamed through in 8 KiB chunks with the upstream
          status code and headers (minus hop-by-hop / length headers).

    Returns:
        400 for a missing URL or a non-http(s) scheme, 500 with the error
        text if the upstream request fails, otherwise the proxied response.
    """
    from urllib.parse import quote, urljoin, urlsplit

    url = request.args.get("url")
    if not url:
        return "No URL provided", 400
    # The URL comes straight from the query string: refuse anything that is
    # not plain web traffic (file://, ftp://, scheme-less input, ...).
    if urlsplit(url).scheme.lower() not in ("http", "https"):
        return "Unsupported URL scheme", 400

    # Forward headers to mimic browser and support seeking
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    }
    # Support Range requests (scrubbing)
    range_header = request.headers.get("Range")
    if range_header:
        headers["Range"] = range_header

    try:
        upstream = requests.get(url, headers=headers, stream=True, timeout=30)

        # Handle HLS (M3U8) Rewriting - CRITICAL for 1080p+ and proper sync
        content_type = upstream.headers.get("content-type", "").lower()
        # Extract URL path without query params for checking extension
        url_path = url.split("?")[0]
        is_manifest = (
            url_path.endswith(".m3u8")
            or "application/x-mpegurl" in content_type
            or "application/vnd.apple.mpegurl" in content_type
        )

        if is_manifest:
            try:
                content = upstream.text
            finally:
                # The manifest is fully read; release the pooled connection.
                upstream.close()
            # Resolve relative URIs against the final (post-redirect) URL.
            # urljoin also handles root-relative ("/x") and already-absolute
            # entries, and ignores the query string of the base.
            base_url = upstream.url or url
            new_lines = []
            for line in content.splitlines():
                entry = line.strip()
                if entry and not entry.startswith("#"):
                    # It's a segment or sub-playlist: proxy it
                    full_url = urljoin(base_url, entry)
                    new_lines.append(f"/video_proxy?url={quote(full_url, safe='')}")
                else:
                    new_lines.append(line)
            return Response(
                "\n".join(new_lines), content_type="application/vnd.apple.mpegurl"
            )

        # Standard Stream Proxy (Binary)
        # We exclude headers that might confuse the browser/flask
        excluded_headers = {
            "content-encoding",
            "content-length",
            "transfer-encoding",
            "connection",
        }
        response_headers = [
            (name, value)
            for (name, value) in upstream.headers.items()
            if name.lower() not in excluded_headers
        ]

        def relay():
            # Close the upstream connection when the client finishes or
            # disconnects mid-stream.
            try:
                yield from upstream.iter_content(chunk_size=8192)
            finally:
                upstream.close()

        return Response(
            stream_with_context(relay()),
            status=upstream.status_code,
            headers=response_headers,
            content_type=upstream.headers.get("content-type"),
        )
    except Exception as e:
        print(f"Proxy Error: {e}")
        return str(e), 500
@app.route("/watch")
def watch():
    """Render the player page for a local file or a YouTube video.

    ``?local=<name>`` takes precedence and plays the file through the
    ``stream_local`` route; otherwise ``?v=<id>`` selects a YouTube video.
    Responds with 400 when neither argument is present.
    """
    args = request.args
    youtube_id = args.get("v")
    local_name = args.get("local")

    if local_name:
        local_src = url_for("stream_local", filename=local_name)
        return render_template(
            "watch.html", video_type="local", src=local_src, title=local_name
        )
    if youtube_id:
        return render_template(
            "watch.html", video_type="youtube", video_id=youtube_id
        )
    return "No video ID provided", 400
@app.route("/channel/<channel_id>")
def channel(channel_id):
    """Render the channel page shell; the video list is loaded later via JS.

    Args:
        channel_id: A ``UC...`` channel id, an ``@handle``, or a free-text
            channel name. Free text is resolved by taking the channel of the
            first yt-dlp search hit.

    Returns:
        The rendered ``channel.html`` with a ``channel`` dict (``id``,
        ``title``, ``avatar``, ``banner``, ``subscribers``), or a 500 text
        response on an unexpected error.
    """
    if not channel_id:
        return redirect(url_for("index"))
    try:
        # Robustness: Resolve name to ID if needed (Metadata only fetch)
        real_id_or_url = channel_id
        is_search_fallback = False
        if not channel_id.startswith(("UC", "@")):
            search_cmd = [
                sys.executable,
                "-m",
                "yt_dlp",
                f"ytsearch1:{channel_id}",
                "--dump-json",
                "--default-search",
                "ytsearch",
                "--no-playlist",
            ]
            try:
                # Bounded like the same lookup in get_channel_videos so a
                # stuck yt-dlp cannot hang the request forever.
                proc_search = subprocess.run(
                    search_cmd, capture_output=True, text=True, timeout=15
                )
                if proc_search.returncode == 0:
                    first_result = json.loads(proc_search.stdout.splitlines()[0])
                    if first_result.get("channel_id"):
                        real_id_or_url = first_result.get("channel_id")
                        is_search_fallback = True
            except (subprocess.SubprocessError, OSError, ValueError, IndexError, AttributeError):
                # Best effort: fall back to the raw value.
                pass

        # Fetch basic channel info (Avatar/Banner)
        channel_info = {
            "id": real_id_or_url,  # Use resolved ID for API calls
            "title": channel_id if not is_search_fallback else "Loading...",
            "avatar": None,
            "banner": None,
            "subscribers": None,
        }

        # Determine target URL for metadata fetch. Always build a full URL
        # (as get_channel_videos does): a raw user-supplied value starting
        # with "-" would otherwise be parsed by yt-dlp as an option.
        if real_id_or_url.startswith("@"):
            target_url = f"https://www.youtube.com/{real_id_or_url}"
        else:
            target_url = f"https://www.youtube.com/channel/{real_id_or_url}"

        cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            target_url,
            "--dump-json",
            "--flat-playlist",
            "--playlist-end",
            "1",  # Fetch just 1 to get metadata
            "--no-warnings",
        ]
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            stdout = proc.stdout
        except subprocess.TimeoutExpired:
            # Metadata is optional: render the shell with what we have.
            stdout = ""

        if stdout:
            try:
                first = json.loads(stdout.splitlines()[0])
                channel_info["title"] = (
                    first.get("channel")
                    or first.get("uploader")
                    or channel_info["title"]
                )
                channel_info["id"] = first.get("channel_id") or channel_info["id"]
                # Avatar/banner are often NOT in the flat dump; title/id are key
            except (ValueError, IndexError, AttributeError):
                pass

        # Render shell - videos fetched via JS
        return render_template("channel.html", channel=channel_info)
    except Exception as e:
        return f"Error loading channel: {str(e)}", 500
@app.route("/api/related")
def get_related_videos():
    """Return a shuffled mix of topic- and channel-related videos as JSON.

    Query args:
        v: Current video id (excluded from the results).
        title: Current video title, used for the topic search.
        uploader: Channel name, used for the channel search (falls back to
            the topic query when empty).
        page: 1-based page number (default 1; invalid values fall back to 1).
        limit: Total items requested (default 10, minimum 2 so both halves
            of the mix are non-empty).

    Returns:
        A JSON list of video dicts, 400 when neither ``v`` nor ``title`` is
        given, or 500 with ``{"error": ...}`` on failure.
    """
    import random

    video_id = request.args.get("v")
    title = request.args.get("title")
    uploader = request.args.get("uploader", "")
    # type=int makes a non-numeric value fall back to the default instead of
    # raising outside the try block below.
    page = max(1, request.args.get("page", 1, type=int))
    limit = max(2, request.args.get("limit", 10, type=int))

    if not title and not video_id:
        return jsonify({"error": "Video ID or Title required"}), 400

    try:
        # Hybrid Approach: 50% Topic, 50% Channel
        topic_limit = limit // 2
        channel_limit = limit - topic_limit
        # Simplified paging: strict paging on mixed results is complex, so we
        # just advance "playlist_start" for both queries.
        start = (page - 1) * (limit // 2)

        topic_query = f"{title} related" if title else f"{video_id} related"
        channel_query = uploader if uploader else topic_query

        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            future_topic = executor.submit(
                fetch_videos,
                topic_query,
                limit=topic_limit,
                playlist_start=start + 1,
            )
            future_channel = executor.submit(
                fetch_videos,
                channel_query,
                limit=channel_limit,
                playlist_start=start + 1,
            )
            topic_videos = future_topic.result() or []
            channel_videos = future_channel.result() or []

        # Channel videos first (to encourage sticking with the creator),
        # then topic videos.
        combined = list(channel_videos) + list(topic_videos)

        # Deduplicate (by ID), never recommending the current video
        seen = {video_id} if video_id else set()
        unique_videos = []
        for v in combined:
            if v["id"] not in seen:
                seen.add(v["id"])
                unique_videos.append(v)

        # Shuffle to mix the two sources
        random.shuffle(unique_videos)
        return jsonify(unique_videos)
    except Exception as e:
        print(f"Error fetching related: {e}")
        return jsonify({"error": str(e)}), 500
@app.route("/api/download")
def get_download_url():
    """Get a direct MP4 download URL for a video.

    Query args:
        v: YouTube video id.

    Returns:
        ``{"url", "title", "ext": "mp4"}`` when a non-HLS URL is found;
        ``{"error", "fallback_url"}`` (HTTP 200) when only manifests are
        available; 400 without ``v``; 500 with ``{"error"}`` on failure.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        # Use format that avoids HLS/DASH manifests (m3u8)
        # Prefer progressive download formats
        ydl_opts = {
            "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best[protocol!*=m3u8]/best",
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
            "skip_download": True,
            "youtube_include_dash_manifest": False,  # Avoid DASH
            "youtube_include_hls_manifest": False,  # Avoid HLS
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        # ``or ""`` also covers a key that is present but None, which would
        # make the substring test below raise TypeError.
        download_url = info.get("url") or ""
        # If missing or still m3u8, pick the last listed non-HLS mp4 format
        if not download_url or ".m3u8" in download_url:
            for f in reversed(info.get("formats") or []):
                f_url = f.get("url") or ""
                if f_url and "m3u8" not in f_url and f.get("ext") == "mp4":
                    download_url = f_url
                    break

        title = info.get("title", "video")
        if download_url and ".m3u8" not in download_url:
            return jsonify({"url": download_url, "title": title, "ext": "mp4"})
        # Fallback: return YouTube link for manual download
        return jsonify(
            {
                "error": "Direct download not available. Try a video downloader site.",
                "fallback_url": url,
            }
        ), 200
    except Exception as e:
        print(f"Download URL error: {e}")
        return jsonify({"error": str(e)}), 500
def _download_size_label(num_bytes):
    """Return a human-readable size ("1.5 MB"); "" when unknown or <= 1 KiB."""
    if not num_bytes:
        return ""
    if num_bytes > 1024 * 1024 * 1024:
        return f"{num_bytes / (1024 * 1024 * 1024):.1f} GB"
    if num_bytes > 1024 * 1024:
        return f"{num_bytes / (1024 * 1024):.1f} MB"
    if num_bytes > 1024:
        return f"{num_bytes / 1024:.1f} KB"
    return ""


def _download_unique_by_quality(formats):
    """Keep the first entry for each distinct ``quality`` label, in order."""
    seen = set()
    unique = []
    for item in formats:
        if item["quality"] not in seen:
            seen.add(item["quality"])
            unique.append(item)
    return unique


@app.route("/api/download/formats")
def get_download_formats():
    """Get available download formats for a video.

    Query args:
        v: YouTube video id.

    Returns:
        JSON with ``success``, ``video_id``, ``title``, ``duration``,
        ``thumbnail`` and ``formats`` = ``{"video": [...], "audio": [...]}``.
        Only non-HLS ``mp4``/``webm`` entries that are video-only or
        audio-only are listed, best first, one per quality label.
        400 without ``v``; 500 with ``{"success": False, "error"}`` on
        failure.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"success": False, "error": "No video ID"}), 400
    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            "format": "best",
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
            "skip_download": True,
            "youtube_include_dash_manifest": False,
            "youtube_include_hls_manifest": False,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        title = info.get("title", "Unknown")
        duration = info.get("duration", 0)
        thumbnail = info.get("thumbnail", "")

        # Collect available formats
        video_formats = []
        audio_formats = []
        for f in info.get("formats") or []:
            f_url = f.get("url", "")
            f_ext = f.get("ext", "")
            # Skip HLS formats
            if not f_url or "m3u8" in f_url:
                continue
            if f_ext not in ("mp4", "webm"):
                continue

            f_filesize = f.get("filesize", 0) or f.get("filesize_approx", 0)
            quality = f.get("format_note", "") or f.get("format", "") or "Unknown"
            size_str = _download_size_label(f_filesize)
            has_video = f.get("vcodec", "none") != "none"
            has_audio = f.get("acodec", "none") != "none"

            if has_video and not has_audio:
                # Video only - include detailed specs. Compared
                # case-insensitively: the placeholder is "Unknown", which the
                # original lowercase check could never match.
                if quality.lower() in ("audio only", "unknown"):
                    continue
                width = f.get("width", 0)
                height = f.get("height", 0)
                vcodec = f.get("vcodec", "")
                vbr = f.get("vbr", 0) or f.get("tbr", 0)  # video bitrate in kbps
                video_formats.append(
                    {
                        "quality": quality,
                        "ext": f_ext,
                        "size": size_str,
                        "size_bytes": f_filesize,
                        "url": f_url,
                        "type": "video",
                        "resolution": f"{width}x{height}" if width and height else None,
                        "width": width,
                        "height": height,
                        "fps": f.get("fps", 0),
                        # e.g., "avc1" from "avc1.4d401f"
                        "vcodec": vcodec.split(".")[0] if vcodec else "",
                        "bitrate": int(vbr) if vbr else None,
                    }
                )
            elif has_audio and not has_video:
                # Audio only - include detailed specs
                acodec = f.get("acodec", "")
                abr = f.get("abr", 0) or f.get("tbr", 0)  # audio bitrate in kbps
                asr = f.get("asr", 0)  # sample rate in Hz
                audio_formats.append(
                    {
                        "quality": quality,
                        "ext": f_ext,
                        "size": size_str,
                        "size_bytes": f_filesize,
                        "url": f_url,
                        "type": "audio",
                        "acodec": acodec.split(".")[0] if acodec else "",
                        "bitrate": int(abr) if abr else None,
                        "sample_rate": int(asr) if asr else None,
                    }
                )

        # Sort by quality (best first). Video is ordered by pixel height
        # rather than by substring-matching the label: the old matcher ranked
        # "1440p" as 144p ("144" is a substring) and left 4320p unranked.
        # Audio labels carry no resolution, so audio is ordered by bitrate.
        # Both sorts are stable, so ties keep yt-dlp's order.
        video_formats.sort(key=lambda item: item["height"] or 0, reverse=True)
        audio_formats.sort(key=lambda item: item["bitrate"] or 0, reverse=True)

        return jsonify(
            {
                "success": True,
                "video_id": video_id,
                "title": title,
                "duration": duration,
                "thumbnail": thumbnail,
                "formats": {
                    "video": _download_unique_by_quality(video_formats),
                    "audio": _download_unique_by_quality(audio_formats),
                },
            }
        )
    except Exception as e:
        print(f"Download formats error: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
@app.route("/api/channel/videos")
def get_channel_videos():
    """Return one page of a channel's uploads as a JSON list.

    Query args:
        id: ``UC...`` id, ``@handle`` or free-text channel name (resolved via
            the first yt-dlp search hit).
        page: 1-based page (default 1; invalid values fall back to 1).
        limit: Page size (default 20; invalid values fall back to 20).
        sort: ``"latest"`` (default) or ``"oldest"``.
        filter_type: ``"video"`` (default) or ``"shorts"``.

    Returns:
        A JSON list of video dicts; an empty list when ``id`` is missing or
        anything fails.
    """
    channel_id = request.args.get("id")
    # type=int: a non-numeric value yields the default instead of raising
    # ValueError before the try block.
    page = max(1, request.args.get("page", 1, type=int))
    limit = max(1, request.args.get("limit", 20, type=int))
    sort_mode = request.args.get("sort", "latest")
    filter_type = request.args.get("filter_type", "video")  # 'video' or 'shorts'
    if not channel_id:
        return jsonify([])
    try:
        # Calculate playlist range
        start = (page - 1) * limit + 1
        end = start + limit - 1

        # Resolve channel_id if it's not a proper YouTube ID
        resolved_id = channel_id
        if not channel_id.startswith(("UC", "@")):
            # Try to resolve by searching
            search_cmd = [
                sys.executable,
                "-m",
                "yt_dlp",
                f"ytsearch1:{channel_id}",
                "--dump-json",
                "--default-search",
                "ytsearch",
                "--no-playlist",
            ]
            try:
                proc_search = subprocess.run(
                    search_cmd, capture_output=True, text=True, timeout=15
                )
                if proc_search.returncode == 0:
                    first_result = json.loads(proc_search.stdout.splitlines()[0])
                    if first_result.get("channel_id"):
                        resolved_id = first_result.get("channel_id")
            except (subprocess.SubprocessError, OSError, ValueError, IndexError, AttributeError):
                # Best effort: keep the raw id.
                pass

        # Construct URL based on ID type AND Filter Type
        if resolved_id.startswith("@"):
            target_url = f"https://www.youtube.com/{resolved_id}"
        else:
            target_url = f"https://www.youtube.com/channel/{resolved_id}"
        if filter_type == "shorts":
            target_url += "/shorts"
        elif filter_type == "video":
            target_url += "/videos"

        playlist_args = ["--playlist-start", str(start), "--playlist-end", str(end)]
        if sort_mode == "oldest":
            playlist_args.insert(0, "--playlist-reverse")

        cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            target_url,
            "--dump-json",
            "--flat-playlist",
            "--no-warnings",
        ] + playlist_args
        # Bounded so a stuck yt-dlp cannot hold the worker forever; a timeout
        # is reported like any other failure (empty list).
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)

        videos = []
        for line in proc.stdout.splitlines():
            try:
                v = json.loads(line)
                dur_str = None
                if v.get("duration"):
                    m, s = divmod(int(v["duration"]), 60)
                    h, m = divmod(m, 60)
                    dur_str = f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
                videos.append(
                    {
                        "id": v.get("id"),
                        "title": v.get("title"),
                        "thumbnail": f"https://i.ytimg.com/vi/{v.get('id')}/mqdefault.jpg",
                        "view_count": v.get("view_count") or 0,
                        "duration": dur_str,
                        "upload_date": v.get("upload_date"),
                        "uploader": v.get("uploader")
                        or v.get("channel")
                        or v.get("uploader_id")
                        or "",
                        "channel": v.get("channel") or v.get("uploader") or "",
                        "channel_id": v.get("channel_id") or resolved_id,
                    }
                )
            except (ValueError, TypeError, AttributeError):
                # Skip lines that are not a JSON object with usable fields.
                continue
        return jsonify(videos)
    except Exception as e:
        print(f"API Error: {e}")
        return jsonify([])
@app.route("/api/get_stream_info")
def get_stream_info():
    """Return stream URL and metadata for a video, with a 1-hour DB cache.

    Query args:
        v: YouTube video id.

    Returns:
        JSON with ``original_url``, ``stream_url`` (proxied through
        ``/video_proxy``), ``title``, ``description``, ``uploader``,
        ``uploader_id``, ``channel_id``, ``upload_date``, ``view_count``,
        ``related`` (always empty; loaded lazily by the client) and
        ``subtitle_url``. The ``X-Cache`` header is ``HIT`` or ``MISS``.
        400 without ``v``; 500 with ``{"error"}`` on failure.
    """
    from urllib.parse import quote

    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        # 1. Check Cache (the connection is closed before any network work)
        conn = get_db_connection()
        try:
            cached = conn.execute(
                "SELECT data, expires_at FROM video_cache WHERE video_id = ?",
                (video_id,),
            ).fetchone()
        finally:
            conn.close()

        current_time = time.time()
        if cached:
            # expires_at is written below as a unix timestamp
            try:
                if current_time < float(cached["expires_at"]):
                    data = json.loads(cached["data"])
                    # Rebuild the proxied URL from the cached original URL.
                    data["stream_url"] = (
                        f"/video_proxy?url={quote(data['original_url'], safe='')}"
                    )
                    # Add cache hit header for debug
                    response = jsonify(data)
                    response.headers["X-Cache"] = "HIT"
                    return response
            except (TypeError, ValueError, KeyError):
                pass  # Invalid cache entry, fall through to a fresh fetch

        # 2. Fetch from YouTube (Library Optimization)
        url = f"https://www.youtube.com/watch?v={video_id}"
        ydl_opts = {
            "format": "best[ext=mp4]/best",
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
            "skip_download": True,
            "force_ipv4": True,
            "socket_timeout": 10,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=False)
            except Exception as e:
                print(f"❌ yt-dlp error for {video_id}: {str(e)}")
                return jsonify({"error": f"Stream extraction failed: {str(e)}"}), 500

        stream_url = info.get("url")
        if not stream_url:
            return jsonify({"error": "No stream URL found in metadata"}), 500

        # Related videos are lazy-loaded by the client via /api/related.
        related_videos = []

        # Extract Subtitles (English preferred, Vietnamese fallback)
        subs = info.get("subtitles") or {}
        auto_subs = info.get("automatic_captions") or {}
        # DEBUG: Print subtitle info
        print(f"Checking subtitles for {video_id}")
        print(f"Manual Subs keys: {list(subs.keys())}")
        print(f"Auto Subs keys: {list(auto_subs.keys())}")

        subtitle_url = None
        # Manual subs first, then auto subs, en before vi in each.
        for track_map in (subs, auto_subs):
            for code in ("en", "vi"):
                if code in track_map:
                    subtitle_url = track_map[code][0]["url"]
                    break
            if subtitle_url is not None or any(c in track_map for c in ("en", "vi")):
                break
        # If still none, just pick the first one from manual then auto
        if not subtitle_url:
            if subs:
                subtitle_url = subs[next(iter(subs))][0]["url"]
            elif auto_subs:
                subtitle_url = auto_subs[next(iter(auto_subs))][0]["url"]
        print(f"Selected Subtitle URL: {subtitle_url}")

        # 3. Construct Response Data
        response_data = {
            "original_url": stream_url,
            "title": info.get("title", "Unknown Title"),
            "description": info.get("description", ""),
            "uploader": info.get("uploader", ""),
            "uploader_id": info.get("uploader_id", ""),
            "channel_id": info.get("channel_id", ""),
            "upload_date": info.get("upload_date", ""),
            "view_count": info.get("view_count", 0),
            "related": related_videos,
            "subtitle_url": subtitle_url,
        }

        # 4. Cache It (valid for 1 hour = 3600s)
        # YouTube URLs expire in ~6 hours usually.
        expiry = current_time + 3600
        try:
            conn = get_db_connection()
            try:
                conn.execute(
                    "INSERT OR REPLACE INTO video_cache (video_id, data, expires_at) VALUES (?, ?, ?)",
                    (video_id, json.dumps(response_data), expiry),
                )
                conn.commit()
            finally:
                conn.close()
        except sqlite3.Error as e:
            # The cache is an optimisation: a write failure must not turn a
            # successful extraction into an error response.
            print(f"Cache write failed for {video_id}: {e}")

        # 5. Return Response
        response_data["stream_url"] = f"/video_proxy?url={quote(stream_url, safe='')}"
        response = jsonify(response_data)
        response.headers["X-Cache"] = "MISS"
        return response
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def _search_format_duration(seconds):
    """Render seconds as ``M:SS`` or ``H:MM:SS``; None for a falsy value."""
    if not seconds:
        return None
    mins, secs = divmod(int(seconds), 60)
    hours, mins = divmod(mins, 60)
    return f"{hours}:{mins:02d}:{secs:02d}" if hours else f"{mins}:{secs:02d}"


def _search_flat_results(search_spec, skip_id=None, timeout=60):
    """Run a flat yt-dlp search and return a list of result dicts.

    ``search_spec`` is a yt-dlp search term such as ``"ytsearch20:cats"``.
    Entries without an id, or whose id equals ``skip_id``, are dropped;
    lines that cannot be parsed are skipped.
    """
    cmd = [
        sys.executable,
        "-m",
        "yt_dlp",
        search_spec,
        "--dump-json",
        "--default-search",
        "ytsearch",
        "--no-playlist",
        "--flat-playlist",
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    results = []
    for line in proc.stdout.splitlines():
        try:
            data = json.loads(line)
            video_id = data.get("id")
            if not video_id or video_id == skip_id:
                continue
            results.append(
                {
                    "id": video_id,
                    "title": data.get("title", "Unknown"),
                    "uploader": data.get("uploader")
                    or data.get("channel")
                    or "Unknown",
                    "thumbnail": f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
                    "view_count": data.get("view_count", 0),
                    "upload_date": data.get("upload_date", ""),
                    "duration": _search_format_duration(data.get("duration")),
                }
            )
        except (ValueError, TypeError, AttributeError):
            continue
    return results


@app.route("/api/search")
def search():
    """Search YouTube by text, or look up a pasted YouTube URL.

    Query args:
        q: Search text, or a ``youtube.com/watch?v=``, ``youtu.be/`` or
            ``youtube.com/shorts/`` URL.

    Returns:
        A JSON list of result dicts. For a URL, the first item is the exact
        video (``is_exact_match: True``) followed by up to 19 results of a
        search for its title; if the metadata lookup fails the list is empty.
        For text, up to 20 search results. 400 without ``q``; 500 with
        ``{"error"}`` on failure, including a yt-dlp timeout.
    """
    query = request.args.get("q")
    if not query:
        return jsonify({"error": "No query provided"}), 400
    try:
        # Regex to catch youtube.com/watch?v=, youtu.be/, shorts/, etc.
        youtube_regex = r"(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)([\w-]+)"
        match = re.search(youtube_regex, query)
        if not match:
            # Standard Text Search
            return jsonify(_search_flat_results(f"ytsearch20:{query}"))

        video_id = match.group(4)
        # Fetch direct metadata
        meta_cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            "--dump-json",
            "--no-playlist",
            f"https://www.youtube.com/watch?v={video_id}",
        ]
        meta_proc = subprocess.run(
            meta_cmd, capture_output=True, text=True, timeout=30
        )
        results = []
        if meta_proc.returncode == 0:
            data = json.loads(meta_proc.stdout)
            search_title = data.get("title", "")
            results.append(
                {
                    "id": video_id,
                    "title": search_title,
                    "uploader": data.get("uploader")
                    or data.get("channel")
                    or "Unknown",
                    "thumbnail": f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
                    "view_count": data.get("view_count", 0),
                    "upload_date": data.get("upload_date", ""),
                    "duration": _search_format_duration(data.get("duration")),
                    "description": data.get("description", ""),
                    "is_exact_match": True,
                }
            )
            # Now fetch related/similar videos using title
            if search_title:
                results.extend(
                    _search_flat_results(
                        f"ytsearch19:{search_title}", skip_id=video_id
                    )
                )
        return jsonify(results)
    except Exception as e:
        print(f"Search Error: {e}")
        return jsonify({"error": str(e)}), 500
@app.route("/api/channel")
def get_channel_videos_simple():
    """Return up to 20 entries of a channel's flat playlist as JSON.

    Query args:
        id: A full URL, an ``@handle``, a 24-character ``UC...`` channel id,
            or any other path segment appended to ``https://www.youtube.com/``.

    Returns:
        A JSON list with one ``sanitize_video_data`` result per entry that
        has both an id and a title; 400 without ``id``; 500 with
        ``{"error"}`` when yt-dlp fails, times out, or anything else raises.
    """
    channel_id = request.args.get("id")
    if not channel_id:
        return jsonify({"error": "No channel ID provided"}), 400
    try:
        # Construct Channel URL
        if channel_id.startswith("http"):
            url = channel_id
        elif channel_id.startswith("@"):
            url = f"https://www.youtube.com/{channel_id}"
        elif len(channel_id) == 24 and channel_id.startswith("UC"):
            # Standard Channel ID
            url = f"https://www.youtube.com/channel/{channel_id}"
        else:
            url = f"https://www.youtube.com/{channel_id}"

        # Fetch videos (flat playlist to be fast)
        cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            "--dump-json",
            "--flat-playlist",
            "--playlist-end",
            "20",
            url,
        ]
        # Bounded so a stuck yt-dlp cannot hold the worker forever; a timeout
        # falls through to the generic 500 handler below.
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if proc.returncode != 0:
            return jsonify(
                {"error": "Failed to fetch channel videos", "details": proc.stderr}
            ), 500

        videos = []
        for line in proc.stdout.splitlines():
            try:
                v = json.loads(line)
            except json.JSONDecodeError:
                continue
            if v.get("id") and v.get("title"):
                videos.append(sanitize_video_data(v))
        return jsonify(videos)
    except Exception as e:
        print(f"Channel Fetch Error: {e}")
        return jsonify({"error": str(e)}), 500
# --- Helper: Extractive Summarization ---
# Common English function words ignored when scoring sentences.
_SUMMARY_STOP_WORDS = frozenset(
    {
        "the", "a", "an", "and", "or", "but", "is", "are", "was", "were",
        "to", "of", "in", "on", "at", "for",
        "with",  # was misspelled "width", so "with" was scored as content
        "that", "this", "it", "you", "i", "we", "they", "he", "she",
    }
)


def extractive_summary(text, num_sentences=5):
    """Build a frequency-based extractive summary of ``text``.

    Bracketed annotations such as ``[Music]`` are removed, the text is split
    into sentences, and each sentence is scored by the sum of the normalised
    frequencies of its non-stop-words.

    Args:
        text: Source text (e.g. a joined transcript).
        num_sentences: Maximum number of sentences to keep.

    Returns:
        The highest-scoring sentences joined by single spaces, ordered by
        descending score (not by position in the text), or
        ``"Not enough content to summarize."`` when no scorable word exists.
    """
    # 1. Clean and parse text: drop metadata like [Music] (common in auto-caps)
    clean_text = re.sub(r"\[.*?\]", "", text).replace("\n", " ")

    # 2. Split into sentences (simple punctuation split that avoids breaking
    #    after abbreviations such as "e.g." or "Mr.")
    sentences = re.split(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s", clean_text)

    # 3. Tokenize and calculate word frequencies
    word_frequencies = {}
    for word in re.findall(r"\w+", clean_text.lower()):
        if word not in _SUMMARY_STOP_WORDS:
            word_frequencies[word] = word_frequencies.get(word, 0) + 1
    if not word_frequencies:
        return "Not enough content to summarize."

    # Normalize frequencies to the range (0, 1]
    max_freq = max(word_frequencies.values())
    for word in word_frequencies:
        word_frequencies[word] = word_frequencies[word] / max_freq

    # 4. Score sentences; a sentence with no scored word is left out entirely
    sentence_scores = {}
    for sent in sentences:
        weights = [
            word_frequencies[word]
            for word in re.findall(r"\w+", sent.lower())
            if word in word_frequencies
        ]
        if weights:
            sentence_scores[sent] = sentence_scores.get(sent, 0) + sum(weights)

    # 5. Extract top N sentences
    summary_sentences = heapq.nlargest(
        num_sentences, sentence_scores, key=sentence_scores.get
    )
    return " ".join(summary_sentences)
@app.route("/api/summarize")
def summarize_video():
    """Return an extractive summary of a video's transcript as JSON.

    Query args:
        v: YouTube video id.

    Returns:
        ``{"success": True, "summary": ...}`` on success;
        ``{"success": False, "message": ...}`` when subtitles are disabled or
        anything else fails; 400 with ``{"error"}`` without ``v``.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400
    try:
        # Fetch Transcript
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        # Try the en/vi lookup first, then fall back to auto-generated
        try:
            transcript = transcript_list.find_transcript(["en", "vi"])
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt / SystemExit
            # are no longer swallowed here.
            transcript = transcript_list.find_generated_transcript(["en", "vi"])
        transcript_data = transcript.fetch()

        # Combine text
        full_text = " ".join(entry["text"] for entry in transcript_data)
        # Summarize
        summary = extractive_summary(full_text, num_sentences=7)
        return jsonify({"success": True, "summary": summary})
    except TranscriptsDisabled:
        return jsonify(
            {"success": False, "message": "Subtitles are disabled for this video."}
        )
    except Exception as e:
        return jsonify({"success": False, "message": f"Could not summarize: {str(e)}"})
@app.route("/api/transcript")
def get_transcript():
    """Return a video's transcript as JSON.

    Query args:
        v: YouTube video id.
        lang: Comma-separated language codes in order of preference
            (default ``"en,vi"``).

    Returns:
        ``{"success": True, "video_id", "transcript", "language",
        "is_generated", "full_text"}`` where ``full_text`` is capped at
        10,000 characters; ``{"success": False, "error"}`` when subtitles are
        disabled or loading fails; 400 without ``v``.
    """
    video_id = request.args.get("v")
    lang = request.args.get("lang", "en,vi")
    if not video_id:
        return jsonify({"success": False, "error": "No video ID provided"}), 400

    # The original read ``lang`` but always searched ["en", "vi"]; honour the
    # request and keep the old list when it is empty.
    languages = [code.strip() for code in lang.split(",") if code.strip()] or [
        "en",
        "vi",
    ]
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript(languages)
        except Exception:
            transcript = transcript_list.find_generated_transcript(languages)
        transcript_data = transcript.fetch()
        full_text = " ".join(entry["text"] for entry in transcript_data)
        return jsonify(
            {
                "success": True,
                "video_id": video_id,
                "transcript": transcript_data,
                # Report what was actually selected instead of hard-coded
                # values; getattr keeps the old values if the attribute is
                # missing from the transcript object.
                "language": getattr(transcript, "language_code", "en"),
                "is_generated": getattr(transcript, "is_generated", True),
                "full_text": full_text[:10000],
            }
        )
    except TranscriptsDisabled:
        return jsonify(
            {"success": False, "error": "Subtitles are disabled for this video."}
        )
    except Exception as e:
        return jsonify(
            {"success": False, "error": f"Could not load transcript: {str(e)}"}
        )
# Helper function to fetch videos (not a route)
def fetch_videos(
    query, limit=20, filter_type=None, playlist_start=1, playlist_end=None
):
    """Run a yt-dlp search in a subprocess and return normalised video dicts.

    Args:
        query: Free-text search string.
        limit: Number of items wanted; only used to derive ``playlist_end``
            when that argument is not given.
        filter_type: ``"video"`` drops entries whose title contains
            ``#shorts`` or whose known duration is 70 seconds or less;
            ``"short"`` drops entries whose known duration exceeds 60
            seconds; any other value keeps everything. Entries with an
            unknown duration are never dropped on duration grounds.
        playlist_start: 1-based index of the first search result to return.
        playlist_end: 1-based index of the last search result to return;
            defaults to ``playlist_start + limit - 1``.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``uploader``,
        ``channel_id``, ``uploader_id``, ``thumbnail``, ``view_count``,
        ``upload_date`` and ``duration`` (``"H:MM:SS"`` / ``"M:SS"`` or
        ``None``). Returns an empty list if anything goes wrong.
    """
    try:
        if not playlist_end:
            playlist_end = playlist_start + limit - 1

        cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            # Request enough search results to cover everything up to 'end'.
            f"ytsearch{playlist_end}:{query}",
            "--dump-json",
            "--default-search",
            "ytsearch",
            "--no-playlist",
            "--flat-playlist",
            "--playlist-start",
            str(playlist_start),
            "--playlist-end",
            str(playlist_end),
        ]
        completed = subprocess.run(cmd, capture_output=True, text=True)

        results = []
        # yt-dlp prints one JSON document per line with --dump-json.
        for line in completed.stdout.splitlines():
            try:
                data = json.loads(line)
                video_id = data.get("id")
                if not video_id:
                    continue

                raw_duration = data.get("duration")
                # A missing/zero duration is treated as "unknown".
                duration_secs = int(raw_duration) if raw_duration else None

                # ``title`` may be present but null; ``or`` covers both the
                # missing-key and the null case so ``.lower()`` cannot fail
                # and silently discard the entry.
                title = data.get("title") or "Unknown"

                if filter_type == "video":
                    # Exclude explicit Shorts.
                    if "#shorts" in title.lower():
                        continue
                    # Exclude short clips (70s buffer), only when the
                    # duration is actually known.
                    if duration_secs is not None and duration_secs <= 70:
                        continue
                elif (
                    filter_type == "short"
                    and duration_secs is not None
                    and duration_secs > 60
                ):
                    continue

                if duration_secs is not None:
                    mins, secs = divmod(duration_secs, 60)
                    hours, mins = divmod(mins, 60)
                    duration = (
                        f"{hours}:{mins:02d}:{secs:02d}"
                        if hours
                        else f"{mins}:{secs:02d}"
                    )
                else:
                    duration = None

                results.append(
                    {
                        "id": video_id,
                        "title": title,
                        "uploader": data.get("uploader")
                        or data.get("channel")
                        or "Unknown",
                        "channel_id": data.get("channel_id"),
                        "uploader_id": data.get("uploader_id"),
                        "thumbnail": f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
                        "view_count": data.get("view_count", 0),
                        "upload_date": data.get("upload_date", ""),
                        "duration": duration,
                    }
                )
            except (ValueError, TypeError, AttributeError):
                # Non-JSON output lines or oddly typed fields: skip the
                # entry, but let KeyboardInterrupt / SystemExit propagate.
                continue

        if completed.returncode != 0 and not results:
            # Surface the reason for an empty result instead of discarding it.
            print(
                f"yt-dlp search failed ({completed.returncode}): "
                f"{completed.stderr.strip()[:500]}"
            )
        return results
    except Exception as e:
        print(f"Error fetching videos: {e}")
        return []
import concurrent.futures
# Caching
import time
# In-memory response cache read by the /api/trending handler: maps a cache-key
# string to a (data, timestamp) tuple, where the timestamp is compared against
# time.time() to decide whether the entry is still fresh.
API_CACHE = {}
# Maximum age of a cache entry, in seconds (10 minutes).
CACHE_TIMEOUT = 600 # 10 minutes
def _trending_query(cat, reg, s_sort):
    """Build the search string for a category, region and sort window.

    Unknown categories fall back to ``"trending"``. A sort of ``"newest"``
    appends ``", today"``; any sort value that is not in the filter table
    falls back to an ``after:<date 90 days ago>`` restriction.
    """
    # Imported locally so the module-level name ``datetime`` (which is the
    # module itself in this file) is left untouched.
    from datetime import datetime, timedelta

    if reg == "vietnam":
        queries = {
            "general": "trending vietnam -shorts",
            "tech": "review công nghệ điện thoại laptop",
            "all": "trending vietnam -shorts",
            "music": "nhạc việt trending -shorts",
            "gaming": "gaming việt nam -shorts",
            "movies": "phim việt nam -shorts",
            "news": "tin tức việt nam hôm nay -shorts",
            "sports": "thể thao việt nam -shorts",
            "shorts": "trending việt nam",
            "trending": "trending việt nam -shorts",
            "podcasts": "podcast việt nam -shorts",
            "live": "live stream việt nam -shorts",
        }
    else:
        queries = {
            "general": "trending -shorts",
            "tech": "tech gadget review smartphone",
            "all": "trending -shorts",
            "music": "music trending -shorts",
            "gaming": "gaming trending -shorts",
            "movies": "movies trending -shorts",
            "news": "news today -shorts",
            "sports": "sports highlights -shorts",
            "shorts": "trending",
            "trending": "trending now -shorts",
            "podcasts": "podcast trending -shorts",
            "live": "live stream -shorts",
        }

    base = queries.get(cat, "trending")

    if s_sort == "newest":
        return base + ", today"

    three_months_ago = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
    sort_filters = {
        "day": ", today",
        "week": ", this week",
        "month": ", this month",
        "3months": f" after:{three_months_ago}",
        "year": ", this year",
    }
    return base + sort_filters.get(s_sort, f" after:{three_months_ago}")


def _suggested_from_history():
    """Search for videos related to one or two recently watched titles.

    Returns an empty list when the history table has no usable titles.
    """
    import random

    conn = get_db_connection()
    try:
        # Bind the type as a parameter: a double-quoted "history" is an
        # identifier in standard SQL and only works as a string through a
        # legacy SQLite quirk.
        history = conn.execute(
            "SELECT title, video_id, type FROM user_videos "
            "WHERE type = ? ORDER BY timestamp DESC LIMIT 5",
            ("history",),
        ).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    if not history:
        return []

    # Pick 1-2 random items from recent history to diversify the query.
    picked = random.sample(history, min(len(history), 2))
    titles = [row["title"] for row in picked if row["title"]]
    if not titles:
        return []

    # "related" nudges the search towards similar content, not the same video.
    suggestion_query = " ".join(titles) + " related"
    return fetch_videos(suggestion_query, limit=16, filter_type="video")


def _discovery_videos():
    """Search for videos on one randomly chosen topic from a curated list."""
    import random

    topics = [
        "amazing inventions",
        "primitive technology",
        "street food around the world",
        "documentary 2024",
        "space exploration",
        "wildlife 4k",
        "satisfying restoration",
        "travel vlog 4k",
        "tech gadgets review",
        "coding tutorial",
    ]
    topic = random.choice(topics)
    return fetch_videos(f"{topic} best", limit=16, filter_type="video")


@app.route("/api/trending")
def trending():
    """Serve the home feed sections or a single-category video grid as JSON.

    Query parameters:
        category: ``"all"`` (default) builds the sectioned home feed; any
            other value returns a flat list for that category.
        page: 1-based page number for single-category requests (default 1;
            values below 1 are treated as 1).
        sort: Time window passed to the query builder (default ``"newest"``).
        region: ``"vietnam"`` (default) or anything else for global queries.
        feed_type: For the home feed, ``"primary"``, ``"secondary"`` or
            ``"all"`` (default) to select which groups of sections to build.
        shorts: Any non-empty value forces the "short" duration filter.

    Returns:
        Home feed: ``{"mode": "sections", "data": [...]}``.
        Single category: a JSON list of video dicts.
        ``{"error": ...}`` with status 400 for a non-numeric ``page`` and
        status 500 for any unexpected failure.
    """
    import random

    try:
        category = request.args.get("category", "all")
        try:
            page = max(1, int(request.args.get("page", 1)))
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid page number"}), 400
        # Read ``sort`` once so the cache key and the query agree on the
        # default ("newest").
        sort = request.args.get("sort", "newest")
        region = request.args.get("region", "vietnam")

        cache_key = f"trending_{category}_{page}_{sort}_{region}"

        # NOTE(review): nothing in this handler stores into API_CACHE, so this
        # lookup only hits if another part of the file populates it. If writes
        # are added, the key should also include feed_type and the shorts
        # flag, and the randomised responses should be considered.
        cached = API_CACHE.get(cache_key)
        if cached is not None:
            data, timestamp = cached
            if time.time() - timestamp < CACHE_TIMEOUT:
                print(f"[Cache] Serving {cache_key} from cache")
                return jsonify(data)
            API_CACHE.pop(cache_key, None)

        limit = 120 if category != "all" else 20  # 120 for grid, 20 for sections

        # === Home feed (sectioned) ===
        if category == "all":
            feed_type = request.args.get("feed_type", "all")  # 'primary', 'secondary', or 'all'
            final_sections = []

            # --- Primary feed: suggestions + discovery + trending ---
            if feed_type in ("primary", "all"):
                # These searches are only needed for the primary sections, so
                # they are skipped for feed_type == 'secondary'.
                try:
                    suggested_videos = _suggested_from_history()
                except Exception as e:
                    print(f"Suggestion Error: {e}")
                    suggested_videos = []

                try:
                    discovery_videos = _discovery_videos()
                except Exception as e:
                    # Best-effort section: log and carry on without it.
                    print(f"Discovery Error: {e}")
                    discovery_videos = []

                if suggested_videos:
                    final_sections.append(
                        {
                            "id": "suggested",
                            "title": "Suggested for You",
                            "icon": "sparkles",
                            "videos": suggested_videos[:8],
                        }
                    )

                if discovery_videos:
                    final_sections.append(
                        {
                            "id": "discovery",
                            "title": "You Might Like",
                            "icon": "compass",
                            "videos": discovery_videos[:8],
                        }
                    )

                # Limit of 8 (two rows) keeps this section fast.
                trending_videos = fetch_videos(
                    _trending_query("trending", region, "relevance"),
                    limit=8,
                    filter_type="video",
                )
                if trending_videos:
                    final_sections.append(
                        {
                            "id": "trending",
                            "title": "Trending Now",
                            "icon": "fire",
                            "videos": trending_videos,
                        }
                    )

            # --- Secondary feed: per-category sections, fetched in parallel ---
            if feed_type in ("secondary", "all"):
                sections_to_fetch = [
                    {"id": "music", "title": "Music", "icon": "music"},
                    {"id": "tech", "title": "Tech & AI", "icon": "microchip"},
                    {"id": "movies", "title": "Movies", "icon": "film"},
                    {"id": "gaming", "title": "Gaming", "icon": "gamepad"},
                    {"id": "news", "title": "News", "icon": "newspaper"},
                    {"id": "sports", "title": "Sports", "icon": "football-ball"},
                ]

                def fetch_section(section):
                    q = _trending_query(section["id"], region, "newest")
                    vids = fetch_videos(
                        q, limit=8, filter_type="video", playlist_start=1
                    )
                    return {
                        "id": section["id"],
                        "title": section["title"],
                        "icon": section["icon"],
                        "videos": vids[:8] if vids else [],
                    }

                with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
                    final_sections.extend(executor.map(fetch_section, sections_to_fetch))

            return jsonify({"mode": "sections", "data": final_sections})

        # === Standard single-category fetch ===
        query = _trending_query(category, region, sort)

        # 1-based offset of the first result on this page.
        start = (page - 1) * limit + 1

        if request.args.get("shorts"):
            filter_mode = "short"
        else:
            filter_mode = "short" if category == "shorts" else "video"

        results = fetch_videos(
            query, limit=limit, filter_type=filter_mode, playlist_start=start
        )

        # Shuffle the first page so repeated visits look fresh.
        if page == 1:
            random.shuffle(results)

        return jsonify(results)

    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/update_ytdlp", methods=["POST"])
def update_ytdlp():
    """Upgrade yt-dlp through pip and report the outcome as JSON.

    Returns:
        ``{"success": True, "message": ...}`` naming the installed version
        when pip exits with status 0; otherwise ``{"success": False,
        "message": ...}`` with HTTP status 500.
    """
    try:
        # Equivalent to: python -m pip install -U yt-dlp
        pip_run = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-U", "yt-dlp"],
            capture_output=True,
            text=True,
        )

        if pip_run.returncode != 0:
            failure = {"success": False, "message": f"Update failed: {pip_run.stderr}"}
            return jsonify(failure), 500

        # Ask the freshly installed module for its version string.
        version_run = subprocess.run(
            [sys.executable, "-m", "yt_dlp", "--version"],
            capture_output=True,
            text=True,
        )
        version = version_run.stdout.strip()
        return jsonify(
            {"success": True, "message": f"Updated successfully to {version}"}
        )
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 500
@app.route("/api/comments")
def get_comments():
    """Get comments for a YouTube video.

    Query parameters:
        v: YouTube video ID (required).

    Returns:
        A 400 response with ``{"error": ...}`` when ``v`` is missing.
        Otherwise a JSON body with ``comments`` (at most 50 entries) and
        ``count``; failures add an ``error`` message and return an empty
        comment list.
    """
    video_id = request.args.get("v")
    if not video_id:
        return jsonify({"error": "No video ID"}), 400

    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        cmd = [
            sys.executable,
            "-m",
            "yt_dlp",
            url,
            "--write-comments",
            "--skip-download",
            "--dump-json",
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

        if result.returncode != 0:
            return jsonify(
                {"comments": [], "count": 0, "error": "Could not load comments"}
            )

        data = json.loads(result.stdout)

        # The key can be present with a JSON null value (presumably when
        # comments are unavailable for the video), in which case
        # ``.get(key, [])`` would hand back None; treat that as "no comments".
        comments_data = data.get("comments") or []

        # Format comments for the frontend, capped at 50.
        comments = [
            {
                "author": c.get("author", "Unknown"),
                "author_thumbnail": c.get("author_thumbnail", ""),
                "text": c.get("text", ""),
                "likes": c.get("like_count", 0),
                "time": c.get("time_text", ""),
                "is_pinned": c.get("is_pinned", False),
            }
            for c in comments_data[:50]
        ]

        # Same null handling for the total: fall back to what we return.
        count = data.get("comment_count")
        if count is None:
            count = len(comments)

        return jsonify({"comments": comments, "count": count})

    except subprocess.TimeoutExpired:
        return jsonify(
            {"comments": [], "count": 0, "error": "Comments loading timed out"}
        )
    except Exception as e:
        return jsonify({"comments": [], "count": 0, "error": str(e)})
# --- AI Transcription REMOVED ---
@app.route("/api/captions.vtt")
def get_captions_vtt():
    """Serve a video's transcript formatted as a WebVTT document.

    Query parameters:
        v: YouTube video ID (required).

    Returns:
        An empty WebVTT document with status 400 when ``v`` is missing, the
        formatted transcript with mimetype ``text/vtt`` on success, and an
        empty WebVTT document with status 200 when the transcript cannot be
        loaded.
    """
    video_id = request.args.get("v")
    if not video_id:
        return "WEBVTT\n\n", 400, {'Content-Type': 'text/vtt'}

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # Prefer an English/Vietnamese transcript; if that lookup fails,
        # fall back to an auto-generated one in the same languages.
        try:
            transcript = transcript_list.find_transcript(["en", "vi"])
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed here.
            transcript = transcript_list.find_generated_transcript(["en", "vi"])

        transcript_data = transcript.fetch()

        formatter = WebVTTFormatter()
        vtt_formatted = formatter.format_transcript(transcript_data)

        return Response(vtt_formatted, mimetype='text/vtt')
    except Exception as e:
        # Return an empty VTT on error so the player does not break.
        print(f"Caption Error: {e}")
        return "WEBVTT\n\n", 200, {'Content-Type': 'text/vtt'}
if __name__ == "__main__":
    # The server listens on all interfaces (0.0.0.0). Flask's debug mode
    # enables the Werkzeug interactive debugger, which lets a client execute
    # arbitrary Python, so it is now opt-in through FLASK_DEBUG instead of
    # being always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1",
        "true",
        "yes",
        "on",
    )
    print("Starting KV-Tube Server on port 5002 (Reloader Disabled)")
    app.run(debug=debug_enabled, host="0.0.0.0", port=5002, use_reloader=False)