Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / third_party / WebKit / LayoutTests / http / tests / websocket / connection-throttling_wsh.py
blob32263f0357794fece5edef46f06e76e7144b0b82
1 import cgi
2 import time
3 import threading
6 lock = threading.Lock()
7 connections = set()
8 next_test_id = 0
11 def web_socket_do_extra_handshake(request):
12 query_string = request.ws_resource.split('?', 1)
13 if len(query_string) == 1:
14 return
15 params = cgi.parse_qs(query_string[1])
16 mode = params["mode"][0]
17 if mode == "new_test":
18 new_test(request)
19 elif mode == "do_test":
20 do_test(request, params)
23 def new_test(request):
24 """Allocate a unique test id."""
25 global lock, next_test_id
26 with lock:
27 request.response = str(next_test_id)
28 next_test_id += 1
31 def do_test(request, params):
32 """Check that no other connection is happening at the same time."""
33 global lock, connections
34 id = params["id"][0]
35 with lock:
36 if id in connections:
37 request.response = "FAIL"
38 return
39 connections.add(id)
40 time.sleep(0.05)
41 with lock:
42 connections.remove(id)
43 request.response = "PASS"
46 def web_socket_transfer_data(request):
47 response = request.response
48 request.ws_stream.send_message(response)