kv-netflix/backend/database.py

98 lines
3.2 KiB
Python

"""
Database module - SQLAlchemy with SQLite for video metadata
"""
import os
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Connection URL, overridable through the DATABASE_URL environment variable.
# The fallback is a SQLite file addressed by a relative path.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./streamflow.db")

# ``check_same_thread`` is only passed for SQLite URLs. The test is on the
# URL's scheme prefix rather than a substring match anywhere in the URL, so a
# non-SQLite URL that merely contains "sqlite" (for instance in a database
# name) is not handed this extra connect argument.
_connect_args = (
    {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
)
engine = create_engine(DATABASE_URL, connect_args=_connect_args)

# Session factory bound to the engine; sessions neither autocommit nor
# autoflush, so callers commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models in this module derive from.
Base = declarative_base()
def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Stands in for ``datetime.utcnow()``, which is deprecated as of
    Python 3.12. The timezone info is stripped so the produced values stay
    naive, exactly like the ones ``utcnow()`` returned.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Video(Base):
    """Video metadata model, mapped to the ``videos`` table."""

    __tablename__ = "videos"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Title, up to 500 characters, indexed.
    title = Column(String(500), index=True)
    # Optional free-form description.
    description = Column(Text, nullable=True)
    # Optional thumbnail reference, up to 1000 characters.
    thumbnail = Column(String(1000), nullable=True)
    # Source URL, up to 2000 characters; unique across rows and indexed.
    source_url = Column(String(2000), unique=True, index=True)
    # Duration as an integer, defaulting to 0
    # (unit not stated here, presumably seconds; confirm with callers).
    duration = Column(Integer, default=0)
    # Optional resolution label, up to 20 characters.
    resolution = Column(String(20), nullable=True)
    # Optional category name, indexed.
    category = Column(String(100), index=True, nullable=True)
    # Timestamps are naive UTC values; ``updated_at`` is also refreshed
    # whenever the row is updated.
    created_at = Column(DateTime, default=_utcnow)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow)
def init_db():
    """Create the tables declared on ``Base`` using the module-level engine."""
    schema = Base.metadata
    schema.create_all(bind=engine)
def get_db():
    """Yield a fresh database session and close it afterwards.

    Written as a generator: the session is handed to the consumer, and it is
    closed once the generator is resumed or finalized, even if the consumer
    raised in the meantime.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, whether or not the consumer failed.
        session.close()
class VideoRepository:
    """CRUD operations for videos.

    Every method works through the session passed to the constructor.
    Write operations commit immediately; if the commit fails, the session is
    rolled back and the original exception is re-raised, so the session stays
    usable for subsequent calls.
    """

    def __init__(self, db):
        """Store the session used for all queries and commits."""
        self.db = db

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        Raises:
            Exception: whatever ``commit()`` raised, after the rollback.
        """
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session would remain in a failed
            # transaction state and reject further work.
            self.db.rollback()
            raise

    @staticmethod
    def _escape_like(term: str) -> str:
        """Escape LIKE wildcards in *term* using a backslash escape character.

        The backslash itself is escaped first so that the escapes added for
        ``%`` and ``_`` are not escaped a second time.
        """
        return (
            term.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    def create(self, title: str, source_url: str, **kwargs) -> Video:
        """Insert a new video and return it refreshed from the database.

        Args:
            title: Title of the video.
            source_url: Source URL of the video.
            **kwargs: Further ``Video`` constructor keyword arguments.

        Raises:
            Exception: if the commit fails; the session is rolled back first.
        """
        video = Video(title=title, source_url=source_url, **kwargs)
        self.db.add(video)
        self._commit()
        self.db.refresh(video)
        return video

    def get_by_id(self, video_id: int) -> Optional[Video]:
        """Return the video whose id equals *video_id*, or ``None``."""
        return self.db.query(Video).filter(Video.id == video_id).first()

    def get_by_url(self, source_url: str) -> Optional[Video]:
        """Return the video whose source URL equals *source_url*, or ``None``."""
        return self.db.query(Video).filter(Video.source_url == source_url).first()

    def search(self, query: str, limit: int = 20) -> list[Video]:
        """Return up to *limit* videos whose title contains *query*.

        Matching is case-insensitive. ``%`` and ``_`` in *query* are matched
        literally instead of acting as LIKE wildcards. Results are ordered by
        id so that the truncated result set is deterministic.
        """
        pattern = f"%{self._escape_like(query)}%"
        return (
            self.db.query(Video)
            .filter(Video.title.ilike(pattern, escape="\\"))
            .order_by(Video.id)
            .limit(limit)
            .all()
        )

    def get_all(self, skip: int = 0, limit: int = 50) -> list[Video]:
        """Return one page of videos: *limit* rows after skipping *skip*.

        Rows are ordered by id; without an explicit ordering, consecutive
        pages could overlap or miss rows.
        """
        return (
            self.db.query(Video)
            .order_by(Video.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_by_category(self, category: str, limit: int = 20) -> list[Video]:
        """Return up to *limit* videos whose category equals *category*.

        Results are ordered by id so that the truncated result set is
        deterministic.
        """
        return (
            self.db.query(Video)
            .filter(Video.category == category)
            .order_by(Video.id)
            .limit(limit)
            .all()
        )

    def update(self, video_id: int, **kwargs) -> Optional[Video]:
        """Set the given attributes on a video and commit.

        Args:
            video_id: Id of the video to modify.
            **kwargs: Attribute names mapped to their new values.

        Returns:
            The refreshed video, or ``None`` when no video has that id.

        Raises:
            Exception: if the commit fails; the session is rolled back first.
        """
        video = self.get_by_id(video_id)
        if video is None:
            return None
        for key, value in kwargs.items():
            setattr(video, key, value)
        self._commit()
        self.db.refresh(video)
        return video

    def delete(self, video_id: int) -> bool:
        """Delete a video by id.

        Returns:
            ``True`` when a video was found and deleted, ``False`` otherwise.

        Raises:
            Exception: if the commit fails; the session is rolled back first.
        """
        video = self.get_by_id(video_id)
        if video is None:
            return False
        self.db.delete(video)
        self._commit()
        return True