7 uniproc - Universal data processing tool
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
19 Can be well parallelized.
20 uniproc(1) itself does not run multiple instances of I<COMMAND> in parallel, just in series,
21 but if you start multiple instances of uniproc(1), then you can run I<COMMAND>s concurrently.
22 Locking ensures that no piece of data is processed by more than one instance.
24 Use a wrapper command/script for I<COMMAND> if you want either of these:
28 =item save I<COMMAND>'s output as well.
30 By default it goes to STDOUT.
31 See redirexec(1) for example.
33 =item pass B<DATA> on the STDIN or in environment variable instead of command argument.
35 See args2env(1) for example.
39 If re-run after an interruption, it won't process already processed data.
40 But you may retry the failed ones with the B<--retry> option.
42 The user may append new lines of data to I<INPUTFILE> between executions or during runtime,
43 but not edit or reorder old ones, otherwise results get confused.
51 Also process data whose processing previously failed (according to the B<< I<INPUTFILE>.uniproc >> state file),
52 besides the unprocessed ones.
56 Process only 1 item, then exit.
57 Default is to process as many items in series as possible.
63 It maintains I<INPUTFILE>.uniproc file
64 by writing the processing status of each line of input data in it line-by-line.
65 Processing status is either:
71 processing not yet started
77 =item digits, possibly padded by spaces ( 0)
79 result status (exit code)
81 =item exclamation mark (C<!>) followed by hexadecimal digits (!0f)
83 termination signal (I<COMMAND> terminated abnormally)
85 =item EOF (ie. fewer lines than input data)
87 processing not yet started
91 I<INPUTFILE>.uniproc is locked while being read or written to ensure consistency.
92 I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the lock for the currently in-progress processes,
93 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
94 A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the respective instance of I<COMMAND>
95 to detect if the processing is still going or the process crashed.
99 Due to the locking mechanism currently used (Fcntl(3perl)), running on multiple hosts may void locking.
103 When running I<COMMAND>, the following environment is set:
107 =item UNIPROC_DATANUM
109 Number of the particular piece of data (ie. line number in I<INPUTFILE>)
110 which needs to be processed by the current process.
116 Display the data processing status before each line of data:
118 paste datafile.uniproc datafile
122 awk -v total=$(wc -l < datafile) 'BEGIN{ok=ip=fail=0} {if($1==0){ok++} else if($1=="..."){ip++} else if($1!=""){fail++}} END{print "total: "total", completed: "ok" ("(ok*100/total)"%), in-progress: "ip" ("(ip*100/total)"%), failed: "fail" ("(fail*100/total)"%)"}' datafile.uniproc
126 total: 8, completed: 4 (50%), in-progress: 1 (12.5%), failed: 1 (12.5%)
128 Record the output of data processing into a separate file for each piece of data:
130 uniproc datafile sh -c 'some-command "$@" | tee output-$UNIPROC_DATANUM' --
132 uniproc datafile substenv -e UNIPROC_DATANUM redirexec '1:a:file:output-$UNIPROC_DATANUM' some-command
134 Display data number, processing status, input data, (last line of) output data in a table:
136 join -t $'\t' <(nl -ba -v0 datafile.uniproc) <(nl -ba -v0 datafile) | foreach -t --prefix-add-data --prefix-add-tab tail -n1 output-{0}
141 use Fcntl qw
/:flock :seek/;
143 use Getopt
::Long qw
/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
144 use IPC
::Run qw
/run/;
153 'retry' => \
$OptRetry,
154 '1|oneshot|one-shot' => sub { $OptShots = 1; },
155 'debug' => \
$OptDebug,
156 'help' => sub { pod2usage
(-exitval
=>0, -verbose
=>99); },
157 ) or pod2usage
(-exitval
=>2, -verbose
=>99);
159 pod2usage
(-exitval
=>2, -verbose
=>99) unless scalar @ARGV >= 2;
164 warn @_ if $OptDebug;
167 sub deriv_processing_status
169 my $child_status = shift;
170 my $status = sprintf '%3d', WEXITSTATUS
($child_status);
171 $status = sprintf '!%02x', WTERMSIG
($child_status) if WIFSIGNALED
($child_status);
178 my $opts = shift; # supported opts: rw, no_create, lock
179 my $mode = '<'; # default mode is read-only, no-create
182 if(not $opts->{'no_create'})
184 open my $fh, '>>', $path or die "$0: $path: $!\n";
190 open my $fh, $mode, $path or die "$0: $path: $!\n";
191 seek $fh, 0, SEEK_SET
or die "$0: seek: $path: $!\n";
194 flock $fh, LOCK_EX
or die "$0: flock: $path: $!\n";
199 sub extend_resultsfile
201 my $fname = $ResultsFile;
203 my $extended_size = shift;
205 seek $fh, 0, SEEK_END
;
207 my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
208 my $records_to_append = ($extended_size - $endpos_last_complete_record) / $ResultsRecordLength;
209 debug_msg
"go to offset $endpos_last_complete_record to extend by $records_to_append records\n";
210 seek $fh, $size, SEEK_SET
;
211 # fill up with empty status records
212 print {$Results_fh} " \n" x
$records_to_append or die "$0: write: $fname: $!\n";
219 die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;
221 my $offset = $linenum * $ResultsRecordLength;
222 debug_msg
"go to offset $offset to record data # $linenum 's status \"$status\"\n";
223 seek $Results_fh, $offset, SEEK_SET
;
227 # results file is not big enough, let's extend
228 extend_resultsfile
($Results_fh, $offset);
229 seek $Results_fh, $offset, SEEK_SET
;
231 print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
236 # TODO use index file, if exists, to seek in.
238 my $asked_line = shift;
239 my $fh = fopen
$InputFile, {no_create
=>1,};
242 while(my $line = <$fh>)
244 if($linenum == $asked_line)
256 sub count_input_records
258 # TODO use index file, if exists.
260 my $fh = fopen
$InputFile, {no_create
=>1,};
262 $linenum++ while scalar <$fh>;
267 sub processing_lockfile_name
269 my $processing_number = shift;
270 return "$InputFile.uniproc.$processing_number";
273 sub still_in_progress
275 my $processing_number = shift;
276 my $lockfile = processing_lockfile_name
($processing_number);
277 open my $fh, '<', $lockfile or return 0;
278 my $lock_ok = flock $fh, LOCK_EX
|LOCK_NB
;
283 sub get_next_data_number
285 debug_msg
"go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n";
286 seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET
;
288 my $record_num = $FirstUnprocessed;
292 my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
293 debug_msg
"read only $nbytes bytes \"$result\" at record $record_num\n" if $nbytes < $ResultsRecordLength;
294 last if $nbytes < $ResultsRecordLength;
299 $FirstUnprocessed = $record_num;
300 debug_msg
"uninitialized $record_num\n";
305 # check if still locked
306 if(not still_in_progress
($record_num))
308 debug_msg
"crashed $record_num\n";
312 if($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0)))
314 $FirstUnprocessed = $record_num;
315 debug_msg
"retry $record_num\n";
322 # check here if there are more input data than result records
323 my $input_records = count_input_records
();
324 if($record_num < $input_records)
326 extend_resultsfile
($Results_fh, $input_records * $ResultsRecordLength);
327 $FirstUnprocessed = $record_num;
328 debug_msg
"new $record_num\n";
333 debug_msg
"no more input. input_records=$input_records\n";
338 ($InputFile, $Command, @Args) = @ARGV;
340 $FirstUnprocessed = 0;
341 $ResultsRecordLength = 4;
342 $ResultsFile = "$InputFile.uniproc";
346 while(not defined $OptShots or $num_shots < $OptShots)
348 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
349 my $LineNum = get_next_data_number
();
350 last if not defined $LineNum;
352 my $Data = input_data
($LineNum);
354 my $InprogressFile = processing_lockfile_name
($LineNum);
355 my $inprogress_fh = fopen
$InprogressFile, {rw
=>1, lock=>1};
357 record_status
($LineNum, '...');
360 $ENV{'UNIPROC_DATANUM'} = $LineNum;
361 run
[$Command, @Args, $Data];
364 $Results_fh = fopen
$ResultsFile, {rw
=>1, lock=>1};
365 record_status
($LineNum, deriv_processing_status
($status));
368 unlink $InprogressFile;
369 close $inprogress_fh;