diff --git a/cogs/music.py b/cogs/music.py index 51d4f52..94867a8 100644 --- a/cogs/music.py +++ b/cogs/music.py @@ -28,7 +28,8 @@ search_youtube_candidates, ) from utils.spotify_resolver import ( - get_spotify_recommendations, + get_artist_top_tracks, + get_similar_artists_tracks, get_track_details, resolve_spotify_url, parse_spotify_url, @@ -39,10 +40,10 @@ SEARCH_SUGGESTION_LIMIT = 10 SEARCH_PREVIEW_LIMIT = 5 -# Number of tracks to auto-queue when the queue runs dry -AUTOPLAY_FILL_TRACKS = 5 -# Extra candidates to fetch beyond the target fill count (to filter out seed/duplicates) +AUTOPLAY_MAX_TRACKS = 5 AUTOPLAY_SEARCH_BUFFER = 5 +MAX_SEED_HISTORY = 5 +MAX_ARTIST_HISTORY = 10 @dataclass(slots=True) @@ -396,6 +397,11 @@ async def stop_btn(self, interaction: discord.Interaction, button: discord.ui.Bu if interaction.guild_id: msg_id = interaction.message.id if interaction.message else None await self.cog._stop_nowplaying_cleanup(interaction.guild_id, exclude_msg_id=msg_id) + self.cog._seed_history.pop(interaction.guild_id, None) + self.cog._artist_history.pop(interaction.guild_id, None) + self.cog._autoplay_url_history.pop(interaction.guild_id, None) + self.cog._autoplay_title_history.pop(interaction.guild_id, None) + self.cog._last_track_info.pop(interaction.guild_id, None) asyncio.create_task(_delete_msg_after(interaction.message)) except Exception as e: await interaction.response.send_message(f"Error: {e}", ephemeral=True, delete_after=15) @@ -498,6 +504,17 @@ async def open_move_modal(self, interaction: discord.Interaction, button: discor await interaction.response.send_modal(modal) +CACHE_MAX_SIZE = 500 + + +def _trim_cache(cache: dict, max_size: int = CACHE_MAX_SIZE) -> None: + while len(cache) > max_size: + try: + cache.pop(next(iter(cache))) + except (StopIteration, RuntimeError): + break + + async def _delete_msg_after(msg: discord.Message | None, delay: int = 15) -> None: if msg is None: return @@ -562,24 +579,23 @@ async def song_query_autocomplete(interaction: discord.Interaction, current: str class MusicCog(commands.Cog): def __init__(self, bot): self.bot = bot - # Autoplay SIEMPRE activo para todos los servidores self._autoplay_guilds: set[int] = set() - # last track info per guild, used to seed autoplay searches self._last_track_info: dict[int, dict] = {} - # cache Lavalink results for Spotify URLs: url -> (tracks, load_type) self._spotify_cache: dict[str, tuple] = {} - # guard against registering lavalink event hooks more than once self._hooks_registered: bool = False - # track history per guild for the ⏮ previous button (list of URIs) self._track_history: dict[int, list[str]] = {} - # nowplaying messages per guild for auto-update: guild_id -> [(message, requester_name, requester_avatar)] self._nowplaying_messages: dict[int, list[tuple[discord.Message, str, str | None]]] = {} - # channel where /play was last used per guild, for auto-sending nowplaying self._nowplaying_channels: dict[int, int] = {} - # cache Spotify details per track URI for progress updates self._spotify_details_cache: dict[str, dict] = {} - # background task for nowplaying progress updates self._progress_task: asyncio.Task | None = None + # seed history per guild for autoplay diversity (max 5 entries) + self._seed_history: dict[int, list[dict]] = {} + # artist history per guild to avoid repeating artists in autoplay (max 10 names) + self._artist_history: dict[int, list[str]] = {} + # URLs recently added by autoplay (max 20 per guild) to avoid repeats across sessions + self._autoplay_url_history: dict[int, list[str]] = {} + # (author||title) history to catch duplicates with different URLs (e.g. same track from diff releases) + self._autoplay_title_history: dict[int, list[str]] = {} def _get_bot_member(self, interaction: discord.Interaction): if interaction.guild is None or self.bot.user is None: @@ -745,6 +761,7 @@ async def _build_nowplaying_embed_auto( ) if spotify_details: self._spotify_details_cache[track.uri] = spotify_details + _trim_cache(self._spotify_details_cache) except Exception: pass @@ -953,7 +970,10 @@ async def _queue_selected_track( track = results.tracks[0] track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + if is_spotify_url(choice.url): + track.extra["original_uri"] = choice.url + user_count = sum(1 for t in player.queue if getattr(t, 'requester', None) != self.bot.user.id) + player.queue.insert(user_count, track) started = False if not player.is_playing: @@ -1233,6 +1253,7 @@ async def _queue_query( results = await player.node.get_tracks(normalized_query) if is_spotify_url(normalized_query) and results and results.tracks: self._spotify_cache[normalized_query] = (results,) + _trim_cache(self._spotify_cache) print(f"[{log_prefix}] Cache guardado para: {normalized_query[:60]}") except Exception as exc: print(f"[{log_prefix}] Error en Lavalink: {type(exc).__name__}: {exc}") @@ -1245,6 +1266,7 @@ async def _queue_query( results = await player.node.get_tracks(normalized_query) if results and results.tracks: self._spotify_cache[normalized_query] = (results,) + _trim_cache(self._spotify_cache) print(f"[{log_prefix}] ✓ Éxito en reintento") else: print(f"[{log_prefix}] ✗ Reintento también falló") @@ -1259,11 +1281,13 @@ async def _queue_query( tracks = results.tracks display_title = "" + user_count = sum(1 for t in player.queue if getattr(t, 'requester', 0) != self.bot.user.id) if results.load_type == lavalink.LoadType.PLAYLIST: for track in tracks: track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + for i, track in enumerate(tracks): + player.queue.insert(user_count + i, track) playlist_name = getattr(results.playlist_info, "name", "Playlist") display_title = f"{playlist_name} ({len(tracks)} canciones)" @@ -1271,7 +1295,9 @@ async def _queue_query( track = tracks[0] track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + if is_spotify_url(normalized_query): + track.extra["original_uri"] = normalized_query + player.queue.insert(user_count, track) display_title = track.title or query print(f"[{log_prefix}] ✓ En cola: {display_title}") @@ -1323,6 +1349,7 @@ async def _queue_spotify_collection( loaded = 0 failed = 0 first_title = "" + user_count = sum(1 for t in player.queue if getattr(t, 'requester', 0) != self.bot.user.id) for i, track_url in enumerate(urls): print( @@ -1339,11 +1366,12 @@ async def _queue_spotify_collection( results = await player.node.get_tracks(track_url) if results and results.tracks: self._spotify_cache[track_url] = (results,) + _trim_cache(self._spotify_cache) if results and results.tracks: track = results.tracks[0] track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + player.queue.insert(user_count + loaded, track) loaded += 1 if not first_title: first_title = track.title or "" @@ -1423,6 +1451,7 @@ async def _queue_playlist_query( loaded = 0 failed = 0 display_title = "" + user_count = sum(1 for t in player.queue if getattr(t, 'requester', 0) != self.bot.user.id) for track_url in urls: try: @@ -1433,11 +1462,12 @@ async def _queue_playlist_query( results = await player.node.get_tracks(track_url) if results and results.tracks: self._spotify_cache[track_url] = (results,) + _trim_cache(self._spotify_cache) if results and results.tracks: track = results.tracks[0] track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + player.queue.insert(user_count + loaded, track) loaded += 1 if not display_title: display_title = track.title or "" @@ -1497,7 +1527,9 @@ async def _queue_playlist_query( for track in tracks: track.requester = interaction.user.id track.extra["requester_name"] = interaction.user.display_name - player.add(track) + user_count = sum(1 for t in player.queue if getattr(t, 'requester', 0) != self.bot.user.id) + for i, track in enumerate(tracks): + player.queue.insert(user_count + i, track) playlist_name = getattr(results.playlist_info, "name", None) or ( playlist_result["title"] if playlist_result else "Playlist" @@ -1521,94 +1553,170 @@ async def _queue_playlist_query( async def _autofill_queue(self, player) -> int: """Search for related tracks and add them to the player queue. - Uses Spotify recommendations if available, falls back to YouTube search. + Uses YouTube search with artist name for related music discovery. + Rotates search queries to get different recommendations each time. + Adds 1 to AUTOPLAY_MAX_TRACKS tracks. Returns the number of tracks added. """ - from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET - guild_id = player.guild_id - seed_info = self._last_track_info.get(guild_id, {}) - spotify_id = seed_info.get("spotify_id") - title = seed_info.get("title") or "" - - if not title: - print("[AUTOPLAY] No hay información de la última canción para semilla") + seeds = self._seed_history.get(guild_id, []) + if not seeds: + fallback = self._last_track_info.get(guild_id, {}) + if fallback.get("title"): + seeds = [fallback] + if not seeds: + print("[AUTOPLAY] No hay semilla para autoplay") return 0 - added = 0 - queued_urls: set[str] = set() + from config import SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET - seed_uri = seed_info.get("uri") or "" - seed_title_lower = title.lower().strip() + added = 0 + existing_titles: set[str] = set() + for t in player.queue: + existing_titles.add((t.title or "").lower().strip()) + if player.current: + existing_titles.add((player.current.title or "").lower().strip()) candidates: list[dict] = [] + seen_urls: set[str] = set() + searched_authors: set[str] = set() + recent_urls: set[str] = set(self._autoplay_url_history.get(guild_id, [])) + recent_titles: set[str] = set(self._autoplay_title_history.get(guild_id, [])) - if spotify_id and SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: - print(f"[AUTOPLAY] Recomendaciones vía Spotify: seed_track={spotify_id}") - try: - spotify_recs = await get_spotify_recommendations( - spotify_id, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, - limit=AUTOPLAY_FILL_TRACKS + AUTOPLAY_SEARCH_BUFFER, - ) - if spotify_recs: - print(f"[AUTOPLAY] Spotify devolvió {len(spotify_recs)} recomendaciones") - for rec in spotify_recs: - if rec["url"] not in queued_urls and rec["url"] != seed_uri: - candidates.append(rec) - queued_urls.add(rec["url"]) - except Exception as exc: - print(f"[AUTOPLAY] Error en recomendaciones Spotify: {exc}") + for seed in seeds: + if len(candidates) >= AUTOPLAY_MAX_TRACKS + AUTOPLAY_SEARCH_BUFFER: + break + spotify_id = seed.get("spotify_id") + # Try Spotify artist top-tracks (most reliable: same artist, their best songs) + if spotify_id and SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + try: + recs = await get_artist_top_tracks( + spotify_id, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, + limit=AUTOPLAY_MAX_TRACKS + AUTOPLAY_SEARCH_BUFFER, + ) + for r in recs: + url = r.get("url", "") + if url and url not in seen_urls and url not in recent_urls: + seen_urls.add(url) + candidates.append(r) + except Exception: + pass + # Try similar artists via Spotify search (co-listened discovery) for variety + if spotify_id and SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET: + try: + similar = await get_similar_artists_tracks( + spotify_id, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, + limit=AUTOPLAY_MAX_TRACKS + AUTOPLAY_SEARCH_BUFFER, + exclude_artists=searched_authors, + ) + for r in similar: + url = r.get("url", "") + if url and url not in seen_urls and url not in recent_urls: + seen_urls.add(url) + candidates.append(r) + except Exception: + pass + title = seed.get("title", "").lower().strip() + author = seed.get("author", "") + # Skip YouTube search if we already have enough candidates + if len(candidates) >= AUTOPLAY_MAX_TRACKS: + continue + # Skip if we already searched this artist + if not author or author in searched_authors: + continue + searched_authors.add(author) - if not candidates: - author = seed_info.get("author") or "" - query = f"{title} {author}".strip() if author else title - print(f"[AUTOPLAY] Fallback a YouTube: {query!r}") - try: - yt_candidates = await search_youtube_candidates( - query, limit=AUTOPLAY_FILL_TRACKS + AUTOPLAY_SEARCH_BUFFER - ) - for c in yt_candidates: - url = c.get("url") or "" - if url and url not in queued_urls and url != seed_uri: + # Rotate search queries for variety across autoplay sessions + queries = [f"{author} - topic", f"{author} songs", f"{author} music"] + random.shuffle(queries) + for artist_query in queries: + try: + yt = await search_youtube_candidates( + artist_query, limit=AUTOPLAY_MAX_TRACKS + AUTOPLAY_SEARCH_BUFFER + ) + for c in yt: + url = c.get("url", "") + if not url or url in seen_urls or url in recent_urls: + continue + c_title = (c.get("title") or "").lower().strip() + # Skip if it's the same song (title matches seed) + if title and title in c_title: + continue + seen_urls.add(url) candidates.append(c) - queued_urls.add(url) - except Exception as exc: - print(f"[AUTOPLAY] Error en búsqueda YouTube: {exc}") - return 0 + except Exception: + pass + if candidates: + break + if candidates: + break + # Fallback: search with title + author (may return same song versions) + query = f"{title} {author}".strip() if author else title + if query: + try: + yt = await search_youtube_candidates( + query, limit=AUTOPLAY_MAX_TRACKS + AUTOPLAY_SEARCH_BUFFER + ) + for c in yt: + url = c.get("url", "") + if url and url not in seen_urls and url not in recent_urls: + seen_urls.add(url) + candidates.append(c) + except Exception: + pass - existing_titles: set[str] = set() - for t in player.queue: - existing_titles.add((t.title or "").lower().strip()) - if player.current: - existing_titles.add((player.current.title or "").lower().strip()) + random.shuffle(candidates) for candidate in candidates: - if added >= AUTOPLAY_FILL_TRACKS: + if added >= AUTOPLAY_MAX_TRACKS: break - cand_url = candidate.get("url") or "" cand_title = (candidate.get("title") or "").lower().strip() + cand_author = (candidate.get("author") or candidate.get("uploader") or candidate.get("channel") or "").strip() if not cand_url: continue - if cand_title == seed_title_lower or cand_title in existing_titles: + if cand_url in recent_urls: + continue + cand_key = f"{cand_author}|{cand_title}" + if cand_key in recent_titles: + continue + if cand_title in existing_titles: continue - try: results = await player.node.get_tracks(cand_url) except Exception as exc: print(f"[AUTOPLAY] Error cargando pista {cand_url}: {exc}") continue - if not results or not results.tracks: continue - track = results.tracks[0] track.requester = self.bot.user.id track.extra["requester_name"] = "Ghostify" + track.extra["original_uri"] = cand_url player.add(track) existing_titles.add((track.title or "").lower().strip()) added += 1 - print(f"[AUTOPLAY] Añadida: {track.title}") + print(f"[AUTOPLAY] Añadida: {track.title} — {cand_url}") + + # Track autoplay URL and title history to avoid repeats + url_history = self._autoplay_url_history.setdefault(guild_id, []) + url_history.append(cand_url) + if len(url_history) > 20: + url_history.pop(0) + title_history = self._autoplay_title_history.setdefault(guild_id, []) + title_history.append(cand_key) + if len(title_history) > 20: + title_history.pop(0) + + # Update artist history for diversity + cand_author = candidate.get("author") or candidate.get("uploader") or candidate.get("channel") or "" + if cand_author: + ah = self._artist_history.setdefault(guild_id, []) + if cand_author in ah: + ah.remove(cand_author) + ah.append(cand_author) + if len(ah) > MAX_ARTIST_HISTORY: + ah.pop(0) return added @@ -1616,32 +1724,53 @@ async def _autofill_queue(self, player) -> int: async def on_track_start(self, event: lavalink.TrackStartEvent) -> None: """Save metadata of the currently playing track for autoplay seeding and history.""" track = event.track - print(f"[TRACK_START] Evento recibido: track={track.title if track else None}, guild={event.player.guild_id}", flush=True) + print(f"[TRACK_START] Evento recibido: track={track.title if track else None}, guild={event.player.guild_id}") if track is None: return uri = track.uri or "" + track_extra = getattr(track, "extra", {}) or {} + original_uri = track_extra.get("original_uri") or uri spotify_id: str | None = None - if uri.startswith("https://open.spotify.com/track/"): - parsed = parse_spotify_url(uri) + if original_uri.startswith("https://open.spotify.com/track/"): + parsed = parse_spotify_url(original_uri) if parsed: spotify_id = parsed[1] - track_extra = getattr(track, "extra", {}) or {} - requester_name = track_extra.get("requester_name", None) or "Ghostify" - self._last_track_info[event.player.guild_id] = { + requester_name = track_extra.get("requester_name") or "Ghostify" + seed_entry = { "title": track.title or "", "author": track.author or "", "uri": uri, "spotify_id": spotify_id, "requester": requester_name, } + self._last_track_info[event.player.guild_id] = seed_entry guild_id = event.player.guild_id history = self._track_history.setdefault(guild_id, []) + + seeds = self._seed_history.setdefault(guild_id, []) + seeds.append(seed_entry) + if len(seeds) > MAX_SEED_HISTORY: + seeds.pop(0) if not history or history[-1] != uri: history.append(uri) if len(history) > 20: history.pop(0) + # Track all played Spotify URLs to avoid repetition in autoplay + track_title = (track.title or "").lower().strip() + track_author = (track.author or "").strip() + if original_uri.startswith("https://open.spotify.com/track/"): + url_history = self._autoplay_url_history.setdefault(guild_id, []) + url_history.append(original_uri) + if len(url_history) > 20: + url_history.pop(0) + if track_author and track_title: + title_history = self._autoplay_title_history.setdefault(guild_id, []) + title_history.append(f"{track_author}|{track_title}") + if len(title_history) > 20: + title_history.pop(0) + entries = self._nowplaying_messages.get(guild_id, []) print(f"[NOWPLAYING_UPDATE] guild={guild_id} entries_count={len(entries)}", flush=True) if entries: @@ -1671,6 +1800,15 @@ async def on_track_start(self, event: lavalink.TrackStartEvent) -> None: self._nowplaying_messages[guild_id] = [(view.message, "Ghostify", None)] self._nowplaying_channels.pop(guild_id, None) + # Pre-fill queue early when running low (avoids delay between songs) + if guild_id in self._autoplay_guilds: + remaining = len(event.player.queue) + if remaining <= 2: + print(f"[AUTOPLAY] Pre-fill: quedan {remaining} canciones, cargando siguientes…") + added = await self._autofill_queue(event.player) + if added > 0 and not event.player.is_playing: + await event.player.play() + @lavalink.listener(lavalink.QueueEndEvent) async def on_queue_end(self, event: lavalink.QueueEndEvent) -> None: """When the queue runs out, auto-fill it if autoplay is enabled for this guild.""" @@ -2137,6 +2275,11 @@ async def stop(self, interaction: discord.Interaction): if interaction.guild_id: await self._stop_nowplaying_cleanup(interaction.guild_id) + self._seed_history.pop(interaction.guild_id, None) + self._artist_history.pop(interaction.guild_id, None) + self._autoplay_url_history.pop(interaction.guild_id, None) + self._autoplay_title_history.pop(interaction.guild_id, None) + self._last_track_info.pop(interaction.guild_id, None) print("[STOP] ✓ Bot desconectado") await self._send_embed( @@ -2220,6 +2363,10 @@ async def on_guild_remove(self, guild: discord.Guild) -> None: self._nowplaying_channels.pop(guild.id, None) self._nowplaying_messages.pop(guild.id, None) self._track_history.pop(guild.id, None) + self._seed_history.pop(guild.id, None) + self._artist_history.pop(guild.id, None) + self._autoplay_url_history.pop(guild.id, None) + self._autoplay_title_history.pop(guild.id, None) lavalink_client = self._get_lavalink_client() if lavalink_client: try: diff --git a/utils/spotify_resolver.py b/utils/spotify_resolver.py index 54adcbd..5a9ae8b 100644 --- a/utils/spotify_resolver.py +++ b/utils/spotify_resolver.py @@ -215,17 +215,33 @@ async def get_spotify_recommendations( client_id: str, client_secret: str, limit: int = 5, + *, + exclude_artists: set[str] | None = None, ) -> list[dict]: - """Get smart recommendations using Spotify. - - Strategies: - 1. Tracks by the same artist (different songs) - 2. Covers/remixes by track name - 3. Mix both pools prioritizing variety + """Get smart recommendations using Spotify's related-artists endpoint. + + Strategies (in order): + 1. Related artists (based on Spotify community listening data) -> top tracks + 2. Same artist (other songs) + 3. Fallback: search by track name (covers/remixes by different artists) + + Args: + seed_track_id: Spotify track ID to seed recommendations + client_id: Spotify Client ID + client_secret: Spotify Client Secret + limit: max tracks to return + exclude_artists: set of artist names to exclude (for diversity) + + Returns: + List of dicts with keys: url, title, author, duration, thumbnail """ if not client_id or not client_secret: return [] + seen_urls: set[str] = set() + candidates: list[dict] = [] + exclude = exclude_artists or set() + async with aiohttp.ClientSession() as session: token = await _get_access_token(client_id, client_secret, session) if not token: @@ -241,22 +257,81 @@ async def get_spotify_recommendations( track_name = data.get("name", "") seed_url = data.get("external_urls", {}).get("spotify", "") - artist_name = data["artists"][0]["name"] if data.get("artists") else "" + artists_data = data.get("artists", []) + if not artists_data: + return [] + artist_id = artists_data[0]["id"] + artist_name = artists_data[0]["name"] + seen_urls.add(seed_url) if not track_name: return [] - artist_names = set() - seen_urls: set[str] = {seed_url} - candidates: list[dict] = [] - needed = limit + 5 + # Strategy 1: related-artists -> top tracks + try: + async with session.get( + f"{SPOTIFY_API_BASE}/artists/{artist_id}/related-artists", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + ra_data = await resp.json() + related = ra_data.get("artists", []) + print(f"[SPOTIFY] related-artists: {len(related)} encontrados para '{artist_name}'") + for ra in related[:5]: # max 5 related artists + ra_name = ra.get("name", "") + ra_id = ra.get("id", "") + if ra_name in exclude: + continue + if not ra_id: + continue + async with session.get( + f"{SPOTIFY_API_BASE}/artists/{ra_id}/top-tracks", + params={"market": "ES"}, + headers={"Authorization": f"Bearer {token}"}, + ) as tt_resp: + if tt_resp.status == 200: + tt_data = await tt_resp.json() + for item in tt_data.get("tracks", [])[:3]: + url = item.get("external_urls", {}).get("spotify", "") + if not url or url in seen_urls: + continue + seen_urls.add(url) + candidates.append(await _format_track(item)) + if len(candidates) >= limit + 3: + break + if len(candidates) >= limit + 3: + break + except Exception as exc: + print(f"[SPOTIFY] Error en related-artists: {exc}") + + # Strategy 2: same artist (other songs from the seed artist) + if len(candidates) < limit + 3 and artist_name not in exclude: + try: + async with session.get( + f"{SPOTIFY_API_BASE}/artists/{artist_id}/top-tracks", + params={"market": "ES"}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status == 200: + tt_data = await resp.json() + for item in tt_data.get("tracks", []): + url = item.get("external_urls", {}).get("spotify", "") + if not url or url in seen_urls: + continue + seen_urls.add(url) + candidates.append(await _format_track(item)) + if len(candidates) >= limit + 3: + break + except Exception as exc: + print(f"[SPOTIFY] Error en top-tracks del artista: {exc}") - # Strategy 1: tracks by same artist (different songs) - if artist_name: + # Strategy 3: fallback search by track name (covers, remixes) + if len(candidates) < limit + 3: try: + needed = (limit + 3) - len(candidates) async with session.get( f"{SPOTIFY_API_BASE}/search", - params={"q": f'artist:"{artist_name}"', "type": "track", "limit": "10"}, + params={"q": track_name, "type": "track", "limit": str(needed)}, headers={"Authorization": f"Bearer {token}"}, ) as resp: if resp.status == 200: @@ -267,33 +342,172 @@ async def get_spotify_recommendations( continue seen_urls.add(url) candidates.append(await _format_track(item)) - artist_names.add(item["artists"][0]["name"] if item.get("artists") else "") + if len(candidates) >= limit + 3: + break except Exception: pass - # Strategy 2: track name search (covers, remixes by different artists) - try: - async with session.get( - f"{SPOTIFY_API_BASE}/search", - params={"q": track_name, "type": "track", "limit": str(needed)}, - headers={"Authorization": f"Bearer {token}"}, - ) as resp: - if resp.status == 200: - sd = await resp.json() - for item in sd.get("tracks", {}).get("items", []): - url = item["external_urls"]["spotify"] - if url in seen_urls: - continue - if len(candidates) >= needed: - break - seen_urls.add(url) - candidates.append(await _format_track(item)) - except Exception: - pass - - # Sort: prefer tracks by DIFFERENT artists first, then same-artist tracks + # Sort: prefer tracks by DIFFERENT artists first different_artist = [c for c in candidates if c["author"] != artist_name] same_artist = [c for c in candidates if c["author"] == artist_name and c["url"] != seed_url] final = (different_artist + same_artist)[:limit] return final + + +async def get_artist_top_tracks( + seed_track_id: str, + client_id: str, + client_secret: str, + limit: int = 5, +) -> list[dict]: + """Get popular tracks by the artist via Spotify search. + + Fetches the seed track to get the artist name, then searches + Spotify for the artist's top tracks. Returns list of formatted + track dicts with Spotify URLs (resolvable by Lavalink). + """ + if not client_id or not client_secret: + return [] + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return [] + + async with session.get( + f"{SPOTIFY_API_BASE}/tracks/{seed_track_id}", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + data = await resp.json() + + artists_data = data.get("artists", []) + if not artists_data: + return [] + artist_name = artists_data[0]["name"] + + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": artist_name, "type": "track", "limit": "10"}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + sd = await resp.json() + + results: list[dict] = [] + seen_urls: set[str] = set() + seen_titles: set[str] = set() + seed_url = data.get("external_urls", {}).get("spotify", "") + seed_title = (data.get("name", "") or "").lower().strip() + seen_titles.add(seed_title) + for item in sd.get("tracks", {}).get("items", []): + url = item.get("external_urls", {}).get("spotify", "") + if not url or url in seen_urls or url == seed_url: + continue + seen_urls.add(url) + # Only include tracks where seed artist is the primary artist + primary_artist = (item.get("artists") or [{}])[0].get("name", "") + if primary_artist != artist_name: + continue + item_title = (item.get("name", "") or "").lower().strip() + if item_title in seen_titles: + continue + seen_titles.add(item_title) + results.append(await _format_track(item)) + if len(results) >= limit: + break + + return results + + +async def get_similar_artists_tracks( + seed_track_id: str, + client_id: str, + client_secret: str, + limit: int = 5, + *, + exclude_artists: set[str] | None = None, +) -> list[dict]: + """Find similar artists and their top tracks via Spotify search. + + Uses search?q={artist}&type=artist which returns co-listened artists + (Spotify's discovery algorithm). Then fetches top tracks for the + most relevant similar artists via search?q={similar}&type=track. + + Returns list of formatted track dicts with Spotify URLs. + """ + if not client_id or not client_secret: + return [] + + async with aiohttp.ClientSession() as session: + token = await _get_access_token(client_id, client_secret, session) + if not token: + return [] + + async with session.get( + f"{SPOTIFY_API_BASE}/tracks/{seed_track_id}", + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + data = await resp.json() + + artists_data = data.get("artists", []) + if not artists_data: + return [] + seed_artist_name = artists_data[0]["name"] + seed_url = data.get("external_urls", {}).get("spotify", "") or "" + exclude = exclude_artists or set() + exclude.add(seed_artist_name) + + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": seed_artist_name, "type": "artist", "limit": "8"}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + return [] + sd = await resp.json() + + similar_artists: list[str] = [] + for item in sd.get("artists", {}).get("items", []): + name = item.get("name", "") + if name.lower() != seed_artist_name.lower() and name not in exclude: + similar_artists.append(name) + + if not similar_artists: + return [] + + results: list[dict] = [] + seen_urls: set[str] = set() + if seed_url: + seen_urls.add(seed_url) + needed = limit + 3 + + for sa_name in similar_artists[:3]: + if len(results) >= needed: + break + async with session.get( + f"{SPOTIFY_API_BASE}/search", + params={"q": sa_name, "type": "track", "limit": "10"}, + headers={"Authorization": f"Bearer {token}"}, + ) as resp: + if resp.status != 200: + continue + td = await resp.json() + for item in td.get("tracks", {}).get("items", []): + url = item.get("external_urls", {}).get("spotify", "") + if not url or url in seen_urls: + continue + primary_artist = (item.get("artists") or [{}])[0].get("name", "") + if primary_artist != sa_name: + continue + seen_urls.add(url) + results.append(await _format_track(item)) + if len(results) >= needed: + break + + return results[:limit]