]>
code.delx.au - offlineimap/blob - offlineimap/threadutil.py
1 # Copyright (C) 2002, 2003 John Goerzen
2 # Thread support module
3 # <jgoerzen@complete.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 from threading
import *
20 from StringIO
import StringIO
21 from Queue
import Queue
, Empty
22 import sys
, traceback
, thread
, time
23 from offlineimap
.ui
import UIBase
# for getglobalui()
27 def setprofiledir(newdir
):
31 ######################################################################
33 ######################################################################
35 def semaphorereset(semaphore
, originalstate
):
36 """Wait until the semaphore gets back to its original state -- all acquired
37 resources released."""
38 for i
in range(originalstate
):
41 for i
in range(originalstate
):
44 def semaphorewait(semaphore
):
48 def threadsreset(threadlist
):
49 for thr
in threadlist
:
57 def add(self
, thread
):
60 self
.list.append(thread
)
64 def remove(self
, thread
):
67 self
.list.remove(thread
)
74 if not len(self
.list):
76 return self
.list.pop()
88 ######################################################################
90 ######################################################################
92 exitthreads
= Queue(100)
96 """Initialize the exit notify system. This MUST be called from the
97 SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
98 This SHOULD be called before the main thread starts any other
99 ExitNotifyThreads, or else it may miss the ability to catch the exit
103 def exitnotifymonitorloop(callback
):
104 """Enter an infinite "monitoring" loop. The argument, callback,
105 defines the function to call when an ExitNotifyThread has terminated.
106 That function is called with a single argument -- the ExitNotifyThread
107 that has terminated. The monitor will not continue to monitor for
108 other threads until the function returns, so if it intends to perform
109 long calculations, it should start a new thread itself -- but NOT
110 an ExitNotifyThread, or else an infinite loop may result. Furthermore,
111 the monitor will hold the lock all the while the other thread is waiting.
114 while 1: # Loop forever.
116 thrd
= exitthreads
.get(False)
121 def threadexited(thread
):
122 """Called when a thread exits."""
123 ui
= UIBase
.getglobalui()
124 if thread
.getExitCause() == 'EXCEPTION':
125 if isinstance(thread
.getExitException(), SystemExit):
126 # Bring a SystemExit into the main thread.
127 # Do not send it back to UI layer right now.
128 # Maybe later send it to ui.terminate?
130 ui
.threadException(thread
) # Expected to terminate
131 sys
.exit(100) # Just in case...
133 elif thread
.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
139 ui
.threadExited(thread
)
141 class ExitNotifyThread(Thread
):
142 """This class is designed to alert a "monitor" to the fact that a thread has
143 exited and to provide for the ability for it to find out why."""
145 global exitthreads
, profiledir
146 self
.threadid
= thread
.get_ident()
148 if not profiledir
: # normal case
152 prof
= profile
.Profile()
154 prof
= prof
.runctx("Thread.run(self)", globals(), locals())
158 profiledir
+ "/" + str(self
.threadid
) + "_" + \
159 self
.getName() + ".prof")
161 self
.setExitCause('EXCEPTION')
163 self
.setExitException(sys
.exc_info()[1])
165 traceback
.print_exc(file = sbuf
)
166 self
.setExitStackTrace(sbuf
.getvalue())
168 self
.setExitCause('NORMAL')
169 if not hasattr(self
, 'exitmessage'):
170 self
.setExitMessage(None)
173 exitthreads
.put(self
, True)
175 def setExitCause(self
, cause
):
176 self
.exitcause
= cause
177 def getExitCause(self
):
178 """Returns the cause of the exit, one of:
179 'EXCEPTION' -- the thread aborted because of an exception
180 'NORMAL' -- normal termination."""
181 return self
.exitcause
182 def setExitException(self
, exc
):
183 self
.exitexception
= exc
184 def getExitException(self
):
185 """If getExitCause() is 'EXCEPTION', holds the value from
186 sys.exc_info()[1] for this exception."""
187 return self
.exitexception
188 def setExitStackTrace(self
, st
):
189 self
.exitstacktrace
= st
190 def getExitStackTrace(self
):
191 """If getExitCause() is 'EXCEPTION', returns a string representing
192 the stack trace for this exception."""
193 return self
.exitstacktrace
194 def setExitMessage(self
, msg
):
195 """Sets the exit message to be fetched by a subsequent call to
196 getExitMessage. This message may be any object or type except
198 self
.exitmessage
= msg
199 def getExitMessage(self
):
200 """For any exit cause, returns the message previously set by
201 a call to setExitMessage(), or None if there was no such message
203 return self
.exitmessage
206 ######################################################################
207 # Instance-limited threads
208 ######################################################################
210 instancelimitedsems
= {}
211 instancelimitedlock
= Lock()
213 def initInstanceLimit(instancename
, instancemax
):
214 """Initialize the instance-limited thread implementation to permit
215 up to intancemax threads with the given instancename."""
216 instancelimitedlock
.acquire()
217 if not instancelimitedsems
.has_key(instancename
):
218 instancelimitedsems
[instancename
] = BoundedSemaphore(instancemax
)
219 instancelimitedlock
.release()
221 class InstanceLimitedThread(ExitNotifyThread
):
222 def __init__(self
, instancename
, *args
, **kwargs
):
223 self
.instancename
= instancename
225 apply(ExitNotifyThread
.__init
__, (self
,) + args
, kwargs
)
228 instancelimitedsems
[self
.instancename
].acquire()
229 ExitNotifyThread
.start(self
)
233 ExitNotifyThread
.run(self
)
235 if instancelimitedsems
and instancelimitedsems
[self
.instancename
]:
236 instancelimitedsems
[self
.instancename
].release()
239 ######################################################################
240 # Multi-lock -- capable of handling a single thread requesting a lock
242 ######################################################################
247 self
.statuslock
= Lock()
251 """Obtain a lock. Provides nice support for a single
252 thread trying to lock it several times -- as may be the case
253 if one I/O-using object calls others, while wanting to make it all
254 an atomic operation. Keeps a "lock request count" for the current
255 thread, and acquires the lock when it goes above zero, releases when
258 This call is always blocking."""
260 # First, check to see if this thread already has a lock.
261 # If so, increment the lock count and just return.
262 self
.statuslock
.acquire()
264 threadid
= thread
.get_ident()
266 if threadid
in self
.locksheld
:
267 self
.locksheld
[threadid
] += 1
270 # This is safe because it is a per-thread structure
271 self
.locksheld
[threadid
] = 1
273 self
.statuslock
.release()
277 self
.statuslock
.acquire()
279 threadid
= thread
.get_ident()
280 if self
.locksheld
[threadid
] > 1:
281 self
.locksheld
[threadid
] -= 1
284 del self
.locksheld
[threadid
]
287 self
.statuslock
.release()