cc: Fix logic for detecting when raster tasks were throttled
[chromium-blink-merge.git] / media / tools / constrained_network_server / traffic_control_unittest.py
bloba6781e9dec28bc98442d592402f393081d48385b
1 #!/usr/bin/env python
3 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """Tests for traffic control library."""
8 import unittest
10 import traffic_control
13 class TrafficControlUnitTests(unittest.TestCase):
14 """Unit tests for traffic control."""
16 # Stores commands called by the traffic control _Exec function.
17 commands = []
19 def _ExecMock(self, command, **kwargs):
20 """Mocks traffic_control._Exec and adds the command to commands list."""
21 cmd_list = [str(x) for x in command]
22 self.commands.append(' '.join(cmd_list))
23 return ''
25 def setUp(self):
26 """Resets the commands list and set the _Exec mock function."""
27 self.commands = []
28 self._old_Exec = traffic_control._Exec
29 traffic_control._Exec = self._ExecMock
31 def tearDown(self):
32 """Resets the _Exec mock function to the original."""
33 traffic_control._Exec = self._old_Exec
35 def testCreateConstrainedPort(self):
36 config = {
37 'interface': 'fakeeth',
38 'port': 12345,
39 'server_port': 8888,
40 'bandwidth': 256,
41 'latency': 100,
42 'loss': 2
44 traffic_control.CreateConstrainedPort(config)
45 expected = [
46 'sudo tc qdisc add dev fakeeth root handle 1: htb',
47 'sudo tc class add dev fakeeth parent 1: classid 1:3039 htb rate '
48 '256kbit ceil 256kbit',
49 'sudo tc qdisc add dev fakeeth parent 1:3039 handle 3039:0 netem loss '
50 '2% delay 100ms',
51 'sudo tc filter add dev fakeeth protocol ip parent 1: prio 1 u32 match '
52 'ip sport 12345 0xffff flowid 1:3039',
53 'sudo iptables -t nat -A PREROUTING -i fakeeth -p tcp --dport 12345 -j '
54 'REDIRECT --to-port 8888',
55 'sudo iptables -t nat -A OUTPUT -p tcp --dport 12345 -j REDIRECT '
56 '--to-port 8888'
58 self.assertEqual(expected, self.commands)
60 def testCreateConstrainedPortDefaults(self):
61 config = {
62 'interface': 'fakeeth',
63 'port': 12345,
64 'server_port': 8888,
65 'latency': None
67 traffic_control.CreateConstrainedPort(config)
68 expected = [
69 'sudo tc qdisc add dev fakeeth root handle 1: htb',
70 'sudo tc class add dev fakeeth parent 1: classid 1:3039 htb rate '
71 '%dkbit ceil %dkbit' % (traffic_control._DEFAULT_MAX_BANDWIDTH_KBIT,
72 traffic_control._DEFAULT_MAX_BANDWIDTH_KBIT),
73 'sudo tc qdisc add dev fakeeth parent 1:3039 handle 3039:0 netem',
74 'sudo tc filter add dev fakeeth protocol ip parent 1: prio 1 u32 '
75 'match ip sport 12345 0xffff flowid 1:3039',
76 'sudo iptables -t nat -A PREROUTING -i fakeeth -p tcp --dport 12345 -j '
77 'REDIRECT --to-port 8888',
78 'sudo iptables -t nat -A OUTPUT -p tcp --dport 12345 -j REDIRECT '
79 '--to-port 8888'
81 self.assertEqual(expected, self.commands)
83 def testDeleteConstrainedPort(self):
84 config = {
85 'interface': 'fakeeth',
86 'port': 12345,
87 'server_port': 8888,
88 'bandwidth': 256,
90 _old_GetFilterHandleId = traffic_control._GetFilterHandleId
91 traffic_control._GetFilterHandleId = lambda interface, port: '800::800'
93 try:
94 traffic_control.DeleteConstrainedPort(config)
95 expected = [
96 'sudo tc filter del dev fakeeth protocol ip parent 1:0 handle '
97 '800::800 prio 1 u32',
98 'sudo tc class del dev fakeeth parent 1: classid 1:3039 htb rate '
99 '256kbit ceil 256kbit',
100 'sudo iptables -t nat -D PREROUTING -i fakeeth -p tcp --dport 12345 '
101 '-j REDIRECT --to-port 8888',
102 'sudo iptables -t nat -D OUTPUT -p tcp --dport 12345 -j REDIRECT '
103 '--to-port 8888']
104 self.assertEqual(expected, self.commands)
105 finally:
106 traffic_control._GetFilterHandleId = _old_GetFilterHandleId
108 def testTearDown(self):
109 config = {'interface': 'fakeeth'}
111 traffic_control.TearDown(config)
112 expected = [
113 'sudo tc qdisc del dev fakeeth root',
114 'sudo iptables -t nat -F'
116 self.assertEqual(expected, self.commands)
118 def testGetFilterHandleID(self):
119 # Check seach for handle ID command.
120 self.assertRaises(traffic_control.TrafficControlError,
121 traffic_control._GetFilterHandleId, 'fakeeth', 1)
122 self.assertEquals(self.commands, ['sudo tc filter list dev fakeeth parent '
123 '1:'])
125 # Check with handle ID available.
126 traffic_control._Exec = (lambda command, msg:
127 'filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht '
128 '800 bkt 0 flowid 1:1\nmatch 08ae0000/ffff0000 at 20')
129 output = traffic_control._GetFilterHandleId('fakeeth', 1)
130 self.assertEqual(output, '800::800')
132 # Check with handle ID not available.
133 traffic_control._Exec = (lambda command, msg:
134 'filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht '
135 '800 bkt 0 flowid 1:11\nmatch 08ae0000/ffff0000 at 20')
136 self.assertRaises(traffic_control.TrafficControlError,
137 traffic_control._GetFilterHandleId, 'fakeeth', 1)
139 traffic_control._Exec = lambda command, msg: 'NO ID IN HERE'
140 self.assertRaises(traffic_control.TrafficControlError,
141 traffic_control._GetFilterHandleId, 'fakeeth', 1)
144 if __name__ == '__main__':
145 unittest.main()