ConnectionPool: avoid undefined behavior for hash iteration
[MogileFS-Server.git] / lib / MogileFS / ConnectionPool.pm
blob7d4b00f3b60a19a7bb304f25cc0f9c38f7b5c385
1 # a connection pool class with queueing.
2 # (something doesn't sound quite right with that...)
3 # This requires Danga::Socket to drive, but may also function without it
4 # via conn_get/conn_put.
5 package MogileFS::ConnectionPool;
6 use strict;
7 use warnings;
8 use Carp qw(croak confess);
9 use Time::HiRes;
# Sentinel expiry time meaning "no timeout pending": the largest native
# unsigned integer, which is larger than any real timestamp.  ~0 yields it
# portably on both 32- and 64-bit perls; the old
# (0xffffffff << 32) | 0xffffffff form shifted by the full word width on
# 32-bit builds, which perls before 5.24 left to C's undefined behavior.
use constant NEVER => ~0;
# Creates a connection pool for connections of class $conn_class
# (e.g. MogileFS::Connection::HTTP or MogileFS::Connection::Mogstored).
#
# Recognized $opts keys (both optional, false values fall back to defaults):
#   total_capacity - max connections across all destinations
#                    (default 20, which matches what we used with LWP)
#   dest_capacity  - max connections per ip:port destination
#                    (defaults to total_capacity)
sub new {
    my ($class, $conn_class, $opts) = @_;
    $opts ||= {};

    my $total_capacity = $opts->{total_capacity} || 20;

    my %pool = (
        fdmap          => {},    # { fd -> conn }
        idle           => {},    # ip:port -> [ MogileFS::Connection::Poolable, ... ]
        queue          => [],    # [ [ ip, port, callback ], ... ]
        timer          => undef, # Danga::Socket::Timer object
        timeouts       => {},    # { fd -> conn }
        inflight       => {},    # ip:port -> { fd -> callback }
        total_inflight => 0,     # number of inflight connections
        # allow users to specify per-destination capacity limits
        dest_capacity  => $opts->{dest_capacity} || $total_capacity,
        total_capacity => $total_capacity,
        class          => $conn_class,
        scheduled      => 0,     # set if we'll start tasks on next tick
        on_next_tick   => [],
        next_expiry    => NEVER,
    );

    return bless \%pool, $class;
}
# Retrieves a validated idle connection for the [IP, port] pair, or
# returns nothing if no usable idle connection is pooled for it.
#
# Idle connections are taken most-recently-pooled first.  Any that turn
# out to be unusable are closed and discarded:
#   - no socket any more (already closed)
#   - server hung up (sysread returns 0)
#   - unexpected data pending (sysread returns bytes; logged as a bug)
#   - any other read error
sub _conn_idle_get {
    my ($self, $ip, $port) = @_;

    my $key = "$ip:$port";
    my $idle = $self->{idle}->{$key} or return;

    # the Danga::Socket event loop may detect hangups and close sockets,
    # however not all MFS workers run this event loop, so we need to
    # validate the connection when retrieving a connection from the pool
    while (my $conn = pop @$idle) {
        # make sure the socket is valid:
        # due to response callback ordering, we actually place connections
        # in the pool before invoking the user-supplied response callback
        # (to allow connections to get reused ASAP)
        my $sock = $conn->sock or next;

        # hope this returns EAGAIN, not using OO->sysread here since
        # Net::HTTP::NB overrides that and we _want_ to hit EAGAIN here
        my $r = sysread($sock, my $byte, 1);

        # good, connection is possibly still alive if we got EAGAIN
        return $conn if (!defined $r && $!{EAGAIN});

        my $err = $!;
        if (defined $r) {
            if ($r == 0) {
                # a common case and to be expected
                $err = "server dropped idle connection";
            } else {
                # this is a bug either on our side or the HTTP server.
                # sysread succeeded, so $! is stale here; give close() a
                # meaningful reason instead of leftover errno.
                $err = "unexpected data on idle connection";
                Mgd::error("Bug: unexpected got $r bytes from idle conn to "
                           . $conn->host_port . " (byte=$byte)");
            }
        }

        # connection is bad, close the socket and move onto the
        # next idle connection if there is one.
        $conn->close($err);
    }

    return;
}
# Creates a new connection to $ip:$port if doing so stays within both the
# per-destination and the pool-wide capacity limits.
#
# We only call this sub if we don't have idle connections for this
# destination, so {idle} is not consulted here.  If the pool as a whole is
# full but not every connection is inflight, an idle connection to some
# other destination is dropped to make room.
#
# Returns whatever _conn_new returns (a connection or an error string), or
# nothing when at capacity (EMFILE/ENFILE surface via _conn_new's string).
sub _conn_new_maybe {
    my ($self, $ip, $port) = @_;
    my $inflight_for_dest = $self->{inflight}{"$ip:$port"} ||= {};

    # refuse if this destination is already at its own limit
    return if keys(%$inflight_for_dest) >= $self->{dest_capacity};

    if ($self->_total_connections >= $self->{total_capacity}) {
        # every connection in the pool is busy: nothing we can evict
        return unless $self->{total_inflight} < $self->{total_capacity};

        # some connections to other destinations are idle; evict one and
        # fall through to creating ours
        $self->_conn_drop_idle;
    }

    # we're hopefully under capacity if we got here
    return $self->_conn_new($ip, $port);
}
# Creates a new connection via the pool's connection class and registers
# it in {fdmap}, pointing it back at this pool with set_pool.
#
# Returns the connection object on success.  On failure returns an error
# string (not a reference) naming the destination and $!.  EMFILE/ENFILE
# are not special-cased: the capacity for this pool is expected to be far
# under the system defaults, so they are treated like any other error.
sub _conn_new {
    my ($self, $ip, $port) = @_;

    # calls MogileFS::Connection::{HTTP,Mogstored}->new:
    my $conn = $self->{class}->new($ip, $port)
        or return "failed to create socket to $ip:$port ($!)";

    $self->{fdmap}{ $conn->fd } = $conn;
    $conn->set_pool($self);
    return $conn;
}
# Retrieves a connection to $ip:$port for event-loop use.  A validated idle
# connection is always preferred; otherwise a new one is created if
# capacity allows.  Returns nothing when at capacity, and may return an
# error string (see _conn_new) if socket creation failed.
sub _conn_get {
    my ($self, $ip, $port) = @_;

    my $idle_conn = $self->_conn_idle_get($ip, $port);
    return $idle_conn if $idle_conn;

    return $self->_conn_new_maybe($ip, $port);
}
# Pulls a connection out of the pool for synchronous use.
# This may create a new connection (independent of pool limits).
# The connection returned by this is _blocking_. This is currently
# only used by replicate.
#
# The returned connection is removed from {fdmap}, so if it never comes
# back via conn_put, refcounting can close() it.
#
# Returns the connection, or nothing on failure.  On failure $! is left as
# set by the failed socket creation or by
# MogileFS::Util::wait_for_writeability.
sub conn_get {
    my ($self, $ip, $port) = @_;

    my $conn = $self->_conn_idle_get($ip, $port);
    if ($conn) {
        # in case the connection never comes back, let refcounting close() it:
        delete $self->{fdmap}->{$conn->fd};
        return $conn;
    }

    $conn = $self->_conn_new($ip, $port);
    unless (ref $conn) {
        # $conn is an error string built from $!.  Do NOT assign it to $!:
        # $! only holds numeric errno values, so a non-numeric string
        # becomes errno 0 (with an "isn't numeric" warning) and the real
        # error is lost.  $! still holds what the failed constructor set.
        return;
    }

    delete $self->{fdmap}->{$conn->fd};
    my $timeout = MogileFS->config("node_timeout");
    MogileFS::Util::wait_for_writeability($conn->fd, $timeout) or return;

    return $conn;
}
# Retrieves a connection to $ip:$port from the pool and runs $inflight_cb
# on it.  When no connection can be had right now (pool at capacity), the
# task is queued instead and started later from the Danga::Socket event
# loop (see enqueue / schedule_queued).
# This relies on Danga::Socket->EventLoop
sub start {
    my ($self, $ip, $port, $inflight_cb) = @_;

    # we're too busy right now, queue up
    my $conn = $self->_conn_get($ip, $port)
        or return $self->enqueue($ip, $port, $inflight_cb);

    return $self->_conn_run($conn, $inflight_cb);
}
# Returns the number of connections currently registered in {fdmap}
# (registered by _conn_new and conn_put, removed by conn_close_prepare
# and conn_get).
sub _total_connections {
    my ($self) = @_;
    my $count = keys %{ $self->{fdmap} };
    return $count;
}
# Marks a connection as no longer inflight.
# Returns the inflight callback stashed for it (and decrements
# {total_inflight}) if the connection was active, or undef if it was not.
sub inflight_cb_expire {
    my ($self, $conn) = @_;
    my $key = $conn->key;
    my $fd  = $conn->fd;

    my $cb = delete $self->{inflight}{$key}{$fd};
    $self->{total_inflight}-- if $cb;
    return $cb;
}
# Schedules queued tasks to be started on the next tick of the
# Danga::Socket event loop.  Call this
# 1) whenever a task is enqueued
# 2) whenever a task is complete
#
# At most one timer is outstanding: {scheduled} holds it until it fires.
# When it fires, queued tasks are started in FIFO order while the pool has
# spare total capacity; tasks whose destination cannot supply a connection
# right now are skipped and stay queued for a later run.
sub schedule_queued {
    my ($self) = @_;

    # already scheduled for the next tick
    return $self->{scheduled} if $self->{scheduled};

    # AddTimer(0) rather than running inline, to avoid potential
    # stack overflow
    return $self->{scheduled} = Danga::Socket->AddTimer(0, sub {
        $self->{scheduled} = undef;

        my $queue    = $self->{queue};
        my $capacity = $self->{total_capacity};
        my $idx      = 0;

        while ($idx < @$queue && $self->{total_inflight} < $capacity) {
            my ($ip, $port, $cb) = @{ $queue->[$idx] };

            my $conn = $self->_conn_get($ip, $port);
            unless ($conn) {
                # this task cannot be dequeued yet, skip it for now
                $idx++;
                next;
            }

            splice(@$queue, $idx, 1); # remove from queue
            $self->_conn_run($conn, $cb);
        }
    });
}
# Call this when done using an (inflight) connection.
# First schedules the next queued task (see schedule_queued), then offers
# the connection back to the idle pool via conn_put.
# Returns conn_put's result: 1 if the connection was pooled, 0 if the pool
# is at capacity or the socket is dead, in which case the caller should
# close it.
sub conn_persist {
    my ($self, $conn) = @_;

    # a connection is being freed, so something queued may be able to run
    $self->schedule_queued;

    return $self->conn_put($conn);
}
# The opposite of conn_get: returns a connection back to the connection
# pool, making it available for future use.  Also used by conn_persist.
#
# Dead connections (no peer address) are not stored.  A live connection is
# always (re)registered in {fdmap}, but is only added to the idle list if
# its destination is below {dest_capacity}; in that case it is marked idle
# and its timeout is cleared.
#
# Returns 1 if pooled.  Returns 0 if the pool is at capacity or the socket
# is dead; the caller should then close the connection itself.
sub conn_put {
    my ($self, $conn) = @_;

    my $key = $conn->key;

    # we do not store dead connections
    return 0 unless $conn->peer_addr_string;

    # connection is still alive, respect capacity limits
    my $idle = $self->{idle}{$key} ||= [];

    # register it in the fdmap just in case:
    $self->{fdmap}{ $conn->fd } = $conn;

    return 0 unless $self->_dest_total($conn) < $self->{dest_capacity};

    $conn->mark_idle;
    push @$idle, $conn;         # yay, connection is reusable
    $conn->set_timeout(undef);  # clear timeout
    return 1;
}
# Appends a task ($ip, $port, $inflight_cb) to the FIFO queue and schedules
# it to run ASAP.  Returns schedule_queued's result.
# This must be used with Danga::Socket->EventLoop
sub enqueue {
    my ($self, $ip, $port, $inflight_cb) = @_;

    my $task = [ $ip, $port, $inflight_cb ];
    push @{ $self->{queue} }, $task;

    # we have something in the queue, make sure it's run soon
    return $self->schedule_queued;
}
# Returns the total connections (idle + inflight) to the destination
# (ip:port key) of a given connection.
#
# Destinations with no idle list or no inflight map count as zero.  The
# `|| []` / `|| {}` fallbacks avoid dying under strict refs on a missing
# idle list, and avoid autovivifying empty entries for unknown
# destinations.
sub _dest_total {
    my ($self, $conn) = @_;
    my $key = $conn->key;
    my $inflight = scalar keys %{ $self->{inflight}->{$key} || {} };
    my $idle = scalar @{ $self->{idle}->{$key} || [] };
    return $idle + $inflight;
}
# Drops a connection reported as hung up or in error.  Only call this from
# the event_hup/event_err callbacks used by Danga::Socket.
#
# If the connection is inflight, error handling is handed off to its
# event_read callback (its return value is passed through).  Otherwise the
# connection is removed from its destination's idle list and closed with
# $close_reason.  While scanning, idle entries whose socket is already gone
# are also removed.  A connection not found in the idle list is left alone.
sub conn_drop {
    my ($self, $conn, $close_reason) = @_;

    my $fd = $conn->fd;
    my $key = $conn->key;

    # event_read must handle errors anyways, so hand off
    # error handling to the event_read callback if inflight.
    my $inflight = $self->{inflight}->{$key};
    return $conn->event_read if $inflight && $inflight->{$fd};

    # we get here if and only if the socket is idle, we can drop it ourselves
    my $idle = $self->{idle}->{$key} or return;

    # Walk the list by hand: splicing shifts later elements down, so the
    # index only advances when nothing was removed.  (A foreach over a
    # precomputed index range skips elements and can run off the end.)
    my $i = 0;
    while ($i < @$idle) {
        my $old = $idle->[$i];
        if (!$old->sock) {
            # some connections may have expired but not been spliced out,
            # yet; splice it out here since we're iterating anyways
            splice(@$idle, $i, 1);
        } elsif ($old->fd == $fd) {
            splice(@$idle, $i, 1);
            $conn->close($close_reason);
            return;
        } else {
            $i++;
        }
    }

    return;
}
# Unregisters a connection and prepares it to be closed: removes it from
# {fdmap} and {timeouts} and expires its inflight callback.
# Returns the inflight callback if there was one.  A connection without a
# socket is left untouched and the (false) sock value is returned.
# Croaks if an inflight callback exists for a connection that was not
# registered in {fdmap}.
sub conn_close_prepare {
    my ($self, $conn, $close_reason) = @_;

    my $has_sock = $conn->sock;
    return $has_sock unless $has_sock;

    my $fd = $conn->fd;
    my $registered = delete $self->{fdmap}{$fd};
    delete $self->{timeouts}{$fd};

    my $inflight_cb = $self->inflight_cb_expire($conn);

    # $registered may be undef in replicate worker which removes
    # connections from fdmap.  However, such unregistered connections
    # should never have an inflight_cb
    croak("BUG: dropping unregistered conn with callback: $conn")
        if $inflight_cb && !$registered;

    return $inflight_cb;
}
# Schedules $cb to run on the next tick of the event loop
# (immediately after this tick runs).
# Only the first callback added to an empty list arms a
# Danga::Socket->AddTimer(0) timer; later ones ride along on it.
sub on_next_tick {
    my ($self, $cb) = @_;
    my $pending = $self->{on_next_tick};
    push @$pending, $cb;

    if (@$pending == 1) {
        Danga::Socket->AddTimer(0, sub {
            # swap in a fresh list first so callbacks scheduled while we
            # run wait for the following tick instead of running on _this_ one
            my $batch = $self->{on_next_tick};
            $self->{on_next_tick} = [];

            while (my $next = shift @$batch) {
                $next->();
            }
        });
    }
}
# Marks a connection inflight and invokes $cb on it.
# $conn may be an error string, in which case the user-supplied callback
# is invoked on the next tick with a mock error via the connection class's
# new_err (this mimics how LWP fakes an HTTP response even if the socket
# could not be created/connected), and the queue is rescheduled.
sub _conn_run {
    my ($self, $conn, $cb) = @_;

    unless (ref $conn) {
        # fake an error message on the response callback
        return $self->on_next_tick(sub {
            # fatal error creating the socket, do not queue
            $self->{class}->new_err($conn, $cb);

            # onto the next request
            $self->schedule_queued;
        });
    }

    # stash callback for retrying, and count the connection as inflight
    $self->{inflight}{ $conn->key }{ $conn->fd } = $cb;
    $self->{total_inflight}++;
    return $cb->($conn);
}
# Drops one idle connection from the idle connection pool (so we can open
# another socket without incurring out-of-FD errors).
# The oldest entry of the first non-empty idle list found is removed and,
# if it still has a socket, closed with reason "idle_expire".
# Only call when you're certain there's a connection to drop; confesses
# if every idle list is empty.
# XXX This is O(destinations), unfortunately
sub _conn_drop_idle {
    my ($self) = @_;

    for my $conns (values %{ $self->{idle} }) {
        my $victim = shift @$conns;
        next unless $victim;

        $victim->close("idle_expire") if $victim->sock;
        return;
    }

    confess("BUG: unable to drop an idle connection");
}
# Checks for expired connections and re-arms the shared timer for the
# earliest remaining timeout.  This can be expensive if there are many
# concurrent connections waiting on timeouts, but is still better than
# having AddTimer create a Danga::Socket::Timer object every time a
# timeout is reset.
#
# Updates {timer} (undef when nothing is pending) and {next_expiry}.
sub check_timeouts {
    my ($self) = @_;
    my $timeouts = $self->{timeouts};
    my @fds = keys %$timeouts;
    my $next_expiry = NEVER;
    my $now = Time::HiRes::time();

    # this is O(n) where n is concurrent connections
    foreach my $fd (@fds) {
        # @fds is a snapshot: an earlier expired() call in this loop may
        # already have removed this entry (e.g. if closing a connection
        # unregisters it via conn_close_prepare), so skip it if gone
        my $conn = $timeouts->{$fd} or next;

        if ($conn->expired($now)) {
            # only remove the entry if it still belongs to this connection.
            # NOTE(review): if expired() led to a new connection reusing
            # this fd number and registering its own timeout, an
            # unconditional delete would silently drop that timeout.
            my $current = $timeouts->{$fd};
            delete $timeouts->{$fd} if $current && $current == $conn;
        } else {
            # look for the next timeout
            my $expiry = $conn->expiry;
            if ($expiry) {
                $next_expiry = $expiry if $expiry < $next_expiry;
            } else {
                # just in case, this may not happen...
                delete $timeouts->{$fd};
            }
        }
    }

    # schedule the wakeup for the next timeout
    if ($next_expiry == NEVER) {
        $self->{timer} = undef;
    } else {
        my $timeout = $next_expiry - $now;
        $timeout = 0 if $timeout <= 0;
        $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
            $self->check_timeouts;
        });
    }
    $self->{next_expiry} = $next_expiry;
}
# Registers a timeout for a given connection; each connection may only
# have one pending timeout.  $timeout is added to Time::HiRes::time()
# (so presumably seconds) and may be undef/0 to cancel the current timeout.
#
# Connections with a socket share one Danga::Socket timer held in {timer}.
# It is re-armed only when none exists or the new deadline is earlier than
# {next_expiry}.  The replaced timer is cancelled.
sub register_timeout {
    my ($self, $conn, $timeout) = @_;

    if (!$conn->sock) {
        # this may never happen...
        # no FD, so we must allocate a new Danga::Socket::Timer object
        # add 1msec to avoid FP rounding problems leading to missed
        # expiration when calling conn->expired
        if ($timeout) {
            Danga::Socket->AddTimer($timeout + 0.001, sub { $conn->expired });
        }
    } else {
        my $fd = $conn->fd;
        if (!$timeout) {
            # cancel: forget the pending timeout, leave the shared timer be
            delete $self->{timeouts}{$fd};
        } else {
            $self->{timeouts}{$fd} = $conn;
            my $current_timer = $self->{timer};
            my $deadline = Time::HiRes::time() + $timeout;

            if (!$current_timer || $deadline < $self->{next_expiry}) {
                $self->{next_expiry} = $deadline;
                $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
                    $self->check_timeouts;
                });
                $current_timer->cancel if $current_timer;
            }
        }
    }
}