upload all-in-1 doc
[hband-tools.git] / lib / multithrottler / Throttler.pm
blobdc99f7059f24448de8278fe5f323671f43519805
1 ###########################################
2 package Throttler;
3 ###########################################
4 use strict;
5 use warnings;
6 use Log::Log4perl qw(:easy);
8 our $VERSION = "0.08";
9 our $DB_VERSION = "1.1";
###########################################
sub new {
###########################################
    # Constructor.
    #
    # Options (all passed as key => value pairs):
    #   max_items, interval - handed to the bucket chain
    #   backend             - backend class suffix, defaults to "Memory"
    #   backend_options     - hash ref passed to the backend constructor
    #   db_file             - legacy option, selects the YAML backend
    #   reset               - true to rebuild an existing backend store
    #
    # Returns the new Throttler object. Dies (via LOGDIE) if the backend
    # store cannot be created.
    my($class, %options) = @_;

    my $self = {
        db_version      => $DB_VERSION,
        backend         => "Memory",
        backend_options => {},
        reset           => 0,
        %options,
    };

    if($self->{db_file}) {
          # legacy option, translate
        $self->{backend_options} = {
            db_file => $self->{db_file},
        };
        $self->{backend} = "YAML";
    }

    my $backend_class = "Throttler::Backend::$self->{backend}";

    $self->{db} = $backend_class->new(
        %{ $self->{backend_options} } );

    $self->{changed} = 0;

    bless $self, $class;

    my $create = 1;

    if( $self->{db}->exists() ) {
        DEBUG "Backend store exists";
        $self->lock();

          # Validate the store *before* loading it: loading an
          # unreadable store would die right here and the rebuild
          # below would never be reached.
        if( $self->{db}->backend_store_ok() ) {
            $self->{data} = $self->{db}->load();

              # An empty or foreign store yields no usable hash
            $self->{data} = {} unless ref($self->{data}) eq 'HASH';

            my $chain = $self->{data}->{chain};

            if(!$chain) {
                DEBUG "Backend store holds no bucket chain, creating one";
            } elsif($chain->{max_items} != $options{max_items} or
                    $chain->{interval}  != $options{interval}) {
                  # parameters changed, old chain is thrown away
                $self->{changed} = 1;
            } elsif(!$options{reset}) {
                  # compatible chain and no reset requested: reuse it
                $create = 0;
            }
        }

        $self->unlock();
    }

    if($create) {
        $self->{db}->create( \%options ) or
            LOGDIE "Creating backend store failed";

          # create bucket chain
        $self->create( {
            max_items => $options{max_items},
            interval  => $options{interval},
        });

        $self->{db}->save( $self->{data} );
    }

    return $self;
}
###########################################
sub create {
###########################################
    # Builds a fresh bucket chain from $options->{max_items} and
    # $options->{interval} and stores it in $self->{data}->{chain}.
    # If the 'changed' flag is set, the parameter change is logged as
    # an error first and the flag is cleared.
    my $self    = shift;
    my $options = shift;

    if( $self->{changed} ) {
        my $old = $self->{data}->{chain};

        ERROR "Bucket chain parameters have changed ",
              "(max_items: $old->{max_items}/",
              "$options->{max_items} ",
              "(interval: $old->{interval}/",
              "$options->{interval})", ", throwing old chain away";
        $self->{changed} = 0;
    }

    my %chain_args = (
        max_items => $options->{max_items},
        interval  => $options->{interval},
    );

    DEBUG "Creating bucket chain max_items=$chain_args{max_items} ",
          "interval=$chain_args{interval}";

    $self->{data}->{chain} = Throttler::BucketChain->new( %chain_args );
}
###########################################
sub lock {
###########################################
    # Asks the backend to lock the data store; logs the process id
    # before and after the backend call.
    my $self = shift;

    DEBUG "Trying to get lock ($$)";
    $self->{db}->lock();
    DEBUG "Lock on ($$)";
}
###########################################
sub unlock {
###########################################
    # Asks the backend to release the lock on the data store.
    my $self = shift;

    DEBUG "Lock off";
    $self->{db}->unlock();
}
###########################################
sub current_value {
###########################################
    # Returns the current total count for a key, summed over all
    # buckets of the chain.
    #
    # Options: key => $key (optional, the chain falls back to its
    # default key).
    #
    # The store is reloaded under the backend lock, like in
    # buckets_dump(), so that a save in progress elsewhere cannot be
    # read half-written.
    my($self, %options) = @_;

    $self->lock();

    $self->{data} = $self->{db}->load();
    my $ret = $self->{data}->{chain}->current_value(%options);

    $self->unlock();

    return $ret;
}
###########################################
sub try_push {
###########################################
    # Loads the chain from the backend under lock, forwards all
    # options to the chain's try_push(), saves the chain back and
    # returns the chain's verdict.
    my $self    = shift;
    my %options = @_;

    DEBUG( exists $options{key}
           ? "Pushing key $options{key}"
           : "Pushing keyless item" );

    $self->lock();

    $self->{data} = $self->{db}->load();
    my $verdict = $self->{data}->{chain}->try_push(%options);
    $self->{db}->save( $self->{data} );

    $self->unlock();

    return $verdict;
}
###########################################
sub reset_key {
###########################################
    # Loads the chain from the backend under lock, resets the counters
    # of one key via the chain's reset_key(), saves the chain back and
    # returns what the chain returned.
    my $self    = shift;
    my %options = @_;

    DEBUG( exists $options{key}
           ? "Resetting count for $options{key}"
           : "Resetting count for keyless item" );

    $self->lock();

    $self->{data} = $self->{db}->load();
    my $total = $self->{data}->{chain}->reset_key(%options);
    $self->{db}->save( $self->{data} );

    $self->unlock();

    return $total;
}
###########################################
sub buckets_dump {
###########################################
    # Returns the chain's as_string() rendering, taken from a copy of
    # the store that is loaded while holding the backend lock.
    my $self = shift;

    $self->lock();

    $self->{data} = $self->{db}->load();
    my $dump = $self->{data}->{chain}->as_string();

    $self->unlock();

    return $dump;
}
###########################################
sub buckets_rotate {
###########################################
    # Rotates the bucket chain to the current time and returns the
    # chain's rotate() result.
    #
    # Like try_push(), this works on a freshly loaded copy of the
    # store under the backend lock and saves the result, so the
    # rotation is neither applied to stale data nor thrown away by
    # the next load.
    my($self) = @_;

    $self->lock();

    $self->{data} = $self->{db}->load();
    my $ret = $self->{data}->{chain}->rotate();
    $self->{db}->save( $self->{data} );

    $self->unlock();

    return $ret;
}
package Throttler::Range;

###########################################
sub new {
###########################################
    # Creates a range object holding the boundaries $start and $stop.
    my($class, $start, $stop) = @_;

    return bless { start => $start, stop => $stop }, $class;
}

###########################################
sub min {
###########################################
    # Lower boundary of the range.
    my $self = shift;
    return $self->{start};
}

###########################################
sub max {
###########################################
    # Upper boundary of the range.
    my $self = shift;
    return $self->{stop};
}

###########################################
sub member {
###########################################
    # True if $time lies within the range, both boundaries included.
    my($self, $time) = @_;

    return ($time >= $self->{start} && $time <= $self->{stop});
}
227 ###########################################
228 package Throttler::BucketChain;
229 ###########################################
230 use Log::Log4perl qw(:easy);
232 our $DEFAULT_KEY = "_default";
###########################################
sub new {
###########################################
    # Constructor. Requires true values for max_items and interval,
    # otherwise it dies via LOGDIE. nof_buckets defaults to 10 and is
    # capped at the value of interval. The buckets are set up by
    # reset() before the object is returned.
    my $class   = shift;
    my %options = @_;

    my $self = bless {
        max_items => undef,
        interval  => undef,
        %options,
    }, $class;

    unless($self->{max_items} and $self->{interval}) {
        LOGDIE "Both max_items and interval need to be defined";
    }

    $self->{nof_buckets} ||= 10;

    $self->{nof_buckets} = $self->{interval}
        if $self->{nof_buckets} > $self->{interval};

    $self->reset();

    return $self;
}
###########################################
sub reset {
###########################################
    # Throws away all buckets and builds nof_buckets empty ones.
    #
    # $time (optional, defaults to time()) is the reference time: the
    # last bucket starts at $time, the earlier buckets precede it
    # back to back, each covering int(interval / nof_buckets) seconds.
    #
    # Sets head_bucket_idx to 0 and tail_bucket_idx to the index of
    # the last bucket.
    my($self, $time) = @_;

    $time = time() unless defined $time;

    $self->{buckets} = [];

    my $bucket_time_span = int ($self->{interval} /
                                $self->{nof_buckets});

    $self->{bucket_time_span} = $bucket_time_span;

    my $time_start = $time -
        ($self->{nof_buckets}-1) * $bucket_time_span;

    for(1..$self->{nof_buckets}) {
        my $time_end = $time_start + $bucket_time_span - 1;
        DEBUG "Creating bucket ", hms($time_start), " - ", hms($time_end);
        push @{$self->{buckets}}, {
            time  => Throttler::Range->new($time_start, $time_end),
            count => {},
        };
        $time_start = $time_end + 1;
    }

    $self->{head_bucket_idx} = 0;
    $self->{tail_bucket_idx} = $#{$self->{buckets}};
}
###########################################
sub first_bucket {
###########################################
    # Moves the iteration cursor (current_idx) to the head of the
    # chain and returns the bucket found there.
    my $self = shift;

    my $idx = $self->{current_idx} = $self->{head_bucket_idx};
    return $self->{buckets}->[ $idx ];
}
###########################################
sub last_bucket {
###########################################
    # Moves the iteration cursor (current_idx) to the tail of the
    # chain and returns the bucket found there.
    my $self = shift;

    my $idx = $self->{current_idx} = $self->{tail_bucket_idx};
    return $self->{buckets}->[ $idx ];
}
###########################################
sub next_bucket {
###########################################
    # Advances the iteration cursor by one bucket, wrapping around at
    # the end of the bucket array, and returns the bucket it now
    # points to. Returns undef if the cursor already sits on the tail.
    my $self = shift;

    if($self->{current_idx} == $self->{tail_bucket_idx}) {
        return undef;
    }

    my $next = $self->{current_idx} + 1;
    $next = 0 if $next > $#{$self->{buckets}};
    $self->{current_idx} = $next;

    return $self->{buckets}->[ $next ];
}
###########################################
sub as_string {
###########################################
    # Renders the chain as text: one header line, then one line per
    # bucket and key, cells separated by tabs. Columns are a running
    # number, the bucket's array index, its time span, the key and
    # the key's count. A bucket without keys gets a single line with
    # empty key/count cells; for a bucket with several keys the first
    # three cells are only filled in on its first line.
    my $self = shift;

    my @rows = ( ["#", "idx", ("Time: " . hms(time)), "Key", "Count"] );

    my $ordinal = 0;
    my $bucket  = $self->first_bucket();

    while($bucket) {
        $ordinal++;

        my $range = $bucket->{time};
        my @lead  = ( $ordinal,
                      $self->{current_idx},
                      hms($range->min) . " - " . hms($range->max) );

        my @keys = sort keys %{$bucket->{count}};

        if(@keys) {
            for my $key (@keys) {
                push @rows, [@lead, $key, $bucket->{count}->{$key}];
                @lead = ("", "", "");
            }
        } else {
            push @rows, [@lead, "", ""];
        }

        $bucket = $self->next_bucket();
    }

    return join '', map { join("\t", @$_) . "\n" } @rows;
}
###########################################
sub hms {
###########################################
    # Formats an epoch time as HH:MM:SS in local time.
    my $time = shift;

    my($hour, $min, $sec) = (localtime($time))[2, 1, 0];

    return sprintf "%02d:%02d:%02d", $hour, $min, $sec;
}
###########################################
sub bucket_add {
###########################################
    # Advances head and tail index by one slot each (wrapping around
    # at the end of the bucket array) and puts a fresh, empty bucket
    # into the new tail slot. The new bucket starts one second after
    # the old tail bucket ends and covers bucket_time_span seconds.
    my($self, $time) = @_;

    my $buckets  = $self->{buckets};
    my $last_idx = $#{$buckets};

    my $old_tail   = $buckets->[ $self->{tail_bucket_idx} ];
    my $time_start = $old_tail->{time}->max + 1;
    my $time_end   = $time_start + $self->{bucket_time_span} - 1;

    DEBUG "Adding bucket: ", hms($time_start), " - ", hms($time_end);

    for my $cursor (qw(tail_bucket_idx head_bucket_idx)) {
        $self->{$cursor}++;
        $self->{$cursor} = 0 if $self->{$cursor} > $last_idx;
    }

    $buckets->[ $self->{tail_bucket_idx} ] = {
        time  => Throttler::Range->new($time_start, $time_end),
        count => {},
    };
}
###########################################
sub rotate {
###########################################
    # Makes sure the chain extends up to $time (defaults to time()).
    # Always returns 1.
    my($self, $time) = @_;
    $time = time() unless defined $time;

      # If the last bucket handles a time interval that doesn't cover
      # $time, we need to rotate the bucket brigade. The first bucket
      # will be cleared and re-used as the new last bucket of the chain.

    DEBUG "Rotating buckets time=", hms($time), " ",
          "head=", $self->{head_bucket_idx};

    if($self->last_bucket->{time}->max >= $time) {
          # $time is still covered in the bucket brigade, we're golden
        DEBUG "Rotation not necessary (",
              hms($self->last_bucket->{time}->max),
              " - ", hms($time), ")";
        return 1;
    }

      # If we're too far off, just dump all buckets and re-init
    if($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max <
       $time - $self->{interval}) {
        DEBUG "Too far off, resetting (", hms($time), " >> ",
              hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
              ")";
          # $time is handed over as reference time for the rebuild;
          # the extra argument is harmless if reset() ignores it.
        $self->reset($time);
        return 1;
    }

      # Append buckets until the newest one starts after $time
    while($self->last_bucket()->{time}->min <= $time) {
        $self->bucket_add();
    }

    DEBUG "After rotation: ",
          hms($self->{buckets}->[ $self->{head_bucket_idx} ]->{time}->min),
          " - ",
          hms($self->{buckets}->[ $self->{tail_bucket_idx} ]->{time}->max),
          " (covers ", hms($time), ")";

      # Same return value as the early exits above, rather than
      # whatever the logging call happens to return.
    return 1;
}
###########################################
sub bucket_find {
###########################################
    # Returns the bucket whose time range contains $time, or undef if
    # no bucket of the chain covers it. Moves the iteration cursor.
    my($self, $time) = @_;

    DEBUG "Searching bucket for time=", hms($time);

      # Search in the newest bucket first, chances are it's there
    my $newest = $self->last_bucket();

    if($newest->{time}->member($time)) {
        DEBUG hms($time), " covered by last bucket";
        return $newest;
    }

      # Otherwise walk the chain from head to tail
    my $bucket = $self->first_bucket();

    while($bucket) {
        if($bucket->{time}->member($time)) {
            DEBUG "Found bucket ", hms($bucket->{time}->min),
                  " - ", hms($bucket->{time}->max);
            return $bucket;
        }
        $bucket = $self->next_bucket();
    }

    DEBUG "No bucket found for time=", hms($time);
    return undef;
}
###########################################
sub current_value {
###########################################
    # Returns the sum of the counters stored for a key over all
    # buckets. The key is $options{key} if defined, otherwise
    # $DEFAULT_KEY.
    my $self    = shift;
    my %options = @_;

    my $key = defined $options{key} ? $options{key} : $DEFAULT_KEY;

    DEBUG "Getting current value for key=", $key;

    my $total = 0;

    for my $idx (0..$#{$self->{buckets}}) {
        my $counts = $self->{buckets}->[$idx]->{count};
        next unless exists $counts->{$key};
        $total += $counts->{$key};
    }

    return $total;
}
###########################################
sub try_push {
###########################################
    # Tries to count an item.
    #
    # Options:
    #   key   - counter to use (default: $DEFAULT_KEY)
    #   time  - time of the event (default: time())
    #   count - amount to add (default: 1)
    #   force - if true, add to the counter even if the limit has
    #           been reached (the return value is still 0 then)
    #
    # Returns 1 if the key's current value was below max_items and
    # the counter was increased, 0 if the limit had been reached.
    # Dies via LOGDIE if no bucket covers the time even after
    # rotating the chain.
    my($self, %options) = @_;

    my $key = $DEFAULT_KEY;
    $key = $options{key} if defined $options{key};

    my $time = time();
    $time = $options{time} if defined $options{time};

    my $count = 1;
    $count = $options{count} if defined $options{count};

    my $force = 0;
    $force = $options{force} if defined $options{force};

    DEBUG "Trying to push $key ", hms($time), " $count";

    my $bucket = $self->bucket_find($time);

    if(!$bucket) {
        $self->rotate($time);
        $bucket = $self->bucket_find($time);
    }

      # Without a bucket there is nowhere to count the item. Check
      # this before touching any counter: incrementing through an
      # undefined bucket would silently create a throw-away hash
      # and lose the count.
    if(!$bucket) {
        LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
    }

      # Determine the total count for this key
    my $val = $self->current_value(%options);

    if($val >= $self->{max_items}) {
        if ($force) {
            DEBUG "Increasing (force) counter $key by $count ",
                  "($val|$self->{max_items})";
            $bucket->{count}->{$key} += $count;
        } else {
            DEBUG "Not increasing counter $key by $count (already at max)";
        }
        return 0;
    }

    DEBUG "Increasing counter $key by $count ",
          "($val|$self->{max_items})";
    $bucket->{count}->{$key} += $count;
    return 1;
}
###########################################
sub reset_key {
###########################################
    # Sets the counter of a key to 0 in every bucket that has one and
    # returns the sum of the counters before the reset. The key is
    # $options{key} if defined, otherwise $DEFAULT_KEY.
    my $self    = shift;
    my %options = @_;

    my $key = defined $options{key} ? $options{key} : $DEFAULT_KEY;

    DEBUG "Resetting $key";

    my $total = 0;

    for my $idx (0..$#{$self->{buckets}}) {
        my $counts = $self->{buckets}->[$idx]->{count};
        next unless exists $counts->{$key};

        $total += $counts->{$key};
        $counts->{$key} = 0;
    }

    return $total;
}
###########################################
package Throttler::Backend::Base;
###########################################

###########################################
sub new {
###########################################
    # Creates a backend object from the given options and calls its
    # init() hook before returning it.
    my $class   = shift;
    my %options = @_;

    my $self = bless { %options }, $class;
    $self->init();

    return $self;
}

  # Default implementations, to be overridden by backends as needed.
  # save() and load() are left for the concrete backends to define.
sub exists           { 0 }
sub create           { 1 }
#sub save            { }
#sub load            { }
sub init             { }
sub lock             { }
sub unlock           { }
sub backend_store_ok { 1 }
576 ###########################################
577 package Throttler::Backend::Memory;
578 ###########################################
579 use base 'Throttler::Backend::Base';
###########################################
sub save {
###########################################
    # Keeps a reference to $data inside the backend object.
    my $self = shift;
    my $data = shift;

    $self->{data} = $data;
}
###########################################
sub load {
###########################################
    # Hands back whatever was stored in the object's data slot.
    my $self = shift;

    return $self->{data};
}
595 ###########################################
596 package Throttler::Backend::YAML;
597 ###########################################
598 use base 'Throttler::Backend::Base';
599 use Log::Log4perl qw(:easy);
600 use Fcntl qw(:flock);
###########################################
sub init {
###########################################
    # Loads the YAML module at runtime and switches on its
    # LoadBlessed setting.
    my $self = shift;

    require YAML;

      # NOTE(review): $YAML::LoadBlessed is a package variable of the
      # YAML module, so this presumably affects every YAML load in
      # the process and lets loaded documents be blessed into
      # classes - db_file should come from a trusted location.
      # Confirm against the YAML documentation.
    $YAML::LoadBlessed = 1;
}
###########################################
sub backend_store_ok {
###########################################
    # Returns 1 if db_file is missing or can be loaded as YAML,
    # 0 (after logging an error) if loading it fails.
    my($self) = @_;

      # Legacy instances used DBM::Deep, but those data stores will be
      # replaced by YAML backends. If we reuse a backend data store, make
      # sure it's a YAML file and not a DBM::Deep blob.
    if(! -f $self->{db_file} ) {
        return 1;
    }

      # Rely on the eval's own return value instead of $@, which can
      # be reset before it gets inspected.
    my $loaded = eval {
        $self->load();
        1;
    };

    if(!$loaded) {
        ERROR "$self->{db_file} apparently isn't a YAML file, we'll ",
              "have to dump it and rebuild the bucket chain in YAML";
        return 0;
    }

    return 1;
}
###########################################
sub exists {
###########################################
    # True if db_file exists and is a plain file.
    my $self = shift;

    return -f $self->{db_file};
}
###########################################
sub save {
###########################################
    # Writes $data to db_file using YAML::DumpFile.
    my $self = shift;
    my $data = shift;

    my $file = $self->{db_file};

    DEBUG "Saving YAML file $file";
    YAML::DumpFile( $file, $data );
}
###########################################
sub load {
###########################################
    # Reads db_file with YAML::LoadFile and returns the result.
    my $self = shift;

    my $file = $self->{db_file};

    DEBUG "Loading YAML file $file";
    return YAML::LoadFile( $file );
}
###########################################
sub lock {
###########################################
    # Opens db_file read/write and takes an exclusive flock on it.
    # The handle is kept in $self->{fh}. Dies via LOGDIE if the file
    # cannot be opened or locked. Returns 1.
    my($self) = @_;

    open $self->{fh}, "+<", $self->{db_file} or
        LOGDIE "Can't open $self->{db_file} for locking: $!";

      # Carrying on without the lock would defeat its purpose, so a
      # failing flock is fatal as well.
    flock($self->{fh}, LOCK_EX) or
        LOGDIE "Can't lock $self->{db_file}: $!";

    return 1;
}
###########################################
sub unlock {
###########################################
    # Releases the flock taken by lock() and closes the lock file
    # handle. Does nothing if no handle is held. Returns 1.
    my($self) = @_;

      # Take the handle out of the object so it does not linger
      # around after the lock is gone.
    my $fh = delete $self->{fh};
    return 1 unless $fh;

    flock($fh, LOCK_UN) or
        ERROR "Can't unlock $self->{db_file}: $!";
    close($fh) or
        ERROR "Can't close $self->{db_file}: $!";

    return 1;
}
681 __END__
683 =head1 NAME
685 Throttler - Limit data throughput
687 =head1 SYNOPSIS
689 use Throttler;
691 ### Simple: Limit throughput to 100 per hour
693 my $throttler = Throttler->new(
694 max_items => 100,
695 interval => 3600,
698 if($throttler->try_push()) {
699 print "Item can be pushed\n";
700 } else {
701 print "Item needs to wait\n";
704 ### Advanced: Use a persistent data store and throttle by key:
706 my $throttler = Throttler->new(
707 max_items => 100,
708 interval => 3600,
709 backend => "YAML",
710 backend_options => {
711 db_file => "/tmp/mythrottle.yml",
715 if($throttler->try_push(key => "somekey")) {
716 print "Item can be pushed\n";
719 =head1 DESCRIPTION
C<Throttler> helps solve throttling tasks like "allow a single
IP only to send 100 emails per hour". It provides an optionally persistent
data store to keep track of what happened before and offers a simple
yes/no interface to an application, which can then focus on performing
the actual task (like sending email) or suppressing/postponing it.
727 When defining a throttler, you can tell it to keep its
728 internal data structures in memory:
730 # in-memory throttler
731 my $throttler = Throttler->new(
732 max_items => 100,
733 interval => 3600,
736 However, if the data structures need to be maintained across different
737 invocations of a script or several instances of scripts using the
738 throttler, using a persistent database is required:
740 # persistent throttler
741 my $throttler = Throttler->new(
742 max_items => 100,
743 interval => 3600,
744 backend => "YAML",
745 backend_options => {
746 db_file => "/tmp/mythrottle.yml",
750 The call above will reuse an existing backend store, given that the
751 C<max_items> and C<interval> settings are compatible and leave the
752 stored counter bucket chain contained therein intact. To specify that
753 the backend store should be rebuilt and all counters be reset, use
754 the C<reset =E<gt> 1> option of the Throttler object constructor.
756 In the simplest case, C<Throttler> just keeps track of single
757 events. It allows a certain number of events per time frame to succeed
758 and it recommends to block the rest:
760 if($throttler->try_push()) {
761 print "Item can be pushed\n";
762 } else {
763 print "Item needs to wait\n";
The C<force =E<gt> 1> option of the try_push() method will cause the
counter to be incremented regardless of the threshold, for use in scenarios
where max_items is a threshold rather than a throttle condition:
770 if($throttler->try_push('force' => 1)) {
771 print "Item can be pushed\n";
772 } else {
773 print "Counter incremented, Item needs to wait\n";
776 When throttling different categories of items, like attempts to send
777 emails by IP address of the sender, a key can be used:
779 if($throttler->try_push( key => "192.168.0.1" )) {
780 print "Item can be pushed\n";
781 } else {
782 print "Item needs to wait\n";
In this case, each key will be tracked separately. Even if the quota
for one key is maxed out, other keys will still succeed until their
quota is reached.
789 =head2 HOW IT WORKS
791 To keep track of what happened within the specified time frame,
792 C<Throttler> maintains a round-robin data store, either in
793 memory or on disk. It splits up the controlled time interval into
794 buckets and maintains counters in each bucket:
796 1 hour ago Now
797 +-----------------------------+
798 | 3 | 7 | 0 | 0 | 4 | 1 |
799 +-----------------------------+
800 4:10 4:20 4:30 4:40 4:50 5:00
802 To decide whether to allow a new event to happen or not, C<Throttler>
803 adds up all counters (3+7+4+1 = 15) and then compares the result
804 to the defined threshold. If the event is allowed, the corresponding
805 counter is increased (last column):
807 1 hour ago Now
808 +-----------------------------+
809 | 3 | 7 | 0 | 0 | 4 | 2 |
810 +-----------------------------+
811 4:10 4:20 4:30 4:40 4:50 5:00
813 While time progresses, old buckets are expired and then reused
814 for new data. 10 minutes later, the bucket layout would look like this:
816 1 hour ago Now
817 +-----------------------------+
818 | 7 | 0 | 0 | 4 | 2 | 0 |
819 +-----------------------------+
820 4:20 4:30 4:40 4:50 5:00 5:10
822 =head2 LOCKING
When used with a persistent data store, C<Throttler> protects
competing applications from clobbering the database by locking the
backend store (the YAML backend uses C<flock> on the data file). The
C<try_push()>, C<reset_key()> and C<buckets_dump()> methods already
perform locking behind the scenes.
829 If you see a need to lock the data store yourself, i.e. when trying to
830 push counters for several keys simultaneously, use
832 $throttler->lock();
836 $throttler->unlock();
838 to protect the data store against competing applications.
840 =head2 RESETTING
842 Sometimes, you may need to reset a specific counter, e.g. if an IP
843 address has been unintentionally throttled:
845 my $count = $throttler->reset_key(key => "192.168.0.1");
847 The C<reset_key> method returns the total number of attempts so far.
849 =head2 ADVANCED USAGE
By default, C<Throttler> uses 10 buckets, each covering a tenth of the
time interval. It won't handle sub-seconds, though,
so if the time interval is less than 10 seconds, the number of buckets
will be equal to the number of seconds in the time interval.
856 If the default bucket allocation is unsatisfactory, you can specify
857 it yourself:
859 my $throttler = Throttler->new(
860 max_items => 100,
861 interval => 3600,
862 nof_buckets => 42,
865 Mainly for debugging and testing purposes, you can specify a different
866 time than I<now> when trying to push an item:
868 if($throttler->try_push(
869 key => "somekey",
870 time => time() - 600 )) {
871 print "Item can be pushed in the past\n";
874 Also for debugging and testing purposes, you can obtain the current
875 value of an item:
877 my $val = $throttler->current_value(key => "somekey");
879 Speaking of debugging, there's a utility method C<buckets_dump> which
880 returns a string containing lines with tab-separated cells in them
881 representing what's in each bucket.
883 So the code
885 use Throttler;
887 my $throttler = Throttler->new(
888 interval => 3600,
889 max_items => 10,
892 $throttler->try_push(key => "foobar");
893 $throttler->try_push(key => "foobar");
894 $throttler->try_push(key => "barfoo");
895 print $throttler->buckets_dump();
897 will print out something like
899 # idx Time: 14:43:00 Key Count
900 1 0 13:49:00 - 13:54:59
901 2 1 13:55:00 - 14:00:59
902 3 2 14:01:00 - 14:06:59
903 4 3 14:07:00 - 14:12:59
904 5 4 14:13:00 - 14:18:59
905 6 5 14:19:00 - 14:24:59
906 7 6 14:25:00 - 14:30:59
907 8 7 14:31:00 - 14:36:59
908 9 8 14:37:00 - 14:42:59
909 10 9 14:43:00 - 14:48:59 barfoo 1
910 foobar 2
912 and allow for further investigation.
914 =head1 LICENSE
916 Copyright 2007 by Mike Schilli, all rights reserved.
917 This program is free software, you can redistribute it and/or
918 modify it under the same terms as Perl itself.
920 =head1 AUTHOR
922 2007, Mike Schilli <cpan@perlmeister.com>