1 # a connection pool class with queueing.
2 # (something doesn't sound quite right with that...)
3 # This requires Danga::Socket to drive, but may also function without it
4 # via conn_get/conn_put.
5 package MogileFS
::ConnectionPool
;
8 use Carp
qw(croak confess);
# Sentinel expiry meaning "no timeout pending": the timeout scan later in
# this file starts from NEVER and, if the value is still NEVER afterwards,
# clears the timer instead of scheduling a new wakeup.
11 use constant NEVER
=> (0xffffffff << 32) | 0xffffffff; # portable version :P
14 my ($class, $conn_class, $opts) = @_;
18 fdmap
=> {}, # { fd -> conn }
19 idle
=> {}, # ip:port -> [ MogileFS::Connection::Poolable, ... ]
20 queue
=> [], # [ [ ip, port, callback ], ... ]
21 timer
=> undef, # Danga::Socket::Timer object
22 timeouts
=> {}, # { fd -> conn }
23 inflight
=> {}, # ip:port -> { fd -> callback }
24 total_inflight
=> 0, # number of inflight connections
25 dest_capacity
=> $opts->{dest_capacity
},
26 total_capacity
=> $opts->{total_capacity
},
28 scheduled
=> 0, # set if we'll start tasks on next tick
33 # total_capacity=20 matches what we used with LWP
34 $self->{total_capacity
} ||= 20;
36 # allow users to specify per-destination capacity limits
37 $self->{dest_capacity
} ||= $self->{total_capacity
};
42 # retrieves an idle connection for the [IP, port] pair
44 my ($self, $ip, $port) = @_;
46 my $key = "$ip:$port";
47 my $idle = $self->{idle
}->{$key} or return;
49 # the Danga::Socket event loop may detect hangups and close sockets,
50 # However not all MFS workers run this event loop, so we need to
51 # validate the connection when retrieving a connection from the pool
52 while (my $conn = pop @
$idle) {
53 # make sure the socket is valid:
55 # due to response callback ordering, we actually place connections
56 # in the pool before invoking the user-supplied response callback
57 # (to allow connections to get reused ASAP)
58 my $sock = $conn->sock or next;
60 # hope this returns EAGAIN, not using OO->sysread here since
61 # Net::HTTP::NB overrides that and we _want_ to hit EAGAIN here
62 my $r = sysread($sock, my $byte, 1);
64 # good, connection is possibly still alive if we got EAGAIN
65 return $conn if (!defined $r && $!{EAGAIN
});
70 # a common case and to be expected
71 $err = "server dropped idle connection";
73 # this is a bug either on our side or the HTTP server
74 Mgd
::error
("Bug: unexpected got $r bytes from idle conn to ". $conn->host_port. ") (byte=$byte)");
78 # connection is bad, close the socket and move onto the
79 # next idle connection if there is one.
86 # creates a new connection if under capacity
87 # returns undef if we're at capacity (or on EMFILE/ENFILE)
89 my ($self, $ip, $port) = @_;
90 my $key = "$ip:$port";
92 # we only call this sub if we don't have idle connections, so
93 # we don't check {idle} here
95 # make sure we're not already at capacity for this destination
96 my $nr_inflight = scalar keys %{$self->{inflight
}->{$key} ||= {}};
97 return if ($nr_inflight >= $self->{dest_capacity
});
99 # see how we're doing with regard to total capacity:
100 if ($self->_total_connections >= $self->{total_capacity
}) {
101 # see if we have idle connections for other pools to kill
102 if ($self->{total_inflight
} < $self->{total_capacity
}) {
103 # we have idle connections to other destinations, drop one of those
104 $self->_conn_drop_idle;
105 # fall through to creating a new connection
107 # we're at total capacity for the entire pool
112 # we're hopefully under capacity if we got here, create a new connection
113 $self->_conn_new($ip, $port);
116 # creates new connection and registers it in our fdmap
117 # returns undef if resources (FDs, buffers) aren't available
119 my ($self, $ip, $port) = @_;
121 # calls MogileFS::Connection::{HTTP,Mogstored}->new:
122 my $conn = $self->{class}->new($ip, $port);
124 # register the connection
125 $self->{fdmap
}->{$conn->fd} = $conn;
126 $conn->set_pool($self);
130 # EMFILE/ENFILE should never happen as the capacity for this
131 # pool is far under the system defaults. Just give up on
132 # EMFILE/ENFILE like any other error.
134 Mgd
::log('err', "failed to create socket to $ip:$port ($mfs_err)");
140 # retrieves a connection, may return undef if at capacity
142 my ($self, $ip, $port) = @_;
144 # if we have idle connections, always use them first
145 $self->_conn_idle_get($ip, $port) || $self->_conn_new_maybe($ip, $port);
148 # Pulls a connection out of the pool for synchronous use.
149 # This may create a new connection (independent of pool limits).
150 # The connection returned by this is _blocking_. This is currently
151 # only used by replicate.
153 my ($self, $ip, $port) = @_;
154 my $conn = $self->_conn_idle_get($ip, $port);
157 # in case the connection never comes back, let refcounting close() it:
158 delete $self->{fdmap
}->{$conn->fd};
160 $conn = $self->_conn_new($ip, $port);
162 $! = $conn; # $conn is an error message :<
165 delete $self->{fdmap
}->{$conn->fd};
166 my $timeout = MogileFS
->config("node_timeout");
167 MogileFS
::Util
::wait_for_writeability
($conn->fd, $timeout) or return;
173 # retrieves a connection from the connection pool and executes
174 # inflight_cb on it. If the pool is at capacity, this will queue the task.
175 # This relies on Danga::Socket->EventLoop
177 my ($self, $ip, $port, $inflight_cb) = @_;
179 my $conn = $self->_conn_get($ip, $port);
181 $self->_conn_run($conn, $inflight_cb);
182 } else { # we're too busy right now, queue up
183 $self->enqueue($ip, $port, $inflight_cb);
187 # Returns the total number of connections we have, counted as the
# number of entries in the {fdmap} hash (which is keyed by connection fd).
# NOTE(review): the line declaring $self is not present in this copy of
# the file -- confirm it is unpacked from @_ before the return.
188 sub _total_connections
{
190 return scalar keys %{$self->{fdmap
}};
193 # Marks a connection as no longer inflight, returns the inflight
194 # callback if the connection was active, undef if not.
#
# Arguments:
#   $conn - connection object; its ->key and ->fd methods locate the
#           callback inside the {inflight} hash (presumably key is the
#           "ip:port" string -- confirm against the connection class).
# NOTE(review): the statement that returns $inflight_cb is not present in
# this copy of the file.
195 sub inflight_cb_expire
{
196 my ($self, $conn) = @_;
# remove (and capture) the callback stashed for this connection, if any
197 my $inflight_cb = delete $self->{inflight
}->{$conn->key}->{$conn->fd};
# only adjust the counter when a callback was actually registered
198 $self->{total_inflight
}-- if $inflight_cb;
203 # Schedules the event loop to dequeue and run a task on the next
204 # tick of the Danga::Socket event loop. Call this
205 # 1) whenever a task is enqueued
206 # 2) whenever a task is complete
#
# NOTE(review): several lines of this sub are not present in this copy of
# the file (the $self declaration, the declaration/advance of $i, and the
# branch lines around the dequeue); the comments below describe only the
# statements that are visible.
207 sub schedule_queued
{
210 # AddTimer(0) to avoid potential stack overflow
# "||=" means a new zero-delay timer is added only when {scheduled} is
# currently false, so at most one dequeue pass is pending at a time.
211 $self->{scheduled
} ||= Danga
::Socket
->AddTimer(0, sub {
# clear the flag first so that later calls can schedule another pass
212 $self->{scheduled
} = undef;
213 my $queue = $self->{queue
};
215 my $total_capacity = $self->{total_capacity
};
# keep going while we are under total capacity and $i is still a valid
# index into the queue
218 while ($self->{total_inflight
} < $total_capacity
219 && $i <= (scalar(@
$queue) - 1)) {
# each queue entry is an [ ip, port, callback ] array ref
220 my ($ip, $port, $cb) = @
{$queue->[$i]};
222 my $conn = $self->_conn_get($ip, $port);
224 splice(@
$queue, $i, 1); # remove from queue
225 $self->_conn_run($conn, $cb);
227 # this queue object cannot be dequeued, skip it for now
234 # Call this when done using an (inflight) connection
235 # This possibly places a connection in the connection pool.
236 # This will close the connection if the pool is already at capacity.
237 # This will also start the next queued callback, or retry if needed
239 my ($self, $conn) = @_;
241 # schedule the next request if we're done with any connection
242 $self->schedule_queued;
243 $self->conn_put($conn);
246 # The opposite of conn_get, this returns a connection retrieved with conn_get
247 # back to the connection pool, making it available for future use. Dead
248 # connections are not stored.
249 # This is currently only used by replicate.
251 my ($self, $conn) = @_;
253 my $key = $conn->key;
254 # we do not store dead connections
255 my $peer_addr = $conn->peer_addr_string;
258 # connection is still alive, respect capacity limits
259 my $idle = $self->{idle
}->{$key} ||= [];
261 # register it in the fdmap just in case:
262 $self->{fdmap
}->{$conn->fd} = $conn;
264 if ($self->_dest_total($conn) < $self->{dest_capacity
}) {
266 push @
$idle, $conn; # yay, connection is reusable
267 $conn->set_timeout(undef); # clear timeout
272 # we have too many connections or the socket is dead, caller
273 # should close after returning from this function.
277 # enqueues a request (inflight_cb) and schedules it to run ASAP
278 # This must be used with Danga::Socket->EventLoop
280 my ($self, $ip, $port, $inflight_cb) = @_;
282 push @
{$self->{queue
}}, [ $ip, $port, $inflight_cb ];
284 # we have something in the queue, make sure it's run soon
285 $self->schedule_queued;
288 # returns the total connections to the host of a given connection
290 my ($self, $conn) = @_;
291 my $key = $conn->key;
292 my $inflight = scalar keys %{$self->{inflight
}->{$key}};
293 my $idle = scalar @
{$self->{idle
}->{$key}};
294 return $idle + $inflight;
297 # only call this from the event_hup/event_err callbacks used by Danga::Socket
299 my ($self, $conn, $close_reason) = @_;
302 my $key = $conn->key;
304 # event_read must handle errors anyways, so hand off
305 # error handling to the event_read callback if inflight.
306 return $conn->event_read if $self->{inflight
}->{$key}->{$fd};
308 # we get here if and only if the socket is idle, we can drop it ourselves
309 # splice out the socket we're closing from the idle pool
310 my $idle = $self->{idle
}->{$key};
311 foreach my $i (0 .. (scalar(@
$idle) - 1)) {
312 my $old = $idle->[$i];
314 if ($old->fd == $fd) {
315 splice(@
$idle, $i, 1);
316 $conn->close($close_reason);
320 # some connections may have expired but not been spliced out, yet
321 # splice it out here since we're iterating anyways
322 splice(@
$idle, $i, 1);
327 # Unregisters and prepares connection to be closed.
328 # Returns the inflight callback if there was one.
#
# Arguments:
#   $conn         - the connection being closed
#   $close_reason - accepted from the caller; not used by any statement
#                   visible in this copy of the file
# NOTE(review): the assignment of $fd and the final return are not present
# in this copy of the file.
329 sub conn_close_prepare
{
330 my ($self, $conn, $close_reason) = @_;
# drop the connection from the fd registry; $valid remembers whether it
# was registered at all
335 my $valid = delete $self->{fdmap
}->{$fd};
# forget any pending timeout for this fd
336 delete $self->{timeouts
}->{$fd};
338 my $inflight_cb = $self->inflight_cb_expire($conn);
340 # $valid may be undef in replicate worker which removes connections
341 # from fdmap. However, valid==undef connections should never have
# an inflight callback; that combination is reported as a bug below.
343 if ($inflight_cb && !$valid) {
344 croak
("BUG: dropping unregistered conn with callback: $conn");
350 # schedules cb to run on the next tick of the event loop,
351 # (immediately after this tick runs)
353 my ($self, $cb) = @_;
354 my $on_next_tick = $self->{on_next_tick
};
355 push @
$on_next_tick, $cb;
357 if (scalar(@
$on_next_tick) == 1) {
358 Danga
::Socket
->AddTimer(0, sub {
359 # prevent scheduled callbacks from being called on _this_ tick
360 $on_next_tick = $self->{on_next_tick
};
361 $self->{on_next_tick
} = [];
363 while (my $sub = shift @
$on_next_tick) {
370 # marks a connection inflight and invokes cb on it
371 # $conn may be an error string, in which case we'll invoke the user-supplied
372 # callback with a mock error (this mimics how LWP fakes an HTTP response
373 # even if the socket could not be created/connected)
375 my ($self, $conn, $cb) = @_;
378 my $inflight = $self->{inflight
}->{$conn->key} ||= {};
379 $inflight->{$conn->fd} = $cb; # stash callback for retrying
380 $self->{total_inflight
}++;
383 # fake an error message on the response callback
384 $self->on_next_tick(sub {
385 # fatal error creating the socket, do not queue
387 $self->{class}->new_err($mfs_err, $cb);
389 # onto the next request
390 $self->schedule_queued;
395 # Drops an idle connection from the idle connection pool (so we can open
396 # another socket without incurring out-of-FD errors).
397 # Only call when you're certain there's a connection to drop.
398 # XXX This is O(destinations), unfortunately
# NOTE(review): the $self declaration, the outer loop and the return after
# a successful drop are not present in this copy of the file.
399 sub _conn_drop_idle
{
# {idle} maps each destination to its list of idle connections
401 my $idle = $self->{idle
};
403 # using "each" on the hash since it preserves the internal iterator
404 # of the hash across invocations of this sub. This should preserve
405 # the balance of idle connections in a big pool with many hosts.
406 # Thus we loop twice to ensure we scan the entire idle connection
# pool.
409 while (my (undef, $val) = each %$idle) {
# take the connection at the front of this destination's idle list;
# move on to the next destination if the list is empty
410 my $conn = shift @
$val or next;
# only close if the connection still has a socket
412 $conn->close("idle_expire") if $conn->sock;
# presumably reached only when no idle connection could be found; confess
# dies with a stack trace
417 confess
("BUG: unable to drop an idle connection");
420 # checks for expired connections, this can be expensive if there
421 # are many concurrent connections waiting on timeouts, but still
422 # better than having AddTimer create a Danga::Socket::Timer object
423 # every time a timeout is reset.
426 my $timeouts = $self->{timeouts
};
427 my @fds = keys %$timeouts;
428 my $next_expiry = NEVER
;
429 my $now = Time
::HiRes
::time();
431 # this is O(n) where n is concurrent connections
432 foreach my $fd (@fds) {
433 my $conn = $timeouts->{$fd};
434 if ($conn->expired($now)) {
435 delete $timeouts->{$fd};
437 # look for the next timeout
438 my $expiry = $conn->expiry;
440 $next_expiry = $expiry if $expiry < $next_expiry;
442 # just in case, this may not happen...
443 delete $timeouts->{$fd};
448 # schedule the wakeup for the next timeout
449 if ($next_expiry == NEVER
) {
450 $self->{timer
} = undef;
452 my $timeout = $next_expiry - $now;
453 $timeout = 0 if $timeout <= 0;
454 $self->{timer
} = Danga
::Socket
->AddTimer($timeout, sub {
455 $self->check_timeouts;
458 $self->{next_expiry
} = $next_expiry;
461 # registers a timeout for a given connection, each connection may only
462 # have one pending timeout. Timeout may be undef to cancel the current
# timeout.
464 sub register_timeout
{
465 my ($self, $conn, $timeout) = @_;
470 $self->{timeouts
}->{$fd} = $conn;
471 my $next_expiry = $self->{next_expiry
};
472 my $old_timer = $self->{timer
};
473 my $expiry = $timeout + Time
::HiRes
::time();
475 if (!$old_timer || $expiry < $next_expiry) {
476 $self->{next_expiry
} = $expiry;
477 $self->{timer
} = Danga
::Socket
->AddTimer($timeout, sub {
478 $self->check_timeouts;
480 $old_timer->cancel if $old_timer;
483 delete $self->{timeouts
}->{$fd};
485 } elsif ($timeout) { # this may never happen...
486 # no FD, so we must allocate a new Danga::Socket::Timer object
487 # add 1msec to avoid FP rounding problems leading to missed
488 # expiration when calling conn->expired
489 Danga
::Socket
->AddTimer($timeout + 0.001, sub { $conn->expired });