83 lines
3.7 KiB
Python
83 lines
3.7 KiB
Python
# playlists/playlist_db.py
|
|
import db
|
|
import json
|
|
import pandas as pd
|
|
from playlists.playlist_model import Playlist, RuleSet
|
|
from playlists.sql_builder import build_sql_from_rules
|
|
|
|
def load_all_playlists():
    """Return every stored playlist as a list of ``Playlist`` models.

    Rows are read from the ``playlists`` table ordered by ``created_at``
    descending. Each row is normalised to a plain ``dict`` first, so the
    function works whether the connection yields mappings, rows exposing
    ``keys()`` (e.g. ``sqlite3.Row``) or plain tuples.

    A row that cannot be turned into a ``Playlist`` is reported on stdout and
    skipped, so one bad record does not abort the whole load.

    ``rules_json`` is handed to ``Playlist`` unchanged.
    NOTE(review): presumably the model itself copes with NULL / empty rules;
    confirm against ``playlists.playlist_model``.

    Returns:
        list: the ``Playlist`` objects that could be built, in query order.
    """
    with db.get_conn() as conn:
        cursor = conn.execute("SELECT * FROM playlists ORDER BY created_at DESC")
        # Column names of the result set, taken once from the cursor. They are
        # only needed to map positional (tuple) rows to names.
        columns = [col[0] for col in (cursor.description or ())]
        rows = cursor.fetchall()

    playlists = []
    for row in rows:
        if isinstance(row, dict):
            row_dict = row
        elif callable(getattr(row, "keys", None)):
            # Rows exposing keys() (such as sqlite3.Row) can be read by name.
            row_dict = {key: row[key] for key in row.keys()}
        else:
            # Plain tuple: pair each value with its column name by position.
            row_dict = dict(zip(columns, row))

        try:
            created_at = row_dict["created_at"]
            updated_at = row_dict["updated_at"]
            playlists.append(
                Playlist(
                    id=row_dict["id"],
                    name=row_dict["name"],
                    description=row_dict["description"],
                    type=row_dict["type"],
                    rules=row_dict["rules_json"],
                    created_at=str(created_at) if created_at is not None else None,
                    updated_at=str(updated_at) if updated_at is not None else None,
                )
            )
        except Exception as e:
            # Deliberately broad: a single invalid row (missing column,
            # model validation failure, ...) must not block the others.
            print(f"⚠️ Ignored invalid playlist row (id={row_dict.get('id')}, name={row_dict.get('name')}): {e}")
    return playlists
|
|
|
|
def delete_playlist(playlist_id: int):
    """Delete a playlist and its video associations, then commit.

    Removes the ``playlists`` row whose ``id`` is ``playlist_id`` and then
    every ``video_playlists`` row whose ``playlist_id`` matches, using one
    connection, and commits.

    Args:
        playlist_id: identifier of the playlist to remove.
    """
    params = (playlist_id,)
    # Executed in this order on the same connection.
    statements = (
        "DELETE FROM playlists WHERE id = ?",
        "DELETE FROM video_playlists WHERE playlist_id = ?",
    )
    with db.get_conn() as conn:
        for statement in statements:
            conn.execute(statement, params)
        conn.commit()
|
|
|
|
def get_videos_for_playlist(playlist):
    """Return the video rows that belong to ``playlist``.

    For a playlist whose ``type`` is ``"manual"``, the videos are those linked
    through ``video_playlists`` (joined on the video file name), ordered by
    ``position``. For any other type, the query and its parameters are
    produced by ``build_sql_from_rules(playlist.rules)``.

    Args:
        playlist: object exposing ``type``, ``id`` and ``rules``.

    Returns:
        The result of ``fetchall()`` for the selected query.
    """
    if playlist.type == "manual":
        sql = """
            SELECT v.*
            FROM videos v
            JOIN video_playlists vp ON vp.video_file_name = v.file_name
            WHERE vp.playlist_id = ?
            ORDER BY vp.position
        """
        params = (playlist.id,)
    else:
        # Rule-based playlist: the SQL is generated from its rules.
        sql, params = build_sql_from_rules(playlist.rules)

    with db.get_conn() as conn:
        return conn.execute(sql, params).fetchall()
|
|
|
|
def load_video_summary_map():
    """Return per-video labels and playlists read from ``video_summary``.

    Returns:
        dict: ``{file_name: {"labels": [...], "playlists": [...]}}``. Each
        list is the stored text split on commas, or an empty list when the
        stored value is missing or an empty string.
    """

    def split_csv(value):
        # Split only real, non-empty strings. A missing value may reach us as
        # None, NaN or pd.NA depending on how pandas typed the column; NaN is
        # truthy and pd.NA cannot be used in a boolean test, so a bare
        # truthiness check is not safe here.
        return value.split(",") if isinstance(value, str) and value else []

    with db.get_conn() as conn:
        df = pd.read_sql_query("SELECT file_name, labels, playlists FROM video_summary", conn)

    # Walk the three columns in lockstep; a later duplicate file_name
    # overwrites an earlier one.
    return {
        file_name: {
            "labels": split_csv(labels),
            "playlists": split_csv(playlists),
        }
        for file_name, labels, playlists in zip(df["file_name"], df["labels"], df["playlists"])
    }