1 # this test reaches inside MogileFS::Host and MogileFS::Connection::HTTP
2 # internals to ensure error handling and odd corner cases are handled
3 # (existing tests may not exercise this in monitor)
9 use MogileFS::Util qw/wait_for_readability/;
12 use Socket qw(TCP_NODELAY);
14 # bind a random TCP port for testing
16 LocalAddr => "127.0.0.1",
22 my $http = IO::Socket::INET->new(%lopts);
23 $http->sockopt(TCP_NODELAY, 1);
24 my $http_get = IO::Socket::INET->new(%lopts);
25 $http_get->sockopt(TCP_NODELAY, 1);
29 hostname => 'mockhost',
30 hostip => $http->sockhost,
31 http_port => $http->sockport,
32 http_get_port => $http_get->sockport,
34 my $host = MogileFS::Host->new_from_args($host_args);
36 # required, defaults to 20 in normal server
37 MogileFS::Config->set_config("conn_pool_size", 13);
39 MogileFS::Host->_init_pools;
41 my $idle_pool = $MogileFS::Host::http_pool->{idle};
# Test::More's is() takes (got, expected, name); the observed value goes
# first so a failure report labels "got" and "expected" correctly.
is(ref($host), "MogileFS::Host", "host created");
MogileFS::Config->set_config("node_timeout", 1);

# The pool must have picked up the conn_pool_size configured above.
is($MogileFS::Host::http_pool->{total_capacity}, 13, "conn_pool_size took effect");
47 # hit the http_get_port
50 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
51 $host->http_get("GET", "/read-only", undef, sub { $resp = $_[0] });
54 my $s = $http_get->accept;
55 my $buf = read_one_request($s);
56 if ($buf =~ m{\AGET /read-only HTTP/1\.0\r\n}) {
57 $s->syswrite("HTTP/1.1 200\r\nContent-Length: 0\r\n\r\n");
59 sleep 6; # wait for SIGKILL
62 Danga::Socket->EventLoop;
# $resp was filled in by the http_get callback before the event loop exited.
ok($resp->is_success, "HTTP response is success");
# is() takes (got, expected, name): observed value first.
is($resp->code, 200, "got HTTP 200 response");
# Idle connections are keyed by "hostip:port"; this request used the GET port.
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_get_port}"};
is(scalar @$pool, 1, "connection placed in GET pool");
69 has_nothing_inflight();
73 # simulate a trickled response from server
76 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
77 $host->http("GET", "/trickle", undef, sub { $resp = $_[0] });
79 my $s = $http->accept;
80 my $buf = read_one_request($s);
81 my $r = "HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n";
82 if ($buf =~ /trickle/) {
83 foreach my $x (split(//, $r)) {
87 foreach my $i (1..100) {
88 $s->syswrite($i % 10);
95 Danga::Socket->EventLoop;
96 ok($resp->is_success, "HTTP response is successful");
98 foreach my $i (1..100) {
# is() takes (got, expected, name): the received body is the observed value.
is($resp->content, $expect, "response matches expected");
104 has_nothing_inflight();
105 has_nothing_queued();
108 # simulate a differently trickled response from server
111 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
112 my $body = "*" x 100;
113 $host->http("GET", "/trickle-head-body", undef, sub { $resp = $_[0] });
115 my $s = $http->accept;
116 my $buf = read_one_request($s);
117 my $r = "HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n";
118 if ($buf =~ /trickle-head-body/) {
126 Danga::Socket->EventLoop;
127 ok($resp->is_success, "HTTP response is successful on trickle");
128 is($resp->content, $body, "trickled response matches expected");
131 has_nothing_inflight();
132 has_nothing_queued();
135 # simulate a server that disconnected after a (very short) idle time
136 # despite supporting persistent conns
139 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
140 $host->http("GET", "/foo", undef, sub { $resp = $_[0] });
144 my $s = $http->accept;
145 my $buf = read_one_request($s);
146 if ($buf =~ m{\AGET /foo HTTP/1\.0\r\n}) {
147 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
149 sleep 6; # wait for SIGKILL
152 Danga::Socket->EventLoop;
ok($resp->is_success, "HTTP response is success");
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 1, "connection placed in pool");
159 # try again, server didn't actually keep the connection alive,
161 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
162 $host->http("GET", "/again", undef, sub { $resp = $_[0] });
165 my $s = $http->accept;
166 my $buf = read_one_request($s);
167 if ($buf =~ m{\AGET /again HTTP/1\.0\r\n}) {
168 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
170 sleep 6; # wait for SIGKILL
173 Danga::Socket->EventLoop;
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is()/isnt() take (got, expected, name): observed value first.
is(scalar @$pool, 1, "new connection placed in pool");
# The pooled connection must be a different object from the earlier $conn.
isnt($pool->[0], $conn, "reference not reused");
179 has_nothing_inflight();
180 has_nothing_queued();
183 # simulate persistent connection reuse
189 my $failsafe = Danga::Socket->AddTimer(5, sub { $resp = "FAIL TIMEOUT" });
190 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
193 my $s = $http->accept;
195 foreach my $i (1..$nr) {
196 $buf = read_one_request($s);
197 if ($buf =~ m{\AGET /$i HTTP/1\.0\r\n}) {
198 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\n$i");
201 sleep 6; # wait for SIGKILL
204 foreach my $i (1..$nr) {
206 $host->http("GET", "/$i", undef, sub { $resp = $_[0] });
207 Danga::Socket->EventLoop;
# is() takes (got, expected, name): observed value first throughout.
is(ref($resp), "HTTP::Response", "got HTTP response");
ok($resp->is_success, "HTTP response is successful");
# The mock server answers request /$i with the body "$i".
is($resp->content, $i, "response matched");
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
is(scalar @$pool, 1, "connection placed in connection pool");
216 is("MogileFS::Connection::HTTP", ref($conn), "got connection");
218 ok($conn == $pool->[0], "existing connection reused (#$i)");
224 has_nothing_inflight();
225 has_nothing_queued();
228 # simulate a node_timeout
229 sub sim_node_timeout {
230 my ($send_header) = @_;
233 # we need this timer (just to exist) to break out of the event loop
234 my $t = Danga::Socket->AddTimer(1.2, sub { fail("timer should not fire") });
236 my $req = "/node-time-me-out-";
237 $req .= $send_header ? 1 : 0;
238 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
239 $host->http("GET", $req, undef, sub { $resp = $_[0] });
242 my $s = $http->accept;
243 my $buf = read_one_request($s);
244 if ($buf =~ /node-time-me-out/) {
246 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\n");
248 sleep 60; # wait to trigger timeout
250 # nuke the connection to _NOT_ trigger timeout
251 $s->syswrite("HTTP/1.1 404 Not Found\r\n\r\n");
256 Danga::Socket->EventLoop;
ok(! $resp->is_success, "HTTP response is not successful");
like($resp->message, qr/node_timeout/, "node_timeout hit");
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 0, "connection pool is empty");
264 has_nothing_inflight();
265 has_nothing_queued();
271 # server just drops connection
275 # we want an empty pool to avoid retries
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 0, "connection pool is empty");
279 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
280 $host->http("GET", "/drop-me", undef, sub { $resp = $_[0] });
283 my $s = $http->accept;
284 my $buf = read_one_request($s);
285 close $s if ($buf =~ /drop-me/);
289 Danga::Socket->EventLoop;
ok(! $resp->is_success, "HTTP response is not successful");
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 0, "connection pool is empty");
295 has_nothing_inflight();
296 has_nothing_queued();
299 # server is not running
303 # we want an empty pool to avoid retries
# Idle connections are keyed by "hostip:port".
my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 0, "connection pool is empty");
307 Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
308 $http->close; # $http is unusable after this
309 $host->http("GET", "/fail", undef, sub { $resp = $_[0] });
310 Danga::Socket->EventLoop;
ok(! $resp->is_success, "HTTP response is not successful");
ok($resp->header("X-MFS-Error"), "X-MFS-Error is set");
# is() takes (got, expected, name): observed pool size first.
is(scalar @$pool, 0, "connection pool is empty");
315 has_nothing_inflight();
316 has_nothing_queued();
321 sub has_nothing_inflight {
322 my $inflight = $MogileFS::Host::http_pool->{inflight};
324 foreach my $host_port (keys %$inflight) {
325 $n += scalar keys %{$inflight->{$host_port}};
327 is($MogileFS::Host::http_pool->{total_inflight}, 0, "nothing is counted to be inflight");
328 is($n, 0, "nothing is really inflight");
331 sub has_nothing_queued {
332 is(scalar @{$MogileFS::Host::http_pool->{queue}}, 0, "connection pool task queue is empty");
336 my ($child, $parent) = @_;
338 fail("fork failed: $!") unless defined($pid);
# is() takes (got, expected, name): the return values of kill() and
# waitpid() are the observed values, so they go first.
# kill() returns the number of processes successfully signalled.
is(kill(9, $pid), 1, "child killed");
# waitpid() returns the pid of the reaped child.
is(waitpid($pid, 0), $pid, "child reaped");
349 sub read_one_request {
353 wait_for_readability($fd, 5);
356 $s->sysread($buf, 4096, length($buf));
357 } while wait_for_readability($fd, 0.1) && $buf !~ /\r\n\r\n/;