"""
PhimMoiChill Scraper - Extracts movie catalog and video sources
Updated for phimmoichill.network
"""

import asyncio
import json
import logging
import re
import ssl
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup

BASE_URL = "https://phimmoichill.network"

logger = logging.getLogger(__name__)


@dataclass
class RophimMovie:
    """Catalog entry for a single title scraped from the site."""

    id: str
    title: str
    original_title: Optional[str]
    slug: str
    thumbnail: str
    backdrop: Optional[str]
    year: Optional[int]
    rating: Optional[str]
    duration: Optional[int]  # in minutes
    quality: Optional[str]
    genre: Optional[str]
    description: Optional[str]
    category: str  # movies, series, anime, etc
    cast: Optional[List[str]] = None
    director: Optional[str] = None
    country: Optional[str] = None
    episodes: Optional[List[Dict]] = None


class RophimScraper:
    """Scraper for PhimMoiChill video catalog"""

    def __init__(self, verify_ssl: bool = False):
        """Prepare request headers; the HTTP session is created lazily.

        Args:
            verify_ssl: When True, TLS certificates and hostnames are
                verified. Defaults to False to keep the historical behavior
                (verification disabled for macOS compatibility).
        """
        self.verify_ssl = verify_ssl
        self.session: Optional[aiohttp.ClientSession] = None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'vi-VN,vi;q=0.9,en-US;q=0.8,en;q=0.7',
            'Referer': BASE_URL
        }

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared client session, creating it when needed.

        A new session is also created when the previous one has been closed,
        so a closed session is never handed back to callers.
        """
        if self.session is None or self.session.closed:
            ssl_context = ssl.create_default_context()
            if not self.verify_ssl:
                # SECURITY NOTE: certificate verification is disabled here
                # (macOS compatibility). Pass verify_ssl=True to enable it.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
            connector = aiohttp.TCPConnector(ssl=ssl_context)
            self.session = aiohttp.ClientSession(headers=self.headers, connector=connector)
        return self.session

    async def close(self):
        """Close the client session, if one is open, and forget it."""
        if self.session:
            await self.session.close()
            self.session = None

    async def _fetch_html(self, url: str) -> str:
        """Fetch HTML content from URL.

        Raises:
            Exception: If the response status is not 200.
        """
        session = await self._get_session()
        async with session.get(url) as response:
            if response.status == 200:
                return await response.text()
            raise Exception(f"Failed to fetch {url}: {response.status}")

    async def _fetch_json(self, url: str) -> Dict:
        """Fetch JSON from URL.

        Raises:
            Exception: If the response status is not 200.
        """
        session = await self._get_session()
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            raise Exception(f"Failed to fetch JSON {url}: {response.status}")

    async def get_homepage_movies(self, page: int = 1, limit: int = 24) -> List[RophimMovie]:
        """Extract movies from homepage/feed

        Uses /danh-sach/phim-le endpoint for PhimMoiChill
        Pagination uses /page/N format (not ?page=N query param)
        """
        if page == 1:
            url = f"{BASE_URL}/danh-sach/phim-le"
        else:
            url = f"{BASE_URL}/danh-sach/phim-le/page/{page}"

        html = await self._fetch_html(url)
        return self._parse_movie_grid(html, limit)

    async def get_category(self, category: str, page: int = 1, limit: int = 24) -> List[RophimMovie]:
        """Get movies by category with parallel page fetching.

        Pages that fail to download or parse are logged and contribute no
        items; the remaining pages are still returned (best effort).

        Returns:
            Up to ``limit`` movies, de-duplicated by slug, in page order.
        """
        # Determine how many pages we need to fetch to satisfy the limit (average ~40 items per page)
        # We'll fetch 2 pages in parallel if limit is high
        num_pages = 2 if limit > 40 else 1

        async def fetch_page(p: int) -> List[RophimMovie]:
            try:
                if p == 1:
                    url = f"{BASE_URL}/{category}"
                else:
                    url = f"{BASE_URL}/{category}/page/{p}"
                html = await self._fetch_html(url)
                return self._parse_movie_grid(html, 100)
            except Exception as exc:
                # Best effort: one failing page must not sink the others,
                # but leave a trace instead of swallowing it silently.
                logger.warning("Failed to load category %r page %s: %s", category, p, exc)
                return []

        # Start concurrent fetches
        page_tasks = [fetch_page(p) for p in range(page, page + num_pages)]
        results = await asyncio.gather(*page_tasks)

        # Combine results and remove duplicates
        movies = []
        seen_slugs = set()
        for batch in results:
            for m in batch:
                if m.slug not in seen_slugs:
                    movies.append(m)
                    seen_slugs.add(m.slug)

        return movies[:limit]

    async def search(self, query: str, limit: int = 20) -> List[RophimMovie]:
        """Search for movies.

        The query is percent-encoded so characters such as ``&``, ``#``,
        ``+`` or ``=`` cannot alter the structure of the request URL.
        """
        url = f"{BASE_URL}/tim-kiem?keyword={quote(query, safe='')}"
        html = await self._fetch_html(url)
        return self._parse_movie_grid(html, limit)

    async def get_movie_detail(self, slug: str) -> Optional[RophimMovie]:
        """Get detailed movie info including episodes"""
        url = f"{BASE_URL}/phim/{slug}"
        html = await self._fetch_html(url)
        return self._parse_movie_detail(html, slug)

    async def get_video_source(self, movie_slug: str, episode: int = 1) -> Optional[str]:
        """Extract video source URL for playback

        Returns the first source found on the player page (presumably a
        direct m3u8 or MP4 URL - depends on _extract_video_sources), or
        None when no source is found.
        """
        # Try to get the player page
        player_url = f"{BASE_URL}/xem-phim/{movie_slug}/tap-{episode}"
        html = await self._fetch_html(player_url)

        # Look for embedded video sources
        sources = self._extract_video_sources(html)
        if sources:
            return sources[0]  # Return best quality source

        return None

    def _parse_movie_grid(self, html: str, limit: int) -> List[RophimMovie]:
        """Parse movie cards from HTML grid using BeautifulSoup.

        Cards without a usable link or slug, and cards that raise while
        being parsed, are skipped. ``limit`` caps the number of movies
        returned, so skipped cards do not shorten the result while further
        valid cards remain.
        """
        movies: List[RophimMovie] = []

        soup = BeautifulSoup(html, 'lxml')

        # PhimMoiChill uses .myui-vodlist__box for each movie item
        movie_items = soup.select('.myui-vodlist__box')

        for item in movie_items:
            if len(movies) >= limit:
                break
            try:
                # Find the main link with class myui-vodlist__thumb
                link = item.select_one('a.myui-vodlist__thumb')
                if not link:
                    link = item.select_one('a[href*="/phim/"]')
                if not link:
                    continue

                href = link.get('href', '')
                slug = self._extract_slug(href)
                if not slug:
                    continue

                # Get title from link title attribute or h4.title
                title = link.get('title', '')
                if not title:
                    title_elem = item.select_one('h4.title a, h4 a, .title a')
                    if title_elem:
                        title = title_elem.get_text(strip=True)
                    else:
                        title = slug.replace('-', ' ').title()

                # Get thumbnail from background-image style
                thumbnail = ''
                style = link.get('style', '')
                bg_match = re.search(r'url\(([^)]+)\)', style)
                if bg_match:
                    thumbnail = bg_match.group(1).strip('"\'')
                else:
                    # Fallback to img tag
                    img = item.select_one('img')
                    if img:
                        thumbnail = img.get('src', '') or img.get('data-src', '')

                # Get quality badge (.pic-tag)
                quality_elem = item.select_one('.pic-tag, .quality, .label')
                quality = quality_elem.get_text(strip=True) if quality_elem else 'HD'

                # Get English title from description
                eng_title_elem = item.select_one('.text-muted, .myui-vodlist__detail p')
                original_title = eng_title_elem.get_text(strip=True) if eng_title_elem else None

                # Determine category from quality badge or episode count
                category = "movies"
                if quality and ('tập' in quality.lower() or 'ep' in quality.lower()):
                    category = "series"

                # Extract year from original title
                year = None
                if original_title:
                    year_match = re.search(r'\((\d{4})\)', original_title)
                    if year_match:
                        year = int(year_match.group(1))

                movie = RophimMovie(
                    id=slug,
                    title=title,
                    original_title=original_title,
                    slug=slug,
                    thumbnail=self._normalize_url(thumbnail),
                    backdrop=None,
                    year=year,
                    rating=None,
                    duration=None,
                    quality=quality or 'HD',
                    genre=None,
                    description=None,
                    category=category
                )
                movies.append(movie)
            except Exception as exc:
                # Skip problematic items, but keep a trace for debugging
                logger.debug("Skipping unparseable movie card: %s", exc)
                continue

        return movies

    def _parse_movie_detail(self, html: str, slug: str) -> Optional[RophimMovie]:
        """Parse detailed movie page"""
        soup = BeautifulSoup(html, 'lxml')

        # Get title
        title_elem = soup.select_one('h1.movie-title, h1, .title')
        title = title_elem.get_text(strip=True) if title_elem else slug.replace('-', ' ').title()

        # Get description from meta tags (better quality)
        description = None
        meta_desc = soup.select_one('meta[name="description"], meta[property="og:description"]')
        if meta_desc:
            description = meta_desc.get('content', '').strip()

        # Fallback to page content if no meta description
        if not description:
            desc_elem = soup.select_one('.description, .content, .film-description, .entry-content')
            description = desc_elem.get_text(strip=True) if desc_elem else None

        # Get poster from meta og:image (high quality)
        poster = ''
        poster_meta = soup.select_one('meta[property="og:image"]')
        if poster_meta:
            poster = poster_meta.get('content', '')
        else:
            # Fallback to img tag
            poster_elem = soup.select_one('.movie-l-img img, .thumb img, img.img-responsive')
            poster = poster_elem.get('src', '') if poster_elem else ''

        # Get metadata from info sections
        director = None
        cast = []
        country = None
        genres = []
        year = None
        rating = None
        episodes_count = None

        # PhimMoiChill uses