import os
from eyed3.core import VARIOUS_TYPE
from eyed3.utils.prompt import prompt
from nicfit.console.ansi import Style, Fg
from ..orm import Artist, Library, Image, IMAGE_TABLES
from ..core import Command
from ..console import promptArtist, selectArtist
from ..util import normalizeCountry, commonDirectoryPrefix, mostCommonItem
"""Metadata management commands."""
[docs]@Command.register
class SplitArtists(Command):
NAME = "split-artists"
HELP = "Split a single artist name into N distinct artists."
_library_arg_nargs = 1
def _initArgParser(self, parser):
super()._initArgParser(parser)
parser.add_argument("artist", help="The name of the artist to split.")
def _displayArtistMusic(self, artist, albums, singles):
if albums:
print("%d albums by %s:" % (len(albums),
Style.bright(Fg.blue(artist.name))))
for alb in albums:
print("%s %s" % (str(alb.getBestDate()).center(17),
alb.title))
if singles:
print("%d single tracks by %s" %
(len(singles), Style.bright(Fg.blue(artist.name))))
for s in singles:
print("\t%s" % (s.title))
def _run(self):
session = self.db_session
lib = session.query(Library).filter(Library.name == self.args.lib).one()
artists = session.query(Artist).filter(Artist.lib_id == lib.id)\
.filter(Artist.name == self.args.artist)\
.all()
if not artists:
print("Artist not found: %s" % self.args.artist)
return 1
elif len(artists) > 1:
artist = selectArtist(Fg.blue("Select which '%s' to split...") %
artists[0].name,
choices=artists, allow_create=False)
else:
artist = artists[0]
# Albums by artist
albums = list(artist.albums) + artist.getAlbumsByType(VARIOUS_TYPE)
# Singles by artist and compilations the artist appears on
singles = artist.getTrackSingles()
if len(albums) < 2 and len(singles) < 2:
print("%d albums and %d singles found for '%s', nothing to do." %
(len(albums), len(singles), artist.name))
return 0
self._displayArtistMusic(artist, albums, singles)
def _validN(_n):
try:
return _n > 1 and _n <= len(albums)
except Exception:
return False
n = prompt("\nEnter the number of distinct artists", type_=int,
validate=_validN)
new_artists = []
for i in range(1, n + 1):
print(Style.bright("\n%s #%d") % (Fg.blue(artist.name), i))
# Reuse original artist for first
a = artist if i == 1 else Artist(name=artist.name,
date_added=artist.date_added,
lib_id=artist.lib_id)
a.origin_city = prompt(" City", required=False)
a.origin_state = prompt(" State", required=False)
a.origin_country = prompt(" Country", required=False,
type_=normalizeCountry)
new_artists.append(a)
if not Artist.checkUnique(new_artists):
print(Fg.red("Artists must be unique."))
return 1
for a in new_artists:
session.add(a)
# New Artist objects need IDs
session.flush()
print(Style.bright("\nAssign albums to the correct artist."))
for i, a in enumerate(new_artists):
print("Enter %s%d%s for %s from %s%s%s" %
(Style.BRIGHT, i + 1, Style.RESET_BRIGHT,
a.name,
Style.BRIGHT, a.origin(country_code="iso3c"),
Style.RESET_BRIGHT))
# prompt for correct artists
def _promptForArtist(_text):
a = prompt(_text, type_=int,
choices=range(1, len(new_artists) + 1))
return new_artists[a - 1]
print("")
for alb in albums:
# Get some of the path to help the decision
path = commonDirectoryPrefix(*[t.path for t in alb.tracks])
path = os.path.join(*path.split(os.sep)[-2:])
a = _promptForArtist("%s (%s)" % (alb.title, path))
if alb.type != VARIOUS_TYPE:
alb.artist_id = a.id
for track in alb.tracks:
if track.artist_id == artist.id:
track.artist_id = a.id
print("")
for track in singles:
a = _promptForArtist(track.title)
track.artist_id = a.id
session.flush()
[docs]@Command.register
class MergeArtists(Command):
NAME = "merge-artists"
HELP = "Merge two or more artists into a single artist."
_library_arg_nargs = 1
def _initArgParser(self, parser):
super()._initArgParser(parser)
parser.add_argument("artists", nargs="+",
help="The artist names to merge.")
def _run(self):
session = self.db_session
lib = session.query(Library).filter(Library.name == self.args.lib).one()
merge_list = []
for artist_arg in self.args.artists:
artists = session.query(Artist)\
.filter(Artist.name == artist_arg)\
.filter(Artist.lib_id == lib.id).all()
if len(artists) == 1:
merge_list.append(artists[0])
elif len(artists) > 1:
merge_list += selectArtist(
Fg.blue("Select the artists to merge..."),
multiselect=True, choices=artists)
if len(merge_list) > 1:
# Reuse lowest id
artist_ids = {a.id: a for a in merge_list}
min_id = min(*artist_ids.keys())
artist = artist_ids[min_id]
mc = mostCommonItem
new_artist = promptArtist(
"Merging %d artists into new artist..." % len(merge_list),
default_name=mc([a.name for a in merge_list]),
default_city=mc([a.origin_city for a in merge_list]),
default_state=mc([a.origin_state for a in merge_list]),
default_country=mc([a.origin_country for a in merge_list]),
artist=artist)
new_artist.lib_id = lib.id
else:
print("Nothing to do, %s" %
("artist not found" if not len(merge_list)
else "only one artist found"))
return 1
assert(new_artist in merge_list)
for artist in merge_list:
if artist is new_artist:
continue
with session.no_autoflush:
for alb in list(artist.albums):
if alb.type != VARIOUS_TYPE:
alb.artist_id = new_artist.id
artist.albums.remove(alb)
with session.no_autoflush:
new_artist.albums.append(alb)
for track in alb.tracks:
if track.artist_id == artist.id:
# gotta check in case alb is type various
track.artist_id = new_artist.id
for track in artist.getTrackSingles():
track.artist_id = new_artist.id
# flush to get new artist ids in sync before delete, otherwise
# cascade happens.
session.flush()
session.delete(artist)
session.flush()
# FIXME: prompt for whether the tags should be updated with the new
# FIXME: name if it is new.
[docs]@Command.register
class Images(Command):
NAME = "image"
HELP = "Image mgmt."
_library_arg_nargs = 1
def _initArgParser(self, parser):
super()._initArgParser(parser)
parser.add_argument("ids", nargs="+", help="The image IDs operate on.")
parser.add_argument("--remove", action="store_true",
help="Remove images from the database.")
def _run(self):
for id_ in self.args.ids:
image = self.db_session.query(Image).filter(Image.id == int(id_)).first()
if not image:
print(f"Image not found: {id_}")
continue
if self.args.remove:
# For orm types for these, but clean jump tables
for table in IMAGE_TABLES:
self.db_session.execute(f"DELETE FROM {table.name} where img_id={id_}")
self.db_session.delete(image)
else:
print(image)
self.db_session.commit()