code.delx.au - offlineimap/blob - offlineimap/threadutil.py
Eliminate one more fsync
[offlineimap] / offlineimap / threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from threading import *
20 from StringIO import StringIO
21 from Queue import Queue
22 import sys, traceback, thread
23 from offlineimap.ui import UIBase # for getglobalui()
24
# Directory that per-thread profiler dumps are written into.  While this
# is None (or otherwise false) ExitNotifyThread.run() does not profile.
profiledir = None


def setprofiledir(newdir):
    """Set the module-wide profile output directory to newdir.

    ExitNotifyThread.run() reads this value when a thread starts running:
    a true value makes it execute the thread body under the profiler and
    dump the statistics into that directory.
    """
    global profiledir
    profiledir = newdir
30
31 ######################################################################
32 # General utilities
33 ######################################################################
34
def semaphorereset(semaphore, originalstate):
    """Block until every unit of the semaphore has been handed back.

    semaphore     -- the semaphore to drain and refill
    originalstate -- the number of units the semaphore was created with

    All originalstate units are acquired first, which can only complete
    once every other holder has released its unit; the same number of
    units is then released again so the semaphore ends up full.
    """
    # Phase one takes every unit, phase two puts them all back.
    for operation in (semaphore.acquire, semaphore.release):
        for _ in range(originalstate):
            operation()
43
def semaphorewait(semaphore):
    """Wait until one unit of semaphore is available.

    The unit is acquired (blocking if necessary) and released again
    straight away, so the semaphore's count is unchanged on return.
    """
    semaphore.acquire()
    semaphore.release()
47
def threadsreset(threadlist):
    """Join every thread in the iterable threadlist, in order.

    Returns only after each thread's join() has returned.
    """
    for worker in threadlist:
        worker.join()
51
class threadlist:
    """A lock-protected list of threads that can be joined in one go."""

    def __init__(self):
        # self.lock serialises every access to self.list.
        self.lock = Lock()
        self.list = []

    def add(self, thread):
        """Append thread to the end of the list."""
        self.lock.acquire()
        try:
            self.list.append(thread)
        finally:
            self.lock.release()

    def remove(self, thread):
        """Remove the first occurrence of thread.

        Raises ValueError (from list.remove) if it is not present.
        """
        self.lock.acquire()
        try:
            self.list.remove(thread)
        finally:
            self.lock.release()

    def pop(self):
        """Remove and return the most recently added thread.

        Returns None when the list is empty.
        """
        self.lock.acquire()
        try:
            if self.list:
                return self.list.pop()
            return None
        finally:
            self.lock.release()

    def reset(self):
        """Pop and join threads until pop() yields nothing.

        The lock is only held inside pop(), not while joining, so other
        threads may keep adding entries during the reset.
        """
        worker = self.pop()
        while worker:
            worker.join()
            worker = self.pop()
86
87
88 ######################################################################
89 # Exit-notify threads
90 ######################################################################
91
# Hand-off queue between finishing threads and the monitor: each
# ExitNotifyThread puts itself here as the last step of run(), and
# exitnotifymonitorloop() takes the entries off again.  At most 5 entries
# fit, so a put() blocks while the queue is full.
exitthreads = Queue(maxsize=5)
# NOTE(review): not referenced anywhere else in the visible part of this
# module -- confirm whether other modules still read it.
inited = 0
94
def initexitnotify():
    """Initialize the exit notify system.  Currently does nothing.

    The documented calling convention is unchanged: call this from the
    SAME THREAD that will later call exitnotifymonitorloop, BEFORE that
    call, and before the main thread starts any ExitNotifyThread.  The
    body is empty, so the call has no effect.
    """
102
def exitnotifymonitorloop(callback):
    """Enter an infinite "monitoring" loop.

    callback is invoked once for every ExitNotifyThread that has
    terminated, with that thread as its single argument.  The loop blocks
    on the exitthreads queue until a thread arrives, and it does not look
    at the next one until callback has returned.  A callback that needs
    to do lengthy work should therefore start a new thread itself -- but
    NOT an ExitNotifyThread, or else an infinite loop may result.

    This function only returns by way of an exception raised from
    callback (or from the queue).
    """
    global exitthreads
    while True:                     # Loop forever.
        finished = exitthreads.get(True)
        callback(finished)
        # Only reached when callback returned without raising.
        exitthreads.task_done()
117
def threadexited(thread):
    """Called when a thread exits.

    thread is the finished thread; it must offer getExitCause(),
    getExitException() and getExitMessage() as ExitNotifyThread does.

    - Exit by SystemExit: a fresh SystemExit is raised in the calling
      thread.
    - Exit by any other exception: the UI's threadException() is
      notified, then sys.exit(100) is raised in case the UI returned.
    - Exit message 'SYNC_WITH_TIMER_TERMINATE': the UI's terminate() is
      called, followed by sys.exit(100) as a fallback.
    - Anything else: the UI's threadExited() is notified and this
      function returns normally.

    The former os._exit(100) calls after sys.exit(100) were removed:
    sys.exit() always raises SystemExit, so they could never run, and the
    os module is not imported by this file.
    """
    ui = UIBase.getglobalui()
    if thread.getExitCause() == 'EXCEPTION':
        if isinstance(thread.getExitException(), SystemExit):
            # Bring a SystemExit into the thread running this callback
            # (the main thread, per the original comment).
            # Do not send it back to UI layer right now.
            # Maybe later send it to ui.terminate?
            raise SystemExit
        ui.threadException(thread)      # Expected to terminate
        sys.exit(100)                   # Just in case...
    elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
        ui.terminate()
        # Just in case...
        sys.exit(100)
    else:
        ui.threadExited(thread)
137
class ExitNotifyThread(Thread):
    """A thread that tells a "monitor" when it has exited, and why.

    As the last step of run() the thread puts itself on the module-level
    exitthreads queue; the accessors below expose the recorded outcome.
    """

    def run(self):
        """Run the thread body, record how it ended, then report in.

        With a true module-level profiledir the body is executed under
        the profiler and the statistics are dumped to
        <profiledir>/<threadid>_<name>.prof.
        """
        global exitthreads, profiledir
        self.threadid = thread.get_ident()
        try:
            if profiledir:
                import profile
                profiler = profile.Profile()
                try:
                    profiler = profiler.runctx("Thread.run(self)",
                                               globals(), locals())
                except SystemExit:
                    # Still dump whatever statistics were gathered.
                    pass
                basename = str(self.threadid) + "_" + self.getName() + ".prof"
                profiler.dump_stats(profiledir + "/" + basename)
            else:
                # normal case
                Thread.run(self)
        except:
            # Deliberately catches everything, SystemExit included: the
            # outcome is stored on the thread instead of propagating.
            self.setExitCause('EXCEPTION')
            self.setExitException(sys.exc_info()[1])
            tracebuf = StringIO()
            traceback.print_exc(file=tracebuf)
            self.setExitStackTrace(tracebuf.getvalue())
        else:
            self.setExitCause('NORMAL')

        # Keep a message set earlier via setExitMessage(); default to None.
        if not hasattr(self, 'exitmessage'):
            self.setExitMessage(None)

        # Blocks while the queue is full.
        exitthreads.put(self, True)

    def setExitCause(self, cause):
        """Record the cause of the exit (see getExitCause)."""
        self.exitcause = cause

    def getExitCause(self):
        """Returns the cause of the exit, one of:
        'EXCEPTION' -- the thread aborted because of an exception
        'NORMAL' -- normal termination."""
        return self.exitcause

    def setExitException(self, exc):
        """Record the exception object that ended the thread."""
        self.exitexception = exc

    def getExitException(self):
        """If getExitCause() is 'EXCEPTION', holds the value from
        sys.exc_info()[1] for this exception."""
        return self.exitexception

    def setExitStackTrace(self, st):
        """Record the formatted stack trace string st."""
        self.exitstacktrace = st

    def getExitStackTrace(self):
        """If getExitCause() is 'EXCEPTION', returns a string representing
        the stack trace for this exception."""
        return self.exitstacktrace

    def setExitMessage(self, msg):
        """Sets the exit message to be fetched by a subsequent call to
        getExitMessage.  This message may be any object or type except
        None."""
        self.exitmessage = msg

    def getExitMessage(self):
        """For any exit cause, returns the message previously set by
        a call to setExitMessage(), or None if there was no such message
        set."""
        return self.exitmessage
199
200
201 ######################################################################
202 # Instance-limited threads
203 ######################################################################
204
# Maps an instance name to the BoundedSemaphore that caps how many
# InstanceLimitedThreads carrying that name may be started at once.
instancelimitedsems = {}
# Serialises registration of new entries in instancelimitedsems.
instancelimitedlock = Lock()


def initInstanceLimit(instancename, instancemax):
    """Initialize the instance-limited thread implementation to permit
    up to instancemax threads with the given instancename.

    A name that is already registered keeps its existing semaphore; the
    new instancemax is ignored in that case.

    Raises whatever BoundedSemaphore(instancemax) raises (ValueError for
    a negative value).  The registration lock is released on that path
    too, so a failed call cannot block later registrations.
    """
    instancelimitedlock.acquire()
    try:
        if instancename not in instancelimitedsems:
            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
    finally:
        instancelimitedlock.release()
215
class InstanceLimitedThread(ExitNotifyThread):
    """An ExitNotifyThread whose concurrency is capped per instance name.

    The cap is the semaphore registered for instancename through
    initInstanceLimit(); start() takes one slot from it and run() gives
    the slot back when the thread body has finished.
    """

    def __init__(self, instancename, *args, **kwargs):
        """Remember instancename and pass the remaining arguments on to
        ExitNotifyThread.__init__ unchanged."""
        self.instancename = instancename

        ExitNotifyThread.__init__(self, *args, **kwargs)

    def start(self):
        """Wait for a free slot for this instance name, then start.

        Raises KeyError if instancename was never registered.  If the
        thread itself cannot be started the slot is handed back before
        the error propagates, because run() will never get the chance to
        release it.
        """
        slots = instancelimitedsems[self.instancename]
        slots.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            slots.release()
            raise

    def run(self):
        """Run the thread body and release the slot afterwards, whatever
        the outcome."""
        try:
            ExitNotifyThread.run(self)
        finally:
            instancelimitedsems[self.instancename].release()
231
232
233 ######################################################################
234 # Multi-lock -- capable of handling a single thread requesting a lock
235 # multiple times
236 ######################################################################
237
class MultiLock:
    """A lock that one thread may request several times over.

    self.lock is the real lock, self.locksheld maps a thread id to that
    thread's outstanding request count, and self.statuslock protects
    locksheld.
    """

    def __init__(self):
        self.lock = Lock()
        self.statuslock = Lock()
        self.locksheld = {}

    def acquire(self):
        """Obtain a lock.  Provides nice support for a single
        thread trying to lock it several times -- as may be the case
        if one I/O-using object calls others, while wanting to make it all
        an atomic operation.  Keeps a "lock request count" for the current
        thread, and acquires the lock when it goes above zero, releases when
        it goes below one.

        This call is always blocking."""
        ident = thread.get_ident()

        self.statuslock.acquire()
        try:
            # Entries are deleted when they reach zero, so a missing
            # entry means this thread holds nothing yet.
            already = self.locksheld.get(ident, 0)
            self.locksheld[ident] = already + 1
        finally:
            self.statuslock.release()

        if not already:
            # First request from this thread: take the real lock, and do
            # so outside statuslock so other threads can still release().
            self.lock.acquire()

    def release(self):
        """Drop one lock request of the current thread.

        The real lock is released when the thread's last request goes
        away.  Raises KeyError if the calling thread holds no request.
        """
        ident = thread.get_ident()

        self.statuslock.acquire()
        try:
            remaining = self.locksheld[ident] - 1
            if remaining > 0:
                self.locksheld[ident] = remaining
            else:
                del self.locksheld[ident]
                self.lock.release()
        finally:
            self.statuslock.release()
282
283