import logging
import os
import shutil
+import sys
import urllib
from Foundation import *
raise TypeError(errStr)
return plist
-class Playlist(object):
- def __init__(self, name, tracks, parent=None):
+class Playlist(NSObject):
+ def init(self):
+ return self
+
+ def set(self, name, pid, tracks, parent):
self.name = name
+ self.pid = pid
self.children = []
self.tracks = tracks
+ self.parent = parent
if parent is not None:
parent.children.append(self)
-class Library(NSObject):
- def init(self):
- return self.initWithFilename_("~/Music/iTunes/iTunes Music Library.xml")
-
- def initWithFilename_(self, filename):
+class ITunesLibrary(NSObject):
+ def load_(self, filename):
+ if filename is None:
+ filename = "~/Music/iTunes/iTunes Music Library.xml"
filename = os.path.expanduser(filename)
+ yield "Reading library..."
plist = read_plist(os.path.expanduser(filename))
self.folder = self.loc2name(plist["Music Folder"])
pl_tracks = plist["Tracks"]
- self.playlists = []
+ self.playlists = {}
for pl_playlist in plist["Playlists"]:
- self.playlists.append(self.make_playlist(pl_playlist, pl_tracks))
- return self
+ playlist = self.make_playlist(pl_playlist, pl_tracks)
+ yield "Read playlist: " + playlist.name
+ self.playlists[playlist.pid] = playlist
def loc2name(self, location):
return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
def make_playlist(self, pl_playlist, pl_tracks):
name = pl_playlist["Name"]
+ pid = pl_playlist["Playlist Persistent ID"]
+ parent = None
+ try:
+ parent_pid = pl_playlist["Parent Persistent ID"]
+ parent = self.playlists.get(parent_pid)
+ except KeyError:
+ pass
tracks = []
for item in pl_playlist.get("Playlist Items", []):
trackID = item["Track ID"]
filename = str(pl_tracks[str(trackID)]["Location"])
filename = self.loc2name(filename)
- filename = filename[len(self.folder):]
- filename = eval(repr(filename).lstrip("u")).decode("utf-8")
+ filename = filename.decode("utf-8")
+ if not filename.startswith(self.folder):
+ logging.warn("Skipping: " + filename)
+ continue
+ filename = strip_prefix(filename, self.folder)
tracks.append(filename)
- return Playlist(name, tracks)
+ playlist = Playlist.alloc().init()
+ playlist.set(name, pid, tracks, parent)
+ return playlist
- def has_playlist(self, playlist):
- for p in self.playlists:
- if p.name == playlist:
+ def has_playlist_name(self, name):
+ for p in self.get_playlists():
+ if p.name == name:
return True
return False
- def get_playlist(self, name):
- playlist = [p for p in self.playlists if p.name == name][0]
- return playlist.tracks
+ def get_playlist_name(self, name):
+ for playlist in self.get_playlists():
+ if playlist.name == name:
+ return playlist
+
+ def get_playlist_pid(self, pid):
+ for playlist in self.get_playlists():
+ if playlist.pid == pid:
+ return playlist
- def list_playlists(self):
- return [p.name for p in self.playlists]
+ def get_playlists(self):
+ return self.playlists.values()
def outlineView_numberOfChildrenOfItem_(self, view, item):
if item == None:
return item.name
-def export_m3u(dry_run, dest, drive_letter, music_dir, playlist_name, files):
- if dry_run:
- return
- f = open(os.path.join(dest, playlist_name) + ".m3u", "w")
- for filename in files:
- filename = filename.replace("/", "\\").encode("utf-8")
- f.write("%s:\\%s\\%s\n" % (drive_letter, music_dir, filename))
- f.close()
+encoded_names = {}
+valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
+def encode_filename(filename):
+ try:
+ return encoded_names[filename]
+ except KeyError:
+ pass
+ orig_filename = filename
+ filename = filename.encode("ascii", "ignore")
+ filename = "".join(c for c in filename if c in valid_chars)
+ if filename in encoded_names:
+ a, b = os.path.splitext(filename)
+ a += "-dup"
+ filename = a + b
+ encoded_names[orig_filename] = filename
+ return filename
def strip_prefix(s, prefix):
assert s.startswith(prefix)
except OSError:
pass
-def sync(dry_run, source, dest, files):
+def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
+ if dry_run:
+ return
+ if not path_prefix:
+ path_prefix = "../"
+ playlist_file = os.path.join(dest, "-Playlists-", playlist_name) + ".m3u"
+ mkdirhier(os.path.dirname(playlist_file))
+ logging.info("Writing: " + playlist_file)
+ f = open(playlist_file, "w")
+ for filename in files:
+ if path_prefix.find("\\") > 0:
+ filename = filename.replace("/", "\\")
+ filename = encode_filename(filename)
+ f.write("%s%s\n" % (path_prefix, filename))
+ f.close()
+
+def sync(dry_run, source, dest, files_to_copy):
join = os.path.join
logging.info("Calculating files to sync and deleting old files")
- files = set(files)
+ source = source.encode("utf-8")
+ dest = dest.encode("utf-8")
+ filemap = {}
+ class SyncFile(object): pass
+ for f in files_to_copy:
+ sf = SyncFile()
+ sf.orig_filename = f.encode("utf-8")
+ sf.encoded_filename = encode_filename(f)
+ filemap[sf.encoded_filename.lower()] = sf
+ files_to_copy = set(filemap)
+
for dirpath, dirnames, filenames in os.walk(dest):
full_dirpath = dirpath
dirpath = strip_prefix(dirpath, dest)
for filename in filenames:
- filename = join(dirpath, filename).decode("utf-8")
+ filename = join(dirpath, filename)
# Whenever 'file' is deleted OSX will helpfully remove '._file'
if not os.path.exists(join(dest, filename)):
continue
- if filename in files:
- sourcestat = os.stat(join(source, filename))
+ if filename.lower() in files_to_copy:
+ source_filename = filemap[filename.lower()].orig_filename
+ sourcestat = os.stat(join(source, source_filename))
deststat = os.stat(join(dest, filename))
same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
same_size = sourcestat.st_size == deststat.st_size
if same_time and same_size:
- files.remove(filename)
- logging.debug("keep: " + filename)
+ files_to_copy.remove(filename.lower())
+ yield "Keep: " + filename
else:
- logging.debug("update: " + filename)
+ yield "Update: " + filename
- elif not filename.startswith("Playlists/"):
- logging.debug("delete: " + filename)
+ elif not filename.startswith("-Playlists-"):
+ yield "Delete: " + filename
if not dry_run:
os.unlink(join(dest, filename))
if len(os.listdir(full_dirpath)) == 0:
- logging.debug("rmdir: " + dirpath)
+ yield "Delete: " + dirpath
if not dry_run:
os.rmdir(full_dirpath)
logging.info("Copying new files")
- files = list(files)
- files.sort()
- for filename in files:
- logging.debug("copy: " + filename)
+ files_to_copy = list(files_to_copy)
+ files_to_copy.sort()
+ for filename in files_to_copy:
+ yield "Copy: " + filemap[filename].orig_filename
if not dry_run:
mkdirhier(os.path.dirname(join(dest, filename)))
- shutil.copy2(join(source, filename), join(dest, filename))
+ shutil.copy2(
+ join(source, filemap[filename].orig_filename),
+ join(dest, filemap[filename].encoded_filename)
+ )