1 package Thrasher
::ConnectionManager
;
9 Thrasher::ConnectionManager - manage the act of connecting to the
14 Thrasher::ConnectionManager centralizes and separates the management
15 of whether or not we want to connect to a legacy server. Likely this
16 will get more complicated later.
18 Suppose we have a couple hundred users on the gateway when the
19 connection is lost to the legacy service, that is, I<entirely> lost.
20 Later, it comes back. We don't want to hammer the service with
21 repeated connection attempts from each user.
23 We also want to be able to control the rate at which re-connection
24 attempts are made. If someone tries to log on during the period when
25 the connection is down, we don't really want them to add to the
28 ::ConnectionManager implements the ability to schedule connection
29 attempts, collect information about whether the logins are successful
30 or not, and decide when to attempt to log in again.
32 This module sort of functions as a singleton (sort of), exporting the
39 C<request_connect>($connection_closure): Request that the given
40 closure be run at some point in the future. This closure should
41 I<eventually> result in a call to C<connection_success> or
42 C<connection_failure>, which may itself arise from a callback
43 or something (like for a connection), which is why we can't
44 just use the return value. The connection manager has no
47 A connection is successful if the user successfully logged on.
48 That is, a password failure is a failure too. This avoids the
49 case where we could hammer a service that was having
50 authentication problems.
60 # Setting this to 1 will simply bypass this entire thing.
61 our $IGNORE_CONNECTION_MANAGER = 0;
64 our @EXPORT_OK = qw(decayable connection_success connection_failure
65 request_connect schedule_request);
66 our %EXPORT_TAGS = (all
=> \
@EXPORT_OK);
68 use Thrasher
::Log
qw(log debug);
69 require Thrasher
::Component
;
71 # $SIMULTANEOUS_CONNECT_BOUND's initial value sets the maximum number
72 # of simultaneous, possibly asynchronous, connection attempts. It will
73 # be decremented each time a connection attempt starts and incremented
74 # when it finishes. Thus, at any given time while the
75 # ConnectionManager is managing it will be the current number of new
76 # connect attempts that could start at that time.
78 # This should usually be a low limit. Most protocols have several DNS
79 # lookups and TCP connections per IM login.
80 our $SIMULTANEOUS_CONNECT_BOUND = 2;
83 return new Thrasher
::ConnectionManager
::Decayable
(@_);
86 # So, the design goals:
87 # * If there's no reason to suspect network troubles, connect away.
88 # * If there's a total network disconnect, a bare minimum of
89 # connection attempts should be made, not a straight
90 # (number of logins) * (constant frequency) * time
91 # * It can't take too long to recover from a total network failure.
92 # Even for hundreds of people, it should recover relatively quickly.
93 # * If at all possible, this should adapt to rate limiting. (It may
94 # not be possible if the legacy service simply cuts of your
95 # service, rather than metering it.)
97 # Approach: We define a "decayable number", which exponentially
98 # decays over time, as seen in
99 # Thrasher::ConnectionManager::Decayable. As successful connections
100 # are made, the count of successful connections is incremented. As
101 # unsuccessful connections are made, the count of unsuccessful
102 # connections is incremented. The ratio of the two is used to
103 # determine connection time, bounded on both ends by a limit
104 # (if it's below a certain number, we simply connect immediately,
105 # if it's above a certain number we assume it needs to be bounded
106 # to something like one attempt every 30 seconds). We also, after
107 # a certain point, start queuing up connection requests. This way, the
108 # system reacts to the current apparent state of the connection
109 # to the legacy service without hammering on the legacy system,
110 # which may be considered hostile.
113 ### TUNABLE PARAMETERS
115 # This is the base decay rate applied to our two counters,
116 # applied directly to the failure and indirectly to the success
118 our $decay_rate = .65;
120 # Optimism is how much more likely we are to think that we can
121 # connect again. Values about that cause the system to "remember"
122 # success more strongly than failure.
125 my $log30 = CORE
::log(30);
129 our $success = decayable
(5, 1 - ((1-$decay_rate) / $optimism));
130 our $failure = decayable
(0, $decay_rate);
132 # This is incremented once per connection attempt and is used to
133 # limit the total rate of reconnect attempts.
134 # Please, Thrasher, don't hammer 'em.
135 our $hammering = decayable
(0, .05);
136 our $hammering_limit = 15;
138 # Things queued up because we can't try to connect yet.
141 my $next_connection_attempt_time;
143 # This takes the clousure to run and the time to run it. This
144 # allows you to either call out to the event loop, or, in testing,
145 # override this and capture the execution attempts to verify
146 # the time is correct.
147 our $scheduler = sub {
151 return $Thrasher::Component
::COMPONENT
->{'event_loop'}->schedule($closure,
155 # "If I make a request right now, what will the delay and
156 # forcibly_enqueue values be?"
157 sub schedule_request
{
158 # Check that we aren't hammering the remote server.
159 if ($hammering->value > $hammering_limit) {
160 log("Hammering: " . $hammering->value . " of $hammering_limit."
161 ." Delaying connection request.");
165 my $current_success = $success->value;
166 my $current_failure = $failure->value;
168 if ($current_failure < 0.0001) {
169 log("Never failed, immediately connecting.");
172 if ($current_success < 0.00001 && $current_failure > 0.00001) {
173 log("Can't remember having had failure or success, immediately connecting.");
176 if ($current_success < 0.0001) {
177 log("Can't remember having succeeded ($current_success), "
178 ."but I remember failure ($current_failure). Max delay.");
182 my $ratio = $current_failure / $current_success;
184 if ($ratio > $log30) {
185 log("Failure/success ratio: $ratio, ($current_failure / $current_success), capped to 30 second delay");
189 my $delay = exp($ratio) - 1;
190 log("Failure/success ratio: $ratio ($current_failure / $current_success), comes out to $delay for delay");
194 # This returns true if the connection will be immediate,
195 # otherwise it returns false.
196 sub request_connect
{
197 my $connection_closure = shift;
199 # Deal with this when we once again reschedule things.
200 if (defined($next_connection_attempt_time) &&
201 ($NOW || time) < $next_connection_attempt_time) {
202 push @queue, $connection_closure;
206 if ($SIMULTANEOUS_CONNECT_BOUND <= 0) {
207 log('Enough connect attempts in progress; delaying.');
208 push(@queue, $connection_closure);
209 schedule_connection_executor
(1);
213 my ($delay, $forcibly_enqueue) = @
{schedule_request
()};
215 # Catch this in the next scheduling run.
216 if ($forcibly_enqueue) {
217 push @queue, $connection_closure;
218 schedule_connection_executor
(1);
222 # Little to no delay called for, just go for it
223 # (leaning on hammering protection to prevent hammering)
225 service_request
($connection_closure);
229 # We're not in a hammering delay and we need to
230 # reschedule to delay beyond a second
231 push @queue, $connection_closure;
232 schedule_connection_executor
($delay);
235 push @queue, $connection_closure;
240 # schedule_connection_executor($timeout_in_seconds)
241 my $run_scheduled = 0;
242 sub schedule_connection_executor
{
245 if ($run_scheduled) {
246 # request_connect() called in between scheduled
247 # execution_runner() calls.
251 $next_connection_attempt_time = ($NOW || time) + $timeout;
253 my $execution_runner = sub {
254 log(scalar(@queue) . ' connection attempts queued.');
255 if (@queue && $SIMULTANEOUS_CONNECT_BOUND <= 0) {
256 debug
('Enough connect attempts in progress (not rescheduled).');
260 my $new_timeout = eval {
261 return connection_executor
();
267 elsif (! $new_timeout) {
268 debug
("Executor done.");
272 elsif ($new_timeout == $timeout) {
273 debug
("Next executor in $timeout seconds (not rescheduled).");
277 debug
("Next executor in $new_timeout seconds (rescheduled).");
279 schedule_connection_executor
($new_timeout);
284 $run_scheduled = $scheduler->($execution_runner,
288 # connection_executor() is what is run by the scheduler, which will
289 # propogate itself until it runs out of work.
291 # Returns the new $timeout_in_seconds until the desired next run, or
292 # false when no next run should be scheduled.
293 sub connection_executor
{
294 # If we're still in a hammering lock, delay another second
295 if ($hammering->value >= $hammering_limit) {
300 # We've finally emptied the queue. Resume normal usage.
304 elsif ($SIMULTANEOUS_CONNECT_BOUND <= 0) {
305 # Delay repeat until some connection attempts finish.
309 # Otherwise, it's time to take one thing off the queue and run it.
310 my $current_closure = shift @queue;
311 service_request
($current_closure);
313 # And now it's time to schedule the next request
314 my ($delay, $forcibly_enqueue) = @
{schedule_request
()};
315 if ($forcibly_enqueue) {
320 goto &connection_executor
;
323 # Otherwise, reschedule $delay seconds
327 sub service_request
{
332 if (!defined($closure)) {
333 confess
"Undefined closure passed to service_request.";
335 log("Servicing connection request.");
340 log("Failure in connection callback: $@");
343 $SIMULTANEOUS_CONNECT_BOUND--;
347 sub connection_success
{
348 $SIMULTANEOUS_CONNECT_BOUND++;
350 debug
('Connection success managed.');
354 sub connection_failure
{
355 $SIMULTANEOUS_CONNECT_BOUND++;
357 debug
('Connection failure managed.');
358 my $current_schedule = schedule_request
;
359 if ($current_schedule->[0] != 30) {
362 log("Declining to increment failure.");
366 package Thrasher
::ConnectionManager
::Decayable
;
370 # This implements a "decayable" number. It starts with the value
371 # you give it, and is multiplied by the decay rate raise to the
372 # power of the number of minutes since the last time it was queried.
373 # The fact that this tends to decay to zero is desired.
374 # Example: new Thrasher::ConnectionManager::Decayable(16, .5)
375 # will be 8 in one minute, 4 in the next, 2 in the next, and so on.
376 # The transition is smooth; it will be 11.3137... after 30 seconds.
378 use Time
::HiRes
qw(time tv_interval);
382 my $current_value = shift;
383 my $decay_rate = shift;
384 my $now = $Thrasher::ConnectionManager
::NOW
|| scalar(time);
386 my $decayable = { value
=> $current_value,
387 decay_rate
=> $decay_rate,
388 last_computed
=> $now };
389 bless $decayable, $class;
396 my $now = $Thrasher::ConnectionManager
::NOW
|| scalar(time);
398 my $since_last_computation = $now - $self->{last_computed
};
399 my $minutes = $since_last_computation / 60;
401 my $decay = $self->{decay_rate
} ** $minutes;
402 my $current_value = $self->{value
} * $decay;
404 $self->{value
} = $current_value;
405 $self->{last_computed
} = $now;
407 return $self->{value
};
414 my $value = $self->value;
415 $self->{value
} = $value + $inc;
417 return $self->{value
};