code.delx.au - pymsnt/blob - src/main.py
Send offline presence to users outside of session as well as message.
[pymsnt] / src / main.py
1 # Copyright 2004-2006 James Bunton <james@delx.cjb.net>
2 # Licensed for distribution under the GPL version 2, check COPYING for details
3
4 import os, os.path, time, sys, codecs, getopt
# sys.setdefaultencoding() is not reachable on the already-imported sys
# module, so reload it first and then make UTF-8 the implicit codec.
reload(sys)
sys.setdefaultencoding("utf-8")
# Wrap stdout in the UTF-8 stream writer so unicode text is encoded on output.
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
8
9 # Find the best reactor
10 selectWarning = "Unable to install any good reactors (kqueue, epoll, poll).\nWe fell back to using select. You may have scalability problems.\nThis reactor will not support more than 1024 connections at a time."
11 try:
12 from twisted.internet import epollreactor as bestreactor
13 except:
14 try:
15 from twisted.internet import kqreactor as bestreactor
16 except:
17 try:
18 from twisted.internet import pollreactor as bestreactor
19 except:
20 try:
21 from twisted.internet import selectreactor as bestreactor
22 print selectWarning
23 except:
24 try:
25 from twisted.internet import default as bestreactor
26 print selectWarning
27 except:
28 print "Unable to find a reactor.\nExiting..."
29 sys.exit(1)
30 bestreactor.install()
31
32
33
34 # Must load config before everything else
35 import config
36 import xmlconfig
37 configFile = "config.xml"
38 configOptions = {}
39 opts, args = getopt.getopt(sys.argv[1:], "bc:o:dDgtlp:h", ["background", "config=", "option=", "debug", "Debug", "garbage", "traceback", "log=", "pid=", "help"])
40 for o, v in opts:
41 if o in ("-c", "--config"):
42 configFile = v
43 elif o in ("-b", "--background"):
44 config.background = True
45 elif o in ("-d", "--debug"):
46 config.debugLevel = "2"
47 elif o in ("-D", "--Debug"):
48 config.debugLevel = "3"
49 elif o in ("-g", "--garbage"):
50 import gc
51 gc.set_debug(gc.DEBUG_LEAK|gc.DEBUG_STATS)
52 elif o in ("-t", "--traceback"):
53 config.debugLevel = "1"
54 elif o in ("-l", "--log"):
55 config.debugFile = v
56 elif o in ("-p", "--pid"):
57 config.pid = v
58 elif o in ("-o", "--option"):
59 var, setting = v.split("=", 2)
60 configOptions[var] = setting
61 elif o in ("-h", "--help"):
62 print "%s [options]" % sys.argv[0]
63 print " -h print this help"
64 print " -b daemonize/background transport"
65 print " -c <file> read configuration from this file"
66 print " -d print debugging output"
67 print " -D print extended debugging output"
68 print " -g print garbage collection output"
69 print " -t print debugging only on traceback"
70 print " -l <file> write debugging output to file"
71 print " -p <file> write process ID to file"
72 print " -o <var>=<setting> set config var to setting"
73 sys.exit(0)
74
75 xmlconfig.reloadConfig(configFile, configOptions)
76
77 if config.reactor:
78 # They picked their own reactor. Lets install it.
79 del sys.modules["twisted.internet.reactor"]
80 if config.reactor == "epoll":
81 from twisted.internet import epollreactor
82 epollreactor.install()
83 elif config.reactor == "poll":
84 from twisted.internet import pollreactor
85 pollreactor.install()
86 elif config.reactor == "kqueue":
87 from twisted.internet import kqreactor
88 kqreactor.install()
89 elif len(config.reactor) > 0:
90 print "Unknown reactor: ", config.reactor, ". Using select(), reactor."
91
92
93 from twisted.internet import reactor, task
94 from twisted.internet.defer import Deferred
95 from tlib.xmlw import Element, jid, component
96 from debug import LogEvent, INFO, WARN, ERROR
97
98 import debug
99 import svninfo
100 import utils
101 import xdb
102 import avatar
103 import session
104 import jabw
105 import disco
106 import register
107 import misciq
108 import ft
109 import lang
110 import legacy
111 import housekeep
112
113
114
class PyTransport(component.Service):
    """Component service that dispatches incoming stanzas to per-user sessions.

    Sessions are kept in self.sessions, keyed by the sender's bare JID
    (froj.userhost()). Stanzas from a JID with a session are handed to that
    session; available presence sent to the transport itself, or a
    subscription request sent to a contact, creates a new session.
    """

    def __init__(self):
        """Create the discovery/IQ helpers, reset state and start the housekeeping loop."""
        LogEvent(INFO)
        try:
            LogEvent(INFO, msg="SVN r" + str(svninfo.getSVNVersion()))
        except Exception:
            # The revision is only logged for information; ignore failures
            pass

        # Discovery, as well as some builtin features
        self.discovery = disco.ServerDiscovery(self)
        self.discovery.addIdentity("gateway", legacy.id, config.discoName, config.jid)
        self.discovery.addIdentity("conference", "text", config.discoName + " Chatrooms", config.jid)
        self.discovery.addFeature(disco.XCONFERENCE, None, config.jid) # So that clients know you can create groupchat rooms on the server
        self.discovery.addFeature("jabber:iq:conference", None, config.jid) # We don't actually support this, but Psi has a bug where it looks for this instead of the above
        self.discovery.addIdentity("client", "pc", "MSN Messenger", "USER")

        self.xdb = xdb.XDB(config.jid, legacy.mangle)
        self.avatarCache = avatar.AvatarCache()
        self.registermanager = register.RegisterManager(self)
        self.gatewayTranslator = misciq.GatewayTranslator(self)
        self.versionTeller = misciq.VersionTeller(self)
        self.pingService = misciq.PingService(self)
        self.adHocCommands = misciq.AdHocCommands(self)
        self.vCardFactory = misciq.VCardFactory(self)
        self.iqAvatarFactor = misciq.IqAvatarFactory(self)
        self.connectUsers = misciq.ConnectUsers(self)
        # File transfer helpers are only created when their port is configured
        if config.ftJabberPort:
            self.ftSOCKS5Receive = ft.Proxy65(int(config.ftJabberPort))
            self.ftSOCKS5Send = misciq.Socks5FileTransfer(self)
        if config.ftOOBPort:
            self.ftOOBReceive = ft.FileTransferOOBReceive(int(config.ftOOBPort))
            self.ftOOBSend = misciq.FileTransferOOBSend(self)
        self.statistics = misciq.Statistics(self)
        self.startTime = int(time.time())

        self.xmlstream = None
        self.sessions = {}

        # Groupchat ID handling
        self.lastID = 0
        self.reservedIDs = []

        # Message IDs
        self.messageID = 0

        # Run loopFunc() every 60 seconds
        self.loopTask = task.LoopingCall(self.loopFunc)
        self.loopTask.start(60.0)

    def removeMe(self):
        """Call removeMe() on every session currently registered."""
        LogEvent(INFO)
        # Iterate over a snapshot of the keys; presumably a session drops
        # itself from self.sessions while it is being removed.
        for sessionKey in self.sessions.copy():
            self.sessions[sessionKey].removeMe()

    def makeMessageID(self):
        """Return the next message ID (an incrementing counter) as a string."""
        self.messageID += 1
        return str(self.messageID)

    def makeID(self):
        """Return the next "r<number>" ID that has not been reserved via reserveID()."""
        while True:
            newID = "r" + str(self.lastID)
            self.lastID += 1
            # Skip over IDs that are already used
            if newID not in self.reservedIDs:
                return newID

    def reserveID(self, ID):
        """Mark ID as taken so that makeID() never returns it."""
        self.reservedIDs.append(ID)

    def loopFunc(self):
        """Periodic housekeeping: refresh statistics and drop dead sessions."""
        numsessions = len(self.sessions)

        #if config.debugOn and numsessions > 0:
        #    print "Sessions:"
        #    for key in self.sessions:
        #        print "\t" + self.sessions[key].jabberID

        self.statistics.stats["Uptime"] = int(time.time()) - self.startTime
        self.statistics.stats["OnlineUsers"] = numsessions
        legacy.updateStats(self.statistics)
        if numsessions > 0:
            # Rebuild the table keeping only the sessions that are still alive
            liveSessions = {}
            for key, s in self.sessions.items():
                if s.alive:
                    liveSessions[key] = s
                else:
                    # Left out of the new dictionary, effectively removing it
                    LogEvent(WARN, "", "Ghost session found.")
            self.sessions = liveSessions

    def componentConnected(self, xmlstream):
        """Remember the stream, register the stanza observers and, when
        config.useXCP is set, send the XCP component presence."""
        LogEvent(INFO)
        self.xmlstream = xmlstream
        self.xmlstream.addObserver("/iq", self.discovery.onIq)
        self.xmlstream.addObserver("/presence", self.onPresence)
        self.xmlstream.addObserver("/message", self.onMessage)
        self.xmlstream.addObserver("/route", self.onRouteMessage)
        if config.useXCP:
            pres = Element((None, "presence"))
            pres.attributes["to"] = "presence@-internal"
            pres.attributes["from"] = config.compjid
            x = pres.addElement("x")
            x.attributes["xmlns"] = "http://www.jabber.com/schemas/component-presence.xsd"
            x.attributes["xmlns:config"] = "http://www.jabber.com/config"
            x.attributes["config:version"] = "1"
            x.attributes["protocol-version"] = "1.0"
            x.attributes["config-ns"] = legacy.url + "/component"
            self.send(pres)

    def componentDisconnected(self):
        """Forget the XML stream once the component connection is gone."""
        LogEvent(INFO)
        self.xmlstream = None

    def onRouteMessage(self, el):
        """Unwrap a <route/> element and dispatch each child stanza by name."""
        for child in el.elements():
            if child.name == "message":
                self.onMessage(child)
            elif child.name == "presence":
                # Ignore any presence broadcasts about other XCP components
                dest = child.getAttribute("to")
                if dest and dest.find("@-internal") > 0:
                    # Note: this stops processing of the whole <route/>
                    return
                self.onPresence(child)
            elif child.name == "iq":
                self.discovery.onIq(child)

    def onMessage(self, el):
        """Handle a <message/> stanza.

        With a session for the sender: type=error removes the session, any
        other message is passed to it. Without a session, non-error messages
        get an error reply and an unavailable presence.
        """
        fro = el.getAttribute("from")
        try:
            froj = jid.intern(fro)
        except Exception:
            LogEvent(WARN, "", "Failed stringprep.")
            return
        mtype = el.getAttribute("type")
        s = self.sessions.get(froj.userhost(), None)
        if s:
            if mtype == "error":
                LogEvent(INFO, s.jabberID, "Removing session because of message type=error")
                s.removeMe()
            else:
                s.onMessage(el)
        elif mtype != "error":
            # Never answer an error with another error
            to = el.getAttribute("to")
            ulang = utils.getLang(el)
            body = None
            for child in el.elements():
                if child.name == "body":
                    body = child.__str__()
            LogEvent(INFO, "", "Sending error response to a message outside of session.")
            jabw.sendErrorMessage(self, fro, to, "auth", "not-authorized", lang.get(ulang).notLoggedIn, body)
            jabw.sendPresence(self, fro, to, ptype="unavailable")

    def onPresence(self, el):
        """Handle a <presence/> stanza.

        With a session for the sender: type=error removes the session, any
        other presence is passed to it. Without a session, available presence
        addressed to the transport, or a (un)subscribe(d) request addressed to
        a contact, tries to create a session first.
        """
        fro = el.getAttribute("from")
        to = el.getAttribute("to")
        try:
            froj = jid.intern(fro)
            toj = jid.intern(to)
        except Exception:
            LogEvent(WARN, "", "Failed stringprep.")
            return

        ptype = el.getAttribute("type")
        s = self.sessions.get(froj.userhost())
        if s:
            if ptype == "error":
                LogEvent(INFO, s.jabberID, "Removing session because of presence type=error")
                s.removeMe()
            else:
                s.onPresence(el)
            return

        # From here on there is no session for the sender
        ulang = utils.getLang(el)
        if '@' not in to:
            # If the presence packet is to the transport (not a user) and there isn't already a session
            if not ptype: # Don't create a session unless they're sending available presence
                LogEvent(INFO, "", "Attempting to create a new session.")
                s = session.makeSession(self, froj.userhost(), ulang)
                if s:
                    self.statistics.stats["TotalUsers"] += 1
                    self.sessions[froj.userhost()] = s
                    LogEvent(INFO, "", "New session created.")
                    # Send the first presence
                    s.onPresence(el)
                else:
                    LogEvent(INFO, "", "Failed to create session")
                    jabw.sendMessage(self, to=froj.userhost(), fro=config.jid, body=lang.get(ulang).notRegistered)

            elif ptype != "error":
                LogEvent(INFO, "", "Sending unavailable presence to non-logged in user.")
                jabw.sendPresence(self, fro, to, ptype="unavailable")

        elif ptype and (ptype.startswith("subscribe") or ptype.startswith("unsubscribe")):
            # They haven't logged in, and are trying to change subscription to a user
            # Lets log them in and then do it
            LogEvent(INFO, "", "Attempting to create a session to do subscription stuff.")
            s = session.makeSession(self, froj.userhost(), ulang)
            if s:
                # NOTE(review): unlike the branch above, TotalUsers is not
                # incremented here - confirm whether that is intentional.
                self.sessions[froj.userhost()] = s
                LogEvent(INFO, "", "New session created.")
                # Tell the session there's a new resource
                s.handleResourcePresence(froj.userhost(), froj.resource, toj.userhost(), toj.resource, 0, None, None, None)
                # Send this subscription
                s.onPresence(el)
317
318
class App:
    """Start-up glue: PID handling, daemonising and starting the component service."""

    def __init__(self):
        """Prepare the process and start the PyTransport service."""
        # Check for any other instances
        # (PID files are only used on POSIX; the setting is cleared elsewhere)
        if config.pid and os.name != "posix":
            config.pid = ""
        if config.pid:
            twistd.checkPID(config.pid)

        # Do any auto-update stuff
        housekeep.init()

        # Daemonise the process and write the PID file
        if config.background and os.name == "posix":
            twistd.daemonize()
        if config.pid:
            self.writePID()

        # Initialise debugging, and do the other half of SIGHUPstuff
        debug.reloadConfig()
        legacy.reloadConfig()

        # Start the service
        # With XCP enabled and a component JID configured, that JID is used
        # for the service manager instead of the transport JID.
        serviceJid = config.jid
        if config.useXCP and config.compjid:
            serviceJid = config.compjid
        self.c = component.buildServiceManager(serviceJid, config.secret, "tcp:%s:%s" % (config.mainServer, config.port))
        self.transportSvc = PyTransport()
        self.transportSvc.setServiceParent(self.c)
        self.c.startService()
        reactor.addSystemEventTrigger('before', 'shutdown', self.shuttingDown)

    def writePID(self):
        """Write this process's ID, followed by a newline, to the file named by config.pid."""
        # Create a PID file
        pid = str(os.getpid())
        pf = open(config.pid, "w")
        try:
            pf.write("%s\n" % pid)
        finally:
            # Release the handle even if the write fails
            pf.close()

    def shuttingDown(self):
        """'before shutdown' trigger: remove all sessions, then return a
        Deferred that fires three seconds later and removes the PID file."""
        self.transportSvc.removeMe()
        # Keep the transport running for another 3 seconds
        def cb(ignored=None):
            if config.pid:
                twistd.removePID(config.pid)
        d = Deferred()
        d.addCallback(cb)
        reactor.callLater(3.0, d.callback, None)
        return d
367
368
369
def SIGHUPstuff(*args):
    """Signal handler: re-read the configuration and re-apply it.

    Reloads the config file with the original command-line overrides, then
    lets the debug and legacy modules pick up the new settings. Any handler
    arguments are ignored.
    """
    global configFile, configOptions
    xmlconfig.reloadConfig(configFile, configOptions)
    # A PID file setting is discarded on non-POSIX platforms
    if os.name != "posix" and config.pid:
        config.pid = ""
    debug.reloadConfig()
    legacy.reloadConfig()
377
# Signal handling and the twistd helpers are only set up on POSIX systems
if os.name == "posix":
    import signal
    # Set SIGHUP to reload the config file & close & open debug file
    signal.signal(signal.SIGHUP, SIGHUPstuff)
    # Load some scripts for PID and daemonising
    from twisted.scripts import twistd
384
385
def main():
    """Build the application object and run the reactor until shutdown."""
    # Create the application
    app = App()
    # Blocks here until the reactor is stopped
    reactor.run()


if __name__ == "__main__":
    main()
393