]> code.delx.au - pymsnt/blobdiff - src/tlib/msn/test_msn.py
chmod +x PyMSNt.py
[pymsnt] / src / tlib / msn / test_msn.py
index 237a59407274aa47d8a0a5091ba4b0bc60d55e76..428e5fa0315f68fdcfdc7c15eb18d2d543ec1efc 100644 (file)
@@ -15,10 +15,9 @@ from twisted.python import failure
 from twisted.trial import unittest
 
 # System imports
-import StringIO, sys, urllib
+import StringIO, sys, urllib, random, struct
 
 import msn
-import msnft
 
 
 def printError(f):
@@ -146,18 +145,18 @@ class DummyNotificationClient(msn.NotificationClient):
     def gotProfile(self, message):
         self.state = 'PROFILE'
 
-    def gotContactStatus(self, code, userHandle, screenName):
+    def gotContactStatus(self, userHandle, code, screenName):
         if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
             c = self.factory.contacts.getContact(userHandle)
             if c.caps & msn.MSNContact.MSNC1 and c.msnobj:
                 self.state = 'INITSTATUS'
 
-    def contactStatusChanged(self, code, userHandle, screenName):
+    def contactStatusChanged(self, userHandle, code, screenName):
         if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
             self.state = 'NEWSTATUS'
 
     def contactAvatarChanged(self, userHandle, hash):
-        if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=":
+        if userHandle == "foo@bar.com" and hash == "b6b0bc4a5171dac590c593080405921275dcf284":
             self.state = 'NEWAVATAR'
         elif self.state == 'NEWAVATAR' and hash == "":
             self.state = 'AVATARGONE'
@@ -505,7 +504,8 @@ class FakeNotificationClient(msn.NotificationClient):
             if listType & msn.FORWARD_LIST and \
                userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \
                userHandle == "foo@bar.com" and \
-               screenName == "foo@bar.com":
+               screenName == "foo@bar.com" and \
+               self.factory.contacts.getContact(userHandle):
                 self.test = 'PASS'
             self.transport.loseConnection()
         d = self.addContact(msn.FORWARD_LIST, "foo@bar.com")
@@ -515,7 +515,8 @@ class FakeNotificationClient(msn.NotificationClient):
         def testcb((listType, userGuid, userHandle, screenName)):
             if listType & msn.ALLOW_LIST and \
                userHandle == "foo@bar.com" and \
-               not userGuid and not screenName:
+               not userGuid and not screenName and \
+               self.factory.contacts.getContact(userHandle):
                 self.test = 'PASS'
             self.transport.loseConnection()
         d = self.addContact(msn.ALLOW_LIST, "foo@bar.com")
@@ -540,9 +541,8 @@ class FakeNotificationClient(msn.NotificationClient):
         d.addCallback(testcb)
 
     def doScreenNameChange(self):
-        def testcb((screenName,)):
-            if screenName == "Some new name":
-                self.test = 'PASS'
+        def testcb(*args):
+            self.test = 'PASS'
             self.transport.loseConnection()
         d = self.changeScreenName("Some new name")
         d.addCallback(testcb)
@@ -701,7 +701,8 @@ class NotificationChallengeTests(unittest.TestCase):
         challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
                       ('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
                       ('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
-                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
+                      ('93662730714769834295', 'd619dfbb1414004d34d0628766636568'),
+                      ('31154116582196216093', '95e96c4f8cfdba6f065c8869b5e984e9')]
         for challenge, response in challenges:
             self.server.doChallenge(challenge, response)
             self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
@@ -809,7 +810,7 @@ class DummySwitchboardClient(msn.SwitchboardClient):
         if userHandle == "friend@hotmail.com":
             self.state = 'USERLEFT'
 
-    def userTyping(self, message):
+    def gotContactTyping(self, message):
         if message.userHandle == 'foo@bar.com':
             self.state = 'USERTYPING'
 
@@ -965,6 +966,9 @@ class DummySwitchboardP2PClient(msn.SwitchboardClient):
         if message.message == "Test Message" and message.userHandle == "foo1@bar.com":
             self.status = "GOTMESSAGE"
 
+    def gotFileReceive(self, fileReceive):
+        self.fileReceive = fileReceive
+
 class SwitchboardP2PTests(unittest.TestCase):
     def setUp(self):
         self.server  = DummySwitchboardP2PServer()
@@ -982,17 +986,27 @@ class SwitchboardP2PTests(unittest.TestCase):
         self.loop1.disconnect()
         self.loop2.disconnect()
 
+    def _loop(self, steps=1):
+        for i in xrange(steps):
+            self.loop1.doSteps(1)
+            self.loop2.doSteps(1)
+
     def testMessage(self):
         self.client1.sendMessage(msn.MSNMessage(message='Test Message'))
-        self.loop1.doSteps(10)
-        self.loop2.doSteps(10)
+        self._loop()
         self.failUnless((self.client2.status == "GOTMESSAGE"), "Fake switchboard server not working.")
+
+    def _generateData(self):
+        data = ''
+        for i in xrange(3000):
+            data += struct.pack("<L", random.randint(0, sys.maxint))
+        return data
     
     def testAvatars(self):
         self.gotAvatar = False
 
         # Set up the avatar for client1
-        imageData = 'avatar image data' * 1000
+        imageData = self._generateData()
         self.client1.msnobj = msn.MSNObject()
         self.client1.msnobj.setData('foo1@bar.com', imageData)
         self.client1.msnobj.makeText()
@@ -1005,13 +1019,111 @@ class SwitchboardP2PTests(unittest.TestCase):
         d.addCallback(avatarCallback)
 
         # Let them do their thing
-        for i in xrange(100):
-            self.loop1.doSteps(1)
-            self.loop2.doSteps(1)
+        self._loop(10)
 
         # Check that client2 got the avatar
         self.failUnless((self.gotAvatar), "Failed to transfer avatar")
 
+    def testFilesHappyPath(self):
+        fileData = self._generateData()
+        self.gotFile = False
+
+        # Send the file (client2->client1)
+        msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+        fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+        def accepted((yes,)):
+            if yes:
+                fileSend.write(fileData)
+                fileSend.close()
+            else:
+                raise "TransferDeclined"
+        def failed():
+            raise "TransferError"
+        d.addCallback(accepted)
+        d.addErrback(failed)
+
+        # Let the request get pushed to client1
+        self._loop(10)
+
+        # Receive the file
+        def finished(data):
+            self.gotFile = (data == fileData)
+        fileBuffer = msn.StringBuffer(finished)
+        fileReceive = self.client1.fileReceive
+        self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.")
+        fileReceive.accept(fileBuffer)
+
+        # Lets transfer!!
+        self._loop(10)
+
+        self.failUnless((self.gotFile), "Failed to transfer file")
+
+    def testFilesHappyChunkedPath(self):
+        fileData = self._generateData()
+        self.gotFile = False
+
+        # Send the file (client2->client1)
+        msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+        fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+        def accepted((yes,)):
+            if yes:
+                fileSend.write(fileData[:len(fileData)/2])
+                fileSend.write(fileData[len(fileData)/2:])
+                fileSend.close()
+            else:
+                raise "TransferDeclined"
+        def failed():
+            raise "TransferError"
+        d.addCallback(accepted)
+        d.addErrback(failed)
+
+        # Let the request get pushed to client1
+        self._loop(10)
+
+        # Receive the file
+        def finished(data):
+            self.gotFile = (data == fileData)
+        fileBuffer = msn.StringBuffer(finished)
+        fileReceive = self.client1.fileReceive
+        self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.")
+        fileReceive.accept(fileBuffer)
+
+        # Lets transfer!!
+        self._loop(10)
+
+        self.failUnless((self.gotFile), "Failed to transfer file")
+
+    def testTwoFilesSequential(self):
+        self.testFilesHappyPath()
+        self.testFilesHappyPath()
+
+    def testFilesDeclinePath(self):
+        fileData = self._generateData()
+        self.gotDecline = False
+
+        # Send the file (client2->client1)
+        msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+        fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+        def accepted((yes,)):
+            self.failUnless((not yes), "Failed to understand a decline.")
+            self.gotDecline = True
+        def failed():
+            raise "TransferError"
+        d.addCallback(accepted)
+        d.addErrback(failed)
+
+        # Let the request get pushed to client1
+        self._loop(10)
+
+        # Decline the file
+        fileReceive = self.client1.fileReceive
+        fileReceive.reject()
+
+        # Let the decline get pushed to client2
+        self._loop(10)
+
+        self.failUnless((self.gotDecline), "Failed to understand a decline, ignored.")
+
 
 ################
 # MSNFTP tests #
@@ -1019,6 +1131,7 @@ class SwitchboardP2PTests(unittest.TestCase):
 
 class FileTransferTestCase(unittest.TestCase):
     """ test FileSend against FileReceive """
+    skip = "Not implemented"
 
     def setUp(self):
         self.input = StringIOWithoutClosing()