]> code.delx.au - bg-scripts/blob - lib/AsyncSocket.py
RandomBG: Make Listener non-writeable
[bg-scripts] / lib / AsyncSocket.py
1 #!/usr/bin/env python
2 # Copyright 2007 Greg Darke <gdar9540@usyd.edu.au>
3 # Licensed for distribution under the GPL version 2, check COPYING for details
4 # A async framework for sockets and fds (fds only supported under unix operating systems)
5 # NOTE: Orig version submitted for NETS3603 assignment 1 (Semester 1 - 2007)
6
7
8 from __future__ import division
9 import os, sys, select, socket, bisect, fcntl
10 from time import time
11
12 class Callback(object):
13 __slots__ = ['callback', 'callback_time']
14 def __init__(self, callback_time, callback):
15 self.callback_time = callback_time
16 self.callback = callback
17 def __call__(self):
18 return self.callback()
19 def __lt__(self, other):
20 if hasattr(other, 'callback_time'):
21 return self.callback_time < other.callback_time
22 else:
23 return NotImplemented
24
25 class AsyncSocketOwner(object):
26 """This is the object contains the 'main loop' of the application"""
27 def __init__(self):
28 self.sockets_input = []
29 self.socket_callbacks = {}
30 self.timer_callbacks = []
31 self._exit = False
32 self.state = {}
33
34 def print_state(self):
35 ### sys.stdout.write('\033[H\033[2J')
36 print "\n".join(['%s: %s' % v for v in self.state.items()])
37 self.addCallback(1.0, self.print_state)
38
39 def _check_timers_callbacks(self):
40 now = time()
41 i = bisect.bisect(self.timer_callbacks, Callback(now, None))
42 self.state['Processing Callbacks'] = '%d of %d' % (i,
43 len(self.timer_callbacks))
44 needCall = self.timer_callbacks[0:i]
45 self.timer_callbacks = self.timer_callbacks[i:]
46
47 for callback in needCall:
48 callback()
49
50 def exit(self):
51 self._exit = True
52
53 def mainLoop(self):
54 try:
55 while not self._exit:
56 if len(self.timer_callbacks) > 0:
57 timeout = max(self.timer_callbacks[0].callback_time - time(), 0)
58 # Wait until the next timer expires for input
59 inputready, outputready, exceptready = \
60 select.select(self.sockets_input, [], [], timeout)
61 else:
62 # Wait forever for input
63 inputready, outputready, exceptready = \
64 select.select(self.sockets_input, [], [])
65
66 # Handle any data received
67 self.state['Waiting sockets'] = len(inputready)
68 self.state['Socket count'] = len(self.sockets_input)
69 for s in inputready:
70 self.socket_callbacks[s](s)
71
72 # Handle timers:
73 if len(self.timer_callbacks) > 0 and \
74 self.timer_callbacks[0].callback_time < time():
75 self._check_timers_callbacks()
76 except KeyboardInterrupt:
77 pass
78
79 def _addFDCallback(self, fd, callback):
80 """Add a callback for a file descriptor, also add it to the select call"""
81 self.sockets_input.append(fd)
82 self.socket_callbacks[fd] = callback
83
84 def removeSocket(self, fd):
85 """Removes the sepecified fd from the event loop"""
86 self.sockets_input.remove(fd)
87 del self.socket_callbacks[fd]
88
89 def addFD(self, fd, callback):
90 """Adds a file descriptor to the event loop"""
91 # Turn blocking off
92 flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
93 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
94 self._addFDCallback(fd, callback)
95
96 def addSocket(self, s, callback):
97 """Adds a socket to the event loop"""
98 # Disable blocking - So now we have an async socket
99 s.setblocking(False)
100 self._addFDCallback(s, callback)
101
102 def addLineBufferedSocket(self, s, callback):
103 sockWrapper = LineBufferedAsyncClientConnection(s, callback, self)
104 s.setblocking(False)
105 self._addFDCallback(s, sockWrapper._dataArrived)
106
107 def addCallback(self, seconds, callback):
108 """Add a timer callback"""
109 # Keep the list of callbacks sorted to keep things more efficient (Note: This would be better with a heap)
110 cb = Callback(time() + seconds, callback)
111 bisect.insort(self.timer_callbacks, cb)
112 return cb
113
114 def removeCallback(self, callback_object):
115 """Remove a callback from the list. NB: If the time has fired/is in the list to be
116 fired, the outcome is undefined (currently it will be called - but this may change)"""
117 if callback_object in self.timer_callbacks:
118 self.timer_callbacks.remove(callback_object)
119
120 class LineBufferedAsyncClientConnection(object):
121 __slots__ = ['sock', 'callback', 'delim', 'eventLoop', 'linesBuffer', 'lineBuffer', 'closed']
122 def __init__(self, sock, callback, eventLoop, delim = '\n'):
123 self.sock = sock
124 self.callback = callback
125 self.delim = delim
126 self.eventLoop = eventLoop
127 self.linesBuffer = []
128 self.lineBuffer = ''
129
130 def _dataArrived(self, *args, **kwargs):
131 data = self.sock.recv(65535)
132 if not data:
133 self.closed = True
134 self.eventLoop.removeSocket(self.sock)
135 return
136
137 self.lineBuffer += data
138 newLinePos = self.lineBuffer.rfind(self.delim)
139 if newLinePos >= 0:
140 self.linesBuffer += self.lineBuffer[:newLinePos].split(self.delim)
141 self.lineBuffer = self.lineBuffer[newLinePos+1:]
142 self.callback(self)
143
144 def fileno(self):
145 """Return the encapsulated socket's fileno (used for select.select)"""
146 return self.sock.fileno()
147
148 def readline(self):
149 if not self.hasLine():
150 raise Exception('No data in buffer')
151 ret = self.linesBuffer[0]
152 del self.linesBuffer[0]
153 return ret
154
155 def write(self, data):
156 self.sock.write(data)
157 send = write
158
159 def hasLine(self):
160 return len(self.linesBuffer) > 0