from twisted.trial import unittest
# System imports
-import StringIO, sys, urllib
+import StringIO, sys, urllib, random, struct
import msn
-import msnft
def printError(f):
def gotProfile(self, message):
self.state = 'PROFILE'
- def gotContactStatus(self, code, userHandle, screenName):
+ def gotContactStatus(self, userHandle, code, screenName):
if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
c = self.factory.contacts.getContact(userHandle)
if c.caps & msn.MSNContact.MSNC1 and c.msnobj:
self.state = 'INITSTATUS'
- def contactStatusChanged(self, code, userHandle, screenName):
+ def contactStatusChanged(self, userHandle, code, screenName):
if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
self.state = 'NEWSTATUS'
def contactAvatarChanged(self, userHandle, hash):
- if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=":
+ if userHandle == "foo@bar.com" and hash == "b6b0bc4a5171dac590c593080405921275dcf284":
self.state = 'NEWAVATAR'
elif self.state == 'NEWAVATAR' and hash == "":
self.state = 'AVATARGONE'
if listType & msn.FORWARD_LIST and \
userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \
userHandle == "foo@bar.com" and \
- screenName == "foo@bar.com":
+ screenName == "foo@bar.com" and \
+ self.factory.contacts.getContact(userHandle):
self.test = 'PASS'
self.transport.loseConnection()
d = self.addContact(msn.FORWARD_LIST, "foo@bar.com")
def testcb((listType, userGuid, userHandle, screenName)):
if listType & msn.ALLOW_LIST and \
userHandle == "foo@bar.com" and \
- not userGuid and not screenName:
+ not userGuid and not screenName and \
+ self.factory.contacts.getContact(userHandle):
self.test = 'PASS'
self.transport.loseConnection()
d = self.addContact(msn.ALLOW_LIST, "foo@bar.com")
d.addCallback(testcb)
def doScreenNameChange(self):
- def testcb((screenName,)):
- if screenName == "Some new name":
- self.test = 'PASS'
+ def testcb(*args):
+ self.test = 'PASS'
self.transport.loseConnection()
d = self.changeScreenName("Some new name")
d.addCallback(testcb)
challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
- ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
+ ('93662730714769834295', 'd619dfbb1414004d34d0628766636568'),
+ ('31154116582196216093', '95e96c4f8cfdba6f065c8869b5e984e9')]
for challenge, response in challenges:
self.server.doChallenge(challenge, response)
self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
if userHandle == "friend@hotmail.com":
self.state = 'USERLEFT'
- def userTyping(self, message):
+ def gotContactTyping(self, message):
if message.userHandle == 'foo@bar.com':
self.state = 'USERTYPING'
if message.message == "Test Message" and message.userHandle == "foo1@bar.com":
self.status = "GOTMESSAGE"
+ def gotFileReceive(self, fileReceive):
+ self.fileReceive = fileReceive
+
class SwitchboardP2PTests(unittest.TestCase):
def setUp(self):
self.server = DummySwitchboardP2PServer()
self.loop1.disconnect()
self.loop2.disconnect()
+ def _loop(self, steps=1):
+ for i in xrange(steps):
+ self.loop1.doSteps(1)
+ self.loop2.doSteps(1)
+
def testMessage(self):
self.client1.sendMessage(msn.MSNMessage(message='Test Message'))
- self.loop1.doSteps(10)
- self.loop2.doSteps(10)
+ self._loop()
self.failUnless((self.client2.status == "GOTMESSAGE"), "Fake switchboard server not working.")
+
+ def _generateData(self):
+ data = ''
+ for i in xrange(3000):
+ data += struct.pack("<L", random.randint(0, sys.maxint))
+ return data
def testAvatars(self):
self.gotAvatar = False
# Set up the avatar for client1
- imageData = 'avatar image data' * 1000
+ imageData = self._generateData()
self.client1.msnobj = msn.MSNObject()
self.client1.msnobj.setData('foo1@bar.com', imageData)
self.client1.msnobj.makeText()
d.addCallback(avatarCallback)
# Let them do their thing
- for i in xrange(100):
- self.loop1.doSteps(1)
- self.loop2.doSteps(1)
+ self._loop(10)
# Check that client2 got the avatar
self.failUnless((self.gotAvatar), "Failed to transfer avatar")
+ def testFilesHappyPath(self):
+ fileData = self._generateData()
+ self.gotFile = False
+
+ # Send the file (client2->client1)
+ msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+ fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+ def accepted((yes,)):
+ if yes:
+ fileSend.write(fileData)
+ fileSend.close()
+ else:
+ raise "TransferDeclined"
+ def failed():
+ raise "TransferError"
+ d.addCallback(accepted)
+ d.addErrback(failed)
+
+ # Let the request get pushed to client1
+ self._loop(10)
+
+ # Receive the file
+ def finished(data):
+ self.gotFile = (data == fileData)
+ fileBuffer = msn.StringBuffer(finished)
+ fileReceive = self.client1.fileReceive
+ self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.")
+ fileReceive.accept(fileBuffer)
+
+ # Lets transfer!!
+ self._loop(10)
+
+ self.failUnless((self.gotFile), "Failed to transfer file")
+
+ def testFilesHappyChunkedPath(self):
+ fileData = self._generateData()
+ self.gotFile = False
+
+ # Send the file (client2->client1)
+ msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+ fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+ def accepted((yes,)):
+ if yes:
+ fileSend.write(fileData[:len(fileData)/2])
+ fileSend.write(fileData[len(fileData)/2:])
+ fileSend.close()
+ else:
+ raise "TransferDeclined"
+ def failed():
+ raise "TransferError"
+ d.addCallback(accepted)
+ d.addErrback(failed)
+
+ # Let the request get pushed to client1
+ self._loop(10)
+
+ # Receive the file
+ def finished(data):
+ self.gotFile = (data == fileData)
+ fileBuffer = msn.StringBuffer(finished)
+ fileReceive = self.client1.fileReceive
+ self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.")
+ fileReceive.accept(fileBuffer)
+
+ # Lets transfer!!
+ self._loop(10)
+
+ self.failUnless((self.gotFile), "Failed to transfer file")
+
+ def testTwoFilesSequential(self):
+ self.testFilesHappyPath()
+ self.testFilesHappyPath()
+
+ def testFilesDeclinePath(self):
+ fileData = self._generateData()
+ self.gotDecline = False
+
+ # Send the file (client2->client1)
+ msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+ fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+ def accepted((yes,)):
+ self.failUnless((not yes), "Failed to understand a decline.")
+ self.gotDecline = True
+ def failed():
+ raise "TransferError"
+ d.addCallback(accepted)
+ d.addErrback(failed)
+
+ # Let the request get pushed to client1
+ self._loop(10)
+
+ # Decline the file
+ fileReceive = self.client1.fileReceive
+ fileReceive.reject()
+
+ # Let the decline get pushed to client2
+ self._loop(10)
+
+ self.failUnless((self.gotDecline), "Failed to understand a decline, ignored.")
+
################
# MSNFTP tests #
class FileTransferTestCase(unittest.TestCase):
""" test FileSend against FileReceive """
+ skip = "Not implemented"
def setUp(self):
self.input = StringIOWithoutClosing()