]> code.delx.au - notipod/blob - libnotipod.py
Version 1.7
[notipod] / libnotipod.py
1 #!/usr/bin/env python
2 # Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4
5 import logging
6 import os
7 import shutil
8 import sys
9 import urllib
10
11 from Foundation import *
12
13
def read_plist(filename):
    """Parse the property list stored in *filename*.

    Returns the deserialised plist (created with mutable containers), or
    None when the file cannot be opened or read.

    Raises TypeError carrying Cocoa's error description when the data is
    not a valid property list.
    """
    try:
        # Binary mode, and a context manager so the handle is closed even
        # when read() fails.
        with open(filename, "rb") as plist_file:
            raw = plist_file.read()
    except IOError:
        return None
    data = buffer(raw)
    plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(
        data, NSPropertyListMutableContainers, None, None)
    if err is not None:
        errStr = err.encode("utf-8")
        err.release()  # Doesn't follow Cocoa conventions for some reason
        raise TypeError(errStr)
    return plist
25
class Playlist(NSObject):
    """One playlist entry, linked to its parent and its children."""

    def init(self):
        # Nothing to initialise here; the attributes are assigned by set().
        return self

    def set(self, name, pid, ptype, tracks, parent):
        """Fill in the playlist's attributes and register it with *parent*.

        name   -- playlist name
        pid    -- persistent ID of the playlist
        ptype  -- playlist type string
        tracks -- the playlist's tracks
        parent -- parent Playlist, or None when there is no parent
        """
        self.children = []
        self.name, self.pid, self.ptype = name, pid, ptype
        self.tracks, self.parent = tracks, parent
        if parent is None:
            return
        parent.children.append(self)
39
class ITunesLibrary(NSObject):
    """Playlists and music folder read from an iTunes library XML file."""

    def load_(self, filename):
        """Generator that loads the library, yielding progress messages.

        filename -- path of the library XML; None selects
                    "~/Music/iTunes/iTunes Music Library.xml".

        Sets self.folder (the music folder path) and self.playlists.
        Raises Exception when the library file cannot be read.
        """
        if filename is None:
            filename = "~/Music/iTunes/iTunes Music Library.xml"
        filename = os.path.expanduser(filename)
        yield "Reading library..."
        plist = read_plist(filename)
        if plist is None:
            raise Exception("Could not find music library: " + filename)
        # Decode the folder exactly like the track locations in
        # make_playlist() (str -> unquote -> UTF-8 decode); otherwise a
        # music folder with non-ASCII characters never matches any track.
        self.folder = self.loc2name(str(plist["Music Folder"])).decode("utf-8")
        pl_tracks = plist["Tracks"]
        pl_lookup = {}
        self.playlists = []
        for pl_playlist in plist["Playlists"]:
            playlist = self.make_playlist(pl_playlist, pl_tracks, pl_lookup)
            if not playlist:
                continue
            yield "Read playlist: " + playlist.name
            self.playlists.append(playlist)
            # Later playlists may name this one as their parent.
            pl_lookup[playlist.pid] = playlist

    def loc2name(self, location):
        """Unquote a location URL and return it without scheme and host."""
        unquoted = urllib.unquote(location)
        without_scheme = urllib.splittype(unquoted)[1]
        return urllib.splithost(without_scheme)[1]

    def make_playlist(self, pl_playlist, pl_tracks, pl_lookup):
        """Build a Playlist from one entry of the plist's "Playlists" list.

        pl_playlist -- the playlist dictionary from the plist
        pl_tracks   -- the plist's "Tracks" dictionary, keyed by track ID
        pl_lookup   -- already-built playlists keyed by persistent ID,
                       used to resolve the parent

        Returns None for the master playlist and for distinguished kind 26.
        """
        if int(pl_playlist.get("Master", 0)):
            return None
        kind = int(pl_playlist.get("Distinguished Kind", -1))
        if kind == 26:
            # Don't do genius
            return None

        name = pl_playlist["Name"]
        pid = pl_playlist["Playlist Persistent ID"]
        if kind > 0:
            ptype = {
                2: "movies",
                3: "tv-shows",
                4: "music",
                5: "books",
                10: "podcasts",
                19: "purchased",
                22: "itunes-dj",
                31: "itunes-u",
            }.get(kind, "playlist")
        elif "Smart Info" in pl_playlist:
            ptype = "smart-playlist"
        elif int(pl_playlist.get("Folder", 0)):
            ptype = "folder"
        else:
            ptype = "playlist"

        # The parent stays None when the playlist has no parent ID or the
        # parent has not been seen.
        parent = None
        try:
            parent = pl_lookup[pl_playlist["Parent Persistent ID"]]
        except KeyError:
            pass

        tracks = []
        for item in pl_playlist.get("Playlist Items", []):
            trackID = item["Track ID"]
            try:
                location = pl_tracks[str(trackID)]["Location"]
            except KeyError:
                # Unknown track ID, or a track without a file location:
                # skip it instead of aborting the whole library load.
                logging.warning("Skipping track without location: %s", trackID)
                continue
            filename = self.loc2name(str(location)).decode("utf-8")
            if not filename.startswith(self.folder):
                logging.warning("Skipping: " + filename)
                continue
            tracks.append(strip_prefix(filename, self.folder))

        playlist = Playlist.alloc().init()
        playlist.set(name, pid, ptype, tracks, parent)
        return playlist

    def has_playlist_name(self, name):
        """Return True when a loaded playlist is called *name*."""
        return any(p.name == name for p in self.get_playlists())

    def get_playlist_name(self, name):
        """Return the first playlist called *name*, or None."""
        for playlist in self.get_playlists():
            if playlist.name == name:
                return playlist
        return None

    def get_playlist_pid(self, pid):
        """Return the playlist whose persistent ID is *pid*, or None."""
        for playlist in self.get_playlists():
            if playlist.pid == pid:
                return playlist
        return None

    def get_playlists(self):
        """Return the list of loaded playlists."""
        return self.playlists
131
132
# Cache: original filename -> sanitised filename handed out for it.
encoded_names = {}
# Every sanitised name already handed out, so collisions can be detected.
_used_encoded_names = set()
valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")


def encode_filename(filename):
    """Return a sanitised version of *filename*.

    Every character that is not in valid_chars is dropped. Results are
    cached in encoded_names, so the same input always maps to the same
    output. When two different inputs sanitise to the same string, "-dup"
    is appended to the stem of the later one until the result is unique.
    """
    try:
        return encoded_names[filename]
    except KeyError:
        pass

    # Resynchronise if the cache was modified from outside (e.g. cleared).
    if len(_used_encoded_names) != len(encoded_names):
        _used_encoded_names.clear()
        _used_encoded_names.update(encoded_names.values())

    # valid_chars is ASCII-only, so filtering also drops every non-ASCII
    # character; str() keeps the result a plain string.
    encoded = str("".join(c for c in filename if c in valid_chars))

    # Compare against names already handed out, not against the original
    # names used as cache keys.
    if encoded in _used_encoded_names:
        stem, ext = os.path.splitext(encoded)
        while encoded in _used_encoded_names:
            stem += "-dup"
            encoded = stem + ext

    encoded_names[filename] = encoded
    _used_encoded_names.add(encoded)
    return encoded
149
def strip_prefix(s, prefix):
    """Return *s* without its leading *prefix* and one following "/".

    *s* must start with *prefix*.
    """
    assert s.startswith(prefix)
    remainder = s[len(prefix):]
    return remainder[1:] if remainder.startswith("/") else remainder
156
def mkdirhier(path):
    """Create directory *path* together with any missing parents.

    Does nothing when *path* is empty or already a directory. Failures
    are not raised; they are logged as a warning.
    """
    if not path or os.path.isdir(path):
        return
    try:
        # os.makedirs stops at the first existing ancestor, so relative
        # paths work without walking up to "/".
        os.makedirs(path)
    except OSError as e:
        if not os.path.isdir(path):
            logging.warning("Could not create directory %s: %s", path, e)
169
def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
    """Write an .m3u playlist into the "-Playlists-" directory under *dest*.

    dry_run       -- when true, nothing is written
    dest          -- destination root directory
    path_prefix   -- text put in front of every entry; "../" when empty
    playlist_name -- name of the playlist; ".m3u" is appended
    files         -- track filenames to list, one per line
    """
    if dry_run:
        return
    if not path_prefix:
        path_prefix = "../"

    # Sanitise only the part below dest, as sync() does; running the whole
    # path through encode_filename would strip characters from dest itself.
    relative_name = encode_filename(
        os.path.join("-Playlists-", playlist_name) + ".m3u")
    playlist_file = os.path.join(dest, relative_name)
    mkdirhier(os.path.dirname(playlist_file))
    logging.info("Writing: " + playlist_file)

    # A prefix containing a backslash anywhere (including position 0)
    # selects backslash-separated entries.
    use_backslashes = "\\" in path_prefix
    with open(playlist_file, "w") as f:
        for filename in files:
            # Sanitise first, so the cache key is the same name sync()
            # passes to encode_filename; then switch separators.
            filename = encode_filename(filename)
            if use_backslashes:
                filename = filename.replace("/", "\\")
            f.write("%s%s\n" % (path_prefix, filename))
186
def sync(dry_run, source, dest, files_to_copy):
    """Generator that makes *dest* hold exactly *files_to_copy*.

    dry_run       -- when true, report actions without deleting or copying
    source        -- directory the files are copied from
    dest          -- directory the files are copied to
    files_to_copy -- filenames relative to *source*

    Yields one "Keep: ", "Update: ", "Delete: " or "Copy: " message per
    action. Files below dest are stored under their encode_filename() name
    and matched case-insensitively.
    """
    path_join = os.path.join

    logging.info("Calculating files to sync and deleting old files")
    source = source.encode("utf-8")
    dest = dest.encode("utf-8")

    class SyncFile(object):
        pass

    # Lower-cased encoded name -> record holding both spellings of the name.
    filemap = {}
    for wanted in files_to_copy:
        entry = SyncFile()
        entry.orig_filename = wanted.encode("utf-8")
        entry.encoded_filename = encode_filename(wanted)
        filemap[entry.encoded_filename.lower()] = entry
    pending = set(filemap)

    for walk_dir, _subdirs, leaf_names in os.walk(dest):
        rel_dir = strip_prefix(walk_dir, dest)

        for leaf in leaf_names:
            rel_path = path_join(rel_dir, leaf)
            dest_path = path_join(dest, rel_path)

            # Whenever 'file' is deleted OSX will helpfully remove '._file'
            if not os.path.exists(dest_path):
                continue

            key = rel_path.lower()
            if key in pending:
                source_stat = os.stat(path_join(source, filemap[key].orig_filename))
                dest_stat = os.stat(dest_path)
                # Unchanged means: mtimes within 5 seconds and equal sizes.
                close_mtime = abs(source_stat.st_mtime - dest_stat.st_mtime) < 5
                equal_size = source_stat.st_size == dest_stat.st_size
                if close_mtime and equal_size:
                    pending.remove(key)
                    yield "Keep: " + rel_path
                else:
                    yield "Update: " + rel_path

            elif not rel_path.startswith("-Playlists-"):
                yield "Delete: " + rel_path
                if not dry_run:
                    os.unlink(dest_path)

        # Remove the directory if it is empty now.
        if not os.listdir(walk_dir):
            yield "Delete: " + rel_dir
            if not dry_run:
                os.rmdir(walk_dir)

    logging.info("Copying new files")
    for key in sorted(pending):
        entry = filemap[key]
        yield "Copy: " + entry.orig_filename
        if dry_run:
            continue
        source_file = path_join(source, entry.orig_filename)
        dest_file = path_join(dest, entry.encoded_filename)
        mkdirhier(os.path.dirname(dest_file))
        shutil.copy2(source_file, dest_file)
246
247