import re import json import requests import yt_dlp from ytmusicapi import YTMusic from backend.core.cache import CacheManager from backend.core.config import settings from backend.core.exceptions import ResourceNotFound, ExternalAPIError class YouTubeService: def __init__(self): self.yt = YTMusic() self.cache = CacheManager(str(settings.CACHE_DIR)) def _get_high_res_thumbnail(self, thumbnails: list) -> str: if not thumbnails: return "https://placehold.co/300x300" best_url = thumbnails[-1]['url'] if "googleusercontent.com" in best_url or "ggpht.com" in best_url: if "w" in best_url and "h" in best_url: best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url) return best_url def _extract_artist_names(self, track: dict) -> str: artists = track.get('artists') or [] if isinstance(artists, list): names = [] for a in artists: if isinstance(a, dict): names.append(a.get('name', 'Unknown')) elif isinstance(a, str): names.append(a) return ", ".join(names) if names else "Unknown Artist" return "Unknown Artist" def _extract_album_name(self, track: dict, default="Single") -> str: album = track.get('album') if isinstance(album, dict): return album.get('name', default) if isinstance(album, str): return album return default def _clean_title(self, title: str) -> str: if not title: return "Playlist" title = title.encode('ascii', 'ignore').decode('ascii') spam_words = ["Playlist", "Music Chart", "Full SPOTIFY Video", "Updated Weekly", "Official", "Video"] for word in spam_words: title = re.sub(word, "", title, flags=re.IGNORECASE) title = re.sub(r'\s+', ' ', title).strip() title = title.strip('*- ') return title def _clean_description(self, desc: str) -> str: if not desc: return "" desc = re.sub(r'http\S+', '', desc) desc = re.sub(r'[*_=]{3,}', '', desc) if len(desc) > 300: desc = desc[:300] + "..." return desc.strip() def get_playlist(self, id: str): cache_key = f"playlist:{id}" cached_playlist = self.cache.get(cache_key) if cached_playlist: return cached_playlist try: playlist_data = None is_album = False # Try as Album first if MPREb ID if id.startswith("MPREb"): try: playlist_data = self.yt.get_album(id) is_album = True except: pass if not playlist_data: try: playlist_data = self.yt.get_playlist(id, limit=100) except Exception: if not is_album: playlist_data = self.yt.get_album(id) is_album = True formatted_tracks = [] if 'tracks' in playlist_data: for track in playlist_data['tracks']: formatted_tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, playlist_data.get('title', 'Single')), "duration": track.get('duration_seconds', track.get('length_seconds', 0)), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails', []) or (playlist_data.get('thumbnails', []) if is_album else [])), "id": track.get('videoId'), "url": f"https://music.youtube.com/watch?v={track.get('videoId')}" }) p_cover = self._get_high_res_thumbnail(playlist_data.get('thumbnails', [])) author = "YouTube Music" if is_album: artists = playlist_data.get('artists', []) names = [a.get('name', 'Unknown') if isinstance(a, dict) else a for a in artists] author = ", ".join(names) else: author_data = playlist_data.get('author', {}) author = author_data.get('name', 'YouTube Music') if isinstance(author_data, dict) else str(author_data) formatted_playlist = { "id": playlist_data.get('browseId', playlist_data.get('id')), "title": self._clean_title(playlist_data.get('title', 'Unknown')), "description": self._clean_description(playlist_data.get('description', '')), "author": author, "cover_url": p_cover, "tracks": formatted_tracks } self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600) return formatted_playlist except Exception as e: print(f"Playlist Fetch Error: {e}") raise ResourceNotFound(f"Playlist {id} not found") def search(self, query: str): if not query: return [] cache_key = f"search:{query.lower().strip()}" cached = self.cache.get(cache_key) if cached: return cached try: results = self.yt.search(query, filter="songs", limit=20) tracks = [] for track in results: tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, "Single"), "duration": track.get('duration_seconds', 0), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails', [])), "id": track.get('videoId'), "url": f"https://music.youtube.com/watch?v={track.get('videoId')}" }) response = {"tracks": tracks} self.cache.set(cache_key, response, ttl_seconds=86400) return response except Exception as e: print(f"Search Error: {e}") raise ExternalAPIError(str(e)) def get_stream_url(self, id: str): cache_key = f"stream:{id}" cached = self.cache.get(cache_key) if cached: return cached # Strategy: Try versatile clients in order clients_to_try = [ # 1. iOS (often best for audio) {'extractor_args': {'youtube': {'player_client': ['ios']}}}, # 2. Android (robust) {'extractor_args': {'youtube': {'player_client': ['android']}}}, # 3. Web (standard, prone to 403) {'extractor_args': {'youtube': {'player_client': ['web']}}}, # 4. TV (sometimes works for age-gated) {'extractor_args': {'youtube': {'player_client': ['tv']}}}, ] last_error = None for client_config in clients_to_try: try: url = f"https://www.youtube.com/watch?v={id}" ydl_opts = { 'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best', 'quiet': True, 'noplaylist': True, 'force_ipv4': True, } ydl_opts.update(client_config) with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) stream_url = info.get('url') if stream_url: headers = info.get('http_headers', {}) result = { "url": stream_url, "headers": headers } self.cache.set(cache_key, result, ttl_seconds=3600) return result except Exception as e: last_error = e print(f"Fetch failed with client {client_config}: {e}") continue # If all fail print(f"All clients failed for {id}. Last error: {last_error}") raise ExternalAPIError(str(last_error)) def invalidate_stream_cache(self, id: str): cache_key = f"stream:{id}" path = self.cache._get_path(cache_key) if path.exists(): try: path.unlink() except: pass def get_recommendations(self, seed_id: str): if not seed_id: return [] cache_key = f"rec:{seed_id}" cached = self.cache.get(cache_key) if cached: return cached try: watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20) tracks = [] if 'tracks' in watch_playlist: seen_ids = {seed_id} for track in watch_playlist['tracks']: t_id = track.get('videoId') if not t_id or t_id in seen_ids: continue seen_ids.add(t_id) tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, "Single"), "duration": track.get('length_seconds', track.get('duration_seconds', 0)), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails') or track.get('thumbnail') or []), "id": t_id, "url": f"https://music.youtube.com/watch?v={t_id}" }) response = {"tracks": tracks} self.cache.set(cache_key, response, ttl_seconds=3600) return response except Exception as e: print(f"Rec Error: {e}") return {"tracks": []} def get_home(self): cache_key = "home:browse" cached = self.cache.get(cache_key) if cached: return cached try: # ytmusicapi `get_home` returns complex Sections # For simplicity, we'll fetch charts and new releases as "Browse" content charts = self.yt.get_charts(country="US") # Formating Charts trending_songs = [] if 'videos' in charts and 'items' in charts['videos']: for track in charts['videos']['items']: trending_songs.append({ "title": track.get('title', 'Unknown'), "artist": self._extract_artist_names(track), "album": "Trending", "duration": 0, # Charts often lack duration "cover_url": self._get_high_res_thumbnail(track.get('thumbnails', [])), "id": track.get('videoId'), "url": f"https://music.youtube.com/watch?v={track.get('videoId')}" }) # New Releases (using search for "New Songs" as proxy or actual new releases if supported) # Actually ytmusicapi has get_new_releases usually under get_charts or specific calls # We'll use get_charts "trending" for "Trending" category # And maybe "Top Songs" for "Top Hits" response = { "Trending": [{ "id": "trending", "title": "Trending Now", "description": "Top music videos right now", "cover_url": trending_songs[0]['cover_url'] if trending_songs else "", "tracks": trending_songs, "type": "Playlist", "creator": "YouTube Charts" }] } self.cache.set(cache_key, response, ttl_seconds=3600) return response except Exception as e: print(f"Home Error: {e}") return {} def get_trending(self): # Dedicated trending endpoint home = self.get_home() if "Trending" in home and home["Trending"]: return {"tracks": home["Trending"][0]["tracks"]} return {"tracks": []}