code.delx.au - offlineimap/blob - offlineimap/threadutil.py
Step 2 of SVN to arch tree conversion
[offlineimap] / offlineimap / threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
19 from threading import *
20 from StringIO import StringIO
21 import sys, traceback, thread, profile
22 from offlineimap.ui import UIBase # for getglobalui()
23
# Directory that receives per-thread profiler dumps.  While this is unset
# (None or empty) ExitNotifyThread runs its payload without profiling.
profiledir = None


def setprofiledir(newdir):
    """Set the module-wide profile output directory to `newdir`."""
    global profiledir
    profiledir = newdir
29
30 ######################################################################
31 # General utilities
32 ######################################################################
33
def semaphorereset(semaphore, originalstate):
    """Block until the semaphore is back at its original count.

    Acquires the semaphore `originalstate` times -- which can only complete
    once every outstanding holder has released it -- and then hands all of
    those units straight back, leaving the semaphore as it was found.
    """
    # Drain phase: each acquire blocks until a unit becomes available.
    for _unused in range(originalstate):
        semaphore.acquire()
    # Refill phase: return everything taken above.
    for _unused in range(originalstate):
        semaphore.release()
42
def semaphorewait(semaphore):
    """Wait until one unit of `semaphore` is free, then give it right back.

    The semaphore's count is the same after the call as before it; the call
    only blocks while no unit is available.
    """
    semaphore.acquire()     # blocks until a unit can be taken
    semaphore.release()     # immediately return it
46
def threadsreset(threadlist):
    """Join every thread in the iterable `threadlist`, in iteration order."""
    for worker in threadlist:
        worker.join()
50
class threadlist:
    """A list of threads whose every access is serialized by a lock."""

    def __init__(self):
        self.lock = Lock()      # guards all access to self.list
        self.list = []

    def add(self, thread):
        """Append `thread` to the end of the list."""
        self.lock.acquire()
        try:
            self.list.append(thread)
        finally:
            self.lock.release()

    def remove(self, thread):
        """Remove the first occurrence of `thread`.

        Propagates the ValueError raised by list.remove() when `thread` is
        not in the list; the lock is released either way.
        """
        self.lock.acquire()
        try:
            self.list.remove(thread)
        finally:
            self.lock.release()

    def pop(self):
        """Remove and return the most recently added thread, or None when
        the list is empty."""
        self.lock.acquire()
        try:
            if self.list:
                return self.list.pop()
            return None
        finally:
            self.lock.release()

    def reset(self):
        """Pop and join threads one at a time until the list is empty.

        The lock is only held inside pop(), not while joining, so other
        threads may keep adding entries while this runs.
        """
        current = self.pop()
        while current:
            current.join()
            current = self.pop()
85
86
87 ######################################################################
88 # Exit-notify threads
89 ######################################################################
90
# Shared state of the exit-notify machinery:
#   exitcondition -- condition variable guarding exitthreads
#   exitthreads   -- finished ExitNotifyThreads not yet handed to the monitor
#   inited        -- initialization flag (not modified by initexitnotify)
exitcondition = Condition(Lock())
exitthreads = []
inited = 0


def initexitnotify():
    """Initialize the exit notify system.

    Contract for callers: this MUST be called from the SAME THREAD that
    will call monitorloop, BEFORE it calls monitorloop.  It SHOULD also be
    called before the main thread starts any other ExitNotifyThreads, or
    else it may miss the ability to catch the exit status from them!

    The function body itself currently performs no work.
    """
102
def exitnotifymonitorloop(callback):
    """Run forever, reporting each finished ExitNotifyThread to `callback`.

    `callback` is invoked with one argument: the ExitNotifyThread that has
    terminated.  Threads are reported in the order they were queued.

    Monitoring is suspended until `callback` returns, and the exit
    condition's lock stays held for the whole call -- so any other thread
    trying to report its exit waits in the meantime.  A callback that needs
    to do long-running work should start a new thread for it, but NOT an
    ExitNotifyThread, or an infinite loop may result.

    This function only returns by way of an exception raised from
    `callback`; the lock is released before the exception propagates.
    """
    while True:     # Loop forever.
        exitcondition.acquire()
        try:
            # Sleep in one-second slices until at least one thread is queued.
            while not exitthreads:
                exitcondition.wait(1)

            # Drain the queue from the front so the oldest exit goes first.
            while exitthreads:
                finished = exitthreads.pop(0)
                callback(finished)
        finally:
            exitcondition.release()
124
def threadexited(thread):
    """Called when a thread exits; reacts according to how it ended.

    `thread` must provide getExitCause(), getExitException() and
    getExitMessage() (as ExitNotifyThread does).

    * Exit cause 'EXCEPTION' with a SystemExit: a fresh SystemExit is raised
      in the calling thread; the UI is not told.
    * Exit cause 'EXCEPTION' otherwise: the UI's threadException() is
      called, then the process is asked to exit with status 100.
    * Exit message 'SYNC_WITH_TIMER_TERMINATE': the UI's terminate() is
      called, then the process is asked to exit with status 100.
    * Anything else: the UI's threadExited() is called and this returns.

    Raises SystemExit in the first three cases.
    """
    ui = UIBase.getglobalui()
    if thread.getExitCause() == 'EXCEPTION':
        if isinstance(thread.getExitException(), SystemExit):
            # Bring a SystemExit into the main thread.
            # Do not send it back to UI layer right now.
            # Maybe later send it to ui.terminate?
            raise SystemExit
        ui.threadException(thread)      # Expected to terminate
        # Just in case it did not.  sys.exit() always raises SystemExit, so
        # nothing placed after it can run; the former os._exit(100) fallback
        # was unreachable (and `os` was never imported), so it is gone.
        sys.exit(100)
    elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
        ui.terminate()
        # Just in case ui.terminate() returned.
        sys.exit(100)
    else:
        ui.threadExited(thread)
144
class ExitNotifyThread(Thread):
    """A Thread that tells a "monitor" when it has finished, and why.

    When run() finishes it records how the payload ended (see
    getExitCause() and friends), appends the thread to the module-level
    `exitthreads` list and signals `exitcondition`.
    """

    def run(self):
        """Run the thread's payload, record its outcome, notify the monitor.

        With the module-level `profiledir` set, the payload runs under
        profile.Profile and the stats are written to
        <profiledir>/<threadid>_<thread name>.prof; a SystemExit raised by
        the payload is swallowed in that mode.  Any exception that escapes
        is recorded rather than propagated.
        """
        global exitcondition, exitthreads, profiledir
        self.threadid = thread.get_ident()
        try:
            if profiledir:
                profiler = profile.Profile()
                try:
                    profiler = profiler.runctx("Thread.run(self)",
                                               globals(), locals())
                except SystemExit:
                    pass
                statsfile = profiledir + "/" + str(self.threadid) + "_" + \
                            self.getName() + ".prof"
                profiler.dump_stats(statsfile)
            else:
                # normal case
                Thread.run(self)
        except:
            # Deliberately catches everything: whatever stopped the thread
            # is recorded here so the monitor can inspect it later.
            self.setExitCause('EXCEPTION')
            self.setExitException(sys.exc_info()[1])
            tracebuf = StringIO()
            traceback.print_exc(file=tracebuf)
            self.setExitStackTrace(tracebuf.getvalue())
        else:
            self.setExitCause('NORMAL')

        # Make sure getExitMessage() has something to return even when the
        # payload never called setExitMessage().
        if not hasattr(self, 'exitmessage'):
            self.setExitMessage(None)

        # Queue this thread for the monitor and wake it up.
        exitcondition.acquire()
        exitthreads.append(self)
        exitcondition.notify()
        exitcondition.release()

    def setExitCause(self, cause):
        """Record why the thread ended; see getExitCause() for the values."""
        self.exitcause = cause

    def getExitCause(self):
        """Returns the cause of the exit, one of:
        'EXCEPTION' -- the thread aborted because of an exception
        'NORMAL' -- normal termination."""
        return self.exitcause

    def setExitException(self, exc):
        """Record the exception object that ended the thread."""
        self.exitexception = exc

    def getExitException(self):
        """If getExitCause() is 'EXCEPTION', holds the value from
        sys.exc_info()[1] for this exception."""
        return self.exitexception

    def setExitStackTrace(self, st):
        """Record the formatted stack trace of the fatal exception."""
        self.exitstacktrace = st

    def getExitStackTrace(self):
        """If getExitCause() is 'EXCEPTION', returns a string representing
        the stack trace for this exception."""
        return self.exitstacktrace

    def setExitMessage(self, msg):
        """Sets the exit message to be fetched by a subsequent call to
        getExitMessage.  This message may be any object or type except
        None."""
        self.exitmessage = msg

    def getExitMessage(self):
        """For any exit cause, returns the message previously set by
        a call to setExitMessage(), or None if there was no such message
        set."""
        return self.exitmessage
207
208
209 ######################################################################
210 # Instance-limited threads
211 ######################################################################
212
# Maps an instance name to the BoundedSemaphore that caps how many
# InstanceLimitedThreads with that name may run at once.
instancelimitedsems = {}
# Guards creation of entries in instancelimitedsems.
instancelimitedlock = Lock()


def initInstanceLimit(instancename, instancemax):
    """Initialize the instance-limited thread implementation to permit
    up to instancemax threads with the given instancename.

    The first call for a given name creates its semaphore; later calls for
    the same name leave the existing semaphore (and its limit) untouched.

    Any exception raised while building the semaphore (for instance a
    ValueError for a negative instancemax) propagates to the caller with
    the registry lock released, so later calls are not blocked.
    """
    instancelimitedlock.acquire()
    try:
        if instancename not in instancelimitedsems:
            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
    finally:
        # Always hand the lock back, even when BoundedSemaphore() raised.
        instancelimitedlock.release()
223
class InstanceLimitedThread(ExitNotifyThread):
    """An ExitNotifyThread that occupies one slot of a named semaphore.

    The semaphore is looked up in `instancelimitedsems` under
    `instancename`; it must already exist there (see initInstanceLimit)
    by the time start() is called, otherwise a KeyError is raised.
    """

    def __init__(self, instancename, *args, **kwargs):
        """Remember `instancename` and pass every other argument through to
        ExitNotifyThread's constructor."""
        self.instancename = instancename

        ExitNotifyThread.__init__(self, *args, **kwargs)

    def start(self):
        """Take a slot -- blocking until one is free -- then start the thread.

        If the underlying start() fails, the slot is returned before the
        error propagates; run() never executes in that case, so nothing
        else would release it.
        """
        slot = instancelimitedsems[self.instancename]
        slot.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            slot.release()
            raise

    def run(self):
        """Run the payload, releasing this thread's slot when it finishes,
        whether it ended normally or not."""
        try:
            ExitNotifyThread.run(self)
        finally:
            instancelimitedsems[self.instancename].release()
239
240
241 ######################################################################
242 # Multi-lock -- capable of handling a single thread requesting a lock
243 # multiple times
244 ######################################################################
245
class MultiLock:
    """A lock that one thread may acquire several times without blocking
    itself.

    `lock` is the real mutual-exclusion lock, `locksheld` maps a thread id
    to that thread's outstanding acquire count, and `statuslock` protects
    `locksheld`.
    """

    def __init__(self):
        self.lock = Lock()
        self.statuslock = Lock()
        self.locksheld = {}

    def acquire(self):
        """Obtain a lock.  Provides nice support for a single
        thread trying to lock it several times -- as may be the case
        if one I/O-using object calls others, while wanting to make it all
        an atomic operation.  Keeps a "lock request count" for the current
        thread, and acquires the lock when it goes above zero, releases when
        it goes below one.

        This call is always blocking."""
        me = thread.get_ident()

        self.statuslock.acquire()
        try:
            if me in self.locksheld:
                # This thread is already a holder: only bump its count.
                self.locksheld[me] += 1
                nested = True
            else:
                # First request from this thread.  Recording it before the
                # real lock is taken is safe: the entry is per-thread.
                self.locksheld[me] = 1
                nested = False
        finally:
            self.statuslock.release()

        if not nested:
            # Taken outside statuslock so other threads can still update
            # their own counts while this one waits.
            self.lock.acquire()

    def release(self):
        """Undo one acquire() made by the calling thread.

        The underlying lock is only released when the thread's count drops
        to zero.  Raises KeyError if the calling thread has no outstanding
        acquire.
        """
        me = thread.get_ident()

        self.statuslock.acquire()
        try:
            remaining = self.locksheld[me] - 1
            if remaining > 0:
                self.locksheld[me] = remaining
            else:
                del self.locksheld[me]
                self.lock.release()
        finally:
            self.statuslock.release()
290
291