]>
code.delx.au - proxy/blob - test_proxy.py
14 return s
.getsockname()[1]
16 SOCKS_PORT
= get_free_port()
17 ECHO_PORT
= get_free_port()
18 ECHO_PORT_B
= struct
.pack(">H", ECHO_PORT
)
21 class SocketHelper(object):
22 def init_socket(self
):
23 self
.sock
= socket
.socket(socket
.AF_INET
)
24 self
.sock
.connect(("localhost", SOCKS_PORT
))
26 def init_ipv6_socket(self
):
27 self
.sock
= socket
.socket(socket
.AF_INET6
)
28 self
.sock
.connect(("localhost", SOCKS_PORT
))
30 def destroy_socket(self
):
34 l
= self
.sock
.send(msg
)
35 self
.assertEqual(len(msg
), l
)
37 def send_proxy_length(self
, x
):
38 self
.send(("%s" % x
).zfill(10).encode("ascii") + b
"\n")
40 def recv(self
, expected_length
):
41 result
= self
.sock
.recv(16384)
42 self
.assertEqual(expected_length
, len(result
), str(result
))
47 result
= self
.sock
.recv(1)
48 self
.assertEqual(0, len(result
), str(result
))
49 except ConnectionResetError
:
52 def assertAuthSuccess(self
):
54 self
.assertEqual(b
"\x05\x00", result
)
56 def assertAuthFail(self
):
58 self
.assertEqual(b
"\x05\xff", result
)
60 def assertRequestSuccess(self
):
61 self
.assertRequestResponse(0)
63 def assertRequestFail(self
, reply
):
64 self
.assertRequestResponse(reply
)
67 def assertRequestResponse(self
, reply
):
68 reply
= struct
.pack(">B", reply
)
69 expected
= b
"\x05" + reply
+ b
"\x00\x01\x00\x00\x00\x00\x00\x00"
70 result
= self
.recv(10)
71 self
.assertEqual(expected
, result
)
73 class TestAuthNegotiation(SocketHelper
, unittest
.TestCase
):
74 def run(self
, result
=None):
76 unittest
.TestCase
.run(self
, result
)
84 def test_one_method_success(self
):
85 self
.send(b
"\x05\x01\x00")
86 self
.assertAuthSuccess()
88 def test_two_methods_success_first(self
):
89 self
.send(b
"\x05\x02\x00\x80")
90 self
.assertAuthSuccess()
92 def test_two_methods_success_second(self
):
93 self
.send(b
"\x05\x02\x80\x00")
94 self
.assertAuthSuccess()
96 def test_no_methods_fail(self
):
97 self
.send(b
"\x05\x00")
100 def test_no_matching_methods_fail(self
):
101 self
.send(b
"\x05\x01\x80")
102 self
.assertAuthFail()
104 def test_invalid_version_fail(self
):
105 self
.send(b
"\x04\x01\x00")
109 class TestRequestNegotiation(SocketHelper
, unittest
.TestCase
):
110 def run(self
, result
=None):
112 unittest
.TestCase
.run(self
, result
)
116 self
.send(b
"\x05\x01\x00")
117 self
.assertAuthSuccess()
120 self
.destroy_socket()
122 def test_invalid_version(self
):
123 self
.send(b
"\x04\x01\x00\x01\x7f\x00\x00\x01\x00\01")
124 self
.assertRequestFail(1)
126 def test_invalid_command(self
):
127 self
.send(b
"\x05\x02\x00\x01\x7f\x00\x00\x01\x00\x01")
128 self
.assertRequestFail(7)
130 def test_invalid_reserved_section(self
):
131 self
.send(b
"\x05\x01\x01\x01\x7f\x00\x00\x01\x00\x01")
132 self
.assertRequestFail(1)
134 def test_invalid_address_type(self
):
135 self
.send(b
"\x05\x01\x00\x09\x7f\x00\x00\x01\x00\x01")
136 self
.assertRequestFail(8)
138 def test_ipv4_success(self
):
139 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01" + ECHO_PORT_B
)
140 self
.assertRequestSuccess()
142 def test_ipv4_bad_port(self
):
143 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01\xff\xff")
144 self
.assertRequestFail(4)
146 def test_ipv4_bad_host(self
):
147 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x00\xff\xff")
148 self
.assertRequestFail(4)
150 def test_dns_success(self
):
151 self
.send(b
"\x05\x01\x00\x03\x09localhost" + ECHO_PORT_B
)
152 self
.assertRequestSuccess()
154 def test_dns_remote_success(self
):
155 self
.send(b
"\x05\x01\x00\x03\x0bexample.com\x00P")
156 self
.assertRequestSuccess()
158 def test_dns_bad_port(self
):
159 self
.send(b
"\x05\x01\x00\x03\x09localhost\xff\xff")
160 self
.assertRequestFail(4)
162 def test_dns_bad_host(self
):
163 self
.send(b
"\x05\x01\x00\x03\x09f.invalid" + ECHO_PORT_B
)
164 self
.assertRequestFail(4)
166 def test_dns_invalid_host(self
):
167 self
.send(b
"\x05\x01\x00\x03\x00" + ECHO_PORT_B
)
170 def test_ipv6_success(self
):
171 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + ECHO_PORT_B
)
172 self
.assertRequestSuccess()
174 def test_ipv6_bad_port(self
):
175 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + b
"\xff\xff")
176 self
.assertRequestFail(4)
178 def test_ipv6_bad_host(self
):
179 self
.send(b
"\x05\x01\x00\x04" + b
"\xfe\x80" + (b
"\x00"*13) + b
"\x01" + ECHO_PORT_B
)
180 self
.assertRequestFail(4)
183 class ProxyPacketHelper(object):
184 def test_one_packet(self
):
185 self
.send_proxy_length(3)
187 result
= self
.recv(3)
188 self
.assertEqual(b
"foo", result
)
191 def test_no_received_data(self
):
192 self
.send_proxy_length(0)
196 def test_two_packets(self
):
197 self
.send_proxy_length(6)
200 result
= self
.recv(3)
201 self
.assertEqual(b
"foo", result
)
204 result
= self
.recv(3)
205 self
.assertEqual(b
"bar", result
)
209 def test_large_packet(self
):
211 self
.send_proxy_length(len(msg
))
215 part
= self
.sock
.recv(4)
216 self
.assertEqual(b
"1234", part
)
221 class TestIPv4Proxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
222 def run(self
, result
=None):
224 unittest
.TestCase
.run(self
, result
)
229 self
.send(b
"\x05\x01\x00")
230 self
.assertAuthSuccess()
232 self
.send(b
"\x05\x01\x00\x01\x7f\x00\x00\x01" + ECHO_PORT_B
)
233 self
.assertRequestSuccess()
236 self
.destroy_socket()
238 class TestDNSProxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
239 def run(self
, result
=None):
241 unittest
.TestCase
.run(self
, result
)
246 self
.send(b
"\x05\x01\x00")
247 self
.assertAuthSuccess()
249 self
.send(b
"\x05\x01\x00\x03\x09localhost" + ECHO_PORT_B
)
250 self
.assertRequestSuccess()
253 self
.destroy_socket()
255 class TestIPv6Proxy(SocketHelper
, ProxyPacketHelper
, unittest
.TestCase
):
256 def run(self
, result
=None):
258 unittest
.TestCase
.run(self
, result
)
263 self
.send(b
"\x05\x01\x00")
264 self
.assertAuthSuccess()
266 self
.send(b
"\x05\x01\x00\x04" + (b
"\x00"*15) + b
"\x01" + ECHO_PORT_B
)
267 self
.assertRequestSuccess()
270 self
.destroy_socket()
272 class TestPermissions(SocketHelper
, unittest
.TestCase
):
273 def assert_connection_allowed(self
):
276 self
.send(b
"\x05\x01\x00")
277 self
.assertAuthSuccess()
279 self
.destroy_socket()
281 def assert_ipv6_connection_allowed(self
):
283 self
.init_ipv6_socket()
284 self
.send(b
"\x05\x01\x00")
285 self
.assertAuthSuccess()
287 self
.destroy_socket()
289 def assert_connection_blocked(self
):
292 self
.send(b
"\x05\x01\x00")
294 except ConnectionResetError
:
297 self
.destroy_socket()
299 def test_allow_all_connections(self
):
300 with
SocksServer({"ALLOW_ALL": "1"}):
301 self
.assert_connection_allowed()
303 def test_block_all_connections(self
):
304 with
SocksServer({}):
305 self
.assert_connection_blocked()
307 def test_allow_ipv4_host(self
):
308 with
SocksServer({"ALLOW_HOST1": "127.0.0.1"}):
309 self
.assert_connection_allowed()
311 def test_allow_ipv6_host(self
):
312 with
SocksServer({"ALLOW_HOST1": "::1"}):
313 self
.assert_ipv6_connection_allowed()
315 def test_allow_multiple_hosts(self
):
316 with
SocksServer({"ALLOW_HOST1": "foo.invalid", "ALLOW_HOST2": "localhost"}):
317 self
.assert_connection_allowed()
320 class EchoServer(object):
324 self
.sock
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
325 self
.sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
326 self
.sock
.bind(("", ECHO_PORT
))
329 self
.thread
= threading
.Thread(target
=self
.run_echo_server
)
334 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
336 self
.sock
.shutdown(socket
.SHUT_RDWR
)
339 def run_echo_server(self
):
342 client
, addr
= self
.sock
.accept()
344 self
.handle_echo_client(client
)
351 def handle_echo_client(self
, client
):
352 line
= client
.recv(10+1)
355 num_bytes
= int(line
)
358 # force the test app to handle many packets by using small ones
359 data
= client
.recv(16)
362 num_bytes
-= len(data
)
364 l
= client
.send(data
)
367 class SocksServer(object):
368 def __init__(self
, extra_env
={"ALLOW_ALL": "1"}):
370 self
.env
["LISTEN_PORT"] = str(SOCKS_PORT
)
371 self
.env
.update(extra_env
)
372 self
.log_output
= open("out.log", "a")
375 self
.process
= subprocess
.Popen(
376 args
=["./socks5server"],
377 stdout
=self
.log_output
,
378 stderr
=self
.log_output
,
386 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
387 self
.log_output
.close()
388 self
.process
.terminate()
391 def wait_for_port(self
):
392 start_time
= time
.time()
393 with socket
.socket(socket
.AF_INET
) as s
:
394 while start_time
+ 10 > time
.time():
396 s
.connect(("localhost", SOCKS_PORT
))
398 except ConnectionRefusedError
:
402 if __name__
== "__main__":