# spotify-clone/backend/services/youtube.py
#
# 411 lines
# 18 KiB
# Python

import json
import logging
import random
import re
import unicodedata

import requests
import yt_dlp
from ytmusicapi import YTMusic

from backend.core.cache import CacheManager
from backend.core.config import settings
from backend.core.exceptions import ResourceNotFound, ExternalAPIError
logger = logging.getLogger(__name__)


class YouTubeService:
    """Fetch playlists, search results, stream URLs and home shelves.

    Metadata comes from ``ytmusicapi`` (``self.yt``), playable stream URLs
    from ``yt_dlp``.  Every public getter first consults ``self.cache`` and
    stores its successful result there under a prefixed key
    (``playlist:``, ``search:``, ``stream:``, ``rec:``, ``home:``).

    All track dictionaries produced by this class share the keys
    ``title``, ``artist``, ``album``, ``duration``, ``cover_url``, ``id``
    and ``url``.
    """

    _PLACEHOLDER_COVER = "https://placehold.co/300x300"
    _DEFAULT_AUTHOR = "YouTube Music"

    # Case-insensitive substrings removed from playlist titles, in order.
    _TITLE_SPAM_WORDS = (
        "Playlist",
        "Music Chart",
        "Full SPOTIFY Video",
        "Updated Weekly",
        "Official",
        "Video",
    )

    # yt_dlp player clients, tried in this order until one yields a URL.
    _PLAYER_CLIENTS = ('ios', 'android', 'web', 'tv')

    # (id, title, artist, album, cover_url) used when charts are unavailable.
    _FALLBACK_TRENDING = (
        (
            "Da4P2uT4ikU", "Angel Baby", "Troye Sivan", "Angel Baby",
            "https://lh3.googleusercontent.com/Fj_JpwC1QGEFkH3y973Xv7w7tqVw5C_V-1o7g1gX_c4X_1o7g1gX_c4X_1o7g1=w544-h544-l90-rj",
        ),
        (
            "fJ9rUzIMcZQ", "Bohemian Rhapsody", "Queen", "A Night at the Opera",
            "https://lh3.googleusercontent.com/yFj_JpwC1QGEFkH3y973Xv7w7tqVw5C_V-1o7g1gX_c4X_1o7g1gX_c4X_1o7g1=w544-h544-l90-rj",
        ),
        (
            "4NRXx6U8ABQ", "Blinding Lights", "The Weeknd", "After Hours",
            "https://lh3.googleusercontent.com/Fj_JpwC1QGEFkH3y973Xv7w7tqVw5C_V-1o7g1gX_c4X_1o7g1gX_c4X_1o7g1=w544-h544-l90-rj",
        ),
        (
            "OPf0YbXqDm0", "Uptown Funk", "Mark Ronson", "Uptown Special",
            "https://lh3.googleusercontent.com/Fj_JpwC1QGEFkH3y973Xv7w7tqVw5C_V-1o7g1gX_c4X_1o7g1gX_c4X_1o7g1=w544-h544-l90-rj",
        ),
    )

    def __init__(self):
        """Create the YTMusic client and the cache rooted at ``settings.CACHE_DIR``."""
        self.yt = YTMusic()
        self.cache = CacheManager(str(settings.CACHE_DIR))

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------

    def _get_high_res_thumbnail(self, thumbnails: list) -> str:
        """Return the URL of the last thumbnail, upscaled to 544x544 if possible.

        The last list entry is used (assumed to be the largest - TODO confirm
        against ytmusicapi ordering).  For Google-hosted images the
        ``=w<N>-h<N>`` size suffix is rewritten to ``=w544-h544``.  A
        placeholder URL is returned when no usable thumbnail exists.
        """
        if not thumbnails or not isinstance(thumbnails, (list, tuple)):
            return self._PLACEHOLDER_COVER

        last = thumbnails[-1]
        best_url = last.get('url') if isinstance(last, dict) else None
        if not best_url:
            return self._PLACEHOLDER_COVER

        if "googleusercontent.com" in best_url or "ggpht.com" in best_url:
            # re.sub is a no-op when the size suffix is absent.
            best_url = re.sub(r'=w\d+-h\d+', '=w544-h544', best_url)
        return best_url

    def _extract_artist_names(self, track: dict) -> str:
        """Return the track's artist names joined by ``", "``.

        Accepts a list of ``{'name': ...}`` dicts and/or plain strings, or a
        single string.  Falls back to ``"Unknown Artist"`` when nothing
        usable is present; an artist dict without a name becomes ``"Unknown"``.
        """
        artists = track.get('artists') or []
        if isinstance(artists, str):
            return artists
        if not isinstance(artists, list):
            return "Unknown Artist"

        names = []
        for artist in artists:
            if isinstance(artist, dict):
                # `or` also covers a 'name' key that is present but None.
                names.append(str(artist.get('name') or 'Unknown'))
            elif isinstance(artist, str):
                names.append(artist)
        return ", ".join(names) if names else "Unknown Artist"

    def _extract_album_name(self, track: dict, default="Single") -> str:
        """Return the track's album name, or *default* when it is missing/empty."""
        album = track.get('album')
        if isinstance(album, dict):
            return album.get('name') or default
        if isinstance(album, str) and album:
            return album
        return default

    @staticmethod
    def _strip_decorations(text: str) -> str:
        """Drop emoji/symbol decoration from *text* while keeping real letters.

        ASCII characters are kept unchanged.  Non-ASCII letters and numbers
        are kept, combining marks are kept only directly after a kept
        character, non-ASCII separators become a plain space, and everything
        else (symbols, non-ASCII punctuation, control/format characters) is
        removed.
        """
        kept = []
        prev_kept = False
        for ch in text:
            if ord(ch) < 128:
                kept.append(ch)
                prev_kept = True
                continue
            major = unicodedata.category(ch)[0]
            if major in "LN" or (major == "M" and prev_kept):
                kept.append(ch)
                prev_kept = True
            elif major == "Z":
                kept.append(" ")
                prev_kept = False
            else:
                prev_kept = False
        return "".join(kept)

    def _clean_title(self, title: str) -> str:
        """Return a display title with decoration and boilerplate words removed.

        Returns ``"Playlist"`` when *title* is empty or when nothing is left
        after cleaning (for example a title made only of spam words).
        """
        if not title:
            return "Playlist"

        # Unicode-aware replacement for a blanket ASCII filter, which also
        # deleted accented and non-Latin letters from legitimate titles.
        cleaned = self._strip_decorations(title)
        # Sequential removal is intentional: removing one word may expose
        # another, which a single alternation pattern would miss.
        for word in self._TITLE_SPAM_WORDS:
            cleaned = re.sub(re.escape(word), "", cleaned, flags=re.IGNORECASE)
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()
        cleaned = cleaned.strip('*- ')
        return cleaned or "Playlist"

    def _clean_description(self, desc: str) -> str:
        """Strip URLs and ``***``/``___``/``===`` rules, then cap at 300 chars + ``...``."""
        if not desc:
            return ""
        desc = re.sub(r'http\S+', '', desc)
        desc = re.sub(r'[*_=]{3,}', '', desc)
        if len(desc) > 300:
            desc = desc[:300] + "..."
        return desc.strip()

    @staticmethod
    def _track_url(video_id) -> str:
        """Return the music.youtube.com watch URL for *video_id*."""
        return f"https://music.youtube.com/watch?v={video_id}"

    def _format_track(self, track: dict, default_album: str = "Single",
                      fallback_thumbnails=None) -> dict:
        """Convert a raw ytmusicapi track dict into this service's track shape.

        Args:
            track: Raw track dictionary.
            default_album: Album name used when the track carries none.
            fallback_thumbnails: Thumbnails used when the track has none of
                its own (e.g. the enclosing album's artwork).
        """
        video_id = track.get('videoId')

        # Different endpoints expose the duration under different keys.
        duration = track.get('duration_seconds')
        if duration is None:
            duration = track.get('length_seconds')
        if duration is None:
            duration = 0

        thumbnails = (
            track.get('thumbnails')
            or track.get('thumbnail')
            or fallback_thumbnails
            or []
        )
        return {
            "title": track.get('title') or 'Unknown Title',
            "artist": self._extract_artist_names(track),
            "album": self._extract_album_name(track, default_album),
            "duration": duration,
            "cover_url": self._get_high_res_thumbnail(thumbnails),
            "id": video_id,
            "url": self._track_url(video_id),
        }

    # ------------------------------------------------------------------
    # Playlists / albums
    # ------------------------------------------------------------------

    def _fetch_playlist_or_album(self, playlist_id: str):
        """Return ``(data, is_album)`` for *playlist_id*.

        IDs starting with ``MPREb`` are tried as an album first.  Otherwise
        (or if that yields nothing) the ID is fetched as a playlist, falling
        back to an album lookup only if no album lookup was attempted yet.
        Exceptions from the last attempt propagate to the caller.
        """
        album_attempted = False
        if playlist_id.startswith("MPREb"):
            album_attempted = True
            try:
                album = self.yt.get_album(playlist_id)
            except Exception as exc:
                logger.debug("Album lookup failed for %s: %s", playlist_id, exc)
            else:
                if album:
                    return album, True

        try:
            return self.yt.get_playlist(playlist_id, limit=100), False
        except Exception:
            if album_attempted:
                # The album endpoint already failed; do not call it twice.
                raise
            return self.yt.get_album(playlist_id), True

    def get_playlist(self, id: str):
        """Return a formatted playlist or album, cached for one hour.

        Returns:
            dict with ``id``, ``title``, ``description``, ``author``,
            ``cover_url`` and ``tracks``.

        Raises:
            ResourceNotFound: if the item cannot be fetched or formatted.
        """
        cache_key = f"playlist:{id}"
        cached_playlist = self.cache.get(cache_key)
        if cached_playlist:
            return cached_playlist

        try:
            playlist_data, is_album = self._fetch_playlist_or_album(id)
            if not playlist_data:
                raise ValueError("empty response from YouTube Music")

            own_thumbnails = playlist_data.get('thumbnails') or []
            default_album = playlist_data.get('title') or 'Single'
            # Album tracks inherit the album artwork when they have none.
            track_fallback = own_thumbnails if is_album else []
            formatted_tracks = [
                self._format_track(track, default_album, track_fallback)
                for track in (playlist_data.get('tracks') or [])
                if isinstance(track, dict)
            ]

            if is_album:
                if playlist_data.get('artists'):
                    author = self._extract_artist_names(playlist_data)
                else:
                    author = self._DEFAULT_AUTHOR
            else:
                author_data = playlist_data.get('author') or {}
                if isinstance(author_data, dict):
                    author = author_data.get('name') or self._DEFAULT_AUTHOR
                else:
                    author = str(author_data)

            formatted_playlist = {
                # Fall back to the requested id when the response carries none.
                "id": playlist_data.get('browseId') or playlist_data.get('id') or id,
                "title": self._clean_title(playlist_data.get('title', 'Unknown')),
                "description": self._clean_description(playlist_data.get('description', '')),
                "author": author,
                "cover_url": self._get_high_res_thumbnail(own_thumbnails),
                "tracks": formatted_tracks,
            }
            self.cache.set(cache_key, formatted_playlist, ttl_seconds=3600)
            return formatted_playlist
        except Exception as e:
            logger.warning("Playlist Fetch Error: %s", e)
            raise ResourceNotFound(f"Playlist {id} not found") from e

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, query: str):
        """Search songs and return ``{"tracks": [...]}``, cached for 24 hours.

        An empty or whitespace-only query returns ``{"tracks": []}`` without
        touching the cache or the API, so the return shape is always a dict.

        Raises:
            ExternalAPIError: if the upstream search fails.
        """
        normalized = query.strip() if isinstance(query, str) else ""
        if not normalized:
            return {"tracks": []}

        cache_key = f"search:{query.lower().strip()}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            results = self.yt.search(normalized, filter="songs", limit=20)
            tracks = [
                self._format_track(track, "Single")
                for track in (results or [])
                if isinstance(track, dict)
            ]
            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=86400)
            return response
        except Exception as e:
            logger.warning("Search Error: %s", e)
            raise ExternalAPIError(str(e)) from e

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------

    def get_stream_url(self, id: str):
        """Resolve a direct media URL for a video, cached for one hour.

        Each client in ``_PLAYER_CLIENTS`` is tried in order until one
        returns a URL.

        Returns:
            dict with ``url`` and the ``headers`` yt_dlp reports for it.

        Raises:
            ExternalAPIError: if no client produced a stream URL.
        """
        cache_key = f"stream:{id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        watch_url = f"https://www.youtube.com/watch?v={id}"
        last_error = None
        for client in self._PLAYER_CLIENTS:
            client_config = {'extractor_args': {'youtube': {'player_client': [client]}}}
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[ext=mp4]/best',
                'quiet': True,
                'noplaylist': True,
                # Per the yt-dlp docs, `--force-ipv4` is a CLI flag that sets
                # source_address; a 'force_ipv4' key is not a YoutubeDL
                # parameter and would be ignored.
                'source_address': '0.0.0.0',
            }
            ydl_opts.update(client_config)
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(watch_url, download=False)
                stream_url = info.get('url') if info else None
                if stream_url:
                    result = {
                        "url": stream_url,
                        "headers": info.get('http_headers', {}),
                    }
                    self.cache.set(cache_key, result, ttl_seconds=3600)
                    return result
                logger.warning("Client %s returned no stream URL for %s", client, id)
            except Exception as e:
                last_error = e
                logger.warning("Fetch failed with client %s: %s", client_config, e)

        logger.error("All clients failed for %s. Last error: %s", id, last_error)
        if last_error is None:
            # Every client answered but none exposed a direct URL.
            raise ExternalAPIError(f"No playable stream URL found for {id}")
        raise ExternalAPIError(str(last_error)) from last_error

    def invalidate_stream_cache(self, id: str):
        """Best-effort removal of the cached stream entry for *id*.

        NOTE(review): relies on the private ``CacheManager._get_path``; a
        public delete method on the cache would be preferable.
        """
        cache_key = f"stream:{id}"
        path = self.cache._get_path(cache_key)
        try:
            path.unlink()
        except FileNotFoundError:
            # Nothing cached (or removed concurrently) - nothing to do.
            pass
        except OSError as e:
            logger.warning("Could not remove stream cache for %s: %s", id, e)

    # ------------------------------------------------------------------
    # Recommendations
    # ------------------------------------------------------------------

    def get_recommendations(self, seed_id: str):
        """Return ``{"tracks": [...]}`` related to *seed_id*, cached for one hour.

        The seed itself, tracks without a video id, and duplicates are
        skipped.  Never raises: an empty seed or any upstream failure yields
        ``{"tracks": []}``.
        """
        if not seed_id:
            return {"tracks": []}

        cache_key = f"rec:{seed_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            watch_playlist = self.yt.get_watch_playlist(videoId=seed_id, limit=20)
            tracks = []
            seen_ids = {seed_id}
            for track in (watch_playlist.get('tracks') or []):
                t_id = track.get('videoId')
                if not t_id or t_id in seen_ids:
                    continue
                seen_ids.add(t_id)
                tracks.append(self._format_track(track, "Single"))
            response = {"tracks": tracks}
            self.cache.set(cache_key, response, ttl_seconds=3600)
            return response
        except Exception as e:
            logger.warning("Rec Error: %s", e)
            return {"tracks": []}

    # ------------------------------------------------------------------
    # Home / trending
    # ------------------------------------------------------------------

    def _fetch_trending_songs(self) -> list:
        """Return trending tracks from the VN charts, or ``[]`` on any problem.

        NOTE(review): expects ``charts['videos']`` to be a dict holding an
        ``items`` list; any other shape yields ``[]`` - confirm against the
        installed ytmusicapi version.
        """
        try:
            charts = self.yt.get_charts(country='VN')
        except Exception as e:
            logger.warning("Error fetching trending: %s", e)
            return []

        videos = charts.get('videos') if isinstance(charts, dict) else None
        items = videos.get('items') if isinstance(videos, dict) else None

        songs = []
        for item in items or []:
            if not isinstance(item, dict):
                continue
            video_id = item.get('videoId')
            if not video_id:
                continue
            thumbnails = item.get('thumbnails') or []
            songs.append({
                "id": video_id,
                "title": item.get('title') or 'Unknown Title',
                "artist": self._extract_artist_names(item),
                "album": "Trending",  # chart entries carry no album info here
                "cover_url": self._get_high_res_thumbnail(thumbnails) if thumbnails else "",
                "duration": 0,  # not read from chart entries
                "url": self._track_url(video_id),
            })
        return songs

    def _fallback_trending_songs(self) -> list:
        """Return fresh dicts for the hard-coded trending fallback list."""
        return [
            {
                "id": video_id,
                "title": title,
                "artist": artist,
                "album": album,
                "cover_url": cover_url,
                "duration": 0,
                "url": self._track_url(video_id),
            }
            for video_id, title, artist, album, cover_url in self._FALLBACK_TRENDING
        ]

    @staticmethod
    def _build_shelf(shelf_id: str, title: str, description: str,
                     tracks: list, creator: str) -> dict:
        """Return a playlist-shaped home shelf whose cover is the first track's."""
        return {
            "id": shelf_id,
            "title": title,
            "description": description,
            "cover_url": tracks[0].get('cover_url', "") if tracks else "",
            "tracks": tracks,
            "type": "Playlist",
            "creator": creator,
        }

    def get_home(self):
        """Return the home/browse shelves keyed by section name.

        Sections: ``Trending``, ``Top Hits``, ``New Releases`` (all derived
        from the same trending list) and a static ``Focus & Chill``.  The
        result is cached for one hour, or five minutes when the hard-coded
        fallback had to be used so a transient chart failure is retried
        soon.  Returns ``{}`` if building the response fails.
        """
        cache_key = "home:browse"
        cached = self.cache.get(cache_key)
        if cached:
            return cached

        try:
            trending_songs = self._fetch_trending_songs()
            using_fallback = not trending_songs
            if using_fallback:
                logger.warning("Using HARDCODED fallback for trending songs.")
                trending_songs = self._fallback_trending_songs()

            # "Top Hits" reuses the trending list, shuffled when long enough.
            top_hits_tracks = list(trending_songs)
            if len(top_hits_tracks) > 5:
                random.shuffle(top_hits_tracks)

            # "New Releases" is the same list rotated by two to look different.
            new_releases_tracks = list(trending_songs)
            if len(new_releases_tracks) > 2:
                new_releases_tracks = new_releases_tracks[2:] + new_releases_tracks[:2]

            response = {
                "Trending": [self._build_shelf(
                    "trending", "Trending Now", "Top music videos right now",
                    trending_songs, "YouTube Charts",
                )],
                "Top Hits": [self._build_shelf(
                    "top_hits", "Top Hits Today", "The hottest tracks right now.",
                    top_hits_tracks, "Editors",
                )],
                "New Releases": [self._build_shelf(
                    "new_releases", "New Releases", "Brand new music found for you.",
                    new_releases_tracks, "Spotify Clone",
                )],
                "Focus & Chill": [
                    {
                        "id": "lofi_beats",
                        "title": "Lofi Beats",
                        "description": "Chill beats to study/relax to",
                        "cover_url": "https://i.ytimg.com/vi/jfKfPfyJRdk/hqdefault.jpg",
                        "tracks": [],  # left empty; filled elsewhere if at all
                        "type": "Playlist",
                        "creator": "Lofi Girl",
                    },
                    {
                        "id": "jazz_vibes",
                        "title": "Jazz Vibes",
                        "description": "Relaxing Jazz instrumental",
                        "cover_url": "https://i.ytimg.com/vi/DX7W7WUI6w8/hqdefault.jpg",
                        "tracks": [],
                        "type": "Playlist",
                        "creator": "Jazz Cafe",
                    },
                ],
            }
            ttl_seconds = 300 if using_fallback else 3600
            self.cache.set(cache_key, response, ttl_seconds=ttl_seconds)
            return response
        except Exception as e:
            logger.exception("Home Error: %s", e)
            return {}

    def get_trending(self):
        """Return ``{"tracks": [...]}`` taken from the home page's Trending shelf."""
        home = self.get_home()
        shelves = home.get("Trending") if isinstance(home, dict) else None
        if shelves:
            return {"tracks": shelves[0]["tracks"]}
        return {"tracks": []}