package Thrasher::ConnectionManager;
use strict;
use warnings;

=pod

=head1 NAME

Thrasher::ConnectionManager - manage the act of connecting to the
legacy services

=head1 DESCRIPTION

Thrasher::ConnectionManager centralizes and separates the management
of whether or not we want to connect to a legacy server. Likely this
will get more complicated later.

Suppose we have a couple hundred users on the gateway when the
connection to the legacy service is lost, that is, I<entirely> lost.
Later, it comes back. We don't want to hammer the service with
repeated connection attempts from each user.

We also want to be able to control the rate at which re-connection
attempts are made. If someone tries to log on during the period when
the connection is down, we don't really want them to add to the
flood of failing attempts.

::ConnectionManager implements the ability to schedule connection
attempts, collect information about whether the logins are successful
or not, and decide when to attempt to log in again.

This module functions more or less as a singleton, exporting (among
others) the following function:

=over 4

=item *

C<request_connect>($connection_closure): Request that the given
closure be run at some point in the future. This closure should
I<eventually> result in a call to C<connection_success> or
C<connection_failure>, which may itself arise from a callback
(such as a connection callback), which is why we can't just use
the return value. The connection manager has no way to enforce this.

A connection is successful only if the user actually logged on;
that is, a password failure counts as a failure too. This avoids the
case where we could hammer a service that was having
authentication problems.

=back
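
As a rough usage sketch (C<start_legacy_login> and its callbacks are
hypothetical, for illustration only):

    use Thrasher::ConnectionManager qw(request_connect
                                       connection_success
                                       connection_failure);

    my $connect = sub {
        # Kick off the legacy login; report the outcome whenever it
        # completes, possibly from inside another callback.
        start_legacy_login(
            on_success => sub { connection_success() },
            on_failure => sub { connection_failure() },
        );
    };

    # True if the attempt was serviced immediately, false if queued.
    my $immediate = request_connect($connect);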

=cut

use Carp qw(confess);

our $NOW = undef;

# Setting this to 1 will simply bypass this entire thing.
our $IGNORE_CONNECTION_MANAGER = 0;

use base 'Exporter';
our @EXPORT_OK = qw(decayable connection_success connection_failure
                    request_connect schedule_request);
our %EXPORT_TAGS = (all => \@EXPORT_OK);

use Thrasher::Log qw(log debug);
require Thrasher::Component;

sub decayable {
    return new Thrasher::ConnectionManager::Decayable(@_);
}

# So, the design goals:
# * If there's no reason to suspect network troubles, connect away.
# * If there's a total network disconnect, a bare minimum of
#   connection attempts should be made, not a straight
#   (number of logins) * (constant frequency) * time
# * It can't take too long to recover from a total network failure.
#   Even for hundreds of people, it should recover relatively quickly.
# * If at all possible, this should adapt to rate limiting. (It may
#   not be possible if the legacy service simply cuts off your
#   service, rather than metering it.)

# Approach: We define a "decayable number", which exponentially
# decays over time, as seen in
# Thrasher::ConnectionManager::Decayable. As successful connections
# are made, the count of successful connections is incremented. As
# unsuccessful connections are made, the count of unsuccessful
# connections is incremented. The ratio of the two is used to
# determine the connection delay, bounded on both ends by a limit
# (if it's below a certain number, we simply connect immediately;
# if it's above a certain number, we assume it needs to be bounded
# to something like one attempt every 30 seconds). We also, after
# a certain point, start queuing up connection requests. This way, the
# system reacts to the current apparent state of the connection
# to the legacy service without hammering on the legacy system,
# which may be considered hostile.
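#
# As a worked illustration (numbers assumed, not taken from a real
# run): with failure = 1 and success = 4, the ratio is 0.25, giving a
# delay of exp(0.25) - 1, roughly 0.28 seconds, which is short enough
# to be serviced immediately. With failure = 12 and success = 3, the
# ratio is 4, which exceeds log(30) (about 3.4), so the delay is
# capped at 30 seconds.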

### TUNABLE PARAMETERS

# This is the base decay rate applied to our two counters,
# applied directly to the failure counter and indirectly to the
# success counter.
our $decay_rate = .65;

# Optimism is how much more likely we are to think that we can
# connect again. Values above 1 cause the system to "remember"
# success more strongly than failure.
our $optimism = 3;

my $log30 = CORE::log(30);

### Values

our $success = decayable(5, 1 - ((1-$decay_rate) / $optimism));
our $failure = decayable(0, $decay_rate);

# This is incremented once per connection attempt and is used to
# limit the total rate of reconnect attempts.
# Please, Thrasher, don't hammer 'em.
our $hammering = decayable(0, .05);
our $hammering_limit = 15;
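
# For a sense of scale (figures assumed for illustration): with a
# decay rate of .05 per minute, a burst of attempts that pushes the
# counter past 15 trips the limit, but a count of 15 decays to 0.75
# within a minute, so the lockout clears quickly once attempts stop.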

# Things queued up because we can't try to connect yet.
my @queue;

my $next_connection_attempt_time;

# This takes the closure to run and the time to run it. This
# allows you to either call out to the event loop, or, in testing,
# override this and capture the execution attempts to verify
# the time is correct.
our $scheduler = sub {
    my $closure = shift;
    my $timeout = shift;

    return $Thrasher::Component::COMPONENT->{'event_loop'}->schedule($closure,
                                                                     $timeout);
};
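
# For example, a test could capture scheduling attempts like this
# (a sketch only; the @scheduled array is hypothetical):
#
#     our @scheduled;
#     local $Thrasher::ConnectionManager::scheduler = sub {
#         my ($closure, $timeout) = @_;
#         push @scheduled, [$closure, $timeout];
#         return 1;    # report that something was scheduled
#     };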

# "If I make a request right now, what will the delay and
# forcibly_enqueue values be?"
sub schedule_request {
    # Check that we aren't hammering the remote server.
    if ($hammering->value > $hammering_limit) {
        log("Hammering: " . $hammering->value . " of $hammering_limit."
            ." Delaying connection request.");
        return [0, 1];
    }

    my $current_success = $success->value;
    my $current_failure = $failure->value;

    if ($current_failure < 0.0001) {
        log("Never failed, immediately connecting.");
        return [0, 0];
    }
    if ($current_success < 0.00001 && $current_failure > 0.00001) {
        log("Can't remember having had failure or success, immediately connecting.");
        return [0, 1];
    }
    if ($current_success < 0.0001) {
        log("Can't remember having succeeded ($current_success), "
            ."but I remember failure ($current_failure). Max delay.");
        return [30, 0];
    }

    my $ratio = $current_failure / $current_success;

    if ($ratio > $log30) {
        log("Failure/success ratio: $ratio, ($current_failure / $current_success), capped to 30 second delay");
        return [30, 0];
    }

    my $delay = exp($ratio) - 1;
    log("Failure/success ratio: $ratio ($current_failure / $current_success), comes out to $delay for delay");
    return [$delay, 0];
}

# This returns true if the connection will be immediate,
# otherwise it returns false.
sub request_connect {
    my $connection_closure = shift;

    # Deal with this when we once again reschedule things.
    if (defined($next_connection_attempt_time) &&
        ($NOW || time) < $next_connection_attempt_time) {
        push @queue, $connection_closure;
        return 0;
    }

    if (!@queue) {
        my ($delay, $forcibly_enqueue) = @{schedule_request()};

        # Catch this in the next scheduling run.
        if ($forcibly_enqueue) {
            push @queue, $connection_closure;
            schedule_connection_executor(1);
            return 0;
        }

        # Little to no delay called for, just go for it
        # (leaning on hammering protection to prevent hammering).
        if ($delay < 1) {
            service_request($connection_closure);
            return 1;
        }

        # We're not in a hammering delay and we need to
        # reschedule to delay beyond a second.
        push @queue, $connection_closure;
        schedule_connection_executor($delay);
        return 0;
    } else {
        push @queue, $connection_closure;
        return 0;
    }
}

# schedule_connection_executor($timeout_in_seconds)
my $run_scheduled = 0;
sub schedule_connection_executor {
    my $timeout = shift;

    if ($run_scheduled) {
        # request_connect() called in between scheduled
        # execution_runner() calls.
        return;
    }

    $next_connection_attempt_time = ($NOW || time) + $timeout;

    my $execution_runner = sub {
        log(scalar(@queue) . ' connection attempts queued.');
        my $new_timeout = eval {
            return connection_executor();
        };
        if ($@) {
            $run_scheduled = 0;
            die;
        }
        elsif (! $new_timeout) {
            debug("Executor done.");
            $run_scheduled = 0;
            return 0;
        }
        elsif ($new_timeout == $timeout) {
            debug("Next executor in $timeout seconds (not rescheduled).");
            return 1;
        }
        else {
            debug("Next executor in $new_timeout seconds (rescheduled).");
            $run_scheduled = 0;
            schedule_connection_executor($new_timeout);
            return 0;
        }
    };

    $run_scheduled = $scheduler->($execution_runner,
                                  $timeout * 1000);
}

# connection_executor() is what is run by the scheduler, which will
# propagate itself until it runs out of work.

# Returns the new $timeout_in_seconds until the desired next run, or
# false when no next run should be scheduled.
sub connection_executor {
    # If we're still in a hammering lock, delay another second.
    if ($hammering->value >= $hammering_limit) {
        return 1;
    }

    if (!@queue) {
        # We've finally emptied the queue. Resume normal usage.
        return 0;
    }

    # Otherwise, it's time to take one thing off the queue and run it.
    my $current_closure = shift @queue;
    service_request($current_closure);

    # And now it's time to schedule the next request.
    my ($delay, $forcibly_enqueue) = @{schedule_request()};
    if ($forcibly_enqueue) {
        return 1;
    }

    if ($delay < 1) {
        goto &connection_executor;
    }

    # Otherwise, reschedule in $delay seconds.
    return $delay;
}

sub service_request {
    my $closure = shift;
    $hammering->add(1);
    local $@;

    if (!defined($closure)) {
        confess "Undefined closure passed to service_request.";
    }

    log("Servicing connection request.");
    eval {
        $closure->();
    };
    if ($@) {
        log("Failure in connection callback: $@");
    }
}

sub connection_success {
    $success->add(1);
}

sub connection_failure {
    my $current_schedule = schedule_request;
    if ($current_schedule->[0] != 30) {
        $failure->add(10);
    } else {
        log("Declining to increment failure.");
    }
}

package Thrasher::ConnectionManager::Decayable;
use strict;
use warnings;

# This implements a "decayable" number. It starts with the value
# you give it, and is multiplied by the decay rate raised to the
# power of the number of minutes since the last time it was queried.
# The fact that this tends to decay to zero is desired.
# Example: new Thrasher::ConnectionManager::Decayable(16, .5)
# will be 8 in one minute, 4 in the next, 2 in the next, and so on.
# The transition is smooth; it will be 11.3137... after 30 seconds.
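#
# In other words, restating the computation in value() below:
#     value_now = value_then * decay_rate ** minutes_elapsed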

use Time::HiRes qw(time tv_interval);

sub new {
    my $class = shift;
    my $current_value = shift;
    my $decay_rate = shift;
    my $now = $Thrasher::ConnectionManager::NOW || scalar(time);

    my $decayable = { value => $current_value,
                      decay_rate => $decay_rate,
                      last_computed => $now };
    bless $decayable, $class;

    return $decayable;
}

sub value {
    my $self = shift;
    my $now = $Thrasher::ConnectionManager::NOW || scalar(time);

    my $since_last_computation = $now - $self->{last_computed};
    my $minutes = $since_last_computation / 60;

    my $decay = $self->{decay_rate} ** $minutes;
    my $current_value = $self->{value} * $decay;

    $self->{value} = $current_value;
    $self->{last_computed} = $now;

    return $self->{value};
}

sub add {
    my $self = shift;
    my $inc = shift;

    my $value = $self->value;
    $self->{value} = $value + $inc;

    return $self->{value};
}

sub subtract {
    my $self = shift;
    my $dec = shift;
    $self->add(-$dec);
}

1;