From f5b3a33cf0aead6a943f948ba49bb3b19a2e3c92 Mon Sep 17 00:00:00 2001
From: Andreas Hrubak
Date: Tue, 30 Jul 2024 01:28:22 +0200
Subject: [PATCH] WIP: uniproc

---
 user-tools/uniproc | 330 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 330 insertions(+)
 create mode 100755 user-tools/uniproc

diff --git a/user-tools/uniproc b/user-tools/uniproc
new file mode 100755
index 0000000..e2e3f3b
--- /dev/null
+++ b/user-tools/uniproc
@@ -0,0 +1,330 @@
+#!/usr/bin/env perl
+
+=pod
+
+=head1 NAME
+
+uniproc - Universal data processing tool
+
+=head1 SYNOPSIS
+
+uniproc [I<OPTIONS>] I<INPUTFILE> I<COMMAND> [I<ARGS>]
+
+=head1 DESCRIPTION
+
+Take each line from I<INPUTFILE> as B<DATA>,
+pass each piece of B<DATA> to I<COMMAND> as the last argument,
+then record the exit status.
+Can be well parallelized.
+
+Use a wrapper script for I<COMMAND> if you want to:
+
+=over 4
+
+=item save I<COMMAND>'s output as well. By default it goes to STDOUT.
+
+=item pass B<DATA> on the STDIN or in an environment variable instead of a command argument.
+
+=back
+
+May be interrupted, then re-run to process the remaining data
+or to re-try the failed ones.
+
+You may append new lines to I<INPUTFILE> between executions,
+but do not edit or reorder old ones, otherwise the results get confused.
+
+=head1 OPTIONS
+
+=over 4
+
+=item --retry
+
+Also process those data which failed previously.
+Each piece of data is attempted at most once per run.
+
+=item --help
+
+Show this documentation and exit.
+
+=back
+
+=head1 FILES
+
+It maintains the I<INPUTFILE>.uniproc file
+by writing the processing status of each line of input data in it line-by-line.
+Processing status is either:
+
+=over 4
+
+=item all spaces - processing not yet started
+
+=item periods - in progress
+
+=item digits (possibly padded by spaces) - result status (exit code)
+
+=item exclamation mark (C<!>) followed by hexadecimal digits - termination signal (I<COMMAND> terminated abnormally)
+
+=item EOF (ie. fewer lines than input data) - processing not yet started
+
+=back
+
+I<INPUTFILE>.uniproc is locked while being read or written to ensure consistency.
+I<INPUTFILE>.uniproc.B<NUM> are the names of the files which hold the lock for the currently in-progress processes,
+where B<NUM> is the zero-based line number of the corresponding piece of data in I<INPUTFILE>.
+A lock is held on each of these I<INPUTFILE>.uniproc.B<NUM> files by the respective instance of I<uniproc>
+to detect if the processing is still going or the process crashed.
+An in-progress record whose lock is no longer held is processed again.
+
+=cut
+
+
+use strict;
+use warnings;
+use Fcntl qw/:flock :seek/;
+use Data::Dumper;
+use Getopt::Long qw/:config no_ignore_case no_bundling no_getopt_compat no_auto_abbrev require_order/;
+use IPC::Run qw/run/;
+use POSIX qw/:sys_wait_h/;
+use Pod::Usage;
+
+my $OptRetry = 0;
+
+GetOptions(
+    'retry' => \$OptRetry,
+    'help' => sub { pod2usage(-exitval=>0, -verbose=>99); },
+) or pod2usage(-exitval=>2, -verbose=>99);
+
+pod2usage(-exitval=>2, -verbose=>99) unless scalar @ARGV >= 2;
+
+my ($InputFile, $Command, @Args) = @ARGV;
+
+# A results record is a fixed-width status field plus a newline,
+# so record N always lives at byte offset N * $ResultsRecordLength.
+my $ResultsRecordLength = 4;
+my $EmptyStatus = ' ' x ($ResultsRecordLength - 1);
+my $InprogressStatus = '.' x ($ResultsRecordLength - 1);
+my $SuccessStatus = sprintf '%3d', 0;
+my $ResultsFile = "$InputFile.uniproc";
+my $Results_fh = undef;
+# Records before this one need not be scanned again during this run.
+my $FirstUnprocessed = 0;
+# Records this instance has already run, so that --retry does not
+# loop forever on a piece of data which keeps failing.
+my %AttemptedHere;
+
+# Turn a wait(2)-style child status into a 3-character status field:
+# "!" and the signal number in hex if the child was killed by a signal,
+# otherwise the exit code right-aligned in 3 columns.
+sub deriv_processing_status
+{
+    my $child_status = shift;
+    return sprintf '!%02x', WTERMSIG($child_status) if WIFSIGNALED($child_status);
+    return sprintf '%3d', WEXITSTATUS($child_status);
+}
+
+# Open $path and return the file handle; die on any failure.
+# Supported keys in the optional $opts hashref:
+#   rw        - open for reading and writing (default is read-only)
+#   no_create - with rw: do not create the file if it is missing
+#   lock      - block until an exclusive flock is acquired
+sub fopen
+{
+    my $path = shift;
+    my $opts = shift || {};
+    my $mode = '<';
+    if($opts->{'rw'})
+    {
+        if(not $opts->{'no_create'})
+        {
+            # appending creates a missing file without truncating an existing one
+            open my $touch_fh, '>>', $path or die "$0: $path: $!\n";
+            close $touch_fh;
+        }
+        $mode = '+<';
+    }
+
+    open my $fh, $mode, $path or die "$0: $path: $!\n";
+    if($opts->{'lock'})
+    {
+        flock $fh, LOCK_EX or die "$0: flock: $path: $!\n";
+    }
+    return $fh;
+}
+
+# Name of the lock file belonging to the given (zero-based) record.
+sub inprogress_file
+{
+    my $record_num = shift;
+    return "$ResultsFile.$record_num";
+}
+
+# Tell whether another process still holds the lock of the given record.
+# Meant to be called while the results file is locked by the caller.
+sub is_still_in_progress
+{
+    my $record_num = shift;
+    my $path = inprogress_file($record_num);
+    my $fh;
+    if(not open($fh, '+<', $path))
+    {
+        # no lock file means nobody is working on this record;
+        # on any other error stay on the safe side and leave it alone
+        return $!{'ENOENT'} ? 0 : 1;
+    }
+    my $got_lock = flock($fh, LOCK_EX | LOCK_NB);
+    # closing releases the probe lock, so the caller can lock the file for real
+    close $fh;
+    return $got_lock ? 0 : 1;
+}
+
+# Tell whether the record with the given status field should be processed now.
+sub is_eligible
+{
+    my ($record_num, $status) = @_;
+    return 1 if $status eq $EmptyStatus;
+    # in progress: take it over only if its processor is gone (eg. crashed)
+    return !is_still_in_progress($record_num) if $status eq $InprogressStatus;
+    # anything else is a final status; failures are retried only on request
+    return 0 if not $OptRetry or $AttemptedHere{$record_num};
+    return $status ne $SuccessStatus;
+}
+
+# Append empty status records to the results file open on $fh
+# until it is at least $extended_size bytes long.
+sub extend_resultsfile
+{
+    my $fh = shift;
+    my $extended_size = shift;
+
+    seek $fh, 0, SEEK_END or die "$0: seek: $ResultsFile: $!\n";
+    my $size = tell $fh;
+    # round down to the nearest complete record, so a partially written one gets overwritten
+    $size -= $size % $ResultsRecordLength;
+    return if $size >= $extended_size;
+    seek $fh, $size, SEEK_SET or die "$0: seek: $ResultsFile: $!\n";
+    # fill up with empty status records
+    my $missing_records = ($extended_size - $size) / $ResultsRecordLength;
+    print {$fh} "$EmptyStatus\n" x $missing_records or die "$0: write: $ResultsFile: $!\n";
+}
+
+# Write the status field of the given record into the results file.
+# $status must be exactly ($ResultsRecordLength - 1) bytes long.
+sub record_status
+{
+    my $linenum = shift;
+    my $status = shift;
+    length($status) == $ResultsRecordLength - 1 or die "$0: invalid status field: '$status'\n";
+    my $offset = $linenum * $ResultsRecordLength;
+    # if the results file is not big enough, pad it up to our record
+    extend_resultsfile($Results_fh, $offset);
+    seek $Results_fh, $offset, SEEK_SET or die "$0: seek: $ResultsFile: $!\n";
+    print {$Results_fh} "$status\n" or die "$0: write: $ResultsFile: $!\n";
+}
+
+# Return the given (zero-based) line of the input file without its line ending,
+# or undef if the input has no such line.
+sub input_data
+{
+    my $asked_line = shift;
+    my $fh = fopen($InputFile, {no_create=>1});
+    my $linenum = 0;
+    my $data = undef;
+    while(my $line = <$fh>)
+    {
+        if($linenum == $asked_line)
+        {
+            chomp $line;
+            $data = $line;
+            last;
+        }
+        $linenum++;
+    }
+    close $fh;
+    return $data;
+}
+
+# Return the number of lines in the input file.
+sub count_input_records
+{
+    my $fh = fopen($InputFile, {no_create=>1});
+    my $linenum = 0;
+    $linenum++ while <$fh>;
+    close $fh;
+    return $linenum;
+}
+
+# Find the next record to process. The results file must be open and locked
+# on $Results_fh. Returns (record number, data), or an empty list if there is
+# nothing left to do.
+sub get_next_data
+{
+    seek $Results_fh, $FirstUnprocessed * $ResultsRecordLength, SEEK_SET or die "$0: seek: $ResultsFile: $!\n";
+
+    my $record_num = $FirstUnprocessed;
+    my $found = 0;
+    while(1)
+    {
+        my $result;
+        my $nbytes = read $Results_fh, $result, $ResultsRecordLength;
+        defined $nbytes or die "$0: read: $ResultsFile: $!\n";
+        # EOF or a partially written record: no more result records
+        last if $nbytes < $ResultsRecordLength;
+        chomp $result;
+
+        if(is_eligible($record_num, $result))
+        {
+            $found = 1;
+            last;
+        }
+        $record_num++;
+    }
+
+    if(not $found)
+    {
+        # check here if there are more input data than result records
+        my $input_records = count_input_records();
+        return if $record_num >= $input_records;
+        extend_resultsfile($Results_fh, $input_records * $ResultsRecordLength);
+    }
+
+    my $data = input_data($record_num);
+    # the input file has no such line (eg. it got truncated): nothing to process
+    return if not defined $data;
+    $FirstUnprocessed = $record_num;
+    return ($record_num, $data);
+}
+
+
+while(1)
+{
+    $Results_fh = fopen($ResultsFile, {rw=>1, lock=>1});
+    my ($LineNum, $Data) = get_next_data();
+    if(not defined $LineNum)
+    {
+        close $Results_fh;
+        last;
+    }
+
+    # Take the per-record lock before publishing the in-progress status,
+    # so others never see an in-progress record without a held lock.
+    my $InprogressFile = inprogress_file($LineNum);
+    my $inprogress_fh = fopen($InprogressFile, {rw=>1, lock=>1});
+    record_status($LineNum, $InprogressStatus);
+    close $Results_fh or die "$0: close: $ResultsFile: $!\n";
+    $AttemptedHere{$LineNum} = 1;
+
+    # run() dies if the command can not be started at all
+    my $status = eval { run [$Command, @Args, $Data]; $? };
+    my $run_error = $@;
+
+    $Results_fh = fopen($ResultsFile, {rw=>1, lock=>1});
+    # a command which never ran leaves its data "not yet started"
+    record_status($LineNum, defined $status ? deriv_processing_status($status) : $EmptyStatus);
+    # Remove the lock file while still holding the results lock, so nobody
+    # can be in the middle of opening it when it disappears.
+    unlink $InprogressFile or warn "$0: unlink: $InprogressFile: $!\n";
+    close $inprogress_fh;
+    close $Results_fh or die "$0: close: $ResultsFile: $!\n";
+
+    die "$0: $Command: $run_error" if not defined $status;
+}
-- 
2.11.4.GIT