233 lines
9.2 KiB
Python
233 lines
9.2 KiB
Python
import re
|
|
import json
|
|
import requests
|
|
import yt_dlp
|
|
from ytmusicapi import YTMusic
|
|
from backend.core.cache import CacheManager
|
|
from backend.core.config import settings
|
|
from backend.core.exceptions import ResourceNotFound, ExternalAPIError
|
|
|
|
class YouTubeService:
|
|
def __init__(self):
|
|
self.yt = YTMusic()
|
|
self.cache = CacheManager(str(settings.CACHE_DIR))
|
|
|
|
def _get_high_res_thumbnail(self, thumbnails: list) -> str:
|
|
if not thumbnails:
|
|
return "https://placehold.co/300x300"
|
|
|
|
best_url = thumbnails[-1]['url']
|
|
|
|
if "googleusercontent.com" in best_url or "ggpht.com" in best_url:
|
|
if "w" in best_url and "h" in best_url:
|
|
best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url)
|
|
return best_url
|
|
|
|
def _extract_artist_names(self, track: dict) -> str:
|
|
artists = track.get('artists') or []
|
|
if isinstance(artists, list):
|
|
names = []
|
|
for a in artists:
|
|
if isinstance(a, dict):
|
|
names.append(a.get('name', 'Unknown'))
|
|
elif isinstance(a, str):
|
|
names.append(a)
|
|
return ", ".join(names) if names else "Unknown Artist"
|
|
return "Unknown Artist"
|
|
|
|
def _extract_album_name(self, track: dict, default="Single") -> str:
|
|
album = track.get('album')
|
|
if isinstance(album, dict):
|
|
return album.get('name', default)
|
|
if isinstance(album, str):
|
|
return album
|
|
return default
|
|
|
|
def _clean_title(self, title: str) -> str:
|
|
if not title: return "Playlist"
|
|
title = title.encode('ascii', 'ignore').decode('ascii')
|
|
spam_words = ["Playlist", "Music Chart", "Full SPOTIFY Video", "Updated Weekly", "Official", "Video"]
|
|
for word in spam_words:
|
|
title = re.sub(word, "", title, flags=re.IGNORECASE)
|
|
title = re.sub(r'\s+', ' ', title).strip()
|
|
title = title.strip('*- ')
|
|
return title
|
|
|
|
def _clean_description(self, desc: str) -> str:
|
|
if not desc: return ""
|
|
desc = re.sub(r'http\S+', '', desc)
|
|
desc = re.sub(r'[*_=]{3,}', '', desc)
|
|
if len(desc) > 300:
|
|
desc = desc[:300] + "..."
|
|
return desc.strip()
|
|
|
|
def get_playlist(self, id: str):
|
|
cache_key = f"playlist:{id}"
|
|
cached_playlist = self.cache.get(cache_key)
|
|
if cached_playlist:
|
|
return cached_playlist
|
|
|
|
try:
|
|
playlist_data = None
|
|
is_album = False
|
|
|
|
# Try as Album first if MPREb ID
|
|
if id.startswith("MPREb"):
|
|
try:
|
|
playlist_data = self.yt.get_album(id)
|
|
is_album = True
|
|
except:
|
|
pass
|
|
|
|
if not playlist_data:
|
|
try:
|
|
playlist_data = self.yt.get_playlist(id, limit=100)
|
|
except Exception:
|
|
if not is_album:
|
|
playlist_data = self.yt.get_album(id)
|
|
is_album = True
|
|
|
|
formatted_tracks = []
|
|
if 'tracks' in playlist_data:
|
|
for track in playlist_data['tracks']:
|
|
formatted_tracks.append({
|
|
"title": track.get('title', 'Unknown Title'),
|
|
"artist": self._extract_artist_names(track),
|
|
"album": self._extract_album_name(track, playlist_data.get('title', 'Single')),
|
|
"duration": track.get('duration_seconds', track.get('length_seconds', 0)),
|
|
"cover_url": self._get_high_res_thumbnail(track.get('thumbnails', []) or (playlist_data.get('thumbnails', []) if is_album else [])),
|
|
"id": track.get('videoId'),
|
|
"url": f"https://music.youtube.com/watch?v={track.get('videoId')}"
|
|
})
|
|
|
|
p_cover = self._get_high_res_thumbnail(playlist_data.get('thumbnails', []))
|
|
|
|
author = "YouTube Music"
|
|
if is_album:
|
|
artists = playlist_data.get('artists', [])
|
|
names = [a.get('name', 'Unknown') if isinstance(a, dict) else a for a in artists]
|
|
author = ", ".join(names)
|
|
else:
|
|
author_data = playlist_data.get('author', {})
|
|
author = author_data.get('name', 'YouTube Music') if isinstance(author_data, dict) else str(author_data)
|
|
|
|
formatted_playlist = {
|
|
"id": playlist_data.get('browseId', playlist_data.get('id')),
|
|
"title": self._clean_title(playlist_data.get('title', 'Unknown')),
|
|
"description": self._clean_description(playlist_data.get('description', '')),
|
|
"author": author,
|
|
"cover_url": p_cover,
|
|
"tracks": formatted_tracks
|
|
}
|
|
|
|
self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600)
|
|
return formatted_playlist
|
|
|
|
except Exception as e:
|
|
print(f"Playlist Fetch Error: {e}")
|
|
raise ResourceNotFound(f"Playlist {id} not found")
|
|
|
|
def search(self, query: str):
|
|
if not query: return []
|
|
cache_key = f"search:{query.lower().strip()}"
|
|
cached = self.cache.get(cache_key)
|
|
if cached: return cached
|
|
|
|
try:
|
|
results = self.yt.search(query, filter="songs", limit=20)
|
|
tracks = []
|
|
for track in results:
|
|
tracks.append({
|
|
"title": track.get('title', 'Unknown Title'),
|
|
"artist": self._extract_artist_names(track),
|
|
"album": self._extract_album_name(track, "Single"),
|
|
"duration": track.get('duration_seconds', 0),
|
|
"cover_url": self._get_high_res_thumbnail(track.get('thumbnails', [])),
|
|
"id": track.get('videoId'),
|
|
"url": f"https://music.youtube.com/watch?v={track.get('videoId')}"
|
|
})
|
|
|
|
response = {"tracks": tracks}
|
|
self.cache.set(cache_key, response, ttl_seconds=86400)
|
|
return response
|
|
except Exception as e:
|
|
print(f"Search Error: {e}")
|
|
raise ExternalAPIError(str(e))
|
|
|
|
def get_stream_url(self, id: str):
|
|
cache_key = f"stream:{id}"
|
|
cached = self.cache.get(cache_key)
|
|
if cached: return cached
|
|
|
|
try:
|
|
url = f"https://www.youtube.com/watch?v={id}"
|
|
ydl_opts = {
|
|
'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best',
|
|
'quiet': True,
|
|
'noplaylist': True,
|
|
'force_ipv4': True,
|
|
# Use mobile clients to avoid web scraping blocks
|
|
'extractor_args': {
|
|
'youtube': {
|
|
'player_client': ['ios', 'android', 'web']
|
|
}
|
|
}
|
|
}
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(url, download=False)
|
|
stream_url = info.get('url')
|
|
|
|
if stream_url:
|
|
# Extract headers that yt-dlp used/recommends
|
|
headers = info.get('http_headers', {})
|
|
result = {
|
|
"url": stream_url,
|
|
"headers": headers
|
|
}
|
|
self.cache.set(cache_key, result, ttl_seconds=3600)
|
|
return result
|
|
raise ResourceNotFound("Stream not found")
|
|
except Exception as e:
|
|
raise ExternalAPIError(str(e))
|
|
|
|
def invalidate_stream_cache(self, id: str):
|
|
cache_key = f"stream:{id}"
|
|
path = self.cache._get_path(cache_key)
|
|
if path.exists():
|
|
try:
|
|
path.unlink()
|
|
except:
|
|
pass
|
|
|
|
def get_recommendations(self, seed_id: str):
|
|
if not seed_id: return []
|
|
cache_key = f"rec:{seed_id}"
|
|
cached = self.cache.get(cache_key)
|
|
if cached: return cached
|
|
|
|
try:
|
|
watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20)
|
|
tracks = []
|
|
if 'tracks' in watch_playlist:
|
|
seen_ids = {seed_id}
|
|
for track in watch_playlist['tracks']:
|
|
t_id = track.get('videoId')
|
|
if not t_id or t_id in seen_ids: continue
|
|
seen_ids.add(t_id)
|
|
|
|
tracks.append({
|
|
"title": track.get('title', 'Unknown Title'),
|
|
"artist": self._extract_artist_names(track),
|
|
"album": self._extract_album_name(track, "Single"),
|
|
"duration": track.get('length_seconds', track.get('duration_seconds', 0)),
|
|
"cover_url": self._get_high_res_thumbnail(track.get('thumbnails') or track.get('thumbnail') or []),
|
|
"id": t_id,
|
|
"url": f"https://music.youtube.com/watch?v={t_id}"
|
|
})
|
|
|
|
response = {"tracks": tracks}
|
|
self.cache.set(cache_key, response, ttl_seconds=3600)
|
|
return response
|
|
except Exception as e:
|
|
print(f"Rec Error: {e}")
|
|
return {"tracks": []}
|