import re import json import requests import yt_dlp from ytmusicapi import YTMusic from backend.core.cache import CacheManager from backend.core.config import settings from backend.core.exceptions import ResourceNotFound, ExternalAPIError class YouTubeService: def __init__(self): self.yt = YTMusic() self.cache = CacheManager(str(settings.CACHE_DIR)) def _get_high_res_thumbnail(self, thumbnails: list) -> str: if not thumbnails: return "https://placehold.co/300x300" best_url = thumbnails[-1]['url'] if "googleusercontent.com" in best_url or "ggpht.com" in best_url: if "w" in best_url and "h" in best_url: best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url) return best_url def _extract_artist_names(self, track: dict) -> str: artists = track.get('artists') or [] if isinstance(artists, list): names = [] for a in artists: if isinstance(a, dict): names.append(a.get('name', 'Unknown')) elif isinstance(a, str): names.append(a) return ", ".join(names) if names else "Unknown Artist" return "Unknown Artist" def _extract_album_name(self, track: dict, default="Single") -> str: album = track.get('album') if isinstance(album, dict): return album.get('name', default) if isinstance(album, str): return album return default def _clean_title(self, title: str) -> str: if not title: return "Playlist" title = title.encode('ascii', 'ignore').decode('ascii') spam_words = ["Playlist", "Music Chart", "Full SPOTIFY Video", "Updated Weekly", "Official", "Video"] for word in spam_words: title = re.sub(word, "", title, flags=re.IGNORECASE) title = re.sub(r'\s+', ' ', title).strip() title = title.strip('*- ') return title def _clean_description(self, desc: str) -> str: if not desc: return "" desc = re.sub(r'http\S+', '', desc) desc = re.sub(r'[*_=]{3,}', '', desc) if len(desc) > 300: desc = desc[:300] + "..." return desc.strip() def get_playlist(self, id: str): cache_key = f"playlist:{id}" cached_playlist = self.cache.get(cache_key) if cached_playlist: return cached_playlist try: playlist_data = None is_album = False # Try as Album first if MPREb ID if id.startswith("MPREb"): try: playlist_data = self.yt.get_album(id) is_album = True except: pass if not playlist_data: try: playlist_data = self.yt.get_playlist(id, limit=100) except Exception: if not is_album: playlist_data = self.yt.get_album(id) is_album = True formatted_tracks = [] if 'tracks' in playlist_data: for track in playlist_data['tracks']: formatted_tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, playlist_data.get('title', 'Single')), "duration": track.get('duration_seconds', track.get('length_seconds', 0)), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails', []) or (playlist_data.get('thumbnails', []) if is_album else [])), "id": track.get('videoId'), "url": f"https://music.youtube.com/watch?v={track.get('videoId')}" }) p_cover = self._get_high_res_thumbnail(playlist_data.get('thumbnails', [])) author = "YouTube Music" if is_album: artists = playlist_data.get('artists', []) names = [a.get('name', 'Unknown') if isinstance(a, dict) else a for a in artists] author = ", ".join(names) else: author_data = playlist_data.get('author', {}) author = author_data.get('name', 'YouTube Music') if isinstance(author_data, dict) else str(author_data) formatted_playlist = { "id": playlist_data.get('browseId', playlist_data.get('id')), "title": self._clean_title(playlist_data.get('title', 'Unknown')), "description": self._clean_description(playlist_data.get('description', '')), "author": author, "cover_url": p_cover, "tracks": formatted_tracks } self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600) return formatted_playlist except Exception as e: print(f"Playlist Fetch Error: {e}") raise ResourceNotFound(f"Playlist {id} not found") def search(self, query: str): if not query: return [] cache_key = f"search:{query.lower().strip()}" cached = self.cache.get(cache_key) if cached: return cached try: results = self.yt.search(query, filter="songs", limit=20) tracks = [] for track in results: tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, "Single"), "duration": track.get('duration_seconds', 0), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails', [])), "id": track.get('videoId'), "url": f"https://music.youtube.com/watch?v={track.get('videoId')}" }) response = {"tracks": tracks} self.cache.set(cache_key, response, ttl_seconds=86400) return response except Exception as e: print(f"Search Error: {e}") raise ExternalAPIError(str(e)) def get_stream_url(self, id: str): cache_key = f"stream:{id}" cached = self.cache.get(cache_key) if cached: return cached try: url = f"https://www.youtube.com/watch?v={id}" ydl_opts = { 'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best', 'quiet': True, 'noplaylist': True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) stream_url = info.get('url') if stream_url: self.cache.set(cache_key, stream_url, ttl_seconds=3600) return stream_url raise ResourceNotFound("Stream not found") except Exception as e: raise ExternalAPIError(str(e)) def get_recommendations(self, seed_id: str): if not seed_id: return [] cache_key = f"rec:{seed_id}" cached = self.cache.get(cache_key) if cached: return cached try: watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20) tracks = [] if 'tracks' in watch_playlist: seen_ids = {seed_id} for track in watch_playlist['tracks']: t_id = track.get('videoId') if not t_id or t_id in seen_ids: continue seen_ids.add(t_id) tracks.append({ "title": track.get('title', 'Unknown Title'), "artist": self._extract_artist_names(track), "album": self._extract_album_name(track, "Single"), "duration": track.get('length_seconds', track.get('duration_seconds', 0)), "cover_url": self._get_high_res_thumbnail(track.get('thumbnails') or track.get('thumbnail') or []), "id": t_id, "url": f"https://music.youtube.com/watch?v={t_id}" }) response = {"tracks": tracks} self.cache.set(cache_key, response, ttl_seconds=3600) return response except Exception as e: print(f"Rec Error: {e}") return {"tracks": []}