From abf537adcb73586dc7ca41f2c57c46aaaac3b970 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Wed, 15 Feb 2017 21:08:34 +0000 Subject: [PATCH] use bulk copy in pipe mode while testing the new pipe mode, it turned out that reading line- by-line, piping line-by-line, and writing line-by-line has a huge syscall overhead. for example just echoing the input is 500x slower than busybox cat. that's probably not a big problem whit tasks that take considerable time themselves, but if you want to parallelize simple tasks this overhead will dominate the runtime. what we now do is reading a big chunk of stdin (16KB) into a buffer that's page-aligned due to usage of mmap(), and in case pipe-mode is used we just pass the biggest possible hunk to one task. in non-pipe mode (that means command line argument permutation is used, we're forced to process line-by-line anyway, so in that case we just pass line-after-line from our readbuf on, until we can not find any newlines. the non-processed part of the buffer will then be copied to a memory area just before the read buffer, and another chunk will be fetched. passing a 16KB chunk to a single task is not optimal for all use cases. for example the total input might be less than that but you're using long-running tasks, that you want to evenly distribute among several cores. in that case all input will be passed to the first job and all other jobs idle. thus the plan is to add an option to explicitly enable this mode. right now if pipe mode is used, all options that rely on the line number counter, such as -skip, etc, are broken as well, since it is currently unknown how many lines are passed on. --- jobflow.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/jobflow.c b/jobflow.c index 8351daf..8c8de3d 100644 --- a/jobflow.c +++ b/jobflow.c @@ -42,6 +42,7 @@ along with this program. If not, see . #include #include #include +#include /* process handling */ @@ -580,6 +581,24 @@ static int dispatch_line(char* inbuf, size_t len, char** argv) { return 1; } +static char* mystrnchr(const char *in, int ch, size_t end) { + const char *e = in+end; + const char *p = in; + while(p != e && *p != ch) p++; + if(*p == ch) return (char*)p; + return 0; +} +static char* mystrnrchr(const char *in, int ch, size_t end) { + const char *e = in+end-1; + const char *p = in; + while(p != e && *e != ch) e--; + if(*e == ch) return (char*)e; + return 0; +} + +#define BULK_KB 16 +#define BULK_BUFSZ BULK_KB*1024 + int main(int argc, char** argv) { unsigned i; @@ -627,11 +646,43 @@ int main(int argc, char** argv) { prog_state.lineno = 0; - char inbuf[4096]; - while(fgets(inbuf, sizeof(inbuf), stdin)) { - if(!dispatch_line(inbuf, strlen(inbuf), argv)) break; + size_t left = 0; + + char *mem = mmap(NULL, BULK_BUFSZ*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); + char *buf1 = mem; + char *buf2 = mem+BULK_BUFSZ; + char *in, *inbuf; + + while(1) { + inbuf = buf1+BULK_BUFSZ-left; + memcpy(inbuf, buf2+BULK_BUFSZ-left, left); + ssize_t n = read(0, buf2, BULK_BUFSZ); + if(n == -1) { + perror("read"); + goto out; + } + left += n; + in = inbuf; + while(left) { + char *p; + if(!prog_state.stdin_pipe) + p = mystrnchr (in, '\n', left); + else + p = mystrnrchr(in, '\n', left); + if(!p) break; + ptrdiff_t diff = (p - in) + 1; + if(!dispatch_line(in, diff, argv)) goto out; + left -= diff; + in += diff; + } + if(!n) { + if(left) dispatch_line(in, left, argv); + break; + } } + out: + if(prog_state.stdin_pipe) { close_pipes(); } -- 2.11.4.GIT