code.delx.au - pymsnt/blob - src/tlib/msn/test_msn.py
Basic tests for switchboard functionality
[pymsnt] / src / tlib / msn / test_msn.py
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2 # Copyright (c) 2005 James Bunton
3 # See LICENSE for details.
4
5 """
6 Test cases for msn.
7 """
8
9 # Twisted imports
10 from twisted.protocols import loopback
11 from twisted.protocols.basic import LineReceiver
12 from twisted.internet.defer import Deferred
13 from twisted.internet import reactor
14 from twisted.trial import unittest
15
16 # System imports
17 import StringIO, sys, urllib
18
19 import msn
20 import msnft
21
22
def printError(f):
    """Print ``f`` to standard output.

    Used in this module as a Deferred errback so that a failure shows up in
    the test output. Returns None.
    """
    # Parenthesised form prints the same text on Python 2 and also parses on
    # Python 3, unlike the bare ``print f`` statement.
    print(f)
25
class StringIOWithoutClosing(StringIO.StringIO):
    """In-memory buffer used as a stand-in transport in these tests.

    ``close`` and ``loseConnection`` are deliberately no-ops so the written
    data can still be inspected after a protocol asks to shut down.
    """

    # Flag read by protocols that check whether the transport is going away.
    disconnecting = 0

    def close(self):
        """Ignore the request; the buffer stays open."""
        return None

    def loseConnection(self):
        """Ignore the request; there is no real connection to drop."""
        return None
30
31
32 ##################
33 # Passport tests #
34 ##################
35
class PassportTests(unittest.TestCase):
    """Feed canned HTTP responses to the Passport protocols and check results."""

    def setUp(self):
        """Create a Deferred whose callback value is collected in ``self.result``."""
        self.result = []
        collector = Deferred()
        collector.addCallback(self.result.append)
        collector.addErrback(printError)
        self.deferred = collector

    def _feedResponse(self, protocol, statusLine, headers):
        """Deliver a status line, each header, then the blank line to ``protocol``."""
        protocol.dataReceived(statusLine)
        for name, value in headers.items():
            protocol.dataReceived('%s: %s\r\n' % (name, value))
        protocol.dataReceived('\r\n')

    def testNexus(self):
        """The nexus reply's DALogin entry should come back as an https URL."""
        protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
        headers = {
            'Content-Length': '0',
            'Content-Type': 'text/html',
            'PassportURLs': 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com',
        }
        protocol.makeConnection(StringIOWithoutClosing())
        self._feedResponse(protocol, 'HTTP/1.0 200 OK\r\n', headers)
        self.failUnless(self.result[0] == "https://login.myserver.com/")

    def _doLoginTest(self, response, headers):
        """Run a PassportLogin against the given status line and headers."""
        protocol = msn.PassportLogin(self.deferred, 'foo@foo.com', 'testpass', 'https://foo.com/', 'a')
        protocol.makeConnection(StringIOWithoutClosing())
        self._feedResponse(protocol, response, headers)

    def testPassportLoginSuccess(self):
        """A 200 reply with da-status=success yields LOGIN_SUCCESS and the key."""
        authInfo = ("Passport1.4 da-status=success,tname=MSPAuth,"
                    "tname=MSPProf,tname=MSPSec,from-PP='somekey',"
                    "ru=http://messenger.msn.com")
        headers = {
            'Content-Length': '0',
            'Content-Type': 'text/html',
            'Authentication-Info': authInfo,
        }
        self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
        self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))

    def testPassportLoginFailure(self):
        """A 401 reply yields LOGIN_FAILURE and the unquoted cbtxt message."""
        challenge = ('Passport1.4 da-status=failed,'
                     'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,'
                     'cbtxt=the%20error%20message')
        headers = {
            'Content-Type': 'text/html',
            'WWW-Authenticate': challenge,
        }
        self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
        self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))

    def testPassportLoginRedirect(self):
        """A 302 reply yields LOGIN_REDIRECT, the Location URL and the 'a' argument."""
        headers = {
            'Content-Type': 'text/html',
            'Authentication-Info': 'Passport1.4 da-status=redir',
            'Location': 'https://newlogin.host.com/',
        }
        self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
        self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
94
95
96
97 ######################
98 # Notification tests #
99 ######################
100
class DummyNotificationClient(msn.NotificationClient):
    """NotificationClient whose event callbacks record what fired in ``self.state``.

    Most callbacks only update ``state`` when they are invoked with the exact
    values the tests feed in, so a test can check both that the callback ran
    and that its arguments were parsed as expected.
    """

    def loggedIn(self, userHandle, verified):
        isExpectedUser = (userHandle == 'foo@bar.com')
        if isExpectedUser and verified:
            self.state = 'LOGIN'

    def gotProfile(self, message):
        self.state = 'PROFILE'

    def gotContactStatus(self, code, userHandle, screenName):
        if (code, userHandle, screenName) != (msn.STATUS_AWAY, "foo@bar.com", "Test Screen Name"):
            return
        contact = self.factory.contacts.getContact(userHandle)
        # Require both the MSNC1 capability bit and a non-empty msnobj.
        if (contact.caps & msn.MSNContact.MSNC1) and contact.msnobj:
            self.state = 'INITSTATUS'

    def contactStatusChanged(self, code, userHandle, screenName):
        if (code, userHandle, screenName) == (msn.STATUS_LUNCH, "foo@bar.com", "Test Name"):
            self.state = 'NEWSTATUS'

    def contactAvatarChanged(self, userHandle, hash):
        if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=":
            self.state = 'NEWAVATAR'
            return
        # An empty hash only counts as "avatar gone" right after a new avatar.
        if self.state == 'NEWAVATAR' and hash == "":
            self.state = 'AVATARGONE'

    def contactPersonalChanged(self, userHandle, personal):
        if userHandle != 'foo@bar.com':
            return
        if personal == 'My Personal Message':
            self.state = 'GOTPERSONAL'
        elif personal == '':
            self.state = 'PERSONALGONE'

    def contactOffline(self, userHandle):
        if userHandle == "foo@bar.com":
            self.state = 'OFFLINE'

    def statusChanged(self, code):
        if code == msn.STATUS_HIDDEN:
            self.state = 'MYSTATUS'

    def listSynchronized(self, *args):
        self.state = 'GOTLIST'

    def gotPhoneNumber(self, userHandle, phoneType, number):
        self.state = 'GOTPHONE'

    def userRemovedMe(self, userHandle):
        # The state is only set when the contact can no longer be looked up.
        if not self.factory.contacts.getContact(userHandle):
            self.state = 'USERREMOVEDME'

    def userAddedMe(self, userGuid, userHandle, screenName):
        contact = self.factory.contacts.getContact(userHandle)
        # NOTE(review): for integer flags `|` is truthy whenever
        # msn.PENDING_LIST is non-zero; `&` may have been intended -- confirm
        # against what the library puts in `lists` before changing.
        if contact and (contact.lists | msn.PENDING_LIST) and (screenName == 'Screen Name'):
            self.state = 'USERADDEDME'

    def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
        received = (sessionID, host, port, key, userHandle, screenName)
        expected = (1234, '192.168.1.1', 1863, '123.456', 'foo@foo.com', 'Screen Name')
        if received == expected:
            self.state = 'SBINVITED'

    def gotMSNAlert(self, body, action, subscr):
        self.state = 'NOTIFICATION'

    def gotInitialEmailNotification(self, inboxunread, foldersunread):
        if inboxunread != 1 or foldersunread != 0:
            self.state = 'INITEMAIL2'
        else:
            self.state = 'INITEMAIL1'

    def gotRealtimeEmailNotification(self, mailfrom, fromaddr, subject):
        if (mailfrom, fromaddr, subject) == ('Some Person', 'example@passport.com', 'newsubject'):
            self.state = 'REALTIMEEMAIL'
173
class NotificationTests(unittest.TestCase):
    """ testing the various events in NotificationClient """

    def setUp(self):
        """Create a DummyNotificationClient with a fresh factory and a 'START' state."""
        self.client = DummyNotificationClient()
        self.client.factory = msn.NotificationFactory()
        self.client.state = 'START'

    def tearDown(self):
        self.client = None

    def _feedLines(self, lines):
        """Pass each entry of ``lines`` to the client's lineReceived, in order."""
        # An explicit loop instead of map(): the calls are made for their side
        # effects, and a lazy map() would never run them.
        for line in lines:
            self.client.lineReceived(line)

    def testLogin(self):
        self.client.lineReceived('USR 1 OK foo@bar.com 1')
        self.failUnless((self.client.state == 'LOGIN'), 'Failed to detect successful login')

    def testProfile(self):
        m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
        m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
        m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
        m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
        self.client.dataReceived(m)
        self.failUnless((self.client.state == 'PROFILE'), 'Failed to detect initial profile')

    def testInitialEmailNotification(self):
        m = 'MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n'
        m += '\r\nInbox-Unread: 1\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n'
        m += 'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n'
        # The MSG command line carries the payload length, so build it last.
        m = 'MSG Hotmail Hotmail %s\r\n' % (str(len(m))) + m
        self.client.dataReceived(m)
        self.failUnless((self.client.state == 'INITEMAIL1'), 'Failed to detect initial email notification')

    def testNoInitialEmailNotification(self):
        m = 'MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n'
        m += '\r\nInbox-Unread: 0\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n'
        m += 'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n'
        m = 'MSG Hotmail Hotmail %s\r\n' % (str(len(m))) + m
        self.client.dataReceived(m)
        self.failUnless((self.client.state != 'INITEMAIL2'), 'Detected initial email notification when I should not have')

    def testRealtimeEmailNotification(self):
        m = 'MSG Hotmail Hotmail 356\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsemailnotification; charset=UTF-8\r\n'
        m += '\r\nFrom: Some Person\r\nMessage-URL: /cgi-bin/getmsg?msg=MSG1050451140.21&start=2310&len=2059&curmbox=ACTIVE\r\n'
        m += 'Post-URL: https://loginnet.passport.com/ppsecure/md5auth.srf?lc=1038\r\n'
        m += 'Subject: =?"us-ascii"?Q?newsubject?=\r\nDest-Folder: ACTIVE\r\nFrom-Addr: example@passport.com\r\nid: 2\r\n'
        self.client.dataReceived(m)
        self.failUnless((self.client.state == 'REALTIMEEMAIL'), 'Failed to detect realtime email notification')

    def testMSNAlert(self):
        m = '<NOTIFICATION ver="2" id="1342902633" siteid="199999999" siteurl="http://alerts.msn.com">\r\n'
        m += '<TO pid="0x0006BFFD:0x8582C0FB" name="example@passport.com"/>\r\n'
        m += '<MSG pri="1" id="1342902633">\r\n'
        m += '<SUBSCR url="http://g.msn.com/3ALMSNTRACKING/199999999ToastChange?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n'
        m += '<ACTION url="http://g.msn.com/3ALMSNTRACKING/199999999ToastAction?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n'
        m += '<BODY lang="3076" icon="">\r\n'
        m += '<TEXT>utf8-encoded text</TEXT></BODY></MSG>\r\n'
        m += '</NOTIFICATION>\r\n'
        cmd = 'NOT %s\r\n' % str(len(m))
        m = cmd + m
        # Deliver the message one line at a time to check that lineReceived and
        # dataReceived cope with input arriving in (fairly) arbitrary chunks.
        # The final split element is the empty string after the last CRLF.
        for chunk in m.split('\r\n')[:-1]:
            self.client.dataReceived(chunk + '\r\n')
        self.failUnless((self.client.state == 'NOTIFICATION'), 'Failed to detect MSN Alert message')

    def testListSync(self):
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 3" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LSG Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
            "LSG More%20Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ]
        self._feedLines(lines)
        contacts = self.client.factory.contacts
        contact = contacts.getContact('userHandle@email.com')
        #self.failUnless(contacts.version == 100, "Invalid contact list version")
        self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
        expectedGroups = {
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy': 'Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz': 'Other Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya': 'More Other Friends',
        }
        self.failUnless(contacts.groups == expectedGroups, "Did not get proper group list")
        expectedMembership = [
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz',
        ]
        self.failUnless(contact.groups == expectedMembership and contact.lists == 13,
                        "Invalid contact list/group info")
        self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
        self.client.logOut()

    def testStatus(self):
        # Set up the contact list
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LST N=foo@bar.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ]
        self._feedLines(lines)
        # Now test!
        msnobj = urllib.quote('<msnobj Creator="buddy1@hotmail.com" Size="24539" Type="3" Location="TFR2C.tmp" Friendly="AAA=" SHA1D="trC8SlFx2sWQxZMIBAWSEnXc8oQ=" SHA1C="U32o6bosZzluJq82eAtMpx5dIEI="/>')
        # Each entry: (line to feed, expected client state, failure message).
        # Order matters: 'AVATARGONE' is only reachable from 'NEWAVATAR'.
        t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 268435456 ' + msnobj, 'INITSTATUS', 'Failed to detect initial status report'),
             ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
             ('NLN AWY foo@bar.com Test%20Name 0 ' + msnobj, 'NEWAVATAR', 'Failed to detect contact avatar change'),
             ('NLN AWY foo@bar.com Test%20Name 0', 'AVATARGONE', 'Failed to detect contact avatar disappearing'),
             ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
             ('CHG 1 HDN 0 ' + msnobj, 'MYSTATUS', 'Failed to detect my status changing')]
        for line, expectedState, failureMessage in t:
            self.client.lineReceived(line)
            self.failUnless((self.client.state == expectedState), failureMessage)

        # Test UBX
        self.client.dataReceived('UBX foo@bar.com 72\r\n<Data><PSM>My Personal Message</PSM><CurrentMedia></CurrentMedia></Data>')
        self.failUnless((self.client.state == 'GOTPERSONAL'), 'Failed to detect new personal message')
        self.client.dataReceived('UBX foo@bar.com 0\r\n')
        self.failUnless((self.client.state == 'PERSONALGONE'), 'Failed to detect personal message disappearing')
        self.client.logOut()

    def testAsyncPhoneChange(self):
        c = msn.MSNContact(userHandle='userHandle@email.com')
        self.client.factory.contacts = msn.MSNContactList()
        self.client.factory.contacts.addContact(c)
        self.client.makeConnection(StringIOWithoutClosing())
        self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
        c = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
        self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
        self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")

    def testLateBPR(self):
        """
        This test makes sure that if a BPR response that was meant
        to be part of a SYN response (but came after the last LST)
        is received, the correct contact is updated and all is well
        """
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "BPR PHH 123%20456"
        ]
        self._feedLines(lines)
        contact = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
        self.client.logOut()

    def testUserRemovedMe(self):
        self.client.factory.contacts = msn.MSNContactList()
        contact = msn.MSNContact(userHandle='foo@foo.com')
        contact.addToList(msn.REVERSE_LIST)
        self.client.factory.contacts.addContact(contact)
        self.client.lineReceived("REM 0 RL foo@foo.com")
        self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")

    def testUserAddedMe(self):
        self.client.factory.contacts = msn.MSNContactList()
        self.client.lineReceived("ADC 0 RL N=foo@foo.com F=Screen%20Name")
        self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse list")

    def testAsyncSwitchboardInvitation(self):
        self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
        self.failUnless((self.client.state == 'SBINVITED'), 'Failed to detect switchboard invitation')
342
343
344 #######################################
345 # Notification with fake server tests #
346 #######################################
347
class FakeNotificationServer(msn.MSNEventBase):
    """Scripted notification server used to exercise NotificationClient.

    Each ``handle_XXX`` method receives the parameter list of one client
    command and either sends back a canned reply or drops the connection
    when the command does not look the way the tests expect.
    """

    def handle_CHG(self, params):
        """Echo the status change, padding a missing fourth parameter with ''."""
        if len(params) < 4:
            params.append('')
        self.sendLine("CHG %s %s %s %s" % (params[0], params[1], params[2], params[3]))

    def handle_BLP(self, params):
        """Echo the privacy mode with a fixed trailing value of 100."""
        self.sendLine("BLP %s %s 100" % (params[0], params[1]))

    def handle_ADC(self, params):
        """Reply to an add-contact command.

        Forward-list adds by handle are answered with a fixed GUID; adds to
        any other list are echoed back unchanged.
        """
        trid = params[0]
        listType = msn.listCodeToID[params[1].lower()]
        if listType == msn.FORWARD_LIST:
            userHandle = ""
            screenName = ""
            userGuid = ""
            groups = ""
            # Remaining parameters are "N=", "F=", "C=" fields; anything
            # else is taken to be the group list.
            for p in params[2:]:
                if p[0] == 'N':
                    userHandle = p[2:]
                elif p[0] == 'F':
                    screenName = p[2:]
                elif p[0] == 'C':
                    userGuid = p[2:]
                else:
                    groups = p
            if userHandle and userGuid:
                # Supplying both a handle and a GUID is rejected.
                self.transport.loseConnection()
                return
            if userHandle:
                if not screenName:
                    self.transport.loseConnection()
                else:
                    self.sendLine("ADC %s FL N=%s F=%s C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx %s" % (trid, userHandle, screenName, groups))
                return
            if userGuid:
                # A real exception class: string exceptions are not
                # raisable on Python 2.6 and later.
                raise NotImplementedError("ADC by GUID is not implemented in the fake server")
        else:
            if len(params) != 3:
                # Stop here so params[2] is never indexed on a short command
                # and no reply follows a rejected one.
                self.transport.loseConnection()
                return
            # NOTE(review): as written this rejects a bare address that lacks
            # the "N=" prefix; if "must start with N= and contain one @" was
            # intended the condition needs parentheses -- confirm.
            if not params[2].startswith("N=") and params[2].count('@') == 1:
                self.transport.loseConnection()
                return
            self.sendLine("ADC %s %s %s" % (params[0], params[1], params[2]))

    def handle_REM(self, params):
        """Echo a remove-contact command after validating its parameters."""
        if len(params) != 3:
            self.transport.loseConnection()
            return
        try:
            int(params[0])  # the transaction ID must be numeric
            listType = msn.listCodeToID[params[1].lower()]
        except (ValueError, KeyError):
            # Bail out: listType is unbound past this point.
            self.transport.loseConnection()
            return
        if listType == msn.FORWARD_LIST and params[2].count('@') > 0:
            # Forward-list removals must not carry an address.
            self.transport.loseConnection()
        elif listType != msn.FORWARD_LIST and params[2].count('@') != 1:
            self.transport.loseConnection()
        else:
            self.sendLine("REM %s %s %s" % (params[0], params[1], params[2]))

    def handle_PRP(self, params):
        """Echo a friendly-name (MFN) change; drop the connection otherwise."""
        if len(params) != 3:
            self.transport.loseConnection()
            return
        if params[1] == "MFN":
            self.sendLine("PRP %s MFN %s" % (params[0], params[2]))
        else:
            # Only friendly names are implemented
            self.transport.loseConnection()

    def handle_UUX(self, params):
        """Acknowledge an empty UUX at once, or switch to raw mode for its payload."""
        if len(params) != 2:
            self.transport.loseConnection()
            return
        length = int(params[1])
        if length > 0:
            # The transaction ID is stashed in userHandle so checkMessage can
            # echo it once the payload has arrived.
            self.currentMessage = msn.MSNMessage(length=length, userHandle=params[0], screenName="UUX", specialMessage=True)
            self.setRawMode()
        else:
            self.sendLine("UUX %s 0" % params[0])

    def checkMessage(self, message):
        """Acknowledge a completed UUX payload and return 0; return 1 otherwise."""
        if message.specialMessage:
            if message.screenName == "UUX":
                self.sendLine("UUX %s 0" % message.userHandle)
                return 0
        return 1

    def handle_XFR(self, params):
        """Answer a switchboard ("SB") transfer request with a fixed host and key."""
        if len(params) != 2:
            self.transport.loseConnection()
            return
        if params[1] != "SB":
            self.transport.loseConnection()
            return
        self.sendLine("XFR %s SB 129.129.129.129:1234 CKI SomeSecret" % params[0])
443
444
445
class FakeNotificationClient(msn.NotificationClient):
    """NotificationClient that issues one request and records the outcome.

    Each ``do*`` method starts a request, and its callback sets ``self.test``
    to 'PASS' when the result matches, then drops the connection whether or
    not it matched. The callbacks unpack their result explicitly rather than
    in the parameter list (tuple parameters were removed by PEP 3113); a
    result of the wrong length still raises, as before.
    """

    def doStatusChange(self):
        def testcb(result):
            (status,) = result
            if status == msn.STATUS_AWAY:
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.changeStatus(msn.STATUS_AWAY)
        d.addCallback(testcb)

    def doPrivacyMode(self):
        def testcb(result):
            (priv,) = result
            if priv.upper() == 'AL':
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.setPrivacyMode(True)
        d.addCallback(testcb)

    def doAddContactFL(self):
        def testcb(result):
            (listType, userGuid, userHandle, screenName) = result
            if listType & msn.FORWARD_LIST and \
               userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \
               userHandle == "foo@bar.com" and \
               screenName == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.addContact(msn.FORWARD_LIST, "foo@bar.com")
        d.addCallback(testcb)

    def doAddContactAL(self):
        def testcb(result):
            (listType, userGuid, userHandle, screenName) = result
            if listType & msn.ALLOW_LIST and \
               userHandle == "foo@bar.com" and \
               not userGuid and not screenName:
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.addContact(msn.ALLOW_LIST, "foo@bar.com")
        d.addCallback(testcb)

    def doRemContactFL(self):
        def testcb(result):
            (listType, userHandle, groupID) = result
            if listType & msn.FORWARD_LIST and \
               userHandle == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.remContact(msn.FORWARD_LIST, "foo@bar.com")
        d.addCallback(testcb)

    def doRemContactAL(self):
        def testcb(result):
            (listType, userHandle, groupID) = result
            if listType & msn.ALLOW_LIST and \
               userHandle == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.remContact(msn.ALLOW_LIST, "foo@bar.com")
        d.addCallback(testcb)

    def doScreenNameChange(self):
        def testcb(result):
            (screenName,) = result
            if screenName == "Some new name":
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.changeScreenName("Some new name")
        d.addCallback(testcb)

    def doPersonalChange(self, personal):
        def testcb(result):
            (checkPersonal,) = result
            if checkPersonal == personal:
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.changePersonalMessage(personal)
        d.addCallback(testcb)

    def doAvatarChange(self, data):
        def testcb(ignored):
            # Any successful result counts as a pass here.
            self.test = 'PASS'
            self.transport.loseConnection()
        d = self.changeAvatar(data, True)
        d.addCallback(testcb)

    def doRequestSwitchboard(self):
        def testcb(result):
            (host, port, key) = result
            if host == "129.129.129.129" and port == 1234 and key == "SomeSecret":
                self.test = 'PASS'
            self.transport.loseConnection()
        d = self.requestSwitchboardServer()
        d.addCallback(testcb)
532
class FakeServerNotificationTests(unittest.TestCase):
    """ tests the NotificationClient against a fake server. """

    def setUp(self):
        """Pair a FakeNotificationClient (test result preset to 'FAIL') with a fake server."""
        client = FakeNotificationClient()
        client.factory = msn.NotificationFactory()
        client.test = 'FAIL'
        self.client = client
        self.server = FakeNotificationServer()

    def tearDown(self):
        pass

    def _drive(self, action, *args):
        """Schedule ``action(*args)`` on the reactor, then run the client/server loopback."""
        reactor.callLater(0, action, *args)
        loopback.loopback(self.client, self.server)

    def _freshContactList(self):
        """Give the client's factory a new, empty contact list and return it."""
        contacts = msn.MSNContactList()
        self.client.factory.contacts = contacts
        return contacts

    def testChangeStatus(self):
        self._drive(self.client.doStatusChange)
        self.failUnless((self.client.test == 'PASS'), 'Failed to change status properly')

    def testSetPrivacyMode(self):
        self._freshContactList()
        self._drive(self.client.doPrivacyMode)
        self.failUnless((self.client.test == 'PASS'), 'Failed to change privacy mode')

    def testSyncList(self):
        self._drive(self.client.doSyncList)
        self.failUnless((self.client.test == 'PASS'), 'Failed to synchronise list')
    testSyncList.skip = "Will do after list versions."

    def testAddContactFL(self):
        self._freshContactList()
        self._drive(self.client.doAddContactFL)
        self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to forward list')

    def testAddContactAL(self):
        self._freshContactList()
        self._drive(self.client.doAddContactAL)
        self.failUnless((self.client.test == 'PASS'), 'Failed to add contact to allow list')

    def testRemContactFL(self):
        existing = msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.FORWARD_LIST)
        self._freshContactList().addContact(existing)
        self._drive(self.client.doRemContactFL)
        self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from forward list')

    def testRemContactAL(self):
        existing = msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.ALLOW_LIST)
        self._freshContactList().addContact(existing)
        self._drive(self.client.doRemContactAL)
        self.failUnless((self.client.test == 'PASS'), 'Failed to remove contact from allow list')

    def testChangedScreenName(self):
        self._drive(self.client.doScreenNameChange)
        self.failUnless((self.client.test == 'PASS'), 'Failed to change screen name properly')

    def testChangePersonal1(self):
        self._drive(self.client.doPersonalChange, "Some personal message")
        self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')

    def testChangePersonal2(self):
        self._drive(self.client.doPersonalChange, "")
        self.failUnless((self.client.test == 'PASS'), 'Failed to change personal message properly')

    def testChangeAvatar(self):
        self._drive(self.client.doAvatarChange, "DATADATADATADATA")
        self.failUnless((self.client.test == 'PASS'), 'Failed to change avatar properly')

    def testRequestSwitchboard(self):
        self._drive(self.client.doRequestSwitchboard)
        self.failUnless((self.client.test == 'PASS'), 'Failed to request switchboard')
612
613
614 #################################
615 # Notification challenges tests #
616 #################################
617
class DummyChallengeNotificationServer(msn.MSNEventBase):
    """Server side of the challenge test.

    ``state`` tracks progress: 0 after the challenge is sent, 1 once a QRY
    arrives, 2 when that QRY is well formed, and "PASS" when the payload
    equals the expected response.
    """

    def doChallenge(self, challenge, response):
        """Send ``challenge`` in a CHL line and remember the expected ``response``."""
        self.response = response
        self.state = 0
        self.sendLine("CHL 0 " + challenge)

    def checkMessage(self, message):
        """Compare the received payload with the expected response, then disconnect."""
        matched = (message.message == self.response)
        if matched:
            self.state = "PASS"
        self.transport.loseConnection()
        return 0

    def handle_QRY(self, params):
        """Validate the QRY command line and switch to raw mode for its 32-byte payload."""
        self.state = 1
        wellFormed = (len(params) == 3 and params[1] == "PROD0090YUAUV{2B" and params[2] == "32")
        if not wellFormed:
            self.transport.loseConnection()
            return
        self.state = 2
        self.currentMessage = msn.MSNMessage(length=32, userHandle="QRY", screenName="QRY", specialMessage=True)
        self.setRawMode()
638
class DummyChallengeNotificationClient(msn.NotificationClient):
    """NotificationClient that answers a single CHL and then disconnects."""

    def connectionMade(self):
        # Calls the MSNEventBase implementation directly rather than
        # NotificationClient's own connectionMade.
        eventBase = msn.MSNEventBase
        eventBase.connectionMade(self)

    def handle_CHL(self, params):
        """Let the real client handle the challenge, then drop the connection."""
        realClient = msn.NotificationClient
        realClient.handle_CHL(self, params)
        self.transport.loseConnection()
646
647
class NotificationChallengeTests(unittest.TestCase):
    """ tests the responses to the CHLs the server sends """

    def setUp(self):
        self.client = DummyChallengeNotificationClient()
        self.server = DummyChallengeNotificationServer()

    def tearDown(self):
        pass

    def testChallenges(self):
        """Each (challenge, expected response) pair must leave the server in 'PASS'."""
        challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
                      ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
                      ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
        for challenge, response in challenges:
            # Hand the arguments to callLater instead of closing over the loop
            # variables, so the scheduled call is bound to this iteration's
            # values no matter when it fires.
            reactor.callLater(0, self.server.doChallenge, challenge, response)
            loopback.loopback(self.client, self.server)
            self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
667
668
669 ###########################
670 # Notification ping tests #
671 ###########################
672
class DummyPingNotificationServer(LineReceiver):
    """Line server that answers PNG with "QNG 50" while ``self.good`` is true.

    ``good`` is not set here; the test assigns it before the loopback runs.
    """

    def lineReceived(self, line):
        if not line.startswith("PNG"):
            return
        if self.good:
            self.sendLine("QNG 50")
677
class DummyPingNotificationClient(msn.NotificationClient):
    """NotificationClient that only runs the ping checker and counts sent lines.

    ``count`` and ``state`` are initialised by the test's setUp.
    """

    def connectionMade(self):
        self.pingCheckerStart()

    def sendLine(self, line):
        """Send ``line`` as usual and end the test after more than ten lines."""
        msn.NotificationClient.sendLine(self, line)
        self.count = self.count + 1
        if self.count > 10:
            # But not for real, just to end the test
            self.transport.loseConnection()

    def connectionLost(self, reason):
        # A loss after more than ten lines is the test ending itself, so it
        # does not change the recorded state.
        if self.count > 10:
            return
        self.state = 'DISCONNECTED'
691
class NotificationPingTests(unittest.TestCase):
    """ tests pinging in the NotificationClient class """

    def setUp(self):
        """Set msn.PINGSPEED to 0.1 for the test and build the client/server pair."""
        # Remember the current value so tearDown restores exactly what was
        # there instead of assuming a constant; 50.0 is the fallback that
        # tearDown previously hard-coded.
        self._savedPingSpeed = getattr(msn, 'PINGSPEED', 50.0)
        msn.PINGSPEED = 0.1
        self.client = DummyPingNotificationClient()
        self.server = DummyPingNotificationServer()
        self.client.state = 'CONNECTED'
        self.client.count = 0

    def tearDown(self):
        """Restore msn.PINGSPEED and log the client out."""
        msn.PINGSPEED = self._savedPingSpeed
        self.client.logOut()

    def testPingGood(self):
        """With a server that answers pings, the client should stay connected."""
        self.server.good = True
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')

    def testPingBad(self):
        """With a server that ignores pings, the client should end up disconnected."""
        self.server.good = False
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
715
716
717
718
719 ###########################
720 # Switchboard basic tests #
721 ###########################
722
class DummySwitchboardServer(msn.MSNEventBase):
    """Scripted switchboard server for the basic switchboard tests.

    Each handler drops the connection and returns when the parameter count
    is wrong, so a short command can no longer raise IndexError.
    """

    def handle_USR(self, params):
        """Confirm a login made with the expected handle and key."""
        if len(params) != 3:
            self.transport.loseConnection()
            return
        if params[1] == 'foo@bar.com' and params[2] == 'somekey':
            self.sendLine("USR %s OK %s %s" % (params[0], params[1], params[1]))

    def handle_ANS(self, params):
        """Confirm an answer made with the expected handle, key and session ID."""
        if len(params) != 4:
            self.transport.loseConnection()
            return
        if params[1] == 'foo@bar.com' and params[2] == 'somekey' and params[3] == 'someSID':
            self.sendLine("ANS %s OK" % params[0])

    def handle_CAL(self, params):
        """Report RINGING for the expected invitee; drop the connection otherwise."""
        if len(params) != 2:
            self.transport.loseConnection()
            return
        if params[1] == 'friend@hotmail.com':
            self.sendLine("CAL %s RINGING 1111122" % params[0])
        else:
            self.transport.loseConnection()

    def checkMessage(self, message):
        """ACK the expected message text; drop the connection for anything else."""
        if message.message == 'Hi how are you today?':
            self.sendLine("ACK " + message.userHandle) # Relies on TRID getting stored in userHandle trick
        else:
            self.transport.loseConnection()
        return 0
750
class DummySwitchboardClient(msn.SwitchboardClient):
    """SwitchboardClient whose callbacks record what happened in ``self.state``.

    The event callbacks only change ``state`` when called with the values the
    tests feed in. The ``do*`` methods start a request and record its result.
    """

    def loggedIn(self):
        self.state = 'LOGGEDIN'
        self.transport.loseConnection()

    def gotChattingUsers(self, users):
        if users == {'fred@hotmail.com': 'fred', 'jack@email.com': 'jack has a nickname!'}:
            self.state = 'GOTCHATTINGUSERS'

    def userJoined(self, userHandle, screenName):
        if userHandle == "friend@hotmail.com" and screenName == "friend nickname":
            self.state = 'USERJOINED'

    def userLeft(self, userHandle):
        if userHandle == "friend@hotmail.com":
            self.state = 'USERLEFT'

    def userTyping(self, message):
        if message.userHandle == 'foo@bar.com':
            self.state = 'USERTYPING'

    def gotMessage(self, message):
        if message.userHandle == 'friend@hotmail.com' and \
           message.screenName == 'Friend Nickname' and \
           message.message == 'Hello.':
            self.state = 'GOTMESSAGE'

    def doSendInvite(self):
        """Invite friend@hotmail.com and record success if the reply is 1111122."""
        def testcb(result):
            # Explicit unpacking replaces the tuple parameter removed by
            # PEP 3113; a result of the wrong length still raises.
            (sid,) = result
            if sid == 1111122:
                self.state = 'INVITESUCCESS'
            self.transport.loseConnection()
        d = self.inviteUser('friend@hotmail.com')
        d.addCallback(testcb)

    def doSendMessage(self):
        """Send a plain-text message requesting an ACK and record success when it fires."""
        def testcb(ignored):
            self.state = 'MESSAGESUCCESS'
            self.transport.loseConnection()
        m = msn.MSNMessage()
        m.setHeader("Content-Type", "text/plain; charset=UTF-8")
        m.message = 'Hi how are you today?'
        m.ack = msn.MSNMessage.MESSAGE_ACK
        d = self.sendMessage(m)
        d.addCallback(testcb)
796
797
class SwitchboardBasicTests(unittest.TestCase):
    """ Tests basic functionality of switchboard sessions """

    def setUp(self):
        """Build a client preloaded with the credentials the dummy server accepts."""
        client = DummySwitchboardClient()
        client.state = 'START'
        client.userHandle = 'foo@bar.com'
        client.key = 'somekey'
        client.sessionID = 'someSID'
        self.client = client
        self.server = DummySwitchboardServer()

    def tearDown(self):
        pass

    def _testSB(self, reply):
        """Log in over the loopback with ``client.reply`` set to ``reply``."""
        self.client.reply = reply
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'LOGGEDIN'), 'Failed to login with reply='+str(reply))

    def _runWithoutLogin(self, action):
        """Disable the client's connectionMade, schedule ``action`` and run the loopback."""
        self.client.connectionMade = lambda: None
        reactor.callLater(0, action)
        loopback.loopback(self.client, self.server)

    def testReply(self):
        self._testSB(True)

    def testAsync(self):
        self._testSB(False)

    def testChattingUsers(self):
        rosterLines = ("IRO 1 1 2 fred@hotmail.com fred",
                       "IRO 1 2 2 jack@email.com jack%20has%20a%20nickname%21")
        for rosterLine in rosterLines:
            self.client.lineReceived(rosterLine)
        self.failUnless((self.client.state == 'GOTCHATTINGUSERS'), 'Failed to get chatting users')

    def testUserJoined(self):
        self.client.lineReceived("JOI friend@hotmail.com friend%20nickname")
        self.failUnless((self.client.state == 'USERJOINED'), 'Failed to notice user joining')

    def testUserLeft(self):
        self.client.lineReceived("BYE friend@hotmail.com")
        self.failUnless((self.client.state == 'USERLEFT'), 'Failed to notice user leaving')

    def testTypingCheck(self):
        packet = ('MSG foo@bar.com Foo 80\r\n'
                  'MIME-Version: 1.0\r\n'
                  'Content-Type: text/x-msmsgscontrol\r\n'
                  'TypingUser: foo@bar\r\n'
                  '\r\n\r\n')
        self.client.dataReceived(packet)
        self.failUnless((self.client.state == 'USERTYPING'), 'Failed to detect typing notification')

    def testGotMessage(self):
        packet = ('MSG friend@hotmail.com Friend%20Nickname 68\r\n'
                  'MIME-Version: 1.0\r\n'
                  'Content-Type: text/plain; charset=UTF-8\r\n'
                  '\r\nHello.')
        self.client.dataReceived(packet)
        self.failUnless((self.client.state == 'GOTMESSAGE'), 'Failed to detect message')

    def testInviteUser(self):
        self._runWithoutLogin(self.client.doSendInvite)
        self.failUnless((self.client.state == 'INVITESUCCESS'), 'Failed to invite user')

    def testSendMessage(self):
        self._runWithoutLogin(self.client.doSendMessage)
        self.failUnless((self.client.state == 'MESSAGESUCCESS'), 'Failed to send message')
865
866
867
868
869 ################
870 # MSNFTP tests #
871 ################
872
class FileTransferTestCase(unittest.TestCase):
    """ test FileSend against FileReceive """

    def setUp(self):
        """Prepare a 7000-character source buffer, rewound, and an empty sink."""
        source = StringIOWithoutClosing()
        source.write('a' * 7000)
        source.seek(0)
        self.input = source
        self.output = StringIOWithoutClosing()

    def tearDown(self):
        self.output = None
        self.input = None

    def testFileTransfer(self):
        """Send the source buffer through the loopback and compare it with the sink."""
        cookie = 1234
        sender = msnft.MSNFTP_FileSend(self.input)
        sender.auth = cookie
        sender.fileSize = 7000
        receiver = msnft.MSNFTP_FileReceive(cookie, "foo@bar.com", self.output)
        receiver.fileSize = 7000
        loopback.loopback(sender, receiver)
        bothCompleted = (receiver.completed and sender.completed)
        self.failUnless(bothCompleted, "send failed to complete")
        sameContent = (self.input.getvalue() == self.output.getvalue())
        self.failUnless(sameContent, "saved file does not match original")
896
897