# spotify-clone/backend/services/youtube.py (233 lines, 9.2 KiB, Python)

import re
import json
import requests
import yt_dlp
from ytmusicapi import YTMusic
from backend.core.cache import CacheManager
from backend.core.config import settings
from backend.core.exceptions import ResourceNotFound, ExternalAPIError
class YouTubeService:
    """Service wrapper around YouTube Music lookups with on-disk caching.

    Metadata (playlists, albums, search results, recommendations) is fetched
    through ``ytmusicapi.YTMusic``; playable stream URLs are resolved with
    ``yt_dlp``. Successful results are stored in a ``CacheManager`` keyed by
    the kind of request and its argument.
    """

    # Cover returned whenever no usable thumbnail URL is available.
    _PLACEHOLDER_COVER = "https://placehold.co/300x300"

    # Boilerplate removed from playlist titles. Word boundaries keep the
    # pattern from eating parts of real words ("Unofficial", "Videos").
    _SPAM_RE = re.compile(
        r'\b(?:Playlist|Music Chart|Full SPOTIFY Video|Updated Weekly|Official|Video)\b',
        re.IGNORECASE,
    )

    def __init__(self):
        """Create the YouTube Music client and the cache under ``settings.CACHE_DIR``."""
        self.yt = YTMusic()
        self.cache = CacheManager(str(settings.CACHE_DIR))

    def _get_high_res_thumbnail(self, thumbnails: list) -> str:
        """Return the URL of the last thumbnail, upsized to 544x544 when possible.

        The last entry of ``thumbnails`` is used. For Google-hosted images
        (``googleusercontent.com`` / ``ggpht.com``) an ``=w<N>-h<N>`` size
        suffix is rewritten to ``=w544-h544``. A placeholder URL is returned
        when the list is empty or the entry carries no URL.
        """
        if not thumbnails:
            return self._PLACEHOLDER_COVER
        last = thumbnails[-1]
        best_url = last.get('url') if isinstance(last, dict) else None
        if not best_url:
            return self._PLACEHOLDER_COVER
        if "googleusercontent.com" in best_url or "ggpht.com" in best_url:
            # re.sub is a no-op when the size suffix is absent.
            best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url)
        return best_url

    def _extract_artist_names(self, track: dict) -> str:
        """Return the artists of ``track`` as a comma-separated string.

        Entries of ``track['artists']`` may be dicts with a ``name`` key or
        plain strings. Dict entries without a usable name become
        ``"Unknown"``; when nothing can be extracted the result is
        ``"Unknown Artist"``.
        """
        artists = track.get('artists') or []
        if not isinstance(artists, list):
            return "Unknown Artist"
        names = []
        for artist in artists:
            if isinstance(artist, dict):
                # `or` also covers an explicit None name, which would
                # otherwise break the join below.
                names.append(artist.get('name') or 'Unknown')
            elif isinstance(artist, str) and artist:
                names.append(artist)
        return ", ".join(names) if names else "Unknown Artist"

    def _extract_album_name(self, track: dict, default="Single") -> str:
        """Return the album name of ``track``, or ``default`` when it has none.

        ``track['album']`` may be a dict with a ``name`` key or a plain string.
        """
        album = track.get('album')
        if isinstance(album, dict):
            return album.get('name') or default
        if isinstance(album, str) and album:
            return album
        return default

    def _clean_title(self, title: str) -> str:
        """Strip non-ASCII characters and boilerplate words from a playlist title.

        Returns ``"Playlist"`` for an empty title. If cleaning would leave
        nothing (e.g. the title consists only of non-ASCII text or of
        boilerplate words), the original title is returned instead of an
        empty string.
        """
        if not title:
            return "Playlist"
        # Drops every non-ASCII character (emoji, but also non-Latin scripts).
        cleaned = title.encode('ascii', 'ignore').decode('ascii')
        cleaned = self._SPAM_RE.sub("", cleaned)
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()
        cleaned = cleaned.strip('*- ')
        return cleaned or title.strip() or "Playlist"

    def _clean_description(self, desc: str) -> str:
        """Remove URLs and decorative rules from ``desc`` and cap it at 300 characters.

        Runs of three or more ``*``, ``_`` or ``=`` are removed. Text longer
        than 300 characters is cut and suffixed with ``"..."``.
        """
        if not desc:
            return ""
        desc = re.sub(r'http\S+', '', desc)
        desc = re.sub(r'[*_=]{3,}', '', desc)
        if len(desc) > 300:
            desc = desc[:300] + "..."
        return desc.strip()

    def _format_track(self, track: dict, default_album: str = "Single",
                      fallback_thumbnails: list = None) -> dict:
        """Convert a raw track dict into the track shape returned by this service.

        Duration is read from ``duration_seconds`` and then ``length_seconds``
        (0 when neither holds a value). The cover comes from the track's
        ``thumbnails`` / ``thumbnail`` entry, then from ``fallback_thumbnails``.
        """
        video_id = track.get('videoId')
        duration = track.get('duration_seconds')
        if duration is None:
            duration = track.get('length_seconds')
        thumbnails = (
            track.get('thumbnails')
            or track.get('thumbnail')
            or fallback_thumbnails
            or []
        )
        return {
            "title": track.get('title') or 'Unknown Title',
            "artist": self._extract_artist_names(track),
            "album": self._extract_album_name(track, default_album),
            "duration": duration if duration is not None else 0,
            "cover_url": self._get_high_res_thumbnail(thumbnails),
            "id": video_id,
            "url": f"https://music.youtube.com/watch?v={video_id}",
        }

    def _fetch_playlist_or_album(self, browse_id: str):
        """Fetch ``browse_id`` as an album or a playlist.

        Ids starting with ``MPREb`` are tried as an album first; anything else
        is tried as a playlist first, with the album lookup as fallback.

        Returns:
            A ``(data, is_album)`` tuple.

        Raises:
            Whatever the underlying client raises when no lookup succeeds.
        """
        album_first = browse_id.startswith("MPREb")
        if album_first:
            try:
                album = self.yt.get_album(browse_id)
            except Exception:
                # Best effort: fall through to the playlist lookup.
                album = None
            if album:
                return album, True
        try:
            return self.yt.get_playlist(browse_id, limit=100), False
        except Exception:
            if album_first:
                # The album lookup already failed above; do not repeat it.
                raise
            return self.yt.get_album(browse_id), True

    def get_playlist(self, id: str):
        """Return a playlist or album with its tracks, cached for one hour.

        Tracks without a ``videoId`` are skipped, since no URL can be built
        for them.

        Returns:
            A dict with ``id``, ``title``, ``description``, ``author``,
            ``cover_url`` and ``tracks``.

        Raises:
            ResourceNotFound: if the lookup or the formatting fails.
        """
        cache_key = f"playlist:{id}"
        cached_playlist = self.cache.get(cache_key)
        if cached_playlist:
            return cached_playlist
        try:
            playlist_data, is_album = self._fetch_playlist_or_album(id)
            if not playlist_data:
                raise ValueError("empty response")

            # Album tracks usually carry no artwork of their own, so the
            # album cover is used as their fallback.
            album_thumbnails = (playlist_data.get('thumbnails') or []) if is_album else []
            default_album = playlist_data.get('title') or 'Single'
            formatted_tracks = [
                self._format_track(track, default_album, album_thumbnails)
                for track in playlist_data.get('tracks') or []
                if track.get('videoId')
            ]

            if is_album:
                author = self._extract_artist_names(playlist_data)
            else:
                author_data = playlist_data.get('author')
                if isinstance(author_data, dict):
                    author = author_data.get('name') or 'YouTube Music'
                else:
                    author = str(author_data) if author_data else 'YouTube Music'

            formatted_playlist = {
                # Fall back to the requested id when the payload carries none.
                "id": playlist_data.get('browseId') or playlist_data.get('id') or id,
                "title": self._clean_title(playlist_data.get('title', 'Unknown')),
                "description": self._clean_description(playlist_data.get('description', '')),
                "author": author,
                "cover_url": self._get_high_res_thumbnail(playlist_data.get('thumbnails') or []),
                "tracks": formatted_tracks,
            }
            self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600)
            return formatted_playlist
        except Exception as e:
            print(f"Playlist Fetch Error: {e}")
            raise ResourceNotFound(f"Playlist {id} not found") from e

    def search(self, query: str):
        """Search for songs matching ``query``; results are cached for 24 hours.

        Returns:
            ``{"tracks": [...]}``. NOTE(review): an empty query returns a bare
            ``[]`` instead; kept for backward compatibility with callers.

        Raises:
            ExternalAPIError: if the search request or the formatting fails.
        """
        if not query:
            return []
        cache_key = f"search:{query.lower().strip()}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        try:
            results = self.yt.search(query, filter="songs", limit=20)
            tracks = [
                self._format_track(track)
                for track in results or []
                if track.get('videoId')
            ]
            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=86400)
            return response
        except Exception as e:
            print(f"Search Error: {e}")
            raise ExternalAPIError(str(e)) from e

    def get_stream_url(self, id: str):
        """Resolve a playable audio URL for video ``id``, cached for one hour.

        Returns:
            A dict with the stream ``url`` and the ``headers`` reported by
            yt-dlp for that URL.

        Raises:
            ResourceNotFound: if extraction succeeds but yields no URL.
            ExternalAPIError: if extraction (or caching) fails.
        """
        cache_key = f"stream:{id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        try:
            url = f"https://www.youtube.com/watch?v={id}"
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best',
                'quiet': True,
                'noplaylist': True,
                'force_ipv4': True,
                # Use mobile clients to avoid web scraping blocks
                'extractor_args': {
                    'youtube': {
                        'player_client': ['android', 'ios'],
                    },
                },
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            stream_url = info.get('url') if info else None
            if not stream_url:
                raise ResourceNotFound("Stream not found")
            result = {
                "url": stream_url,
                # Headers that yt-dlp used/recommends for this URL.
                "headers": info.get('http_headers', {}),
            }
            self.cache.set(cache_key, result, ttl_seconds=3600)
            return result
        except ResourceNotFound:
            # Must not be re-wrapped as ExternalAPIError by the handler below.
            raise
        except Exception as e:
            raise ExternalAPIError(str(e)) from e

    def invalidate_stream_cache(self, id: str):
        """Delete the cached stream entry for ``id``, if one exists (best effort)."""
        cache_key = f"stream:{id}"
        path = self.cache._get_path(cache_key)
        if path.exists():
            try:
                path.unlink()
            except OSError:
                # Best effort: a failed delete is deliberately ignored.
                pass

    def get_recommendations(self, seed_id: str):
        """Return tracks related to ``seed_id``, cached for one hour.

        The seed itself, duplicates and tracks without a ``videoId`` are
        excluded.

        Returns:
            ``{"tracks": [...]}``; the list is empty when the lookup fails.
            NOTE(review): an empty ``seed_id`` returns a bare ``[]`` instead;
            kept for backward compatibility with callers.
        """
        if not seed_id:
            return []
        cache_key = f"rec:{seed_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        try:
            watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20)
            tracks = []
            seen_ids = {seed_id}
            for track in watch_playlist.get('tracks') or []:
                t_id = track.get('videoId')
                if not t_id or t_id in seen_ids:
                    continue
                seen_ids.add(t_id)
                tracks.append(self._format_track(track))
            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=3600)
            return response
        except Exception as e:
            print(f"Rec Error: {e}")
            return {"tracks": []}