2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Tests for Constrained Network Server."""
16 import traffic_control
18 # The local interface to test on.
class PortAllocatorTest(unittest.TestCase):
  """Unit tests for the Port Allocator class.

  Each test runs against a mocked clock (time.time) and stubbed
  traffic_control functions. Both are installed in setUp() and put back in
  tearDown().
  """

  # Expiration time for ports. In mock time.
  # NOTE(review): the assignment itself is not visible in the reviewed
  # excerpt; 6 is assumed -- confirm against the original file.
  _EXPIRY_TIME = 6

  def setUp(self):
    """Installs the mock clock, builds the allocator, stubs traffic control."""
    # Mock out time.time() to accelerate port expiration testing.
    self._old_time = time.time
    self._current_time = 0
    time.time = lambda: self._current_time

    # TODO(dalecurtis): Mock out actual calls to shadi's port setup.
    self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, self._EXPIRY_TIME)
    self._MockTrafficControl()

  def tearDown(self):
    """Releases all ports, then restores the real clock and traffic control."""
    try:
      self._pa.Cleanup(all_ports=True)
      # Ensure ports are cleaned properly.
      self.assertEqual(self._pa._ports, {})
    finally:
      # Restore the patched globals even when the checks above fail, so the
      # mocks cannot leak into tests that run afterwards.
      time.time = self._old_time
      self._RestoreTrafficControl()

  def _MockTrafficControl(self):
    """Replaces the traffic_control entry points with stubs.

    The real functions are saved on the instance so that
    _RestoreTrafficControl() can put them back. Each stub takes a single
    config argument and returns True.
    """
    self.old_CreateConstrainedPort = traffic_control.CreateConstrainedPort
    self.old_DeleteConstrainedPort = traffic_control.DeleteConstrainedPort
    self.old_TearDown = traffic_control.TearDown

    traffic_control.CreateConstrainedPort = lambda config: True
    traffic_control.DeleteConstrainedPort = lambda config: True
    traffic_control.TearDown = lambda config: True

  def _RestoreTrafficControl(self):
    """Puts back the functions saved by _MockTrafficControl()."""
    traffic_control.CreateConstrainedPort = self.old_CreateConstrainedPort
    traffic_control.DeleteConstrainedPort = self.old_DeleteConstrainedPort
    traffic_control.TearDown = self.old_TearDown

  def testPortAllocator(self):
    """Tests allocation, per-key caching and expiry of every port."""
    first_port = cns._DEFAULT_CNS_PORT_RANGE[0]

    # Ensure Get() succeeds and returns the correct port.
    self.assertEqual(self._pa.Get('test'), first_port)

    # Call again with the same key and make sure we get the same port.
    self.assertEqual(self._pa.Get('test'), first_port)

    # Call with a different key and make sure we get a different port.
    self.assertEqual(self._pa.Get('test2'), first_port + 1)

    # Update fake time so that ports should expire.
    self._current_time += self._EXPIRY_TIME + 1

    # Test to make sure cache is checked before expiring ports.
    self.assertEqual(self._pa.Get('test2'), first_port + 1)

    # Update fake time so that ports should expire.
    self._current_time += self._EXPIRY_TIME + 1

    # Request a new port, old ports should be expired, so we should get the
    # first port in the range. Make sure this is the only allocated port.
    self.assertEqual(self._pa.Get('test3'), first_port)
    # list() keeps the comparison valid when keys() is not a plain list.
    self.assertEqual(list(self._pa._ports.keys()), [first_port])

  def testPortAllocatorExpiresOnlyCorrectPorts(self):
    """Tests that only ports older than the expiry time are reclaimed."""
    first_port = cns._DEFAULT_CNS_PORT_RANGE[0]

    # Ensure Get() succeeds and returns the correct port.
    self.assertEqual(self._pa.Get('test'), first_port)

    # Stagger port allocation and so we can ensure only ports older than the
    # expiry time are actually expired. Floor division keeps the mock clock
    # an integer.
    self._current_time += self._EXPIRY_TIME // 2 + 1

    # Call with a different key and make sure we get a different port.
    self.assertEqual(self._pa.Get('test2'), first_port + 1)

    # After this sleep the port with key 'test' should expire on the next
    # Get().
    self._current_time += self._EXPIRY_TIME // 2 + 1

    # Call with a different key and make sure we get the first port.
    self.assertEqual(self._pa.Get('test3'), first_port)
    self.assertEqual(set(self._pa._ports.keys()),
                     set([first_port, first_port + 1]))

  def testPortAllocatorNoExpiration(self):
    """Tests that ports survive a clock advance when expiry is 0."""
    first_port = cns._DEFAULT_CNS_PORT_RANGE[0]

    # Setup PortAllocator w/o port expiration.
    self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, 0)

    # Ensure Get() succeeds and returns the correct port.
    self.assertEqual(self._pa.Get('test'), first_port)

    # Update fake time to see if ports expire.
    self._current_time += self._EXPIRY_TIME

    # Send second Get() which would normally cause ports to expire. Ensure
    # that the ports did not expire.
    self.assertEqual(self._pa.Get('test2'), first_port + 1)
    self.assertEqual(set(self._pa._ports.keys()),
                     set([first_port, first_port + 1]))

  def testPortAllocatorCleanMatchingIP(self):
    """Tests that Cleanup(request_ip=...) frees only that IP's ports."""
    first_port = cns._DEFAULT_CNS_PORT_RANGE[0]

    # Setup PortAllocator w/o port expiration.
    self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, 0)

    # Ensure Get() succeeds and returns the correct port.
    self.assertEqual(self._pa.Get('ip1', t=1), first_port)
    self.assertEqual(self._pa.Get('ip1', t=2), first_port + 1)
    self.assertEqual(self._pa.Get('ip1', t=3), first_port + 2)
    self.assertEqual(self._pa.Get('ip2', t=1), first_port + 3)

    # Dropping 'ip1' must leave only the port held by 'ip2'.
    self._pa.Cleanup(all_ports=False, request_ip='ip1')

    self.assertEqual(list(self._pa._ports.keys()), [first_port + 3])
    self.assertEqual(self._pa.Get('ip2'), first_port)
    self.assertEqual(self._pa.Get('ip1'), first_port + 1)

    # Dropping 'ip2' must leave only the port just handed to 'ip1'.
    self._pa.Cleanup(all_ports=False, request_ip='ip2')
    self.assertEqual(list(self._pa._ports.keys()), [first_port + 1])

    # Cleaning up an IP that holds no ports must not change anything.
    self._pa.Cleanup(all_ports=False, request_ip='abc')
    self.assertEqual(list(self._pa._ports.keys()), [first_port + 1])

    self._pa.Cleanup(all_ports=False, request_ip='ip1')
    self.assertEqual(list(self._pa._ports.keys()), [])
class ConstrainedNetworkServerTest(unittest.TestCase):
  """End to end tests for ConstrainedNetworkServer system.

  These tests require root access and run the cherrypy server along with
  tc/iptables commands.
  """

  # Amount of time to wait for the CNS to start up.
  _SERVER_START_SLEEP_SECS = 1

  # Sample data used to verify file serving.
  _TEST_DATA = 'The quick brown fox jumps over the lazy dog'

  # Server information.
  _SERVER_URL = ('http://127.0.0.1:%d/ServeConstrained?' %
                 cns._DEFAULT_SERVING_PORT)

  # Setting for latency testing.
  _LATENCY_TEST_SECS = 1

  def _StartServer(self):
    """Starts the CNS, returns pid.

    Reads the server's stderr line by line until a line containing
    'STARTED' appears. Fails the test if stderr reaches EOF first.
    """
    cmd = ['python', 'cns.py', '--interface=%s' % _INTERFACE]
    process = subprocess.Popen(cmd, stderr=subprocess.PIPE)

    # Wait for server to startup.
    while True:
      line = process.stderr.readline()
      if not line:
        # EOF: the server closed stderr without reporting start-up.
        break
      if 'STARTED' in line:
        return process.pid

    # tearDown() does not run when setUp() fails, so stop a child that is
    # still alive here instead of leaving it behind.
    if process.poll() is None:
      process.kill()
      process.wait()
    self.fail('Failed to start CNS.')

  def setUp(self):
    """Starts the server and creates the temp file it is asked to serve."""
    self._server_pid = self._StartServer()

    # Create temp file for serving. Run after server start so if a failure
    # during setUp() occurs we don't leave junk files around.
    f, self._file = tempfile.mkstemp(dir=os.getcwd())
    try:
      os.write(f, self._TEST_DATA)
    finally:
      # mkstemp() hands back an open descriptor; release it in every case.
      os.close(f)

    # Strip cwd off so we have a proper relative path.
    self._relative_fn = self._file[len(os.getcwd()) + 1:]

  def tearDown(self):
    """Deletes the temp file and terminates the server."""
    try:
      os.unlink(self._file)
    finally:
      # Stop the server even if the temp file could not be removed.
      os.kill(self._server_pid, signal.SIGTERM)

  def testServerServesFiles(self):
    """Tests serving a file without any network constraint."""
    now = time.time()

    f = urllib2.urlopen('%sf=%s' % (self._SERVER_URL, self._relative_fn))
    try:
      data = f.read()
    finally:
      f.close()

    # Verify file data is served correctly.
    self.assertEqual(self._TEST_DATA, data)

    # For completeness ensure an unconstrained call takes less time than our
    # artificial constraints checked in the tests below.
    self.assertTrue(time.time() - now < self._LATENCY_TEST_SECS)

  def testServerLatencyConstraint(self):
    """Tests serving a file with a latency network constraint."""
    # Abort if does not have root access.
    self.assertEqual(os.geteuid(), 0, 'You need root access to run this test.')
    now = time.time()

    base_url = '%sf=%s' % (self._SERVER_URL, self._relative_fn)
    url = '%s&latency=%d' % (base_url, self._LATENCY_TEST_SECS * 1000)
    f = urllib2.urlopen(url)
    try:
      data = f.read()
      final_url = f.geturl()
    finally:
      f.close()

    # Verify file data is served correctly.
    self.assertEqual(self._TEST_DATA, data)

    # Verify the request took longer than the requested latency.
    self.assertTrue(time.time() - now > self._LATENCY_TEST_SECS)

    # Verify the server properly redirected the URL.
    self.assertTrue(final_url.startswith(base_url.replace(
        str(cns._DEFAULT_SERVING_PORT), str(cns._DEFAULT_CNS_PORT_RANGE[0]))))
class ConstrainedNetworkServerUnitTests(unittest.TestCase):
  """Unit tests for URL construction in ConstrainedNetworkServer."""

  def testGetServerURL(self):
    """Checks the URL produced when no local server port is configured."""
    server = cns.ConstrainedNetworkServer(self.DummyOptions(), None)
    actual_url = server._GetServerURL('ab/xz.webm', port=1234, t=1)

    expected_url = 'http://127.0.0.1:1234/ServeConstrained?f=ab/xz.webm&t=1'
    self.assertEqual(actual_url, expected_url)

  def testGetServerURLWithLocalServer(self):
    """Checks the URL produced when a local server port is configured."""
    server = cns.ConstrainedNetworkServer(self.DummyOptionsWithServer(), None)
    actual_url = server._GetServerURL('ab/xz.webm', port=1234, t=1)

    expected_url = 'http://127.0.0.1:1234/media/ab/xz.webm?t=1'
    self.assertEqual(actual_url, expected_url)

  # NOTE(review): the cherrypy.url assignments below run once, while the
  # class bodies are being defined, and replace the module-level
  # cherrypy.url for the rest of the process; nothing here restores it.

  class DummyOptions(object):
    # Options stand-in with no local server port.
    cherrypy.url = lambda: 'http://127.0.0.1:9000/ServeConstrained'
    local_server_port = None

  class DummyOptionsWithServer(object):
    # Options stand-in with the local server port set to 8080.
    cherrypy.url = lambda: 'http://127.0.0.1:9000/ServeConstrained'
    local_server_port = 8080
264 if __name__
== '__main__':