]> code.delx.au - offlineimap/blob - offlineimap/threadutil.py
Added docs for hooks
[offlineimap] / offlineimap / threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from threading import *
20 from StringIO import StringIO
21 from Queue import Queue, Empty
22 import sys, traceback, thread, time
23 from offlineimap.ui import UIBase # for getglobalui()
24
25 profiledir = None
26
27 def setprofiledir(newdir):
28 global profiledir
29 profiledir = newdir
30
31 ######################################################################
32 # General utilities
33 ######################################################################
34
35 def semaphorereset(semaphore, originalstate):
36 """Wait until the semaphore gets back to its original state -- all acquired
37 resources released."""
38 for i in range(originalstate):
39 semaphore.acquire()
40 # Now release these.
41 for i in range(originalstate):
42 semaphore.release()
43
44 def semaphorewait(semaphore):
45 semaphore.acquire()
46 semaphore.release()
47
48 def threadsreset(threadlist):
49 for thr in threadlist:
50 thr.join()
51
52 class threadlist:
53 def __init__(self):
54 self.lock = Lock()
55 self.list = []
56
57 def add(self, thread):
58 self.lock.acquire()
59 try:
60 self.list.append(thread)
61 finally:
62 self.lock.release()
63
64 def remove(self, thread):
65 self.lock.acquire()
66 try:
67 self.list.remove(thread)
68 finally:
69 self.lock.release()
70
71 def pop(self):
72 self.lock.acquire()
73 try:
74 if not len(self.list):
75 return None
76 return self.list.pop()
77 finally:
78 self.lock.release()
79
80 def reset(self):
81 while 1:
82 thread = self.pop()
83 if not thread:
84 return
85 thread.join()
86
87
88 ######################################################################
89 # Exit-notify threads
90 ######################################################################
91
92 exitthreads = Queue(100)
93 inited = 0
94
95 def initexitnotify():
96 """Initialize the exit notify system. This MUST be called from the
97 SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
98 This SHOULD be called before the main thread starts any other
99 ExitNotifyThreads, or else it may miss the ability to catch the exit
100 status from them!"""
101 pass
102
103 def exitnotifymonitorloop(callback):
104 """Enter an infinite "monitoring" loop. The argument, callback,
105 defines the function to call when an ExitNotifyThread has terminated.
106 That function is called with a single argument -- the ExitNotifyThread
107 that has terminated. The monitor will not continue to monitor for
108 other threads until the function returns, so if it intends to perform
109 long calculations, it should start a new thread itself -- but NOT
110 an ExitNotifyThread, or else an infinite loop may result. Furthermore,
111 the monitor will hold the lock all the while the other thread is waiting.
112 """
113 global exitthreads
114 while 1: # Loop forever.
115 try:
116 thrd = exitthreads.get(False)
117 callback(thrd)
118 except Empty:
119 time.sleep(1)
120
121 def threadexited(thread):
122 """Called when a thread exits."""
123 ui = UIBase.getglobalui()
124 if thread.getExitCause() == 'EXCEPTION':
125 if isinstance(thread.getExitException(), SystemExit):
126 # Bring a SystemExit into the main thread.
127 # Do not send it back to UI layer right now.
128 # Maybe later send it to ui.terminate?
129 raise SystemExit
130 ui.threadException(thread) # Expected to terminate
131 sys.exit(100) # Just in case...
132 os._exit(100)
133 elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
134 ui.terminate()
135 # Just in case...
136 sys.exit(100)
137 os._exit(100)
138 else:
139 ui.threadExited(thread)
140
141 class ExitNotifyThread(Thread):
142 """This class is designed to alert a "monitor" to the fact that a thread has
143 exited and to provide for the ability for it to find out why."""
144 def run(self):
145 global exitthreads, profiledir
146 self.threadid = thread.get_ident()
147 try:
148 if not profiledir: # normal case
149 Thread.run(self)
150 else:
151 import profile
152 prof = profile.Profile()
153 try:
154 prof = prof.runctx("Thread.run(self)", globals(), locals())
155 except SystemExit:
156 pass
157 prof.dump_stats( \
158 profiledir + "/" + str(self.threadid) + "_" + \
159 self.getName() + ".prof")
160 except:
161 self.setExitCause('EXCEPTION')
162 self.setExitException(sys.exc_info()[1])
163 sbuf = StringIO()
164 traceback.print_exc(file = sbuf)
165 self.setExitStackTrace(sbuf.getvalue())
166 else:
167 self.setExitCause('NORMAL')
168 if not hasattr(self, 'exitmessage'):
169 self.setExitMessage(None)
170
171 exitthreads.put(self, True)
172
173 def setExitCause(self, cause):
174 self.exitcause = cause
175 def getExitCause(self):
176 """Returns the cause of the exit, one of:
177 'EXCEPTION' -- the thread aborted because of an exception
178 'NORMAL' -- normal termination."""
179 return self.exitcause
180 def setExitException(self, exc):
181 self.exitexception = exc
182 def getExitException(self):
183 """If getExitCause() is 'EXCEPTION', holds the value from
184 sys.exc_info()[1] for this exception."""
185 return self.exitexception
186 def setExitStackTrace(self, st):
187 self.exitstacktrace = st
188 def getExitStackTrace(self):
189 """If getExitCause() is 'EXCEPTION', returns a string representing
190 the stack trace for this exception."""
191 return self.exitstacktrace
192 def setExitMessage(self, msg):
193 """Sets the exit message to be fetched by a subsequent call to
194 getExitMessage. This message may be any object or type except
195 None."""
196 self.exitmessage = msg
197 def getExitMessage(self):
198 """For any exit cause, returns the message previously set by
199 a call to setExitMessage(), or None if there was no such message
200 set."""
201 return self.exitmessage
202
203
204 ######################################################################
205 # Instance-limited threads
206 ######################################################################
207
208 instancelimitedsems = {}
209 instancelimitedlock = Lock()
210
211 def initInstanceLimit(instancename, instancemax):
212 """Initialize the instance-limited thread implementation to permit
213 up to intancemax threads with the given instancename."""
214 instancelimitedlock.acquire()
215 if not instancelimitedsems.has_key(instancename):
216 instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
217 instancelimitedlock.release()
218
219 class InstanceLimitedThread(ExitNotifyThread):
220 def __init__(self, instancename, *args, **kwargs):
221 self.instancename = instancename
222
223 apply(ExitNotifyThread.__init__, (self,) + args, kwargs)
224
225 def start(self):
226 instancelimitedsems[self.instancename].acquire()
227 ExitNotifyThread.start(self)
228
229 def run(self):
230 try:
231 ExitNotifyThread.run(self)
232 finally:
233 instancelimitedsems[self.instancename].release()
234
235
236 ######################################################################
237 # Multi-lock -- capable of handling a single thread requesting a lock
238 # multiple times
239 ######################################################################
240
241 class MultiLock:
242 def __init__(self):
243 self.lock = Lock()
244 self.statuslock = Lock()
245 self.locksheld = {}
246
247 def acquire(self):
248 """Obtain a lock. Provides nice support for a single
249 thread trying to lock it several times -- as may be the case
250 if one I/O-using object calls others, while wanting to make it all
251 an atomic operation. Keeps a "lock request count" for the current
252 thread, and acquires the lock when it goes above zero, releases when
253 it goes below one.
254
255 This call is always blocking."""
256
257 # First, check to see if this thread already has a lock.
258 # If so, increment the lock count and just return.
259 self.statuslock.acquire()
260 try:
261 threadid = thread.get_ident()
262
263 if threadid in self.locksheld:
264 self.locksheld[threadid] += 1
265 return
266 else:
267 # This is safe because it is a per-thread structure
268 self.locksheld[threadid] = 1
269 finally:
270 self.statuslock.release()
271 self.lock.acquire()
272
273 def release(self):
274 self.statuslock.acquire()
275 try:
276 threadid = thread.get_ident()
277 if self.locksheld[threadid] > 1:
278 self.locksheld[threadid] -= 1
279 return
280 else:
281 del self.locksheld[threadid]
282 self.lock.release()
283 finally:
284 self.statuslock.release()
285
286