mirror of
https://github.com/csd4ni3l/music-player.git
synced 2026-01-01 04:03:42 +01:00
Remove FFmpeg Linux download which wouldn't work and add messageboxes, add a yes/no messagebox for yt-dlp, add acoustid music recognition, only support MP3, split online_metadata to multiple files, add missing metadata to files automatically, add synchronized lyrics pane
This commit is contained in:
75
utils/acoustid_metadata.py
Normal file
75
utils/acoustid_metadata.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import os, platform, tarfile, acoustid, urllib.request, shutil, gzip, glob, logging, sys, io
|
||||
|
||||
from utils.constants import ACOUSTID_API_KEY
|
||||
|
||||
from zipfile import ZipFile
|
||||
|
||||
def get_fpcalc_name():
    """Return the platform-specific filename of the fpcalc executable.

    fpcalc is the Chromaprint fingerprinting tool that pyacoustid shells
    out to.
    """
    # Windows binaries carry an .exe suffix; Linux, macOS, and any other
    # unix-like use the bare name. The old code returned None for
    # unrecognized systems, which surfaced later as a confusing
    # TypeError in os.path.join.
    if platform.system() == "Windows":
        return "fpcalc.exe"
    return "fpcalc"
|
||||
|
||||
def get_fpcalc_path():
    """Absolute path of the bundled fpcalc binary under ./bin."""
    bin_dir = os.path.join(os.getcwd(), "bin")
    return os.path.join(bin_dir, get_fpcalc_name())
|
||||
|
||||
def download_fpcalc():
    """Download the Chromaprint fpcalc binary for this platform into ./bin.

    Fetches the official v1.5.1 release archive, extracts the fpcalc
    executable into the ``bin`` directory, removes the archive and the
    leftover release folder, and marks the binary executable.

    Raises:
        RuntimeError: if the current OS has no known fpcalc download.
    """
    system = platform.system()
    architecture = platform.machine().lower()

    base = "https://github.com/acoustid/chromaprint/releases/download/v1.5.1"
    if system == "Linux":
        # NOTE(review): only an x86_64 build is fetched here; ARM Linux
        # hosts would receive an incompatible binary — confirm intent.
        url = base + "/chromaprint-fpcalc-1.5.1-linux-x86_64.tar.gz"
    elif system == "Darwin":
        if architecture == "x86_64" or architecture == "amd64":
            url = base + "/chromaprint-fpcalc-1.5.1-macos-x86_64.tar.gz"
        else:
            url = base + "/chromaprint-fpcalc-1.5.1-macos-arm64.tar.gz"
    elif system == "Windows":
        url = base + "/chromaprint-fpcalc-1.5.1-windows-x86_64.zip"
    else:
        # Previously an unsupported OS fell through to an unhelpful
        # NameError on `url`; fail loudly with a real message instead.
        raise RuntimeError(f"No fpcalc download available for {system}")

    bin_dir = os.path.join(os.getcwd(), "bin")
    os.makedirs(bin_dir, exist_ok=True)  # urlretrieve needs the target dir to exist

    if url.endswith(".zip"):
        zip_path = os.path.join(bin_dir, "chromaprint.zip")
        urllib.request.urlretrieve(url, zip_path)
        with ZipFile(zip_path) as file:
            file.extractall(bin_dir)

        os.remove(zip_path)
    else:
        tar_gz_path = os.path.join(bin_dir, "chromaprint.tar.gz")
        urllib.request.urlretrieve(url, tar_gz_path)

        # tarfile alone failed on these .tar.gz archives, so decompress
        # with gzip first and hand tarfile the raw tar bytes.
        with gzip.open(tar_gz_path, "rb") as f:
            with tarfile.open(fileobj=io.BytesIO(f.read())) as tar:
                tar.extractall(bin_dir)

        os.remove(tar_gz_path)

    # The archive unpacks into a chromaprint-* folder; move the binary
    # up into bin/ and drop the folder.
    chromaprint_matches = glob.glob(os.path.join("bin", "chromaprint*"))
    if chromaprint_matches:
        shutil.move(os.path.join(chromaprint_matches[0], get_fpcalc_name()), os.path.join("bin", get_fpcalc_name()))
        shutil.rmtree(chromaprint_matches[0])

    os.chmod(get_fpcalc_path(), 0o755)  # ensure the binary is executable
|
||||
|
||||
def get_recording_id_from_acoustic(filename):
    """Fingerprint *filename* with AcoustID and resolve its MusicBrainz recording.

    Args:
        filename: path to the audio file to fingerprint.

    Returns:
        tuple: (acoustid_result_id, musicbrainz_recording_id), or
        (None, None) when fingerprinting or the web lookup fails.
    """
    # Point pyacoustid at the bundled fpcalc binary.
    os.environ["FPCALC"] = get_fpcalc_path()

    try:
        results = acoustid.match(ACOUSTID_API_KEY, filename, meta=['recordings'], force_fpcalc=True, parse=False)["results"]
    except acoustid.NoBackendError:
        logging.debug("ChromaPrint library/tool not found")
        return None, None
    except acoustid.FingerprintGenerationError:
        logging.debug("Fingerprint could not be calculated")
        return None, None
    except acoustid.WebServiceError as exc:
        logging.debug(f"Web service request failed: {exc}")
        return None, None

    if not results:
        return None, None

    result = results[0]

    # AcoustID can return a match without any linked recordings; the old
    # code raised KeyError on "recordings" in that case.
    recordings = result.get("recordings")
    if not recordings:
        return None, None

    return result["id"], recordings[0]["id"]
|
||||
@@ -6,10 +6,13 @@ from arcade.gui.widgets.slider import UISliderStyle
|
||||
menu_background_color = (17, 17, 17)
|
||||
log_dir = 'logs'
|
||||
discord_presence_id = 1368277020332523530
|
||||
audio_extensions = ["mp3", "m4a", "aac", "flac", "ogg", "opus", "wav"]
|
||||
audio_extensions = ["mp3"]
|
||||
view_modes = ["files", "playlist"]
|
||||
|
||||
MUSIC_TITLE_WORD_BLACKLIST = ["compilation", "remix", "vs", "cover", "version", "instrumental", "restrung", "interlude"]
|
||||
COVER_CACHE_DIR = "cover_cache"
|
||||
ACOUSTID_API_KEY = 'PuUkMEnUXf'
|
||||
LRCLIB_BASE_URL = "https://lrclib.net/api/search"
|
||||
|
||||
MUSICBRAINZ_PROJECT_NAME = "csd4ni3l/music-player"
|
||||
MUSCIBRAINZ_VERSION = "git"
|
||||
|
||||
52
utils/cover_art.py
Normal file
52
utils/cover_art.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
from utils.constants import COVER_CACHE_DIR, MUSCIBRAINZ_VERSION, MUSICBRAINZ_CONTACT, MUSICBRAINZ_PROJECT_NAME
|
||||
|
||||
import musicbrainzngs as music_api
|
||||
|
||||
import os, logging, arcade
|
||||
|
||||
def fetch_image_bytes(url):
    """Download *url* and return the raw response bytes, or None on HTTP/URL errors."""
    headers = {"User-Agent": "csd4ni3l/music-player/git python-musicbrainzngs/0.7.1 ( csd4ni3l@proton.me )"}
    try:
        with urlopen(Request(url, headers=headers), timeout=10) as resp:
            return resp.read()
    except (HTTPError, URLError) as e:
        logging.debug(f"Error fetching {url}: {e}")
        return None
|
||||
|
||||
def download_cover_art(mb_album_id, size=250):
    """Fetch an album's front cover from the Cover Art Archive.

    Covers are cached as PNGs in COVER_CACHE_DIR, keyed by album id and
    size.

    Args:
        mb_album_id: MusicBrainz release id.
        size: cover edge length requested from the archive.

    Returns:
        tuple: (mb_album_id, PIL.Image in RGBA mode, or None on failure).
    """
    path = os.path.join(COVER_CACHE_DIR, f"{mb_album_id}_{size}.png")
    if os.path.exists(path):
        try:
            # Convert cached covers too, so callers always receive RGBA
            # (fresh downloads below are converted before saving).
            return mb_album_id, Image.open(path).convert("RGBA")
        except Exception as e:
            # A truncated/corrupt cache entry shouldn't kill the lookup;
            # fall through and re-download it.
            logging.debug(f"Corrupt cached cover for {mb_album_id}: {e}")

    url = f"https://coverartarchive.org/release/{mb_album_id}/front-{size}"
    img_bytes = fetch_image_bytes(url)
    if not img_bytes:
        return mb_album_id, None

    try:
        img = Image.open(BytesIO(img_bytes)).convert("RGBA")
        img.save(path)
        return mb_album_id, img
    except Exception as e:
        logging.debug(f"Failed to decode/save image for {mb_album_id}: {e}")
        return mb_album_id, None
|
||||
|
||||
def download_albums_cover_art(album_ids, size=250, max_workers=5):
    """Concurrently download cover art for each album id.

    Returns:
        dict: album id -> arcade.Texture, or None when no cover was found.
    """
    music_api.set_useragent(MUSICBRAINZ_PROJECT_NAME, MUSCIBRAINZ_VERSION, MUSICBRAINZ_CONTACT)
    os.makedirs(COVER_CACHE_DIR, exist_ok=True)

    textures = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(download_cover_art, an_id, size) for an_id in album_ids]
        for done in as_completed(pending):
            an_id, image = done.result()
            textures[an_id] = arcade.Texture(image) if image else None
    return textures
|
||||
58
utils/lyrics_metadata.py
Normal file
58
utils/lyrics_metadata.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import urllib.parse, urllib.request, json
|
||||
|
||||
from utils.utils import ensure_metadata_file
|
||||
from utils.constants import LRCLIB_BASE_URL
|
||||
|
||||
def convert_syncronized_time_to_seconds(synchronized_time):
    """Convert an LRC "mm:ss.xx" timestamp string into seconds as a float."""
    minutes, seconds = (float(part) for part in synchronized_time.split(":"))
    return minutes * 60 + seconds
|
||||
|
||||
def parse_synchronized_lyrics(synchronized_lyrics: str):
    """Parse LRC-style synchronized lyrics text.

    Each lyric line looks like "[mm:ss.xx] text". Lines that do not match
    that shape — blank lines, metadata tags such as "[ar: Artist]", or
    untimed lines — are skipped instead of raising (the old code crashed
    with ValueError on them).

    Returns:
        tuple: (list of times in seconds, {time_in_seconds: lyric_text}).
    """
    lyrics_by_time = {}

    for line in synchronized_lyrics.splitlines():
        # partition keeps any later "] " inside the lyric text intact.
        head, sep, text = line.partition("] ")
        if not sep or not head.startswith("["):
            continue

        try:
            minutes_str, seconds_str = head[1:].split(":")
            seconds = float(minutes_str) * 60 + float(seconds_str)
        except ValueError:
            # Non-timestamp bracket content (e.g. "[ar: Artist]" tags).
            continue

        lyrics_by_time[seconds] = text

    return list(lyrics_by_time.keys()), lyrics_by_time
|
||||
|
||||
def get_closest_time(current_time, lyrics_times):
    """Return the latest lyrics timestamp at or before *current_time*, or 0 if none."""
    return max((t for t in lyrics_times if 0 < t <= current_time), default=0)
|
||||
|
||||
def get_lyrics(artist, title):
    """Look up plain + synchronized lyrics for a track on lrclib.net.

    Results are cached in metadata_cache.json. If a search that included
    the artist finds nothing, the search is retried with the title alone
    (on YouTube-sourced files the "artist" is often just the uploader).

    Args:
        artist: artist name, or None to search by title only.
        title: track title.

    Returns:
        (plain_lyrics, synced_lyrics) on success, [None, None] otherwise.
    """
    metadata_cache = ensure_metadata_file()

    # JSON objects only allow string keys, so the cache key must be a
    # string — the old (artist, title) tuple key made json.dumps raise
    # TypeError, and a tuple key could never round-trip from disk anyway.
    cache_key = f"{artist} - {title}"
    if cache_key in metadata_cache["lyrics_by_artist_title"]:
        return metadata_cache["lyrics_by_artist_title"][cache_key]

    if artist:
        query = f"{artist} - {title}"
    else:
        query = title

    query_string = urllib.parse.urlencode({"q": query})
    full_url = f"{LRCLIB_BASE_URL}?{query_string}"

    with urllib.request.urlopen(full_url) as request:
        data = json.loads(request.read().decode("utf-8"))

    for result in data:
        if result.get("plainLyrics") and result.get("syncedLyrics"):
            lyrics = (result["plainLyrics"], result["syncedLyrics"])
            metadata_cache["lyrics_by_artist_title"][cache_key] = lyrics

            # Persist the cache on a hit as well — the old code returned
            # before the write, so successful lookups were never saved.
            with open("metadata_cache.json", "w") as file:
                file.write(json.dumps(metadata_cache))

            return lyrics

    with open("metadata_cache.json", "w") as file:
        file.write(json.dumps(metadata_cache))

    if artist:
        # An artist-qualified search can be misleading; retry title-only.
        return get_lyrics(None, title)

    return [None, None]
|
||||
@@ -1,12 +1,12 @@
|
||||
import io, base64, tempfile, struct, re, os, logging, arcade, time
|
||||
import io, tempfile, re, os, logging, arcade, time
|
||||
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import ID3, TXXX, ID3NoHeaderError
|
||||
from mutagen import File
|
||||
from mutagen.id3 import ID3, TXXX, SYLT, ID3NoHeaderError
|
||||
|
||||
from pydub import AudioSegment
|
||||
from PIL import Image
|
||||
|
||||
from utils.lyrics_metadata import parse_synchronized_lyrics
|
||||
from utils.utils import convert_seconds_to_date
|
||||
|
||||
def truncate_end(text: str, max_length: int) -> str:
|
||||
@@ -16,7 +16,7 @@ def truncate_end(text: str, max_length: int) -> str:
|
||||
return text
|
||||
return text[:max_length - 3] + '...'
|
||||
|
||||
def extract_metadata_and_thumbnail(file_path: str, thumb_resolution: tuple) -> tuple:
|
||||
def extract_metadata_and_thumbnail(file_path: str, thumb_resolution: tuple):
|
||||
artist = "Unknown"
|
||||
title = ""
|
||||
source_url = "Unknown"
|
||||
@@ -31,55 +31,40 @@ def extract_metadata_and_thumbnail(file_path: str, thumb_resolution: tuple) -> t
|
||||
|
||||
basename = os.path.basename(file_path)
|
||||
name_only = os.path.splitext(basename)[0]
|
||||
ext = os.path.splitext(file_path)[1].lower().lstrip('.')
|
||||
|
||||
try:
|
||||
thumb_audio = EasyID3(file_path)
|
||||
try:
|
||||
artist = str(thumb_audio["artist"][0])
|
||||
title = str(thumb_audio["title"][0])
|
||||
upload_year = int(thumb_audio["date"][0])
|
||||
except KeyError:
|
||||
artist_title_match = re.search(r'^.+\s*-\s*.+$', title)
|
||||
if artist_title_match:
|
||||
title = title.split("- ")[1]
|
||||
easyid3 = EasyID3(file_path)
|
||||
if "artist" in easyid3:
|
||||
artist = easyid3["artist"][0]
|
||||
if "title" in easyid3:
|
||||
title = easyid3["title"][0]
|
||||
if "date" in easyid3:
|
||||
upload_year = int(re.match(r"\d{4}", easyid3["date"][0]).group())
|
||||
|
||||
file_audio = File(file_path)
|
||||
if hasattr(file_audio, 'info'):
|
||||
sound_length = round(file_audio.info.length, 2)
|
||||
bitrate = int((file_audio.info.bitrate or 0) / 1000)
|
||||
sample_rate = int(file_audio.info.sample_rate / 1000)
|
||||
id3 = ID3(file_path)
|
||||
for frame in id3.getall("WXXX"):
|
||||
desc = frame.desc.lower()
|
||||
if desc == "uploader":
|
||||
uploader_url = frame.url
|
||||
elif desc == "source":
|
||||
source_url = frame.url
|
||||
for frame in id3.getall("TXXX"):
|
||||
desc = frame.desc.lower()
|
||||
if desc == "last_played":
|
||||
last_played = float(frame.text[0])
|
||||
elif desc == "play_count":
|
||||
play_count = int(frame.text[0])
|
||||
except ID3NoHeaderError:
|
||||
pass
|
||||
|
||||
thumb_image_data = None
|
||||
if ext == 'mp3':
|
||||
for tag in file_audio.values():
|
||||
if tag.FrameID == "APIC":
|
||||
thumb_image_data = tag.data
|
||||
break
|
||||
elif ext in ('m4a', 'aac'):
|
||||
if 'covr' in file_audio:
|
||||
thumb_image_data = file_audio['covr'][0]
|
||||
elif ext == 'flac':
|
||||
if file_audio.pictures:
|
||||
thumb_image_data = file_audio.pictures[0].data
|
||||
elif ext in ('ogg', 'opus'):
|
||||
if "metadata_block_picture" in file_audio:
|
||||
pic_data = base64.b64decode(file_audio["metadata_block_picture"][0])
|
||||
header_len = struct.unpack(">I", pic_data[0:4])[0]
|
||||
thumb_image_data = pic_data[4 + header_len:]
|
||||
if hasattr(easyid3, "info"):
|
||||
sound_length = round(easyid3.info.length, 2)
|
||||
bitrate = int((easyid3.info.bitrate or 0) / 1000)
|
||||
sample_rate = int(easyid3.info.sample_rate / 1000)
|
||||
|
||||
id3 = ID3(file_path)
|
||||
for frame in id3.getall("WXXX"):
|
||||
if frame.desc.lower() == "uploader":
|
||||
uploader_url = frame.url
|
||||
elif frame.desc.lower() == "source":
|
||||
source_url = frame.url
|
||||
|
||||
for frame in id3.getall("TXXX"):
|
||||
if frame.desc.lower() == "last_played":
|
||||
last_played = float(frame.text[0])
|
||||
elif frame.desc.lower() == "play_count":
|
||||
play_count = int(frame.text[0])
|
||||
apic = id3.getall("APIC")
|
||||
thumb_image_data = apic[0].data if apic else None
|
||||
|
||||
if thumb_image_data:
|
||||
pil_image = Image.open(io.BytesIO(thumb_image_data)).convert("RGBA")
|
||||
@@ -90,22 +75,19 @@ def extract_metadata_and_thumbnail(file_path: str, thumb_resolution: tuple) -> t
|
||||
logging.debug(f"[Metadata/Thumbnail Error] {file_path}: {e}")
|
||||
|
||||
if artist == "Unknown" or not title:
|
||||
match = re.search(r'^(.*?)\s+-\s+(.*?)$', name_only)
|
||||
if match:
|
||||
file_path_artist, file_path_title = match.groups()
|
||||
if artist == "Unknown":
|
||||
artist = file_path_artist
|
||||
if not title:
|
||||
title = file_path_title
|
||||
m = re.match(r"^(.*?)\s+-\s+(.*?)$", name_only) # check for artist - title titles in the title
|
||||
if m:
|
||||
artist = m.group(1)
|
||||
title = m.group(2)
|
||||
|
||||
if not title:
|
||||
if not title:
|
||||
title = name_only
|
||||
|
||||
|
||||
if thumb_texture is None:
|
||||
from utils.preload import music_icon
|
||||
thumb_texture = music_icon
|
||||
|
||||
file_size = round(os.path.getsize(file_path) / (1024 ** 2), 2) # MiB
|
||||
file_size = round(os.path.getsize(file_path) / (1024 ** 2), 2)
|
||||
|
||||
return {
|
||||
"sound_length": sound_length,
|
||||
@@ -119,9 +101,10 @@ def extract_metadata_and_thumbnail(file_path: str, thumb_resolution: tuple) -> t
|
||||
"source_url": source_url,
|
||||
"artist": artist,
|
||||
"title": title,
|
||||
"thumbnail": thumb_texture
|
||||
"thumbnail": thumb_texture,
|
||||
}
|
||||
|
||||
|
||||
def adjust_volume(input_path, volume):
|
||||
audio = AudioSegment.from_file(input_path)
|
||||
change = volume - audio.dBFS
|
||||
@@ -191,3 +174,25 @@ def convert_timestamp_to_time_ago(timestamp):
|
||||
return convert_seconds_to_date(elapsed_time) + ' ago'
|
||||
else:
|
||||
return "Never"
|
||||
|
||||
def add_metadata_to_file(file_path, musicbrainz_artist_ids, artist, title, synchronized_lyrics, isrc, acoustid_id=None):
    """Write MusicBrainz/AcoustID tags and synchronized lyrics into an MP3.

    Args:
        file_path: path of the MP3 file to tag.
        musicbrainz_artist_ids: list of MusicBrainz artist ids.
        artist: artist tag value.
        title: title tag value.
        synchronized_lyrics: LRC-formatted lyrics text, or None/empty to
            skip writing a SYLT frame (get_lyrics can return no lyrics).
        isrc: ISRC tag value.
        acoustid_id: optional AcoustID fingerprint id.
    """
    easyid3 = EasyID3(file_path)
    easyid3["musicbrainz_artistid"] = musicbrainz_artist_ids
    easyid3["artist"] = artist
    easyid3["title"] = title
    easyid3["isrc"] = isrc

    if acoustid_id:
        easyid3["acoustid_id"] = acoustid_id

    easyid3.save()

    # Lyrics lookup may have come back empty; the old code crashed trying
    # to parse None.
    if not synchronized_lyrics:
        return

    id3 = ID3(file_path)
    id3.delall("SYLT")  # replace any previous synchronized lyrics frame

    lyrics_dict = parse_synchronized_lyrics(synchronized_lyrics)[1]
    # SYLT format=2 stores timestamps in milliseconds, hence * 1000.
    synchronized_lyrics_tuples = [(text, int(lyrics_time * 1000)) for lyrics_time, text in lyrics_dict.items()]

    id3.add(SYLT(encoding=3, lang="eng", format=2, type=1, desc="From lrclib", text=synchronized_lyrics_tuples))

    id3.save()
|
||||
@@ -1,20 +1,10 @@
|
||||
import musicbrainzngs as music_api
|
||||
|
||||
from io import BytesIO
|
||||
from utils.constants import MUSICBRAINZ_PROJECT_NAME, MUSICBRAINZ_CONTACT, MUSCIBRAINZ_VERSION, MUSIC_TITLE_WORD_BLACKLIST
|
||||
from utils.lyrics_metadata import get_lyrics
|
||||
from utils.utils import ensure_metadata_file
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
from utils.constants import MUSICBRAINZ_PROJECT_NAME, MUSICBRAINZ_CONTACT, MUSCIBRAINZ_VERSION, COVER_CACHE_DIR
|
||||
|
||||
import urllib.request, json, os, arcade, logging, iso3166
|
||||
|
||||
WORD_BLACKLIST = ["compilation", "remix", "vs", "cover", "version", "instrumental", "restrung", "interlude"]
|
||||
LRCLIB_BASE_URL = "https://lrclib.net/api/search"
|
||||
import json, iso3166
|
||||
|
||||
def get_country(code):
|
||||
country = iso3166.countries.get(code, None)
|
||||
@@ -24,9 +14,9 @@ def check_blacklist(text, blacklist):
|
||||
return any(word in text for word in blacklist)
|
||||
|
||||
def finalize_blacklist(title):
|
||||
blacklist = WORD_BLACKLIST[:]
|
||||
blacklist = MUSIC_TITLE_WORD_BLACKLIST[:]
|
||||
|
||||
for word in WORD_BLACKLIST:
|
||||
for word in MUSIC_TITLE_WORD_BLACKLIST:
|
||||
if word in title:
|
||||
blacklist.remove(word)
|
||||
|
||||
@@ -35,21 +25,6 @@ def finalize_blacklist(title):
|
||||
def is_release_valid(release):
    """True when the release has no release events (only include albums)."""
    event_count = release.get("release-event-count", 0)
    return event_count == 0
|
||||
|
||||
def ensure_metadata_file():
|
||||
if os.path.exists("metadata_cache.json") and os.path.isfile("metadata_cache.json"):
|
||||
with open("metadata_cache.json", "r") as file:
|
||||
metadata_cache = json.load(file)
|
||||
else:
|
||||
metadata_cache = {
|
||||
"query_results": {},
|
||||
"recording_by_id": {},
|
||||
"artist_by_id": {},
|
||||
"lyrics_by_artist_title": {},
|
||||
"album_by_id": {}
|
||||
}
|
||||
|
||||
return metadata_cache
|
||||
|
||||
def get_artists_metadata(artist_ids):
|
||||
metadata_cache = ensure_metadata_file()
|
||||
|
||||
@@ -121,6 +96,7 @@ def extract_release_metadata(release_list):
|
||||
"album_name": release.get("title") if release else "Unknown",
|
||||
"album_date": release.get("date") if release else "Unknown",
|
||||
"album_country": (get_country(release.get("country", "WZ")) or "Worldwide") if release else "Unknown",
|
||||
"album_tracks": [track['recording']['title'] for track in release.get('medium-list', [{}])[0].get('track-list', [])[:3]]
|
||||
}
|
||||
metadata_cache["album_by_id"][release_id] = album_metadata[release_id]
|
||||
|
||||
@@ -142,7 +118,7 @@ def get_album_metadata(album_id):
|
||||
"album_name": release.get("title") if release else "Unknown",
|
||||
"album_date": release.get("date") if release else "Unknown",
|
||||
"album_country": (get_country(release.get("country", "WZ")) or "Worldwide") if release else "Unknown",
|
||||
"album_tracks": [track['recording']['title'] for track in release.get('medium-list', [])[0].get('track-list', {})[:3]]
|
||||
"album_tracks": [track['recording']['title'] for track in release.get('medium-list', [{}])[0].get('track-list', [])[:3]]
|
||||
}
|
||||
metadata_cache["album_by_id"][release["id"]] = album_metadata
|
||||
|
||||
@@ -162,6 +138,8 @@ def get_music_metadata(artist=None, title=None, musicbrainz_id=None):
|
||||
else:
|
||||
query = title
|
||||
|
||||
recording_id = None
|
||||
|
||||
if query in metadata_cache["query_results"]:
|
||||
recording_id = metadata_cache["query_results"][query]
|
||||
else:
|
||||
@@ -186,19 +164,30 @@ def get_music_metadata(artist=None, title=None, musicbrainz_id=None):
|
||||
if recording_id in metadata_cache["recording_by_id"]:
|
||||
detailed = metadata_cache["recording_by_id"][recording_id]
|
||||
else:
|
||||
detailed = music_api.get_recording_by_id(
|
||||
recording_id,
|
||||
includes=["artists", "releases", "isrcs", "tags", "ratings"]
|
||||
)["recording"]
|
||||
metadata_cache["recording_by_id"][recording_id] = {
|
||||
"title": detailed["title"],
|
||||
"artist-credit": [{"artist": {"id": artist_data["artist"]["id"]}} for artist_data in detailed.get("artist-credit", {}) if isinstance(artist_data, dict)],
|
||||
"isrc-list": detailed["isrc-list"] if "isrc-list" in detailed else [],
|
||||
"rating": {"rating": detailed["rating"]["rating"]} if "rating" in detailed else {},
|
||||
"tags": detailed.get("tag-list", []),
|
||||
"release-list": [{"id": release["id"], "title": release["title"], "status": release.get("status"), "date": release.get("date"), "country": release.get("country", "WZ")} for release in detailed["release-list"]] if "release-list" in detailed else [],
|
||||
"release-event-count": detailed.get("release-event-count", 0)
|
||||
}
|
||||
if recording_id:
|
||||
detailed = music_api.get_recording_by_id(
|
||||
recording_id,
|
||||
includes=["artists", "releases", "isrcs", "tags", "ratings"]
|
||||
)["recording"]
|
||||
metadata_cache["recording_by_id"][recording_id] = {
|
||||
"title": detailed["title"],
|
||||
"artist-credit": [{"artist": {"id": artist_data["artist"]["id"]}} for artist_data in detailed.get("artist-credit", {}) if isinstance(artist_data, dict)],
|
||||
"isrc-list": detailed["isrc-list"] if "isrc-list" in detailed else [],
|
||||
"rating": {"rating": detailed["rating"]["rating"]} if "rating" in detailed else {},
|
||||
"tags": detailed.get("tag-list", []),
|
||||
"release-list": [{"id": release["id"], "title": release["title"], "status": release.get("status"), "date": release.get("date"), "country": release.get("country", "WZ")} for release in detailed["release-list"]] if "release-list" in detailed else [],
|
||||
"release-event-count": detailed.get("release-event-count", 0)
|
||||
}
|
||||
else:
|
||||
detailed = metadata_cache["recording_by_id"][recording_id] = {
|
||||
"title": title,
|
||||
"artist-credit": [],
|
||||
"isrc-list": [],
|
||||
"rating": {},
|
||||
"tags": [],
|
||||
"release-list": [],
|
||||
"release-event-count": 0
|
||||
}
|
||||
|
||||
with open("metadata_cache.json", "w") as file:
|
||||
file.write(json.dumps(metadata_cache))
|
||||
@@ -213,73 +202,7 @@ def get_music_metadata(artist=None, title=None, musicbrainz_id=None):
|
||||
"musicbrainz_rating": detailed["rating"]["rating"] if "rating" in detailed.get("rating", {}) else "Unknown",
|
||||
"tags": [tag["name"] for tag in detailed.get("tag-list", [])]
|
||||
}
|
||||
return music_metadata, artist_metadata, album_metadata, get_lyrics(', '.join([artist for artist in artist_metadata]), detailed["title"])[0]
|
||||
|
||||
def get_lyrics(artist, title):
|
||||
metadata_cache = ensure_metadata_file()
|
||||
|
||||
if (artist, title) in metadata_cache["lyrics_by_artist_title"]:
|
||||
return metadata_cache["lyrics_by_artist_title"][(artist, title)]
|
||||
else:
|
||||
if artist:
|
||||
query = f"{artist} - {title}"
|
||||
else:
|
||||
query = title
|
||||
|
||||
query_string = urllib.parse.urlencode({"q": query})
|
||||
full_url = f"{LRCLIB_BASE_URL}?{query_string}"
|
||||
|
||||
with urllib.request.urlopen(full_url) as request:
|
||||
data = json.loads(request.read().decode("utf-8"))
|
||||
|
||||
for result in data:
|
||||
if result.get("plainLyrics") and result.get("syncedLyrics"):
|
||||
metadata_cache["lyrics_by_artist_title"][(artist, title)] = (result["plainLyrics"], result["syncedLyrics"])
|
||||
return (result["plainLyrics"], result["syncedLyrics"])
|
||||
|
||||
with open("metadata_cache.json", "w") as file:
|
||||
file.write(json.dumps(metadata_cache))
|
||||
|
||||
if artist: # if there was an artist, it might have been misleading. For example, on Youtube, the uploader might not be the artist. We retry with only title.
|
||||
return get_lyrics(None, title)
|
||||
|
||||
def fetch_image_bytes(url):
|
||||
try:
|
||||
req = Request(url, headers={"User-Agent": "csd4ni3l/music-player/git python-musicbrainzngs/0.7.1 ( csd4ni3l@proton.me )"})
|
||||
with urlopen(req, timeout=10) as resp:
|
||||
return resp.read()
|
||||
except (HTTPError, URLError) as e:
|
||||
logging.debug(f"Error fetching {url}: {e}")
|
||||
return None
|
||||
|
||||
def download_cover_art(mb_album_id, size=250):
|
||||
path = os.path.join(COVER_CACHE_DIR, f"{mb_album_id}_{size}.png")
|
||||
if os.path.exists(path):
|
||||
return mb_album_id, Image.open(path)
|
||||
|
||||
url = f"https://coverartarchive.org/release/{mb_album_id}/front-{size}"
|
||||
img_bytes = fetch_image_bytes(url)
|
||||
if not img_bytes:
|
||||
return mb_album_id, None
|
||||
|
||||
try:
|
||||
img = Image.open(BytesIO(img_bytes)).convert("RGBA")
|
||||
img.save(path)
|
||||
return mb_album_id, img
|
||||
except Exception as e:
|
||||
logging.debug(f"Failed to decode/save image for {mb_album_id}: {e}")
|
||||
return mb_album_id, None
|
||||
|
||||
def download_albums_cover_art(album_ids, size=250, max_workers=5):
|
||||
music_api.set_useragent(MUSICBRAINZ_PROJECT_NAME, MUSCIBRAINZ_VERSION, MUSICBRAINZ_CONTACT)
|
||||
os.makedirs(COVER_CACHE_DIR, exist_ok=True)
|
||||
images = {}
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = [executor.submit(download_cover_art, album_id, size) for album_id in album_ids]
|
||||
for future in as_completed(futures):
|
||||
album_id, img = future.result()
|
||||
images[album_id] = arcade.Texture(img) if img else None
|
||||
return images
|
||||
return music_metadata, artist_metadata, album_metadata, get_lyrics(', '.join([artist for artist in artist_metadata]), detailed["title"])
|
||||
|
||||
def search_recordings(search_term):
|
||||
music_api.set_useragent(MUSICBRAINZ_PROJECT_NAME, MUSCIBRAINZ_VERSION, MUSICBRAINZ_CONTACT)
|
||||
@@ -1,4 +1,4 @@
|
||||
import logging, sys, traceback, pyglet, arcade, arcade.gui, textwrap
|
||||
import logging, sys, traceback, pyglet, arcade, arcade.gui, textwrap, os, json
|
||||
|
||||
from utils.constants import menu_background_color
|
||||
|
||||
@@ -196,4 +196,19 @@ def get_wordwrapped_text(text, width=18):
|
||||
else:
|
||||
output_text = '\n'.join(textwrap.wrap(text, width=width))
|
||||
|
||||
return output_text
|
||||
return output_text
|
||||
|
||||
def ensure_metadata_file():
    """Load metadata_cache.json, filling in any missing top-level sections.

    Returns the cache dict; it is NOT written back here — callers persist
    it themselves after updating it.
    """
    defaults = {
        "query_results": {},
        "recording_by_id": {},
        "artist_by_id": {},
        "lyrics_by_artist_title": {},
        "album_by_id": {}
    }

    # os.path.isfile implies existence, so the separate exists() check
    # the old code did was redundant.
    if os.path.isfile("metadata_cache.json"):
        with open("metadata_cache.json", "r") as file:
            metadata_cache = json.load(file)

        # A cache written by an older version may lack newer sections;
        # missing keys used to surface later as KeyError.
        for key, empty in defaults.items():
            metadata_cache.setdefault(key, empty)
    else:
        metadata_cache = defaults

    return metadata_cache
|
||||
Reference in New Issue
Block a user