Add `koji' option to redefine Koji client
[Fedora-Rebuild.git] / lib / Fedora / Rebuild / Scheduler.pm
blob8b2a4a1feffa6cc9bb02d94022471c6ce30d7382
1 package Fedora::Rebuild::Scheduler;
2 use strict;
3 use warnings;
4 use Moose;
5 use Carp;
6 use threads;
7 use threads::shared;
8 use Thread::Semaphore;
9 use MooseX::Types::Moose;
10 use Term::ProgressBar;
11 use namespace::clean;
13 use version 0.77; our $VERSION = version->declare("v0.12.1");
# Maximal number of jobs to run in parallel. When the caller does not supply
# it, the value comes from _build_limit. BUILD rejects anything below 1.
has 'limit' => (
    is         => 'ro',
    isa        => 'Int',
    lazy_build => 1,
    builder    => '_build_limit',
);

# Total number of jobs to process. If set to a positive number, a progress bar
# is updated and printed to stdout after a job has been collected. The default
# value is zero, meaning no progress bar is printed.
has 'total' => (
    is         => 'ro',
    isa        => 'Int',
    lazy_build => 1,
);

# Label of the scheduler, used as the title of the progress bar.
has 'name' => (
    is         => 'ro',
    isa        => 'Str',
    lazy_build => 1,
);

# The attributes below are internal state; init_arg => undef keeps them out
# of the constructor arguments.

# Semaphore created with `limit' as its initial count; taken in schedule() and
# released by the worker when the job ends.
has 'limiter' => (
    is         => 'ro',
    isa        => 'Thread::Semaphore',
    lazy_build => 1,
    init_arg   => undef,
);

# Thread IDs of the jobs spawned by this instance (only the keys matter).
has 'workers' => (
    is         => 'ro',
    isa        => 'HashRef[Int]',
    lazy_build => 1,
    init_arg   => undef,
);

# Progress bar object, built on first use.
has 'progress' => (
    is         => 'rw',
    isa        => 'Term::ProgressBar',
    lazy_build => 1,
    init_arg   => undef,
);

# Number of jobs already collected by finish().
has 'done' => (
    is         => 'rw',
    isa        => 'Int',
    lazy_build => 1,
    init_arg   => undef,
);

# Value of `done' at which the progress bar wants to be redrawn next.
has 'next_progress_update' => (
    is         => 'rw',
    isa        => 'Num',
    lazy_build => 1,
    init_arg   => undef,
);
# Allow the short-hand construction Fedora::Rebuild::Scheduler->new(NUMBER)
# next to the regular key/value form.
around BUILDARGS => sub {
    my ($orig, $class, @params) = @_;

    # A single non-reference argument is the short-hand form; everything else
    # is passed to the original BUILDARGS untouched.
    my $is_shorthand = (@params == 1 && !ref $params[0]);
    return $class->$orig(@params) unless $is_shorthand;

    # An undefined number falls back to one worker.
    return $class->$orig(limit => $params[0] // 1);
};
# Construction hook: refuse a worker limit lower than 1, because no job could
# ever obtain a slot with such a limit.
# Croaks (reported from the caller's perspective) on an invalid limit.
sub BUILD {
    my ($self) = @_;

    return if $self->limit >= 1;

    croak "Constructing scheduler with limit bellow 1 dead-locks the " .
        "scheduler";
}
# Builder for the `limit' attribute.
# A builder is normally invoked with the instance only, so $value is undef in
# that case; fall back to the documented default of one parallel job instead
# of returning undef, which cannot satisfy the Int type of the attribute.
# An explicitly passed $value is still honoured.
sub _build_limit {
    my ($self, $value) = @_;
    return $value // 1;
}
# Builder for the `total' attribute.
# Returns the optional second argument, or 0 (no progress bar) when it is
# not defined.
sub _build_total {
    my (undef, $value) = @_;
    return defined $value ? $value : 0;
}
# Builder for the `name' attribute.
# Hands back the optional second argument unchanged.
# NOTE(review): when called with the instance only this yields undef, which
# presumably does not satisfy the Str type of the attribute -- confirm.
sub _build_name {
    my (undef, $value) = @_;
    return $value;
}
# Builder for the `limiter' attribute.
# Returns a new Thread::Semaphore whose initial count is the worker limit.
sub _build_limiter {
    my ($self) = @_;
    my $slots = $self->limit;
    #print "Worker limit is $slots\n";
    return Thread::Semaphore->new($slots);
}
# Builder for the `workers' attribute.
# Returns a reference to a fresh, empty hash.
sub _build_workers {
    my %registry;
    return \%registry;
}
# Builder for the `progress' attribute.
# Returns a Term::ProgressBar that counts up to `total', estimates the
# remaining time linearly and prints to STDOUT.
sub _build_progress {
    my $self = shift;

    my %config = (
        count => $self->total,
        ETA   => 'linear',
        fh    => \*STDOUT,
    );

    # Pass a title only when the caller supplied one. Reading `name' on an
    # instance without a name would run its builder, which has no string to
    # offer, so the bar is simply left untitled in that case.
    $config{name} = $self->name if $self->has_name;

    return Term::ProgressBar->new(\%config);
}
# Builder for the `done' attribute: no job has been collected yet.
sub _build_done {
    my $collected = 0;
    return $collected;
}
# Builder for the `next_progress_update' attribute: starting at zero makes
# the first collected job reach the threshold.
sub _build_next_progress_update {
    my $threshold = 0;
    return $threshold;
}
# Submit a job (a CODE reference) together with its arguments.
# Blocks until a worker slot is available, then runs the job in a new thread.
# Returns the identifier of the spawned job, or undef if the thread could not
# be created. Dies with a stack trace when $job is not a CODE reference.
sub schedule {
    my ($self, $job, @args) = @_;
    #print "A Job (@args) submitted into scheduler\n";
    confess "$job must be reference to CODE" if ref $job ne 'CODE';

    # Take a slot first; the worker gives it back when the job ends.
    $self->limiter->down;

    my $worker = threads->create(\&slave, $self, $job, @args);

    unless (defined $worker) {
        # The slot taken above is not returned here, hence the warning text.
        carp "Could not spawn thread for job (@args): $!\n" .
            "Decreasing worker limit by one\n";
        return undef;
    }

    #print "Job (@args) spawned\n";
    # Remember the thread so that finish() handles only our own jobs.
    my $id = $worker->tid;
    $self->workers->{$id} = undef;
    return $id;
}
# Run a job and return a reference to the array [return value, exception].
# The job is called in scalar context with @args.
# On success the exception slot holds an empty string; if the job died, the
# return value slot is undef and the exception slot holds the error.
# The worker slot is released in both cases.
sub slave {
    my ($self, $job, @args) = @_;

    my $retval;
    # Rely on the value of the eval block and copy $@ right away: releasing
    # the slot below runs other code that may reset $@ before it is examined.
    my $succeeded = eval {
        $retval = $job->(@args);
        1;
    };
    my $error = $@;

    $self->limiter->up;

    if (!$succeeded) {
        #print "A job (@args) died with $error\n";
        # Keep the exception slot true even if the job died with a false value.
        return [undef, $error || 'Job died without an error message'];
    }

    #print "A job (@args) terminated with $retval\n";
    return [$retval, ''];
}
# Collect jobs spawned by this scheduler.
# If the argument is true, only already finished jobs are collected;
# otherwise the call blocks and waits for all unfinished workers.
# Returns the job exit states [retval, exception] hashed by worker ID
# (as a flat key/value list).
sub finish {
    my ($self, $non_blocking) = @_;
    my $workers = $self->workers;
    my %finished;

    #print $non_blocking ? "Gathering finished jobs...\n"
    #                    : "Waiting for unfinished jobs\n";
    my @threads = $non_blocking
        ? threads->list(threads::joinable)
        : threads->list();

    # XXX: take care only about threads spawned by this instance to allow
    # concurrent class instances.
    foreach my $thread (grep { exists $workers->{$_->tid} } @threads) {
        my $tid = $thread->tid;
        #print "Waiting on thread $tid...\n";
        $finished{$tid} = $thread->join;

        # A joined thread never shows up in the thread list again, so drop
        # its entry instead of letting the hash grow with every job.
        delete $workers->{$tid};

        $self->done($self->done + 1);
        # Redraw the progress bar only when it asked for it.
        if ($self->total > 0 && $self->done >= $self->next_progress_update) {
            $self->next_progress_update($self->progress->update($self->done));
        }
    }

    return %finished;
}
# Collect only the jobs that have already finished, without blocking.
# Returns the job exit states hashed by worker ID, exactly as finish() does.
sub finished {
    my ($self) = @_;
    my $non_blocking = 1;
    return $self->finish($non_blocking);
}