7 uniproc - Universal data processing tool
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
19 Processing can be parallelized well.
20 uniproc(1) itself does not run multiple instances of I<COMMAND> in parallel, just in series,
21 but if you start multiple instances of uniproc(1), then you can run I<COMMAND>s concurrently.
22 Locking ensures that no piece of data is processed by more than one instance at a time.
24 Use a wrapper command/script for I<COMMAND> if you want either of these:
28 =item save I<COMMAND>'s output as well.
30 By default it goes to STDOUT.
31 See redirexec(1) for example.
33 =item pass B<DATA> on STDIN or in an environment variable instead of as a command argument.
35 See args2env(1) for example.
39 If re-run after an interrupt, won't process already processed data.
40 But you may retry the failed ones with the B<--retry> option.
42 The user may append new lines of data to I<INPUTFILE> between executions or during runtime,
43 but not edit or reorder old ones, otherwise the recorded results no longer match the data.
51 Also process data whose previous processing failed (according to the B<< I<INPUTFILE>.uniproc >> state file).
57 It maintains the I<INPUTFILE>.uniproc file
58 by writing the processing status of each line of input data into it, one status per line.
59 Processing status is either:
65 processing not yet started
71 =item digits, possibly padded by spaces ( 0)
73 result status (exit code)
75 =item exclamation mark (C<!>) followed by hexadecimal digits (!0f)
77 termination signal (I<COMMAND> terminated abnormally)
79 =item EOF (i.e. fewer lines than there are input data)
81 processing not yet started
85 I<INPUTFILE>.uniproc is locked while being read or written, to ensure consistency.
86 I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the lock for the currently in-progress processes,
87 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
88 A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the uniproc(1) instance running the respective I<COMMAND>,
89 to detect whether the processing is still going on or the process crashed.
93 Due to the currently used locking mechanism (Fcntl(3perl)), running instances on multiple hosts against the same files may defeat the locking.
97 When running I<COMMAND>, the following environment is set:
101 =item UNIPROC_DATANUM
103 Number of the particular piece of data (i.e. line number in I<INPUTFILE>, counting from 0)
104 which is to be processed by the current process.
110 Display the data processing status before each line of data:
112 paste datafile.uniproc datafile
114 Record the output of data processing into a separate file for each piece of data:
116 uniproc datafile sh -c 'some-command "$@" | tee output-$UNIPROC_DATANUM' --
118 uniproc datafile substenv -e UNIPROC_DATANUM redirexec '1:a:file:output-$UNIPROC_DATANUM' some-command
120 Display the data number, processing status, input data, and (the last line of) the output data in a table:
122 join -t $'\t' <(nl -ba -v0 datafile.uniproc) <(nl -ba -v0 datafile) | foreach -t --prefix-add-data --prefix-add-tab tail -n1 output-{0}
127 use Fcntl qw
/:flock :seek/;
129 use Getopt
::Long qw
/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
130 use IPC
::Run qw
/run/;
138 'retry' => \
$OptRetry,
139 'debug' => \
$OptDebug,
140 'help' => sub { pod2usage
(-exitval
=>0, -verbose
=>99); },
141 ) or pod2usage
(-exitval
=>2, -verbose
=>99);
143 pod2usage
(-exitval
=>2, -verbose
=>99) unless scalar @ARGV >= 2;
148 warn @_ if $OptDebug;
151 sub deriv_processing_status
153 my $child_status = shift;
154 my $status = sprintf '%3d', WEXITSTATUS
($child_status);
155 $status = sprintf '!%02x', WTERMSIG
($child_status) if WIFSIGNALED
($child_status);
162 my $opts = shift; # supported opts: rw, no_create, lock
163 my $mode = '<'; # default mode is read-only, no-create
166 if(not $opts->{'no_create'})
168 open my $fh, '>>', $path or die "$0: $path: $!\n";
174 open my $fh, $mode, $path or die "$0: $path: $!\n";
175 seek $fh, 0, SEEK_SET
or die "$0: seek: $path: $!\n";
178 flock $fh, LOCK_EX
or die "$0: flock: $path: $!\n";
183 sub extend_resultsfile
185 my $fname = $ResultsFile;
187 my $extended_size = shift;
189 seek $fh, 0, SEEK_END
;
191 my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
192 my $records_to_append = ($extended_size - $endpos_last_complete_record) / $ResultsRecordLength;
193 debug_msg
"go to offset $endpos_last_complete_record to extend by $records_to_append records\n";
194 seek $fh, $size, SEEK_SET
;
195 # fill up with empty status records
196 print {$Results_fh} " \n" x
$records_to_append or die "$0: write: $fname: $!\n";
203 die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;
205 my $offset = $linenum * $ResultsRecordLength;
206 debug_msg
"go to offset $offset to record data # $linenum 's status \"$status\"\n";
207 seek $Results_fh, $offset, SEEK_SET
;
211 # results file is not big enough, let's extend
212 extend_resultsfile
($Results_fh, $offset);
213 seek $Results_fh, $offset, SEEK_SET
;
215 print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
220 # TODO use index file, if exists, to seek in.
222 my $asked_line = shift;
223 my $fh = fopen
$InputFile, {no_create
=>1,};
226 while(my $line = <$fh>)
228 if($linenum == $asked_line)
240 sub count_input_records
242 # TODO use index file, if exists.
244 my $fh = fopen
$InputFile, {no_create
=>1,};
246 $linenum++ while scalar <$fh>;
251 sub processing_lockfile_name
253 my $processing_number = shift;
254 return "$InputFile.uniproc.$processing_number";
257 sub still_in_progress
259 my $processing_number = shift;
260 my $lockfile = processing_lockfile_name
($processing_number);
261 open my $fh, '<', $lockfile or return 0;
262 my $lock_ok = flock $fh, LOCK_EX
|LOCK_NB
;
267 sub get_next_data_number
269 debug_msg
"go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n";
270 seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET
;
272 my $record_num = $FirstUnprocessed;
276 my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
277 debug_msg
"read only $nbytes bytes \"$result\" at record $record_num\n" if $nbytes < $ResultsRecordLength;
278 last if $nbytes < $ResultsRecordLength;
283 $FirstUnprocessed = $record_num;
284 debug_msg
"uninitialized $record_num\n";
289 # check if still locked
290 if(not still_in_progress
($record_num))
292 debug_msg
"crashed $record_num\n";
296 if($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0)))
298 $FirstUnprocessed = $record_num;
299 debug_msg
"retry $record_num\n";
306 # check here if there are more input data than result records
307 my $input_records = count_input_records
();
308 if($record_num < $input_records)
310 extend_resultsfile
($Results_fh, $input_records * $ResultsRecordLength);
311 $FirstUnprocessed = $record_num;
312 debug_msg
"new $record_num\n";
317 debug_msg
"no more input. input_records=$input_records\n";
322 ($InputFile, $Command, @Args) = @ARGV;
324 $FirstUnprocessed = 0;
325 $ResultsRecordLength = 4;
326 $ResultsFile = "$InputFile.uniproc";
331 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
332 my $LineNum = get_next_data_number
();
333 last if not defined $LineNum;
335 my $Data = input_data
($LineNum);
337 my $InprogressFile = processing_lockfile_name
($LineNum);
338 my $inprogress_fh = fopen
$InprogressFile, {rw
=>1, lock=>1};
340 record_status
($LineNum, '...');
343 $ENV{'UNIPROC_DATANUM'} = $LineNum;
344 run
[$Command, @Args, $Data];
347 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
348 record_status
($LineNum, deriv_processing_status
($status));
351 unlink $InprogressFile;
352 close $inprogress_fh;