jobd: don't update new projects that are not fully mirrored yet
[girocco/msimkins.git] / jobd / jobd.pl
blobffb442744858cf502d70dc18ef7a22ce598d2704
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
# Options
#
# Plain switches; they stay undefined until option parsing fills them in.
my ($quiet, $progress, $all_once, $one);

# Tunables together with their defaults.
my $kill_after        = 900;   # seconds a job may run before reap_hanging_jobs kills it
my $max_par           = 20;    # upper bound on simultaneously running jobs
my $max_par_intensive = 3;     # no command line option right now
my $lockfile          = "/tmp/jobd.lock";
28 ######### Jobs {{{1
# Job handler for 'update' jobs: refresh a project by running update.sh.
#
#   $job - job hash; its 'project' entry names the project to refresh
#
# Nothing is started when the project directory is missing.  When the
# project carries a ".nofetch" or ".clone_in_progress" marker, or
# is_operation_uptodate reports a recent 'lastrefresh', the job is marked
# as skipped and a gc job is queued instead.
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    # Marker files inside the repository that suppress fetching, each with
    # the (optional) reason handed to job_skip.
    my $repo = get_project_path($name);
    my @markers = (
        [".nofetch"],
        [".clone_in_progress", "initial mirroring not complete yet"],
    );
    for my $marker (@markers) {
        my ($suffix, @reason) = @$marker;
        next unless -e $repo . $suffix;
        job_skip($job, @reason);
        return setup_gc($job);
    }

    my $last_run = is_operation_uptodate($name, 'lastrefresh', $Girocco::Config::min_mirror_interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job handler for 'gc' jobs: run gc.sh on a project.
#
#   $job - job hash; its 'project' entry names the project
#
# The job is skipped when the project directory is missing or when
# is_operation_uptodate reports a 'lastgc' newer than
# $Girocco::Config::min_gc_interval.
sub gc_project {
    my ($job) = @_;
    return unless check_project_exists($job);

    my $name = $job->{'project'};
    my $last_run = is_operation_uptodate($name, 'lastgc', $Girocco::Config::min_gc_interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        return;
    }

    return exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $name], $quiet);
}
# Queue an intensive 'gc' job for the project of the given job.  Also used
# as the on_success/on_error callback of update jobs.
#
#   $job - job hash whose 'project' entry is carried over to the new job
sub setup_gc {
    my ($job) = @_;
    my %gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    return queue_job(%gc_job);
}
# Verify that the repository directory of the job's project exists.
#
#   $job - job hash; its 'project' entry names the project
#
# Returns 1 when the directory is there; otherwise marks the job as skipped
# ("non-existent project") and returns 0.  Callers rely on the true value
# in "check_project_exists($job) || return".
sub check_project_exists {
    my ($job) = @_;
    return 1 if -d get_project_path($job->{'project'});

    job_skip($job, "non-existent project");
    return 0;
}
# Build the repository path of a project:
# "<$Girocco::Config::reporoot>/<name>.git/" (note the trailing slash).
#
#   first argument - project name without the ".git" suffix
sub get_project_path {
    my ($name) = @_;
    return join('', $Girocco::Config::reporoot, '/', $name, '.git/');
}
# Run an external command without going through a shell and return its
# output with one trailing newline removed.  Returns nothing when the
# command cannot be started.
sub _capture_command {
    my @cmd = @_;
    open(my $out_fh, '-|', @cmd) or return;
    my $output = do { local $/; <$out_fh> };
    close $out_fh;
    return unless defined $output;
    chomp $output;
    return $output;
}

# Decide whether an operation on a project ran recently enough to be skipped.
#
#   $project   - project name (without ".git")
#   $which     - name of the "gitweb.<which>" config key that stores the
#                time of the last run (e.g. 'lastrefresh', 'lastgc')
#   $threshold - maximum age in seconds
#
# Returns the stored timestamp string (without trailing newline) when it is
# at most $threshold seconds old, and a false value otherwise -- including
# when the key is unset or the timestamp cannot be converted.
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);

    my $timestamp;
    {
        # GIT_DIR is passed through the environment instead of a shell
        # command line, so the project path is never interpreted by a shell.
        local $ENV{'GIT_DIR'} = $path;
        $timestamp = _capture_command($Girocco::Config::git_bin, 'config', "gitweb.$which");
    }
    # Key not set (or git could not be run): the operation never ran.
    return unless defined $timestamp && length $timestamp;

    # The external date utility converts the stored timestamp to epoch seconds.
    my $unix_ts = _capture_command('date', '+%s', '-d', $timestamp);
    # Guard against an unparsable timestamp instead of doing arithmetic on
    # an empty or non-numeric string.
    return unless defined $unix_ts && $unix_ts =~ /\A-?\d+\z/;

    return (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Queue an 'update' job for a single project.  Whether the update succeeds
# or fails, setup_gc is registered to queue a gc job afterwards.
#
#   $project - project name (without ".git")
sub queue_one {
    my ($project) = @_;
    my %update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    return queue_job(%update_job);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
108 ######### Daemon operation {{{1
# Job bookkeeping shared by the daemon routines.
my (@queue, @running, @jobs_killed);   # waiting jobs, started jobs, names of timed-out jobs
my ($jobs_executed, $jobs_skipped);    # counters, reset at the start of every run_queue
my $perpetual = 1;                     # run_perpetually keeps looping while this is true
my $locked = 0;                        # set once the lockfile has been written
# SIGINT handler installed by run_queue: stop scheduling new work but let
# running jobs finish.  A second SIGINT is routed to handle_exit.
sub handle_softexit {
    my $notice = "Waiting for outstanding jobs to finish... "
               . "^C again to exit immediately";
    error($notice);

    @queue     = ();    # drop everything that has not been started yet
    $perpetual = 0;     # do not start another round in run_perpetually
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler (SIGTERM, or a second SIGINT): kill the process group of
# every running job, remove the lockfile if this instance created it, and
# exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # run_job puts a job into @running before its child is forked, so
        # the pid may still be missing when a signal arrives.  Negating an
        # undefined pid gives 0, and signalling pid 0 would hit jobd's own
        # process group before the lockfile is cleaned up.
        next unless $job->{'pid'};
        kill 'KILL', -($job->{'pid'});
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.
#
# Takes the job description as key/value pairs (project, type, command,
# and optionally intensive, on_success, on_error).  'queued_at' and
# 'dont_run' are always (re)initialised here; 'intensive' defaults to 0
# when the caller did not supply it.
sub queue_job {
    my %job = (
        intensive => 0,     # default, overridden by a caller-supplied value
        @_,
        queued_at => time,
        dont_run  => 0,
    );
    push @queue, \%job;
}
# Start a job: register it in @running and invoke its 'command' handler.
# When the handler marked the job as skipped ('dont_run'), the job is taken
# off @running again and counted in $jobs_skipped.
#
#   $job - job hash as built by queue_job
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    # The handler bailed out via job_skip; the job is still the last entry.
    pop @running;
    $jobs_skipped++;
    return;
}
# Format the label used in log messages: "[<type>::<project>]".
#
#   $job - job hash with 'type' and 'project' entries
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child that runs a command in its own process group, and record the
# child on the job.  Only one of those per job!
#
#   $job      - job hash; 'pid', 'finished' and 'started_at' are set here
#   $command  - array ref: program followed by its arguments
#   $err_only - when true, the child's STDOUT is sent to /dev/null
#
# If fork fails, the job is flagged as finished and nothing is started.
# Any failure inside the child makes the child exit with a non-zero status;
# the child never returns into the daemon code.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }

    if (!$pid) {
        # Child process.  "or" (not "||") so that the handler is attached to
        # open() itself rather than to the file name argument.
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 1;
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 1;
        }
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        # exec only returns on failure; report it instead of exiting with
        # whatever happens to be in $?.
        exec(@$command)
            or error(_job_name($job) ." Can't exec $command->[0]: $!");
        exit 1;
    }

    # Parent process: remember the child so it can be reaped or killed later.
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Mark a job as skipped so that run_job does not treat it as started.
#
#   $job - job hash; its 'dont_run' flag is set
#   $msg - optional reason; logged unless --quiet is in effect or the
#          reason is empty
sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;

    return if $quiet;
    return if !$msg;
    error(_job_name($job) ." Skipping job: $msg");
}
# Kill every running job that has been going for more than $kill_after
# seconds.  The job's whole process group is sent SIGKILL, the job is
# flagged as finished and its name is recorded in @jobs_killed.
sub reap_hanging_jobs {
    for my $job (@running) {
        # Jobs without a start time have no child process to kill.
        next unless defined $job->{'started_at'};
        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', -($job->{'pid'});

        my $name = _job_name($job);
        print STDERR $name ." KILLED due to timeout\n";
        push @jobs_killed, $name;
    }
}
# Collect all children that have exited, without blocking.
#
# For a child that belongs to a job not yet flagged as finished, the job's
# on_success callback (if any) runs, the job is flagged as finished and
# counted in $jobs_executed.  For a child whose job was already flagged
# (e.g. by reap_hanging_jobs), the on_error callback (if any) runs.
# Finally every finished job is dropped from @running.
#
# Returns 1 if at least one child was collected, 0 otherwise.
sub reap_finished_jobs {
    my $reaped_any = 0;

    while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
        $reaped_any = 1;

        # XXX- the child's exit status ($?) is currently not looked at
        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        next unless $job;

        if ($job->{'finished'}) {
            $job->{'on_error'}->($job) if defined($job->{'on_error'});
        }
        else {
            $job->{'on_success'}->($job) if defined($job->{'on_success'});
            $job->{'finished'} = 1;
            $jobs_executed++;
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $reaped_any;
}
# Select the running jobs whose 'intensive' flag is 1.  In scalar context
# (as used by run_queue) this evaluates to their count.
sub have_intensive_jobs {
    return grep { $_->{'intensive'} == 1 } @running;
}
# Work through the job queue until both @queue and @running are empty.
#
# Resets the per-run counters, installs the INT/TERM handlers, then loops:
# kill timed-out jobs, collect finished ones, and start the next queued job
# unless the parallelism limits are reached.  With $progress set, a summary
# line is printed at the start and end, and a status report at most once
# every 60 seconds while waiting.
sub run_queue {
    my $status_shown_at = time;
    ($jobs_executed, $jobs_skipped) = (0, 0);
    @jobs_killed = ();

    printf STDERR "--- Processing %d queued jobs\n", scalar(@queue) if $progress;

    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;

    while (@queue || @running) {
        reap_hanging_jobs();
        my $reaped_any = reap_finished_jobs();

        # Back off if we're too busy (or there is nothing left to start)
        my $must_wait = @running >= $max_par
            || have_intensive_jobs() >= $max_par_intensive
            || !@queue;

        unless ($must_wait) {
            # Run next
            run_job(shift(@queue)) if @queue;
            next;
        }

        sleep 1 unless $reaped_any;
        next unless $progress && (time - $status_shown_at) >= 60;

        printf STDERR "STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
        if (@running) {
            my @run_status = map { _job_name($_)." ". (time - $_->{'started_at'}) ."s" } @running;
            error("STATUS: currently running: ". join(', ', @run_status));
        }
        $status_shown_at = time;
    }

    printf STDERR "--- Queue processed. %d jobs executed, %d skipped, %d killed. Now restarting.\n", $jobs_executed, $jobs_skipped, scalar(@jobs_killed) if $progress;
}
# Daemon mode: take the lockfile, then queue and process all projects over
# and over until $perpetual is cleared (see handle_softexit), and finally
# remove the lockfile again.
#
# Dies when the lockfile already exists or cannot be created.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # "or" (not "||") so that a failed open is actually reported; with "||"
    # the die was attached to the file name and could never trigger.
    open(my $lock_fh, '>', $lockfile) or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;    # record our pid in the lockfile
    close $lock_fh;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }
    unlink $lockfile;
}
299 ######### Helpers {{{1
# Print a message, followed by a newline, to STDERR.
#
#   first argument - the message text
#
# Deliberately declared without a prototype: most calls to this sub appear
# earlier in the file than its definition, where a prototype cannot be
# applied and only produces "called too early to check prototype" warnings.
sub error {
    my ($msg) = @_;
    print STDERR $msg."\n";
}
# Report a message through error() and terminate with exit status 1.
#
#   first argument - the message text
sub fatal($) {
    my ($msg) = @_;
    error($msg);
    exit 1;
}
309 ######### Main {{{1
# Parse options
#
# -h is accepted alongside --help and -?, matching the documented synopsis.
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);

# --one is tested with defined() so that a project name that is false as a
# Perl value (e.g. "0") is still honoured instead of silently falling
# through to perpetual mode.
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && defined $one);

# Without --quiet, progress reporting is switched on, both here and (through
# the environment) for the job scripts.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# One-shot mode for a single project.
if (defined $one) {
    queue_one($one);
    run_queue();
    exit;
}

# One-shot mode for all projects.
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: keep processing all projects until told to stop.
run_perpetually();
345 ########## Documentation {{{1
347 __END__
349 =head1 NAME
351 jobd - Perform Girocco maintenance jobs
353 =head1 SYNOPSIS
355 jobd [options]
357 Options:
358 -h | --help detailed instructions
359 -q | --quiet run quietly
360 -P | --progress show occasional status updates
361 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
362 -p NUM | --max-parallel=NUM how many jobs to run at the same time
363 -l FILE | --lockfile=FILE create a lockfile in the given location
364 -a | --all-once process the list only once
365 -o PRJNAME | --one=PRJNAME process only one project
367 =head1 OPTIONS
369 =over 8
371 =item B<--help>
373 Print the full description of jobd's options.
375 =item B<--quiet>
377 Suppress non-error messages, e.g. for use when running this task as a cronjob.
379 =item B<--progress>
381 Show information about the current status of the job queue occasionally. This
382 is automatically enabled if --quiet is not given.
384 =item B<--kill-after=SECONDS>
386 Kill supervised jobs after a certain time to avoid hanging the daemon.
388 =item B<--max-parallel=NUM>
390 Run no more than that many jobs at the same time.
392 =item B<--lockfile=FILE>
394 For perpetual operation, create a lockfile in that place and clean it up after
395 finishing/aborting.
397 =item B<--all-once>
399 Instead of perpetually processing all projects over and over again, process
400 them just once and then exit.
402 =item B<--one=PRJNAME>
404 Process only the given project (given as just the project name without C<.git>
405 suffix) and then exit.
407 =back
409 =head1 DESCRIPTION
411 jobd is Girocco's repository maintenance servant; it periodically checks all
412 the repositories and updates mirrored repositories and repacks push-mode
413 repositories when needed.
415 =cut