from twisted.protocols import loopback
from twisted.protocols.basic import LineReceiver
from twisted.internet.defer import Deferred
-from twisted.internet import reactor
+from twisted.internet import reactor, main
+from twisted.python import failure
from twisted.trial import unittest
# System imports
-import StringIO, sys, urllib
+import StringIO, sys, urllib, random, struct
import msn
def close(self): pass
def loseConnection(self): pass
+class LoopbackCon:
+    """ Wires two protocols together with loopback relays and pumps data between them. """
+
+    def __init__(self, con1, con2):
+        """ Connects both protocols; each one's transport is the relay built around the other. """
+        self.con1 = con1
+        self.con2 = con2
+        self.con1ToCon2 = loopback.LoopbackRelay(con1)
+        self.con2ToCon1 = loopback.LoopbackRelay(con2)
+        con2.makeConnection(self.con1ToCon2)
+        con1.makeConnection(self.con2ToCon1)
+        self.connected = True
+
+    def doSteps(self, steps=1):
+        """ Runs up to `steps` reactor iterations, flushing both relays after each one.
+
+        Returns True (after disconnecting both ends) as soon as either relay
+        has been asked to lose its connection, False if every step ran
+        without that happening.
+        """
+        remaining = steps
+        while remaining > 0:
+            reactor.iterate(0.01)
+            self.con1ToCon2.clearBuffer()
+            self.con2ToCon1.clearBuffer()
+            if self.con1ToCon2.shouldLose:
+                # This relay was flushed before the other one ran, so flush it
+                # once more before tearing down (presumably to deliver data
+                # queued during the second clearBuffer() above - confirm
+                # against LoopbackRelay).
+                self.con1ToCon2.clearBuffer()
+            elif not self.con2ToCon1.shouldLose:
+                remaining -= 1
+                continue
+            self.disconnect()
+            return True
+        return False
+
+    def disconnect(self):
+        """ Reports a clean close to both protocols; repeated calls only notify once. """
+        if self.connected:
+            # Clear the flag first: doSteps() disconnects on completion and the
+            # test's tearDown() disconnects again, which previously delivered
+            # connectionLost to both protocols a second time.
+            self.connected = False
+            self.con1.connectionLost(failure.Failure(main.CONNECTION_DONE))
+            self.con2.connectionLost(failure.Failure(main.CONNECTION_DONE))
+        reactor.iterate()
+
+
+
##################
# Passport tests #
def gotProfile(self, message):
self.state = 'PROFILE'
- def gotContactStatus(self, code, userHandle, screenName):
+ def gotContactStatus(self, userHandle, code, screenName):
if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
c = self.factory.contacts.getContact(userHandle)
if c.caps & msn.MSNContact.MSNC1 and c.msnobj:
self.state = 'INITSTATUS'
- def contactStatusChanged(self, code, userHandle, screenName):
+ def contactStatusChanged(self, userHandle, code, screenName):
if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
self.state = 'NEWSTATUS'
def contactAvatarChanged(self, userHandle, hash):
- if userHandle == "foo@bar.com" and hash == "trC8SlFx2sWQxZMIBAWSEnXc8oQ=":
+ if userHandle == "foo@bar.com" and hash == "b6b0bc4a5171dac590c593080405921275dcf284":
self.state = 'NEWAVATAR'
elif self.state == 'NEWAVATAR' and hash == "":
self.state = 'AVATARGONE'
contact = msn.MSNContact(userHandle='foo@foo.com')
contact.addToList(msn.REVERSE_LIST)
self.client.factory.contacts.addContact(contact)
- self.client.lineReceived("REM 0 RL 100 foo@foo.com")
+ self.client.lineReceived("REM 0 RL foo@foo.com")
self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
def testUserAddedMe(self):
def handle_BLP(self, params):
self.sendLine("BLP %s %s 100" % (params[0], params[1]))
+    def handle_ADC(self, params):
+        """ Fake handler for ADC (add contact).
+
+        params holds the transaction id, the list code and then the
+        N=/F=/C= fields (plus an optional group field for the forward list).
+        Well-formed requests are echoed back; malformed ones drop the
+        connection. Raises NotImplementedError for a forward-list request
+        that carries only a GUID.
+        """
+        trid = params[0]
+        listType = msn.listCodeToID[params[1].lower()]
+        if listType != msn.FORWARD_LIST:
+            # Other lists accept exactly one argument of the form N=<one e-mail address>.
+            wellFormed = len(params) == 3 and \
+                params[2].startswith("N=") and \
+                params[2].count('@') == 1
+            if not wellFormed:
+                # Return here: the request must not be echoed (or indexed)
+                # once we have decided to hang up.
+                self.transport.loseConnection()
+                return
+            self.sendLine("ADC %s %s %s" % (trid, params[1], params[2]))
+            return
+
+        userHandle = ""
+        screenName = ""
+        userGuid = ""
+        groups = ""
+        for p in params[2:]:
+            key = p[:1]
+            if key == 'N':
+                userHandle = p[2:]
+            elif key == 'F':
+                screenName = p[2:]
+            elif key == 'C':
+                userGuid = p[2:]
+            else:
+                groups = p
+        if userHandle and userGuid:
+            # A request names the contact by handle or by GUID, never both.
+            self.transport.loseConnection()
+            return
+        if userHandle:
+            if not screenName:
+                self.transport.loseConnection()
+            else:
+                self.sendLine("ADC %s FL N=%s F=%s C=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx %s" % (trid, userHandle, screenName, groups))
+            return
+        if userGuid:
+            # String exceptions are not valid in current Python 2 releases.
+            raise NotImplementedError("NotImplementedError")
+
+    def handle_REM(self, params):
+        """ Fake handler for REM (remove contact).
+
+        Expects a numeric transaction id, a known list code and one contact
+        identifier. Forward-list removals must not contain '@'; every other
+        list requires exactly one '@'. Valid requests are echoed back,
+        anything else drops the connection.
+        """
+        if len(params) != 3:
+            self.transport.loseConnection()
+            return
+        try:
+            int(params[0])  # only checks that the transaction id is numeric
+            listType = msn.listCodeToID[params[1].lower()]
+        except (ValueError, KeyError):
+            # Stop here: listType is unbound when the lookup failed.
+            self.transport.loseConnection()
+            return
+        atSigns = params[2].count('@')
+        if listType == msn.FORWARD_LIST and atSigns > 0:
+            self.transport.loseConnection()
+        elif listType != msn.FORWARD_LIST and atSigns != 1:
+            self.transport.loseConnection()
+        else:
+            self.sendLine("REM %s %s %s" % (params[0], params[1], params[2]))
+
+    def handle_PRP(self, params):
+        """ Fake handler for PRP; echoes friendly-name (MFN) changes, drops anything else. """
+        if len(params) != 3:
+            # Return so the short parameter list is not indexed below.
+            self.transport.loseConnection()
+            return
+        if params[1] == "MFN":
+            self.sendLine("PRP %s MFN %s" % (params[0], params[2]))
+        else:
+            # Only friendly names are implemented
+            self.transport.loseConnection()
+
+    def handle_UUX(self, params):
+        """ Fake handler for UUX: acknowledges an empty payload at once, otherwise switches to raw mode to read it. """
+        if len(params) != 2:
+            self.transport.loseConnection()
+            return
+        trid = params[0]
+        payloadLength = int(params[1])
+        if payloadLength <= 0:
+            self.sendLine("UUX %s 0" % trid)
+            return
+        self.currentMessage = msn.MSNMessage(length=payloadLength, userHandle=trid, screenName="UUX", specialMessage=True)
+        self.setRawMode()
+
+ def checkMessage(self, message):
+ # For a special message whose screenName is "UUX", replies "UUX <userHandle> 0".
+ # NOTE(review): indentation is flattened in this patch, so whether `return 0`
+ # belongs to the inner or the outer `if` cannot be told from here - confirm.
+ if message.specialMessage:
+ if message.screenName == "UUX":
+ self.sendLine("UUX %s 0" % message.userHandle)
+ return 0
+ return 1
+
+    def handle_XFR(self, params):
+        """ Fake handler for XFR: answers switchboard (SB) requests with a fixed address and key. """
+        isSwitchboardRequest = len(params) == 2 and params[1] == "SB"
+        if not isSwitchboardRequest:
+            self.transport.loseConnection()
+            return
+        self.sendLine("XFR %s SB 129.129.129.129:1234 CKI SomeSecret" % params[0])
+
+
+
class FakeNotificationClient(msn.NotificationClient):
def doStatusChange(self):
def testcb((status,)):
if status == msn.STATUS_AWAY:
self.test = 'PASS'
- self.transport.loseConnection()
+ self.transport.loseConnection()
d = self.changeStatus(msn.STATUS_AWAY)
d.addCallback(testcb)
def testcb((priv,)):
if priv.upper() == 'AL':
self.test = 'PASS'
- self.transport.loseConnection()
+ self.transport.loseConnection()
d = self.setPrivacyMode(True)
d.addCallback(testcb)
+ def doAddContactFL(self):
+ # Asks to add foo@bar.com to the forward list. The callback sets self.test
+ # to 'PASS' when the reply has the forward-list bit, the placeholder GUID,
+ # the handle as both handle and screen name, and the contact is in the list.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((listType, userGuid, userHandle, screenName)):
+ if listType & msn.FORWARD_LIST and \
+ userGuid == "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" and \
+ userHandle == "foo@bar.com" and \
+ screenName == "foo@bar.com" and \
+ self.factory.contacts.getContact(userHandle):
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.addContact(msn.FORWARD_LIST, "foo@bar.com")
+ d.addCallback(testcb)
+
+ def doAddContactAL(self):
+ # Asks to add foo@bar.com to the allow list. The callback sets self.test to
+ # 'PASS' when the reply has the allow-list bit, the right handle, no GUID or
+ # screen name, and the contact is in the list.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((listType, userGuid, userHandle, screenName)):
+ if listType & msn.ALLOW_LIST and \
+ userHandle == "foo@bar.com" and \
+ not userGuid and not screenName and \
+ self.factory.contacts.getContact(userHandle):
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.addContact(msn.ALLOW_LIST, "foo@bar.com")
+ d.addCallback(testcb)
+
+ def doRemContactFL(self):
+ # Asks to remove foo@bar.com from the forward list. The callback sets
+ # self.test to 'PASS' when the reply has the forward-list bit and that handle.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((listType, userHandle, groupID)):
+ if listType & msn.FORWARD_LIST and \
+ userHandle == "foo@bar.com":
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.remContact(msn.FORWARD_LIST, "foo@bar.com")
+ d.addCallback(testcb)
+
+ def doRemContactAL(self):
+ # Asks to remove foo@bar.com from the allow list. The callback sets
+ # self.test to 'PASS' when the reply has the allow-list bit and that handle.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((listType, userHandle, groupID)):
+ if listType & msn.ALLOW_LIST and \
+ userHandle == "foo@bar.com":
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.remContact(msn.ALLOW_LIST, "foo@bar.com")
+ d.addCallback(testcb)
+
+    def doScreenNameChange(self):
+        """ requests a new screen name; any reply marks the test as passed and hangs up """
+        def onChanged(*ignored):
+            self.test = 'PASS'
+            self.transport.loseConnection()
+        self.changeScreenName("Some new name").addCallback(onChanged)
+
+ def doPersonalChange(self, personal):
+ # Sets the personal message to `personal`. The callback sets self.test to
+ # 'PASS' when the value it receives equals what was sent.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((checkPersonal,)):
+ if checkPersonal == personal:
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.changePersonalMessage(personal)
+ d.addCallback(testcb)
+
+    def doAvatarChange(self, data):
+        """ uploads `data` as the new avatar; any reply marks the test as passed and hangs up """
+        def onChanged(_result):
+            self.test = 'PASS'
+            self.transport.loseConnection()
+        self.changeAvatar(data, True).addCallback(onChanged)
+
+ def doRequestSwitchboard(self):
+ # Requests a switchboard server. The callback sets self.test to 'PASS' when
+ # the reply is host 129.129.129.129, port 1234 and key "SomeSecret".
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((host, port, key)):
+ if host == "129.129.129.129" and port == 1234 and key == "SomeSecret":
+ self.test = 'PASS'
+ self.transport.loseConnection()
+ d = self.requestSwitchboardServer()
+ d.addCallback(testcb)
+
class FakeServerNotificationTests(unittest.TestCase):
""" tests the NotificationClient against a fake server. """
def setUp(self):
self.client = FakeNotificationClient()
self.client.factory = msn.NotificationFactory()
+ self.client.test = 'FAIL'
self.server = FakeNotificationServer()
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
- pass
+ self.loop.disconnect()
def testChangeStatus(self):
- reactor.callLater(0, self.client.doStatusChange)
- loopback.loopback(self.client, self.server)
+ self.client.doStatusChange()
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
self.failUnless((self.client.test == 'PASS'), 'Failed to change status properly')
def testSetPrivacyMode(self):
self.client.factory.contacts = msn.MSNContactList()
- reactor.callLater(0, self.client.doPrivacyMode)
- loopback.loopback(self.client, self.server)
+ self.client.doPrivacyMode()
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
self.failUnless((self.client.test == 'PASS'), 'Failed to change privacy mode')
+    def testSyncList(self):
+        """ list synchronisation against the fake server (skipped for now) """
+        self.client.doSyncList()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to synchronise list')
+    testSyncList.skip = "Will do after list versions."
+
+    def testAddContactFL(self):
+        """ adding a contact to the forward list, starting from an empty contact list """
+        self.client.factory.contacts = msn.MSNContactList()
+        self.client.doAddContactFL()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to add contact to forward list')
+
+    def testAddContactAL(self):
+        """ adding a contact to the allow list, starting from an empty contact list """
+        self.client.factory.contacts = msn.MSNContactList()
+        self.client.doAddContactAL()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to add contact to allow list')
+
+    def testRemContactFL(self):
+        """ removing a contact that is already on the forward list """
+        contacts = msn.MSNContactList()
+        self.client.factory.contacts = contacts
+        existing = msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.FORWARD_LIST)
+        contacts.addContact(existing)
+        self.client.doRemContactFL()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to remove contact from forward list')
+
+    def testRemContactAL(self):
+        """ removing a contact that is already on the allow list """
+        contacts = msn.MSNContactList()
+        self.client.factory.contacts = contacts
+        existing = msn.MSNContact(userGuid="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", userHandle="foo@bar.com", screenName="Some guy", lists=msn.ALLOW_LIST)
+        contacts.addContact(existing)
+        self.client.doRemContactAL()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to remove contact from allow list')
+
+    def testChangedScreenName(self):
+        """ changing the screen name through the fake server """
+        self.client.doScreenNameChange()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to change screen name properly')
+
+    def testChangePersonal1(self):
+        """ setting a non-empty personal message """
+        self.client.doPersonalChange("Some personal message")
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to change personal message properly')
+
+    def testChangePersonal2(self):
+        """ setting an empty personal message """
+        self.client.doPersonalChange("")
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to change personal message properly')
+
+    def testChangeAvatar(self):
+        """ changing the avatar to a small dummy payload """
+        self.client.doAvatarChange("DATADATADATADATA")
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to change avatar properly')
+
+    def testRequestSwitchboard(self):
+        """ requesting a switchboard server from the fake notification server """
+        self.client.doRequestSwitchboard()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.test == 'PASS', 'Failed to request switchboard')
+
+
#################################
-# Notification Challenges tests #
+# Notification challenges tests #
#################################
class DummyChallengeNotificationServer(msn.MSNEventBase):
def setUp(self):
self.client = DummyChallengeNotificationClient()
self.server = DummyChallengeNotificationServer()
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
- pass
+ self.loop.disconnect()
def testChallenges(self):
challenges = [('13038318816579321232', 'b01c13020e374d4fa20abfad6981b7a9'),
('23055170411503520698', 'ae906c3f2946d25e7da1b08b0b247659'),
('37819769320541083311', 'db79d37dadd9031bef996893321da480'),
- ('93662730714769834295', 'd619dfbb1414004d34d0628766636568')]
+ ('93662730714769834295', 'd619dfbb1414004d34d0628766636568'),
+ ('31154116582196216093', '95e96c4f8cfdba6f065c8869b5e984e9')]
for challenge, response in challenges:
- reactor.callLater(0, lambda: self.server.doChallenge(challenge, response))
- loopback.loopback(self.client, self.server)
+ self.server.doChallenge(challenge, response)
+ self.failUnless(self.loop.doSteps(10), 'Failed to disconnect')
self.failUnless((self.server.state == 'PASS'), 'Incorrect challenge response.')
self.server = DummyPingNotificationServer()
self.client.state = 'CONNECTED'
self.client.count = 0
+ self.loop = LoopbackCon(self.client, self.server)
def tearDown(self):
msn.PINGSPEED = 50.0
self.client.logOut()
+ self.loop.disconnect()
def testPingGood(self):
self.server.good = True
- loopback.loopback(self.client, self.server)
+ self.loop.doSteps(100)
self.failUnless((self.client.state == 'CONNECTED'), 'Should be connected.')
def testPingBad(self):
self.server.good = False
- loopback.loopback(self.client, self.server)
+ self.loop.doSteps(100)
self.failUnless((self.client.state == 'DISCONNECTED'), 'Should be disconnected.')
-#############################
-# Switchboard message tests #
-#############################
+###########################
+# Switchboard basic tests #
+###########################
+
+class DummySwitchboardServer(msn.MSNEventBase):
+    """ Fake switchboard server that accepts one fixed set of credentials, one callee and one message. """
+
+    def handle_USR(self, params):
+        """ Confirms the login of foo@bar.com / somekey; drops malformed requests. """
+        if len(params) != 3:
+            # Return so the short parameter list is not indexed below.
+            self.transport.loseConnection()
+            return
+        if params[1] == 'foo@bar.com' and params[2] == 'somekey':
+            self.sendLine("USR %s OK %s %s" % (params[0], params[1], params[1]))
+
+    def handle_ANS(self, params):
+        """ Confirms an answer carrying the expected handle, key and session id; drops malformed requests. """
+        if len(params) != 4:
+            self.transport.loseConnection()
+            return
+        if params[1] == 'foo@bar.com' and params[2] == 'somekey' and params[3] == 'someSID':
+            self.sendLine("ANS %s OK" % params[0])
+
+    def handle_CAL(self, params):
+        """ Reports RINGING for friend@hotmail.com; any other invitation drops the connection. """
+        if len(params) != 2:
+            self.transport.loseConnection()
+            return
+        if params[1] == 'friend@hotmail.com':
+            self.sendLine("CAL %s RINGING 1111122" % params[0])
+        else:
+            self.transport.loseConnection()
+
+    def checkMessage(self, message):
+        """ Acknowledges the one expected message text, hangs up on anything else; always returns 0. """
+        if message.message == 'Hi how are you today?':
+            self.sendLine("ACK " + message.userHandle) # Relies on TRID getting stored in userHandle trick
+        else:
+            self.transport.loseConnection()
+        return 0
class DummySwitchboardClient(msn.SwitchboardClient):
- def userTyping(self, message):
- self.state = 'TYPING'
+    def loggedIn(self):
+        """ records the login and hangs up """
+        self.state = 'LOGGEDIN'
+        transport = self.transport
+        transport.loseConnection()
+
+    def gotChattingUsers(self, users):
+        """ records that the expected participant list was received """
+        expected = {'fred@hotmail.com': 'fred', 'jack@email.com': 'jack has a nickname!'}
+        if users != expected:
+            return
+        self.state = 'GOTCHATTINGUSERS'
- def gotSendRequest(self, fileName, fileSize, cookie, message):
- if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
+    def userJoined(self, userHandle, screenName):
+        """ records that the expected user joined under the expected nickname """
+        if userHandle != "friend@hotmail.com" or screenName != "friend nickname":
+            return
+        self.state = 'USERJOINED'
+    def userLeft(self, userHandle):
+        """ records that the expected user left """
+        if userHandle != "friend@hotmail.com":
+            return
+        self.state = 'USERLEFT'
-class MessageHandlingTests(unittest.TestCase):
- """ testing various message handling methods from SwichboardClient """
- skip = "Not ready"
+    def gotContactTyping(self, message):
+        """ records a typing notification sent by foo@bar.com """
+        sender = message.userHandle
+        if sender != 'foo@bar.com':
+            return
+        self.state = 'USERTYPING'
+    def gotMessage(self, message):
+        """ records receipt of the expected text message from the expected sender """
+        fromFriend = message.userHandle == 'friend@hotmail.com' and \
+            message.screenName == 'Friend Nickname'
+        if fromFriend and message.message == 'Hello.':
+            self.state = 'GOTMESSAGE'
+
+ def doSendInvite(self):
+ # Invites friend@hotmail.com to the session. The callback sets self.state
+ # to 'INVITESUCCESS' when the reported session id is 1111122.
+ # NOTE(review): indentation is flattened here; confirm whether
+ # loseConnection() runs unconditionally or only on success.
+ def testcb((sid,)):
+ if sid == 1111122:
+ self.state = 'INVITESUCCESS'
+ self.transport.loseConnection()
+ d = self.inviteUser('friend@hotmail.com')
+ d.addCallback(testcb)
+
+    def doSendMessage(self):
+        """ sends one acknowledged plain-text message; the acknowledgement marks success and hangs up """
+        outgoing = msn.MSNMessage()
+        outgoing.setHeader("Content-Type", "text/plain; charset=UTF-8")
+        outgoing.message = 'Hi how are you today?'
+        outgoing.ack = msn.MSNMessage.MESSAGE_ACK
+
+        def onAcknowledged(_result):
+            self.state = 'MESSAGESUCCESS'
+            self.transport.loseConnection()
+        self.sendMessage(outgoing).addCallback(onAcknowledged)
+
+
+class SwitchboardBasicTests(unittest.TestCase):
+ """ Tests basic functionality of switchboard sessions """
def setUp(self):
self.client = DummySwitchboardClient()
self.client.state = 'START'
-
+ self.client.userHandle = 'foo@bar.com'
+ self.client.key = 'somekey'
+ self.client.sessionID = 'someSID'
+ self.server = DummySwitchboardServer()
+ self.loop = LoopbackCon(self.client, self.server)
+
def tearDown(self):
- self.client = None
+ self.loop.disconnect()
+
+    def _testSB(self, reply):
+        """ runs the login exchange with client.reply set to `reply` and expects the LOGGEDIN state """
+        self.client.reply = reply
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        loggedIn = self.client.state == 'LOGGEDIN'
+        self.failUnless(loggedIn, 'Failed to login with reply='+str(reply))
+
+    def testReply(self):
+        """ login with reply set to True """
+        self._testSB(reply=True)
+
+    def testAsync(self):
+        """ login with reply set to False """
+        self._testSB(reply=False)
+
+    def testChattingUsers(self):
+        """ two IRO lines should be reported as the list of chatting users """
+        self.client.lineReceived("IRO 1 1 2 fred@hotmail.com fred")
+        self.client.lineReceived("IRO 1 2 2 jack@email.com jack%20has%20a%20nickname%21")
+        self.failUnless(self.client.state == 'GOTCHATTINGUSERS', 'Failed to get chatting users')
+
+    def testUserJoined(self):
+        """ a JOI line should be reported as a user joining """
+        joinLine = "JOI friend@hotmail.com friend%20nickname"
+        self.client.lineReceived(joinLine)
+        self.failUnless(self.client.state == 'USERJOINED', 'Failed to notice user joining')
+
+    def testUserLeft(self):
+        """ a BYE line should be reported as a user leaving """
+        leaveLine = "BYE friend@hotmail.com"
+        self.client.lineReceived(leaveLine)
+        self.failUnless(self.client.state == 'USERLEFT', 'Failed to notice user leaving')
def testTypingCheck(self):
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgscontrol')
- m.setHeader('TypingUser', 'foo@bar')
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'TYPING'), 'Failed to detect typing notification')
+ m = 'MSG foo@bar.com Foo 80\r\n'
+ m += 'MIME-Version: 1.0\r\n'
+ m += 'Content-Type: text/x-msmsgscontrol\r\n'
+ m += 'TypingUser: foo@bar\r\n'
+ m += '\r\n\r\n'
+ self.client.dataReceived(m)
+ self.failUnless((self.client.state == 'USERTYPING'), 'Failed to detect typing notification')
- def testFileInvitation(self, lazyClient=False):
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Application-Name: File Transfer\r\n'
- if not lazyClient:
- m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
- m.message += 'Invitation-Command: Invite\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n'
- m.message += 'Application-File: foobar.ext\r\n'
- m.message += 'Application-FileSize: 31337\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'INVITATION'), 'Failed to detect file transfer invitation')
-
- def testFileInvitationMissingGUID(self):
- return self.testFileInvitation(True)
-
- def testFileResponse(self):
- d = Deferred()
- d.addCallback(self.fileResponse)
- self.client.cookies['iCookies'][1234] = (d, None)
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Invitation-Command: ACCEPT\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'RESPONSE'), 'Failed to detect file transfer response')
-
- def testFileInfo(self):
- d = Deferred()
- d.addCallback(self.fileInfo)
- self.client.cookies['external'][1234] = (d, None)
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Invitation-Command: ACCEPT\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n'
- m.message += 'IP-Address: 192.168.0.1\r\n'
- m.message += 'Port: 6891\r\n'
- m.message += 'AuthCookie: 4321\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'INFO'), 'Failed to detect file transfer info')
+    def testGotMessage(self):
+        """ a raw plain-text MSG should be delivered to gotMessage """
+        rawMessage = ('MSG friend@hotmail.com Friend%20Nickname 68\r\n'
+                      'MIME-Version: 1.0\r\n'
+                      'Content-Type: text/plain; charset=UTF-8\r\n'
+                      '\r\nHello.')
+        self.client.dataReceived(rawMessage)
+        self.failUnless(self.client.state == 'GOTMESSAGE', 'Failed to detect message')
+
+    def testInviteUser(self):
+        """ inviting a user, with the client's connectionMade replaced by a no-op """
+        self.client.connectionMade = lambda: None
+        self.client.doSendInvite()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.state == 'INVITESUCCESS', 'Failed to invite user')
- def fileResponse(self, (accept, cookie, info)):
- if accept and cookie == 1234: self.client.state = 'RESPONSE'
+    def testSendMessage(self):
+        """ sending an acknowledged message, with the client's connectionMade replaced by a no-op """
+        self.client.connectionMade = lambda: None
+        self.client.doSendMessage()
+        finished = self.loop.doSteps(10)
+        self.failUnless(finished, 'Failed to disconnect')
+        self.failUnless(self.client.state == 'MESSAGESUCCESS', 'Failed to send message')
- def fileInfo(self, (accept, ip, port, aCookie, info)):
- if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
+################
+# MSNP2P tests #
+################
+
+class DummySwitchboardP2PServerHelper(msn.MSNEventBase):
+    """ Server-side end of one client connection; passes received messages to the shared fake server. """
+
+    def __init__(self, server):
+        msn.MSNEventBase.__init__(self)
+        self.server = server
+
+    def handle_USR(self, params):
+        """ Remembers the caller's handle and confirms the login for the two known test accounts. """
+        if len(params) != 3:
+            # Return so the short parameter list is not indexed below.
+            self.transport.loseConnection()
+            return
+        self.userHandle = params[1]
+        knownLogins = (('foo1@bar.com', 'somekey1'), ('foo2@bar.com', 'somekey2'))
+        if (params[1], params[2]) in knownLogins:
+            self.sendLine("USR %s OK %s %s" % (params[0], params[1], params[1]))
+
+    def checkMessage(self, message):
+        """ Always returns 1. """
+        return 1
+
+    def gotMessage(self, message):
+        """ Stamps the message with this connection's handle and hands it to the server. """
+        message.userHandle = self.userHandle
+        message.screenName = self.userHandle
+        self.server.gotMessage(message, self)
+
+    def sendMessage(self, message):
+        """ Writes message to this client: MSG line, MIME-Version and Content-Type, the other headers, a blank line, then the body. """
+        if message.length == 0:
+            message.length = message._calcMessageLen()
+        self.sendLine("MSG %s %s %s" % (message.userHandle, message.screenName, message.length))
+        self.sendLine('MIME-Version: %s' % message.getHeader('MIME-Version'))
+        self.sendLine('Content-Type: %s' % message.getHeader('Content-Type'))
+        for name, value in message.headers.items():
+            # These two were already written above, in a fixed order.
+            if name.lower() not in ('mime-version', 'content-type'):
+                self.sendLine("%s: %s" % (name, value))
+        self.transport.write("\r\n")
+        self.transport.write(message.message)
+
+
+class DummySwitchboardP2PServer:
+    """ Fake switchboard that forwards each message to every connection except its sender. """
+
+    def __init__(self):
+        self.clients = []
+
+    def newClient(self):
+        """ Creates, registers and returns the server-side helper for one more connection. """
+        helper = DummySwitchboardP2PServerHelper(self)
+        self.clients.append(helper)
+        return helper
+
+    def gotMessage(self, message, sender):
+        """ Forwards message to all registered helpers other than sender. """
+        for helper in self.clients:
+            if helper == sender:
+                continue
+            helper.sendMessage(message)
+
+class DummySwitchboardP2PClient(msn.SwitchboardClient):
+    """ Switchboard client that records what it receives for the tests to inspect. """
+
+    def gotMessage(self, message):
+        """ Flags receipt of the test message sent by foo1@bar.com. """
+        if message.message != "Test Message" or message.userHandle != "foo1@bar.com":
+            return
+        self.status = "GOTMESSAGE"
+
+    def gotFileReceive(self, fileReceive):
+        """ Keeps the incoming file offer on self.fileReceive. """
+        self.fileReceive = fileReceive
+
+class SwitchboardP2PTests(unittest.TestCase):
+    """ Tests avatar and file transfers between two clients joined by the fake switchboard. """
+
+    def setUp(self):
+        self.server = DummySwitchboardP2PServer()
+        self.client1 = DummySwitchboardP2PClient()
+        self.client1.key = 'somekey1'
+        self.client1.userHandle = 'foo1@bar.com'
+        self.client2 = DummySwitchboardP2PClient()
+        self.client2.key = 'somekey2'
+        self.client2.userHandle = 'foo2@bar.com'
+        self.client2.status = "INIT"
+        self.loop1 = LoopbackCon(self.client1, self.server.newClient())
+        self.loop2 = LoopbackCon(self.client2, self.server.newClient())
+
+    def tearDown(self):
+        self.loop1.disconnect()
+        self.loop2.disconnect()
+
+    def _loop(self, steps=1):
+        """ Pumps both connections alternately, `steps` times each. """
+        for i in xrange(steps):
+            self.loop1.doSteps(1)
+            self.loop2.doSteps(1)
+
+    def testMessage(self):
+        self.client1.sendMessage(msn.MSNMessage(message='Test Message'))
+        self._loop()
+        self.failUnless((self.client2.status == "GOTMESSAGE"), "Fake switchboard server not working.")
+
+    def _generateData(self):
+        """ Returns 12000 bytes of random data (3000 packed 32-bit words). """
+        # "<L" packs exactly four bytes, so draw from the 32-bit range;
+        # sys.maxint is wider than that on 64-bit builds.
+        words = [struct.pack("<L", random.randint(0, 0xFFFFFFFF)) for i in xrange(3000)]
+        return ''.join(words)
+
+    def _transferFailed(self, reason):
+        """ Errback for the send deferred; takes the Failure that errbacks are always called with. """
+        raise RuntimeError("TransferError: %s" % (reason,))
+
+    def _offerFile(self, fileData, onAnswer):
+        """ Has client2 offer myfile.txt to client1 and pumps the request across.
+
+        onAnswer(fileSend, (yes,)) is called with the receiver's answer.
+        Returns the file offer as seen by client1.
+        """
+        msnContact = msn.MSNContact(userHandle='foo1@bar.com', caps=msn.MSNContact.MSNC1)
+        fileSend, d = self.client2.sendFile(msnContact, "myfile.txt", len(fileData))
+        d.addCallback(lambda answer: onAnswer(fileSend, answer))
+        d.addErrback(self._transferFailed)
+
+        # Let the request get pushed to client1
+        self._loop(10)
+        return self.client1.fileReceive
+
+    def _transferFile(self, chunks):
+        """ Sends random data client2->client1 in `chunks` writes and checks it arrives intact. """
+        fileData = self._generateData()
+        self.gotFile = False
+
+        def accepted(fileSend, (yes,)):
+            if not yes:
+                raise RuntimeError("TransferDeclined")
+            size = len(fileData)
+            bounds = [size * i // chunks for i in range(chunks + 1)]
+            for start, end in zip(bounds[:-1], bounds[1:]):
+                fileSend.write(fileData[start:end])
+            fileSend.close()
+        fileReceive = self._offerFile(fileData, accepted)
+
+        # Receive the file
+        def finished(data):
+            self.gotFile = (data == fileData)
+        fileBuffer = msn.StringBuffer(finished)
+        self.failUnless((fileReceive.filename == "myfile.txt" and fileReceive.filesize == len(fileData)), "Filename or length wrong.")
+        fileReceive.accept(fileBuffer)
+
+        # Let's transfer!!
+        self._loop(10)
+
+        self.failUnless((self.gotFile), "Failed to transfer file")
+
+    def testAvatars(self):
+        self.gotAvatar = False
+
+        # Set up the avatar for client1
+        imageData = self._generateData()
+        self.client1.msnobj = msn.MSNObject()
+        self.client1.msnobj.setData('foo1@bar.com', imageData)
+        self.client1.msnobj.makeText()
+
+        # Make client2 request the avatar
+        def avatarCallback((data,)):
+            self.gotAvatar = (data == imageData)
+        msnContact = msn.MSNContact(userHandle='foo1@bar.com', msnobj=self.client1.msnobj)
+        d = self.client2.sendAvatarRequest(msnContact)
+        d.addCallback(avatarCallback)
+
+        # Let them do their thing
+        self._loop(10)
+
+        # Check that client2 got the avatar
+        self.failUnless((self.gotAvatar), "Failed to transfer avatar")
+
+    def testFilesHappyPath(self):
+        self._transferFile(1)
+
+    def testFilesHappyChunkedPath(self):
+        self._transferFile(2)
+
+    def testTwoFilesSequential(self):
+        self.testFilesHappyPath()
+        self.testFilesHappyPath()
+
+    def testFilesDeclinePath(self):
+        fileData = self._generateData()
+        self.gotDecline = False
+
+        # Send the file (client2->client1)
+        def answered(fileSend, (yes,)):
+            self.failUnless((not yes), "Failed to understand a decline.")
+            self.gotDecline = True
+        fileReceive = self._offerFile(fileData, answered)
+
+        # Decline the file
+        fileReceive.reject()
+
+        # Let the decline get pushed to client2
+        self._loop(10)
+
+        self.failUnless((self.gotDecline), "Failed to understand a decline, ignored.")
################
class FileTransferTestCase(unittest.TestCase):
""" test FileSend against FileReceive """
- skip = "Not ready"
+ skip = "Not implemented"
def setUp(self):
self.input = StringIOWithoutClosing()
def testFileTransfer(self):
auth = 1234
- sender = msn.FileSend(self.input)
+ sender = msnft.MSNFTP_FileSend(self.input)
sender.auth = auth
sender.fileSize = 7000
- client = msn.FileReceive(auth, "foo@bar.com", self.output)
+ client = msnft.MSNFTP_FileReceive(auth, "foo@bar.com", self.output)
client.fileSize = 7000
- loopback.loopback(sender, client)
+ loop = LoopbackCon(client, sender)
+ loop.doSteps(100)
self.failUnless((client.completed and sender.completed), "send failed to complete")
self.failUnless((self.input.getvalue() == self.output.getvalue()), "saved file does not match original")
+
+