From 80483a3178f7eb6a26d48773abc9c93d8ebbc490 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Wed, 15 Feb 2017 23:37:15 +0000 Subject: [PATCH] add a new -bulk command line option which can be used in pipe mode this turns off bulk copy for pipe mode by default, and turns it on only when requested. --- jobflow.c | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/jobflow.c b/jobflow.c index 9fae67e..91f2a26 100644 --- a/jobflow.c +++ b/jobflow.c @@ -107,6 +107,7 @@ typedef struct { jobs is small or smaller than the available threadcount. */ int join_output:1; /* join stdout and stderr of launched jobs into stdout */ int pipe_mode:1; + size_t bulk_bytes; } prog_state_s; prog_state_s prog_state; @@ -326,6 +327,15 @@ static int syntax(void) { " if -buffered, write both stdout and stderr into the same file.\n" " this saves the chronological order of the output, and the combined output\n" " will only be printed to stdout.\n" + "-bulk=XXX\n" + " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n" + " this passes (almost) the entire buffer to the next scheduled job.\n" + " the passed buffer will be truncated to the last line break boundary,\n" + " so jobs always get entire lines to work with.\n" + " this option is useful when you have huge input files and relatively short\n" + " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n" + " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n" + " actual memory allocation will be twice the amount passed.\n" "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n" " sets the rlimit of the new created processes.\n" " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n" @@ -426,6 +436,15 @@ static int parse_args(int argc, char** argv) { prog_state.join_output = 1; } + prog_state.bulk_bytes = 0; + op_temp = op_get(op, SPL("bulk")); + if(op_temp) { + SPDECLAREC(value, op_temp); + prog_state.bulk_bytes = parse_human_number(value); + if(prog_state.bulk_bytes % 4096) + die("bulk size must be a multiple of 4096\n"); + } + prog_state.limits = NULL; op_temp = op_get(op, SPL("limits")); if(op_temp) { @@ -597,9 +616,6 @@ static char* mystrnrchr(const char *in, int ch, size_t end) { return 0; } -#define BULK_KB 16 -#define BULK_BUFSZ BULK_KB*1024 - int main(int argc, char** argv) { unsigned i; @@ -648,18 +664,19 @@ int main(int argc, char** argv) { prog_state.lineno = 0; size_t left = 0; + const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024; - char *mem = mmap(NULL, BULK_BUFSZ*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); + char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); char *buf1 = mem; - char *buf2 = mem+BULK_BUFSZ; + char *buf2 = mem+chunksize; char *in, *inbuf; int exitcode = 1; while(1) { - inbuf = buf1+BULK_BUFSZ-left; - memcpy(inbuf, buf2+BULK_BUFSZ-left, left); - ssize_t n = read(0, buf2, BULK_BUFSZ); + inbuf = buf1+chunksize-left; + memcpy(inbuf, buf2+chunksize-left, left); + ssize_t n = read(0, buf2, chunksize); if(n == -1) { perror("read"); goto out; @@ -668,10 +685,11 @@ int main(int argc, char** argv) { in = inbuf; while(left) { char *p; - if(!prog_state.pipe_mode) - p = mystrnchr (in, '\n', left); - else + if(prog_state.pipe_mode && prog_state.bulk_bytes) p = mystrnrchr(in, '\n', left); + else + p = mystrnchr (in, '\n', left); + if(!p) break; ptrdiff_t diff = (p - in) + 1; if(!dispatch_line(in, diff, argv)) @@ -683,7 +701,7 @@ int main(int argc, char** argv) { if(left) dispatch_line(in, left, argv); break; } - if(left > BULK_BUFSZ) { + if(left > chunksize) { dprintf(2, "error: input line length exceeds buffer size\n"); goto out; } -- 2.11.4.GIT