7 uniproc - Universal data processing tool
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
18 Can be well parallelized.
20 Use a wrapper command/script for I<COMMAND> if you want either of these:
24 =item save I<COMMAND>'s output as well. By default it goes to STDOUT.
26 =item pass B<DATA> on the STDIN or in environment variable instead of command argument.
30 May be interrupted, then re-run to process the remaining data
31 or to re-try the failed ones.
33 May append new lines to I<INPUTFILE> between executions,
34 but not edit or reorder old ones, otherwise results get confused.
42 Process those data which were previously failed.
48 It maintains I<INPUTFILE>.uniproc file
49 by writing the processing status of each line of input data in it line-by-line.
50 Processing status is either:
56 processing not yet started
62 =item digits (possibly padded by spaces)
64 result status (exit code)
66 =item exclamation mark (C<!>) followed by hexadecimal digits
68 termination signal (I<COMMAND> terminated abnormally)
70 =item EOF (ie. fewer lines than input data)
72 processing not yet started
76 I<INPUTFILE>.uniproc is locked while read/write to ensure consistency.
77 I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the lock for the currently in-progress processes,
78 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
79 A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the respective instance of I<COMMAND>
80 to detect if the processing is still going or the process crashed.
85 use Fcntl qw
/:flock :seek/;
87 use Getopt
::Long qw
/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
# 'retry' stores its value in $OptRetry, which is consulted later when deciding
# whether previously failed records should be picked up again.
95 'retry' => \
$OptRetry,
# 'help' shows the usage text and exits with status 0.
96 'help' => sub { pod2usage
(-exitval
=>0, -verbose
=>99); },
# If option parsing reports failure, show the usage text and exit with status 2.
97 ) or pod2usage
(-exitval
=>2, -verbose
=>99);
# At least two positional arguments are required (the script later unpacks
# them as input file, command, and the command's own arguments).
99 pod2usage
(-exitval
=>2, -verbose
=>99) unless scalar @ARGV >= 2;
# Turn a child's wait status into the fixed-width status string that the main
# script records in the results file.
#   - exit code: decimal, right-aligned in 3 characters
#   - killed by a signal: "!" followed by the signal number as 2 hex digits
107 sub deriv_processing_status
109 my $child_status = shift;
110 my $status = sprintf '%3d', WEXITSTATUS
($child_status);
# A signal termination replaces the exit-code form.
111 $status = sprintf '!%02x', WTERMSIG
($child_status) if WIFSIGNALED
($child_status);
# NOTE(review): the `sub` header of this fragment is not visible here. The
# option names below match how fopen is called elsewhere in this file, so
# this is presumably the body of fopen(PATH, OPTS_HASHREF) - confirm.
118 my $opts = shift; # supported opts: rw, no_create, lock
119 my $mode = '<'; # default mode is read-only, no-create
# Unless no_create was requested, the file is first opened in append mode
# (presumably to make sure it exists without truncating it - confirm).
122 if(not $opts->{'no_create'})
124 open my $fh, '>>', $path or die "$0: $path: $!\n";
# The actual open with the selected mode, then rewind to the beginning.
130 open my $fh, $mode, $path or die "$0: $path: $!\n";
131 seek $fh, 0, SEEK_SET
or die "$0: seek: $path: $!\n";
# Exclusive lock; LOCK_NB is not set, so this waits until the lock is granted.
134 flock $fh, LOCK_EX
or die "$0: flock: $path: $!\n";
# Append empty status records to the results file so that it reaches the
# requested size. Callers in this file invoke it as
# extend_resultsfile(FILEHANDLE, SIZE_IN_BYTES).
# NOTE(review): the lines that set $fh and $size are not visible in this view.
139 sub extend_resultsfile
141 my $fname = $ResultsFile;
143 my $extended_size = shift;
145 seek $fh, 0, SEEK_END
;
# Round the current size down to a whole number of fixed-length records,
# then work out how many records are missing up to the requested size.
147 my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
148 my $records_to_append = ($extended_size - $endpos_last_complete_record) / $ResultsRecordLength;
149 debug_msg
"go to offset $endpos_last_complete_record to extend by $records_to_append records\n";
# NOTE(review): the debug message above reports $endpos_last_complete_record,
# but the seek below goes to $size. The two differ whenever the file ends in
# a partial record - confirm which offset is intended.
150 seek $fh, $size, SEEK_SET
;
151 # fill up with empty status records
# NOTE(review): this writes through the global $Results_fh rather than $fh.
# The visible callers pass $Results_fh, so the two coincide there - confirm.
152 print {$Results_fh} " \n" x
$records_to_append or die "$0: write: $fname: $!\n";
# NOTE(review): the `sub` header of this fragment is not visible here. It is
# presumably the body of record_status(LINENUM, STATUS), as called from the
# main part of this file - confirm.
# A status must be exactly one character shorter than a record, because the
# record is completed by the newline written at the end of this fragment.
159 die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;
# Records are fixed-length, so a record's byte offset is its line number
# multiplied by the record length.
161 my $offset = $linenum * $ResultsRecordLength;
162 debug_msg
"go to offset $offset to record data # $linenum 's status \"$status\"\n";
163 seek $Results_fh, $offset, SEEK_SET
;
167 # results file is not big enough, let's extend
168 extend_resultsfile
($Results_fh, $offset);
# Re-position after extending, since extending moves the file position.
169 seek $Results_fh, $offset, SEEK_SET
;
171 print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
# NOTE(review): the `sub` header of this fragment is not visible here. It is
# presumably the body of input_data(LINENUM), as called from the main part of
# this file - confirm.
176 # TODO use index file, if exists, to seek in.
178 my $asked_line = shift;
# The input file is opened with no_create, i.e. it must already exist.
179 my $fh = fopen
$InputFile, {no_create
=>1,};
# Sequential scan: read line by line until the requested line number is hit.
182 while(my $line = <$fh>)
184 if($linenum == $asked_line)
# Count the records of the input file by reading it through line by line.
196 sub count_input_records
198 # TODO use index file, if exists.
# The input file is opened with no_create, i.e. it must already exist.
200 my $fh = fopen
$InputFile, {no_create
=>1,};
# One increment per line read.
202 $linenum++ while scalar <$fh>;
# Build the path of the per-record lock file: the input file name followed by
# ".uniproc." and the given record number.
207 sub processing_lockfile_name
209 my $processing_number = shift;
210 return "$InputFile.uniproc.$processing_number";
# Probe whether the record with the given number is still being worked on,
# by testing the lock on its per-record lock file.
213 sub still_in_progress
215 my $processing_number = shift;
216 my $lockfile = processing_lockfile_name
($processing_number);
# If the lock file cannot be opened, report "not in progress".
217 open my $fh, '<', $lockfile or return 0;
# Non-blocking lock attempt. Presumably a successful lock means no live
# holder remains - confirm against the lines not visible in this view.
# NOTE(review): the flock documentation warns that some platforms require
# the handle to be opened with write intent for LOCK_EX; this handle is
# read-only - confirm on the target systems.
218 my $lock_ok = flock $fh, LOCK_EX
|LOCK_NB
;
# Scan the results file, starting at record $FirstUnprocessed, for the next
# record that should be processed. $FirstUnprocessed is advanced to the chosen
# record so later scans do not start from the beginning again.
# The caller treats an undefined return value as "nothing left to do".
# NOTE(review): the loop header, the branch conditions on $result and the
# return statements are not visible in this view.
223 sub get_next_data_number
225 debug_msg
"go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n";
226 seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET
;
228 my $record_num = $FirstUnprocessed;
# Read one fixed-length status record; a short read ends the scan of the
# existing records.
232 my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
233 debug_msg
"read only $nbytes bytes \"$result\" at record $record_num\n" if $nbytes < $ResultsRecordLength;
234 last if $nbytes < $ResultsRecordLength;
# Case reported as "uninitialized": remember this record as the new scan start.
239 $FirstUnprocessed = $record_num;
240 debug_msg
"uninitialized $record_num\n";
245 # check if still locked
246 if(not still_in_progress
($record_num))
248 debug_msg
"crashed $record_num\n";
# With the retry option set, a record is taken again when its status starts
# with "!" (terminated by signal) or is a number greater than zero.
252 if($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0)))
254 $FirstUnprocessed = $record_num;
255 debug_msg
"retry $record_num\n";
262 # check here if there are more input data than result records
263 my $input_records = count_input_records
();
264 if($record_num < $input_records)
# The input has more lines than the results file has records: grow the
# results file to cover all input lines and take the first new record.
266 extend_resultsfile
($Results_fh, $input_records * $ResultsRecordLength);
267 $FirstUnprocessed = $record_num;
268 debug_msg
"new $record_num\n";
273 debug_msg
"no more input. input_records=$input_records\n";
# Main part: positional arguments are the input file, the command, and the
# command's own leading arguments.
278 ($InputFile, $Command, @Args) = @ARGV;
280 $FirstUnprocessed = 0;
# Each results record is 4 bytes: a 3-character status plus a newline.
281 $ResultsRecordLength = 4;
282 $ResultsFile = "$InputFile.uniproc";
# Open the results file read-write with the lock option, then pick the next
# record to work on. An undefined record number ends the processing loop.
287 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
288 my $LineNum = get_next_data_number
();
289 last if not defined $LineNum;
291 my $Data = input_data
($LineNum);
# Open the per-record lock file with the lock option; still_in_progress
# probes this file's lock to tell a running job from a crashed one.
293 my $InprogressFile = processing_lockfile_name
($LineNum);
294 my $inprogress_fh = fopen
$InprogressFile, {rw
=>1, lock=>1};
# Mark the record with the "..." placeholder status before starting.
296 record_status
($LineNum, '...');
# The data item is passed to the command as its last argument.
299 run
[$Command, @Args, $Data];
# Open the results file again with the lock option (presumably it was released
# while the command ran - the relevant lines are not visible; confirm),
# then store the final status.
# NOTE(review): the line that sets $status is not visible in this view.
302 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
303 record_status
($LineNum, deriv_processing_status
($status));
# Remove the per-record lock file, then close its handle.
306 unlink $InprogressFile;
307 close $inprogress_fh;