]> code.delx.au - notipod/blob - libsyncitunes.py
Unicode fixes, use only safe characters
[notipod] / libsyncitunes.py
1 #!/usr/bin/env python
2 # Copyright 2009 James Bunton <jamesbunton@fastmail.fm>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4
5 import logging
6 import os
7 import shutil
8 import sys
9 import urllib
10
11 from Foundation import *
12
13
14 def read_plist(filename):
15 try:
16 data = buffer(open(filename).read())
17 except IOError:
18 return None
19 plist, fmt, err = NSPropertyListSerialization.propertyListFromData_mutabilityOption_format_errorDescription_(data, NSPropertyListMutableContainers, None, None)
20 if err is not None:
21 errStr = err.encode("utf-8")
22 err.release() # Doesn't follow Cocoa conventions for some reason
23 raise TypeError(errStr)
24 return plist
25
26 class Playlist(NSObject):
27 def init(self):
28 return self
29
30 def set(self, name, pid, tracks, parent):
31 self.name = name
32 self.pid = pid
33 self.children = []
34 self.tracks = tracks
35 self.parent = parent
36 if parent is not None:
37 parent.children.append(self)
38
39 class ITunesLibrary(NSObject):
40 def init(self):
41 return self.initWithFilename_("~/Music/iTunes/iTunes Music Library.xml")
42
43 def initWithFilename_(self, filename):
44 filename = os.path.expanduser(filename)
45 plist = read_plist(os.path.expanduser(filename))
46 self.folder = self.loc2name(plist["Music Folder"])
47 pl_tracks = plist["Tracks"]
48 self.playlists = {}
49 for pl_playlist in plist["Playlists"]:
50 playlist = self.make_playlist(pl_playlist, pl_tracks)
51 self.playlists[playlist.pid] = playlist
52 return self
53
54 def loc2name(self, location):
55 return urllib.splithost(urllib.splittype(urllib.unquote(location))[1])[1]
56
57 def make_playlist(self, pl_playlist, pl_tracks):
58 name = pl_playlist["Name"]
59 pid = pl_playlist["Playlist Persistent ID"]
60 parent = None
61 try:
62 parent_pid = pl_playlist["Parent Persistent ID"]
63 parent = self.playlists.get(parent_pid)
64 except KeyError:
65 pass
66 tracks = []
67 for item in pl_playlist.get("Playlist Items", []):
68 trackID = item["Track ID"]
69 filename = str(pl_tracks[str(trackID)]["Location"])
70 filename = self.loc2name(filename)
71 filename = filename.decode("utf-8")
72 ### filename = eval(repr(filename).lstrip("u")).decode("utf-8")
73 if not filename.startswith(self.folder):
74 logging.warn("Skipping: " + filename)
75 continue
76 filename = strip_prefix(filename, self.folder)
77 tracks.append(filename)
78 playlist = Playlist.alloc().init()
79 playlist.set(name, pid, tracks, parent)
80 return playlist
81
82 def has_playlist_name(self, name):
83 for p in self.get_playlists():
84 if p.name == name:
85 return True
86 return False
87
88 def get_playlist_name(self, name):
89 for playlist in self.get_playlists():
90 if playlist.name == name:
91 return playlist
92
93 def get_playlist_pid(self, pid):
94 for playlist in self.get_playlists():
95 if playlist.pid == pid:
96 return playlist
97
98 def get_playlists(self):
99 return self.playlists.values()
100
101 def outlineView_numberOfChildrenOfItem_(self, view, item):
102 if item == None:
103 return len(self.playlists)
104 else:
105 return 0
106
107 def outlineView_isItemExpandable_(self, view, item):
108 return False
109
110 def outlineView_child_ofItem_(self, view, index, item):
111 if item == None:
112 return self.playlists[index]
113 else:
114 return None
115
116 def outlineView_objectValueForTableColumn_byItem_(self, view, column, item):
117 return item.name
118
119
120 encoded_names = {}
121 valid_chars = frozenset("\\/-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
122 def encode_filename(filename):
123 try:
124 return encoded_names[filename]
125 except KeyError:
126 pass
127 orig_filename = filename
128 filename = filename.encode("ascii", "ignore")
129 filename = "".join(c for c in filename if c in valid_chars)
130 if filename in encoded_names:
131 a, b = os.path.splitext(filename)
132 a += "-dup"
133 filename = a + b
134 encoded_names[orig_filename] = filename
135 return filename
136
137 def export_m3u(dry_run, dest, path_prefix, playlist_name, files):
138 if dry_run:
139 return
140 playlist_file = os.path.join(dest, playlist_name) + ".m3u"
141 logging.info("Writing: " + playlist_file)
142 f = open(playlist_file, "w")
143 for filename in files:
144 if path_prefix.find("\\") > 0:
145 filename = filename.replace("/", "\\")
146 filename = encode_filename(filename)
147 f.write("%s%s\n" % (path_prefix, filename))
148 f.close()
149
150 def strip_prefix(s, prefix):
151 assert s.startswith(prefix)
152 s = s[len(prefix):]
153 if s.startswith("/"):
154 s = s[1:]
155 return s
156
157 def mkdirhier(path):
158 if os.path.isdir(path):
159 return
160 paths = [path]
161 while path != "/":
162 path = os.path.split(path)[0]
163 paths.append(path)
164 for path in reversed(paths):
165 try:
166 os.mkdir(path)
167 except OSError:
168 pass
169
170 def sync(dry_run, source, dest, files):
171 join = os.path.join
172
173 logging.info("Calculating files to sync and deleting old files")
174 source = source.encode("utf-8")
175 dest = dest.encode("utf-8")
176 filemap = {}
177 for f in files:
178 filemap[encode_filename(f)] = f.encode("utf-8")
179 files = set(filemap.keys())
180 for dirpath, dirnames, filenames in os.walk(dest):
181 full_dirpath = dirpath
182 dirpath = strip_prefix(dirpath, dest)
183
184 for filename in filenames:
185 filename = join(dirpath, filename)
186
187 # Whenever 'file' is deleted OSX will helpfully remove '._file'
188 if not os.path.exists(join(dest, filename)):
189 continue
190
191 if filename in files:
192 sourcestat = os.stat(join(source, filename))
193 deststat = os.stat(join(dest, filename))
194 same_time = abs(sourcestat.st_mtime - deststat.st_mtime) < 5
195 same_size = sourcestat.st_size == deststat.st_size
196 if same_time and same_size:
197 files.remove(filename)
198 logging.debug("keep: " + filename)
199 else:
200 logging.debug("update: " + filename)
201
202 elif not filename.endswith(".m3u"):
203 logging.debug("delete: " + filename)
204 if not dry_run:
205 os.unlink(join(dest, filename))
206
207 if len(os.listdir(full_dirpath)) == 0:
208 logging.debug("rmdir: " + dirpath)
209 if not dry_run:
210 os.rmdir(full_dirpath)
211
212
213 logging.info("Copying new files")
214 files = list(files)
215 files.sort()
216 for filename in files:
217 logging.debug("copy: " + filename)
218 if not dry_run:
219 mkdirhier(os.path.dirname(join(dest, filename)))
220 shutil.copy2(join(source, filemap[filename]), join(dest, filename))
221
222