]> code.delx.au - offlineimap/blob - offlineimap/repository/Maildir.py
Update changelog
[offlineimap] / offlineimap / repository / Maildir.py
1 # Maildir repository support
2 # Copyright (C) 2002 John Goerzen
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from Base import BaseRepository
20 from offlineimap import folder, imaputil
21 from offlineimap.ui import UIBase
22 from mailbox import Maildir
23 import os
24 from stat import *
25
26 class MaildirRepository(BaseRepository):
27 def __init__(self, reposname, account):
28 """Initialize a MaildirRepository object. Takes a path name
29 to the directory holding all the Maildir directories."""
30 BaseRepository.__init__(self, reposname, account)
31
32 self.root = self.getlocalroot()
33 self.folders = None
34 self.ui = UIBase.getglobalui()
35 self.debug("MaildirRepository initialized, sep is " + repr(self.getsep()))
36 self.folder_atimes = []
37
38 # Create the top-level folder if it doesn't exist
39 if not os.path.isdir(self.root):
40 os.mkdir(self.root, 0700)
41
42 def _append_folder_atimes(self, foldername):
43 p = os.path.join(self.root, foldername)
44 new = os.path.join(p, 'new')
45 cur = os.path.join(p, 'cur')
46 f = p, os.stat(new)[ST_ATIME], os.stat(cur)[ST_ATIME]
47 self.folder_atimes.append(f)
48
49 def restore_folder_atimes(self):
50 if not self.folder_atimes:
51 return
52
53 for f in self.folder_atimes:
54 t = f[1], os.stat(os.path.join(f[0], 'new'))[ST_MTIME]
55 os.utime(os.path.join(f[0], 'new'), t)
56 t = f[2], os.stat(os.path.join(f[0], 'cur'))[ST_MTIME]
57 os.utime(os.path.join(f[0], 'cur'), t)
58
59 def getlocalroot(self):
60 return os.path.expanduser(self.getconf('localfolders'))
61
62 def debug(self, msg):
63 self.ui.debug('maildir', msg)
64
65 def getsep(self):
66 return self.getconf('sep', '.').strip()
67
68 def makefolder(self, foldername):
69 self.debug("makefolder called with arg " + repr(foldername))
70 # Do the chdir thing so the call to makedirs does not make the
71 # self.root directory (we'd prefer to raise an error in that case),
72 # but will make the (relative) paths underneath it. Need to use
73 # makedirs to support a / separator.
74 self.debug("Is dir? " + repr(os.path.isdir(foldername)))
75 if self.getsep() == '/':
76 for invalid in ['new', 'cur', 'tmp', 'offlineimap.uidvalidity']:
77 for component in foldername.split('/'):
78 assert component != invalid, "When using nested folders (/ as a separator in the account config), your folder names may not contain 'new', 'cur', 'tmp', or 'offlineimap.uidvalidity'."
79
80 assert foldername.find('./') == -1, "Folder names may not contain ../"
81 assert not foldername.startswith('/'), "Folder names may not begin with /"
82
83 oldcwd = os.getcwd()
84 os.chdir(self.root)
85
86 # If we're using hierarchical folders, it's possible that sub-folders
87 # may be created before higher-up ones. If this is the case,
88 # makedirs will fail because the higher-up dir already exists.
89 # So, check to see if this is indeed the case.
90
91 if (self.getsep() == '/' or self.getconfboolean('existsok', 0) or foldername == '.') \
92 and os.path.isdir(foldername):
93 self.debug("makefolder: %s already is a directory" % foldername)
94 # Already exists. Sanity-check that it's not a Maildir.
95 for subdir in ['cur', 'new', 'tmp']:
96 assert not os.path.isdir(os.path.join(foldername, subdir)), \
97 "Tried to create folder %s but it already had dir %s" %\
98 (foldername, subdir)
99 else:
100 self.debug("makefolder: calling makedirs %s" % foldername)
101 os.makedirs(foldername, 0700)
102 self.debug("makefolder: creating cur, new, tmp")
103 for subdir in ['cur', 'new', 'tmp']:
104 os.mkdir(os.path.join(foldername, subdir), 0700)
105 # Invalidate the cache
106 self.folders = None
107 os.chdir(oldcwd)
108
109 def deletefolder(self, foldername):
110 self.ui.warn("NOT YET IMPLEMENTED: DELETE FOLDER %s" % foldername)
111
112 def getfolder(self, foldername):
113 if self.config.has_option('Repository ' + self.name, 'restoreatime') and self.config.getboolean('Repository ' + self.name, 'restoreatime'):
114 self._append_folder_atimes(foldername)
115 return folder.Maildir.MaildirFolder(self.root, foldername,
116 self.getsep(), self,
117 self.accountname, self.config)
118
119 def _getfolders_scandir(self, root, extension = None):
120 self.debug("_GETFOLDERS_SCANDIR STARTING. root = %s, extension = %s" \
121 % (root, extension))
122 # extension willl only be non-None when called recursively when
123 # getsep() returns '/'.
124 retval = []
125
126 # Configure the full path to this repository -- "toppath"
127
128 if extension == None:
129 toppath = root
130 else:
131 toppath = os.path.join(root, extension)
132
133 self.debug(" toppath = %s" % toppath)
134
135 # Iterate over directories in top.
136 for dirname in os.listdir(toppath) + ['.']:
137 self.debug(" *** top of loop")
138 self.debug(" dirname = %s" % dirname)
139 if dirname in ['cur', 'new', 'tmp', 'offlineimap.uidvalidity']:
140 self.debug(" skipping this dir (Maildir special)")
141 # Bypass special files.
142 continue
143 fullname = os.path.join(toppath, dirname)
144 self.debug(" fullname = %s" % fullname)
145 if not os.path.isdir(fullname):
146 self.debug(" skipping this entry (not a directory)")
147 # Not a directory -- not a folder.
148 continue
149 foldername = dirname
150 if extension != None:
151 foldername = os.path.join(extension, dirname)
152 if (os.path.isdir(os.path.join(fullname, 'cur')) and
153 os.path.isdir(os.path.join(fullname, 'new')) and
154 os.path.isdir(os.path.join(fullname, 'tmp'))):
155 # This directory has maildir stuff -- process
156 self.debug(" This is a maildir folder.")
157
158 self.debug(" foldername = %s" % foldername)
159
160 if self.config.has_option('Repository ' + self.name, 'restoreatime') and self.config.getboolean('Repository ' + self.name, 'restoreatime'):
161 self._append_folder_atimes(foldername)
162 retval.append(folder.Maildir.MaildirFolder(self.root, foldername,
163 self.getsep(), self, self.accountname,
164 self.config))
165 if self.getsep() == '/' and dirname != '.':
166 # Check sub-directories for folders.
167 retval.extend(self._getfolders_scandir(root, foldername))
168 self.debug("_GETFOLDERS_SCANDIR RETURNING %s" % \
169 repr([x.getname() for x in retval]))
170 return retval
171
172 def getfolders(self):
173 if self.folders == None:
174 self.folders = self._getfolders_scandir(self.root)
175 return self.folders
176