1 # Blackbox tests for http_test
3 # Copyright (C) Noel Power noel.power@suse.com
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

from samba.logger import get_samba_logger
from samba.tests import BlackboxTestCase, BlackboxProcessError
# Logger for this module, named after the module itself.
logger = get_samba_logger(name=__name__)

# Program under test; used as the first word of every command line
# built by the tests below.
COMMAND = "bin/http_test"
def make_chunks(msg, chunk_size):
    """Split *msg* into consecutive slices of at most *chunk_size* items.

    Every slice except possibly the last is exactly *chunk_size* long;
    the last one carries whatever remains.  Joining the returned slices
    in order gives back *msg*.

    :param msg: a sliceable sequence with a length (the handler in this
        file passes ``bytes``).
    :param chunk_size: maximum length of each slice; must be positive.
    :return: list of slices, empty when *msg* is empty.
    :raises ValueError: if *chunk_size* is not positive (a size of zero
        could never consume the message).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    return [msg[start:start + chunk_size]
            for start in range(0, len(msg), chunk_size)]
# Simple handler: echoes the request 'path' of a GET or POST back to
# the client as a chunked-encoded HTTP response, using a chunk size
# of 10 octets.
class ChunkHTTPRequestHandler(BaseHTTPRequestHandler):
    """Echo the request path back as a chunk-framed response body.

    GET and POST are answered identically: status 200, a JSON
    content-type header, a Transfer-Encoding header, and a body that
    carries the request path split into CHUNK_SIZE-octet chunks.
    """

    # Payload size, in octets, of every chunk except possibly the last.
    CHUNK_SIZE = 10

    def _send_path_as_chunks(self):
        """Write the status line, headers and chunk-framed body."""
        msg = bytes(self.path, encoding="utf-8")
        chunks = make_chunks(msg, self.CHUNK_SIZE)

        self.send_response(200)
        self.send_header('content-type', 'application/json; charset=UTF-8')
        # Exactly one Transfer-Encoding header is sent.  This one magic
        # path advertises gzip instead of chunked; the body below is
        # framed the same way in both cases.
        if self.path == "usegziptransferencoding":
            self.send_header('Transfer-Encoding', 'gzip')
        else:
            self.send_header('Transfer-Encoding', 'chunked')
        # The header block must be terminated before any body octets.
        self.end_headers()

        # Each chunk is "<hex length>\r\n<data>\r\n"; a zero-length
        # chunk followed by an empty line terminates the body.
        parts = [("%x" % len(chunk)).encode("utf-8") + b'\r\n' + chunk + b'\r\n'
                 for chunk in chunks]
        parts.append(b'0\r\n\r\n')
        self.wfile.write(b''.join(parts))

    def do_GET(self):
        """Answer a GET request with the chunked echo response."""
        self._send_path_as_chunks()

    def do_POST(self):
        """Answer a POST request with the chunked echo response."""
        self._send_path_as_chunks()
class HttpChunkBlackboxTests(BlackboxTestCase):
    """Run COMMAND against a local HTTP server that answers in chunks.

    Each test starts an HTTPServer using ChunkHTTPRequestHandler on
    port 8080 of SERVER_IP (default "localhost"), invokes COMMAND with
    a URI, and checks what the command printed or how it failed.
    """

    def setUp(self):
        """Create the server and start serving it on a daemon thread."""
        super().setUp()
        self.server = HTTPServer((os.getenv("SERVER_IP", "localhost"), 8080),
                                 ChunkHTTPRequestHandler,
                                 bind_and_activate=False)
        # Runs after tearDown, and also when setUp itself fails below,
        # so the listening socket is always released.
        self.addCleanup(self.server.server_close)

        # Set by http_server() once the socket is bound and listening.
        self._listening = threading.Event()
        self.t = threading.Thread(target=HttpChunkBlackboxTests.http_server,
                                  args=(self,))
        # Thread.setDaemon() is deprecated; assign the attribute instead.
        self.t.daemon = True
        self.t.start()
        # Do not launch the client before the port accepts connections.
        if not self._listening.wait(timeout=10):
            self.fail("test HTTP server did not start listening")

    def tearDown(self):
        """Stop the serving loop and wait for the server thread to exit."""
        # Safe here: tearDown only runs after setUp saw the server
        # listening, so serve_forever() is running or about to run.
        self.server.shutdown()
        self.t.join()
        super().tearDown()

    def http_server(self):
        """Thread body: bind, activate, then serve until shut down."""
        self.server.server_bind()
        self.server.server_activate()
        self._listening.set()
        self.server.serve_forever()

    def test_single_chunk(self):
        """A path that fits in one 10-octet chunk is echoed unchanged."""
        # NOTE(review): the original literal is not visible in the
        # source under review; any URI of at most 10 octets yields a
        # single chunk.
        msg = "one_chunk"
        try:
            resp = self.check_output(
                "%s -U%% -I%s --uri %s"
                % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
        except BlackboxProcessError as e:
            print("Failed with: %s" % e)
            # Printing alone would let the test pass; fail it explicitly.
            self.fail(str(e))
        self.assertEqual(msg, resp.decode('utf-8'))

    def test_multi_chunks(self):
        """A 50-octet path (five full chunks) is reassembled unchanged."""
        msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
        try:
            resp = self.check_output(
                "%s -U%% -I%s --uri %s"
                % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
        except BlackboxProcessError as e:
            print("Failed with: %s" % e)
            self.fail(str(e))
        self.assertEqual(msg, resp.decode('utf-8'))

    def test_exceed_request_size(self):
        """With --rsize 49 a 50-octet body must make the command fail."""
        msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
        try:
            self.check_output(
                "%s -d11 -U%% -I%s --rsize 49 --uri %s"
                % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
        except BlackboxProcessError as e:
            # The failure must be the expected one: both diagnostics
            # have to be present in the command's output.
            if "http_read_chunk: size 50 exceeds max content len 49 skipping body" not in e.stderr.decode('utf-8'):
                self.fail(str(e))
            if "unexpected 0 len response" not in e.stdout.decode('utf-8'):
                self.fail(str(e))
        else:
            self.fail("unexpected success")

    def test_exact_request_size(self):
        """With --rsize 50 a 50-octet body is accepted and echoed."""
        msg = "snglechunksnglechunksnglechunksnglechunksnglechunk"
        try:
            resp = self.check_output(
                "%s -U%% -I%s --rsize 50 --uri %s"
                % (COMMAND, os.getenv("SERVER_IP", "localhost"), msg))
        except BlackboxProcessError as e:
            print("Failed with: %s" % e)
            self.fail(str(e))
        self.assertEqual(msg, resp.decode('utf-8'))

121 def test_gzip_transfer_encoding(self
):
123 msg
= "usegziptransferencoding"
124 resp
= self
.check_output("%s -U%% -I%s --rsize 50 --uri %s" % (COMMAND
, os
.getenv("SERVER_IP", "localhost"), msg
))
125 self
.assertEqual(msg
, resp
.decode('utf-8'))
126 self
.fail("unexpected success")
127 except BlackboxProcessError
as e
:
128 if "http_response_needs_body: Unsupported transfer encoding type gzip" not in e
.stderr
.decode('utf-8'):