"""
YouTube Service Module

Handles all yt-dlp interactions using the library directly (not subprocess).
"""
import yt_dlp
import logging
from typing import Optional, List, Dict, Any
from config import Config

logger = logging.getLogger(__name__)


def _first_track_url(tracks: Any) -> Optional[str]:
    """Return the URL of the first track in *tracks* that has one, else None."""
    for track in tracks or []:
        track_url = track.get('url') if track else None
        if track_url:
            return track_url
    return None


class YouTubeService:
    """Service for fetching YouTube content using the yt-dlp library."""

    # Common yt-dlp options shared by every extractor call in this class.
    BASE_OPTS = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': 'in_playlist',
        'force_ipv4': True,
        'socket_timeout': Config.YTDLP_TIMEOUT,
    }

    # Thresholds used by the search filters. With filter_type='video' anything
    # of 70 seconds or less is dropped; with filter_type='short' anything
    # longer than 60 seconds is dropped.
    # NOTE(review): entries of 61-70 s therefore match neither filter --
    # presumably a deliberate safety margin; confirm.
    _VIDEO_FILTER_CUTOFF_SECS = 70
    _SHORT_FILTER_MAX_SECS = 60

    # Languages tried first when picking a subtitle track, in priority order.
    _PREFERRED_SUB_LANGS = ('en', 'vi')

    @staticmethod
    def sanitize_video_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Sanitize and format video data from yt-dlp

        Args:
            data: Raw entry/info dictionary produced by yt-dlp

        Returns:
            Dict with the keys id, title, uploader, channel_id, uploader_id,
            thumbnail, view_count, upload_date, duration and description.
            'duration' is formatted as "M:SS" (or "H:MM:SS" from one hour up)
            and is None when the source has no or zero duration.
        """
        # `or` instead of a .get() default: a key that is present but None
        # would otherwise bypass the default and leak None to the caller.
        video_id = data.get('id') or ''
        duration_secs = data.get('duration')

        # Format duration
        duration_str = None
        if duration_secs:
            mins, secs = divmod(int(duration_secs), 60)
            hours, mins = divmod(mins, 60)
            if hours:
                duration_str = f"{hours}:{mins:02d}:{secs:02d}"
            else:
                duration_str = f"{mins}:{secs:02d}"

        thumbnail = None
        if video_id:
            thumbnail = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"

        return {
            'id': video_id,
            'title': data.get('title') or 'Unknown',
            'uploader': data.get('uploader') or data.get('channel') or 'Unknown',
            'channel_id': data.get('channel_id'),
            'uploader_id': data.get('uploader_id'),
            'thumbnail': thumbnail,
            'view_count': data.get('view_count') or 0,
            'upload_date': data.get('upload_date') or '',
            'duration': duration_str,
            'description': data.get('description') or '',
        }

    @classmethod
    def _flat_entries(cls, url: str, limit: int) -> List[Dict[str, Any]]:
        """Flat-extract *url* and return its entries (at most *limit* requested) that carry an id."""
        ydl_opts = {
            **cls.BASE_OPTS,
            'extract_flat': True,
            'playlist_items': f'1:{limit}',
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            # 'entries' may be missing or None; materialise it while the
            # YoutubeDL instance is still open in case it is a lazy iterable.
            entries = list((info or {}).get('entries') or [])
        return [entry for entry in entries if entry and entry.get('id')]

    @classmethod
    def _matches_filter(cls, entry: Dict[str, Any], filter_type: Optional[str]) -> bool:
        """Tell whether a flat search entry passes the 'video' / 'short' filter."""
        duration_secs = entry.get('duration')
        if filter_type == 'video':
            # Exclude shorts: tagged in the title, or short enough to be one.
            if '#shorts' in (entry.get('title') or '').lower():
                return False
            return not (duration_secs and int(duration_secs) <= cls._VIDEO_FILTER_CUTOFF_SECS)
        if filter_type == 'short':
            # Only shorts; entries with an unknown duration are kept.
            return not (duration_secs and int(duration_secs) > cls._SHORT_FILTER_MAX_SECS)
        return True

    @classmethod
    def search_videos(cls, query: str, limit: int = 20, filter_type: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Search for videos using yt-dlp library directly

        Args:
            query: Search query
            limit: Maximum number of results
            filter_type: 'video' to exclude shorts, 'short' for only shorts

        Returns:
            List of sanitized video data dictionaries; an empty list when the
            search raises (the error is logged, not propagated)
        """
        try:
            entries = cls._flat_entries(f"ytsearch{limit}:{query}", limit)
            return [
                cls.sanitize_video_data(entry)
                for entry in entries
                if cls._matches_filter(entry, filter_type)
            ]
        except Exception as exc:
            logger.error("Search error for '%s': %s", query, exc)
            return []

    @classmethod
    def get_video_info(cls, video_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed video information including stream URL

        Args:
            video_id: YouTube video ID

        Returns:
            Video info dict with stream_url, or None on error or when the
            extracted info has no top-level 'url'
        """
        try:
            url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                **cls.BASE_OPTS,
                'format': Config.YTDLP_FORMAT,
                'noplaylist': True,
                'skip_download': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            if not info:
                return None

            stream_url = info.get('url')
            if not stream_url:
                logger.warning("No stream URL found for %s", video_id)
                return None

            return {
                'stream_url': stream_url,
                'title': info.get('title', 'Unknown'),
                'description': info.get('description', ''),
                'uploader': info.get('uploader', ''),
                'uploader_id': info.get('uploader_id', ''),
                'channel_id': info.get('channel_id', ''),
                'upload_date': info.get('upload_date', ''),
                'view_count': info.get('view_count', 0),
                'subtitle_url': cls._extract_subtitle_url(info),
                'duration': info.get('duration'),
            }
        except Exception as exc:
            logger.error("Error getting video info for %s: %s", video_id, exc)
            return None

    @staticmethod
    def _extract_subtitle_url(info: Dict[str, Any]) -> Optional[str]:
        """
        Extract best subtitle URL from video info

        Priority: en manual > vi manual > en auto > vi auto > first available
        manual > first available automatic. Within a language, the first track
        that actually has a 'url' is used.

        Returns:
            The subtitle URL, or None when no track has one
        """
        subs = info.get('subtitles') or {}
        auto_subs = info.get('automatic_captions') or {}
        sources = (subs, auto_subs)

        # Preferred languages first, manual subtitles before automatic ones.
        for source in sources:
            for lang in YouTubeService._PREFERRED_SUB_LANGS:
                found = _first_track_url(source.get(lang))
                if found:
                    return found

        # Fallback: any language, scanning past languages with no usable track
        # instead of giving up after the first key.
        for source in sources:
            for tracks in source.values():
                found = _first_track_url(tracks)
                if found:
                    return found

        return None

    @staticmethod
    def _channel_url(channel_id: str) -> str:
        """Build the URL to extract from a channel ID, handle (@username) or URL."""
        if channel_id.startswith('http'):
            return channel_id
        if len(channel_id) == 24 and channel_id.startswith('UC'):
            return f"https://www.youtube.com/channel/{channel_id}"
        # Handles (@username) and anything else go directly under the site root.
        return f"https://www.youtube.com/{channel_id}"

    @classmethod
    def get_channel_videos(cls, channel_id: str, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get videos from a YouTube channel

        Args:
            channel_id: Channel ID, handle (@username), or URL
            limit: Maximum number of videos

        Returns:
            List of video data dictionaries; an empty list when extraction
            raises (the error is logged, not propagated)
        """
        try:
            entries = cls._flat_entries(cls._channel_url(channel_id), limit)
            return [cls.sanitize_video_data(entry) for entry in entries]
        except Exception as exc:
            logger.error("Error getting channel videos for %s: %s", channel_id, exc)
            return []

    @classmethod
    def get_related_videos(cls, title: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Get videos related to a given title

        Implemented as a search for "<title> related" with shorts excluded.
        """
        query = f"{title} related"
        return cls.search_videos(query, limit=limit, filter_type='video')

    @staticmethod
    def _pick_mp4_format_url(formats: List[Dict[str, Any]]) -> str:
        """Pick a non-HLS mp4 URL from *formats*, preferring one with both audio and video."""
        candidates = [
            fmt for fmt in reversed(formats)
            if fmt.get('url') and 'm3u8' not in fmt['url'] and fmt.get('ext') == 'mp4'
        ]
        # yt-dlp marks a missing stream with the codec value 'none'; a format
        # whose codecs are both known and present plays with sound on its own.
        for fmt in candidates:
            if fmt.get('vcodec') not in (None, 'none') and fmt.get('acodec') not in (None, 'none'):
                return fmt['url']
        # Otherwise fall back to the first mp4 candidate in reversed order.
        return candidates[0]['url'] if candidates else ''

    @classmethod
    def get_download_url(cls, video_id: str) -> Optional[Dict[str, str]]:
        """
        Get direct download URL (non-HLS) for a video

        Args:
            video_id: YouTube video ID

        Returns:
            Dict with 'url', 'title', 'ext' or None when no non-HLS URL is
            found or extraction fails (the error is logged)
        """
        try:
            url = f"https://www.youtube.com/watch?v={video_id}"
            ydl_opts = {
                **cls.BASE_OPTS,
                'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best[protocol!*=m3u8]/best',
                'noplaylist': True,
                'skip_download': True,
                'youtube_include_dash_manifest': False,
                'youtube_include_hls_manifest': False,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            if not info:
                return None

            download_url = info.get('url') or ''
            # Report the container of the selected format rather than assuming mp4.
            ext = info.get('ext') or 'mp4'

            # If m3u8 (or no top-level URL), try to find a non-HLS mp4 format.
            if not download_url or '.m3u8' in download_url:
                download_url = cls._pick_mp4_format_url(info.get('formats') or [])
                ext = 'mp4'

            if not download_url:
                return None

            return {
                'url': download_url,
                'title': info.get('title') or 'video',
                'ext': ext,
            }
        except Exception as exc:
            logger.error("Error getting download URL for %s: %s", video_id, exc)
            return None