new tool
[hband-tools.git] / user-tools / uniproc
blob0ad9df24869519ecb86ddcf1fb7cf98ec3ccf92e
1 #!/usr/bin/env perl
3 =pod
5 =head1 NAME
7 uniproc - Universal data processing tool
9 =head1 SYNOPSIS
11 uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
13 =head1 DESCRIPTION
15 Take each line from I<INPUTFILE> as B<DATA>,
16 pass each piece of B<DATA> to I<COMMAND> as the last argument,
17 then record the exit status.
18 Can be well parallelized.
20 Use a wrapper command/script for I<COMMAND> if you want either of these:
22 =over 4
24 =item save I<COMMAND>'s output as well. By default it goes to STDOUT.
26 =item pass B<DATA> on the STDIN or in environment variable instead of command argument.
28 =back
30 May be interrupted, then re-run to process the remaining data
31 or to re-try the failed ones.
33 May append new lines to I<INPUTFILE> between executions,
34 but not edit or reorder old ones, otherwise results get confused.
36 =head1 OPTIONS
38 =over 4
40 =item --retry
42 Process those pieces of data whose previous processing failed.
44 =back
46 =head1 FILES
48 It maintains I<INPUTFILE>.uniproc file
49 by writing the processing status of each line of input data in it line-by-line.
50 Processing status is either:
52 =over 4
54 =item all spaces
56 processing not yet started
58 =item periods
60 in progress
62 =item digits (possibly padded by spaces)
64 result status (exit code)
66 =item exclamation mark (C<!>) followed by hexadecimal digits
68 termination signal (I<COMMAND> terminated abnormally)
70 =item EOF (ie. fewer lines than input data)
72 processing not yet started
74 =back
76 I<INPUTFILE>.uniproc is locked while read/write to ensure consistency.
77 I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the locks for the currently in-progress processes,
78 where B<NUM> is the line number of the corresponding piece of data in I<INPUTFILE>.
79 A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the respective instance of I<COMMAND>
80 to detect if the processing is still going or the process crashed.
82 =cut
85 use Fcntl qw/:flock :seek/;
86 use Data::Dumper;
87 use Getopt::Long qw/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
88 use IPC::Run qw/run/;
89 use POSIX;
90 use Pod::Usage;
# Command-line handling.
# --retry sets $OptRetry; --help prints the POD and exits successfully.
# Any option error, or fewer than two positional arguments
# (INPUTFILE and COMMAND), prints the POD and exits with status 2.
$OptRetry = 0;

my @bad_usage = (-exitval => 2, -verbose => 99);

GetOptions(
    'retry' => \$OptRetry,
    'help'  => sub { pod2usage(-exitval => 0, -verbose => 99); },
) or pod2usage(@bad_usage);

pod2usage(@bad_usage) if scalar(@ARGV) < 2;
# Debugging hook: receives diagnostic text and, by default, discards it.
# Uncomment the warn below to trace file offsets and scheduling decisions
# on STDERR.
sub debug_msg {
    # warn @_;
    return;
}
# Convert a wait()-style child status into the 3-character status string
# stored in the results file:
#   - killed by a signal: '!' followed by the signal number as 2 hex digits
#   - otherwise:          the exit code, right-aligned in 3 columns
sub deriv_processing_status {
    my ($wait_status) = @_;

    if (WIFSIGNALED($wait_status)) {
        return sprintf('!%02x', WTERMSIG($wait_status));
    }
    return sprintf('%3d', WEXITSTATUS($wait_status));
}
# Open $path and return the file handle; dies on any failure.
#
# Parameters:
#   $path - file to open
#   $opts - hash ref of flags:
#             rw        - open for reading and writing ('+<') instead of read-only
#             no_create - with rw: do not create the file when it is missing
#             lock      - take an exclusive flock (blocking) on the handle
#
# Without rw the file is opened read-only and is never created.
sub fopen {
    my ($path, $opts) = @_;
    my $writable = $opts->{'rw'};

    if ($writable and not $opts->{'no_create'}) {
        # '+<' does not create files, so make sure it exists first
        # (append mode creates it without truncating existing content).
        open my $touch_fh, '>>', $path or die "$0: $path: $!\n";
        close $touch_fh;
    }

    my $mode = $writable ? '+<' : '<';
    open my $fh, $mode, $path or die "$0: $path: $!\n";
    seek $fh, 0, SEEK_SET or die "$0: seek: $path: $!\n";

    if ($opts->{'lock'}) {
        flock $fh, LOCK_EX or die "$0: flock: $path: $!\n";
    }
    return $fh;
}
# Pad the results file with blank ("processing not yet started") status
# records so that it becomes $extended_size bytes long.
#
# Parameters:
#   $fh            - read/write handle of the results file
#   $extended_size - desired file size in bytes; callers pass a multiple of
#                    $ResultsRecordLength
#
# Padding starts at the end of the last COMPLETE record. A torn trailing
# record (file size not a multiple of the record length) is overwritten, so
# records stay aligned on $ResultsRecordLength boundaries.
# Dies if seeking or writing fails. Returns 1.
sub extend_resultsfile {
    my ($fh, $extended_size) = @_;
    my $fname = $ResultsFile;

    seek $fh, 0, SEEK_END or die "$0: seek: $fname: $!\n";
    my $size = tell $fh;
    my $endpos_last_complete_record = int($size / $ResultsRecordLength) * $ResultsRecordLength;
    my $records_to_append = int(($extended_size - $endpos_last_complete_record) / $ResultsRecordLength);
    debug_msg("go to offset $endpos_last_complete_record to extend by $records_to_append records\n");

    # The record count above is measured from the last complete record, so
    # writing must start there too (not at the raw end of file).
    seek $fh, $endpos_last_complete_record, SEEK_SET or die "$0: seek: $fname: $!\n";

    # A blank record is all spaces plus the newline terminator; derive it
    # from the record length so the two can never disagree.
    my $blank_record = (' ' x ($ResultsRecordLength - 1)) . "\n";
    print {$fh} $blank_record x $records_to_append or die "$0: write: $fname: $!\n";
    return 1;
}
# Write the status of input record $linenum (0-based) into the results file
# opened on the global $Results_fh.
#
# Parameters:
#   $linenum - 0-based index of the input record
#   $status  - status string, exactly $ResultsRecordLength - 1 characters
#              (the newline terminator is appended here)
#
# If the results file is too short to contain the record, it is first padded
# with blank records up to the record's offset.
# Dies on a wrongly sized $status and on seek/write errors. Returns 1.
sub record_status {
    my ($linenum, $status) = @_;
    die "$0: size mismatch: length(\"$status\") != $ResultsRecordLength - 1\n" if length($status) != $ResultsRecordLength - 1;

    my $offset = $linenum * $ResultsRecordLength;
    debug_msg("go to offset $offset to record data # $linenum 's status \"$status\"\n");
    seek $Results_fh, $offset, SEEK_SET or die "$0: seek: $ResultsFile: $!\n";
    if (eof $Results_fh) {
        debug_msg("eof\n");
        # results file is not big enough, let's extend
        extend_resultsfile($Results_fh, $offset);
    }

    # eof() attempts a read; always reposition before switching the handle
    # from reading to writing, as perlfunc's seek entry requires on some
    # systems.
    seek $Results_fh, $offset, SEEK_SET or die "$0: seek: $ResultsFile: $!\n";
    print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
    return 1;
}
# Return the content (without the line terminator) of input line
# $asked_line, counted from 0, or undef if the input file has fewer lines.
# The input file ($InputFile) is re-read from the beginning on every call.
sub input_data {
    # TODO use index file, if exists, to seek in.

    my ($asked_line) = @_;
    my $in = fopen($InputFile, {no_create=>1,});

    my $found = undef;
    my $index = 0;
    while (defined(my $record = <$in>)) {
        if ($index == $asked_line) {
            $found = $record;
            chomp $found;
            last;
        }
        $index++;
    }

    close $in;
    return $found;
}
# Return the number of lines (records) currently in the input file
# ($InputFile).
sub count_input_records {
    # TODO use index file, if exists.

    my $fh = fopen($InputFile, {no_create=>1,});
    my $linenum = 0;
    # Test for definedness explicitly: a bare truth test on the line read
    # would stop early on a final line consisting of "0" with no trailing
    # newline, and that record would never be scheduled.
    while (defined(my $line = <$fh>)) {
        $linenum++;
    }
    close $fh;
    return $linenum;
}
# Build the path of the per-record lock file:
# "<INPUTFILE>.uniproc.<record number>".
sub processing_lockfile_name {
    my ($record_index) = @_;
    return join('.', $InputFile, 'uniproc', $record_index);
}
# Tell whether record $processing_number is still being processed.
#
# Probes the record's lock file with a non-blocking exclusive flock:
#   - lock file cannot be opened: returns 0 (not in progress)
#   - lock obtained:              returns false (nobody holds it)
#   - lock refused:               returns true (somebody still holds it)
# The probe handle is closed before returning, so an obtained lock is not
# kept.
sub still_in_progress {
    my ($record_index) = @_;
    my $lock_path = processing_lockfile_name($record_index);

    my $probe;
    if (not open $probe, '<', $lock_path) {
        return 0;
    }
    my $got_lock = flock($probe, LOCK_EX | LOCK_NB);
    close $probe;

    return !$got_lock;
}
# Find the next input record to process and return its 0-based number, or
# nothing (undef in scalar context) when no work is left.
#
# Scans the results file (global $Results_fh) from record $FirstUnprocessed
# and returns the first record that is either:
#   - blank (all spaces):         never started
#   - '...' with no lock holder:  a previous run crashed while processing it
#   - failed, when --retry is on: '!' + signal, or a non-zero exit code
# If the scan reaches the end of the results file and the input file has
# more lines than there are result records, the results file is padded with
# blank records and the first new record is returned.
#
# Updates $FirstUnprocessed so that later scans skip finished records.
# Dies on seek/read errors.
sub get_next_data_number {
    # Status text of a record whose processing has not started yet.
    my $blank_status = ' ' x ($ResultsRecordLength - 1);

    debug_msg("go to offset ".($FirstUnprocessed*$ResultsRecordLength)." to read status of record # $FirstUnprocessed\n");
    seek $Results_fh, $FirstUnprocessed * $ResultsRecordLength, SEEK_SET
        or die "$0: seek: $ResultsFile: $!\n";

    my $record_num = $FirstUnprocessed;
    my $result;
    while (1) {
        my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
        # undef means a read error, which must not be mistaken for EOF.
        die "$0: read: $ResultsFile: $!\n" if not defined $nbytes;
        if ($nbytes < $ResultsRecordLength) {
            debug_msg("read only $nbytes bytes \"$result\" at record $record_num\n");
            last;
        }
        chomp $result;

        if ($result eq $blank_status) {
            $FirstUnprocessed = $record_num;
            debug_msg("uninitialized $record_num\n");
            return $record_num;
        }
        if ($result eq '...') {
            # check if still locked
            if (not still_in_progress($record_num)) {
                debug_msg("crashed $record_num\n");
                return $record_num;
            }
        }
        if ($OptRetry and ($result =~ /^!/ or ($result =~ /^\s*\d+$/ and $result > 0))) {
            # Resume AFTER this record next time: if the retry fails again,
            # starting the next scan on this record would select it forever.
            # Each failed record is thus retried at most once per run.
            $FirstUnprocessed = $record_num + 1;
            debug_msg("retry $record_num\n");
            return $record_num;
        }

        $record_num++;
    }

    # check here if there are more input data than result records
    my $input_records = count_input_records();
    if ($record_num < $input_records) {
        extend_resultsfile($Results_fh, $input_records * $ResultsRecordLength);
        $FirstUnprocessed = $record_num;
        debug_msg("new $record_num\n");
        return $record_num;
    }

    # no more input data
    debug_msg("no more input. input_records=$input_records\n");
    return;
}
# Main program: repeatedly pick the next unprocessed record, run COMMAND on
# it and store the outcome in the results file.
($InputFile, $Command, @Args) = @ARGV;

$FirstUnprocessed = 0;
# Bytes per record in the results file: 3 status characters + newline.
$ResultsRecordLength = 4;
$ResultsFile = "$InputFile.uniproc";
$Results_fh = undef;

while (1) {
    # The results file stays exclusively locked while a record is selected
    # and marked in progress, so parallel instances never pick the same one.
    $Results_fh = fopen($ResultsFile, {rw=>1, lock=>1});
    my $LineNum = get_next_data_number();
    if (not defined $LineNum) {
        close $Results_fh or die "$0: close: $ResultsFile: $!\n";
        last;
    }

    my $Data = input_data($LineNum);

    # Hold a lock on the per-record lock file for the whole run of COMMAND;
    # other instances probe it to tell a running job from a crashed one.
    my $InprogressFile = processing_lockfile_name($LineNum);
    my $inprogress_fh = fopen($InprogressFile, {rw=>1, lock=>1});

    record_status($LineNum, '...');
    # Buffered write errors only surface at close, so it must be checked.
    close $Results_fh or die "$0: close: $ResultsFile: $!\n";

    run([$Command, @Args, $Data]);
    my $status = $?;

    $Results_fh = fopen($ResultsFile, {rw=>1, lock=>1});
    record_status($LineNum, deriv_processing_status($status));
    close $Results_fh or die "$0: close: $ResultsFile: $!\n";

    unlink $InprogressFile;
    close $inprogress_fh;
}