7 uniproc - Universal data processing tool
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
18 Can be well parallelized.
20 Use a wrapper script for I<COMMAND> if you want to:
24 =item save I<COMMAND>'s output as well. By default it goes to STDOUT.
26 =item pass B<DATA> on the STDIN or in environment variable instead of command argument.
30 Maybe interrupted, then re-run to process the remaining data
31 or to re-try the failed ones.
33 May append new lines to I<INPUTFILE> between executions,
34 but not edit or reorder old ones, otherwise results get confused.
42 Process those data which were previously failed.
48 It maintains I<INPUTFILE>.uniproc file
49 by writing the processing status of each lines of input data in it line-by-line.
50 Processing status is either:
54 =item all spaces - processing not yet started
56 =item periods - in progress
58 =item digits (possibly padded by spaces) - result status (exit code)
60 =item exclamation mark (C<!>) followed by hexadecimal digits - termination signal (I<COMMAND> teminated abnormally)
62 =item EOF (ie. fewer lines than input data) - processing not yet started
66 I<INPUTFILE>.uniproc is locked while read/write to ensure consistency.
67 I<INPUTFILE>.uniproc.B<NUM> are the name of the files which hold the lock for the currently in-progress processes,
68 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
69 A lock is held on each of these I<INPUTFILE>.proc.B<NUM> files by the respective instance of I<COMMAND>
70 to detect if the processing is still going or the process crashed.
75 use Fcntl qw
/:flock :seek/;
77 use Getopt
::Long qw
/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
84 'retry' => \
$OptRetry,
85 'help' => sub { pod2usage
(-exitval
=>0, -verbose
=>99); },
86 ) or pod2usage
(-exitval
=>2, -verbose
=>99);
88 pod2usage
(-exitval
=>2, -verbose
=>99) unless scalar @ARGV >= 2;
95 sub deriv_processing_status
97 my $child_status = shift;
98 my $status = sprintf '%3d', WEXITSTATUS
($child_status);
99 $status = sprintf '!%02x', WTERMSIG
($child_status) if WIFSIGNALED
($child_status);
106 my $opts = shift; # supported opts: rw, no_create, lock
107 my $mode = '<'; # default mode is read-only, no-create
110 if(not $opts->{'no_create'})
112 open my $fh, '>>', $path or die "$0: $path: $!\n";
118 open my $fh, $mode, $path or die "$0: $path: $!\n";
119 seek $fh, 0, SEEK_SET
or die "$0: seek: $path: $!\n";
122 flock $fh, LOCK_EX
or die "$0: flock: $path: $!\n";
127 sub extend_resultsfile
129 my $fname = $ResultsFile;
131 my $extended_size = shift;
133 seek $fh, 0, SEEK_END
;
135 # round down to the nearest complete record
136 $size = int($size / $ResultsRecordLength);
137 seek $fh, $size, SEEK_SET
;
138 # fill up with empty status records
139 print {$Results_fh} " \n" x
(($extended_size - $size) / $ResultsRecordLength) or die "$0: write: $fname: $!\n";
145 my $status = shift; # must be ($ResultsRecordLength - 1) bytes!
146 my $offset = $linenum * $ResultsRecordLength;
147 seek $Results_fh, $offset, SEEK_SET
;
150 # results file is not big enough, let's extend
151 extend_resultsfile
($Results_fh, $offset);
153 seek $Results_fh, $offset, SEEK_SET
;
154 print {$Results_fh} "$status\n" die "$0: write: $ResultsFile: $!\n";
159 my $asked_line = shift;
160 my $fh = fopen
$InputFile, {no_create
=>1,};
162 while(my $line = <$InputFile>)
164 if($linenum == $asked_line)
174 sub count_input_records
176 my $fh = fopen
$InputFile, {no_create
=>1,};
178 $linenum++ while <$InputFile>;
185 seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET
;
187 my $record_num = $FirstUnprocessed;
191 my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
192 last if $nbytes < $ResultsRecordLength;
197 $FirstUnprocessed = $record_num;
198 return ($record_num, input_data
($record_num));
202 # check if still locked
209 # check here if there are more input data than result records
210 my $input_records = count_input_records
;
211 if($record_num < $input_records)
213 extend_resultsfile
($Results_fh, $input_records * $ResultsRecordLength);
214 TODO FirstUnprocessed
217 TODO
no more input data
221 ($InputFile, $Command, @Args) = @ARGV;
223 $FirstUnprocessed = 0;
224 $ResultsRecordLength = 4;
225 $ResultsFile = "$InputFile.uniproc";
230 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
231 my ($LineNum, $Data) = get_next_data
;
232 last if not defined $LineNum;
234 my $InprogressFile = "$InputFile.uniproc.$LineNum";
235 my $inprogress_fh = fopen
$InprogressFile, {rw
=>1, lock=>1};
237 record_status
($LineNum, '...');
240 run
[$Command, @Args];
243 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
244 record_status
($LineNum, deriv_processing_status
($status));
247 close $inprogress_fh;
248 unlink $InprogressFile;