]> code.delx.au - pymsnt/blob - src/tlib/msn/test_msn.py
NotificationClient now has tests for everything.
[pymsnt] / src / tlib / msn / test_msn.py
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2 # Copyright (c) 2005 James Bunton
3 # See LICENSE for details.
4
5 """
6 Test cases for msn.
7 """
8
9 # Twisted imports
10 from twisted.protocols import loopback
11 from twisted.protocols.basic import LineReceiver
12 from twisted.internet.defer import Deferred
13 from twisted.internet import reactor
14 from twisted.trial import unittest
15
16 # System imports
17 import StringIO, sys, urllib
18
19 import msn
20
21
def printError(f):
    """Errback helper: write the failure *f* to stdout so it shows up in the test output."""
    sys.stdout.write("%s\n" % (f,))
24
25 class StringIOWithoutClosing(StringIO.StringIO):
26 disconnecting = 0
27 def close(self): pass
28 def loseConnection(self): pass
29
30
31 ##################
32 # Passport tests #
33 ##################
34
class PassportTests(unittest.TestCase):
    """Feed canned HTTP responses to the Passport protocols and check what the Deferred fires with."""

    def setUp(self):
        # Whatever the protocol under test hands to the Deferred is collected
        # in self.result; failures are printed instead of being lost.
        self.result = []
        self.deferred = Deferred()
        self.deferred.addCallback(self.result.append)
        self.deferred.addErrback(printError)

    def testNexus(self):
        """A PassportURLs header with a DALogin entry must yield the https login URL."""
        nexus = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
        headers = {
            'Content-Length' : '0',
            'Content-Type' : 'text/html',
            'PassportURLs' : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
        }
        nexus.makeConnection(StringIOWithoutClosing())
        nexus.dataReceived('HTTP/1.0 200 OK\r\n')
        for name, value in headers.items():
            nexus.dataReceived('%s: %s\r\n' % (name, value))
        nexus.dataReceived('\r\n')
        loginURL = self.result[0]
        self.failUnless(loginURL == "https://login.myserver.com/")

    def _doLoginTest(self, response, headers):
        """Push a status line, the given headers and a blank line into a fresh PassportLogin."""
        login = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
        login.makeConnection(StringIOWithoutClosing())
        login.dataReceived(response)
        for name, value in headers.items():
            login.dataReceived('%s: %s\r\n' % (name, value))
        login.dataReceived('\r\n')

    def testPassportLoginSuccess(self):
        """A 200 with da-status=success must report LOGIN_SUCCESS and the from-PP key."""
        headers = {
            'Content-Length' : '0',
            'Content-Type' : 'text/html',
            'Authentication-Info' : ("Passport1.4 da-status=success,tname=MSPAuth,"
                                     "tname=MSPProf,tname=MSPSec,from-PP='somekey',"
                                     "ru=http://messenger.msn.com")
        }
        self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
        outcome = self.result[0]
        self.failUnless(outcome == (msn.LOGIN_SUCCESS, 'somekey'))

    def testPassportLoginFailure(self):
        """A 401 with da-status=failed must report LOGIN_FAILURE and the unquoted cbtxt."""
        headers = {
            'Content-Type' : 'text/html',
            'WWW-Authenticate' : ('Passport1.4 da-status=failed,'
                                  'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,'
                                  'cbtxt=the%20error%20message')
        }
        self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
        outcome = self.result[0]
        self.failUnless(outcome == (msn.LOGIN_FAILURE, 'the error message'))

    def testPassportLoginRedirect(self):
        """A 302 with da-status=redir must report LOGIN_REDIRECT, the Location and the 'a' argument."""
        headers = {
            'Content-Type' : 'text/html',
            'Authentication-Info' : 'Passport1.4 da-status=redir',
            'Location' : 'https://newlogin.host.com/'
        }
        self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
        outcome = self.result[0]
        self.failUnless(outcome == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
93
94
95
96 ######################
97 # Notification tests #
98 ######################
99
class DummyNotificationClient(msn.NotificationClient):
    """NotificationClient whose callbacks record, in ``self.state``, which event fired.

    Most callbacks only change ``state`` when they were handed exactly the
    arguments the corresponding test sends, so a wrong parse leaves the
    previous state in place and the test's assertion fails.
    """

    def loggedIn(self, userHandle, verified):
        if userHandle != 'foo@bar.com' or not verified:
            return
        self.state = 'LOGIN'

    def gotProfile(self, message):
        self.state = 'PROFILE'

    def gotContactStatus(self, code, userHandle, screenName):
        wanted = (msn.STATUS_AWAY, "foo@bar.com", "Test Screen Name")
        if (code, userHandle, screenName) != wanted:
            return
        # The initial status only counts when the stored contact also carries
        # the MSNC1 capability bit and an msnobj.
        contact = self.factory.contacts.getContact(userHandle)
        if contact.caps & msn.MSNContact.MSNC1 and contact.msnobj:
            self.state = 'INITSTATUS'

    def contactStatusChanged(self, code, userHandle, screenName):
        wanted = (msn.STATUS_LUNCH, "foo@bar.com", "Test Name")
        if (code, userHandle, screenName) == wanted:
            self.state = 'NEWSTATUS'

    def contactAvatarChanged(self, userHandle, hash):
        gotAvatar = userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ="
        if gotAvatar:
            self.state = 'NEWAVATAR'
        elif self.state == 'NEWAVATAR' and hash == "":
            # An empty hash directly after a new avatar means it was removed.
            self.state = 'AVATARGONE'

    def contactPersonalChanged(self, userHandle, personal):
        if userHandle != 'foo@bar.com':
            return
        if personal == 'My Personal Message':
            self.state = 'GOTPERSONAL'
        elif personal == '':
            self.state = 'PERSONALGONE'

    def contactOffline(self, userHandle):
        if userHandle == "foo@bar.com":
            self.state = 'OFFLINE'

    def statusChanged(self, code):
        if code == msn.STATUS_HIDDEN:
            self.state = 'MYSTATUS'

    def listSynchronized(self, *args):
        self.state = 'GOTLIST'

    def gotPhoneNumber(self, userHandle, phoneType, number):
        self.state = 'GOTPHONE'

    def userRemovedMe(self, userHandle):
        # Only succeeds once the contact is no longer in the contact list.
        contact = self.factory.contacts.getContact(userHandle)
        if not contact:
            self.state = 'USERREMOVEDME'

    def userAddedMe(self, userGuid, userHandle, screenName):
        contact = self.factory.contacts.getContact(userHandle)
        # NOTE(review): "|" makes this term truthy whenever PENDING_LIST is
        # non-zero, so it does not actually check list membership; "&" was
        # presumably intended -- confirm which list the client puts the
        # contact on before changing it.
        if contact and (contact.lists | msn.PENDING_LIST) and (screenName == 'Screen Name'):
            self.state = 'USERADDEDME'

    def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
        received = (sessionID, host, port, key, userHandle, screenName)
        wanted = (1234, '192.168.1.1', 1863, '123.456', 'foo@foo.com', 'Screen Name')
        if received == wanted:
            self.state = 'SBINVITED'

    def gotMSNAlert(self, body, action, subscr):
        self.state = 'NOTIFICATION'

    def gotInitialEmailNotification(self, inboxunread, foldersunread):
        # INITEMAIL1 marks the exact counts the positive test sends; anything
        # else that reaches this callback is recorded as INITEMAIL2.
        expectedCounts = inboxunread == 1 and foldersunread == 0
        self.state = expectedCounts and 'INITEMAIL1' or 'INITEMAIL2'

    def gotRealtimeEmailNotification(self, mailfrom, fromaddr, subject):
        wanted = ('Some Person', 'example@passport.com', 'newsubject')
        if (mailfrom, fromaddr, subject) == wanted:
            self.state = 'REALTIMEEMAIL'
172
class NotificationTests(unittest.TestCase):
    """Drive DummyNotificationClient with raw server data and check which callback fired."""

    def setUp(self):
        client = DummyNotificationClient()
        client.factory = msn.NotificationFactory()
        client.state = 'START'
        self.client = client

    def tearDown(self):
        self.client = None

    def _feedLines(self, lines):
        # Deliver already-split protocol lines to the client, in order.
        receive = self.client.lineReceived
        for line in lines:
            receive(line)

    def _initialEmailPacket(self, inboxUnread):
        # Build a complete MSG packet announcing *inboxUnread* unread inbox
        # mails (and no unread folder mails); the length in the MSG header is
        # computed from the payload.
        body = 'MIME-Version: 1.0\r\nContent-Type: text/x-msmsgsinitialemailnotification; charset=UTF-8\r\n'
        body += '\r\nInbox-Unread: %d\r\nFolders-Unread: 0\r\nInbox-URL: /cgi-bin/HoTMaiL\r\n' % inboxUnread
        body += 'Folders-URL: /cgi-bin/folders\r\nPost-URL: http://www.hotmail.com\r\n\r\n'
        return 'MSG Hotmail Hotmail %s\r\n' % len(body) + body

    def testLogin(self):
        self.client.lineReceived('USR 1 OK foo@bar.com 1')
        self.failUnless(self.client.state == 'LOGIN', 'Failed to detect successful login')

    def testProfile(self):
        packet = ''.join([
            'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n',
            'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n',
            'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n',
            'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n',
        ])
        self.client.dataReceived(packet)
        self.failUnless(self.client.state == 'PROFILE', 'Failed to detect initial profile')

    def testInitialEmailNotification(self):
        self.client.dataReceived(self._initialEmailPacket(1))
        self.failUnless(self.client.state == 'INITEMAIL1', 'Failed to detect initial email notification')

    def testNoInitialEmailNotification(self):
        # With nothing unread the callback is expected not to run at all, so
        # the state must not become INITEMAIL2.
        self.client.dataReceived(self._initialEmailPacket(0))
        self.failUnless(self.client.state != 'INITEMAIL2', 'Detected initial email notification when I should not have')

    def testRealtimeEmailNotification(self):
        packet = ''.join([
            'MSG Hotmail Hotmail 356\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsemailnotification; charset=UTF-8\r\n',
            '\r\nFrom: Some Person\r\nMessage-URL: /cgi-bin/getmsg?msg=MSG1050451140.21&start=2310&len=2059&curmbox=ACTIVE\r\n',
            'Post-URL: https://loginnet.passport.com/ppsecure/md5auth.srf?lc=1038\r\n',
            'Subject: =?"us-ascii"?Q?newsubject?=\r\nDest-Folder: ACTIVE\r\nFrom-Addr: example@passport.com\r\nid: 2\r\n',
        ])
        self.client.dataReceived(packet)
        self.failUnless(self.client.state == 'REALTIMEEMAIL', 'Failed to detect realtime email notification')

    def testMSNAlert(self):
        payload = ''.join([
            '<NOTIFICATION ver="2" id="1342902633" siteid="199999999" siteurl="http://alerts.msn.com">\r\n',
            '<TO pid="0x0006BFFD:0x8582C0FB" name="example@passport.com"/>\r\n',
            '<MSG pri="1" id="1342902633">\r\n',
            '<SUBSCR url="http://g.msn.com/3ALMSNTRACKING/199999999ToastChange?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n',
            '<ACTION url="http://g.msn.com/3ALMSNTRACKING/199999999ToastAction?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n',
            '<BODY lang="3076" icon="">\r\n',
            '<TEXT>utf8-encoded text</TEXT></BODY></MSG>\r\n',
            '</NOTIFICATION>\r\n',
        ])
        packet = 'NOT %s\r\n' % str(len(payload)) + payload
        # Whee, lots of fun to test that lineReceived & dataReceived work well
        # with input coming in in (fairly) arbitrary chunks: deliver the
        # packet one CRLF-terminated line at a time.
        chunks = [piece + '\r\n' for piece in packet.split('\r\n')[:-1]]
        for chunk in chunks:
            self.client.dataReceived(chunk)
        self.failUnless(self.client.state == 'NOTIFICATION', 'Failed to detect MSN Alert message')

    def testListSync(self):
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        self._feedLines([
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 3" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LSG Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
            "LSG More%20Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ])
        contacts = self.client.factory.contacts
        contact = contacts.getContact('userHandle@email.com')
        #self.failUnless(contacts.version == 100, "Invalid contact list version")
        self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
        expectedGroups = {
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy': 'Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz': 'Other Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya': 'More Other Friends',
        }
        self.failUnless(contacts.groups == expectedGroups, "Did not get proper group list")
        expectedMembership = ['yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy',
                              'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz']
        self.failUnless(contact.groups == expectedMembership and contact.lists == 13,
                        "Invalid contact list/group info")
        self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
        self.client.logOut()

    def testStatus(self):
        # Set up the contact list
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        self._feedLines([
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LST N=foo@bar.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ])
        # Now test!  Each case is (line from server, state expected afterwards,
        # failure message); the cases build on one another, so order matters.
        msnobj = urllib.quote('<msnobj Creator="buddy1@hotmail.com" Size="24539" Type="3" Location="TFR2C.tmp" Friendly="AAA=" SHA1D="trC8SlFx2sWQxZMIBAWSEnXc8oQ=" SHA1C="U32o6bosZzluJq82eAtMpx5dIEI="/>')
        cases = [
            ('ILN 1 AWY foo@bar.com Test%20Screen%20Name 268435456 ' + msnobj, 'INITSTATUS', 'Failed to detect initial status report'),
            ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
            ('NLN AWY foo@bar.com Test%20Name 0 ' + msnobj, 'NEWAVATAR', 'Failed to detect contact avatar change'),
            ('NLN AWY foo@bar.com Test%20Name 0', 'AVATARGONE', 'Failed to detect contact avatar disappearing'),
            ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
            ('CHG 1 HDN 0 ' + msnobj, 'MYSTATUS', 'Failed to detect my status changing'),
        ]
        for line, expectedState, failureMessage in cases:
            self.client.lineReceived(line)
            self.failUnless(self.client.state == expectedState, failureMessage)

        # Test UBX
        self.client.dataReceived('UBX foo@bar.com 72\r\n<Data><PSM>My Personal Message</PSM><CurrentMedia></CurrentMedia></Data>')
        self.failUnless(self.client.state == 'GOTPERSONAL', 'Failed to detect new personal message')
        self.client.dataReceived('UBX foo@bar.com 0\r\n')
        self.failUnless(self.client.state == 'PERSONALGONE', 'Failed to detect personal message disappearing')
        self.client.logOut()

    def testAsyncPhoneChange(self):
        contacts = msn.MSNContactList()
        self.client.factory.contacts = contacts
        contacts.addContact(msn.MSNContact(userHandle='userHandle@email.com'))
        self.client.makeConnection(StringIOWithoutClosing())
        self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
        updated = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
        self.failUnless(updated.homePhone == '123 456', "Did not update the contact's phone number")
        self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")

    def testLateBPR(self):
        """
        This test makes sure that if a BPR response that was meant
        to be part of a SYN response (but came after the last LST)
        is received, the correct contact is updated and all is well
        """
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)
        self._feedLines([
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "BPR PHH 123%20456"
        ])
        contact = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
        self.client.logOut()

    def testUserRemovedMe(self):
        contacts = msn.MSNContactList()
        self.client.factory.contacts = contacts
        remover = msn.MSNContact(userHandle='foo@foo.com')
        remover.addToList(msn.REVERSE_LIST)
        contacts.addContact(remover)
        self.client.lineReceived("REM 0 RL foo@foo.com")
        self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")

    def testUserAddedMe(self):
        self.client.factory.contacts = msn.MSNContactList()
        self.client.lineReceived("ADC 0 RL N=foo@foo.com F=Screen%20Name")
        self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")

    def testAsyncSwitchboardInvitation(self):
        self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
        self.failUnless(self.client.state == 'SBINVITED', 'Failed to detect switchboard invitation')
341
342
343 #######################################
344 # Notification with fake server tests #
345 #######################################
346
class FakeNotificationServer(msn.MSNEventBase):
    """Minimal stand-in for an MSN notification server.

    Each ``handle_XXX`` method is given the parameters of an incoming ``XXX``
    command as a list (presumably dispatched by msn.MSNEventBase -- confirm
    there) and either echoes back the reply the client-side tests expect or
    drops the connection when the request looks malformed.  After dropping
    the connection a handler returns immediately instead of continuing to
    index into parameters that may not exist.
    """

    def handle_CHG(self, params):
        """Echo a status change, padding a missing fourth parameter with ''."""
        if len(params) < 4:
            params.append('')
        self.sendLine("CHG %s %s %s %s" % (params[0], params[1], params[2], params[3]))

    def handle_BLP(self, params):
        """Acknowledge a privacy-mode change with a fixed trailing value of 100."""
        self.sendLine("BLP %s %s 100" % (params[0], params[1]))

    def handle_ADC(self, params):
        """Answer an add-contact request.

        Forward-list requests are parsed into N= (handle), F= (screen name)
        and C= (GUID) fields, with any other token taken as the group list;
        requests for other lists are echoed back unchanged.

        Raises NotImplementedError for a forward-list request that carries
        only a GUID, which this fake server does not support.
        """
        trid = params[0]
        listType = msn.listCodeToID[params[1].lower()]
        if listType == msn.FORWARD_LIST:
            userHandle = ""
            screenName = ""
            userGuid = ""
            groups = ""
            for p in params[2:]:
                # Match the whole "X=" prefix: a bare group GUID may itself
                # start with 'C' or 'F', and an empty token has no p[0].
                if p.startswith('N='):
                    userHandle = p[2:]
                elif p.startswith('F='):
                    screenName = p[2:]
                elif p.startswith('C='):
                    userGuid = p[2:]
                else:
                    groups = p
            if userHandle and userGuid:
                # A request may identify the contact by handle or GUID, not both.
                self.transport.loseConnection()
                return
            if userHandle:
                if not screenName:
                    self.transport.loseConnection()
                else:
                    self.sendLine("ADC %s FL N=%s F=%s C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx %s" % (trid, userHandle, screenName, groups))
                return
            if userGuid:
                # A real exception class: string exceptions are rejected with
                # a TypeError by Python 2.6 and later.
                raise NotImplementedError("ADC by GUID is not implemented in the fake server")
        else:
            if len(params) != 3:
                self.transport.loseConnection()
                return
            # NOTE(review): as written this only rejects a bare "user@host"
            # without the N= prefix; "not (startswith and one '@')" may have
            # been intended -- confirm before tightening.
            if (not params[2].startswith("N=")) and params[2].count('@') == 1:
                self.transport.loseConnection()
                return
            self.sendLine("ADC %s %s %s" % (params[0], params[1], params[2]))

    def handle_REM(self, params):
        """Answer a remove-contact request, echoing it back when it is well formed.

        Forward-list removals must not name the contact by an address
        containing '@'; removals from any other list must contain exactly one.
        """
        if len(params) != 3:
            self.transport.loseConnection()
            return
        try:
            int(params[0])  # the transaction ID must be numeric
            listType = msn.listCodeToID[params[1].lower()]
        except (ValueError, KeyError):
            # Bad transaction ID or unknown list code: hang up and stop here,
            # otherwise listType would be unbound below.
            self.transport.loseConnection()
            return
        target = params[2]
        if listType == msn.FORWARD_LIST and target.count('@') > 0:
            self.transport.loseConnection()
        elif listType != msn.FORWARD_LIST and target.count('@') != 1:
            self.transport.loseConnection()
        else:
            self.sendLine("REM %s %s %s" % (params[0], params[1], params[2]))

    def handle_PRP(self, params):
        """Echo a friendly-name (MFN) property change; anything else drops the connection."""
        if len(params) != 3:
            self.transport.loseConnection()
            return
        if params[1] == "MFN":
            self.sendLine("PRP %s MFN %s" % (params[0], params[2]))
        else:
            # Only friendly names are implemented
            self.transport.loseConnection()

    def handle_UUX(self, params):
        """Start reading a UUX payload, or acknowledge at once when its length is 0."""
        if len(params) != 2:
            self.transport.loseConnection()
            return
        length = int(params[1])
        if length > 0:
            # Switch to raw mode to collect the payload; the acknowledgement
            # is sent from checkMessage.
            self.currentMessage = msn.MSNMessage(length=length, userHandle=params[0], screenName="UUX", specialMessage=True)
            self.setRawMode()
        else:
            self.sendLine("UUX %s 0" % params[0])

    def checkMessage(self, message):
        """Acknowledge a UUX payload and return 0; return 1 for any other message."""
        if message.specialMessage:
            if message.screenName == "UUX":
                self.sendLine("UUX %s 0" % message.userHandle)
                return 0
        return 1

    def handle_XFR(self, params):
        """Answer a switchboard (SB) transfer request with a fixed host, port and key."""
        if len(params) != 2 or params[1] != "SB":
            self.transport.loseConnection()
            return
        self.sendLine("XFR %s SB 129.129.129.129:1234 CKI SomeSecret" % params[0])
442
443
444
class FakeNotificationClient(msn.NotificationClient):
    """NotificationClient with one ``do*`` driver per request type.

    Every driver issues a single request, and in the callback attached to the
    object that request returns it sets ``self.test`` to 'PASS' when the
    reply carries the expected values.  The connection is dropped afterwards
    either way so the loopback run ends.
    """

    def doStatusChange(self):
        def verify(result):
            (status,) = result
            if status == msn.STATUS_AWAY:
                self.test = 'PASS'
            self.transport.loseConnection()
        self.changeStatus(msn.STATUS_AWAY).addCallback(verify)

    def doPrivacyMode(self):
        def verify(result):
            (priv,) = result
            if priv.upper() == 'AL':
                self.test = 'PASS'
            self.transport.loseConnection()
        self.setPrivacyMode(True).addCallback(verify)

    def doAddContactFL(self):
        def verify(result):
            listType, userGuid, userHandle, screenName = result
            onForwardList = listType & msn.FORWARD_LIST
            if onForwardList and \
               userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \
               userHandle == "foo@bar.com" and \
               screenName == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        self.addContact(msn.FORWARD_LIST, "foo@bar.com").addCallback(verify)

    def doAddContactAL(self):
        def verify(result):
            listType, userGuid, userHandle, screenName = result
            # Allow-list replies carry only the handle: no GUID, no screen name.
            onAllowList = listType & msn.ALLOW_LIST
            if onAllowList and \
               userHandle == "foo@bar.com" and \
               not userGuid and not screenName:
                self.test = 'PASS'
            self.transport.loseConnection()
        self.addContact(msn.ALLOW_LIST, "foo@bar.com").addCallback(verify)

    def doRemContactFL(self):
        def verify(result):
            listType, userHandle, groupID = result
            if listType & msn.FORWARD_LIST and userHandle == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        self.remContact(msn.FORWARD_LIST, "foo@bar.com").addCallback(verify)

    def doRemContactAL(self):
        def verify(result):
            listType, userHandle, groupID = result
            if listType & msn.ALLOW_LIST and userHandle == "foo@bar.com":
                self.test = 'PASS'
            self.transport.loseConnection()
        self.remContact(msn.ALLOW_LIST, "foo@bar.com").addCallback(verify)

    def doScreenNameChange(self):
        def verify(result):
            (screenName,) = result
            if screenName == "Some new name":
                self.test = 'PASS'
            self.transport.loseConnection()
        self.changeScreenName("Some new name").addCallback(verify)

    def doPersonalChange(self, personal):
        def verify(result):
            (checkPersonal,) = result
            if checkPersonal == personal:
                self.test = 'PASS'
            self.transport.loseConnection()
        self.changePersonalMessage(personal).addCallback(verify)

    def doAvatarChange(self, data):
        def verify(ignored):
            # Any successful reply counts; its content is not inspected.
            self.test = 'PASS'
            self.transport.loseConnection()
        self.changeAvatar(data, True).addCallback(verify)

    def doRequestSwitchboard(self):
        def verify(result):
            host, port, key = result
            if (host, port, key) == ("129.129.129.129", 1234, "SomeSecret"):
                self.test = 'PASS'
            self.transport.loseConnection()
        self.requestSwitchboardServer().addCallback(verify)
531
class FakeServerNotificationTests(unittest.TestCase):
    """Run FakeNotificationClient against FakeNotificationServer over a loopback connection."""

    def setUp(self):
        client = FakeNotificationClient()
        client.factory = msn.NotificationFactory()
        client.test = 'FAIL'
        self.client = client
        self.server = FakeNotificationServer()

    def tearDown(self):
        pass

    def _exercise(self, driver, *args):
        # Schedule the client-side driver for the next reactor iteration, then
        # connect client and server back to back.
        reactor.callLater(0, driver, *args)
        loopback.loopback(self.client, self.server)

    def _passed(self):
        # True once the driver's callback has accepted the server's reply.
        return self.client.test == 'PASS'

    def _freshContactList(self):
        # Give the client an empty contact list and return it.
        contacts = msn.MSNContactList()
        self.client.factory.contacts = contacts
        return contacts

    def testChangeStatus(self):
        self._exercise(self.client.doStatusChange)
        self.failUnless(self._passed(), 'Failed to change status properly')

    def testSetPrivacyMode(self):
        self._freshContactList()
        self._exercise(self.client.doPrivacyMode)
        self.failUnless(self._passed(), 'Failed to change privacy mode')

    def testSyncList(self):
        # NOTE(review): no doSyncList driver is visible on
        # FakeNotificationClient in this file; the test is skipped below.
        self._exercise(self.client.doSyncList)
        self.failUnless(self._passed(), 'Failed to synchronise list')
    testSyncList.skip = "Will do after list versions."

    def testAddContactFL(self):
        self._freshContactList()
        self._exercise(self.client.doAddContactFL)
        self.failUnless(self._passed(), 'Failed to add contact to forward list')

    def testAddContactAL(self):
        self._freshContactList()
        self._exercise(self.client.doAddContactAL)
        self.failUnless(self._passed(), 'Failed to add contact to allow list')

    def testRemContactFL(self):
        contacts = self._freshContactList()
        contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.FORWARD_LIST))
        self._exercise(self.client.doRemContactFL)
        self.failUnless(self._passed(), 'Failed to remove contact from forward list')

    def testRemContactAL(self):
        contacts = self._freshContactList()
        contacts.addContact(msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.ALLOW_LIST))
        self._exercise(self.client.doRemContactAL)
        self.failUnless(self._passed(), 'Failed to remove contact from allow list')

    def testChangedScreenName(self):
        self._exercise(self.client.doScreenNameChange)
        self.failUnless(self._passed(), 'Failed to change screen name properly')

    def testChangePersonal1(self):
        self._exercise(self.client.doPersonalChange, "Some personal message")
        self.failUnless(self._passed(), 'Failed to change personal message properly')

    def testChangePersonal2(self):
        self._exercise(self.client.doPersonalChange, "")
        self.failUnless(self._passed(), 'Failed to change personal message properly')

    def testChangeAvatar(self):
        self._exercise(self.client.doAvatarChange, "DATADATADATADATA")
        self.failUnless(self._passed(), 'Failed to change avatar properly')

    def testRequestSwitchboard(self):
        self._exercise(self.client.doRequestSwitchboard)
        self.failUnless(self._passed(), 'Failed to request switchboard')
611
612
613 #################################
614 # Notification Challenges tests #
615 #################################
616
class DummyChallengeNotificationServer(msn.MSNEventBase):
    """Server side of the challenge test: sends a CHL and checks the QRY answer.

    ``state`` tracks progress: 0 after the challenge was sent, 1 once a QRY
    arrived, 2 once its parameters were accepted, and "PASS" when the payload
    matched the expected response.
    """

    def doChallenge(self, challenge, response):
        """Send *challenge* and remember *response* as the payload the client must return."""
        self.response = response
        self.state = 0
        self.sendLine("CHL 0 " + challenge)

    def checkMessage(self, message):
        """Compare the QRY payload with the expected response, then hang up."""
        matched = message.message == self.response
        if matched:
            self.state = "PASS"
        self.transport.loseConnection()
        return 0

    def handle_QRY(self, params):
        """Accept a QRY only with the expected product ID and a 32-byte payload length."""
        self.state = 1
        wellFormed = (len(params) == 3
                      and params[1] == "PROD0090YUAUV{2B"
                      and params[2] == "32")
        if not wellFormed:
            self.transport.loseConnection()
            return
        self.state = 2
        # Read the 32-byte payload in raw mode; checkMessage judges it.
        self.currentMessage = msn.MSNMessage(length=32, userHandle="QRY", screenName="QRY", specialMessage=True)
        self.setRawMode()
637
class DummyChallengeNotificationClient(msn.NotificationClient):
    """NotificationClient reduced to answering a single CHL challenge."""

    def connectionMade(self):
        # Call the MSNEventBase implementation directly, bypassing whatever
        # NotificationClient.connectionMade itself would do.
        msn.MSNEventBase.connectionMade(self)

    def handle_CHL(self, params):
        # Let the real client produce its answer first, then hang up.
        parent = msn.NotificationClient
        parent.handle_CHL(self, params)
        self.transport.loseConnection()
645
646
class NotificationChallengeTests(unittest.TestCase):
    """Check the client's responses to the CHL challenges the server sends."""

    def setUp(self):
        self.client = DummyChallengeNotificationClient()
        self.server = DummyChallengeNotificationServer()

    def tearDown(self):
        pass

    def testChallenges(self):
        """Each known challenge must be answered with its known response."""
        challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
                      ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
                      ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
        for challenge, response in challenges:
            # Pass the pair to callLater as arguments rather than closing over
            # the loop variables: a lambda would read them only when it runs,
            # and so could see a later iteration's values.
            reactor.callLater(0, self.server.doChallenge, challenge, response)
            loopback.loopback(self.client, self.server)
            self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
666
667
668 ###########################
669 # Notification ping tests #
670 ###########################
671
class DummyPingNotificationServer(LineReceiver):
    """Line-based server that answers PNG with "QNG 50" while ``self.good`` is true.

    ``good`` is not set here; the tests assign it before connecting.
    """

    def lineReceived(self, line):
        isPing = line.startswith("PNG")
        if isPing and self.good:
            self.sendLine("QNG 50")
676
class DummyPingNotificationClient(msn.NotificationClient):
    """NotificationClient that only runs the ping checker and counts the lines it sends.

    ``count`` and ``state`` are initialised by the test case, not here.
    """

    def connectionMade(self):
        self.pingCheckerStart()

    def sendLine(self, line):
        msn.NotificationClient.sendLine(self, line)
        self.count = self.count + 1
        if self.count <= 10:
            return
        # More than ten lines went out without trouble.
        self.transport.loseConnection() # But not for real, just to end the test

    def connectionLost(self, reason):
        # Losing the connection before the eleventh line means this was not
        # the hang-up issued by sendLine above.
        if not self.count > 10:
            self.state = 'DISCONNECTED'
690
class NotificationPingTests(unittest.TestCase):
    """Tests pinging in the NotificationClient class."""

    def setUp(self):
        # Remember the module-wide ping interval so tearDown can put back
        # exactly what was there, then shorten it to keep the test quick.
        self._savedPingSpeed = msn.PINGSPEED
        msn.PINGSPEED = 0.1
        self.client = DummyPingNotificationClient()
        self.server = DummyPingNotificationServer()
        self.client.state = 'CONNECTED'
        self.client.count = 0

    def tearDown(self):
        # Restore the saved value rather than a hard-coded constant, so other
        # tests see the module as it was before this one ran.
        msn.PINGSPEED = self._savedPingSpeed
        self.client.logOut()

    def testPingGood(self):
        """With a server that answers pings the client must stay connected."""
        self.server.good = True
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')

    def testPingBad(self):
        """With a server that ignores pings the client must end up disconnected."""
        self.server.good = False
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
714
715
716
717
718 #############################
719 # Switchboard message tests #
720 #############################
721
class DummySwitchboardClient(msn.SwitchboardClient):
    """SwitchboardClient whose callbacks record in ``self.state`` which event fired."""

    def userTyping(self, message):
        self.state = 'TYPING'

    def gotSendRequest(self, fileName, fileSize, cookie, message):
        # Only the exact invitation sent by the test counts.
        if (fileName, fileSize, cookie) == ('foobar.ext', 31337, 1234):
            self.state = 'INVITATION'
728
729
class MessageHandlingTests(unittest.TestCase):
    """Testing various message handling methods from SwitchboardClient."""
    skip = "Not ready"

    def setUp(self):
        client = DummySwitchboardClient()
        client.state = 'START'
        self.client = client

    def tearDown(self):
        self.client = None

    def _inviteMessage(self, bodyLines):
        # Build a text/x-msmsgsinvite message whose body is the given lines,
        # appended in order exactly as written.
        invite = msn.MSNMessage()
        invite.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
        for bodyLine in bodyLines:
            invite.message += bodyLine
        return invite

    def testTypingCheck(self):
        control = msn.MSNMessage()
        control.setHeader('Content-Type', 'text/x-msmsgscontrol')
        control.setHeader('TypingUser', 'foo@bar')
        self.client.checkMessage(control)
        self.failUnless(self.client.state == 'TYPING', 'Failed to detect typing notification')

    def testFileInvitation(self, lazyClient=False):
        bodyLines = ['Application-Name: File Transfer\r\n']
        if not lazyClient:
            # A "lazy" client leaves out the application GUID.
            bodyLines.append('Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n')
        bodyLines.extend([
            'Invitation-Command: Invite\r\n',
            'Invitation-Cookie: 1234\r\n',
            'Application-File: foobar.ext\r\n',
            'Application-FileSize: 31337\r\n\r\n',
        ])
        self.client.checkMessage(self._inviteMessage(bodyLines))
        self.failUnless(self.client.state == 'INVITATION', 'Failed to detect file transfer invitation')

    def testFileInvitationMissingGUID(self):
        return self.testFileInvitation(True)

    def testFileResponse(self):
        pending = Deferred()
        pending.addCallback(self.fileResponse)
        self.client.cookies['iCookies'][1234] = (pending, None)
        reply = self._inviteMessage([
            'Invitation-Command: ACCEPT\r\n',
            'Invitation-Cookie: 1234\r\n\r\n',
        ])
        self.client.checkMessage(reply)
        self.failUnless(self.client.state == 'RESPONSE', 'Failed to detect file transfer response')

    def testFileInfo(self):
        pending = Deferred()
        pending.addCallback(self.fileInfo)
        self.client.cookies['external'][1234] = (pending, None)
        reply = self._inviteMessage([
            'Invitation-Command: ACCEPT\r\n',
            'Invitation-Cookie: 1234\r\n',
            'IP-Address: 192.168.0.1\r\n',
            'Port: 6891\r\n',
            'AuthCookie: 4321\r\n\r\n',
        ])
        self.client.checkMessage(reply)
        self.failUnless(self.client.state == 'INFO', 'Failed to detect file transfer info')

    def fileResponse(self, result):
        # Deferred callback for testFileResponse.
        accept, cookie, info = result
        if accept and cookie == 1234:
            self.client.state = 'RESPONSE'

    def fileInfo(self, result):
        # Deferred callback for testFileInfo.
        accept, ip, port, aCookie, info = result
        if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321:
            self.client.state = 'INFO'
794
795
796
797 ################
798 # MSNFTP tests #
799 ################
800
class FileTransferTestCase(unittest.TestCase):
    """Test FileSend against FileReceive over a loopback connection."""
    skip = "Not ready"

    def setUp(self):
        # 7000 bytes of 'a' to send, rewound to the start, plus an empty
        # buffer to receive into.
        source = StringIOWithoutClosing()
        source.write('a' * 7000)
        source.seek(0)
        self.input = source
        self.output = StringIOWithoutClosing()

    def tearDown(self):
        self.input = self.output = None

    def testFileTransfer(self):
        auth = 1234
        size = 7000
        sender = msn.FileSend(self.input)
        sender.auth = auth
        sender.fileSize = size
        receiver = msn.FileReceive(auth, "foo@bar.com", self.output)
        receiver.fileSize = size
        loopback.loopback(sender, receiver)
        bothDone = receiver.completed and sender.completed
        self.failUnless(bothDone, "send failed to complete")
        sent = self.input.getvalue()
        received = self.output.getvalue()
        self.failUnless(sent == received, "saved file does not match original")