]> code.delx.au - notipod/blob - itunes.py
Initial checkin
[notipod] / itunes.py
1 #!/usr/bin/env python
2 # Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4
5 import logging
6 import os
7 import shutil
8 import urllib
9
10 from Foundation import *
11
12
13 def read_plist(filename):
14 try:
15 data = buffer(open(filename).read())
16 except IOError:
17 return None
18 plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
19 if err is not None:
20 errStr = err.encode("utf-8")
21 err.release() # Doesn't follow Cocoa conventions for some reason
22 raise TypeError(errStr)
23 return plist
24
25 class Playlist(object):
26 def __init__(self, name, tracks, parent=None):
27 self.name = name
28 self.children = []
29 self.tracks = tracks
30 if parent is not None:
31 parent.children.append(self)
32
33 class Library(NSObject):
34 def init(self):
35 self.initWithFilename_("~/Music/iTunes/iTunes Music Library.xml")
36
37 def initWithFilename_(self, filename):
38 filename = os.path.expanduser(filename)
39 plist = read_plist(os.path.expanduser(filename))
40 self.folder = self.loc2name(plist["Music Folder"])
41 pl_tracks = plist["Tracks"]
42 self.playlists = []
43 for pl_playlist in plist["Playlists"]:
44 self.playlists.append(self.make_playlist(pl_playlist, pl_tracks))
45
46 def loc2name(self, location):
47 return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
48
49 def make_playlist(self, pl_playlist, pl_tracks):
50 name = pl_playlist["Name"]
51 tracks = []
52 for item in pl_playlist.get("Playlist Items", []):
53 trackID = item["Track ID"]
54 filename = str(pl_tracks[str(trackID)]["Location"])
55 filename = self.loc2name(filename)
56 filename = filename[len(self.folder):]
57 filename = eval(repr(filename).lstrip("u")).decode("utf-8")
58 tracks.append(filename)
59 return Playlist(name, tracks)
60
61 def has_playlist(self, playlist):
62 for p in self.playlists:
63 if p.name == playlist:
64 return True
65 return False
66
67 def get_playlist(self, name):
68 playlist = [p for p in self.playlists if p.name == name]
69 return playlist.tracks
70
71 def list_playlists(self):
72 return [p.name for p in self.playlists]
73
74 def outlineView_numberOfChildrenOfItem_(self, view, item):
75 if item == None:
76 return len(self.playlists)
77 else:
78 return 0
79
80 def outlineView_isItemExpandable_(self, view, item):
81 return False
82
83 def outlineView_child_ofItem_(self, view, index, item):
84 if item == None:
85 return self.playlists[index]
86 else:
87 return None
88
89 def outlineView_objectValueForTableColumn_byItem_(self, view, column, item):
90 return item.name
91
92
93 def export_m3u(dry_run, dest, drive_letter, music_dir, playlist_name, files):
94 if dry_run:
95 return
96 f = open(os.path.join(dest, playlist_name) + ".m3u", "w")
97 for filename in files:
98 filename = filename.replace("/", "\\").encode("utf-8")
99 f.write("%s:\\%s\\%s\n" % (drive_letter, music_dir, filename))
100 f.close()
101
102 def strip_prefix(s, prefix):
103 assert s.startswith(prefix)
104 s = s[len(prefix):]
105 if s.startswith("/"):
106 s = s[1:]
107 return s
108
109 def mkdirhier(path):
110 if os.path.isdir(path):
111 return
112 paths = [path]
113 while path != "/":
114 path = os.path.split(path)[0]
115 paths.append(path)
116 for path in reversed(paths):
117 try:
118 os.mkdir(path)
119 except OSError:
120 pass
121
122 def sync(dry_run, source, dest, files):
123 join = os.path.join
124
125 logging.info("Calculating files to sync and deleting old files")
126 files = set(files)
127 for dirpath, dirnames, filenames in os.walk(dest):
128 full_dirpath = dirpath
129 dirpath = strip_prefix(dirpath, dest)
130
131 for filename in filenames:
132 filename = join(dirpath, filename).decode("utf-8")
133
134 # Whenever 'file' is deleted OSX will helpfully remove '._file'
135 if not os.path.exists(join(dest, filename)):
136 continue
137
138 if filename in files:
139 sourcestat = os.stat(join(source, filename))
140 deststat = os.stat(join(dest, filename))
141 same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
142 same_size = sourcestat.st_size == deststat.st_size
143 if same_time and same_size:
144 files.remove(filename)
145 logging.debug("keep: " + filename)
146 else:
147 logging.debug("update: " + filename)
148
149 elif not filename.startswith("Playlists/"):
150 logging.debug("delete: " + filename)
151 if not dry_run:
152 os.unlink(join(dest, filename))
153
154 if len(os.listdir(full_dirpath)) == 0:
155 logging.debug("rmdir: " + dirpath)
156 if not dry_run:
157 os.rmdir(full_dirpath)
158
159
160 logging.info("Copying new files")
161 files = list(files)
162 files.sort()
163 for filename in files:
164 logging.debug("copy: " + filename)
165 if not dry_run:
166 mkdirhier(os.path.dirname(join(dest, filename)))
167 shutil.copy2(join(source, filename), join(dest, filename))
168
169