383 lines
16 KiB
Python
383 lines
16 KiB
Python
import re
|
|
import json
|
|
import requests
|
|
import yt_dlp
|
|
from ytmusicapi import YTMusic
|
|
from backend.core.cache import CacheManager
|
|
from backend.core.config import settings
|
|
from backend.core.exceptions import ResourceNotFound, ExternalAPIError
|
|
|
|
class YouTubeService:
    """Cached access to YouTube Music metadata (ytmusicapi) and stream URLs (yt-dlp).

    Every public lookup first consults ``self.cache`` under a prefixed key
    (``playlist:``, ``search:``, ``stream:``, ``rec:``, ``home:browse``) and
    stores freshly built results back with a TTL.  Tracks are always returned
    as plain dicts with the keys ``title``, ``artist``, ``album``,
    ``duration``, ``cover_url``, ``id`` and ``url``.
    """

    _PLACEHOLDER_COVER = "https://placehold.co/300x300"

    # Marketing filler removed from playlist titles.  Matched as whole words
    # only, so "Unofficial" or "Videos" are left intact.
    _SPAM_WORDS = (
        "Playlist",
        "Music Chart",
        "Full SPOTIFY Video",
        "Updated Weekly",
        "Official",
        "Video",
    )
    _SPAM_PATTERN = re.compile(
        r'\b(?:' + '|'.join(re.escape(w) for w in _SPAM_WORDS) + r')\b',
        re.IGNORECASE,
    )

    # yt-dlp player clients, tried in this order until one yields a URL.
    _STREAM_CLIENTS = ('ios', 'android', 'web', 'tv')

    def __init__(self):
        self.yt = YTMusic()
        self.cache = CacheManager(str(settings.CACHE_DIR))

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------

    def _get_high_res_thumbnail(self, thumbnails: list) -> str:
        """Return the URL of the last thumbnail, upscaled to 544x544 when possible.

        Falls back to a placeholder image when the list is empty/None or the
        last entry carries no usable ``url``.
        """
        if not thumbnails:
            return self._PLACEHOLDER_COVER

        last = thumbnails[-1]
        best_url = last.get('url') if isinstance(last, dict) else None
        if not best_url:
            return self._PLACEHOLDER_COVER

        # Google image hosts encode the size as "=w<W>-h<H>"; rewrite it in place.
        if "googleusercontent.com" in best_url or "ggpht.com" in best_url:
            best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url)
        return best_url

    def _extract_artist_names(self, track: dict) -> str:
        """Join the names in ``track['artists']`` with ", ".

        Entries may be dicts (``{'name': ...}``) or plain strings; anything
        else is skipped.  A dict without a usable name contributes "Unknown".
        Returns "Unknown Artist" when nothing could be extracted.
        """
        artists = track.get('artists') or []
        if not isinstance(artists, list):
            return "Unknown Artist"

        names = []
        for artist in artists:
            if isinstance(artist, dict):
                # `or` also covers an explicit None, which would break join().
                names.append(artist.get('name') or 'Unknown')
            elif isinstance(artist, str):
                names.append(artist)
        return ", ".join(names) if names else "Unknown Artist"

    def _extract_album_name(self, track: dict, default="Single") -> str:
        """Return the album name of ``track`` or ``default`` when unavailable."""
        album = track.get('album')
        if isinstance(album, dict):
            return album.get('name') or default
        if isinstance(album, str):
            return album
        return default

    def _clean_title(self, title: str) -> str:
        """Reduce a playlist title to ASCII and strip marketing filler.

        Accented characters are folded to their base letter (NFKD) before the
        ASCII conversion; characters with no ASCII form (emoji, CJK, ...) are
        dropped.  Returns "Playlist" when nothing meaningful is left.
        """
        if not title:
            return "Playlist"

        import unicodedata  # function-scope import; only needed here

        folded = unicodedata.normalize('NFKD', title)
        title = folded.encode('ascii', 'ignore').decode('ascii')
        title = self._SPAM_PATTERN.sub("", title)
        title = re.sub(r'\s+', ' ', title).strip()
        title = title.strip('*- ')
        return title or "Playlist"

    def _clean_description(self, desc: str) -> str:
        """Drop URLs and decorative rules from ``desc`` and cap it at 300 chars."""
        if not desc:
            return ""
        desc = re.sub(r'http\S+', '', desc)
        desc = re.sub(r'[*_=]{3,}', '', desc)
        if len(desc) > 300:
            desc = desc[:300] + "..."
        return desc.strip()

    def _format_track(self, track: dict, *, album, duration, thumbnails,
                      default_title: str = "Unknown Title") -> dict:
        """Build the track dict shared by every endpoint of this service."""
        video_id = track.get('videoId')
        return {
            "title": track.get('title', default_title),
            "artist": self._extract_artist_names(track),
            "album": album,
            "duration": duration,
            "cover_url": self._get_high_res_thumbnail(thumbnails),
            "id": video_id,
            "url": f"https://music.youtube.com/watch?v={video_id}",
        }

    def _make_shelf(self, shelf_id, title, description, tracks, creator,
                    cover_url=None) -> dict:
        """Build one home-page shelf; the cover defaults to the first track's."""
        if cover_url is None:
            cover_url = tracks[0]['cover_url'] if tracks else ""
        return {
            "id": shelf_id,
            "title": title,
            "description": description,
            "cover_url": cover_url,
            "tracks": tracks,
            "type": "Playlist",
            "creator": creator,
        }

    # ------------------------------------------------------------------
    # Playlists / albums
    # ------------------------------------------------------------------

    def _fetch_playlist_or_album(self, id: str):
        """Return ``(payload, is_album)`` for a playlist or album id.

        Ids starting with "MPREb" are tried as an album first.  Otherwise the
        playlist endpoint is used, with the album endpoint as a fallback.  The
        album endpoint is never called twice for the same id.
        """
        album_tried = False
        if id.startswith("MPREb"):
            album_tried = True
            try:
                album = self.yt.get_album(id)
                if album:
                    return album, True
            except Exception as e:
                # Best effort: fall through to the playlist endpoint.
                print(f"Album lookup failed for {id}, trying playlist: {e}")

        try:
            return self.yt.get_playlist(id, limit=100), False
        except Exception:
            if album_tried:
                raise
            return self.yt.get_album(id), True

    def _format_playlist(self, requested_id, playlist_data: dict, is_album: bool) -> dict:
        """Convert a raw playlist/album payload into the service's playlist dict."""
        container_thumbnails = playlist_data.get('thumbnails', [])
        album_default = playlist_data.get('title', 'Single')

        formatted_tracks = []
        for track in playlist_data.get('tracks') or []:
            # Album tracks may carry no artwork of their own; reuse the album's.
            thumbnails = track.get('thumbnails', []) or (
                container_thumbnails if is_album else []
            )
            formatted_tracks.append(self._format_track(
                track,
                album=self._extract_album_name(track, album_default),
                duration=track.get('duration_seconds', track.get('length_seconds', 0)),
                thumbnails=thumbnails,
            ))

        if is_album:
            author = self._extract_artist_names(playlist_data)
        else:
            author_data = playlist_data.get('author', {})
            if isinstance(author_data, dict):
                author = author_data.get('name') or 'YouTube Music'
            elif author_data:
                author = str(author_data)
            else:
                author = 'YouTube Music'

        return {
            # Fall back to the requested id when the payload carries none
            # (presumably the case for album payloads; verify against ytmusicapi).
            "id": playlist_data.get('browseId') or playlist_data.get('id') or requested_id,
            "title": self._clean_title(playlist_data.get('title', 'Unknown')),
            "description": self._clean_description(playlist_data.get('description', '')),
            "author": author,
            "cover_url": self._get_high_res_thumbnail(container_thumbnails),
            "tracks": formatted_tracks,
        }

    def get_playlist(self, id: str):
        """Return the formatted playlist or album for ``id`` (cached for 1 hour).

        Raises:
            ResourceNotFound: if the lookup or formatting fails for any reason.
        """
        cache_key = f"playlist:{id}"
        cached_playlist = self.cache.get(cache_key)
        if cached_playlist:
            return cached_playlist

        try:
            playlist_data, is_album = self._fetch_playlist_or_album(id)
            if not playlist_data:
                raise ValueError("empty playlist payload")
            formatted_playlist = self._format_playlist(id, playlist_data, is_album)
            self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600)
            return formatted_playlist
        except Exception as e:
            print(f"Playlist Fetch Error: {e}")
            raise ResourceNotFound(f"Playlist {id} not found") from e

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, query: str):
        """Search songs and return ``{"tracks": [...]}`` (cached for 24 hours).

        NOTE(review): an empty query returns a bare ``[]`` rather than
        ``{"tracks": []}``; kept as-is because callers may rely on it.

        Raises:
            ExternalAPIError: if the search call or result formatting fails.
        """
        if not query:
            return []

        cache_key = f"search:{query.lower().strip()}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            results = self.yt.search(query, filter="songs", limit=20)
            tracks = [
                self._format_track(
                    track,
                    album=self._extract_album_name(track, "Single"),
                    duration=track.get('duration_seconds', 0),
                    thumbnails=track.get('thumbnails', []),
                )
                for track in results
            ]
            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=86400)
            return response
        except Exception as e:
            print(f"Search Error: {e}")
            raise ExternalAPIError(str(e)) from e

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------

    def get_stream_url(self, id: str):
        """Resolve a direct stream for video ``id`` (cached for 1 hour).

        Each client in ``_STREAM_CLIENTS`` is tried in turn; the first one
        that yields a URL wins.

        Returns:
            ``{"url": <stream url>, "headers": <http headers dict>}``.

        Raises:
            ExternalAPIError: if no client produced a stream URL.
        """
        cache_key = f"stream:{id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        url = f"https://www.youtube.com/watch?v={id}"
        last_error = None

        for client in self._STREAM_CLIENTS:
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best',
                'quiet': True,
                'noplaylist': True,
                # NOTE(review): the YoutubeDL API may expect
                # `source_address: '0.0.0.0'` instead of this flag - confirm.
                'force_ipv4': True,
                'extractor_args': {'youtube': {'player_client': [client]}},
            }
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=False)
                stream_url = info.get('url')
                if stream_url:
                    result = {
                        "url": stream_url,
                        "headers": info.get('http_headers', {}),
                    }
                    self.cache.set(cache_key, result, ttl_seconds=3600)
                    return result
                print(f"Client {client} returned no stream URL for {id}")
            except Exception as e:
                last_error = e
                print(f"Fetch failed with client {client}: {e}")

        print(f"All clients failed for {id}. Last error: {last_error}")
        # Avoid the meaningless message "None" when no client raised.
        if last_error is None:
            raise ExternalAPIError(f"No stream URL found for {id}")
        raise ExternalAPIError(str(last_error)) from last_error

    def invalidate_stream_cache(self, id: str):
        """Best-effort removal of the cached stream entry for ``id``."""
        cache_key = f"stream:{id}"
        # Relies on the cache's private path helper; there is no public
        # delete operation visible from this file.
        path = self.cache._get_path(cache_key)
        if path.exists():
            try:
                path.unlink()
            except OSError:
                # Deliberately ignored: a stale entry is re-validated by its TTL.
                pass

    # ------------------------------------------------------------------
    # Recommendations / home
    # ------------------------------------------------------------------

    def get_recommendations(self, seed_id: str):
        """Return ``{"tracks": [...]}`` related to ``seed_id`` (cached for 1 hour).

        The seed itself, duplicates and entries without a video id are
        skipped.  Any failure yields ``{"tracks": []}`` instead of raising.

        NOTE(review): an empty ``seed_id`` returns a bare ``[]``; kept as-is.
        """
        if not seed_id:
            return []

        cache_key = f"rec:{seed_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20)
            tracks = []
            seen_ids = {seed_id}
            for track in watch_playlist.get('tracks', []):
                t_id = track.get('videoId')
                if not t_id or t_id in seen_ids:
                    continue
                seen_ids.add(t_id)
                tracks.append(self._format_track(
                    track,
                    album=self._extract_album_name(track, "Single"),
                    duration=track.get('length_seconds', track.get('duration_seconds', 0)),
                    thumbnails=track.get('thumbnails') or track.get('thumbnail') or [],
                ))

            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=3600)
            return response
        except Exception as e:
            print(f"Rec Error: {e}")
            return {"tracks": []}

    def get_home(self):
        """Build the home page: a dict mapping category name to a list of shelves.

        "Trending" comes from the US charts; "Top Hits" and "New Releases"
        are re-orderings of the same tracks used to fill the page; "Focus &
        Chill" holds two static shelves with no tracks.  The result is cached
        for 1 hour.  Any failure yields ``{}`` instead of raising.
        """
        import random  # function-scope import, used only for the shuffle below

        cache_key = "home:browse"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            charts = self.yt.get_charts(country="US")

            trending_songs = []
            if 'videos' in charts and 'items' in charts['videos']:
                trending_songs = [
                    self._format_track(
                        track,
                        album="Trending",
                        duration=0,  # no duration is read from chart entries
                        thumbnails=track.get('thumbnails', []),
                        default_title='Unknown',
                    )
                    for track in charts['videos']['items']
                ]

            # "Top Hits": the trending tracks in random order.
            top_hits_tracks = list(trending_songs)
            if len(top_hits_tracks) > 5:
                random.shuffle(top_hits_tracks)

            # "New Releases": the trending tracks rotated by two positions.
            new_releases_tracks = list(trending_songs)
            if len(new_releases_tracks) > 2:
                new_releases_tracks = new_releases_tracks[2:] + new_releases_tracks[:2]

            response = {
                "Trending": [self._make_shelf(
                    "trending", "Trending Now", "Top music videos right now",
                    trending_songs, "YouTube Charts",
                )],
                "Top Hits": [self._make_shelf(
                    "top_hits", "Top Hits Today", "The hottest tracks right now.",
                    top_hits_tracks, "Editors",
                )],
                "New Releases": [self._make_shelf(
                    "new_releases", "New Releases", "Brand new music found for you.",
                    new_releases_tracks, "Spotify Clone",
                )],
                # Static shelves; their empty track lists are meant to be
                # filled by a later fetch on the client side, if supported.
                "Focus & Chill": [
                    self._make_shelf(
                        "lofi_beats", "Lofi Beats", "Chill beats to study/relax to",
                        [], "Lofi Girl",
                        cover_url="https://i.ytimg.com/vi/jfKfPfyJRdk/hqdefault.jpg",
                    ),
                    self._make_shelf(
                        "jazz_vibes", "Jazz Vibes", "Relaxing Jazz instrumental",
                        [], "Jazz Cafe",
                        cover_url="https://i.ytimg.com/vi/DX7W7WUI6w8/hqdefault.jpg",
                    ),
                ],
            }

            self.cache.set(cache_key, response, ttl_seconds=3600)
            return response
        except Exception as e:
            print(f"Home Error: {e}")
            return {}

    def get_trending(self):
        """Return ``{"tracks": [...]}`` taken from the home page's "Trending" shelf."""
        home = self.get_home()
        if "Trending" in home and home["Trending"]:
            return {"tracks": home["Trending"][0]["tracks"]}
        return {"tracks": []}
|