From b891d30c2f24848381e7612cacc0d58f867dd42f Mon Sep 17 00:00:00 2001 From: Hitomatito Date: Sat, 13 Jun 2026 02:41:55 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20integraci=C3=B3n=20completa=20de=20Spot?= =?UTF-8?q?ify=20como=20fuente=20primaria?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Búsqueda y autocomplete solo Spotify (sin YouTube) - Resolución de álbumes y tracks vía Client Credentials - Autoplay inteligente: mismo artista + covers de distintos artistas - nowplaying enriquecido: álbum, fecha, popularidad, géneros, explícito - Cache en memoria para resultados Lavalink de Spotify - Reintento automático en fallos de Lavalink con URLs Spotify - Eliminadas funciones muertas: search_youtube_best_match, build_status_embed - Refactor: utils/spotify_resolver, utils/env_utils - Autocomplete con cache local para respuestas instantáneas --- .env.docker.example | 2 + .env.example | 2 + Dockerfile.lavalink | 11 +- bootstrap.py | 43 +-- cogs/music.py | 698 +++++++++++++++++++++++++++----------- config.py | 3 + docker-compose.yml | 4 + lavalink/application.yml | 21 +- lavalink/entrypoint.sh | 7 + main.py | 1 - run.py | 42 +-- utils/embeds.py | 12 +- utils/env_utils.py | 43 +++ utils/search.py | 63 ++-- utils/spotify_resolver.py | 299 ++++++++++++++++ 15 files changed, 938 insertions(+), 313 deletions(-) create mode 100644 lavalink/entrypoint.sh create mode 100644 utils/env_utils.py create mode 100644 utils/spotify_resolver.py diff --git a/.env.docker.example b/.env.docker.example index 481ad0c..a6dee5f 100644 --- a/.env.docker.example +++ b/.env.docker.example @@ -1,3 +1,5 @@ BOT_TOKEN= LAVALINK_PASSWORD= LAVALINK_HOST_PORT=2333 +SPOTIFY_CLIENT_ID= +SPOTIFY_CLIENT_SECRET= diff --git a/.env.example b/.env.example index f1e2a24..8885cf6 100644 --- a/.env.example +++ b/.env.example @@ -2,3 +2,5 @@ BOT_TOKEN= LAVALINK_HOST=localhost LAVALINK_PORT=2333 LAVALINK_PASSWORD= +SPOTIFY_CLIENT_ID= +SPOTIFY_CLIENT_SECRET= diff --git a/Dockerfile.lavalink b/Dockerfile.lavalink index a09c0a8..7765e8a 100644 --- a/Dockerfile.lavalink +++ b/Dockerfile.lavalink @@ -3,11 +3,12 @@ FROM eclipse-temurin:21-jre-jammy ARG LAVALINK_VERSION=4.2.2 ARG YOUTUBE_PLUGIN_VERSION=1.18.0 ARG YOUTUBE_PLUGIN_JAR=youtube-plugin-1.18.0.jar +ARG LAVASRC_VERSION=4.8.3 ENV LAVALINK_HOME=/opt/lavalink RUN apt-get update \ - && apt-get install -y --no-install-recommends curl ca-certificates \ + && apt-get install -y --no-install-recommends curl ca-certificates gettext-base \ && rm -rf /var/lib/apt/lists/* RUN useradd --create-home --shell /usr/sbin/nologin lavalink @@ -16,11 +17,13 @@ WORKDIR /opt/lavalink RUN mkdir -p plugins \ && curl -fsSL -o Lavalink.jar "https://github.com/lavalink-devs/Lavalink/releases/download/${LAVALINK_VERSION}/Lavalink.jar" \ - && curl -fsSL -o "plugins/${YOUTUBE_PLUGIN_JAR}" "https://github.com/lavalink-devs/youtube-source/releases/download/${YOUTUBE_PLUGIN_VERSION}/${YOUTUBE_PLUGIN_JAR}" + && curl -fsSL -o "plugins/${YOUTUBE_PLUGIN_JAR}" "https://github.com/lavalink-devs/youtube-source/releases/download/${YOUTUBE_PLUGIN_VERSION}/${YOUTUBE_PLUGIN_JAR}" \ + && curl -fsSL -o "plugins/lavasrc-plugin-${LAVASRC_VERSION}.jar" "https://github.com/topi314/LavaSrc/releases/download/${LAVASRC_VERSION}/lavasrc-plugin-${LAVASRC_VERSION}.jar" COPY --chown=lavalink:lavalink lavalink/application.yml ./application.yml +COPY --chown=lavalink:lavalink lavalink/entrypoint.sh ./entrypoint.sh -RUN chown -R lavalink:lavalink /opt/lavalink +RUN chmod +x entrypoint.sh && chown -R lavalink:lavalink /opt/lavalink USER lavalink @@ -29,4 +32,4 @@ EXPOSE 2333 HEALTHCHECK --interval=10s --timeout=5s --start-period=20s --retries=12 \ CMD sh -c 'curl -fsS -H "Authorization: ${LAVALINK_PASSWORD}" http://localhost:2333/version >/dev/null' -CMD ["java", "-jar", "Lavalink.jar", "--spring.config.additional-location=file:./"] +ENTRYPOINT ["./entrypoint.sh"] diff --git a/bootstrap.py b/bootstrap.py index 23ffb60..2405ae0 100644 --- a/bootstrap.py +++ b/bootstrap.py @@ -8,12 +8,13 @@ import os import shutil -import signal import subprocess import sys from pathlib import Path from getpass import getpass +from utils.env_utils import parse_env_lines, read_env_file, write_env_value + ROOT_DIR = Path(__file__).resolve().parent VENV_DIR = ROOT_DIR / "venv" @@ -24,46 +25,6 @@ ENV_EXAMPLE_PATH = ROOT_DIR / ".env.example" -def read_env_file(path: Path) -> list[str]: - if not path.exists(): - return [] - return path.read_text(encoding="utf-8").splitlines() - - -def parse_env_lines(lines: list[str]) -> dict[str, str]: - values: dict[str, str] = {} - for line in lines: - stripped = line.strip() - if not stripped or stripped.startswith("#") or "=" not in stripped: - continue - key, value = stripped.split("=", 1) - values[key.strip()] = value.strip() - return values - - -def write_env_value(path: Path, key: str, value: str) -> None: - lines = read_env_file(path) - updated = False - new_lines: list[str] = [] - - for line in lines: - stripped = line.strip() - if stripped and not stripped.startswith("#") and "=" in stripped: - current_key = stripped.split("=", 1)[0].strip() - if current_key == key: - new_lines.append(f"{key}={value}") - updated = True - continue - new_lines.append(line) - - if not updated: - if new_lines and new_lines[-1] != "": - new_lines.append("") - new_lines.append(f"{key}={value}") - - path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") - - def ensure_dotenv() -> None: if not ENV_PATH.exists() and ENV_EXAMPLE_PATH.exists(): ENV_PATH.write_text(ENV_EXAMPLE_PATH.read_text(encoding="utf-8"), encoding="utf-8") diff --git a/cogs/music.py b/cogs/music.py index 4912b94..ade99ec 100644 --- a/cogs/music.py +++ b/cogs/music.py @@ -20,10 +20,18 @@ from utils.lavalink_voice import LavalinkVoiceClient from utils.search import ( _canonicalize_youtube_playlist_url, + _normalize_spotify_url, + is_spotify_url, is_youtube_url, search_public_youtube_playlist, search_youtube_candidates, - search_youtube_best_match, +) +from utils.spotify_resolver import ( + get_spotify_recommendations, + get_track_details, + resolve_spotify_url, + parse_spotify_url, + search_spotify_tracks, ) @@ -136,19 +144,56 @@ async def on_timeout(self): ) +_AUTO_CACHE: dict[str, list[dict]] = {} + async def song_query_autocomplete(interaction: discord.Interaction, current: str): - if not current or len(current.strip()) < 2: + cleaned = current.strip() + if len(cleaned) < 3: return [] - choices = await search_youtube_candidates(current, limit=SEARCH_SUGGESTION_LIMIT) - return [ - app_commands.Choice( - name=(choice.get("title") or "Desconocido")[:100], - value=(choice.get("url") or "")[:100], + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + + if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET: + return [] + + # Check if we can reuse a previous search by filtering locally + cached_query = next((q for q in _AUTO_CACHE if cleaned.startswith(q)), None) + if cached_query: + filtered = [ + r for r in _AUTO_CACHE[cached_query] + if cleaned.lower() in r["title"].lower() or cleaned.lower() in r["author"].lower() + ] + if filtered: + return [ + app_commands.Choice( + name=f"{r['title']} — {r['author']}"[:100], + value=r["url"], + ) + for r in filtered[:25] + ] + + try: + print(f"[AUTO] Spotify: {cleaned}") + spotify_results = await search_spotify_tracks( + cleaned, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, limit=SEARCH_SUGGESTION_LIMIT ) - for choice in choices[:25] - if choice.get("url") - ] + if spotify_results: + _AUTO_CACHE[cleaned] = spotify_results + if len(_AUTO_CACHE) > 20: + oldest = min(_AUTO_CACHE, key=_AUTO_CACHE.get) + del _AUTO_CACHE[oldest] + return [ + app_commands.Choice( + name=f"{r['title']} — {r['author']}"[:100], + value=r["url"], + ) + for r in spotify_results[:25] + ] + print("[AUTO] Spotify sin resultados") + except Exception as e: + print(f"[AUTO] Spotify error: {e}") + + return [] class MusicCog(commands.Cog): @@ -158,6 +203,8 @@ def __init__(self, bot): self._autoplay_guilds: set[int] = set() # last track info per guild, used to seed autoplay searches self._last_track_info: dict[int, dict] = {} + # cache Lavalink results for Spotify URLs: url -> (tracks, load_type) + self._spotify_cache: dict[str, tuple] = {} # guard against registering lavalink event hooks more than once self._hooks_registered: bool = False @@ -178,6 +225,14 @@ def _get_player(self, guild_id: int): return lavalink_client.player_manager.get(guild_id) + @staticmethod + def _get_track_thumbnail(track: lavalink.AudioTrack) -> str | None: + if track.artwork_url: + return track.artwork_url + if track.identifier and not track.is_stream: + return f"https://img.youtube.com/vi/{track.identifier}/mqdefault.jpg" + return None + def _build_embed( self, interaction: discord.Interaction, @@ -185,9 +240,13 @@ def _build_embed( description: str | None = None, *, color=BOT_PRIMARY, + thumbnail_url: str | None = None, ): embed = build_base_embed(title=title, description=description, color=color) + if thumbnail_url: + embed.set_thumbnail(url=thumbnail_url) + if interaction.guild and interaction.guild.icon: embed.set_author( name=interaction.guild.name, icon_url=interaction.guild.icon.url @@ -214,6 +273,39 @@ async def _send_embed(self, interaction: discord.Interaction, embed: discord.Emb else: await interaction.response.send_message(embed=embed) + async def _send_playback_result( + self, + interaction: discord.Interaction, + description: str, + started: bool, + *, + mode: str = "play", + thumbnail_url: str | None = None, + ) -> None: + if started: + await self._send_embed( + interaction, + self._build_embed( + interaction, + "Reproduciendo ahora", + description, + color=BOT_SUCCESS, + thumbnail_url=thumbnail_url, + ), + ) + else: + title = ( + "Añadida y en reproducción" + if mode == "add" + else "Añadida a la cola" + ) + await self._send_embed( + interaction, + self._build_embed( + interaction, title, description, color=BOT_PRIMARY, + ), + ) + async def _send_reply(self, interaction: discord.Interaction, content: str): await self._send_embed( interaction, @@ -313,42 +405,42 @@ async def _queue_selected_track( started = True description = f"**{track.title or choice.title}**" - if started: - await self._send_embed( - interaction, - self._build_embed( - interaction, "Reproduciendo ahora", description, color=BOT_SUCCESS - ), - ) - else: - title = ( - "Añadida y en reproducción" if mode == "add" else "Añadida a la cola" - ) - await self._send_embed( - interaction, - self._build_embed(interaction, title, description, color=BOT_PRIMARY), - ) - + thumb = self._get_track_thumbnail(track) + await self._send_playback_result(interaction, description, started, mode=mode, thumbnail_url=thumb) return True async def _build_song_choices( self, query: str, limit: int = 5 ) -> list[SearchChoice]: - results = await search_youtube_candidates(query, limit=limit) + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + choices: list[SearchChoice] = [] - for result in results: - choices.append( - SearchChoice( - title=result.get("title") or "Desconocido", - url=result.get("url") or "", - author=result.get("uploader") or result.get("channel"), - duration=result.get("duration"), - thumbnail=result.get("thumbnail"), + if SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + try: + print(f"[SEARCH] Spotify: {query}") + spotify_results = await search_spotify_tracks( + query, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, limit=limit ) - ) - - return [choice for choice in choices if choice.url] + if spotify_results: + print(f"[SEARCH] Spotify resultados: {len(spotify_results)}") + for r in spotify_results: + print(f"[SEARCH] {r['title']} — {r['author']} thumb={r.get('thumbnail', 'N/A')[:50]}") + choices.append( + SearchChoice( + title=r["title"], + url=r["url"], + author=r["author"], + duration=r["duration"], + thumbnail=r.get("thumbnail"), + ) + ) + return choices + print("[SEARCH] Spotify sin resultados") + except Exception as e: + print(f"[SEARCH] Spotify error: {e}") + + return choices async def _handle_music_search( self, @@ -372,29 +464,29 @@ async def _handle_music_search( return await self._send_error(interaction, error_message) description = f"**{display_title}**" - if started: - await self._send_embed( - interaction, - self._build_embed( - interaction, - "Reproduciendo ahora", - description, - color=BOT_SUCCESS, - ), - ) - else: - title = ( - "Añadida y en reproducción" - if mode == "add" - else "Añadida a la cola" + thumb = self._get_track_thumbnail(player.current) if player.current else None + await self._send_playback_result(interaction, description, started, mode=mode, thumbnail_url=thumb) + return + + if is_spotify_url(query): + normalized = _normalize_spotify_url(query) or query + parsed = parse_spotify_url(normalized) + if parsed and parsed[0] == "track": + player, display_title, started, error_message = await self._queue_query( + interaction, normalized, log_prefix=log_prefix ) - await self._send_embed( - interaction, - self._build_embed( - interaction, title, description, color=BOT_PRIMARY - ), + if player is None: + return await self._send_error(interaction, error_message) + description = f"**{display_title}**" + thumb = self._get_track_thumbnail(player.current) if player.current else None + await self._send_playback_result(interaction, description, started, mode=mode, thumbnail_url=thumb) + return + + if parsed and parsed[0] in ("album", "playlist"): + await self._queue_spotify_collection( + interaction, normalized, parsed, log_prefix=log_prefix ) - return + return candidates = await self._build_song_choices( query, limit=SEARCH_SUGGESTION_LIMIT @@ -406,6 +498,28 @@ async def _handle_music_search( if handled: return + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + + if SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + try: + spotify_matches = await search_spotify_tracks( + query, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, limit=1 + ) + if spotify_matches: + best = spotify_matches[0] + player, display_title, started, error_message = ( + await self._queue_query( + interaction, best["url"], log_prefix=log_prefix + ) + ) + if player is not None: + description = f"**{display_title}**" + thumb = self._get_track_thumbnail(player.current) if player.current else None + await self._send_playback_result(interaction, description, started, mode=mode, thumbnail_url=thumb) + return + except Exception: + pass + player, display_title, started, error_message = await self._queue_query( interaction, query, log_prefix=log_prefix ) @@ -413,21 +527,8 @@ async def _handle_music_search( return await self._send_error(interaction, error_message) description = f"**{display_title}**" - if started: - await self._send_embed( - interaction, - self._build_embed( - interaction, "Reproduciendo ahora", description, color=BOT_SUCCESS - ), - ) - else: - title = ( - "Añadida y en reproducción" if mode == "add" else "Añadida a la cola" - ) - await self._send_embed( - interaction, - self._build_embed(interaction, title, description, color=BOT_PRIMARY), - ) + thumb = self._get_track_thumbnail(player.current) if player.current else None + await self._send_playback_result(interaction, description, started, mode=mode, thumbnail_url=thumb) def _require_voice_channel(self, interaction: discord.Interaction): if interaction.guild is None: @@ -562,40 +663,42 @@ async def _queue_query( return None, None, False, error_message normalized_query = query - search_result = None - - if not is_youtube_url(query): - search_result = await search_youtube_best_match(query, limit=12) - if search_result: - normalized_query = search_result["url"] - uploader = ( - search_result.get("uploader") - or search_result.get("channel") - or "desconocido" - ) - print( - f"[{log_prefix}] Mejor coincidencia: {search_result['title']} | {uploader}" - ) - else: - normalized_query = f"ytsearch:{query}" + + if not is_youtube_url(query) and not is_spotify_url(query): + normalized_query = f"ytsearch:{query}" print(f"[{log_prefix}] Consultando Lavalink: {normalized_query}") try: - results = await player.node.get_tracks(normalized_query) - except Exception as exc: - if search_result is not None: - fallback_query = f"ytsearch:{query}" - print( - f"[{log_prefix}] Fallback a búsqueda directa: {type(exc).__name__}: {exc}" - ) - print(f"[{log_prefix}] Consultando Lavalink: {fallback_query}") - results = await player.node.get_tracks(fallback_query) + if is_spotify_url(normalized_query) and normalized_query in self._spotify_cache: + print(f"[{log_prefix}] Usando cache para: {normalized_query[:60]}") + cached = self._spotify_cache[normalized_query] + results = cached[0] else: - raise + results = await player.node.get_tracks(normalized_query) + if is_spotify_url(normalized_query) and results and results.tracks: + self._spotify_cache[normalized_query] = (results,) + print(f"[{log_prefix}] Cache guardado para: {normalized_query[:60]}") + except Exception as exc: + print(f"[{log_prefix}] Error en Lavalink: {type(exc).__name__}: {exc}") + raise if not results or not results.tracks: - print(f"[{log_prefix}] ✗ No se encontraron resultados") - return None, None, False, f"No se encontró ninguna canción con: '{query}'" + if is_spotify_url(normalized_query): + print(f"[{log_prefix}] ✗ Sin resultados, reintentando una vez...") + try: + results = await player.node.get_tracks(normalized_query) + if results and results.tracks: + self._spotify_cache[normalized_query] = (results,) + print(f"[{log_prefix}] ✓ Éxito en reintento") + else: + print(f"[{log_prefix}] ✗ Reintento también falló") + return None, None, False, f"No se encontró ninguna canción con: '{query}'" + except Exception: + print(f"[{log_prefix}] ✗ Reintento también falló") + return None, None, False, f"No se encontró ninguna canción con: '{query}'" + else: + print(f"[{log_prefix}] ✗ No se encontraron resultados") + return None, None, False, f"No se encontró ninguna canción con: '{query}'" tracks = results.tracks display_title = "" @@ -623,6 +726,107 @@ async def _queue_query( return player, display_title, started, None + async def _queue_spotify_collection( + self, + interaction: discord.Interaction, + url: str, + parsed: tuple[str, str], + *, + log_prefix: str, + ): + kind = parsed[0] + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + + if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET: + await self._send_error( + interaction, + "Spotify no está configurado. Falta SPOTIFY_CLIENT_ID o SPOTIFY_CLIENT_SECRET.", + ) + return + + track_urls = await resolve_spotify_url( + url, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + ) + if track_urls is None or track_urls[1] is None: + label = "álbum" if kind == "album" else "playlist" + await self._send_error( + interaction, + f"No se pudo cargar ese {label} de Spotify. " + "Las playlists pueden requerir autenticación adicional.", + ) + return + + _, urls = track_urls + player, error_message = await self._ensure_player(interaction) + if player is None: + await self._send_error(interaction, error_message) + return + + loaded = 0 + failed = 0 + first_title = "" + + for i, track_url in enumerate(urls): + print( + f"[{log_prefix}] Cargando track {i+1}/{len(urls)}: {track_url}" + ) + try: + if track_url in self._spotify_cache: + cached = self._spotify_cache[track_url] + results = cached[0] + else: + results = await player.node.get_tracks(track_url) + if not results or not results.tracks: + print(f"[{log_prefix}] ⚠️ Falló, reintentando: {track_url}") + results = await player.node.get_tracks(track_url) + if results and results.tracks: + self._spotify_cache[track_url] = (results,) + if results and results.tracks: + track = results.tracks[0] + track.requester = interaction.user.id + player.add(track) + loaded += 1 + if not first_title: + first_title = track.title or "" + print( + f"[{log_prefix}] ✓ En cola: {track.title}" + ) + else: + failed += 1 + except Exception as exc: + failed += 1 + print( + f"[{log_prefix}] ✗ Error cargando {track_url}: {exc}" + ) + + if loaded == 0: + await self._send_error( + interaction, "No se pudo cargar ningún track de ese álbum." + ) + return + + started = False + if not player.is_playing: + await player.play() + started = True + + kind_label = "álbum" if kind == "album" else "playlist" + description = ( + f"**{loaded} canciones** en cola desde el {kind_label}" + ) + if failed: + description += f"\n⚠ {failed} no se pudieron cargar" + + await self._send_embed( + interaction, + self._build_embed( + interaction, + "Reproduciendo álbum" if started else "Álbum en cola", + description, + color=BOT_SUCCESS if started else BOT_PRIMARY, + ), + ) + async def _queue_playlist_query( self, interaction: discord.Interaction, query: str, *, log_prefix: str ): @@ -630,26 +834,87 @@ async def _queue_playlist_query( if player is None: return None, None, False, error_message - playlist_url = _canonicalize_youtube_playlist_url(query) or query.strip() + spotify_url = _normalize_spotify_url(query) or query.strip() playlist_result = None - if not is_youtube_url(playlist_url): - playlist_result = await search_public_youtube_playlist(query, limit=8) - if playlist_result: - playlist_url = playlist_result["url"] - print( - f"[{log_prefix}] Mejor coincidencia de playlist: {playlist_result['title']} | {playlist_url}" - ) - else: - return ( - None, - None, - False, - ( - f"No encontré una playlist pública con: '{query}'. " - "Prueba con el título exacto + autor o pega la URL directa." - ), + if is_spotify_url(spotify_url): + parsed = parse_spotify_url(spotify_url) + if parsed and parsed[0] in ("album", "playlist"): + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + + if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET: + return None, None, False, "Spotify no está configurado." + + track_urls = await resolve_spotify_url( + spotify_url, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET ) + if track_urls is None or track_urls[1] is None: + label = "álbum" if parsed[0] == "album" else "playlist" + return ( + None, + None, + False, + f"No se pudo cargar ese {label} de Spotify.", + ) + + _, urls = track_urls + loaded = 0 + failed = 0 + display_title = "" + + for track_url in urls: + try: + if track_url in self._spotify_cache: + cached = self._spotify_cache[track_url] + results = cached[0] + else: + results = await player.node.get_tracks(track_url) + if results and results.tracks: + self._spotify_cache[track_url] = (results,) + if results and results.tracks: + track = results.tracks[0] + track.requester = interaction.user.id + player.add(track) + loaded += 1 + if not display_title: + display_title = track.title or "" + else: + failed += 1 + except Exception: + failed += 1 + + if loaded == 0: + return None, None, False, "No se pudo cargar ningún track." + + display_title = f"{display_title} y {loaded - 1} más" if loaded > 1 else display_title + + started = False + if not player.is_playing: + await player.play() + started = True + + return player, display_title, started, None + + playlist_url = spotify_url + else: + playlist_url = _canonicalize_youtube_playlist_url(query) or query.strip() + if not is_youtube_url(playlist_url): + playlist_result = await search_public_youtube_playlist(query, limit=8) + if playlist_result: + playlist_url = playlist_result["url"] + print( + f"[{log_prefix}] Mejor coincidencia de playlist: {playlist_result['title']} | {playlist_url}" + ) + else: + return ( + None, + None, + False, + ( + f"No encontré una playlist pública con: '{query}'. " + "Prueba con el título exacto + autor o pega la URL directa." + ), + ) print(f"[{log_prefix}] Consultando Lavalink: {playlist_url}") try: @@ -692,45 +957,76 @@ async def _queue_playlist_query( async def _autofill_queue(self, player) -> int: """Search for related tracks and add them to the player queue. - Uses the last known track title + author as a seed query. + Uses Spotify recommendations if available, falls back to YouTube search. Returns the number of tracks added. """ + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + guild_id = player.guild_id seed_info = self._last_track_info.get(guild_id, {}) + spotify_id = seed_info.get("spotify_id") title = seed_info.get("title") or "" - author = seed_info.get("author") or "" if not title: print("[AUTOPLAY] No hay información de la última canción para semilla") return 0 - query = f"{title} {author}".strip() if author else title - print(f"[AUTOPLAY] Buscando canciones relacionadas con: {query!r}") + added = 0 + queued_urls: set[str] = set() - try: - candidates = await search_youtube_candidates(query, limit=AUTOPLAY_FILL_TRACKS + AUTOPLAY_SEARCH_BUFFER) - except Exception as exc: - print(f"[AUTOPLAY] Error en búsqueda: {exc}") - return 0 + seed_uri = seed_info.get("uri") or "" + seed_title_lower = title.lower().strip() - added = 0 - queued_titles: set[str] = set() + candidates: list[dict] = [] + + if spotify_id and SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + print(f"[AUTOPLAY] Recomendaciones vía Spotify: seed_track={spotify_id}") + try: + spotify_recs = await get_spotify_recommendations( + spotify_id, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, + limit=AUTOPLAY_FILL_TRACKS + AUTOPLAY_SEARCH_BUFFER, + ) + if spotify_recs: + print(f"[AUTOPLAY] Spotify devolvió {len(spotify_recs)} recomendaciones") + for rec in spotify_recs: + if rec["url"] not in queued_urls and rec["url"] != seed_uri: + candidates.append(rec) + queued_urls.add(rec["url"]) + except Exception as exc: + print(f"[AUTOPLAY] Error en recomendaciones Spotify: {exc}") - # Skip the seed track itself so we don't repeat it immediately - seed_title_norm = title.lower().strip() + if not candidates: + author = seed_info.get("author") or "" + query = f"{title} {author}".strip() if author else title + print(f"[AUTOPLAY] Fallback a YouTube: {query!r}") + try: + yt_candidates = await search_youtube_candidates( + query, limit=AUTOPLAY_FILL_TRACKS + AUTOPLAY_SEARCH_BUFFER + ) + for c in yt_candidates: + url = c.get("url") or "" + if url and url not in queued_urls and url != seed_uri: + candidates.append(c) + queued_urls.add(url) + except Exception as exc: + print(f"[AUTOPLAY] Error en búsqueda YouTube: {exc}") + return 0 + + existing_titles: set[str] = set() + for t in player.queue: + existing_titles.add((t.title or "").lower().strip()) + if player.current: + existing_titles.add((player.current.title or "").lower().strip()) for candidate in candidates: if added >= AUTOPLAY_FILL_TRACKS: break - cand_title = (candidate.get("title") or "").lower().strip() cand_url = candidate.get("url") or "" - + cand_title = (candidate.get("title") or "").lower().strip() if not cand_url: continue - - # Skip if it's the same as the seed or a duplicate - if cand_title == seed_title_norm or cand_title in queued_titles: + if cand_title == seed_title_lower or cand_title in existing_titles: continue try: @@ -744,7 +1040,7 @@ async def _autofill_queue(self, player) -> int: track = results.tracks[0] player.add(track) - queued_titles.add(cand_title) + existing_titles.add((track.title or "").lower().strip()) added += 1 print(f"[AUTOPLAY] Añadida: {track.title}") @@ -756,10 +1052,17 @@ async def on_track_start(self, event: lavalink.TrackStartEvent) -> None: track = event.track if track is None: return + uri = track.uri or "" + spotify_id: str | None = None + if uri.startswith("https://open.spotify.com/track/"): + parsed = parse_spotify_url(uri) + if parsed: + spotify_id = parsed[1] self._last_track_info[event.player.guild_id] = { "title": track.title or "", "author": track.author or "", - "uri": track.uri or "", + "uri": uri, + "spotify_id": spotify_id, } @lavalink.listener(lavalink.QueueEndEvent) @@ -919,49 +1222,21 @@ async def queue(self, interaction: discord.Interaction): if player is None: return await self._send_error(interaction, error_message) - message = "**Cola de reproducción:**\n" - - if player.current: - duration = player.current.duration // 1000 - message += f"▶️ **Actual:** {player.current.title}\n" - message += f" Duración: {duration}s\n\n" - else: - message += "❌ No hay nada reproduciéndose\n\n" - - if not player.queue: - message += "**Cola vacía**" - else: - queue_list = player.queue - total_duration = sum(t.duration for t in queue_list) // 1000 - message += f"**En cola ({len(queue_list)} canciones):**\n" - - for i, track in enumerate(queue_list[:10], 1): - dur = track.duration // 1000 - message += f"{i}. {track.title} ({dur}s)\n" - - if len(queue_list) > 10: - message += f"... y {len(queue_list) - 10} más\n" - - message += f"\n**Duración total en cola:** {total_duration}s" - embed = self._build_embed( interaction, "Cola de reproducción", color=BOT_PRIMARY ) + if player.current: current = player.current + thumb = self._get_track_thumbnail(current) + if thumb: + embed.set_thumbnail(url=thumb) + embed.add_field( - name="Ahora sonando", - value=current.title or "Desconocido", + name="▶️ Ahora sonando", + value=f"**{current.title}** · {current.author or 'Desconocido'} · {format_duration(current.duration)}", inline=False, ) - embed.add_field( - name="Autor", value=current.author or "Desconocido", inline=True - ) - embed.add_field( - name="Duración", - value=format_duration(current.duration), - inline=True, - ) else: embed.add_field( name="Ahora sonando", value="Nada en reproducción", inline=False @@ -974,9 +1249,12 @@ async def queue(self, interaction: discord.Interaction): total_duration = sum(t.duration for t in queue_list) queue_preview = [] for i, track in enumerate(queue_list[:10], 1): - queue_preview.append( - f"{i}. {track.title} · {format_duration(track.duration)}" - ) + dur = format_duration(track.duration) + t = self._get_track_thumbnail(track) + if t: + queue_preview.append(f"{i}. {track.title} · {dur}") + else: + queue_preview.append(f"{i}. {track.title} · {dur}") if len(queue_list) > 10: queue_preview.append(f"... y {len(queue_list) - 10} más") @@ -1109,7 +1387,7 @@ async def stop(self, interaction: discord.Interaction): if player is None: return await self._send_error(interaction, error_message) - voice_client = interaction.guild.voice_client if interaction.guild else None + voice_client = interaction.guild.voice_client player.queue.clear() if player.current: @@ -1118,19 +1396,16 @@ async def stop(self, interaction: discord.Interaction): if voice_client: await voice_client.disconnect(force=True) - if player or voice_client: - print("[STOP] ✓ Bot desconectado") - await self._send_embed( + print("[STOP] ✓ Bot desconectado") + await self._send_embed( + interaction, + self._build_embed( interaction, - self._build_embed( - interaction, - "Reproducción detenida", - "La cola se vació y el bot salió del canal.", - color=BOT_ERROR, - ), - ) - else: - await self._send_error(interaction, "No estoy conectado.") + "Reproducción detenida", + "La cola se vació y el bot salió del canal.", + color=BOT_ERROR, + ), + ) except Exception as e: print(f"[STOP] ✗ Error: {e}") @@ -1157,8 +1432,23 @@ async def nowplaying(self, interaction: discord.Interaction): progress_label = progress_bar(position_ms, track.duration) duration_label = f"{format_duration(position_ms)} / {format_duration(track.duration)}" + thumb = self._get_track_thumbnail(track) + + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + + spotify_details: dict | None = None + if (track.uri or "").startswith("https://open.spotify.com/track/"): + parsed = parse_spotify_url(track.uri) + if parsed and SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + try: + spotify_details = await get_track_details( + parsed[1], SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET + ) + except Exception: + pass + embed = self._build_embed( - interaction, track.title or "Reproducción actual", color=BOT_PRIMARY + interaction, track.title or "Reproducción actual", color=BOT_PRIMARY, thumbnail_url=thumb ) if track.uri: embed.url = track.uri @@ -1172,6 +1462,32 @@ async def nowplaying(self, interaction: discord.Interaction): inline=True, ) embed.add_field(name="Duración", value=duration_label, inline=True) + + if spotify_details: + if spotify_details.get("album"): + embed.add_field( + name="Álbum", value=spotify_details["album"], inline=True + ) + if spotify_details.get("release_date"): + embed.add_field( + name="Lanzamiento", value=spotify_details["release_date"], inline=True + ) + pop = spotify_details.get("popularity") + if pop is not None: + bar = "█" * (pop // 10) + "░" * (10 - pop // 10) + embed.add_field( + name="Popularidad", value=f"{pop}% {bar}", inline=True + ) + if spotify_details.get("explicit"): + embed.add_field( + name="⛔ Explícito", value="Sí", inline=True + ) + genres = spotify_details.get("genres") + if genres: + embed.add_field( + name="Géneros", value=", ".join(genres[:3]).capitalize(), inline=False + ) + embed.add_field( name="Progreso", value=f"`{progress_label}`", inline=False ) diff --git a/config.py b/config.py index 4fe6423..055f52f 100644 --- a/config.py +++ b/config.py @@ -47,3 +47,6 @@ def load_dotenv_file(path: Path) -> None: if not LAVALINK_PASSWORD: raise RuntimeError("LAVALINK_PASSWORD must be set in the environment or .env") + +SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "") +SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "") diff --git a/docker-compose.yml b/docker-compose.yml index 1488ee7..74ab97a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,8 @@ services: dockerfile: Dockerfile.lavalink environment: LAVALINK_PASSWORD: ${LAVALINK_PASSWORD:?set LAVALINK_PASSWORD in .env} + SPOTIFY_CLIENT_ID: ${SPOTIFY_CLIENT_ID} + SPOTIFY_CLIENT_SECRET: ${SPOTIFY_CLIENT_SECRET} ports: - "${LAVALINK_HOST_PORT:-2333}:2333" restart: unless-stopped @@ -21,6 +23,8 @@ services: LAVALINK_HOST: lavalink LAVALINK_PORT: 2333 LAVALINK_PASSWORD: ${LAVALINK_PASSWORD:?set LAVALINK_PASSWORD in .env} + SPOTIFY_CLIENT_ID: ${SPOTIFY_CLIENT_ID} + SPOTIFY_CLIENT_SECRET: ${SPOTIFY_CLIENT_SECRET} PYTHONUNBUFFERED: "1" depends_on: lavalink: diff --git a/lavalink/application.yml b/lavalink/application.yml index effe9f5..2e955dc 100644 --- a/lavalink/application.yml +++ b/lavalink/application.yml @@ -13,11 +13,27 @@ lavalink: vimeo: true http: true local: false - plugins: youtube: enabled: true allowSearch: true + lavasrc: + providers: + - "ytsearch:\"%ISRC%\"" + - "ytsearch:%QUERY%" + - "scsearch:%QUERY%" + - "spsearch:%QUERY%" + sources: + spotify: true + applemusic: false + deezer: false + yandexmusic: false + spotify: + clientId: "${SPOTIFY_CLIENT_ID}" + clientSecret: "${SPOTIFY_CLIENT_SECRET}" + countryCode: "US" + playlistLoadLimit: 3 + albumLoadLimit: 3 metrics: prometheus: @@ -45,4 +61,5 @@ logging: moe.kyokobot.koe.internal.handler.DiscordUDPConnection: TRACE moe.kyokobot.koe.gateway.AbstractMediaGatewayConnection: TRACE moe.kyokobot.koe.internal.MediaConnectionImpl: DEBUG - dev.lavalink.youtube: DEBUG \ No newline at end of file + dev.lavalink.youtube: DEBUG + com.github.topi314.lavasrc: DEBUG \ No newline at end of file diff --git a/lavalink/entrypoint.sh b/lavalink/entrypoint.sh new file mode 100644 index 0000000..200d139 --- /dev/null +++ b/lavalink/entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/sh +set -e + +envsubst < /opt/lavalink/application.yml > /tmp/application.yml +mv /tmp/application.yml /opt/lavalink/application.yml + +exec java -jar Lavalink.jar --spring.config.additional-location=file:./ diff --git a/main.py b/main.py index 19a162d..074d75a 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,6 @@ class MusicBot(commands.Bot): def __init__(self) -> None: super().__init__(command_prefix="!", intents=INTENTS) self._lavalink_ready = False - self._slash_synced = False async def setup_hook(self) -> None: await self.load_extension("cogs.music") diff --git a/run.py b/run.py index 294da0f..fd905bf 100644 --- a/run.py +++ b/run.py @@ -16,6 +16,8 @@ import urllib.request from pathlib import Path +from utils.env_utils import parse_env_lines, read_env_file, write_env_value + ROOT_DIR = Path(__file__).resolve().parent ENV_PATH = ROOT_DIR / ".env" @@ -30,46 +32,6 @@ BOT_ENTRYPOINT = ROOT_DIR / "main.py" -def read_env_file(path: Path) -> list[str]: - if not path.exists(): - return [] - return path.read_text(encoding="utf-8").splitlines() - - -def parse_env_lines(lines: list[str]) -> dict[str, str]: - values: dict[str, str] = {} - for line in lines: - stripped = line.strip() - if not stripped or stripped.startswith("#") or "=" not in stripped: - continue - key, value = stripped.split("=", 1) - values[key.strip()] = value.strip() - return values - - -def write_env_value(path: Path, key: str, value: str) -> None: - lines = read_env_file(path) - updated = False - new_lines: list[str] = [] - - for line in lines: - stripped = line.strip() - if stripped and not stripped.startswith("#") and "=" in stripped: - current_key = stripped.split("=", 1)[0].strip() - if current_key == key: - new_lines.append(f"{key}={value}") - updated = True - continue - new_lines.append(line) - - if not updated: - if new_lines and new_lines[-1] != "": - new_lines.append("") - new_lines.append(f"{key}={value}") - - path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") - - def ensure_env() -> dict[str, str]: if not ENV_PATH.exists() and ENV_EXAMPLE_PATH.exists(): ENV_PATH.write_text(ENV_EXAMPLE_PATH.read_text(encoding="utf-8"), encoding="utf-8") diff --git a/utils/embeds.py b/utils/embeds.py index 9384924..c73b442 100644 --- a/utils/embeds.py +++ b/utils/embeds.py @@ -1,11 +1,8 @@ from __future__ import annotations -from datetime import timedelta - import discord -BOT_ACCENT = discord.Color.from_rgb(43, 45, 49) BOT_PRIMARY = discord.Color.from_rgb(88, 101, 242) BOT_SUCCESS = discord.Color.from_rgb(46, 204, 113) BOT_WARNING = discord.Color.from_rgb(241, 196, 15) @@ -35,11 +32,4 @@ def progress_bar(position_ms: int | float, duration_ms: int | float, *, length: def build_base_embed(*, title: str, description: str | None = None, color: discord.Color = BOT_PRIMARY) -> discord.Embed: - embed = discord.Embed(title=title, description=description, color=color) - return embed - - -def build_status_embed(*, title: str, description: str | None = None, color: discord.Color = BOT_PRIMARY) -> discord.Embed: - embed = build_base_embed(title=title, description=description, color=color) - embed.set_footer(text="bot-discord") - return embed + return discord.Embed(title=title, description=description, color=color) diff --git a/utils/env_utils.py b/utils/env_utils.py new file mode 100644 index 0000000..3136f00 --- /dev/null +++ b/utils/env_utils.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from pathlib import Path + + +def read_env_file(path: Path) -> list[str]: + if not path.exists(): + return [] + return path.read_text(encoding="utf-8").splitlines() + + +def parse_env_lines(lines: list[str]) -> dict[str, str]: + values: dict[str, str] = {} + for line in lines: + stripped = line.strip() + if not stripped or stripped.startswith("#") or "=" not in stripped: + continue + key, value = stripped.split("=", 1) + values[key.strip()] = value.strip() + return values + + +def write_env_value(path: Path, key: str, value: str) -> None: + lines = read_env_file(path) + updated = False + new_lines: list[str] = [] + + for line in lines: + stripped = line.strip() + if stripped and not stripped.startswith("#") and "=" in stripped: + current_key = stripped.split("=", 1)[0].strip() + if current_key == key: + new_lines.append(f"{key}={value}") + updated = True + continue + new_lines.append(line) + + if not updated: + if new_lines and new_lines[-1] != "": + new_lines.append("") + new_lines.append(f"{key}={value}") + + path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") diff --git a/utils/search.py b/utils/search.py index 495112e..0133c84 100644 --- a/utils/search.py +++ b/utils/search.py @@ -452,9 +452,6 @@ def _canonicalize_youtube_playlist_url(url: str) -> str | None: return f"https://www.youtube.com/playlist?list={playlist_id}" -def _is_youtube_playlist_url(url: str) -> bool: - return _canonicalize_youtube_playlist_url(url) is not None - def _score_playlist_result(query: str, entry: dict) -> float: query_text = _normalize_search_text(query) @@ -932,10 +929,6 @@ def _score_youtube_result(query: str, entry: dict) -> float: ) -def _score_youtube_result_for_best_match(query: str, entry: dict) -> float: - return _score_youtube_result(query, entry) - - _YOUTUBE_CANDIDATE_CACHE: dict[tuple[str, int], tuple[float, list[dict]]] = {} _YOUTUBE_VIDEO_CACHE: dict[str, tuple[float, dict]] = {} @@ -984,11 +977,10 @@ async def search_youtube_candidates(query: str, limit: int = 5): scored_video = dict(video) scored_video["source"] = "ytmusic" - scored_video["score"] = _score_youtube_result_for_best_match( + scored_video["score"] = _score_youtube_result( cleaned_query, scored_video ) candidate_videos.append(scored_video) - _YOUTUBE_VIDEO_CACHE[video_id] = (now, scored_video) print(f"[SEARCH] YTM resultados: {len(ytm_results)}") except Exception as e: print(f"[SEARCH] Error en YouTube Music: {e}") @@ -1005,20 +997,6 @@ async def search_youtube_candidates(query: str, limit: int = 5): return result -async def search_youtube_best_match(query: str, limit: int = 8): - """ - Buscar en YouTube y devolver el resultado más probable como pista original. - - Penaliza resultados tipo cover/letra/lyrics y favorece coincidencias de autor - cuando el texto de búsqueda ya incluye nombre de canción + artista. - """ - candidate_videos = await search_youtube_candidates(query, limit=limit) - - if not candidate_videos: - return None - - return candidate_videos[0] - def is_youtube_url(text: str) -> bool: """Verificar si el texto es una URL de YouTube válida""" @@ -1039,6 +1017,45 @@ def is_youtube_url(text: str) -> bool: } +def is_spotify_url(text: str) -> bool: + """Verificar si el texto es una URL de Spotify válida""" + try: + parsed_url = urlparse(_normalize_candidate_url(text)) + except Exception: + return False + + host = parsed_url.netloc.lower() + if host.startswith("www."): + host = host[4:] + + return host in {"open.spotify.com", "spotify.link", "spotify.com"} + + +def _normalize_spotify_url(url: str) -> str | None: + """Canonicalizar URL de Spotify a formato estándar.""" + try: + parsed_url = urlparse(_normalize_candidate_url(url)) + except Exception: + return None + + host = parsed_url.netloc.lower() + if host.startswith("www."): + host = host[4:] + + if host not in {"open.spotify.com", "spotify.com"}: + return None + + path = parsed_url.path.strip("/") + if "/" in path: + kind, item_id = path.split("/", 1) + kind = kind.lower() + item_id = item_id.split("?")[0].split("/")[0] + if kind in ("track", "album", "playlist", "artist") and item_id: + return f"https://open.spotify.com/{kind}/{item_id}" + + return url + + _YOUTUBE_MUSIC_CACHE = {} diff --git a/utils/spotify_resolver.py b/utils/spotify_resolver.py new file mode 100644 index 0000000..54adcbd --- /dev/null +++ b/utils/spotify_resolver.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +from urllib.parse import urlparse + +import aiohttp + +SPOTIFY_API_BASE = "https://api.spotify.com/v1" + + +def parse_spotify_url(url: str) -> tuple[str, str] | None: + parsed = urlparse(url) + path = parsed.path.strip("/") + if "/" in path: + kind, item_id = path.split("/", 1) + item_id = item_id.split("?")[0].split("/")[0] + if kind in ("track", "album", "playlist") and item_id: + return kind, item_id + return None + + +async def _get_access_token( + client_id: str, client_secret: str, session: aiohttp.ClientSession +) -> str | None: + async with session.post( + "https://accounts.spotify.com/api/token", + data={"grant_type": "client_credentials"}, + auth=aiohttp.BasicAuth(client_id, client_secret), + ) as resp: + if resp.status == 200: + data = await resp.json() + return data["access_token"] + return None + + +async def resolve_album_tracks( + album_id: str, token: str, session: aiohttp.ClientSession +) -> list[str]: + track_urls: list[str] = [] + next_url: str | None = f"{SPOTIFY_API_BASE}/albums/{album_id}/tracks?limit=50" + while next_url: + async with session.get(next_url, headers={"Authorization": f"Bearer {token}"}) as resp: + if resp.status != 200: + return [] + data = await resp.json() + for item in data.get("items", []): + track_id = item.get("id") + if track_id: + track_urls.append(f"https://open.spotify.com/track/{track_id}") + next_url = data.get("next") + return track_urls + + +async def resolve_playlist_tracks( + playlist_id: str, token: str, session: aiohttp.ClientSession +) -> list[str]: + track_urls: list[str] = [] + next_url: str | None = ( + f"{SPOTIFY_API_BASE}/playlists/{playlist_id}/tracks?limit=100" + ) + while next_url: + async with session.get(next_url, headers={"Authorization": f"Bearer {token}"}) as resp: + if resp.status != 200: + return [] + data = await resp.json() + for item in data.get("items", []): + track = item.get("track") + if track and track.get("id"): + track_urls.append( + f"https://open.spotify.com/track/{track['id']}" + ) + next_url = data.get("next") + return track_urls + + +async def search_spotify_tracks( + query: str, client_id: str, client_secret: str, limit: int = 5 +) -> list[dict]: + if not client_id or not client_secret: + return [] + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return [] + + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": query, "type": "track", "limit": str(limit)}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + data = await resp.json() + + results: list[dict] = [] + for item in data.get("tracks", {}).get("items", []): + artist = item["artists"][0]["name"] if item.get("artists") else "Desconocido" + images = item.get("album", {}).get("images", []) + thumbnail = images[0]["url"] if images else None + results.append( + { + "title": item["name"], + "url": item["external_urls"]["spotify"], + "author": artist, + "duration": item["duration_ms"], + "thumbnail": thumbnail, + } + ) + return results + + +async def resolve_spotify_url( + url: str, client_id: str, client_secret: str +) -> tuple[str | None, list[str] | None]: + """Resolve a Spotify URL to individual track URLs. + + Returns (kind, list_of_track_urls) or (None, None) on failure. + ``kind`` is one of: ``track``, ``album``, ``playlist``. + For ``track``, returns the original URL. + For ``album``/``playlist``, returns the list of individual track URLs. + """ + parsed = parse_spotify_url(url) + if not parsed: + return None, None + + kind, item_id = parsed + + if kind == "track": + return kind, [url] + + if not client_id or not client_secret: + return None, None + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return None, None + + if kind == "album": + track_urls = await resolve_album_tracks(item_id, token, session) + elif kind == "playlist": + track_urls = await resolve_playlist_tracks(item_id, token, session) + else: + return None, None + + if not track_urls: + return kind, None + + return kind, track_urls + + +async def get_track_details( + track_id: str, client_id: str, client_secret: str +) -> dict | None: + """Get full metadata for a Spotify track: album, date, popularity, explicit, artist genres.""" + if not client_id or not client_secret: + return None + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return None + + async with session.get( + f"{SPOTIFY_API_BASE}/tracks/{track_id}", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return None + data = await resp.json() + + album = data.get("album", {}) + artist_id = data["artists"][0]["id"] if data.get("artists") else None + artist_name = data["artists"][0]["name"] if data.get("artists") else None + genres: list[str] = [] + + if artist_id: + async with session.get( + f"{SPOTIFY_API_BASE}/artists/{artist_id}", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + artist_data = await resp.json() + genres = artist_data.get("genres", []) + + images = album.get("images", []) + return { + "album": album.get("name"), + "album_url": album.get("external_urls", {}).get("spotify"), + "release_date": album.get("release_date"), + "popularity": data.get("popularity"), + "explicit": data.get("explicit", False), + "genres": genres[:5], + "artist_name": artist_name, + "thumbnail": images[0]["url"] if images else None, + "duration_ms": data.get("duration_ms"), + "track_number": data.get("track_number"), + } + + +async def _format_track(item: dict) -> dict: + artist = item["artists"][0]["name"] if item.get("artists") else "Desconocido" + images = item.get("album", {}).get("images", []) + return { + "title": item["name"], + "url": item["external_urls"]["spotify"], + "author": artist, + "duration": item["duration_ms"], + "thumbnail": images[0]["url"] if images else None, + } + + +async def get_spotify_recommendations( + seed_track_id: str, + client_id: str, + client_secret: str, + limit: int = 5, +) -> list[dict]: + """Get smart recommendations using Spotify. + + Strategies: + 1. Tracks by the same artist (different songs) + 2. Covers/remixes by track name + 3. Mix both pools prioritizing variety + """ + if not client_id or not client_secret: + return [] + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return [] + + async with session.get( + f"{SPOTIFY_API_BASE}/tracks/{seed_track_id}", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + data = await resp.json() + + track_name = data.get("name", "") + seed_url = data.get("external_urls", {}).get("spotify", "") + artist_name = data["artists"][0]["name"] if data.get("artists") else "" + + if not track_name: + return [] + + artist_names = set() + seen_urls: set[str] = {seed_url} + candidates: list[dict] = [] + needed = limit + 5 + + # Strategy 1: tracks by same artist (different songs) + if artist_name: + try: + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": f'artist:"{artist_name}"', "type": "track", "limit": "10"}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + sd = await resp.json() + for item in sd.get("tracks", {}).get("items", []): + url = item["external_urls"]["spotify"] + if url in seen_urls: + continue + seen_urls.add(url) + candidates.append(await _format_track(item)) + artist_names.add(item["artists"][0]["name"] if item.get("artists") else "") + except Exception: + pass + + # Strategy 2: track name search (covers, remixes by different artists) + try: + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": track_name, "type": "track", "limit": str(needed)}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + sd = await resp.json() + for item in sd.get("tracks", {}).get("items", []): + url = item["external_urls"]["spotify"] + if url in seen_urls: + continue + if len(candidates) >= needed: + break + seen_urls.add(url) + candidates.append(await _format_track(item)) + except Exception: + pass + + # Sort: prefer tracks by DIFFERENT artists first, then same-artist tracks + different_artist = [c for c in candidates if c["author"] != artist_name] + same_artist = [c for c in candidates if c["author"] == artist_name and c["url"] != seed_url] + + final = (different_artist + same_artist)[:limit] + return final