Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / media / tools / constrained_network_server / traffic_control_test.py
blobd641253323531c31d11e9b44a3d3a1f22dcd9a79
1 #!/usr/bin/env python
3 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """End-to-end tests for traffic control library."""
8 import os
9 import re
10 import sys
11 import unittest
13 import traffic_control
16 class TrafficControlTests(unittest.TestCase):
17 """System tests for traffic_control functions.
19 These tests require root access.
20 """
21 # A dummy interface name to use instead of real interface.
22 _INTERFACE = 'myeth'
24 def setUp(self):
25 """Setup a dummy interface."""
26 # If we update to python version 2.7 or newer we can use setUpClass() or
27 # unittest.skipIf().
28 if os.getuid() != 0:
29 sys.exit('You need root access to run these tests.')
31 command = ['ip', 'link', 'add', 'name', self._INTERFACE, 'type', 'dummy']
32 traffic_control._Exec(command, 'Error creating dummy interface %s.' %
33 self._INTERFACE)
35 def tearDown(self):
36 """Teardown the dummy interface and any network constraints on it."""
37 # Deleting the dummy interface deletes all associated constraints.
38 command = ['ip', 'link', 'del', self._INTERFACE]
39 traffic_control._Exec(command)
41 def testExecOutput(self):
42 output = traffic_control._Exec(['echo', ' Test '])
43 self.assertEqual(output, 'Test')
45 def testExecException(self):
46 self.assertRaises(traffic_control.TrafficControlError,
47 traffic_control._Exec, command=['ls', '!doesntExist!'])
49 def testExecErrorCustomMsg(self):
50 try:
51 traffic_control._Exec(['ls', '!doesntExist!'], msg='test_msg')
52 self.fail('No exception raised for invalid command.')
53 except traffic_control.TrafficControlError as e:
54 self.assertEqual(e.msg, 'test_msg')
56 def testAddRootQdisc(self):
57 """Checks adding a root qdisc is successful."""
58 config = {'interface': self._INTERFACE}
59 root_detail = 'qdisc htb 1: root'
60 # Assert no htb root at startup.
61 command = ['tc', 'qdisc', 'ls', 'dev', config['interface']]
62 output = traffic_control._Exec(command)
63 self.assertFalse(root_detail in output)
65 traffic_control._AddRootQdisc(config['interface'])
66 output = traffic_control._Exec(command)
67 # Assert htb root is added.
68 self.assertTrue(root_detail in output)
70 def testConfigureClassAdd(self):
71 """Checks adding and deleting a class to the root qdisc."""
72 config = {
73 'interface': self._INTERFACE,
74 'port': 12345,
75 'server_port': 33333,
76 'bandwidth': 2000
78 class_detail = ('class htb 1:%x root prio 0 rate %dKbit ceil %dKbit' %
79 (config['port'], config['bandwidth'], config['bandwidth']))
81 # Add root qdisc.
82 traffic_control._AddRootQdisc(config['interface'])
84 # Assert class does not exist prior to adding it.
85 command = ['tc', 'class', 'ls', 'dev', config['interface']]
86 output = traffic_control._Exec(command)
87 self.assertFalse(class_detail in output)
89 # Add class to root.
90 traffic_control._ConfigureClass('add', config)
92 # Assert class is added.
93 command = ['tc', 'class', 'ls', 'dev', config['interface']]
94 output = traffic_control._Exec(command)
95 self.assertTrue(class_detail in output)
97 # Delete class.
98 traffic_control._ConfigureClass('del', config)
100 # Assert class is deleted.
101 command = ['tc', 'class', 'ls', 'dev', config['interface']]
102 output = traffic_control._Exec(command)
103 self.assertFalse(class_detail in output)
105 def testAddSubQdisc(self):
106 """Checks adding a sub qdisc to existing class."""
107 config = {
108 'interface': self._INTERFACE,
109 'port': 12345,
110 'server_port': 33333,
111 'bandwidth': 2000,
112 'latency': 250,
113 'loss': 5
115 qdisc_re_detail = ('qdisc netem %x: parent 1:%x .* delay %d.0ms loss %d%%' %
116 (config['port'], config['port'], config['latency'],
117 config['loss']))
118 # Add root qdisc.
119 traffic_control._AddRootQdisc(config['interface'])
121 # Add class to root.
122 traffic_control._ConfigureClass('add', config)
124 # Assert qdisc does not exist prior to adding it.
125 command = ['tc', 'qdisc', 'ls', 'dev', config['interface']]
126 output = traffic_control._Exec(command)
127 handle_id_re = re.search(qdisc_re_detail, output)
128 self.assertEqual(handle_id_re, None)
130 # Add qdisc to class.
131 traffic_control._AddSubQdisc(config)
133 # Assert qdisc is added.
134 command = ['tc', 'qdisc', 'ls', 'dev', config['interface']]
135 output = traffic_control._Exec(command)
136 handle_id_re = re.search(qdisc_re_detail, output)
137 self.assertNotEqual(handle_id_re, None)
139 def testAddDeleteFilter(self):
140 config = {
141 'interface': self._INTERFACE,
142 'port': 12345,
143 'bandwidth': 2000
145 # Assert no filter exists.
146 command = ['tc', 'filter', 'list', 'dev', config['interface'], 'parent',
147 '1:0']
148 output = traffic_control._Exec(command)
149 self.assertEqual(output, '')
151 # Create the root and class to which the filter will be attached.
152 # Add root qdisc.
153 traffic_control._AddRootQdisc(config['interface'])
155 # Add class to root.
156 traffic_control._ConfigureClass('add', config)
158 # Add the filter.
159 traffic_control._AddFilter(config['interface'], config['port'])
160 handle_id = traffic_control._GetFilterHandleId(config['interface'],
161 config['port'])
162 self.assertNotEqual(handle_id, None)
164 # Delete the filter.
165 # The output of tc filter list is not None because tc adds default filters.
166 traffic_control._DeleteFilter(config['interface'], config['port'])
167 self.assertRaises(traffic_control.TrafficControlError,
168 traffic_control._GetFilterHandleId, config['interface'],
169 config['port'])
172 if __name__ == '__main__':
173 unittest.main()