1 # a connection pool class with queueing.
2 # (something doesn't sound quite right with that...)
3 # This requires Danga::Socket to drive, but may also function without it
4 # via conn_get/conn_put.
5 package MogileFS
::ConnectionPool
;
8 use Carp
qw(croak confess);
11 use constant NEVER
=> (0xffffffff << 32) | 0xffffffff; # portable version :P
14 my ($class, $conn_class, $opts) = @_;
18 fdmap
=> {}, # { fd -> conn }
19 idle
=> {}, # ip:port -> [ MogileFS::Connection::Poolable, ... ]
20 queue
=> [], # [ [ ip, port, callback ], ... ]
21 timer
=> undef, # Danga::Socket::Timer object
22 timeouts
=> {}, # { fd -> conn }
23 inflight
=> {}, # ip:port -> { fd -> callback }
24 total_inflight
=> 0, # number of inflight connections
25 dest_capacity
=> $opts->{dest_capacity
},
26 total_capacity
=> $opts->{total_capacity
},
28 scheduled
=> 0, # set if we'll start tasks on next tick
33 # total_capacity=20 matches what we used with LWP
34 $self->{total_capacity
} ||= 20;
36 # allow users to specify per-destination capacity limits
37 $self->{dest_capacity
} ||= $self->{total_capacity
};
42 # retrieves an idle connection for the [IP, port] pair
44 my ($self, $ip, $port) = @_;
46 my $key = "$ip:$port";
47 my $idle = $self->{idle
}->{$key} or return;
49 # the Danga::Socket event loop may detect hangups and close sockets,
50 # However not all MFS workers run this event loop, so we need to
51 # validate the connection when retrieving a connection from the pool
52 while (my $conn = pop @
$idle) {
53 # make sure the socket is valid:
55 # due to response callback ordering, we actually place connections
56 # in the pool before invoking the user-supplied response callback
57 # (to allow connections to get reused ASAP)
58 my $sock = $conn->sock or next;
60 # hope this returns EAGAIN, not using OO->sysread here since
61 # Net::HTTP::NB overrides that and we _want_ to hit EAGAIN here
62 my $r = sysread($sock, my $byte, 1);
64 # good, connection is possibly still alive if we got EAGAIN
65 return $conn if (!defined $r && $!{EAGAIN
});
70 # a common case and to be expected
71 $err = "server dropped idle connection";
73 # this is a bug either on our side or the HTTP server
74 Mgd
::error
("Bug: unexpected got $r bytes from idle conn to ". $conn->host_port. ") (byte=$byte)");
78 # connection is bad, close the socket and move onto the
79 # next idle connection if there is one.
86 # creates a new connection if under capacity
87 # returns undef if we're at capacity, or an error string (from _conn_new) if the socket could not be created (e.g. on EMFILE/ENFILE)
89 my ($self, $ip, $port) = @_;
90 my $key = "$ip:$port";
92 # we only call this sub if we don't have idle connections, so
93 # we don't check {idle} here
95 # make sure we're not already at capacity for this destination
96 my $nr_inflight = scalar keys %{$self->{inflight
}->{$key} ||= {}};
97 return if ($nr_inflight >= $self->{dest_capacity
});
99 # see how we're doing with regard to total capacity:
100 if ($self->_total_connections >= $self->{total_capacity
}) {
101 # see if we have idle connections for other pools to kill
102 if ($self->{total_inflight
} < $self->{total_capacity
}) {
103 # we have idle connections to other destinations, drop one of those
104 $self->_conn_drop_idle;
105 # fall through to creating a new connection
107 # we're at total capacity for the entire pool
112 # we're hopefully under capacity if we got here, create a new connection
113 $self->_conn_new($ip, $port);
116 # creates new connection and registers it in our fdmap
117 # returns error string if resources (FDs, buffers) aren't available
119 my ($self, $ip, $port) = @_;
121 # calls MogileFS::Connection::{HTTP,Mogstored}->new:
122 my $conn = $self->{class}->new($ip, $port);
124 # register the connection
125 $self->{fdmap
}->{$conn->fd} = $conn;
126 $conn->set_pool($self);
130 # EMFILE/ENFILE should never happen as the capacity for this
131 # pool is far under the system defaults. Just give up on
132 # EMFILE/ENFILE like any other error.
133 return "failed to create socket to $ip:$port ($!)";
137 # retrieves a connection; may return undef if at capacity, or an error string if a new socket could not be created
139 my ($self, $ip, $port) = @_;
141 # if we have idle connections, always use them first
142 $self->_conn_idle_get($ip, $port) || $self->_conn_new_maybe($ip, $port);
145 # Pulls a connection out of the pool for synchronous use.
146 # This may create a new connection (independent of pool limits).
147 # The connection returned by this is _blocking_. This is currently
148 # only used by replicate.
150 my ($self, $ip, $port) = @_;
151 my $conn = $self->_conn_idle_get($ip, $port);
154 # in case the connection never comes back, let refcounting close() it:
155 delete $self->{fdmap
}->{$conn->fd};
157 $conn = $self->_conn_new($ip, $port);
159 $! = $conn; # $conn is an error message :<
162 delete $self->{fdmap
}->{$conn->fd};
163 my $timeout = MogileFS
->config("node_timeout");
164 MogileFS
::Util
::wait_for_writeability
($conn->fd, $timeout) or return;
170 # retrieves a connection from the connection pool and executes
171 # inflight_cb on it. If the pool is at capacity, this will queue the task.
172 # This relies on Danga::Socket->EventLoop
174 my ($self, $ip, $port, $inflight_cb) = @_;
176 my $conn = $self->_conn_get($ip, $port);
178 $self->_conn_run($conn, $inflight_cb);
179 } else { # we're too busy right now, queue up
180 $self->enqueue($ip, $port, $inflight_cb);
184 # returns the total number of connections we have
185 sub _total_connections
{
187 return scalar keys %{$self->{fdmap
}};
190 # marks a connection as no longer inflight, returns the inflight
191 # callback if the connection was active, undef if not
192 sub inflight_cb_expire
{
193 my ($self, $conn) = @_;
194 my $inflight_cb = delete $self->{inflight
}->{$conn->key}->{$conn->fd};
195 $self->{total_inflight
}-- if $inflight_cb;
200 # schedules the event loop to dequeue and run a task on the next
201 # tick of the Danga::Socket event loop. Call this
202 # 1) whenever a task is enqueued
203 # 2) whenever a task is complete
204 sub schedule_queued
{
207 # AddTimer(0) to avoid potential stack overflow
208 $self->{scheduled
} ||= Danga
::Socket
->AddTimer(0, sub {
209 $self->{scheduled
} = undef;
210 my $queue = $self->{queue
};
212 my $total_capacity = $self->{total_capacity
};
215 while ($self->{total_inflight
} < $total_capacity
216 && $i <= (scalar(@
$queue) - 1)) {
217 my ($ip, $port, $cb) = @
{$queue->[$i]};
219 my $conn = $self->_conn_get($ip, $port);
221 splice(@
$queue, $i, 1); # remove from queue
222 $self->_conn_run($conn, $cb);
224 # this queue object cannot be dequeued, skip it for now
231 # Call this when done using an (inflight) connection
232 # This possibly places a connection in the connection pool.
233 # This will close the connection if the pool is already at capacity.
234 # This will also start the next queued callback, or retry if needed
236 my ($self, $conn) = @_;
238 # schedule the next request if we're done with any connection
239 $self->schedule_queued;
240 $self->conn_put($conn);
243 # The opposite of conn_get, this returns a connection retrieved with conn_get
244 # back to the connection pool, making it available for future use. Dead
245 # connections are not stored.
246 # This is currently only used by replicate.
248 my ($self, $conn) = @_;
250 my $key = $conn->key;
251 # we do not store dead connections
252 my $peer_addr = $conn->peer_addr_string;
255 # connection is still alive, respect capacity limits
256 my $idle = $self->{idle
}->{$key} ||= [];
258 # register it in the fdmap just in case:
259 $self->{fdmap
}->{$conn->fd} = $conn;
261 if ($self->_dest_total($conn) < $self->{dest_capacity
}) {
263 push @
$idle, $conn; # yay, connection is reusable
264 $conn->set_timeout(undef); # clear timeout
269 # we have too many connections or the socket is dead, caller
270 # should close after returning from this function.
274 # enqueues a request (inflight_cb) and schedules it to run ASAP
275 # This must be used with Danga::Socket->EventLoop
277 my ($self, $ip, $port, $inflight_cb) = @_;
279 push @
{$self->{queue
}}, [ $ip, $port, $inflight_cb ];
281 # we have something in the queue, make sure it's run soon
282 $self->schedule_queued;
285 # returns the total connections to the host of a given connection
287 my ($self, $conn) = @_;
288 my $key = $conn->key;
289 my $inflight = scalar keys %{$self->{inflight
}->{$key}};
290 my $idle = scalar @
{$self->{idle
}->{$key}};
291 return $idle + $inflight;
294 # only call this from the event_hup/event_err callbacks used by Danga::Socket
296 my ($self, $conn, $close_reason) = @_;
299 my $key = $conn->key;
301 # event_read must handle errors anyways, so hand off
302 # error handling to the event_read callback if inflight.
303 return $conn->event_read if $self->{inflight
}->{$key}->{$fd};
305 # we get here if and only if the socket is idle, we can drop it ourselves
306 # splice out the socket we're closing from the idle pool
307 my $idle = $self->{idle
}->{$key};
308 foreach my $i (0 .. (scalar(@
$idle) - 1)) {
309 my $old = $idle->[$i];
311 if ($old->fd == $fd) {
312 splice(@
$idle, $i, 1);
313 $conn->close($close_reason);
317 # some connections may have expired but not been spliced out, yet
318 # splice it out here since we're iterating anyways
319 splice(@
$idle, $i, 1);
324 # unregisters and prepares connection to be closed
325 # Returns the inflight callback if there was one
326 sub conn_close_prepare
{
327 my ($self, $conn, $close_reason) = @_;
332 my $valid = delete $self->{fdmap
}->{$fd};
333 delete $self->{timeouts
}->{$fd};
335 my $inflight_cb = $self->inflight_cb_expire($conn);
337 # $valid may be undef in replicate worker which removes connections
338 # from fdmap. However, valid==undef connections should never have
340 if ($inflight_cb && !$valid) {
341 croak
("BUG: dropping unregistered conn with callback: $conn");
347 # schedules cb to run on the next tick of the event loop,
348 # (immediately after this tick runs)
350 my ($self, $cb) = @_;
351 my $on_next_tick = $self->{on_next_tick
};
352 push @
$on_next_tick, $cb;
354 if (scalar(@
$on_next_tick) == 1) {
355 Danga
::Socket
->AddTimer(0, sub {
356 # prevent scheduled callbacks from being called on _this_ tick
357 $on_next_tick = $self->{on_next_tick
};
358 $self->{on_next_tick
} = [];
360 while (my $sub = shift @
$on_next_tick) {
367 # marks a connection inflight and invokes cb on it
368 # $conn may be an error string, in which case we'll invoke the user-supplied
369 # callback with a mock error (this mimics how LWP fakes an HTTP response
370 # even if the socket could not be created/connected)
372 my ($self, $conn, $cb) = @_;
375 my $inflight = $self->{inflight
}->{$conn->key} ||= {};
376 $inflight->{$conn->fd} = $cb; # stash callback for retrying
377 $self->{total_inflight
}++;
380 # fake an error message on the response callback
381 $self->on_next_tick(sub {
382 # fatal error creating the socket, do not queue
384 $self->{class}->new_err($mfs_err, $cb);
386 # onto the next request
387 $self->schedule_queued;
392 # drops an idle connection from the idle connection pool (so we can open
393 # another socket without incurring out-of-FD errors)
394 # Only call when you're certain there's a connection to drop
395 # XXX This is O(destinations), unfortunately
396 sub _conn_drop_idle
{
398 my $idle = $self->{idle
};
400 foreach my $val (values %$idle) {
401 my $conn = shift @
$val or next;
403 $conn->close("idle_expire") if $conn->sock;
407 confess
("BUG: unable to drop an idle connection");
410 # checks for expired connections, this can be expensive if there
411 # are many concurrent connections waiting on timeouts, but still
412 # better than having AddTimer create a Danga::Socket::Timer object
413 # every time a timeout is reset.
416 my $timeouts = $self->{timeouts
};
417 my @fds = keys %$timeouts;
418 my $next_expiry = NEVER
;
419 my $now = Time
::HiRes
::time();
421 # this is O(n) where n is concurrent connections
422 foreach my $fd (@fds) {
423 my $conn = $timeouts->{$fd};
424 if ($conn->expired($now)) {
425 delete $timeouts->{$fd};
427 # look for the next timeout
428 my $expiry = $conn->expiry;
430 $next_expiry = $expiry if $expiry < $next_expiry;
432 # just in case, this may not happen...
433 delete $timeouts->{$fd};
438 # schedule the wakeup for the next timeout
439 if ($next_expiry == NEVER
) {
440 $self->{timer
} = undef;
442 my $timeout = $next_expiry - $now;
443 $timeout = 0 if $timeout <= 0;
444 $self->{timer
} = Danga
::Socket
->AddTimer($timeout, sub {
445 $self->check_timeouts;
448 $self->{next_expiry
} = $next_expiry;
451 # registers a timeout for a given connection, each connection may only
452 # have one pending timeout. Timeout may be undef to cancel the current
454 sub register_timeout
{
455 my ($self, $conn, $timeout) = @_;
460 $self->{timeouts
}->{$fd} = $conn;
461 my $next_expiry = $self->{next_expiry
};
462 my $old_timer = $self->{timer
};
463 my $expiry = $timeout + Time
::HiRes
::time();
465 if (!$old_timer || $expiry < $next_expiry) {
466 $self->{next_expiry
} = $expiry;
467 $self->{timer
} = Danga
::Socket
->AddTimer($timeout, sub {
468 $self->check_timeouts;
470 $old_timer->cancel if $old_timer;
473 delete $self->{timeouts
}->{$fd};
475 } elsif ($timeout) { # this may never happen...
476 # no FD, so we must allocate a new Danga::Socket::Timer object
477 # add 1msec to avoid FP rounding problems leading to missed
478 # expiration when calling conn->expired
479 Danga
::Socket
->AddTimer($timeout + 0.001, sub { $conn->expired });