]> code.delx.au - pymsnt/blob - src/tlib/msn/test_msnw.py
Turned off MSN protocol debugging by default.
[pymsnt] / src / tlib / msn / test_msnw.py
1 # Copyright 2005-2006 James Bunton <james@delx.cjb.net>
2 # Licensed for distribution under the GPL version 2, check COPYING for details
3
4 """
5 Test cases for msnw (MSN Wrapper)
6 """
7
8 # Settings
9 TIMEOUT = 30.0 # Connection timeout in seconds
10 LOGGING = True
11 TWOFILES = False
12 FTSENDTEST = False
13 FTRECEIVETEST = False
14 USER1 = "messengertest1@hotmail.com"
15 PASS1 = "hellohello"
16 USER2 = "messengertest2@hotmail.com"
17 PASS2 = "hellohello"
18
19
20
21 # Twisted imports
22 from twisted.internet.defer import Deferred
23 from twisted.internet import reactor, error
24 from twisted.trial import unittest
25 from twisted.python import log
26
27 # System imports
28 import sys, random
29
30 # Local imports
31 import msnw
32 import msn
33
34
35 if LOGGING:
36 log.startLogging(sys.stdout)
37
38
39 def clearAccount(msncon):
40 """ Clears the contact list of the given MSNConnection. Returns a
41 Deferred which fires when the task is complete.
42 """
43 d = Deferred()
44 count = 0
45 checkCount = [0]
46 def cb(ignored=None):
47 checkCount[0] += 1
48 if checkCount[0] == count:
49 d.callback(None)
50
51 for msnContact in msncon.getContacts().contacts.values():
52 for list in [msn.FORWARD_LIST, msn.BLOCK_LIST, msn.ALLOW_LIST, msn.PENDING_LIST]:
53 if msnContact.lists & list:
54 msncon.remContact(list, msnContact.userHandle).addCallback(cb)
55 count += 1
56
57 if count == 0:
58 reactor.callLater(0, d.callback, None)
59 return d
60
61
62 ####################
63 # Basic connection #
64 ####################
65
66 class MSNConnection(msnw.MSNConnection):
67 def __init__(self, username, password, ident, testCase):
68 msnw.MSNConnection.__init__(self, username, password, ident)
69 self.testCase = testCase
70 self.message = None
71 self.contactAdded = None
72
73 def listSynchronized(self):
74 # Now we're fully connected
75 self.testCase.done = "SYNCED"
76
77 def gotMessage(self, userHandle, text):
78 self.testCase.done = "GOTMESSAGE"
79 self.message = (userHandle, text)
80
81 def contactAddedMe(self, userHandle):
82 self.contactAdded = userHandle
83
84
85 class TestsUtil:
86 def setUp(self):
87 self.failure = None
88 self.timeout = None
89 self.done = False
90 self.user1 = None
91 self.user2 = None
92
93 def tearDown(self):
94 if self.user1:
95 self.user1.logOut()
96 reactor.iterate(0.1)
97 if self.user2:
98 self.user2.logOut()
99 reactor.iterate(0.1)
100
101 def doLogins(self, both=True):
102 # Connect two accounts
103 self.user1 = MSNConnection(USER1, PASS1, "user1", self)
104 self.loop("Logging in user1.", cond="SYNCED")
105 if both:
106 self.user2 = MSNConnection(USER2, PASS2, "user2", self)
107 self.loop("Logging in user2.", cond="SYNCED")
108
109 def doPurgeContacts(self, both=True):
110 # Purge both contact lists
111 clearAccount(self.user1).addCallback(self.cb)
112 self.loop("Purging user1 contact list.")
113 if both:
114 clearAccount(self.user2).addCallback(self.cb)
115 self.loop("Purging user2 contact list.")
116
117 def doAddContacts(self, both=True):
118 # Adding users to each other's lists
119 self.user1.addContact(msn.FORWARD_LIST, USER2).addCallback(self.cb)
120 self.loop("Adding user2 to user1's forward list.")
121 self.user1.addContact(msn.ALLOW_LIST, USER2).addCallback(self.cb)
122 self.loop("Adding user2 to user1's allow list.")
123 if both:
124 self.user2.addContact(msn.FORWARD_LIST, USER1).addCallback(self.cb)
125 self.loop("Adding user1 to user2's forward list.")
126 self.user2.addContact(msn.ALLOW_LIST, USER1).addCallback(self.cb)
127 self.loop("Adding user1 to user2's allow list.")
128
129 # Check the contacts have seen each other
130 reactor.iterate(0.5) # One last chance to notice each other
131 self.failUnless((self.user1.contactAdded == USER2 and self.user2.contactAdded == USER1), "Contacts can't see each other.")
132
133 def cb(self, ignored=None):
134 self.done = True
135
136 def loop(self, failMsg, cond=True, timeout=TIMEOUT):
137 # Loops with a timeout
138 self.done = False
139 self.timeout = reactor.callLater(timeout, self.failed, "Timeout: " + failMsg)
140 if cond == True:
141 while not self.done:
142 reactor.iterate(0.1)
143 else:
144 while self.done != cond:
145 reactor.iterate(0.1)
146 try:
147 self.timeout.cancel()
148 except (error.AlreadyCancelled, error.AlreadyCalled):
149 pass
150 if self.failure:
151 self.fail(self.failure)
152 if cond:
153 self.failUnless((self.done == cond), "Failed: " + failMsg)
154
155 def failed(self, why):
156 self.failure = why
157 self.done = True
158
159 # The tests!
160
161 class BasicTests(unittest.TestCase, TestsUtil):
162 def setUp(self):
163 TestsUtil.setUp(self)
164
165 def tearDown(self):
166 TestsUtil.tearDown(self)
167
168 def testReconnect(self):
169 self.user1 = MSNConnection(USER1, PASS1, "user1", self)
170 self.loop("Looping user1.", cond="SYNCED")
171 self.user1.notificationClient.transport.loseConnection()
172 self.done = False
173 self.loop("Looping user1.", cond="SYNCED")
174 testReconnect.skip = FTRECEIVETEST or FTSENDTEST
175
176 def testConnect(self):
177 self.doLogins(both=False)
178 testConnect.skip = FTRECEIVETEST or FTSENDTEST
179
180 def testPurgeContacts(self):
181 self.doLogins()
182 self.doPurgeContacts()
183 testPurgeContacts.skip = FTRECEIVETEST or FTSENDTEST
184
185 def testAddContacts(self):
186 self.doLogins()
187 self.doPurgeContacts()
188 self.doAddContacts()
189 testAddContacts.skip = FTRECEIVETEST or FTSENDTEST
190
191 def testMessageExchange(self):
192 self.doLogins()
193 self.doPurgeContacts()
194 self.doAddContacts()
195 self.user1.sendMessage(USER2, "Hi user2")
196 self.loop("Timeout exchanging message.", cond="GOTMESSAGE")
197 self.failUnless((self.user2.message == (USER1, "Hi user2")), "Failed to transfer message.")
198 testMessageExchange.skip = FTRECEIVETEST or FTSENDTEST
199
200 def testFileSend(self):
201 if raw_input("\n\nALERT!!!\n\nPlease connect to account %s and accept the file transfer from %s. When you have received the complete file, send a message back to the client to signal success.\nType ok when you are ready: " % (USER2, USER1)).lower() != "ok":
202 raise unittest.SkipTest("User didn't type 'ok'")
203
204 data = "Testing 123\r\n" * 5000
205 def accepted((yes,)):
206 if yes:
207 self.fileSend.write(data)
208 self.fileSend.close()
209 else:
210 self.fail("File was not accepted.")
211 def failed():
212 self.fail("Transfer failed in invitation.")
213 def gotFileSend((fileSend, d)):
214 self.fileSend = fileSend
215 d.addCallbacks(accepted, failed)
216
217 self.doLogins(both=False)
218 self.doPurgeContacts(both=False)
219 self.doAddContacts(both=False)
220 d = self.user1.sendFile(USER2, "myfile.txt", len(data))
221 d.addCallback(gotFileSend)
222 self.loop("Sending file.", cond="GOTMESSAGE", timeout=60*60)
223 global TWOFILES
224 if TWOFILES:
225 d = self.user1.sendFile(USER2, "myfile2.txt", len(data))
226 d.addCallback(gotFileSend)
227 self.loop("Sending file.", cond="GOTMESSAGE", timeout=60*60)
228 testFileSend.skip = not FTSENDTEST
229
230 def testFileReceive(self):
231 if raw_input("\n\nALERT!!!\n\nPlease connect to account %s and send a file transfer to %s.\nType ok when you are ready: " % (USER2, USER1)).lower() != "ok":
232 raise unittest.SkipTest("User didn't type 'ok'")
233
234 def fileFinished(data):
235 #filename = "/tmp/msn" + str(random.randint(1000, 9999)) + ".dat"
236 filename = "/tmp/MSNFILE_" + self.fileReceive.filename
237 f = open(filename, "w")
238 f.write(data)
239 f.close()
240 print "Got file!", filename
241 # Terminate the loop in a little, let them send the BYE before
242 # we drop the connection
243 def wait():
244 self.done = "GOTFILE"
245 reactor.callLater(5, wait)
246
247 def gotFileReceive(fileReceive):
248 buffer = msn.StringBuffer(fileFinished)
249 self.fileReceive = fileReceive
250 self.fileReceive.accept(buffer)
251
252 self.doLogins(both=False)
253 self.user1.gotFileReceive = gotFileReceive
254 self.doPurgeContacts(both=False)
255 self.doAddContacts(both=False)
256 self.loop("Receiving file.", cond="GOTFILE", timeout=60*60)
257 global TWOFILES
258 if TWOFILES:
259 self.loop("Receiving file.", cond="GOTFILE", timeout=60*60)
260 testFileReceive.skip = not FTRECEIVETEST
261