1 ###########################################
3 ###########################################
6 use Log
::Log4perl
qw(:easy);
9 our $DB_VERSION = "1.1";
11 ###########################################
13 ###########################################
14 my($class, %options) = @_;
17 db_version
=> $DB_VERSION,
19 backend_options
=> {},
24 if($self->{db_file
}) {
25 # legacy option, translate
26 $self->{backend_options
} = {
27 db_file
=> $self->{db_file
},
29 $self->{backend
} = "YAML";
32 my $backend_class = "Throttler::Backend::$self->{backend}";
34 $self->{db
} = $backend_class->new(
35 %{ $self->{backend_options
} } );
43 if( $self->{ db
}->exists() ) {
44 DEBUG
"Backend store exists";
46 $self->{data
} = $self->{ db
}->load();
50 if($self->{data
}->{chain
} and
51 ($self->{data
}->{chain
}->{max_items
} != $options{max_items
} or
52 $self->{data
}->{chain
}->{interval
} != $options{interval
})) {
57 if($options{reset} or !$self->{ db
}->backend_store_ok() ) {
64 $self->{ db
}->create( \
%options ) or
65 LOGDIE
"Creating backend store failed";
69 max_items
=> $options{max_items
},
70 interval
=> $options{interval
},
73 $self->{db
}->save( $self->{data
} );
79 ###########################################
81 ###########################################
82 my($self, $options) = @_;
84 if( $self->{changed
} ) {
85 ERROR
"Bucket chain parameters have changed ",
86 "(max_items: $self->{data}->{chain}->{max_items}/",
87 "$options->{max_items} ",
88 "(interval: $self->{data}->{chain}->{interval}/",
89 "$options->{interval})", ", throwing old chain away";
93 DEBUG
"Creating bucket chain max_items=$options->{max_items} ",
94 "interval=$options->{interval}";
96 $self->{data
}->{chain
} = Throttler
::BucketChain
->new(
97 max_items
=> $options->{max_items
},
98 interval
=> $options->{interval
},
102 ###########################################
104 ###########################################
106 DEBUG
"Trying to get lock ($$)";
108 DEBUG
"Lock on ($$)";
111 ###########################################
113 ###########################################
116 $self->{db
}->unlock();
119 ###########################################
121 ###########################################
122 my($self, %options) = @_;
124 $self->{data
} = $self->{db
}->load();
125 my $ret = $self->{data
}->{chain
}->current_value(%options);
130 ###########################################
132 ###########################################
133 my($self, %options) = @_;
135 if(exists $options{key
}) {
136 DEBUG
"Pushing key $options{key}";
138 DEBUG
"Pushing keyless item";
143 $self->{data
} = $self->{db
}->load();
144 my $ret = $self->{data
}->{chain
}->try_push(%options);
145 $self->{db
}->save( $self->{data
} );
151 ###########################################
153 ###########################################
154 my($self, %options) = @_;
156 if(exists $options{key
}) {
157 DEBUG
"Resetting count for $options{key}";
159 DEBUG
"Resetting count for keyless item";
164 $self->{data
} = $self->{db
}->load();
165 my $ret = $self->{data
}->{chain
}->reset_key(%options);
166 $self->{db
}->save( $self->{data
} );
172 ###########################################
174 ###########################################
177 $self->{data
} = $self->{db
}->load();
178 my $ret = $self->{data
}->{chain
}->as_string();
183 ###########################################
185 ###########################################
187 my $ret = $self->{data
}->{chain
}->rotate();
191 package Throttler
::Range
;
193 ###########################################
195 ###########################################
196 my($class, $start, $stop) = @_;
205 ###########################################
207 ###########################################
209 return $self->{start
};
212 ###########################################
214 ###########################################
216 return $self->{stop
};
219 ###########################################
221 ###########################################
222 my($self, $time) = @_;
224 return ($time >= $self->{start
} and $time <= $self->{stop
});
227 ###########################################
228 package Throttler
::BucketChain
;
229 ###########################################
230 use Log
::Log4perl
qw(:easy);
232 our $DEFAULT_KEY = "_default";
234 ###########################################
236 ###########################################
237 my($class, %options) = @_;
245 if(!$self->{max_items
} or
246 !$self->{interval
}) {
247 LOGDIE
"Both max_items and interval need to be defined";
250 if(!$self->{nof_buckets
}) {
251 $self->{nof_buckets
} = 10;
254 if($self->{nof_buckets
} > $self->{interval
}) {
255 $self->{nof_buckets
} = $self->{interval
};
265 ###########################################
267 ###########################################
270 $self->{buckets
} = [];
272 my $bucket_time_span = int ($self->{interval
} /
273 $self->{nof_buckets
});
275 $self->{bucket_time_span
} = $bucket_time_span;
277 my $time_start = time() -
278 ($self->{nof_buckets
}-1) * $bucket_time_span;
280 for(1..$self->{nof_buckets
}) {
281 my $time_end = $time_start + $bucket_time_span - 1;
282 DEBUG
"Creating bucket ", hms
($time_start), " - ", hms
($time_end);
283 push @
{$self->{buckets
}}, {
284 time => Throttler
::Range
->new($time_start, $time_end),
287 $time_start = $time_end + 1;
290 $self->{head_bucket_idx
} = 0;
291 $self->{tail_bucket_idx
} = $#{$self->{buckets}};
294 ###########################################
296 ###########################################
299 $self->{current_idx
} = $self->{head_bucket_idx
};
300 return $self->{buckets
}->[ $self->{current_idx
} ];
303 ###########################################
305 ###########################################
308 $self->{current_idx
} = $self->{tail_bucket_idx
};
309 return $self->{buckets
}->[ $self->{current_idx
} ];
312 ###########################################
314 ###########################################
317 return undef if $self->{current_idx
} == $self->{tail_bucket_idx
};
319 $self->{current_idx
}++;
320 $self->{current_idx
} = 0 if $self->{current_idx
} > $#{$self->{buckets}};
322 return $self->{buckets
}->[ $self->{current_idx
} ];
325 ###########################################
327 ###########################################
331 push @t, ["#", "idx", ("Time: " . hms
(time)), "Key", "Count"];
335 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
336 my $span = hms
($b->{time}->min) . " - " . hms
($b->{time}->max);
337 my $idx = $self->{current_idx
};
338 my $count_string = $count;
340 if(! scalar keys %{$b->{count
}}) {
341 push @t, [$count_string, $idx, $span, "", ""];
344 foreach my $key (sort keys %{$b->{count
}}) {
345 push @t, [$count_string, $idx, $span, $key, $b->{count
}->{$key}];
353 return join('', map {"$_\n"} map {join "\t", @
$_} @t);
356 ###########################################
358 ###########################################
361 my ($sec,$min,$hour) = localtime($time);
362 return sprintf "%02d:%02d:%02d",
366 ###########################################
368 ###########################################
369 my($self, $time) = @_;
371 # ... and append a new one at the end
372 my $time_start = $self->{buckets
}->
373 [$self->{tail_bucket_idx
}]->{time}->max + 1;
374 my $time_end = $time_start + $self->{bucket_time_span
} - 1;
376 DEBUG
"Adding bucket: ", hms
($time_start), " - ", hms
($time_end);
378 $self->{tail_bucket_idx
}++;
379 $self->{tail_bucket_idx
} = 0 if $self->{tail_bucket_idx
} >
380 $#{$self->{buckets}};
381 $self->{head_bucket_idx
}++;
382 $self->{head_bucket_idx
} = 0 if $self->{head_bucket_idx
} >
383 $#{$self->{buckets}};
385 $self->{buckets
}->[ $self->{tail_bucket_idx
} ] = {
386 time => Throttler
::Range
->new($time_start, $time_end),
391 ###########################################
393 ###########################################
394 my($self, $time) = @_;
395 $time = time() unless defined $time;
397 # If the last bucket handles a time interval that doesn't cover
398 # $time, we need to rotate the bucket brigade. The first bucket
399 # will be cleared and re-used as the new last bucket of the chain.
401 DEBUG
"Rotating buckets time=", hms
($time), " ",
402 "head=", $self->{head_bucket_idx
};
404 if($self->last_bucket->{time}->{stop
} >= $time) {
405 # $time is still covered in the bucket brigade, we're golden
406 DEBUG
"Rotation not necessary (",
407 hms
($self->last_bucket->{time}->{stop
}),
408 " - ", hms
($time), ")";
412 # If we're too far off, just dump all buckets and re-init
413 if($self->{buckets
}->[ $self->{tail_bucket_idx
} ]->{time}->max <
414 $time - $self->{interval
}) {
415 DEBUG
"Too far off, resetting (", hms
($time), " >> ",
416 hms
($self->{buckets
}->[ $self->{head_bucket_idx
} ]->{time}->min),
422 while($self->last_bucket()->{time}->min <= $time) {
426 DEBUG
"After rotation: ",
427 hms
($self->{buckets
}->[ $self->{head_bucket_idx
} ]->{time}->min),
429 hms
($self->{buckets
}->[ $self->{tail_bucket_idx
} ]->{time}->max),
430 " (covers ", hms
($time), ")";
433 ###########################################
435 ###########################################
436 my($self, $time) = @_;
438 DEBUG
"Searching bucket for time=", hms
($time);
440 # Search in the newest bucket first, chances are it's there
441 my $last_bucket = $self->last_bucket();
442 if($last_bucket->{time}->member($time)) {
443 DEBUG hms
($time), " covered by last bucket";
447 for(my $b = $self->first_bucket(); $b; $b = $self->next_bucket()) {
448 if($b->{time}->member($time)) {
449 DEBUG
"Found bucket ", hms
($b->{time}->min),
450 " - ", hms
($b->{time}->max);
455 DEBUG
"No bucket found for time=", hms
($time);
459 ###########################################
461 ###########################################
462 my($self, %options) = @_;
464 my $key = $DEFAULT_KEY;
465 $key = $options{key
} if defined $options{key
};
467 DEBUG
"Getting current value for key=", $key;
471 for(0..$#{$self->{buckets}}) {
472 $val += $self->{buckets
}->[$_]->{count
}->{$key} if
473 exists $self->{buckets
}->[$_]->{count
}->{$key};
479 ###########################################
481 ###########################################
482 my($self, %options) = @_;
484 my $key = $DEFAULT_KEY;
485 $key = $options{key
} if defined $options{key
};
488 $time = $options{time} if defined $options{time};
491 $count = $options{count
} if defined $options{count
};
494 $force = $options{force
} if defined $options{force
};
496 DEBUG
"Trying to push $key ", hms
($time), " $count";
498 my $b = $self->bucket_find($time);
501 $self->rotate($time);
502 $b = $self->bucket_find($time);
505 # Determine the total count for this key
506 my $val = $self->current_value(%options);
508 if($val >= $self->{max_items
}) {
510 DEBUG
"Increasing (force) counter $key by $count ",
511 "($val|$self->{max_items})";
512 $b->{count
}->{$key} += $count;
514 DEBUG
"Not increasing counter $key by $count (already at max)";
518 DEBUG
"Increasing counter $key by $count ",
519 "($val|$self->{max_items})";
520 $b->{count
}->{$key} += $count;
524 LOGDIE
"Time $time is outside of bucket range\n", $self->as_string;
528 ###########################################
530 ###########################################
531 my ($self, %options) = @_;
533 my $key = $DEFAULT_KEY;
534 $key = $options{key
} if defined $options{key
};
536 DEBUG
"Resetting $key";
539 for(0..$#{$self->{buckets}}) {
540 if (exists $self->{buckets
}->[$_]->{count
}->{$key}) {
541 $total += $self->{buckets
}->[$_]->{count
}->{$key};
542 $self->{buckets
}->[$_]->{count
}->{$key} = 0;
549 ###########################################
550 package Throttler
::Backend
::Base
;
551 ###########################################
553 ###########################################
555 ###########################################
556 my($class, %options) = @_;
# Default backend-store health check for Throttler::Backend::Base: performs
# no checks and always returns 1 (true). Throttler::Backend::YAML defines
# its own backend_store_ok() that tests the configured db_file instead.
574 sub backend_store_ok
{ 1 }
576 ###########################################
577 package Throttler
::Backend
::Memory
;
578 ###########################################
579 use base
'Throttler::Backend::Base';
581 ###########################################
583 ###########################################
584 my($self, $data) = @_;
585 $self->{data
} = $data;
588 ###########################################
590 ###########################################
592 return $self->{data
};
595 ###########################################
596 package Throttler
::Backend
::YAML
;
597 ###########################################
598 use base
'Throttler::Backend::Base';
599 use Log
::Log4perl
qw(:easy);
600 use Fcntl
qw(:flock);
602 ###########################################
604 ###########################################
608 $YAML::LoadBlessed
= 1;
611 ###########################################
612 sub backend_store_ok
{
613 ###########################################
616 # Legacy instances used DBM::Deep, but those data stores will be
617 # replaced by YAML backends. If we reuse a backend data store, make
618 # sure it's a YAML file and not a DBM::Deep blob.
619 if(! -f
$self->{db_file
} ) {
628 ERROR
"$self->{db_file} apparently isn't a YAML file, we'll ",
629 "have to dump it and rebuild the bucket chain in YAML";
636 ###########################################
638 ###########################################
641 return -f
$self->{db_file
};
644 ###########################################
646 ###########################################
647 my($self, $data) = @_;
649 DEBUG
"Saving YAML file $self->{db_file}";
650 YAML
::DumpFile
( $self->{db_file
}, $data );
653 ###########################################
655 ###########################################
658 DEBUG
"Loading YAML file $self->{db_file}";
659 return YAML
::LoadFile
( $self->{db_file
} );
662 ###########################################
664 ###########################################
667 open $self->{fh
}, "+<", $self->{db_file
} or
668 LOGDIE
"Can't open $self->{db_file} for locking: $!";
669 flock $self->{fh
}, LOCK_EX
;
672 ###########################################
674 ###########################################
676 flock $self->{fh
}, LOCK_UN
;
685 Throttler - Limit data throughput
691 ### Simple: Limit throughput to 100 per hour
693 my $throttler = Throttler->new(
698 if($throttler->try_push()) {
699 print "Item can be pushed\n";
701 print "Item needs to wait\n";
704 ### Advanced: Use a persistent data store and throttle by key:
706 my $throttler = Throttler->new(
711 db_file => "/tmp/mythrottle.yml",
715 if($throttler->try_push(key => "somekey")) {
716 print "Item can be pushed\n";
721 C<Throttler> helps solve throttling tasks like "allow a single
722 IP only to send 100 emails per hour". It provides an optionally persistent
723 data store to keep track of what happened before and offers a simple
724 yes/no interface to an application, which can then focus on performing
725 the actual task (like sending email) or suppressing/postponing it.
727 When defining a throttler, you can tell it to keep its
728 internal data structures in memory:
730 # in-memory throttler
731 my $throttler = Throttler->new(
736 However, if the data structures need to be maintained across different
737 invocations of a script or several instances of scripts using the
738 throttler, using a persistent database is required:
740 # persistent throttler
741 my $throttler = Throttler->new(
746 db_file => "/tmp/mythrottle.yml",
750 The call above will reuse an existing backend store, given that the
751 C<max_items> and C<interval> settings are compatible and leave the
752 stored counter bucket chain contained therein intact. To specify that
753 the backend store should be rebuilt and all counters be reset, use
754 the C<reset =E<gt> 1> option of the Throttler object constructor.
756 In the simplest case, C<Throttler> just keeps track of single
757 events. It allows a certain number of events per time frame to succeed
758 and recommends blocking the rest:
760 if($throttler->try_push()) {
761 print "Item can be pushed\n";
763 print "Item needs to wait\n";
766 The C<force =E<gt> 1> option of the C<try_push()> method will cause the
767 counter to be incremented regardless of the threshold, for use in scenarios
768 where C<max_items> is a threshold rather than a throttle condition:
770 if($throttler->try_push('force' => 1)) {
771 print "Item can be pushed\n";
773 print "Counter incremented, Item needs to wait\n";
776 When throttling different categories of items, like attempts to send
777 emails by IP address of the sender, a key can be used:
779 if($throttler->try_push( key => "192.168.0.1" )) {
780 print "Item can be pushed\n";
782 print "Item needs to wait\n";
785 In this case, each key will be tracked separately; even if the quota
786 for one key is maxed out, other keys will still succeed until their
791 To keep track of what happened within the specified time frame,
792 C<Throttler> maintains a round-robin data store, either in
793 memory or on disk. It splits up the controlled time interval into
794 buckets and maintains counters in each bucket:
797 +-----------------------------+
798 | 3 | 7 | 0 | 0 | 4 | 1 |
799 +-----------------------------+
800 4:10 4:20 4:30 4:40 4:50 5:00
802 To decide whether to allow a new event to happen or not, C<Throttler>
803 adds up all counters (3+7+4+1 = 15) and then compares the result
804 to the defined threshold. If the event is allowed, the corresponding
805 counter is increased (last column):
808 +-----------------------------+
809 | 3 | 7 | 0 | 0 | 4 | 2 |
810 +-----------------------------+
811 4:10 4:20 4:30 4:40 4:50 5:00
813 While time progresses, old buckets are expired and then reused
814 for new data. 10 minutes later, the bucket layout would look like this:
817 +-----------------------------+
818 | 7 | 0 | 0 | 4 | 2 | 0 |
819 +-----------------------------+
820 4:20 4:30 4:40 4:50 5:00 5:10
824 When used with a persistent data store, C<Throttler> keeps
825 competing applications from clobbering the database by using the locking
826 mechanism of its backend (C<flock> for the YAML backend; legacy stores used C<DBM::Deep>). Both the C<try_push()> and the
827 C<buckets_dump> function already perform locking behind the scenes.
829 If you see a need to lock the data store yourself, i.e. when trying to
830 push counters for several keys simultaneously, use
836 $throttler->unlock();
838 to protect the data store against competing applications.
842 Sometimes, you may need to reset a specific counter, e.g. if an IP
843 address has been unintentionally throttled:
845 my $count = $throttler->reset_key(key => "192.168.0.1");
847 The C<reset_key> method returns the total number of attempts so far.
849 =head2 ADVANCED USAGE
851 By default, C<Throttler> will split the time interval into 10 buckets,
852 each covering a tenth of the interval. It won't handle sub-seconds, though,
853 so if the time interval is less than 10 seconds, the number of buckets
854 will be equal to the number of seconds in the time interval.
856 If the default bucket allocation is unsatisfactory, you can specify
859 my $throttler = Throttler->new(
865 Mainly for debugging and testing purposes, you can specify a different
866 time than I<now> when trying to push an item:
868 if($throttler->try_push(
870 time => time() - 600 )) {
871 print "Item can be pushed in the past\n";
874 Also for debugging and testing purposes, you can obtain the current
877 my $val = $throttler->current_value(key => "somekey");
879 Speaking of debugging, there's a utility method C<buckets_dump> which
880 returns a string containing lines with tab-separated cells in them
881 representing what's in each bucket.
887 my $throttler = Throttler->new(
892 $throttler->try_push(key => "foobar");
893 $throttler->try_push(key => "foobar");
894 $throttler->try_push(key => "barfoo");
895 print $throttler->buckets_dump();
897 will print out something like
899 # idx Time: 14:43:00 Key Count
900 1 0 13:49:00 - 13:54:59
901 2 1 13:55:00 - 14:00:59
902 3 2 14:01:00 - 14:06:59
903 4 3 14:07:00 - 14:12:59
904 5 4 14:13:00 - 14:18:59
905 6 5 14:19:00 - 14:24:59
906 7 6 14:25:00 - 14:30:59
907 8 7 14:31:00 - 14:36:59
908 9 8 14:37:00 - 14:42:59
909 10 9 14:43:00 - 14:48:59 barfoo 1
912 and allow for further investigation.
916 Copyright 2007 by Mike Schilli, all rights reserved.
917 This program is free software, you can redistribute it and/or
918 modify it under the same terms as Perl itself.
922 2007, Mike Schilli <cpan@perlmeister.com>