#!/usr/bin/env perl

=pod

=head1 NAME

uniproc - Universal data processing tool

=head1 SYNOPSIS

uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]


=head1 DESCRIPTION

Take each line from I<INPUTFILE> as B<DATA>,
pass each piece of B<DATA> to I<COMMAND> as the last argument,
then record the exit status.
It parallelizes well: several instances may work on the same I<INPUTFILE> concurrently.


Use a wrapper script for I<COMMAND> (see the sketch below) if you want to:


=over 4

=item save I<COMMAND>'s output as well (by default it goes to STDOUT), or

=item pass B<DATA> on STDIN or in an environment variable instead of as a command argument.

=back

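
For example, a minimal wrapper sketch (hypothetical: the F<output.log> name, the
choice to feed B<DATA> on STDIN, and the wrapper itself are only illustrations)
could look like this:

  #!/usr/bin/env perl
  # hypothetical wrapper: append the real command's output to output.log
  # and feed DATA (passed by uniproc as the last argument) to it on STDIN
  my $data = pop @ARGV;
  open STDOUT, '>>', 'output.log' or die "output.log: $!\n";
  open my $cmd, '|-', @ARGV or die "$ARGV[0]: $!\n";
  print {$cmd} "$data\n";
  close $cmd;
  exit($? >> 8);

It would then be invoked as C<uniproc> I<INPUTFILE> F<wrapper> I<REALCOMMAND> [I<ARGS>].
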

It may be interrupted, then re-run to process the remaining data
or to re-try the failed ones.

New lines may be appended to I<INPUTFILE> between executions,
but old lines must not be edited or reordered, otherwise the results get confused.

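
For example (the file name F<urls.txt> and the F<./process-one> command are
illustrative), several instances may be started on the same input file, and
re-running the same command line later resumes the remaining work:

  uniproc urls.txt ./process-one &
  uniproc urls.txt ./process-one &
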

=head1 OPTIONS

=over 4

=item --retry

Process those data items which previously failed.

=back

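
For example, to give only the previously failed items another try
(file names as in the examples above):

  uniproc --retry urls.txt ./process-one
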

=head1 FILES

It maintains the I<INPUTFILE>.uniproc file
by writing the processing status of each line of input data into it, line by line.
The processing status is one of the following (see the example below):


=over 4

=item all spaces - processing not yet started

=item periods - in progress

=item digits (possibly padded with spaces) - result status (exit code)

=item exclamation mark (C<!>) followed by hexadecimal digits - termination signal (I<COMMAND> terminated abnormally)

=item EOF (i.e. fewer lines than input data records) - processing not yet started

=back

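
As an illustration (not real output), a status file for five input lines,
with the current 4-byte records (3 status bytes plus a newline), might read:

    0    <- exit status 0
  127    <- exit status 127
  !0b    <- terminated by signal 0x0b (SIGSEGV)
  ...    <- in progress
         <- all spaces: not yet started
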

I<INPUTFILE>.uniproc is locked during reads and writes to ensure consistency.

I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the lock for the currently in-progress processes,
where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the uniproc instance running I<COMMAND>,
so that other instances can detect whether the processing is still going on or the process crashed.

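
For instance (names are illustrative), while records number 3 and 7 of
F<urls.txt> are being processed, these lock files would exist:

  urls.txt.uniproc.3
  urls.txt.uniproc.7
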

=cut


use Fcntl qw/:flock :seek/;
use Data::Dumper;
use Getopt::Long qw/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
use IPC::Run qw/run/;
use POSIX;
use Pod::Usage;

$OptRetry = 0;

GetOptions(
	'retry' => \$OptRetry,
	'help' => sub { pod2usage(-exitval=>0, -verbose=>99); },
) or pod2usage(-exitval=>2, -verbose=>99);

pod2usage(-exitval=>2, -verbose=>99) unless scalar @ARGV >= 2;

sub get_next_data;  # predeclaration only; defined further below

sub deriv_processing_status
{
	# turn a child process status ($?) into the fixed-width status field
	my $child_status = shift;
	my $status = sprintf '%3d', WEXITSTATUS($child_status);
	$status = sprintf '!%02x', WTERMSIG($child_status) if WIFSIGNALED($child_status);
	return $status;
}
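
# Illustrative mapping (assuming the usual POSIX exit/signal semantics) of
# child statuses to the 3-byte fields written into the results file:
#   exit code 0       -> '  0'
#   exit code 127     -> '127'
#   killed by SIGSEGV -> '!0b'
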
sub fopen
{
	my $path = shift;
	my $opts = shift;  # supported opts: rw, no_create, lock
	my $mode = '<';  # default mode is read-only, no-create
	if($opts->{'rw'})
	{
		if(not $opts->{'no_create'})
		{
			open my $fh, '>>', $path or die "$0: $path: $!\n";
			close $fh;
		}
		$mode = '+<';
	}
	
	open my $fh, $mode, $path or die "$0: $path: $!\n";
	seek $fh, 0, SEEK_SET or die "$0: seek: $path: $!\n";
	if($opts->{'lock'})
	{
		flock $fh, LOCK_EX or die "$0: flock: $path: $!\n";
	}
	return $fh;
}

sub extend_resultsfile
{
	my $fname = $ResultsFile;
	my $fh = shift;
	my $extended_size = shift;
	
	seek $fh, 0, SEEK_END;
	my $size = tell $fh;
	# round down to the nearest complete record
	$size = int($size / $ResultsRecordLength) * $ResultsRecordLength;
	seek $fh, $size, SEEK_SET;
	# fill up with empty status records up to $extended_size bytes
	print {$fh} "   \n" x (($extended_size - $size) / $ResultsRecordLength) or die "$0: write: $fname: $!\n";
}

sub record_status
{
	my $linenum = shift;
	my $status = shift;  # must be ($ResultsRecordLength - 1) bytes!
	my $offset = $linenum * $ResultsRecordLength;
	seek $Results_fh, $offset, SEEK_SET;
	if(eof $Results_fh)
	{
		# results file is not big enough, let's extend
		extend_resultsfile($Results_fh, $offset);
	}
	seek $Results_fh, $offset, SEEK_SET;
	print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
}

sub input_data
{
	my $asked_line = shift;
	my $fh = fopen $InputFile, {no_create=>1,};
	my $linenum = 0;
	while(my $line = <$fh>)
	{
		if($linenum == $asked_line)
		{
			close $fh;
			return $line;
		}
		$linenum++;
	}
	close $fh;
	return undef;
}

sub count_input_records
{
	my $fh = fopen $InputFile, {no_create=>1,};
	my $linenum = 0;
	$linenum++ while <$fh>;
	close $fh;
	return $linenum;
}

sub get_next_data
{
	seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET;
	
	my $record_num = $FirstUnprocessed;
	my $result;
	while(1)
	{
		my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
		last if $nbytes < $ResultsRecordLength;
		chomp $result;
		
		if($result eq '   ')
		{
			$FirstUnprocessed = $record_num;
			return ($record_num, input_data($record_num));
		}
		if($result eq '...')
		{
			# check if still locked
			# TODO
		}
		
		$record_num++;
	}
	
	# check here if there are more input data than result records
	my $input_records = count_input_records;
	if($record_num < $input_records)
	{
		extend_resultsfile($Results_fh, $input_records * $ResultsRecordLength);
		$FirstUnprocessed = $record_num;
		return ($record_num, input_data($record_num));
	}
	# no more input data
	return ();
}
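
# Overview of the main loop below: lock the results file, claim the next
# unprocessed record, mark it '...' (in progress), drop the lock while
# COMMAND runs on that piece of data, then re-lock and record the outcome.
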
($InputFile, $Command, @Args) = @ARGV;

$FirstUnprocessed = 0;
$ResultsRecordLength = 4;  # 3 status bytes + terminating newline
$ResultsFile = "$InputFile.uniproc";
$Results_fh = undef;

while(1)
{
	$Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
	my ($LineNum, $Data) = get_next_data;
	last if not defined $LineNum;
	
	# the per-record lock file signals that this record is being worked on
	my $InprogressFile = "$InputFile.uniproc.$LineNum";
	my $inprogress_fh = fopen $InprogressFile, {rw=>1, lock=>1};
	
	record_status($LineNum, '...');
	close $Results_fh;
	
	# pass DATA to COMMAND as the last argument
	chomp $Data;
	run [$Command, @Args, $Data];
	my $status = $?;
	
	$Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
	record_status($LineNum, deriv_processing_status($status));
	close $Results_fh;
	
	close $inprogress_fh;
	unlink $InprogressFile;
}