]> code.delx.au - notipod/blob - libnotipod.py
Allow path_prefix to be set from a file in the playlists directory
[notipod] / libnotipod.py
1 #!/usr/bin/env python
2 # Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4
5 import collections
6 import logging
7 import os
8 import shutil
9 import sys
10 import urllib
11
12 from Foundation import *
13
14
15 def read_plist(filename):
16 try:
17 data = buffer(open(filename).read())
18 except IOError:
19 return None
20 plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
21 if err is not None:
22 errStr = err.encode("utf-8")
23 err.release() # Doesn't follow Cocoa conventions for some reason
24 raise TypeError(errStr)
25 return plist
26
27 class Playlist(NSObject):
28 def init(self):
29 return self
30
31 def set(self, name, pid, ptype, tracks, parent):
32 self.name = name
33 self.pid = pid
34 self.ptype = ptype
35 self.children = []
36 self.tracks = tracks
37 self.parent = parent
38 if parent is not None:
39 parent.children.append(self)
40
41 class ITunesLibrary(NSObject):
42 def load_(self, filename):
43 if filename is None:
44 filename = "~/Music/iTunes/iTunes Music Library.xml"
45 filename = os.path.expanduser(filename)
46 yield "Reading library..."
47 plist = read_plist(os.path.expanduser(filename))
48 if plist is None:
49 raise Exception("Could not find music library: " + filename)
50 self.folder = self.loc2name(plist["Music Folder"])
51 pl_tracks = plist["Tracks"]
52 pl_lookup = {}
53 self.playlists = []
54 self.track2playlist = collections.defaultdict(set)
55 self.track2filename = {}
56 for pl_playlist in plist["Playlists"]:
57 playlist = self.make_playlist(pl_playlist, pl_tracks, pl_lookup)
58 if not playlist:
59 continue
60 yield "Read playlist: " + playlist.name
61 self.playlists.append(playlist)
62 pl_lookup[playlist.pid] = playlist
63
64 def loc2name(self, location):
65 return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
66
67 def make_playlist(self, pl_playlist, pl_tracks, pl_lookup):
68 if int(pl_playlist.get("Master", 0)):
69 return
70 kind = int(pl_playlist.get("Distinguished Kind", -1))
71 if kind == 26:
72 # Don't do genius
73 return
74
75 name = pl_playlist["Name"]
76 pid = pl_playlist["Playlist Persistent ID"]
77 if kind > 0:
78 ptype = {
79 2: "movies",
80 3: "tv-shows",
81 4: "music",
82 5: "books",
83 10: "podcasts",
84 19: "purchased",
85 22: "itunes-dj",
86 31: "itunes-u",
87 }.get(kind, "playlist")
88 elif pl_playlist.has_key("Smart Info"):
89 ptype = "smart-playlist"
90 elif int(pl_playlist.get("Folder", 0)):
91 ptype = "folder"
92 else:
93 ptype = "playlist"
94
95 parent = None
96 try:
97 parent_pid = pl_playlist["Parent Persistent ID"]
98 parent = pl_lookup[parent_pid]
99 except KeyError:
100 pass
101
102 tracks = []
103 for item in pl_playlist.get("Playlist Items", []):
104 trackID = item["Track ID"]
105 item = pl_tracks[str(trackID)]
106 self.track2playlist[trackID].add(pid)
107 tracks.append(trackID)
108 if trackID not in self.track2filename:
109 if item["Track Type"] != "File":
110 continue
111 filename = str(item["Location"])
112 filename = self.loc2name(filename)
113 filename = filename.decode("utf-8")
114 if not filename.startswith(self.folder):
115 logging.warn("Skipping: " + filename)
116 continue
117 filename = strip_prefix(filename, self.folder)
118 self.track2filename[trackID] = filename
119
120 playlist = Playlist.alloc().init()
121 playlist.set(name, pid, ptype, tracks, parent)
122 return playlist
123
124 def has_playlist_name(self, name):
125 for p in self.get_playlists():
126 if p.name == name:
127 return True
128 return False
129
130 def get_playlist_name(self, name):
131 for playlist in self.get_playlists():
132 if playlist.name == name:
133 return playlist
134
135 def get_playlist_pid(self, pid):
136 for playlist in self.get_playlists():
137 if playlist.pid == pid:
138 return playlist
139
140 def get_track_filename(self, trackID):
141 return self.track2filename.get(trackID, None)
142
143 def get_track_playlists(self, trackID):
144 return self.track2playlist.get(trackID, [])
145
146 def get_playlists(self):
147 return self.playlists
148
149
150 encoded_names = {}
151 valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
152 def encode_filename(filename):
153 try:
154 return encoded_names[filename]
155 except KeyError:
156 pass
157 orig_filename = filename
158 filename = filename.encode("ascii", "ignore")
159 filename = "".join(c for c in filename if c in valid_chars)
160 if filename in encoded_names:
161 a, b = os.path.splitext(filename)
162 a += "-dup"
163 filename = a + b
164 encoded_names[orig_filename] = filename
165 return filename
166
167 def strip_prefix(s, prefix):
168 assert s.startswith(prefix)
169 s = s[len(prefix):]
170 if s.startswith("/"):
171 s = s[1:]
172 return s
173
174 def mkdirhier(path):
175 if os.path.isdir(path):
176 return
177 paths = [path]
178 while path != "/":
179 path = os.path.split(path)[0]
180 paths.append(path)
181 for path in reversed(paths):
182 try:
183 os.mkdir(path)
184 except OSError:
185 pass
186
187 def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
188 if dry_run:
189 return
190 dest = os.path.join(dest, "-Playlists-")
191 if not path_prefix:
192 try:
193 f = open(os.path.join(dest, ".path_prefix"))
194 path_prefix = f.read().strip()
195 except:
196 path_prefix = "../"
197 playlist_file = os.path.join(dest, playlist_name) + ".m3u"
198 playlist_file = encode_filename(playlist_file)
199 mkdirhier(os.path.dirname(playlist_file))
200 logging.info("Writing: " + playlist_file)
201 f = open(playlist_file, "w")
202 for filename in files:
203 if path_prefix.find("\\") > 0:
204 filename = filename.replace("/", "\\")
205 filename = encode_filename(filename)
206 f.write("%s%s\n" % (path_prefix, filename))
207 f.close()
208
209 def sync(dry_run, source, dest, files_to_copy):
210 join = os.path.join
211
212 logging.info("Calculating files to sync and deleting old files")
213 source = source.encode("utf-8")
214 dest = dest.encode("utf-8")
215 filemap = {}
216 class SyncFile(object): pass
217 for f in files_to_copy:
218 sf = SyncFile()
219 sf.orig_filename = f.encode("utf-8")
220 sf.encoded_filename = encode_filename(f)
221 filemap[sf.encoded_filename.lower()] = sf
222 files_to_copy = set(filemap)
223
224 for dirpath, dirnames, filenames in os.walk(dest):
225 full_dirpath = dirpath
226 dirpath = strip_prefix(dirpath, dest)
227
228 for filename in filenames:
229 filename = join(dirpath, filename)
230
231 # Whenever 'file' is deleted OSX will helpfully remove '._file'
232 if not os.path.exists(join(dest, filename)):
233 continue
234
235 if filename.lower() in files_to_copy:
236 source_filename = filemap[filename.lower()].orig_filename
237 sourcestat = os.stat(join(source, source_filename))
238 deststat = os.stat(join(dest, filename))
239 same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
240 same_size = sourcestat.st_size == deststat.st_size
241 if same_time and same_size:
242 files_to_copy.remove(filename.lower())
243 yield "Keep: " + filename
244 else:
245 yield "Update: " + filename
246
247 elif not filename.startswith("-Playlists-"):
248 yield "Delete: " + filename
249 if not dry_run:
250 os.unlink(join(dest, filename))
251
252 if len(os.listdir(full_dirpath)) == 0:
253 yield "Delete: " + dirpath
254 if not dry_run:
255 os.rmdir(full_dirpath)
256
257
258 logging.info("Copying new files")
259 files_to_copy = list(files_to_copy)
260 files_to_copy.sort()
261 for filename in files_to_copy:
262 yield "Copy: " + filemap[filename].orig_filename
263 if not dry_run:
264 source_file = join(source, filemap[filename].orig_filename)
265 dest_file = join(dest, filemap[filename].encoded_filename)
266 mkdirhier(os.path.dirname(dest_file))
267 shutil.copy2(source_file, dest_file)
268
269