]> code.delx.au - offlineimap/blob - offlineimap/head/offlineimap/threadutil.py
3458f6cafeef1053f34ac782ef1a91e62af6c1a4
[offlineimap] / offlineimap / head / offlineimap / threadutil.py
1 # Copyright (C) 2002 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; version 2 of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
18 from threading import *
19 from StringIO import StringIO
20 import sys, traceback, thread, profile
21 from offlineimap.ui import UIBase # for getglobalui()
22
23 profiledir = None
24
25 def setprofiledir(newdir):
26 global profiledir
27 profiledir = newdir
28
29 ######################################################################
30 # General utilities
31 ######################################################################
32
33 def semaphorereset(semaphore, originalstate):
34 """Wait until the semaphore gets back to its original state -- all acquired
35 resources released."""
36 for i in range(originalstate):
37 semaphore.acquire()
38 # Now release these.
39 for i in range(originalstate):
40 semaphore.release()
41
42 def semaphorewait(semaphore):
43 semaphore.acquire()
44 semaphore.release()
45
46 def threadsreset(threadlist):
47 for thr in threadlist:
48 thr.join()
49
50 class threadlist:
51 def __init__(self):
52 self.lock = Lock()
53 self.list = []
54
55 def add(self, thread):
56 self.lock.acquire()
57 try:
58 self.list.append(thread)
59 finally:
60 self.lock.release()
61
62 def remove(self, thread):
63 self.lock.acquire()
64 try:
65 self.list.remove(thread)
66 finally:
67 self.lock.release()
68
69 def pop(self):
70 self.lock.acquire()
71 try:
72 if not len(self.list):
73 return None
74 return self.list.pop()
75 finally:
76 self.lock.release()
77
78 def reset(self):
79 while 1:
80 thread = self.pop()
81 if not thread:
82 return
83 thread.join()
84
85
86 ######################################################################
87 # Exit-notify threads
88 ######################################################################
89
90 exitcondition = Condition(Lock())
91 exitthreads = []
92 inited = 0
93
94 def initexitnotify():
95 """Initialize the exit notify system. This MUST be called from the
96 SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
97 This SHOULD be called before the main thread starts any other
98 ExitNotifyThreads, or else it may miss the ability to catch the exit
99 status from them!"""
100 pass
101
102 def exitnotifymonitorloop(callback):
103 """Enter an infinite "monitoring" loop. The argument, callback,
104 defines the function to call when an ExitNotifyThread has terminated.
105 That function is called with a single argument -- the ExitNotifyThread
106 that has terminated. The monitor will not continue to monitor for
107 other threads until the function returns, so if it intends to perform
108 long calculations, it should start a new thread itself -- but NOT
109 an ExitNotifyThread, or else an infinite loop may result. Furthermore,
110 the monitor will hold the lock all the while the other thread is waiting.
111 """
112 global exitcondition, exitthreads
113 while 1: # Loop forever.
114 exitcondition.acquire()
115 while not len(exitthreads):
116 exitcondition.wait(1)
117
118 while len(exitthreads):
119 callback(exitthreads.pop(0)) # Pull off in order added!
120 exitcondition.release()
121
122 def threadexited(thread):
123 """Called when a thread exits."""
124 ui = UIBase.getglobalui()
125 if thread.getExitCause() == 'EXCEPTION':
126 if isinstance(thread.getExitException(), SystemExit):
127 # Bring a SystemExit into the main thread.
128 # Do not send it back to UI layer right now.
129 # Maybe later send it to ui.terminate?
130 raise SystemExit
131 ui.threadException(thread) # Expected to terminate
132 sys.exit(100) # Just in case...
133 os._exit(100)
134 elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
135 ui.terminate()
136 # Just in case...
137 sys.exit(100)
138 os._exit(100)
139 else:
140 ui.threadExited(thread)
141
142 class ExitNotifyThread(Thread):
143 """This class is designed to alert a "monitor" to the fact that a thread has
144 exited and to provide for the ability for it to find out why."""
145 def run(self):
146 global exitcondition, exitthreads, profiledir
147 self.threadid = thread.get_ident()
148 try:
149 if not profiledir: # normal case
150 Thread.run(self)
151 else:
152 prof = profile.Profile()
153 try:
154 prof = prof.runctx("Thread.run(self)", globals(), locals())
155 except SystemExit:
156 pass
157 prof.dump_stats( \
158 profiledir + "/" + str(self.threadid) + "_" + \
159 self.getName() + ".prof")
160 except:
161 self.setExitCause('EXCEPTION')
162 self.setExitException(sys.exc_info()[1])
163 sbuf = StringIO()
164 traceback.print_exc(file = sbuf)
165 self.setExitStackTrace(sbuf.getvalue())
166 else:
167 self.setExitCause('NORMAL')
168 if not hasattr(self, 'exitmessage'):
169 self.setExitMessage(None)
170 exitcondition.acquire()
171 exitthreads.append(self)
172 exitcondition.notify()
173 exitcondition.release()
174
175 def setExitCause(self, cause):
176 self.exitcause = cause
177 def getExitCause(self):
178 """Returns the cause of the exit, one of:
179 'EXCEPTION' -- the thread aborted because of an exception
180 'NORMAL' -- normal termination."""
181 return self.exitcause
182 def setExitException(self, exc):
183 self.exitexception = exc
184 def getExitException(self):
185 """If getExitCause() is 'EXCEPTION', holds the value from
186 sys.exc_info()[1] for this exception."""
187 return self.exitexception
188 def setExitStackTrace(self, st):
189 self.exitstacktrace = st
190 def getExitStackTrace(self):
191 """If getExitCause() is 'EXCEPTION', returns a string representing
192 the stack trace for this exception."""
193 return self.exitstacktrace
194 def setExitMessage(self, msg):
195 """Sets the exit message to be fetched by a subsequent call to
196 getExitMessage. This message may be any object or type except
197 None."""
198 self.exitmessage = msg
199 def getExitMessage(self):
200 """For any exit cause, returns the message previously set by
201 a call to setExitMessage(), or None if there was no such message
202 set."""
203 return self.exitmessage
204
205
206 ######################################################################
207 # Instance-limited threads
208 ######################################################################
209
210 instancelimitedsems = {}
211 instancelimitedlock = Lock()
212
213 def initInstanceLimit(instancename, instancemax):
214 """Initialize the instance-limited thread implementation to permit
215 up to intancemax threads with the given instancename."""
216 instancelimitedlock.acquire()
217 if not instancelimitedsems.has_key(instancename):
218 instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
219 instancelimitedlock.release()
220
221 class InstanceLimitedThread(ExitNotifyThread):
222 def __init__(self, instancename, *args, **kwargs):
223 self.instancename = instancename
224
225 apply(ExitNotifyThread.__init__, (self,) + args, kwargs)
226
227 def start(self):
228 instancelimitedsems[self.instancename].acquire()
229 ExitNotifyThread.start(self)
230
231 def run(self):
232 try:
233 ExitNotifyThread.run(self)
234 finally:
235 instancelimitedsems[self.instancename].release()
236
237
238 ######################################################################
239 # Multi-lock -- capable of handling a single thread requesting a lock
240 # multiple times
241 ######################################################################
242
243 class MultiLock:
244 def __init__(self):
245 self.lock = Lock()
246 self.statuslock = Lock()
247 self.locksheld = {}
248
249 def acquire(self):
250 """Obtain a lock. Provides nice support for a single
251 thread trying to lock it several times -- as may be the case
252 if one I/O-using object calls others, while wanting to make it all
253 an atomic operation. Keeps a "lock request count" for the current
254 thread, and acquires the lock when it goes above zero, releases when
255 it goes below one.
256
257 This call is always blocking."""
258
259 # First, check to see if this thread already has a lock.
260 # If so, increment the lock count and just return.
261 self.statuslock.acquire()
262 try:
263 threadid = thread.get_ident()
264
265 if threadid in self.locksheld:
266 self.locksheld[threadid] += 1
267 return
268 else:
269 # This is safe because it is a per-thread structure
270 self.locksheld[threadid] = 1
271 finally:
272 self.statuslock.release()
273 self.lock.acquire()
274
275 def release(self):
276 self.statuslock.acquire()
277 try:
278 threadid = thread.get_ident()
279 if self.locksheld[threadid] > 1:
280 self.locksheld[threadid] -= 1
281 return
282 else:
283 del self.locksheld[threadid]
284 self.lock.release()
285 finally:
286 self.statuslock.release()
287
288