bugfixes, features, documentation, examples, new tool
[hband-tools.git] / user-tools / uniproc
blobe2dc332f7d0b398a61fd37bd6ee07ac6da98485b
1 #!/usr/bin/env perl
3 =pod
5 =head1 NAME
7 uniproc - Universal data processing tool
9 =head1 SYNOPSIS
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
13 =head1 DESCRIPTION
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
19 Can be well parallelized.
20 uniproc(1) itself does not run multiple instances of I<COMMAND> in parallel, just in series,
21 but if you start multiple instances of uniproc(1), then you can run I<COMMAND>s concurrently.
22 Locking ensures no overlapping data being processes.
24 Use a wrapper command/script for I<COMMAND> if you want either of these:
26 =over 4
28 =item save I<COMMAND>'s output as well.
30 By default it goes to STDOUT.
31 See redirexec(1) for example.
33 =item pass B<DATA> on the STDIN or in environment variable instead of command argument.
35 See args2env(1) for example.
37 =back
39 If re-run after an interrupt, won't process already processed data.
40 But you may re-try the failed ones by B<--retry> option.
42 The user may append new lines of data to I<INPUTFILE> between executions or during runtime,
43 but not edit or reorder old ones, otherwise results get confused.
45 =head1 OPTIONS
47 =over 4
49 =item --retry
51 Process those data which were previously failed (according to B<< I<INPUTFILE>.uniproc >> state file).
53 =back
55 =head1 FILES
57 It maintains I<INPUTFILE>.uniproc file
58 by writing the processing status of each lines of input data in it line-by-line.
59 Processing status is either:
61 =over 4
63 =item all spaces ( )
65 processing not yet started
67 =item periods (...)
69 in progress
71 =item digits, possibly padded by spaces ( 0)
73 result status (exit code)
75 =item exclamation mark (C<!>) followed by hexadecimal digits (!0f)
77 termination signal (I<COMMAND> teminated abnormally)
79 =item EOF (ie. fewer lines than input data)
81 processing not yet started
83 =back
85 I<INPUTFILE>.uniproc is locked while read/write to ensure consistency.
86 I<INPUTFILE>.uniproc.B<NUM> are the name of the files which hold the lock for the currently in-progress processes,
87 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
88 A lock is held on each of these I<INPUTFILE>.proc.B<NUM> files by the respective instance of I<COMMAND>
89 to detect if the processing is still going or the process crashed.
91 =head1 LIMITATION
93 Due to currently used locking mechanism (Fcntl(3perl)), running on multiple hosts may void locking.
95 =head1 ENVIRONMENT
97 When running I<COMMAND>, the following environment is set:
99 =over 4
101 =item UNIPROC_DATANUM
103 Number of the particular piece of data (ie. line number in I<INPUTFILE>)
104 which is need to be processed by the current process.
106 =back
108 =head1 EXAMPLES
110 Display the data processing status before each line of data:
112 paste datafile.uniproc datafile
114 Record output of data processing into a file per each piece of data:
116 uniproc datafile sh -c 'some-command "$@" | tee output-$UNIPROC_DATANUM' --
118 uniproc datafile substenv -e UNIPROC_DATANUM redirexec '1:a:file:output-$UNIPROC_DATANUM' some-command
120 Display data number, processing status, input data, (last line of) output data in a table:
122 join -t $'\t' <(nl -ba -v0 datafile.uniproc) <(nl -ba -v0 datafile) | foreach -t --prefix-add-data --prefix-add-tab tail -n1 output-{0}
124 =cut
127 use Fcntl qw/:flock :seek/;
128 use Data::Dumper;
129 use Getopt::Long qw/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
130 use IPC::Run qw/run/;
131 use POSIX;
132 use Pod::Usage;
134 $OptRetry = 0;
135 $OptDebug = 0;
137 GetOptions(
138 'retry' => \$OptRetry,
139 'debug' => \$OptDebug,
140 'help' => sub { pod2usage(-exitval=>0, -verbose=>99); },
141 ) or pod2usage(-exitval=>2, -verbose=>99);
143 pod2usage(-exitval=>2, -verbose=>99) unless scalar @ARGV >= 2;
146 sub debug_msg
148 warn @_ if $OptDebug;
151 sub deriv_processing_status
153 my $child_status = shift;
154 my $status = sprintf '%3d', WEXITSTATUS($child_status);
155 $status = sprintf '!%02x', WTERMSIG($child_status) if WIFSIGNALED($child_status);
156 return $status;
159 sub fopen
161 my $path = shift;
162 my $opts = shift; # supported opts: rw, no_create, lock
163 my $mode = '<'; # default mode is read-only, no-create
164 if($opts->{'rw'})
166 if(not $opts->{'no_create'})
168 open my $fh, '>>', $path or die "$0: $path: $!\n";
169 close $fh;
171 $mode = '+<';
174 open my $fh, $mode, $path or die "$0: $path: $!\n";
175 seek $fh, 0, SEEK_SET or die "$0: seek: $path: $!\n";
176 if($opts->{'lock'})
178 flock $fh, LOCK_EX or die "$0: flock: $path: $!\n";
180 return $fh
183 sub extend_resultsfile
185 my $fname = $ResultsFile;
186 my $fh = shift;
187 my $extended_size = shift;
189 seek $fh, 0, SEEK_END;
190 my $size = tell $fh;
191 my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
192 my $records_to_append = ($extended_size - $endpos_last_complete_record) / $ResultsRecordLength;
193 debug_msg "go to offset $endpos_last_complete_record to extend by $records_to_append records\n";
194 seek $fh, $size, SEEK_SET;
195 # fill up with empty status records
196 print {$Results_fh} " \n" x $records_to_append or die "$0: write: $fname: $!\n";
199 sub record_status
201 my $linenum = shift;
202 my $status = shift;
203 die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;
205 my $offset = $linenum * $ResultsRecordLength;
206 debug_msg "go to offset $offset to record data # $linenum 's status \"$status\"\n";
207 seek $Results_fh, $offset, SEEK_SET;
208 if(eof $Results_fh)
210 debug_msg "eof\n";
211 # results file is not big enough, let's extend
212 extend_resultsfile($Results_fh, $offset);
213 seek $Results_fh, $offset, SEEK_SET;
215 print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
218 sub input_data
220 # TODO use index file, if exists, to seek in.
222 my $asked_line = shift;
223 my $fh = fopen $InputFile, {no_create=>1,};
224 my $linenum = 0;
225 my $data = undef;
226 while(my $line = <$fh>)
228 if($linenum == $asked_line)
230 $data = $line;
231 chomp $data;
232 last;
234 $linenum++;
236 close $fh;
237 return $data;
240 sub count_input_records
242 # TODO use index file, if exists.
244 my $fh = fopen $InputFile, {no_create=>1,};
245 my $linenum = 0;
246 $linenum++ while scalar <$fh>;
247 close $fh;
248 return $linenum;
251 sub processing_lockfile_name
253 my $processing_number = shift;
254 return "$InputFile.uniproc.$processing_number";
257 sub still_in_progress
259 my $processing_number = shift;
260 my $lockfile = processing_lockfile_name($processing_number);
261 open my $fh, '<', $lockfile or return 0;
262 my $lock_ok = flock $fh, LOCK_EX|LOCK_NB;
263 close $fh;
264 return not $lock_ok;
267 sub get_next_data_number
269 debug_msg "go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n";
270 seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET;
272 my $record_num = $FirstUnprocessed;
273 my $result;
274 while(1)
276 my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
277 debug_msg "read only $nbytes bytes \"$result\" at record $record_num\n" if $nbytes < $ResultsRecordLength;
278 last if $nbytes < $ResultsRecordLength;
279 chomp $result;
281 if($result eq ' ')
283 $FirstUnprocessed = $record_num;
284 debug_msg "uninitialized $record_num\n";
285 return $record_num;
287 if($result eq '...')
289 # check if still locked
290 if(not still_in_progress($record_num))
292 debug_msg "crashed $record_num\n";
293 return $record_num;
296 if($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0)))
298 $FirstUnprocessed = $record_num;
299 debug_msg "retry $record_num\n";
300 return $record_num;
303 $record_num++;
306 # check here if there are more input data than result records
307 my $input_records = count_input_records();
308 if($record_num < $input_records)
310 extend_resultsfile($Results_fh, $input_records * $ResultsRecordLength);
311 $FirstUnprocessed = $record_num;
312 debug_msg "new $record_num\n";
313 return $record_num;
316 # no more input data
317 debug_msg "no more input. input_records=$input_records\n";
318 return undef;
322 ($InputFile, $Command, @Args) = @ARGV;
324 $FirstUnprocessed = 0;
325 $ResultsRecordLength = 4;
326 $ResultsFile = "$InputFile.uniproc";
327 $Results_fh = undef;
329 while(1)
331 $Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
332 my $LineNum = get_next_data_number();
333 last if not defined $LineNum;
335 my $Data = input_data($LineNum);
337 my $InprogressFile = processing_lockfile_name($LineNum);
338 my $inprogress_fh = fopen $InprogressFile, {rw=>1, lock=>1};
340 record_status($LineNum, '...');
341 close $Results_fh;
343 $ENV{'UNIPROC_DATANUM'} = $LineNum;
344 run [$Command, @Args, $Data];
345 my $status = $?;
347 $Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
348 record_status($LineNum, deriv_processing_status($status));
349 close $Results_fh;
351 unlink $InprogressFile;
352 close $inprogress_fh;