X-Git-Url: https://code.delx.au/pymsnt/blobdiff_plain/638fa9875fdb5b2fd976348caee431322af796e4..3ba3e33be5a07bb0666f678deefe8ecae7acadc3:/src/tlib/msn/test_msn.py diff --git a/src/tlib/msn/test_msn.py b/src/tlib/msn/test_msn.py index 237a594..428e5fa 100644 --- a/src/tlib/msn/test_msn.py +++ b/src/tlib/msn/test_msn.py @@ -15,10 +15,9 @@ from twisted.python import failure from twisted.trial import unittest # System imports -import StringIO, sys, urllib +import StringIO, sys, urllib, random, struct import msn -import msnft def printError(f): @@ -146,18 +145,18 @@ class DummyNotificationClient(msn.NotificationClient): def gotProfile(self, message): self.state = 'PROFILE' - def gotContactStatus(self, code, userHandle, screenName): + def gotContactStatus(self, userHandle, code, screenName): if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name": c = self.factory.contacts.getContact(userHandle) if c.caps & msn.MSNContact.MSNC1 and c.msnobj: self.state = 'INITSTATUS' - def contactStatusChanged(self, code, userHandle, screenName): + def contactStatusChanged(self, userHandle, code, screenName): if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name": self.state = 'NEWSTATUS' def contactAvatarChanged(self, userHandle, hash): - if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=": + if userHandle == "foo@bar.com" and hash == "b6b0bc4a5171dac590c593080405921275dcf284": self.state = 'NEWAVATAR' elif self.state == 'NEWAVATAR' and hash == "": self.state = 'AVATARGONE' @@ -505,7 +504,8 @@ class FakeNotificationClient(msn.NotificationClient): if listType & msn.FORWARD_LIST and \ userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \ userHandle == "foo@bar.com" and \ - screenName == "foo@bar.com": + screenName == "foo@bar.com" and \ + self.factory.contacts.getContact(userHandle): self.test = 'PASS' self.transport.loseConnection() d = self.addContact(msn.FORWARD_LIST, "foo@bar.com") @@ 
-515,7 +515,8 @@ class FakeNotificationClient(msn.NotificationClient): def testcb((listType, userGuid, userHandle, screenName)): if listType & msn.ALLOW_LIST and \ userHandle == "foo@bar.com" and \ - not userGuid and not screenName: + not userGuid and not screenName and \ + self.factory.contacts.getContact(userHandle): self.test = 'PASS' self.transport.loseConnection() d = self.addContact(msn.ALLOW_LIST, "foo@bar.com") @@ -540,9 +541,8 @@ class FakeNotificationClient(msn.NotificationClient): d.addCallback(testcb) def doScreenNameChange(self): - def testcb((screenName,)): - if screenName == "Some new name": - self.test = 'PASS' + def testcb(*args): + self.test = 'PASS' self.transport.loseConnection() d = self.changeScreenName("Some new name") d.addCallback(testcb) @@ -701,7 +701,8 @@ class NotificationChallengeTests(unittest.TestCase): challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'), ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'), ('37819769320541083311', 'db79d37dadd9031bef996893321da480'), - ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')] + ('93662730714769834295', 'd619dfbb1414004d34d0628766636568'), + ('31154116582196216093', '95e96c4f8cfdba6f065c8869b5e984e9')] for challenge, response in challenges: self.server.doChallenge(challenge, response) self.failUnless(self.loop.doSteps(10), 'Failed to disconnect') @@ -809,7 +810,7 @@ class DummySwitchboardClient(msn.SwitchboardClient): if userHandle == "friend@hotmail.com": self.state = 'USERLEFT' - def userTyping(self, message): + def gotContactTyping(self, message): if message.userHandle == 'foo@bar.com': self.state = 'USERTYPING' @@ -965,6 +966,9 @@ class DummySwitchboardP2PClient(msn.SwitchboardClient): if message.message == "Test Message" and message.userHandle == "foo1@bar.com": self.status = "GOTMESSAGE" + def gotFileReceive(self, fileReceive): + self.fileReceive = fileReceive + class SwitchboardP2PTests(unittest.TestCase): def setUp(self): self.server = 
DummySwitchboardP2PServer() @@ -982,17 +986,27 @@ class SwitchboardP2PTests(unittest.TestCase): self.loop1.disconnect() self.loop2.disconnect() + def _loop(self, steps=1): + for i in xrange(steps): + self.loop1.doSteps(1) + self.loop2.doSteps(1) + def testMessage(self): self.client1.sendMessage(msn.MSNMessage(message='Test Message')) - self.loop1.doSteps(10) - self.loop2.doSteps(10) + self._loop() self.failUnless((self.client2.status == "GOTMESSAGE"), "Fake switchboard server not working.") + + def _generateData(self): + data = '' + for i in xrange(3000): + data += struct.pack("<L", random.randint(0, sys.maxint)) + return data + + def testFilesHappyPath(self): + fileData = self._generateData() + self.gotFile = False + + # Send the file (client2->client1) + msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1) + fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData)) + def accepted((yes,)): + if yes: + fileSend.write(fileData) + fileSend.close() + else: + raise "TransferDeclined" + def failed(): + raise "TransferError" + d.addCallback(accepted) + d.addErrback(failed) + + # Let the request get pushed to client1 + self._loop(10) + + # Receive the file + def finished(data): + self.gotFile = (data == fileData) + fileBuffer = msn.StringBuffer(finished) + fileReceive = self.client1.fileReceive + self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.") + fileReceive.accept(fileBuffer) + + # Lets transfer!! 
+ self._loop(10) + + self.failUnless((self.gotFile), "Failed to transfer file") + + def testFilesHappyChunkedPath(self): + fileData = self._generateData() + self.gotFile = False + + # Send the file (client2->client1) + msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1) + fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData)) + def accepted((yes,)): + if yes: + fileSend.write(fileData[:len(fileData)/2]) + fileSend.write(fileData[len(fileData)/2:]) + fileSend.close() + else: + raise "TransferDeclined" + def failed(): + raise "TransferError" + d.addCallback(accepted) + d.addErrback(failed) + + # Let the request get pushed to client1 + self._loop(10) + + # Receive the file + def finished(data): + self.gotFile = (data == fileData) + fileBuffer = msn.StringBuffer(finished) + fileReceive = self.client1.fileReceive + self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.") + fileReceive.accept(fileBuffer) + + # Lets transfer!! 
+ self._loop(10) + + self.failUnless((self.gotFile), "Failed to transfer file") + + def testTwoFilesSequential(self): + self.testFilesHappyPath() + self.testFilesHappyPath() + + def testFilesDeclinePath(self): + fileData = self._generateData() + self.gotDecline = False + + # Send the file (client2->client1) + msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1) + fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData)) + def accepted((yes,)): + self.failUnless((not yes), "Failed to understand a decline.") + self.gotDecline = True + def failed(): + raise "TransferError" + d.addCallback(accepted) + d.addErrback(failed) + + # Let the request get pushed to client1 + self._loop(10) + + # Decline the file + fileReceive = self.client1.fileReceive + fileReceive.reject() + + # Let the decline get pushed to client2 + self._loop(10) + + self.failUnless((self.gotDecline), "Failed to understand a decline, ignored.") + ################ # MSNFTP tests # @@ -1019,6 +1131,7 @@ class SwitchboardP2PTests(unittest.TestCase): class FileTransferTestCase(unittest.TestCase): """ test FileSend against FileReceive """ + skip = "Not implemented" def setUp(self): self.input = StringIOWithoutClosing()