client connection should always be nonblocking
[MogileFS-Server.git] / t / http.t
blob92a30183b97b8c375e985d9e2b935a561fc0cbd5
1 # this test reaches inside MogileFS::Host and MogileFS::Connection::HTTP
2 # internals to ensure error handling and odd corner cases are handled
3 # (existing tests may not exercise this in monitor)
4 use strict;
5 use warnings;
6 use Test::More;
7 use MogileFS::Server;
8 use MogileFS::Test;
9 use MogileFS::Util qw/wait_for_readability/;
10 use Danga::Socket;
11 use IO::Socket::INET;
12 use Socket qw(TCP_NODELAY);
14 # bind a random TCP port for testing
15 my %lopts = (
16     LocalAddr => "127.0.0.1",
17     LocalPort => 0,
18     Proto => "tcp",
19     ReuseAddr => 1,
20     Listen => 1024
22 my $http = IO::Socket::INET->new(%lopts);
23 $http->sockopt(TCP_NODELAY, 1);
24 my $http_get = IO::Socket::INET->new(%lopts);
25 $http_get->sockopt(TCP_NODELAY, 1);
27 my $host_args = {
28     hostid => 1,
29     hostname => 'mockhost',
30     hostip => $http->sockhost,
31     http_port => $http->sockport,
32     http_get_port => $http_get->sockport,
34 my $host = MogileFS::Host->new_from_args($host_args);
36 # required, defaults to 20 in normal server
37 MogileFS::Config->set_config("conn_pool_size", 13);
39 MogileFS::Host->_init_pools;
41 my $idle_pool = $MogileFS::Host::http_pool->{idle};
42 is("MogileFS::Host", ref($host), "host created");
43 MogileFS::Config->set_config("node_timeout", 1);
45 is(13, $MogileFS::Host::http_pool->{total_capacity}, "conn_pool_size took effect");
47 # hit the http_get_port
49     my $resp;
50     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
51     $host->http_get("GET", "/read-only", undef, sub { $resp = $_[0] });
53     server_do(sub {
54         my $s = $http_get->accept;
55         my $buf = read_one_request($s);
56         if ($buf =~ m{\AGET /read-only HTTP/1\.0\r\n})  {
57             $s->syswrite("HTTP/1.1 200\r\nContent-Length: 0\r\n\r\n");
58         }
59         sleep 6; # wait for SIGKILL
60     },
61     sub {
62         Danga::Socket->EventLoop;
63         ok($resp->is_success, "HTTP response is success");
64         is(200, $resp->code, "got HTTP 200 response");
65         my $pool = $idle_pool->{"$host->{hostip}:$host->{http_get_port}"};
66         is(1, scalar @$pool, "connection placed in GET pool");
67     });
69     has_nothing_inflight();
70     has_nothing_queued();
73 # simulate a trickled response from server
75     my $resp;
76     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
77     $host->http("GET", "/trickle", undef, sub { $resp = $_[0] });
78     server_do(sub {
79         my $s = $http->accept;
80         my $buf = read_one_request($s);
81         my $r = "HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n";
82         if ($buf =~ /trickle/) {
83             foreach my $x (split(//, $r)) {
84                 $s->syswrite($x);
85                 sleep 0.01;
86             }
87             foreach my $i (1..100) {
88                 $s->syswrite($i % 10);
89                 sleep 0.1;
90             }
91         }
92         sleep 6;
93     },
94     sub {
95         Danga::Socket->EventLoop;
96         ok($resp->is_success, "HTTP response is successful");
97         my $expect = "";
98         foreach my $i (1..100) {
99             $expect .= $i % 10;
100         }
101         is($expect, $resp->content, "response matches expected");
102     });
104     has_nothing_inflight();
105     has_nothing_queued();
108 # simulate a differently trickled response from server
110     my $resp;
111     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
112     my $body = "*" x 100;
113     $host->http("GET", "/trickle-head-body", undef, sub { $resp = $_[0] });
114     server_do(sub {
115         my $s = $http->accept;
116         my $buf = read_one_request($s);
117         my $r = "HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n";
118         if ($buf =~ /trickle-head-body/) {
119             $s->syswrite($r);
120             sleep 1;
121             $s->syswrite($body);
122         }
123         sleep 6;
124     },
125     sub {
126         Danga::Socket->EventLoop;
127         ok($resp->is_success, "HTTP response is successful on trickle");
128         is($resp->content, $body, "trickled response matches expected");
129     });
131     has_nothing_inflight();
132     has_nothing_queued();
135 # simulate a server that disconnected after a (very short) idle time
136 # despite supporting persistent conns
138     my $resp;
139     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
140     $host->http("GET", "/foo", undef, sub { $resp = $_[0] });
141     my $conn;
143     server_do(sub {
144         my $s = $http->accept;
145         my $buf = read_one_request($s);
146         if ($buf =~ m{\AGET /foo HTTP/1\.0\r\n})  {
147             $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
148         }
149         sleep 6; # wait for SIGKILL
150     },
151     sub {
152         Danga::Socket->EventLoop;
153         ok($resp->is_success, "HTTP response is success");
154         my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
155         is(1, scalar @$pool, "connection placed in pool");
156         $conn = $pool->[0];
157     });
159     # try again, server didn't actually keep the connection alive,
160     $resp = undef;
161     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
162     $host->http("GET", "/again", undef, sub { $resp = $_[0] });
164     server_do(sub {
165         my $s = $http->accept;
166         my $buf = read_one_request($s);
167         if ($buf =~ m{\AGET /again HTTP/1\.0\r\n})  {
168             $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
169         }
170         sleep 6; # wait for SIGKILL
171     },
172     sub {
173         Danga::Socket->EventLoop;
174         my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
175         is(1, scalar @$pool, "new connection placed in pool");
176         isnt($conn, $pool->[0], "reference not reused");
177     });
179     has_nothing_inflight();
180     has_nothing_queued();
183 # simulate persistent connection reuse
185     my $resp;
186     my $nr = 6;
187     my $conn;
189     my $failsafe = Danga::Socket->AddTimer(5, sub { $resp = "FAIL TIMEOUT" });
190     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
192     server_do(sub {
193         my $s = $http->accept;
194         my $buf;
195         foreach my $i (1..$nr) {
196             $buf = read_one_request($s);
197             if ($buf =~ m{\AGET /$i HTTP/1\.0\r\n}) {
198                 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\n$i");
199             }
200         }
201         sleep 6; # wait for SIGKILL
202     },
203     sub {
204         foreach my $i (1..$nr) {
205             $resp = undef;
206             $host->http("GET", "/$i", undef, sub { $resp = $_[0] });
207             Danga::Socket->EventLoop;
208             is(ref($resp), "HTTP::Response", "got HTTP response");
209             ok($resp->is_success, "HTTP response is successful");
210             is($i, $resp->content, "response matched");
211             my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
212             is(1, scalar @$pool, "connection placed in connection pool");
214             if ($i == 1) {
215                 $conn = $pool->[0];
216                 is("MogileFS::Connection::HTTP", ref($conn), "got connection");
217             } else {
218                 ok($conn == $pool->[0], "existing connection reused (#$i)");
219             }
220         }
221     });
222     $failsafe->cancel;
224     has_nothing_inflight();
225     has_nothing_queued();
228 # simulate a node_timeout
229 sub sim_node_timeout {
230     my ($send_header) = @_;
231     my $resp;
233     # we need this timer (just to exist) to break out of the event loop
234     my $t = Danga::Socket->AddTimer(1.2, sub { fail("timer should not fire") });
236     my $req = "/node-time-me-out-";
237     $req .= $send_header ? 1 : 0;
238     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
239     $host->http("GET", $req, undef, sub { $resp = $_[0] });
241     server_do(sub {
242         my $s = $http->accept;
243         my $buf = read_one_request($s);
244         if ($buf =~ /node-time-me-out/) {
245             if ($send_header) {
246                 $s->syswrite("HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\n");
247             }
248             sleep 60; # wait to trigger timeout
249         } else {
250             # nuke the connection to _NOT_ trigger timeout
251             $s->syswrite("HTTP/1.1 404 Not Found\r\n\r\n");
252             close($s);
253         }
254     },
255     sub {
256         Danga::Socket->EventLoop;
257         $t->cancel;
258         ok(! $resp->is_success, "HTTP response is not successful");
259         like($resp->message, qr/node_timeout/, "node_timeout hit");
260         my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
261         is(0, scalar @$pool, "connection pool is empty");
262     });
264     has_nothing_inflight();
265     has_nothing_queued();
268 sim_node_timeout(0);
269 sim_node_timeout(1);
271 # server just drops connection
273     my $resp;
275     # we want an empty pool to avoid retries
276     my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
277     is(0, scalar @$pool, "connection pool is empty");
279     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
280     $host->http("GET", "/drop-me", undef, sub { $resp = $_[0] });
282     server_do(sub {
283         my $s = $http->accept;
284         my $buf = read_one_request($s);
285         close $s if ($buf =~ /drop-me/);
286         sleep 6;
287     },
288     sub {
289         Danga::Socket->EventLoop;
290         ok(! $resp->is_success, "HTTP response is not successful");
291         my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
292         is(0, scalar @$pool, "connection pool is empty");
293     });
295     has_nothing_inflight();
296     has_nothing_queued();
299 # server is not running
301     my $resp;
303     # we want an empty pool to avoid retries
304     my $pool = $idle_pool->{"$host->{hostip}:$host->{http_port}"};
305     is(0, scalar @$pool, "connection pool is empty");
307     Danga::Socket->SetPostLoopCallback(sub { ! defined($resp) });
308     $http->close; # $http is unusable after this
309     $host->http("GET", "/fail", undef, sub { $resp = $_[0] });
310     Danga::Socket->EventLoop;
311     ok(! $resp->is_success, "HTTP response is not successful");
312     ok($resp->header("X-MFS-Error"), "X-MFS-Error is set");
313     is(0, scalar @$pool, "connection pool is empty");
315     has_nothing_inflight();
316     has_nothing_queued();
319 done_testing();
321 sub has_nothing_inflight {
322     my $inflight = $MogileFS::Host::http_pool->{inflight};
323     my $n = 0;
324     foreach my $host_port (keys %$inflight) {
325         $n += scalar keys %{$inflight->{$host_port}};
326     }
327     is($MogileFS::Host::http_pool->{total_inflight}, 0, "nothing is counted to be inflight");
328     is($n, 0, "nothing is really inflight");
331 sub has_nothing_queued {
332     is(scalar @{$MogileFS::Host::http_pool->{queue}}, 0, "connection pool task queue is empty");
335 sub server_do {
336     my ($child, $parent) = @_;
337     my $pid = fork;
338     fail("fork failed: $!") unless defined($pid);
340     if ($pid == 0) {
341         $child->();
342     } else {
343         $parent->();
344         is(1, kill(9, $pid), "child killed");
345         is($pid, waitpid($pid, 0), "child reaped");
346     }
349 sub read_one_request {
350     my $s = shift;
352     my $fd = fileno($s);
353     wait_for_readability($fd, 5);
354     my $buf = "";
355     do {
356         $s->sysread($buf, 4096, length($buf));
357     } while wait_for_readability($fd, 0.1) && $buf !~ /\r\n\r\n/;
358     return $buf;