]> code.delx.au - notipod/blob - libnotipod.py
Various improvements:
[notipod] / libnotipod.py
1 #!/usr/bin/env python
2 # Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4
5 import collections
6 import logging
7 import os
8 import shutil
9 import sys
10 import urllib
11
12 from Foundation import *
13
14
15 def read_plist(filename):
16 try:
17 data = buffer(open(filename).read())
18 except IOError:
19 return None
20 plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
21 if err is not None:
22 errStr = err.encode("utf-8")
23 err.release() # Doesn't follow Cocoa conventions for some reason
24 raise TypeError(errStr)
25 return plist
26
27 class Playlist(NSObject):
28 def init(self):
29 return self
30
31 def set(self, name, pid, ptype, tracks, parent):
32 self.name = name
33 self.pid = pid
34 self.ptype = ptype
35 self.children = []
36 self.tracks = tracks
37 self.parent = parent
38 if parent is not None:
39 parent.children.append(self)
40
41 class ITunesLibrary(NSObject):
42 def load_(self, filename=None):
43 if filename is None:
44 filename = getattr(self, "filename", None)
45 if filename is None:
46 filename = os.path.expanduser("~/Music/iTunes/iTunes Music Library.xml")
47 self.filename = filename
48 self.mtime = os.stat(filename).st_mtime
49 yield "Reading library..."
50 plist = read_plist(os.path.expanduser(filename))
51 if plist is None:
52 raise Exception("Could not find music library: " + filename)
53 self.folder = self.loc2name(plist["Music Folder"])
54 pl_tracks = plist["Tracks"]
55 pl_lookup = {}
56 self.playlists = []
57 self.track2playlist = collections.defaultdict(set)
58 self.track2filename = {}
59 for pl_playlist in plist["Playlists"]:
60 playlist = self.make_playlist(pl_playlist, pl_tracks, pl_lookup)
61 if not playlist:
62 continue
63 yield "Read playlist: " + playlist.name
64 self.playlists.append(playlist)
65 pl_lookup[playlist.pid] = playlist
66
67 def needs_reload(self):
68 return os.stat(self.filename).st_mtime > self.mtime
69
70 def loc2name(self, location):
71 return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
72
73 def make_playlist(self, pl_playlist, pl_tracks, pl_lookup):
74 if int(pl_playlist.get("Master", 0)):
75 return
76 kind = int(pl_playlist.get("Distinguished Kind", -1))
77 if kind == 26:
78 # Don't do genius
79 return
80
81 name = pl_playlist["Name"]
82 pid = pl_playlist["Playlist Persistent ID"]
83 if kind > 0:
84 ptype = {
85 2: "movies",
86 3: "tv-shows",
87 4: "music",
88 5: "books",
89 10: "podcasts",
90 19: "purchased",
91 22: "itunes-dj",
92 31: "itunes-u",
93 }.get(kind, "playlist")
94 elif pl_playlist.has_key("Smart Info"):
95 ptype = "smart-playlist"
96 elif int(pl_playlist.get("Folder", 0)):
97 ptype = "folder"
98 else:
99 ptype = "playlist"
100
101 parent = None
102 try:
103 parent_pid = pl_playlist["Parent Persistent ID"]
104 parent = pl_lookup[parent_pid]
105 except KeyError:
106 pass
107
108 tracks = []
109 for item in pl_playlist.get("Playlist Items", []):
110 trackID = item["Track ID"]
111 item = pl_tracks[str(trackID)]
112 self.track2playlist[trackID].add(pid)
113 tracks.append(trackID)
114 if trackID not in self.track2filename:
115 if item["Track Type"] != "File":
116 continue
117 filename = str(item["Location"])
118 filename = self.loc2name(filename)
119 filename = filename.decode("utf-8")
120 if not filename.startswith(self.folder):
121 logging.warn("Skipping: " + filename)
122 continue
123 filename = strip_prefix(filename, self.folder)
124 self.track2filename[trackID] = filename
125
126 playlist = Playlist.alloc().init()
127 playlist.set(name, pid, ptype, tracks, parent)
128 return playlist
129
130 def has_playlist_name(self, name):
131 for p in self.get_playlists():
132 if p.name == name:
133 return True
134 return False
135
136 def get_playlist_name(self, name):
137 for playlist in self.get_playlists():
138 if playlist.name == name:
139 return playlist
140
141 def get_playlist_pid(self, pid):
142 for playlist in self.get_playlists():
143 if playlist.pid == pid:
144 return playlist
145
146 def get_track_filename(self, trackID):
147 return self.track2filename.get(trackID, None)
148
149 def get_track_playlists(self, trackID):
150 return self.track2playlist.get(trackID, [])
151
152 def get_playlists(self):
153 return self.playlists
154
155
156 encoded_names = {}
157 valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
158 def encode_filename(filename):
159 try:
160 return encoded_names[filename]
161 except KeyError:
162 pass
163 orig_filename = filename
164 filename = filename.encode("ascii", "ignore")
165 filename = "".join(c for c in filename if c in valid_chars)
166 if filename in encoded_names:
167 a, b = os.path.splitext(filename)
168 a += "-dup"
169 filename = a + b
170 encoded_names[orig_filename] = filename
171 return filename
172
173 def strip_prefix(s, prefix):
174 assert s.startswith(prefix)
175 s = s[len(prefix):]
176 if s.startswith("/"):
177 s = s[1:]
178 return s
179
180 def mkdirhier(path):
181 if os.path.isdir(path):
182 return
183 paths = [path]
184 while path != "/":
185 path = os.path.split(path)[0]
186 paths.append(path)
187 for path in reversed(paths):
188 try:
189 os.mkdir(path)
190 except OSError:
191 pass
192
193 def delete_playlists(dry_run, dest):
194 dest = os.path.join(dest, "-Playlists-")
195 try:
196 filenames = os.listdir(dest)
197 except OSError:
198 return
199
200 for filename in filenames:
201 if not filename.lower().endswith(".m3u"):
202 continue
203 filename = os.path.join(dest, filename)
204 logging.info("Deleting: " + filename)
205 if not dry_run:
206 try:
207 os.unlink(filename)
208 except OSError:
209 pass
210
211 def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
212 dest = os.path.join(dest, "-Playlists-")
213 mkdirhier(dest)
214 playlist_name = playlist_name.replace("/", "-")
215 playlist_file = os.path.join(dest, playlist_name) + ".m3u"
216 playlist_file = encode_filename(playlist_file)
217 logging.info("Writing: " + playlist_file)
218
219 if dry_run:
220 return
221
222 sep = "/"
223 if path_prefix.find("\\") > 0:
224 sep = "\\"
225 if path_prefix[-1] != sep:
226 path_prefix += sep
227
228 f = open(playlist_file, "w")
229 for filename in files:
230 if sep == "\\":
231 filename = filename.replace("/", "\\")
232 filename = encode_filename(filename)
233 f.write("%s%s\n" % (path_prefix, filename))
234 f.close()
235
236 def sync(dry_run, source, dest, files_to_copy):
237 join = os.path.join
238
239 logging.info("Calculating files to sync and deleting old files")
240 source = source.encode("utf-8")
241 dest = dest.encode("utf-8")
242 filemap = {}
243 class SyncFile(object): pass
244 for f in files_to_copy:
245 sf = SyncFile()
246 sf.orig_filename = f.encode("utf-8")
247 sf.encoded_filename = encode_filename(f)
248 filemap[sf.encoded_filename.lower()] = sf
249 files_to_copy = set(filemap)
250
251 for dirpath, dirnames, filenames in os.walk(dest):
252 full_dirpath = dirpath
253 dirpath = strip_prefix(dirpath, dest)
254
255 for filename in filenames:
256 filename = join(dirpath, filename)
257
258 # Whenever 'file' is deleted OSX will helpfully remove '._file'
259 if not os.path.exists(join(dest, filename)):
260 continue
261
262 if filename.lower() in files_to_copy:
263 source_filename = filemap[filename.lower()].orig_filename
264 sourcestat = os.stat(join(source, source_filename))
265 deststat = os.stat(join(dest, filename))
266 same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
267 same_size = sourcestat.st_size == deststat.st_size
268 if same_time and same_size:
269 files_to_copy.remove(filename.lower())
270 yield "Keep: " + filename
271 else:
272 yield "Update: " + filename
273
274 elif not filename.startswith("-Playlists-"):
275 yield "Delete: " + filename
276 if not dry_run:
277 os.unlink(join(dest, filename))
278
279 if len(os.listdir(full_dirpath)) == 0:
280 yield "Delete: " + dirpath
281 if not dry_run:
282 os.rmdir(full_dirpath)
283
284
285 logging.info("Copying new files")
286 files_to_copy = list(files_to_copy)
287 files_to_copy.sort()
288 for filename in files_to_copy:
289 yield "Copy: " + filemap[filename].orig_filename
290 if not dry_run:
291 source_file = join(source, filemap[filename].orig_filename)
292 dest_file = join(dest, filemap[filename].encoded_filename)
293 mkdirhier(os.path.dirname(dest_file))
294 shutil.copy2(source_file, dest_file)
295
296