1 package Fedora
::Rebuild
::Scheduler
;
9 use MooseX
::Types
::Moose
;
10 use Term
::ProgressBar
;
13 use version
0.77; our $VERSION = version
->declare("v0.12.1");
15 # Maximal number of jobs to run in parallel. Default is 1.
16 has
'limit' => (is
=> 'ro', isa
=> 'Int',
17 lazy_build
=> 1, builder
=> '_build_limit');
18 # Total number of jobs to process. If set to a positive number, a progress bar
19 # will be updated and printed to stdout after each job finishes. Default value is
20 # zero, meaning that no progress bar is printed.
21 has
'total' => (is
=> 'ro', isa
=> 'Int', lazy_build
=> 1);
22 # Label of scheduler to print as title of the progress bar.
23 has
'name' => (is
=> 'ro', isa
=> 'Str', lazy_build
=> 1);
# Internal state. None of the attributes below can be set through the
# constructor (init_arg => undef); each one is built lazily (lazy_build).

# Semaphore created with `limit` permits; it is taken (down) before each job
# thread is spawned.
25 has
'limiter' => (is
=> 'ro', isa
=> 'Thread::Semaphore',
26 lazy_build
=> 1, init_arg
=> undef);
# Registry of workers spawned by this scheduler instance, keyed by thread ID.
# Only the keys are consulted when selecting which threads to join.
27 has
'workers' => (is
=> 'ro', isa
=> 'HashRef[Int]',
28 lazy_build
=> 1, init_arg
=> undef);
# Term::ProgressBar object whose count is taken from `total`.
29 has
'progress' => (is
=> 'rw', isa
=> 'Term::ProgressBar',
30 lazy_build
=> 1, init_arg
=> undef);
# Number of jobs collected so far; incremented when finished workers are
# joined.
31 has
'done' => (is
=> 'rw', isa
=> 'Int',
32 lazy_build
=> 1, init_arg
=> undef);
# Value of `done` at which the progress bar is next refreshed; updated from
# the return value of the progress bar's update() call.
33 has
'next_progress_update' => (is
=> 'rw', isa
=> 'Num',
34 lazy_build
=> 1, init_arg
=> undef);
36 # Allow construction as Fedora::Rebuild::Scheduler->new(NUMBER);
37 around BUILDARGS
=> sub {
41 if (@_ == 1 && !ref $_[0]) {
42 return $class->$orig(limit
=> $_[0] // 1);
45 return $class->$orig(@_);
52 if ($self->limit < 1) {
53 croak
"Constructing scheduler with limit bellow 1 dead-locks the " .
59 my ($self, $value) = @_;
64 my ($self, $value) = @_;
69 my ($self, $value) = @_;
75 #print "Worker limit is $limit\n";
76 return Thread
::Semaphore
->new($self->limit);
85 return Term
::ProgressBar
->new({
86 count
=> $self->total,
97 sub _build_next_progress_update
{
101 # Submit a job with arguments.
102 # Return the identifier of the spawned job, or undef if an error occurred.
103 # Blocks until the job can be spawned.
105 my ($self, $job, @args) = @_;
106 #print "A Job (@args) submitted into scheduler\n";
107 confess
"$job must be reference to CODE" unless(ref $job eq 'CODE');
109 $self->limiter->down;
110 my $thread = threads
->create(\
&slave
, $self, $job, @args);
112 if (! defined $thread) {
113 carp
"Could not spawn thread for job (@args): $!\n" .
114 "Decreasing worker limit by one\n";
117 #print "Job (@args) spawned\n";
118 my $tid = $thread->tid;
119 ${$self->workers}{$tid} = undef;
124 # Run a job and return reference to array [return value of job, exception $@].
125 # Return value of job will be undefined in case of defined exception.
127 my ($self, $job, @args) = @_;
130 $retval = &{$job}(@args);
134 #print "A job (@args) died with $@\n";
137 #print "A job (@args) terminated with $retval\n";
138 return [$retval, $@
];
141 # If argument is true collects finished jobs only,
142 # otherwise blocks and waits for unfinished workers.
143 # Return job exit states [retval, exception] hashed by worker ID.
145 my ($self, $non_blocking) = @_;
148 if ((defined $non_blocking) && $non_blocking) {
149 #print "Gathering finished jobs...\n";
150 @threads = threads
->list(threads
::joinable
);
152 #print "Waiting for unfinished jobs\n";
153 @threads = threads
->list();
155 # XXX: handle only threads spawned by this $self instance, to allow
156 # concurrent class instances.
157 foreach my $thread (grep { exists ${$self->workers}{$_->tid} } @threads) {
158 #print "Waiting on thread " . $thread->tid . "...\n";
159 my $retval = $thread->join;
160 #print "Thread " . $thread->tid . " terminated with $retval.\n";
161 $finished{$thread->tid} = $retval;
163 $self->done($self->done() + 1);
164 if ($self->total > 0 && $self->done >= $self->next_progress_update) {
165 $self->next_progress_update($self->progress->update($self->done));
171 # Collects finished jobs only.
172 # Return job exit states hashed by worker ID.
175 return $self->finish(1);