Limit the number of simultaneous connect attempts passed to libpurple.
[thrasher.git] / perl / lib / Thrasher / ConnectionManager.pm
blobe31ef805c86523b42063e95278414815ce2d17d5
1 package Thrasher::ConnectionManager;
2 use strict;
3 use warnings;
=pod

=head1 NAME

Thrasher::ConnectionManager - manage the act of connecting to the
legacy services

=head1 DESCRIPTION

Thrasher::ConnectionManager centralizes and separates the management
of whether or not we want to connect to a legacy server. This is
likely to get more complicated later.

Suppose we have a couple hundred users on the gateway when the
connection to the legacy service is lost, that is, I<entirely> lost.
Later, it comes back. We don't want to hammer the service with
repeated connection attempts from each user.

We also want to be able to control the rate at which re-connection
attempts are made. If someone tries to log on while the connection
is down, we don't want their login attempt to pile on to the
failures.

Thrasher::ConnectionManager implements the ability to schedule
connection attempts, collect information about whether the logins are
successful or not, and decide when to attempt to log in again.

This module acts as a singleton of sorts: all of its state lives in
module-level variables. Its main entry point is:

=over 4

=item *

C<request_connect>($connection_closure): Request that the given
closure be run at some point in the future. Returns true if the
closure was run immediately and false if it was queued. The closure
should I<eventually> result in a call to C<connection_success> or
C<connection_failure>. That call may come from a later callback (as
with a network connection), which is why the closure's return value
can't be used instead. The connection manager has no way to enforce
this. Each of those calls also frees one of the
C<$SIMULTANEOUS_CONNECT_BOUND> connection slots, so a closure that
never leads to either call permanently uses up a slot.

A connection is successful only if the user actually logged on.
That is, a password failure counts as a failure too. This avoids
hammering a service that is having authentication problems.

=back

=cut

use Carp qw(confess);

# When set to a true value this is used in place of the current time
# wherever the module asks for it ($NOW || time), which lets tests
# control the clock.
our $NOW = undef;

# Setting this to 1 is meant to bypass the connection manager entirely.
our $IGNORE_CONNECTION_MANAGER = 0;

use base 'Exporter';
our @EXPORT_OK = qw(
    decayable
    connection_success
    connection_failure
    request_connect
    schedule_request
);
our %EXPORT_TAGS = (all => \@EXPORT_OK);

# Thrasher::Log exports a log() that shadows the builtin, which is why
# the natural logarithm is spelled CORE::log in this file.
use Thrasher::Log qw(log debug);
require Thrasher::Component;

# The initial value of $SIMULTANEOUS_CONNECT_BOUND is the maximum number
# of simultaneous, possibly asynchronous, connection attempts. It is
# decremented once a connection closure has been started without dying,
# and incremented again by connection_success()/connection_failure().
# So while the manager is in control, it holds the number of new connect
# attempts that may still be started right now.
#
# Keep this low: most protocols make several DNS lookups and TCP
# connections per IM login.
our $SIMULTANEOUS_CONNECT_BOUND = 2;
# decayable($initial_value, $decay_rate_per_minute)
#
# Convenience constructor for a Thrasher::ConnectionManager::Decayable
# number; all arguments are passed straight through to its new().
# Uses the arrow form rather than indirect object syntax ("new Class"),
# which is ambiguous to the parser and disabled under newer feature
# bundles.
sub decayable {
    return Thrasher::ConnectionManager::Decayable->new(@_);
}
# So, the design goals:
# * If there's no reason to suspect network troubles, connect away.
# * If there's a total network disconnect, a bare minimum of
#   connection attempts should be made, not a straight
#   (number of logins) * (constant frequency) * time
# * It can't take too long to recover from a total network failure.
#   Even for hundreds of people, it should recover relatively quickly.
# * If at all possible, this should adapt to rate limiting. (It may
#   not be possible if the legacy service simply cuts off your
#   service, rather than metering it.)
# Approach: We define a "decayable number", which decays exponentially
# over time, as seen in Thrasher::ConnectionManager::Decayable. Each
# successful connection increments the count of successful
# connections, and each failed connection increments the count of
# unsuccessful connections. The ratio of the two determines the delay
# before the next connection attempt, bounded on both ends (if it's
# below a certain number, we simply connect immediately; if it's above
# a certain number, we cap it at something like one attempt every 30
# seconds). After a certain point, we also start queuing up connection
# requests. This way, the system reacts to the current apparent state
# of the connection to the legacy service without hammering on the
# legacy system, which may consider that hostile.
### TUNABLE PARAMETERS

# Per-minute decay rate. $failure decays at exactly this rate; $success
# decays more slowly, at a rate derived from this one and $optimism.
our $decay_rate = .65;

# How much more strongly success is remembered than failure: each
# minute $success loses the fraction (1 - $decay_rate) / $optimism,
# where $failure loses (1 - $decay_rate). Values above 1 make the system
# "remember" success longer than failure.
our $optimism = 3;

# Failure/success ratio beyond which schedule_request() stops growing
# the delay and just uses the 30 second maximum. (CORE:: because log()
# in this package is Thrasher::Log's.)
my $log30 = CORE::log(30);

### Values

our $success = decayable(5, 1 - (1 - $decay_rate) / $optimism);
our $failure = decayable(0, $decay_rate);

# Bumped once per connection attempt and used to cap the overall rate
# of attempts: above $hammering_limit, schedule_request() forces new
# requests onto the queue. Please, Thrasher, don't hammer 'em.
our $hammering       = decayable(0, .05);
our $hammering_limit = 15;
# Connection closures that could not be run yet, oldest first.
my @queue;

# When the currently scheduled executor run is due (set by
# schedule_connection_executor()).
my $next_connection_attempt_time;

# $scheduler->($closure, $timeout)
#
# Hands $closure to the component's event loop to run after $timeout
# (callers in this file pass milliseconds) and returns whatever the
# event loop's schedule() returns. Tests may replace this to capture
# the scheduled closures and verify their timing.
our $scheduler = sub {
    my ($closure, $timeout) = @_;
    my $event_loop = $Thrasher::Component::COMPONENT->{'event_loop'};
    return $event_loop->schedule($closure, $timeout);
};
# schedule_request()
#
# "If I make a request right now, what will the delay and
# forcibly_enqueue values be?"
#
# Returns an array ref [$delay_in_seconds, $forcibly_enqueue]. A true
# $forcibly_enqueue means the request must be queued whatever the delay
# says. Reading the decayable counters here also decays them up to the
# current time.
sub schedule_request {
    # Global brake: too many recent attempts, whatever the history says.
    if ($hammering->value > $hammering_limit) {
        log("Hammering: " . $hammering->value . " of $hammering_limit."
            ." Delaying connection request.");
        return [0, 1];
    }

    my $succ = $success->value;
    my $fail = $failure->value;

    if ($fail < 0.0001) {
        log("Never failed, immediately connecting.");
        return [0, 0];
    }

    # NOTE(review): the log message says "immediately connecting", but
    # [0, 1] actually forces the request onto the queue. The failure test
    # is always true here, because we returned above unless
    # failure >= 0.0001. This branch kicks in once success has decayed
    # below 0.00001, and then retries far more eagerly than the 30 second
    # "Max delay" branch right below it -- confirm which was intended.
    if ($succ < 0.00001 && $fail > 0.00001) {
        log("Can't remember having had failure or success, immediately connecting.");
        return [0, 1];
    }

    if ($succ < 0.0001) {
        log("Can't remember having succeeded ($succ), "
            ."but I remember failure ($fail). Max delay.");
        return [30, 0];
    }

    my $ratio = $fail / $succ;
    if ($ratio > $log30) {
        log("Failure/success ratio: $ratio, ($fail / $succ), capped to 30 second delay");
        return [30, 0];
    }

    # The delay grows exponentially with the failure/success ratio.
    my $delay = exp($ratio) - 1;
    log("Failure/success ratio: $ratio ($fail / $succ), comes out to $delay for delay");
    return [$delay, 0];
}
# request_connect($connection_closure)
#
# Ask for $connection_closure to be run, now or once the rate limits
# allow. The closure should eventually lead to a call to
# connection_success() or connection_failure().
#
# Returns true if the closure was run immediately, false if it was
# queued for a later executor run.
sub request_connect {
    my $connection_closure = shift;

    # Documented bypass ($IGNORE_CONNECTION_MANAGER): no queueing and no
    # rate limiting. Still go through service_request() so the
    # simultaneous-connect bookkeeping stays balanced with the
    # connection_success()/connection_failure() call that follows.
    if ($IGNORE_CONNECTION_MANAGER) {
        service_request($connection_closure);
        return 1;
    }

    # An executor run is scheduled for the future; it will pick this up.
    if (defined($next_connection_attempt_time)
        && ($NOW || time) < $next_connection_attempt_time) {
        push @queue, $connection_closure;
        return 0;
    }

    # Every connect slot is taken: queue it and poll again in a second.
    if ($SIMULTANEOUS_CONNECT_BOUND <= 0) {
        log('Enough connect attempts in progress; delaying.');
        push(@queue, $connection_closure);
        schedule_connection_executor(1);
        return 0;
    }

    # Others are already waiting, so keep FIFO order. Also make sure an
    # executor is pending: if the last executor run died, its
    # $run_scheduled flag was cleared and nothing would ever drain the
    # queue again. (This is a no-op while a run is already scheduled.)
    if (@queue) {
        push @queue, $connection_closure;
        schedule_connection_executor(1);
        return 0;
    }

    my ($delay, $forcibly_enqueue) = @{schedule_request()};

    # Catch this in the next scheduling run.
    if ($forcibly_enqueue) {
        push @queue, $connection_closure;
        schedule_connection_executor(1);
        return 0;
    }

    # Little to no delay called for, just go for it
    # (leaning on hammering protection to prevent hammering).
    if ($delay < 1) {
        service_request($connection_closure);
        return 1;
    }

    # We're not in a hammering delay, but the history calls for a delay
    # of a second or more: queue it and run the executor after it.
    push @queue, $connection_closure;
    schedule_connection_executor($delay);
    return 0;
}
# True while an executor run is pending with the scheduler (it holds the
# scheduler's return value). Only schedule_connection_executor() and the
# runner closure it creates touch this.
my $run_scheduled = 0;

# schedule_connection_executor($timeout_in_seconds)
#
# Arrange for connection_executor() to be driven from the event loop
# $timeout_in_seconds from now, unless a run is already pending (in
# which case that run will see anything queued in the meantime).
# The runner returns 1 when it wants to run again after the same
# timeout and 0 otherwise -- presumably the event loop repeats it on a
# true return; TODO confirm against the event loop.
sub schedule_connection_executor {
    my ($timeout) = @_;

    return if $run_scheduled;

    $next_connection_attempt_time = ($NOW || time) + $timeout;

    my $execution_runner = sub {
        log(scalar(@queue) . ' connection attempts queued.');

        # Nothing can start yet; just try again after the same timeout.
        if (@queue && $SIMULTANEOUS_CONNECT_BOUND <= 0) {
            debug('Enough connect attempts in progress (not rescheduled).');
            return 1;
        }

        my $new_timeout = eval { connection_executor() };

        # Clear the pending flag before propagating, or no executor could
        # ever be scheduled again.
        if ($@) {
            $run_scheduled = 0;
            die;
        }

        if (!$new_timeout) {
            debug("Executor done.");
            $run_scheduled = 0;
            return 0;
        }

        if ($new_timeout == $timeout) {
            debug("Next executor in $timeout seconds (not rescheduled).");
            return 1;
        }

        # A different timeout is needed: drop this run, schedule a new one.
        debug("Next executor in $new_timeout seconds (rescheduled).");
        $run_scheduled = 0;
        schedule_connection_executor($new_timeout);
        return 0;
    };

    # Seconds -> milliseconds for the scheduler.
    $run_scheduled = $scheduler->($execution_runner, $timeout * 1000);
}
# connection_executor()
#
# The work function run by the scheduler; it keeps itself going until it
# runs out of work. It drains the queue one closure at a time for as
# long as the hammering limit, the simultaneous-connect bound and
# schedule_request() allow.
#
# Returns the number of seconds until it should run again, or false
# once the queue is empty and no further run is needed.
sub connection_executor {
    while (1) {
        # Still in a hammering lock: check again in a second.
        return 1 if $hammering->value >= $hammering_limit;

        # Queue finally emptied: resume normal (unqueued) operation.
        return 0 if !@queue;

        # Wait for some in-flight connection attempts to finish.
        return 1 if $SIMULTANEOUS_CONNECT_BOUND <= 0;

        service_request(shift @queue);

        # Decide when the next queued request may go.
        my ($delay, $forcibly_enqueue) = @{ schedule_request() };
        return 1 if $forcibly_enqueue;
        return $delay unless $delay < 1;

        # Less than a second of delay called for: go straight on to the
        # next queued request.
    }
}
# service_request($closure)
#
# Start one connection attempt right now: count it against the hammering
# limit and run $closure. If the closure returns without dying, the
# attempt holds one of the $SIMULTANEOUS_CONNECT_BOUND slots until
# connection_success() or connection_failure() gives it back. A closure
# that dies is logged, not propagated, and takes no slot.
#
# Dies (via confess) if $closure is undefined.
sub service_request {
    my $closure = shift;

    # Validate before any bookkeeping, so a programming error is not
    # counted as a connection attempt. Also do it before "local $@" is in
    # effect: on older perls, restoring a localized $@ while unwinding
    # wipes out the exception the caller's eval is meant to see.
    if (!defined($closure)) {
        confess "Undefined closure passed to service_request.";
    }

    $hammering->add(1);

    # Don't disturb any $@ the caller may be inspecting.
    local $@;

    log("Servicing connection request.");
    eval {
        $closure->();
    };
    if ($@) {
        log("Failure in connection callback: $@");
    }
    else {
        $SIMULTANEOUS_CONNECT_BOUND--;
    }
}
# connection_success()
#
# Report that a connection attempt logged in: gives its connect slot
# back and adds one to the (decaying) success history.
sub connection_success {
    $SIMULTANEOUS_CONNECT_BOUND++;
    debug('Connection success managed.');
    return $success->add(1);
}
# connection_failure()
#
# Report that a connection attempt failed (bad credentials included):
# gives its connect slot back and, unless requests are already at the
# 30 second maximum delay, adds 10 to the (decaying) failure history.
sub connection_failure {
    $SIMULTANEOUS_CONNECT_BOUND++;

    debug('Connection failure managed.');
    my ($delay_now) = @{ schedule_request() };

    # NOTE(review): presumably this stops a long outage from piling up
    # failure weight beyond the cap, which would slow recovery -- confirm.
    if ($delay_now == 30) {
        log("Declining to increment failure.");
    }
    else {
        $failure->add(10);
    }
}
package Thrasher::ConnectionManager::Decayable;
use strict;
use warnings;

# This implements a "decayable" number. It starts with the value you
# give it. Each time it is queried, it is multiplied by the decay rate
# raised to the power of the number of minutes since it was last
# queried. The fact that this tends to decay to zero is desired.
#
# Example: Thrasher::ConnectionManager::Decayable->new(16, .5)
# will be 8 in one minute, 4 in the next, 2 in the next, and so on.
# The transition is smooth; it will be 11.3137... after 30 seconds.
#
# The clock is $Thrasher::ConnectionManager::NOW when that is true
# (which lets tests pin the time), and otherwise Time::HiRes::time.

use Time::HiRes qw(time tv_interval);

# new($initial_value, $decay_rate_per_minute)
sub new {
    my ($class, $current_value, $decay_rate) = @_;
    my $now = $Thrasher::ConnectionManager::NOW || scalar(time);

    my $decayable = {
        value         => $current_value,
        decay_rate    => $decay_rate,
        last_computed => $now,
    };
    return bless $decayable, $class;
}

# value(): apply the decay accumulated since the last query, remember
# the result and the time, and return it.
sub value {
    my $self = shift;
    my $now = $Thrasher::ConnectionManager::NOW || scalar(time);

    # If the clock stepped backwards (or a test moved $NOW back), treat
    # that as no time having passed: a negative exponent would make the
    # value grow instead of decay.
    my $since_last_computation = $now - $self->{last_computed};
    $since_last_computation = 0 if $since_last_computation < 0;

    my $minutes = $since_last_computation / 60;
    $self->{value} *= $self->{decay_rate} ** $minutes;
    $self->{last_computed} = $now;

    return $self->{value};
}

# add($increment): decay up to now, then add $increment. Returns the new
# value.
sub add {
    my ($self, $inc) = @_;
    $self->{value} = $self->value + $inc;
    return $self->{value};
}

# subtract($decrement): same as add(-$decrement).
sub subtract {
    my ($self, $dec) = @_;
    return $self->add(-$dec);
}