#!/usr/bin/env perl

=pod

=head1 NAME

uniproc - Universal data processing tool

=head1 SYNOPSIS

uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]

=head1 DESCRIPTION

Take each line from I<INPUTFILE> as B<DATA>,
pass each piece of B<DATA> to I<COMMAND> as the last argument,
then record the exit status.
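
For example, given a hypothetical F<urls.txt> with one URL per line,
the following would call C<wget -q> on each URL and record each exit status:

    uniproc urls.txt wget -q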

Can be well parallelized.
uniproc(1) itself does not run multiple instances of I<COMMAND> in parallel, just in series,
but if you start multiple instances of uniproc(1), then you can run I<COMMAND>s concurrently.
Locking ensures that no piece of data is processed by more than one instance at a time.
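
For example, a rough sketch of processing a hypothetical F<datafile> with four concurrent workers
(F<some-command> is a placeholder) is to start four uniproc(1) instances in the background:

    for i in 1 2 3 4; do uniproc datafile some-command & done; wait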

Use a wrapper command/script for I<COMMAND> if you want either of these
(see the sketch after this list):

=over 4

=item save I<COMMAND>'s output as well.

By default it goes to STDOUT.
See redirexec(1) for an example.

=item pass B<DATA> on STDIN or in an environment variable instead of as a command argument.

See args2env(1) for an example.

=back
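
As a minimal sketch using only a plain sh(1) wrapper (F<process-one.sh> is a hypothetical script),
the following passes B<DATA> on STDIN and saves the output into a separate file per piece of data:

    uniproc datafile sh -c 'printf "%s\n" "$1" | process-one.sh > "output-$UNIPROC_DATANUM"' --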

If re-run after an interruption, it won't process already-processed data.
But you may re-try the failed ones with the B<--retry> option.

The user may append new lines of data to I<INPUTFILE> between executions or during runtime,
but must not edit or reorder old ones, otherwise the results get confused.

=head1 OPTIONS

=over 4

=item --retry

Process the data which previously failed (according to the B<< I<INPUTFILE>.uniproc >> state file) too,
besides the unprocessed ones.

=item -1, --one-shot

Process only 1 item, then exit.
The default is to process as many items in series as possible.

=back

=head1 FILES

It maintains the I<INPUTFILE>.uniproc state file
by writing the processing status of each line of input data in it, line by line.
The processing status is one of the following (see the example after this list):

=over 4

=item all spaces (   )

processing not yet started

=item periods (...)

in progress

=item digits, possibly padded by spaces (  0)

result status (exit code)

=item exclamation mark (C<!>) followed by hexadecimal digits (!0f)

termination signal (I<COMMAND> terminated abnormally)

=item EOF (ie. fewer lines than input data)

processing not yet started

=back
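
For example, a hypothetical F<datafile.uniproc> might look like this after the first item
succeeded (exit code 0), the second failed with exit code 1, the third was killed by
signal 15 (hex 0f), the fourth is still in progress, and the fifth has not been started
yet (its record is still missing, ie. EOF):

      0
      1
    !0f
    ...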

I<INPUTFILE>.uniproc is locked during reads and writes to ensure consistency.
I<INPUTFILE>.uniproc.B<NUM> are the files which hold the locks for the currently in-progress processes,
where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the uniproc(1) instance running the respective I<COMMAND>,
to detect whether the processing is still going on or the process crashed.

=head1 LIMITATION

Due to the currently used locking mechanism (Fcntl(3perl)), running on multiple hosts may void the locking.

=head1 ENVIRONMENT

When running I<COMMAND>, the following environment variables are set:

=over 4

=item UNIPROC_DATANUM

The number of the particular piece of data (ie. line number in I<INPUTFILE>)
which is to be processed by the current process.

=back

=head1 EXAMPLES

Display the data processing status before each line of data:

    paste datafile.uniproc datafile

How much is completed?

    awk -v total=$(wc -l < datafile) 'BEGIN{ok=ip=fail=0} {if($1=="0"){ok++} else if($1=="..."){ip++} else if($1!=""){fail++}} END{print "total: "total", completed: "ok" ("(ok*100/total)"%), in-progress: "ip" ("(ip*100/total)"%), failed: "fail" ("(fail*100/total)"%)"}' datafile.uniproc

Output:

    total: 8, completed: 4 (50%), in-progress: 1 (12.5%), failed: 1 (12.5%)

Record the output of data processing into a separate file for each piece of data:

    uniproc datafile sh -c 'some-command "$@" | tee output-$UNIPROC_DATANUM' --

    uniproc datafile substenv -e UNIPROC_DATANUM redirexec '1:a:file:output-$UNIPROC_DATANUM' some-command

Display the data number, processing status, input data, and (the last line of) the output data in a table:

    join -t $'\t' <(nl -ba -v0 datafile.uniproc) <(nl -ba -v0 datafile) | foreach -t --prefix-add-data --prefix-add-tab tail -n1 output-{0}

=cut

use Fcntl qw/:flock :seek/;
use Data::Dumper;
use Getopt::Long qw/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
use IPC::Run qw/run/;
use POSIX;
use Pod::Usage;

$OptRetry = 0;
$OptDebug = 0;
$OptShots = undef;

GetOptions(
	'retry' => \$OptRetry,
	'1|oneshot|one-shot' => sub { $OptShots = 1; },
	'debug' => \$OptDebug,
	'help' => sub { pod2usage(-exitval=>0, -verbose=>99); },
) or pod2usage(-exitval=>2, -verbose=>99);

pod2usage(-exitval=>2, -verbose=>99) unless scalar @ARGV >= 2;

sub debug_msg
{
	warn @_ if $OptDebug;
}

sub deriv_processing_status
{
	# derive the 3-char status string from a child exit status ($?-style value)
	my $child_status = shift;
	my $status = sprintf '%3d', WEXITSTATUS($child_status);
	$status = sprintf '!%02x', WTERMSIG($child_status) if WIFSIGNALED($child_status);
	return $status;
}

sub fopen
{
	my $path = shift;
	my $opts = shift;  # supported opts: rw, no_create, lock
	my $mode = '<';  # default mode is read-only, no-create
	if($opts->{'rw'})
	{
		if(not $opts->{'no_create'})
		{
			# create the file first if it does not exist yet
			open my $fh, '>>', $path or die "$0: $path: $!\n";
			close $fh;
		}
		$mode = '+<';
	}

	open my $fh, $mode, $path or die "$0: $path: $!\n";
	seek $fh, 0, SEEK_SET or die "$0: seek: $path: $!\n";
	if($opts->{'lock'})
	{
		flock $fh, LOCK_EX or die "$0: flock: $path: $!\n";
	}
	return $fh;
}

sub extend_resultsfile
{
	my $fname = $ResultsFile;
	my $fh = shift;
	my $extended_size = shift;

	seek $fh, 0, SEEK_END;
	my $size = tell $fh;
	my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
	my $records_to_append = ($extended_size - $endpos_last_complete_record) / $ResultsRecordLength;
	debug_msg "go to offset $endpos_last_complete_record to extend by $records_to_append records\n";
	seek $fh, $size, SEEK_SET;
	# fill up with empty status records (3 spaces + newline each)
	print {$Results_fh} "   \n" x $records_to_append or die "$0: write: $fname: $!\n";
}

sub record_status
{
	my $linenum = shift;
	my $status = shift;
	die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;

	my $offset = $linenum * $ResultsRecordLength;
	debug_msg "go to offset $offset to record data # $linenum 's status \"$status\"\n";
	seek $Results_fh, $offset, SEEK_SET;
	if(eof $Results_fh)
	{
		debug_msg "eof\n";
		# results file is not big enough, let's extend
		extend_resultsfile($Results_fh, $offset);
		seek $Results_fh, $offset, SEEK_SET;
	}
	print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
}

sub input_data
{
	# TODO use index file, if exists, to seek in.

	my $asked_line = shift;
	my $fh = fopen $InputFile, {no_create=>1,};
	my $linenum = 0;
	my $data = undef;
	while(my $line = <$fh>)
	{
		if($linenum == $asked_line)
		{
			$data = $line;
			chomp $data;
			last;
		}
		$linenum++;
	}
	close $fh;
	return $data;
}

sub count_input_records
{
	# TODO use index file, if exists.

	my $fh = fopen $InputFile, {no_create=>1,};
	my $linenum = 0;
	$linenum++ while scalar <$fh>;
	close $fh;
	return $linenum;
}

sub processing_lockfile_name
{
	my $processing_number = shift;
	return "$InputFile.uniproc.$processing_number";
}

sub still_in_progress
{
	my $processing_number = shift;
	my $lockfile = processing_lockfile_name($processing_number);
	open my $fh, '<', $lockfile or return 0;
	my $lock_ok = flock $fh, LOCK_EX|LOCK_NB;
	close $fh;
	return not $lock_ok;
}

sub get_next_data_number
{
	debug_msg "go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n";
	seek $Results_fh, $FirstUnprocessed*$ResultsRecordLength, SEEK_SET;

	my $record_num = $FirstUnprocessed;
	my $result;
	while(1)
	{
		my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
		debug_msg "read only $nbytes bytes \"$result\" at record $record_num\n" if $nbytes < $ResultsRecordLength;
		last if $nbytes < $ResultsRecordLength;
		chomp $result;

		if($result eq '   ')
		{
			$FirstUnprocessed = $record_num;
			debug_msg "uninitialized $record_num\n";
			return $record_num;
		}
		if($result eq '...')
		{
			# check if still locked
			if(not still_in_progress($record_num))
			{
				debug_msg "crashed $record_num\n";
				return $record_num;
			}
		}
		if($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0)))
		{
			$FirstUnprocessed = $record_num;
			debug_msg "retry $record_num\n";
			return $record_num;
		}

		$record_num++;
	}

	# check here if there are more input data than result records
	my $input_records = count_input_records();
	if($record_num < $input_records)
	{
		extend_resultsfile($Results_fh, $input_records * $ResultsRecordLength);
		$FirstUnprocessed = $record_num;
		debug_msg "new $record_num\n";
		return $record_num;
	}

	# no more input data
	debug_msg "no more input. input_records=$input_records\n";
	return undef;
}

($InputFile, $Command, @Args) = @ARGV;

$FirstUnprocessed = 0;
$ResultsRecordLength = 4;
$ResultsFile = "$InputFile.uniproc";
$Results_fh = undef;
$num_shots = 0;

while(not defined $OptShots or $num_shots < $OptShots)
{
	$Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
	my $LineNum = get_next_data_number();
	last if not defined $LineNum;

	my $Data = input_data($LineNum);

	my $InprogressFile = processing_lockfile_name($LineNum);
	my $inprogress_fh = fopen $InprogressFile, {rw=>1, lock=>1};

	record_status($LineNum, '...');
	close $Results_fh;

	$ENV{'UNIPROC_DATANUM'} = $LineNum;
	run [$Command, @Args, $Data];
	my $status = $?;

	$Results_fh = fopen $ResultsFile, {rw=>1, lock=>1};
	record_status($LineNum, deriv_processing_status($status));
	close $Results_fh;

	unlink $InprogressFile;
	close $inprogress_fh;

	$num_shots++;
}