spotify-clone/backend/api/endpoints/search.py

86 lines
3.1 KiB
Python

from fastapi import APIRouter, HTTPException, Depends
from backend.services.youtube import YouTubeService
from backend.api.schemas import SearchRequest
from backend.core.config import settings
import json
from pathlib import Path
# Router on which this module registers its search, recommendation,
# artist-info and trending endpoints (via the @router.get decorators below).
router = APIRouter()
def get_youtube_service():
    """Return a newly constructed ``YouTubeService``.

    The endpoints in this module reference this function through
    ``Depends(get_youtube_service)`` to obtain their ``yt`` argument.
    """
    service = YouTubeService()
    return service
@router.get("/search")
async def search_tracks(query: str, yt: YouTubeService = Depends(get_youtube_service)):
    """Handle ``GET /search`` by delegating to ``yt.search``.

    Args:
        query: Search text supplied by the caller.
        yt: Service instance provided through ``get_youtube_service``.

    Returns:
        Whatever ``yt.search(query)`` returns, passed through unchanged.
    """
    matches = yt.search(query)
    return matches
@router.get("/recommendations")
async def get_recommendations(seed_id: str = None, yt: YouTubeService = Depends(get_youtube_service)):
    """Handle ``GET /recommendations``.

    Args:
        seed_id: Optional identifier used to seed the recommendations.
        yt: Service instance provided through ``get_youtube_service``.

    Returns:
        ``yt.get_recommendations(seed_id)`` when a seed is given; otherwise
        the result of ``get_trending()``.
    """
    if seed_id:
        return yt.get_recommendations(seed_id)
    # No (or empty) seed: fall back to the trending payload.
    return await get_trending()
def _album_description(album, fallback_artist):
    """Build the description text (year followed by artist) for one album result."""
    year = album.get('year')
    # A missing or None year contributes nothing, and a non-string year is
    # stringified. Previously either case raised TypeError on concatenation,
    # which the endpoint's broad ``except`` turned into an empty response.
    year_text = "" if year is None else str(year)
    # Fall back to the requested artist when the result has no usable artist.
    artist_text = album.get('artist') or fallback_artist
    # NOTE(review): the original code joined the two parts with an empty
    # string, so the text reads like "2020Artist". A visible separator was
    # probably intended - confirm with the consumer before changing the format.
    return year_text + "" + str(artist_text)


@router.get("/recommendations/albums")
async def get_recommended_albums(seed_artist: str = None, yt: YouTubeService = Depends(get_youtube_service)):
    """Handle ``GET /recommendations/albums``: album cards for an artist.

    Args:
        seed_artist: Artist name to search albums for. When missing or empty
            an empty list is returned without touching the cache or service.
        yt: Service instance provided through ``get_youtube_service``.

    Returns:
        A list of dicts with the keys ``title``, ``description``,
        ``cover_url``, ``id`` and ``type``. A truthy cached value is returned
        as-is. Any exception raised while searching or building the list is
        printed and results in ``[]`` (best-effort endpoint).
    """
    if not seed_artist:
        return []

    # TODO: move this logic into YouTubeService. For now it reaches into the
    # service's inner client (``yt.yt``) and its private thumbnail helper.
    cache_key = f"rec_albums:{seed_artist.lower().strip()}"
    cached = yt.cache.get(cache_key)
    if cached:
        return cached

    try:
        results = yt.yt.search(seed_artist, filter="albums", limit=10)
        albums = [
            {
                "title": album.get('title', 'Unknown Album'),
                "description": _album_description(album, seed_artist),
                "cover_url": yt._get_high_res_thumbnail(album.get('thumbnails', [])),
                "id": album.get('browseId'),
                "type": "Album",
            }
            for album in results
        ]
        # 86400 seconds = 24 hours.
        yt.cache.set(cache_key, albums, ttl_seconds=86400)
        return albums
    except Exception as e:
        print(f"Album Rec Error: {e}")
        return []
@router.get("/artist/info")
async def get_artist_info(name: str, yt: YouTubeService = Depends(get_youtube_service)):
    """Handle ``GET /artist/info``: look up a photo URL for an artist name.

    Args:
        name: Artist name to search for. An empty value short-circuits to
            ``{"photo": None}``.
        yt: Service instance provided through ``get_youtube_service``.

    Returns:
        ``{"photo": <url>}`` built from the first artist search result (or a
        truthy cached value), and ``{"photo": None}`` when there is no result
        or the lookup raises.
    """
    if not name:
        return {"photo": None}

    cache_key = f"artist_info:{name.lower().strip()}"
    cached = yt.cache.get(cache_key)
    if cached:
        return cached

    try:
        results = yt.yt.search(name, filter="artists", limit=1)
        if not results:
            return {"photo": None}

        thumbnails = results[0].get('thumbnails', [])
        result = {"photo": yt._get_high_res_thumbnail(thumbnails)}
        # 86400 * 7 seconds = 7 days.
        yt.cache.set(cache_key, result, ttl_seconds=86400 * 7)
        return result
    except Exception as e:
        # Best-effort endpoint: still answer with "no photo", but report the
        # failure instead of swallowing it silently (same style as the album
        # recommendations endpoint).
        print(f"Artist Info Error: {e}")
        return {"photo": None}
@router.get("/trending")
async def get_trending():
    """Handle ``GET /trending``: return the contents of the trending data file.

    The file is ``data.json`` in the parent directory of ``settings.DATA_DIR``.

    Returns:
        The parsed JSON document, or an ``{"error": ...}`` dict when the file
        does not exist.

    Raises:
        HTTPException: status 500 with the error text as detail when locating,
            reading or parsing the file fails.
    """
    try:
        data_path = settings.DATA_DIR.parent / "data.json"  # backend/data.json
        if not data_path.exists():
            return {"error": "Trending data not found. Run fetch_data.py first."}
        # Decode explicitly as UTF-8 so that non-ASCII content does not depend
        # on the platform's default encoding.
        with open(data_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e