correct application of playlists rules

This commit is contained in:
Gabriel Radureau
2025-10-16 17:18:58 +02:00
parent d2e2028610
commit 78313ffbef
17 changed files with 1098 additions and 247 deletions

View File

@@ -4,28 +4,57 @@ import db
from playlists.playlist_model import Playlist, RuleSet
from playlists import playlist_db
from views.label_views import video_filter_sidebar, video_list_view
from views.video_views import show_video_row
from models import Video
def playlist_manual_editor(playlist: Playlist):
"""Permet de gérer les vidéos d'une playlist manuelle."""
st.subheader(f"🎞️ Édition manuelle : {playlist.name}")
# Filtres standard pour explorer les vidéos
filters = video_filter_sidebar()
videos = video_list_view(filters, editable_labels=False, editable_difficulty=False)
existing_videos = db.get_videos_in_playlist(playlist.id)
existing_names = [v["file_name"] for v in existing_videos]
# Précharge les vidéos déjà incluses dans la playlist
playlist_video_ids = db.get_video_file_names_in_playlist(playlist.id)
for video in videos:
in_playlist = video.file_name in existing_names
button_label = " Ajouter" if not in_playlist else "❌ Retirer"
if st.button(button_label, key=f"toggle_{playlist.id}_{video.file_name}"):
if in_playlist:
db.remove_video_from_playlist(playlist.id, video.file_name)
st.success(f"Supprimée de {playlist.name}")
else:
db.add_video_to_playlist(playlist.id, video.file_name)
st.success(f"Ajoutée à {playlist.name}")
st.rerun()
# Récupère les vidéos filtrées
df_videos = db.search_videos(
label_names=filters["selected_labels"],
day_of_week=filters["day_filter"],
difficulty=filters["difficulty_filter"],
**filters
)
if df_videos.empty:
st.info("Aucune vidéo trouvée avec ces filtres.")
return
videos = [Video(**row) for _, row in df_videos.iterrows()]
st.write(f"🎬 {len(videos)} vidéo(s) disponibles.")
# Affiche les vidéos avec boutons dajout/retrait à droite
summary_map = summary_map = get_video_summary_cached()
for video in videos[:50]: # limite de sécurité
summary = summary_map.get(video.file_name, {"labels": [], "playlists": []})
preselected = summary["labels"]
playlists = summary["playlists"]
show_video_row(
video,
preselected_labels=preselected,
editable_labels=False,
editable_difficulty=False,
playlist=playlist,
playlist_video_ids=playlist_video_ids,
video_playlists=playlists
)
if st.button("📦 Charger plus"):
st.session_state.video_page += 1
st.rerun()
@st.cache_data(ttl=30)
def get_video_summary_cached():
return playlist_db.load_video_summary_map()
def playlist_dynamic_editor(playlist: Playlist):
"""Édite les règles d'une playlist dynamique et affiche le rendu en temps réel."""
@@ -33,36 +62,77 @@ def playlist_dynamic_editor(playlist: Playlist):
rules = playlist.rules or RuleSet()
labels = db.load_labels()
playlists = [p.name for p in playlist_db.load_all_playlists()]
rules.include_labels = st.multiselect("Inclure labels", labels, default=rules.include_labels)
rules.exclude_labels = st.multiselect("Exclure labels", labels, default=rules.exclude_labels)
rules.include_playlists = st.multiselect("Inclure playlists", playlists, default=rules.include_playlists)
rules.exclude_playlists = st.multiselect("Exclure playlists", playlists, default=rules.exclude_playlists)
all_playlists = playlist_db.load_all_playlists()
name_to_id = {p.name: p.id for p in all_playlists}
id_to_name = {p.id: p.name for p in all_playlists}
default_include = [id_to_name.get(pid) for pid in rules.include_playlists if pid in id_to_name]
default_exclude = [id_to_name.get(pid) for pid in rules.exclude_playlists if pid in id_to_name]
selected_includes = st.multiselect("Inclure playlists", id_to_name.values(), default=default_include)
selected_excludes = st.multiselect("Exclure playlists", id_to_name.values(), default=default_exclude)
rules.include_playlists = [name_to_id[name] for name in selected_includes]
rules.exclude_playlists = [name_to_id[name] for name in selected_excludes]
col1, col2 = st.columns(2)
with col1:
rules.date_after = st.date_input("📅 Après le", value=rules.date_after or None)
use_delta = st.checkbox("⏱️ Utiliser un delta de jours", value=bool(rules.date_delta_days))
if use_delta:
rules.date_delta_days = st.number_input(
"Nombre de jours depuis aujourdhui (négatif pour passé)",
value=rules.date_delta_days or -15
)
rules.date_after = None
rules.date_before = None
else:
rules.date_after = st.date_input("📅 Après le", value=rules.date_after or None)
rules.date_before = st.date_input("📅 Avant le", value=rules.date_before or None)
rules.date_delta_days = None
with col2:
rules.date_before = st.date_input("📅 Avant le", value=rules.date_before or None)
rules.logic = st.radio("Logique de combinaison", ["AND", "OR"], index=0 if rules.logic == "AND" else 1)
rules.logic = st.radio("Logique de combinaison", ["AND", "OR"], index=0 if rules.logic == "AND" else 1)
rules.label_logic = st.radio("Logique de combinaison entre labels", ["AND", "OR"], index=0 if rules.label_logic == "AND" else 1)
# --- Enregistrement ---
if st.button("💾 Enregistrer les règles"):
playlist.rules = rules
playlist.save()
st.success("Règles mises à jour ✅")
st.rerun()
(st.rerun if hasattr(st, "rerun") else st.experimental_rerun)()
st.markdown("---")
st.subheader("🧩 Rendu de la playlist")
rows = playlist_db.get_videos_for_playlist(playlist)
if not rows:
st.info("Aucune vidéo ne correspond aux règles actuelles.")
return
videos = [Video(**row) for row in rows]
st.write(f"{len(videos)} vidéo(s) trouvée(s).")
from views.video_views import show_video_row
for v in videos[:30]:
show_video_row(v, db.load_video_labels(v.file_name), editable_labels=False, editable_difficulty=False)
st.write(f"🎬 {len(videos)} vidéo(s) trouvée(s).")
playlist_video_ids = db.get_video_file_names_in_playlist(playlist.id)
summary_map = summary_map = get_video_summary_cached()
for v in videos[:50]:
summary = summary_map.get(v.file_name, {"labels": [], "playlists": []})
preselected = summary["labels"]
playlists = summary["playlists"]
show_video_row(
v,
preselected_labels=preselected,
editable_labels=False,
editable_difficulty=False,
playlist=playlist,
playlist_video_ids=playlist_video_ids,
video_playlists=playlists
)
if st.button("📦 Charger plus"):
st.session_state.video_page += 1
st.rerun()

View File

@@ -1,7 +1,9 @@
# playlists/playlist_db.py
import db
import json
import pandas as pd
from playlists.playlist_model import Playlist, RuleSet
from playlists.sql_builder import build_sql_from_rules
def load_all_playlists():
"""Retourne une liste de Playlist (Pydantic) — tolérant aux rules_json nuls / vides."""
@@ -28,12 +30,16 @@ def load_all_playlists():
row_dict = dict(zip(cols, row))
try:
created_at = str(row["created_at"]) if row["created_at"] is not None else None
updated_at = str(row["updated_at"]) if row["updated_at"] is not None else None
pl = Playlist(
id=row_dict.get("id"),
name=row_dict.get("name") or "",
description=row_dict.get("description") or "",
type=row_dict.get("type") or "manual",
rules=row_dict.get("rules_json")
id=row["id"],
name=row["name"],
description=row["description"],
type=row["type"],
rules=row["rules_json"],
created_at=created_at,
updated_at=updated_at
)
playlists.append(pl)
except Exception as e:
@@ -47,50 +53,31 @@ def delete_playlist(playlist_id: int):
conn.execute("DELETE FROM video_playlists WHERE playlist_id = ?", (playlist_id,))
conn.commit()
def get_videos_for_playlist(playlist: Playlist):
"""Retourne les vidéos selon le type"""
with db.get_conn() as conn:
if playlist.type == "manual":
q = """
def get_videos_for_playlist(playlist):
"""Retourne les vidéos correspondant aux règles d'une playlist dynamique."""
if playlist.type == "manual":
with db.get_conn() as conn:
query = """
SELECT v.*
FROM videos v
JOIN video_playlists vp ON vp.video_file_name = v.file_name
WHERE vp.playlist_id = ?
ORDER BY vp.position
"""
return conn.execute(q, (playlist.id,)).fetchall()
else:
rules = playlist.rules
clauses = []
params = []
return conn.execute(query, (playlist.id,)).fetchall()
else:
sql, params = build_sql_from_rules(playlist.rules)
with db.get_conn() as conn:
return conn.execute(sql, params).fetchall()
if rules.include_labels:
placeholders = ",".join("?" * len(rules.include_labels))
clauses.append(f"v.file_name IN (SELECT vl.video_file_name FROM video_labels vl JOIN labels l ON l.id=vl.label_id WHERE l.name IN ({placeholders}))")
params += rules.include_labels
if rules.exclude_labels:
placeholders = ",".join("?" * len(rules.exclude_labels))
clauses.append(f"v.file_name NOT IN (SELECT vl.video_file_name FROM video_labels vl JOIN labels l ON l.id=vl.label_id WHERE l.name IN ({placeholders}))")
params += rules.exclude_labels
if rules.include_playlists:
placeholders = ",".join("?" * len(rules.include_playlists))
clauses.append(f"v.file_name IN (SELECT vp.video_file_name FROM video_playlists vp JOIN playlists p ON vp.playlist_id=p.id WHERE p.name IN ({placeholders}))")
params += rules.include_playlists
if rules.exclude_playlists:
placeholders = ",".join("?" * len(rules.exclude_playlists))
clauses.append(f"v.file_name NOT IN (SELECT vp.video_file_name FROM video_playlists vp JOIN playlists p ON vp.playlist_id=p.id WHERE p.name IN ({placeholders}))")
params += rules.exclude_playlists
if rules.date_after:
clauses.append("v.record_datetime >= ?")
params.append(rules.date_after)
if rules.date_before:
clauses.append("v.record_datetime <= ?")
params.append(rules.date_before)
where_clause = f" {' AND ' if rules.logic == 'AND' else ' OR '} ".join(clauses) if clauses else "1=1"
q = f"SELECT v.* FROM videos v WHERE {where_clause} ORDER BY v.record_datetime DESC"
return conn.execute(q, params).fetchall()
def load_video_summary_map():
"""Retourne un dict {file_name: {'labels': [...], 'playlists': [...]}} depuis la vue video_summary."""
with db.get_conn() as conn:
df = pd.read_sql_query("SELECT file_name, labels, playlists FROM video_summary", conn)
summary = {}
for _, row in df.iterrows():
summary[row["file_name"]] = {
"labels": row["labels"].split(",") if row["labels"] else [],
"playlists": row["playlists"].split(",") if row["playlists"] else [],
}
return summary

View File

@@ -1,25 +1,67 @@
# playlists/playlist_model.py
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, validator, field_validator
from typing import List, Optional, Literal
from datetime import date, datetime
import json
import db
from typing import Optional, List, Literal
from pydantic import BaseModel, Field
import json
class RuleSet(BaseModel):
include_labels: List[str] = []
exclude_labels: List[str] = []
include_playlists: List[str] = []
exclude_playlists: List[str] = []
include_playlists: List[int] = []
exclude_playlists: List[int] = []
date_after: Optional[str] = None
date_before: Optional[str] = None
date_delta_days: Optional[int] = None
difficulty: Optional[str] = None
day_of_week: Optional[str] = None
address_keyword: Optional[str] = None
label_logic: Literal["AND", "OR"] = "AND"
logic: Literal["AND", "OR"] = "AND"
# --- Normalisation des dates (entrée) ---
@field_validator("date_after", "date_before", mode="before")
def normalize_date(cls, v):
"""Convertit automatiquement date/datetime en str ISO avant stockage."""
if isinstance(v, (date, datetime)):
return v.isoformat()
return v
def model_dump(self, *args, **kwargs):
"""Force la sortie JSON-safe (dates converties en str)."""
data = super().model_dump(*args, **kwargs)
for key in ["date_after", "date_before"]:
v = data.get(key)
if isinstance(v, (date, datetime)):
data[key] = v.isoformat()
return data
@validator("date_after", "date_before", pre=True, always=True)
def convert_date(cls, v):
"""Convertit automatiquement les objets date/datetime en ISO string."""
if isinstance(v, (date, datetime)):
return v.isoformat()
return v
def dict(self, *args, **kwargs):
"""Sassure que toutes les dates sont des chaînes sérialisables."""
data = super().dict(*args, **kwargs)
for key in ["date_after", "date_before"]:
v = data.get(key)
if isinstance(v, (date, datetime)):
data[key] = v.isoformat()
return data
def to_json(self) -> str:
return json.dumps(self.dict(), ensure_ascii=False, indent=2)
@classmethod
def from_json(cls, raw):
"""Accepte None, str vide ou dict JSON."""
if not raw:
return cls()
if isinstance(raw, dict):

View File

@@ -21,14 +21,31 @@ def main():
# Appliquer les filtres
filtered = []
for p in playlists:
# filtrage texte
if search_term.lower() not in p.name.lower() and search_term.lower() not in (p.description or "").lower():
continue
if date_filter and datetime.fromisoformat(p.created_at) < datetime.combine(date_filter, datetime.min.time()):
continue
# filtrage date de création
if date_filter:
created = p.created_at
if isinstance(created, datetime):
created_dt = created
elif isinstance(created, (str, bytes)):
try:
created_dt = datetime.fromisoformat(created)
except ValueError:
continue # ignore invalid date
else:
continue
if created_dt < datetime.combine(date_filter, datetime.min.time()):
continue
filtered.append(p)
# --- Sélection ou création ---
names = ["( Nouvelle playlist)"] + [p.name for p in filtered]
if "new_playlist_name" in st.session_state:
st.session_state["playlist_select"] = st.session_state.pop("new_playlist_name")
selected_name = st.selectbox("Sélectionnez une playlist", names, key="playlist_select")
# --- Création ---
@@ -43,11 +60,8 @@ def main():
else:
pl = Playlist(name=name.strip(), description=desc, type=type_choice, rules=RuleSet())
pl.save()
st.session_state["playlist_select"] = pl.name # ✅ sélectionne automatiquement
if hasattr(st, "rerun"):
st.rerun()
else:
st.experimental_rerun()
st.session_state["new_playlist_name"] = pl.name # on stocke temporairement
st.rerun()
return
# --- Mode édition ---

View File

@@ -0,0 +1,109 @@
# playlists/sql_builder.py
from typing import Tuple, List
from playlists.playlist_model import RuleSet
from datetime import datetime, timedelta
def build_sql_from_rules(rules: RuleSet) -> Tuple[str, List]:
"""
Construit une requête SQL complète (SELECT * FROM videos ...)
en fonction d'un RuleSet.
Retourne (sql_query, params).
"""
where = ["1=1"]
params = []
# --- DATES (delta prioritaire) ---
if rules.date_delta_days is not None:
try:
delta_days = int(rules.date_delta_days)
date_after = (datetime.now() + timedelta(days=delta_days)).strftime("%Y-%m-%d")
where.append("record_datetime >= ?")
params.append(date_after)
except (ValueError, TypeError) as e:
print(f"⚠️ [SQL Builder] Delta invalide ({rules.date_delta_days!r}) : {e}")
else:
if rules.date_after:
where.append("record_datetime >= ?")
params.append(rules.date_after)
if rules.date_before:
where.append("record_datetime <= ?")
params.append(rules.date_before)
# --- LABELS ---
if rules.include_labels:
placeholders = ",".join("?" * len(rules.include_labels))
if rules.label_logic == "AND":
where.append(f"""
file_name IN (
SELECT vl.video_file_name
FROM video_labels vl
JOIN labels l ON l.id = vl.label_id
WHERE l.name IN ({placeholders})
GROUP BY vl.video_file_name
HAVING COUNT(DISTINCT l.name) = {len(rules.include_labels)}
)
""")
else:
where.append(f"""
file_name IN (
SELECT vl.video_file_name
FROM video_labels vl
JOIN labels l ON l.id = vl.label_id
WHERE l.name IN ({placeholders})
)
""")
params.extend(rules.include_labels)
if rules.exclude_labels:
placeholders = ",".join("?" * len(rules.exclude_labels))
where.append(f"""
file_name NOT IN (
SELECT vl.video_file_name
FROM video_labels vl
JOIN labels l ON l.id = vl.label_id
WHERE l.name IN ({placeholders})
)
""")
params.extend(rules.exclude_labels)
# --- PLAYLISTS ---
if rules.include_playlists:
placeholders = ",".join("?" * len(rules.include_playlists))
where.append(f"""
file_name IN (
SELECT vp.video_file_name
FROM video_playlists vp
JOIN playlists p ON p.id = vp.playlist_id
WHERE p.name IN ({placeholders})
)
""")
params.extend(rules.include_playlists)
if rules.exclude_playlists:
placeholders = ",".join("?" * len(rules.exclude_playlists))
where.append(f"""
file_name NOT IN (
SELECT vp.video_file_name
FROM video_playlists vp
JOIN playlists p ON p.id = vp.playlist_id
WHERE p.name IN ({placeholders})
)
""")
params.extend(rules.exclude_playlists)
# --- DIFFICULTÉ ---
if rules.difficulty and rules.difficulty != "Tous":
where.append("difficulty_level = ?")
params.append(rules.difficulty)
# --- JOUR ---
if rules.day_of_week:
where.append("day_of_week = ?")
params.append(rules.day_of_week)
# --- ADRESSE ---
if rules.address_keyword:
where.append("address NOT LIKE '%unknown%' AND address LIKE ?")
params.append(f"%{rules.address_keyword}%")
sql = f"SELECT * FROM videos WHERE {' AND '.join(where)} ORDER BY record_datetime DESC"
return sql, params