code.delx.au - offlineimap/blob - offlineimap/threadutil.py
Yet another Python threading workaround.
[offlineimap] / offlineimap / threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from threading import *
20 from StringIO import StringIO
21 from Queue import Queue, Empty
22 import sys, traceback, thread, time
23 from offlineimap.ui import UIBase # for getglobalui()
24
# Directory that receives per-thread profiler dumps.  While this is false
# (the default), ExitNotifyThread.run() executes threads without profiling.
profiledir = None

def setprofiledir(newdir):
    """Point the module-wide profile output directory at newdir.

    ExitNotifyThread.run() reads this value when a thread starts running:
    a true value makes it run the thread under the profiler and dump the
    statistics into that directory.
    """
    global profiledir
    profiledir = newdir
30
31 ######################################################################
32 # General utilities
33 ######################################################################
34
def semaphorereset(semaphore, originalstate):
    """Block until semaphore is back in its original, fully released state.

    originalstate is the number of slots the semaphore started with.  The
    function takes that many slots -- which can only succeed once every
    other holder has released its slot -- and then gives all of them back.
    """
    slots = range(originalstate)
    for _ in slots:
        semaphore.acquire()
    # Every slot is ours now; return them so the semaphore is usable again.
    for _ in slots:
        semaphore.release()
43
def semaphorewait(semaphore):
    """Block until one slot of semaphore can be taken, then return it.

    The semaphore's count is the same after the call as before it.
    """
    semaphore.acquire()
    semaphore.release()
47
def threadsreset(threadlist):
    """Join each thread in the iterable threadlist, in iteration order.

    Returns once every one of them has finished.
    """
    for worker in threadlist:
        worker.join()
51
class threadlist:
    """A lock-protected list of threads that can be joined in bulk.

    Attributes:
        lock -- Lock serialising every access to the list
        list -- the underlying Python list of threads
    """

    def __init__(self):
        self.lock = Lock()
        self.list = []

    def _locked(self, operation, *args):
        # Run operation(*args) while holding self.lock and pass its result
        # through; the lock is released even if the operation raises.
        self.lock.acquire()
        try:
            return operation(*args)
        finally:
            self.lock.release()

    def _poplast(self):
        # Remove and return the newest entry, or None when the list is
        # empty.  Callers must hold self.lock.
        if self.list:
            return self.list.pop()
        return None

    def add(self, thread):
        """Append thread to the list."""
        self._locked(self.list.append, thread)

    def remove(self, thread):
        """Remove thread from the list; raises ValueError if it is absent."""
        self._locked(self.list.remove, thread)

    def pop(self):
        """Remove and return the most recently added thread, or None if the
        list is empty."""
        return self._locked(self._poplast)

    def reset(self):
        """Drain the list, joining each thread as it is removed.

        Stops as soon as pop() yields a false value, which is what it
        returns for an empty list.
        """
        current = self.pop()
        while current:
            current.join()
            current = self.pop()
86
87
88 ######################################################################
89 # Exit-notify threads
90 ######################################################################
91
# Finished ExitNotifyThreads are queued here by ExitNotifyThread.run() and
# consumed by exitnotifymonitorloop().  The queue holds at most 100 entries.
exitthreads = Queue(100)
# NOTE(review): not referenced by any code visible in this part of the
# module -- confirm whether anything still uses it.
inited = 0

def initexitnotify():
    """Initialize the exit notify system.

    Calling convention: this MUST be called from the SAME THREAD that will
    later call exitnotifymonitorloop, and BEFORE that call.  It SHOULD also
    be called before the main thread starts any ExitNotifyThread, or else
    the exit status of those threads may be missed.

    The current implementation performs no work.
    """
102
def exitnotifymonitorloop(callback):
    """Monitor the exitthreads queue forever, reporting finished threads.

    callback is invoked with a single argument -- the ExitNotifyThread
    that has terminated.  No further thread is taken from the queue until
    callback returns, so a callback that needs to do long-running work
    should start a new thread itself -- but NOT an ExitNotifyThread, or
    else an infinite loop may result.

    The queue is polled without blocking; when it is empty the loop sleeps
    for one second before looking again.  This function only returns by
    way of an exception raised from callback.
    """
    while True:  # Loop forever.
        try:
            finished = exitthreads.get(False)
            # A parenthesised single string prints the same bytes whether
            # print is a statement or a function.
            print("exitnotifymonitorloop: Got thread\n")
            callback(finished)
            print("exitnotifymonitorloop: callback done\n")
            exitthreads.task_done()
        except Empty:
            # Nothing has finished yet; wait a little before polling again.
            time.sleep(1)
123
def threadexited(thread):
    """Called when a thread exits.

    thread is the terminated ExitNotifyThread.  What happens depends on
    how it ended:

    * exit cause 'EXCEPTION' with a SystemExit exception: SystemExit is
      re-raised in the calling thread;
    * exit cause 'EXCEPTION' otherwise: the UI's threadException() is
      called, and the process then exits with status 100;
    * exit message 'SYNC_WITH_TIMER_TERMINATE': the UI's terminate() is
      called, and the process then exits with status 100;
    * anything else: the UI's threadExited() is called and this function
      returns normally.
    """
    # The module-level imports do not include os, so the last-resort
    # os._exit() calls below would raise NameError if they were ever
    # reached.  Import it here to make that fallback functional.
    import os

    ui = UIBase.getglobalui()
    if thread.getExitCause() == 'EXCEPTION':
        if isinstance(thread.getExitException(), SystemExit):
            # Bring a SystemExit into the main thread.
            # Do not send it back to UI layer right now.
            # Maybe later send it to ui.terminate?
            raise SystemExit
        ui.threadException(thread)      # Expected to terminate
        sys.exit(100)                   # Just in case...
        # Only reached if sys.exit() returns instead of raising.
        os._exit(100)
    elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
        ui.terminate()
        # Just in case...
        sys.exit(100)
        # Only reached if sys.exit() returns instead of raising.
        os._exit(100)
    else:
        ui.threadExited(thread)
143
class ExitNotifyThread(Thread):
    """A Thread that records how it ended and reports itself when done.

    After the thread's work finishes -- normally or through an exception --
    the thread object is placed on the module-level exitthreads queue, and
    the reason for the exit can be read back through the get* accessors.
    """

    def run(self):
        """Run the thread's work, record the outcome, then queue self.

        When the module-level profiledir is set, the work is run under
        profile.Profile and the statistics are dumped to
        <profiledir>/<thread id>_<thread name>.prof.
        """
        self.threadid = thread.get_ident()
        try:
            if profiledir:
                import profile
                profiler = profile.Profile()
                try:
                    profiler = profiler.runctx("Thread.run(self)",
                                               globals(), locals())
                except SystemExit:
                    # Still dump whatever statistics were collected.
                    pass
                statsname = str(self.threadid) + "_" + self.getName() + ".prof"
                profiler.dump_stats(profiledir + "/" + statsname)
            else:
                # normal case
                Thread.run(self)
        except:
            # Deliberately catches everything, SystemExit included, so the
            # failure is recorded on the thread object for whoever reads
            # the queue instead of being lost with the thread.
            self.setExitCause('EXCEPTION')
            self.setExitException(sys.exc_info()[1])
            tracebuffer = StringIO()
            traceback.print_exc(file=tracebuffer)
            self.setExitStackTrace(tracebuffer.getvalue())
        else:
            self.setExitCause('NORMAL')

        # Give getExitMessage() something to return if the thread's own
        # code never called setExitMessage().
        if not hasattr(self, 'exitmessage'):
            self.setExitMessage(None)

        # Blocks while the queue is full.
        exitthreads.put(self, True)

    def setExitCause(self, cause):
        """Record the exit cause; see getExitCause() for the values."""
        self.exitcause = cause

    def getExitCause(self):
        """Returns the cause of the exit, one of:
        'EXCEPTION' -- the thread aborted because of an exception
        'NORMAL' -- normal termination."""
        return self.exitcause

    def setExitException(self, exc):
        """Record the exception object that ended the thread."""
        self.exitexception = exc

    def getExitException(self):
        """If getExitCause() is 'EXCEPTION', holds the value from
        sys.exc_info()[1] for this exception."""
        return self.exitexception

    def setExitStackTrace(self, st):
        """Record the formatted stack trace of the terminating exception."""
        self.exitstacktrace = st

    def getExitStackTrace(self):
        """If getExitCause() is 'EXCEPTION', returns a string representing
        the stack trace for this exception."""
        return self.exitstacktrace

    def setExitMessage(self, msg):
        """Sets the exit message to be fetched by a subsequent call to
        getExitMessage.  This message may be any object or type except
        None."""
        self.exitmessage = msg

    def getExitMessage(self):
        """For any exit cause, returns the message previously set by
        a call to setExitMessage(), or None if there was no such message
        set."""
        return self.exitmessage
205
206
207 ######################################################################
208 # Instance-limited threads
209 ######################################################################
210
# Maps an instance name to the BoundedSemaphore that caps how many
# InstanceLimitedThreads with that name may run at once.
instancelimitedsems = {}
# Serialises the check-and-create step in initInstanceLimit().
instancelimitedlock = Lock()

def initInstanceLimit(instancename, instancemax):
    """Initialize the instance-limited thread implementation to permit
    up to instancemax threads with the given instancename.

    The first call for a given instancename creates its semaphore; later
    calls for the same name leave the existing semaphore (and its limit)
    untouched.

    Raises whatever BoundedSemaphore(instancemax) raises, e.g. ValueError
    for a negative instancemax.
    """
    instancelimitedlock.acquire()
    try:
        if instancename not in instancelimitedsems:
            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
    finally:
        # Release even when creating the semaphore fails; otherwise every
        # later call would block forever on this lock.
        instancelimitedlock.release()
221
class InstanceLimitedThread(ExitNotifyThread):
    """An ExitNotifyThread that runs only while it holds a slot in the
    semaphore registered under its instance name.

    initInstanceLimit(instancename, ...) must have been called for the
    name before start() is used; otherwise the semaphore lookup raises
    KeyError.
    """

    def __init__(self, instancename, *args, **kwargs):
        """Remember instancename; every other argument is passed through
        to ExitNotifyThread.__init__ unchanged."""
        self.instancename = instancename

        ExitNotifyThread.__init__(self, *args, **kwargs)

    def start(self):
        """Wait for a free slot for this instance name, then start the
        thread.  Blocks while the limit for the name is reached."""
        slots = instancelimitedsems[self.instancename]
        slots.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            # The thread never started, so run() will not give the slot
            # back; release it here or it would be lost for good.
            slots.release()
            raise

    def run(self):
        """Run the thread's work and release the slot taken in start(),
        whatever the outcome."""
        try:
            ExitNotifyThread.run(self)
        finally:
            instancelimitedsems[self.instancename].release()
237
238
239 ######################################################################
240 # Multi-lock -- capable of handling a single thread requesting a lock
241 # multiple times
242 ######################################################################
243
class MultiLock:
    """A lock that one thread may request several times without blocking
    on itself.

    Attributes:
        lock       -- the Lock that provides the actual mutual exclusion
        statuslock -- Lock protecting the locksheld bookkeeping
        locksheld  -- maps a thread id to that thread's outstanding
                      acquire() count
    """

    def __init__(self):
        self.lock = Lock()
        self.statuslock = Lock()
        self.locksheld = {}

    def acquire(self):
        """Obtain the lock.

        Supports a single thread locking several times -- as may be the
        case if one I/O-using object calls others while wanting the whole
        operation to be atomic.  A request count is kept per thread: the
        underlying lock is taken on a thread's first request, and further
        requests from that thread only increase the count.

        This call is always blocking."""
        me = thread.get_ident()

        self.statuslock.acquire()
        try:
            nested = me in self.locksheld
            if nested:
                self.locksheld[me] += 1
            else:
                # Only the current thread ever touches its own entry.
                self.locksheld[me] = 1
        finally:
            self.statuslock.release()

        # First request from this thread: take the real lock.  This is
        # done after statuslock has been released again.
        if not nested:
            self.lock.acquire()

    def release(self):
        """Undo one acquire() made by the calling thread.

        The underlying lock is released when the thread's request count
        drops to zero.  Raises KeyError if the calling thread has no
        outstanding acquire()."""
        me = thread.get_ident()

        self.statuslock.acquire()
        try:
            if self.locksheld[me] <= 1:
                # Last outstanding request: forget the thread and free
                # the real lock.
                del self.locksheld[me]
                self.lock.release()
            else:
                self.locksheld[me] -= 1
        finally:
            self.statuslock.release()
288
289