code.delx.au - pymsnt/blob - src/tlib/msn/test_msn.py
Hurrah for unit testing
[pymsnt] / src / tlib / msn / test_msn.py
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2 # Copyright (c) 2005 James Bunton
3 # See LICENSE for details.
4
5 """
6 Test cases for msn.
7 """
8
9 # Twisted imports
10 from twisted.protocols import loopback
11 from twisted.protocols.basic import LineReceiver
12 from twisted.internet.defer import Deferred
13 from twisted.internet import reactor
14 from twisted.trial import unittest
15
16 # System imports
17 import StringIO, sys, urllib
18
19 import msn
20
21
def printError(f):
    """Print *f* to standard output.

    PassportTests.setUp registers this function as the errback of its
    deferred, so *f* is whatever that deferred hands to its errbacks.
    """
    # Parenthesised single-argument form: identical output with the Python 2
    # print statement, and also valid where print is a function.
    print(f)
24
class StringIOWithoutClosing(StringIO.StringIO):
    """StringIO buffer whose close() and loseConnection() do nothing.

    The tests in this module pass instances to makeConnection() as the
    transport, and read the buffer back with getvalue() afterwards.
    """

    # Constant 0 on every instance.
    # NOTE(review): presumably mirrors a transport's "disconnecting" flag --
    # confirm against the protocols that read it.
    disconnecting = 0

    def close(self):
        """Ignore the request so the buffer stays open and readable."""

    def loseConnection(self):
        """Ignore the request; nothing is closed or disconnected."""
29
class PassportTests(unittest.TestCase):
    """Feed canned HTTP responses to msn.PassportNexus and msn.PassportLogin.

    Each test checks the value the shared deferred was fired with.
    """

    def setUp(self):
        # Everything the deferred fires with is collected in self.result so
        # the tests can inspect it once the protocol has been fed its input.
        self.result = []
        self.deferred = Deferred()
        self.deferred.addCallback(self.result.append)
        self.deferred.addErrback(printError)

    def testNexus(self):
        """A 200 response with a PassportURLs header yields the login URL."""
        protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
        headers = {
            'Content-Length': '0',
            'Content-Type': 'text/html',
            'PassportURLs': 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com',
        }
        transport = StringIOWithoutClosing()
        protocol.makeConnection(transport)
        protocol.dataReceived('HTTP/1.0 200 OK\r\n')
        for name, value in headers.items():
            protocol.dataReceived('%s: %s\r\n' % (name, value))
        protocol.dataReceived('\r\n')
        # failUnlessEqual reports both values when the comparison fails.
        self.failUnlessEqual(self.result[0], "https://login.myserver.com/")

    def _doLoginTest(self, response, headers):
        """Send *response* (a status line) and *headers* to a PassportLogin.

        The protocol is built around self.deferred, so the outcome ends up in
        self.result for the calling test to check.
        """
        protocol = msn.PassportLogin(self.deferred, 'foo@foo.com', 'testpass', 'https://foo.com/', 'a')
        protocol.makeConnection(StringIOWithoutClosing())
        protocol.dataReceived(response)
        for name, value in headers.items():
            protocol.dataReceived('%s: %s\r\n' % (name, value))
        protocol.dataReceived('\r\n')

    def testPassportLoginSuccess(self):
        """da-status=success yields (LOGIN_SUCCESS, <from-PP value>)."""
        headers = {
            'Content-Length': '0',
            'Content-Type': 'text/html',
            'Authentication-Info': "Passport1.4 da-status=success,tname=MSPAuth," +
                                   "tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
                                   "ru=http://messenger.msn.com",
        }
        self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
        self.failUnlessEqual(self.result[0], (msn.LOGIN_SUCCESS, 'somekey'))

    def testPassportLoginFailure(self):
        """A 401 with da-status=failed yields (LOGIN_FAILURE, <unquoted cbtxt>)."""
        headers = {
            'Content-Type': 'text/html',
            'WWW-Authenticate': 'Passport1.4 da-status=failed,' +
                                'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
                                'cbtxt=the%20error%20message',
        }
        self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
        self.failUnlessEqual(self.result[0], (msn.LOGIN_FAILURE, 'the error message'))

    def testPassportLoginRedirect(self):
        """A 302 with da-status=redir yields (LOGIN_REDIRECT, <Location>, 'a')."""
        headers = {
            'Content-Type': 'text/html',
            'Authentication-Info': 'Passport1.4 da-status=redir',
            'Location': 'https://newlogin.host.com/',
        }
        self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
        self.failUnlessEqual(self.result[0], (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
88
89
class DummySwitchboardClient(msn.SwitchboardClient):
    """SwitchboardClient whose handlers record their invocation in self.state."""

    def userTyping(self, message):
        """Mark that a typing notification was delivered."""
        self.state = 'TYPING'

    def gotSendRequest(self, fileName, fileSize, cookie, message):
        """Mark an invitation, but only for the exact values the tests send."""
        if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234:
            self.state = 'INVITATION'
96
97
class DummyNotificationServer(msn.MSNEventBase):
    """MSNEventBase subclass with an empty constructor."""

    def __init__(self):
        # Deliberately empty: the base-class __init__ is not called.
        pass
101
class DummyNotificationClient(msn.NotificationClient):
    """NotificationClient whose event handlers record their call in self.state.

    Most handlers only set the state when their arguments match the values
    the NotificationTests cases send, so a wrong value leaves the state alone
    and the test's state assertion fails.
    """

    def loggedIn(self, userHandle, verified):
        if userHandle == 'foo@bar.com' and verified:
            self.state = 'LOGIN'

    def gotProfile(self, message):
        self.state = 'PROFILE'

    def gotContactStatus(self, code, userHandle, screenName):
        if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
            c = self.factory.contacts.getContact(userHandle)
            # The contact must also carry the MSNC1 capability bit and a
            # non-empty msnobj.
            if c.caps & msn.MSNContact.MSNC1 and c.msnobj:
                self.state = 'INITSTATUS'

    def contactStatusChanged(self, code, userHandle, screenName):
        if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
            self.state = 'NEWSTATUS'

    def contactOffline(self, userHandle):
        if userHandle == "foo@bar.com":
            self.state = 'OFFLINE'

    def statusChanged(self, code):
        if code == msn.STATUS_HIDDEN:
            self.state = 'MYSTATUS'

    def listSynchronized(self, *args):
        self.state = 'GOTLIST'

    def gotPhoneNumber(self, userHandle, phoneType, number):
        self.state = 'GOTPHONE'

    def userRemovedMe(self, userHandle):
        # Success means the contact can no longer be looked up.
        c = self.factory.contacts.getContact(userHandle)
        if not c:
            self.state = 'USERREMOVEDME'

    def userAddedMe(self, userGuid, userHandle, screenName):
        c = self.factory.contacts.getContact(userHandle)
        # Test the list flag with '&'. The earlier '|' combined the flag into
        # the mask instead of testing it, which is truthy for any non-zero
        # flag and so never checked list membership.
        # NOTE(review): assumes c.lists is an integer bitmask of list flags
        # and that the client puts the new contact on PENDING_LIST -- confirm
        # against msn.py.
        if c and (c.lists & msn.PENDING_LIST) and (screenName == 'Screen Name'):
            self.state = 'USERADDEDME'

    def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
        if (sessionID == 1234 and
                host == '192.168.1.1' and
                port == 1863 and
                key == '123.456' and
                userHandle == 'foo@foo.com' and
                screenName == 'Screen Name'):
            self.state = 'SBINVITED'

    def gotMSNAlert(self, body, action, subscr):
        self.state = 'NOTIFICATION'
152
153
class DummyPingNotificationServer(LineReceiver):
    """Line-based fake server that answers "PNG" lines with "QNG 50".

    It only replies while self.good is true. The attribute is not defined
    here; NotificationPingTests assigns it before running the loopback.
    """

    def lineReceived(self, line):
        if line.startswith("PNG") and self.good:
            self.sendLine("QNG 50")
158
class DummyPingNotificationClient(msn.NotificationClient):
    """NotificationClient that counts sent lines and stops after ten.

    self.count and self.state are not initialised here; NotificationPingTests
    sets both in setUp.
    """

    def connectionMade(self):
        self.pingCheckerStart()

    def sendLine(self, line):
        msn.NotificationClient.sendLine(self, line)
        self.count += 1
        if self.count > 10:
            self.transport.loseConnection()  # But not for real, just to end the test

    def connectionLost(self, reason):
        # Losing the connection before the eleventh line was sent means this
        # client did not end the test itself, so record a disconnect.
        if self.count <= 10:
            self.state = 'DISCONNECTED'
172
173
class NotificationTests(unittest.TestCase):
    """ testing the various events in NotificationClient """
    #skip = "Not ready"

    def setUp(self):
        self.client = DummyNotificationClient()
        self.client.factory = msn.NotificationFactory()
        self.client.state = 'START'

    def tearDown(self):
        self.client = None

    def _startSession(self):
        """Attach a dummy transport and run the base-class loggedIn handler.

        The base implementation is called explicitly because
        DummyNotificationClient overrides loggedIn.
        """
        self.client.makeConnection(StringIOWithoutClosing())
        msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 1)

    def _feedLines(self, lines):
        """Hand each entry of *lines* to the client's lineReceived, in order."""
        # An explicit loop: the calls are made for their side effects only.
        for line in lines:
            self.client.lineReceived(line)

    def _feedData(self, data):
        """Hand *data* to dataReceived one CRLF-terminated piece at a time.

        The empty remainder after the final CRLF is dropped.
        """
        for piece in data.split('\r\n')[:-1]:
            self.client.dataReceived(piece + '\r\n')

    def testLogin(self):
        self.client.lineReceived('USR 1 OK foo@bar.com 1')
        self.failUnless((self.client.state == 'LOGIN'), 'Failed to detect successful login')

    def testProfile(self):
        m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
        m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
        m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
        m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
        self._feedData(m)
        self.failUnless((self.client.state == 'PROFILE'), 'Failed to detect initial profile')

    def testMSNAlert(self):
        m = '<NOTIFICATION ver="2" id="1342902633" siteid="199999999" siteurl="http://alerts.msn.com">\r\n'
        m += '<TO pid="0x0006BFFD:0x8582C0FB" name="example@passport.com"/>\r\n'
        m += '<MSG pri="1" id="1342902633">\r\n'
        m += '<SUBSCR url="http://g.msn.com/3ALMSNTRACKING/199999999ToastChange?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n'
        m += '<ACTION url="http://g.msn.com/3ALMSNTRACKING/199999999ToastAction?http://alerts.msn.com/Alerts/MyAlerts.aspx?strela=1"/>\r\n'
        m += '<BODY lang="3076" icon="">\r\n'
        m += '<TEXT>utf8-encoded text</TEXT></BODY></MSG>\r\n'
        m += '</NOTIFICATION>\r\n'
        # The NOT command line announces the length of the payload that follows.
        cmd = 'NOT %s\r\n' % str(len(m))
        m = cmd + m
        # Whee, lots of fun to test that lineReceived & dataReceived work well with input coming
        # in in (fairly) arbitrary chunks.
        self._feedData(m)
        self.failUnless((self.client.state == 'NOTIFICATION'), 'Failed to detect MSN Alert message')

    def testListSync(self):
        self._startSession()
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 3" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LSG Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
            "LSG More%20Other%20Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ]
        self._feedLines(lines)
        contacts = self.client.factory.contacts
        contact = contacts.getContact('userHandle@email.com')
        #self.failUnless(contacts.version == 100, "Invalid contact list version")
        self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
        expectedGroups = {
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy': 'Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz': 'Other Friends',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyya': 'More Other Friends',
        }
        self.failUnless(contacts.groups == expectedGroups, "Did not get proper group list")
        expectedMembership = [
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy',
            'yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz',
        ]
        self.failUnless(contact.groups == expectedMembership and contact.lists == 13,
                        "Invalid contact list/group info")
        self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
        self.client.logOut()

    def testStatus(self):
        # Set up the contact list
        self._startSession()
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LST N=foo@bar.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy,yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyz",
        ]
        self._feedLines(lines)
        # Now test!
        msnobj = urllib.quote('<msnobj Creator="buddy1@hotmail.com" Size="24539" Type="3" Location="TFR2C.tmp" Friendly="AAA=" SHA1D="trC8SlFx2sWQxZMIBAWSEnXc8oQ=" SHA1C="U32o6bosZzluJq82eAtMpx5dIEI="/>')
        # (line to deliver, state the handler should leave, failure message)
        t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 268435456 ' + msnobj, 'INITSTATUS', 'Failed to detect initial status report'),
             ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
             ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
             ('CHG 1 HDN 0 ' + msnobj, 'MYSTATUS', 'Failed to detect my status changing')]
        for line, expectedState, failureMessage in t:
            self.client.lineReceived(line)
            self.failUnless((self.client.state == expectedState), failureMessage)
        self.client.logOut()

    def testAsyncPhoneChange(self):
        c = msn.MSNContact(userHandle='userHandle@email.com')
        self.client.factory.contacts = msn.MSNContactList()
        self.client.factory.contacts.addContact(c)
        self.client.makeConnection(StringIOWithoutClosing())
        self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
        c = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
        self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
        self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")

    def testLateBPR(self):
        """
        This test makes sure that if a BPR response that was meant
        to be part of a SYN response (but came after the last LST)
        is received, the correct contact is updated and all is well
        """
        self._startSession()
        lines = [
            "SYN %s 2005-04-23T18:57:44.8130000-07:00 2005-04-23T18:57:54.2070000-07:00 1 0" % self.client.currentID,
            "GTC A",
            "BLP AL",
            "LSG Friends yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "LST N=userHandle@email.com F=Some%20Name C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx 13 yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy",
            "BPR PHH 123%20456"
        ]
        self._feedLines(lines)
        contact = self.client.factory.contacts.getContact('userHandle@email.com')
        self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
        self.client.logOut()

    def testUserRemovedMe(self):
        self.client.factory.contacts = msn.MSNContactList()
        contact = msn.MSNContact(userHandle='foo@foo.com')
        contact.addToList(msn.REVERSE_LIST)
        self.client.factory.contacts.addContact(contact)
        self.client.lineReceived("REM 0 RL 100 foo@foo.com")
        self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")

    def testUserAddedMe(self):
        self.client.factory.contacts = msn.MSNContactList()
        self.client.lineReceived("ADC 0 RL N=foo@foo.com F=Screen%20Name")
        self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")

    def testAsyncSwitchboardInvitation(self):
        self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name")
        self.failUnless(self.client.state == "SBINVITED")
311
class NotificationPingTests(unittest.TestCase):
    """ tests pinging in the NotificationClient class """
    # skip = "Not working yet"

    def setUp(self):
        # Remember the module's current ping interval so tearDown can put back
        # exactly that value, whatever it is.
        self._savedPingSpeed = msn.PINGSPEED
        msn.PINGSPEED = 0.1
        self.client = DummyPingNotificationClient()
        self.server = DummyPingNotificationServer()
        self.client.state = 'CONNECTED'
        self.client.count = 0

    def tearDown(self):
        msn.PINGSPEED = self._savedPingSpeed
        self.client.logOut()

    def testPingGood(self):
        """With a server that answers pings the client state stays CONNECTED."""
        self.server.good = True
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')

    def testPingBad(self):
        """With a server that ignores pings the client ends up DISCONNECTED."""
        self.server.good = False
        loopback.loopback(self.client, self.server)
        self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
336
337
class MessageHandlingTests(unittest.TestCase):
    """ testing various message handling methods from SwitchboardClient """
    skip = "Not ready"

    def setUp(self):
        self.client = DummySwitchboardClient()
        self.client.state = 'START'

    def tearDown(self):
        self.client = None

    def testTypingCheck(self):
        m = msn.MSNMessage()
        m.setHeader('Content-Type', 'text/x-msmsgscontrol')
        m.setHeader('TypingUser', 'foo@bar')
        self.client.checkMessage(m)
        self.failUnless((self.client.state == 'TYPING'), 'Failed to detect typing notification')

    def testFileInvitation(self, lazyClient=False):
        """Check that a file-transfer invitation is detected.

        When *lazyClient* is true the Application-GUID line is left out of
        the invitation.
        """
        m = msn.MSNMessage()
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
        m.message += 'Application-Name: File Transfer\r\n'
        if not lazyClient:
            m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
        m.message += 'Invitation-Command: Invite\r\n'
        m.message += 'Invitation-Cookie: 1234\r\n'
        m.message += 'Application-File: foobar.ext\r\n'
        m.message += 'Application-FileSize: 31337\r\n\r\n'
        self.client.checkMessage(m)
        self.failUnless((self.client.state == 'INVITATION'), 'Failed to detect file transfer invitation')

    def testFileInvitationMissingGUID(self):
        return self.testFileInvitation(True)

    def testFileResponse(self):
        d = Deferred()
        d.addCallback(self.fileResponse)
        self.client.cookies['iCookies'][1234] = (d, None)
        m = msn.MSNMessage()
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
        m.message += 'Invitation-Command: ACCEPT\r\n'
        m.message += 'Invitation-Cookie: 1234\r\n\r\n'
        self.client.checkMessage(m)
        self.failUnless((self.client.state == 'RESPONSE'), 'Failed to detect file transfer response')

    def testFileInfo(self):
        d = Deferred()
        d.addCallback(self.fileInfo)
        self.client.cookies['external'][1234] = (d, None)
        m = msn.MSNMessage()
        m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
        m.message += 'Invitation-Command: ACCEPT\r\n'
        m.message += 'Invitation-Cookie: 1234\r\n'
        m.message += 'IP-Address: 192.168.0.1\r\n'
        m.message += 'Port: 6891\r\n'
        m.message += 'AuthCookie: 4321\r\n\r\n'
        self.client.checkMessage(m)
        self.failUnless((self.client.state == 'INFO'), 'Failed to detect file transfer info')

    def fileResponse(self, result):
        """Deferred callback: *result* is an (accept, cookie, info) triple."""
        # Unpacked in the body rather than in the signature; callers still
        # pass a single positional 3-tuple.
        accept, cookie, info = result
        if accept and cookie == 1234:
            self.client.state = 'RESPONSE'

    def fileInfo(self, result):
        """Deferred callback: *result* is (accept, ip, port, aCookie, info)."""
        accept, ip, port, aCookie, info = result
        if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321:
            self.client.state = 'INFO'
402
403
class FileTransferTestCase(unittest.TestCase):
    """ test FileSend against FileReceive """
    skip = "Not ready"

    def setUp(self):
        # Source buffer: 7000 'a' characters, rewound so it is read from the
        # start; the destination buffer starts out empty.
        self.input = StringIOWithoutClosing()
        self.input.writelines(['a'] * 7000)
        self.input.seek(0)
        self.output = StringIOWithoutClosing()

    def tearDown(self):
        self.input = None
        self.output = None

    def testFileTransfer(self):
        """Send the input through a loopback and compare it with the output."""
        auth = 1234
        sender = msn.FileSend(self.input)
        sender.auth = auth
        sender.fileSize = 7000
        client = msn.FileReceive(auth, "foo@bar.com", self.output)
        client.fileSize = 7000
        loopback.loopback(sender, client)
        self.failUnless((client.completed and sender.completed), "send failed to complete")
        self.failUnless((self.input.getvalue() == self.output.getvalue()), "saved file does not match original")