From 91e039b2152a84318d57d32f5ecd254a66c676a0 Mon Sep 17 00:00:00 2001 From: rofl0r Date: Mon, 13 Feb 2017 20:56:57 +0000 Subject: [PATCH] add new -pipe command line option sometimes the called program wants input on stdin, and while it is easily possible to write a short shell script wrapper that takes an argument and passes it to the stdin of that program, it's more convenient to pass input directly. and in some cases it is critical that no cpu time gets lost by launching the extra wrapper program. --- jobflow.c | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 73 insertions(+), 15 deletions(-) diff --git a/jobflow.c b/jobflow.c index b1b7312..899121e 100644 --- a/jobflow.c +++ b/jobflow.c @@ -69,6 +69,7 @@ static int prlimit(int pid, ...) { typedef struct { pid_t pid; + int pipe; posix_spawn_file_actions_t fa; } job_info; @@ -101,6 +102,7 @@ typedef struct { this means faster program execution, but could also be imprecise if the number of jobs is small or smaller than the available threadcount. */ int join_output:1; /* join stdout and stderr of launched jobs into stdout */ + int stdin_pipe:1; } prog_state_s; prog_state_s prog_state; @@ -131,9 +133,25 @@ void launch_job(size_t jobindex, char** argv) { errno = posix_spawn_file_actions_init(&job->fa); if(errno) goto spawn_error; + errno = posix_spawn_file_actions_addclose(&job->fa, 0); if(errno) goto spawn_error; + int pipes[2]; + if(prog_state.stdin_pipe) { + if(pipe(pipes)) { + perror("pipe"); + goto spawn_error; + } + job->pipe = pipes[1]; + errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0); + if(errno) goto spawn_error; + errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]); + if(errno) goto spawn_error; + errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]); + if(errno) goto spawn_error; + } + if(prog_state.buffered) { errno = posix_spawn_file_actions_addclose(&job->fa, 1); if(errno) goto spawn_error; @@ -141,8 +159,10 @@ void launch_job(size_t jobindex, char** argv) { if(errno) goto spawn_error; } - errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0); - if(errno) goto spawn_error; + if(!prog_state.stdin_pipe) { + errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0); + if(errno) goto spawn_error; + } if(prog_state.buffered) { errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -169,6 +189,7 @@ void launch_job(size_t jobindex, char** argv) { } } } + if(prog_state.stdin_pipe) close(pipes[0]); } static void dump_output(size_t job_id, int is_stderr) { @@ -190,6 +211,23 @@ static void dump_output(size_t job_id, int is_stderr) { } } +static void pass_stdin(stringptr *line) { + static size_t next_child = 0; + if(next_child >= sblist_getsize(prog_state.job_infos)) + next_child = 0; + job_info *job = sblist_get(prog_state.job_infos, next_child); + write(job->pipe, line->ptr, line->size); + next_child++; +} + +static void close_pipes(void) { + size_t i; + for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) { + job_info *job = sblist_get(prog_state.job_infos, i); + close(job->pipe); + } +} + /* wait till a child exits, reap it, and return its job index for slot reuse */ static size_t reap_child(void) { size_t i; @@ -221,13 +259,6 @@ static size_t free_slots(void) { return prog_state.numthreads - prog_state.threads_running; } -static void add_job(char **argv) { - if(free_slots()) - launch_job(prog_state.threads_running, argv); - else - launch_job(reap_child(), argv); -} - __attribute__((noreturn)) static void die(const char* msg) { dprintf(2, msg); @@ -258,8 +289,12 @@ static int syntax(void) { "available options:\n\n" "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n" "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n" - "-exec ./mycommand {}\n" + "-pipe -exec ./mycommand {}\n" "\n" + "-pipe\n" + " child processes receive input on stdin. if this option\n" + " is used, input will be evenly distributed to jobs.\n" + " all jobs will stay alive until EOF is received.\n" "-skip=XXX\n" " XXX=number of entries to skip\n" "-threads=XXX\n" @@ -338,6 +373,9 @@ static int parse_args(int argc, char** argv) { prog_state.delayedflush = 1; } + prog_state.stdin_pipe = 0; + if(op_hasflag(op, SPL("pipe"))) prog_state.stdin_pipe = 1; + op_temp = op_get(op, SPL("delayedspinup")); prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0; @@ -357,11 +395,18 @@ static int parse_args(int argc, char** argv) { prog_state.cmd_startarg = r; } + if(!prog_state.stdin_pipe) + prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16); + // save entries which must be substituted, to save some cycles. - prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16); for(i = r; i < (unsigned) argc; i++) { subst_ent = i - r; - if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent); + if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) { + if(prog_state.stdin_pipe) + die("argument substitution must not be used when -pipe option is given\n"); + + sblist_add(prog_state.subst_entries, &subst_ent); + } } } @@ -526,11 +571,14 @@ int main(int argc, char** argv) { dprintf(1, fgets_result); continue; } + stringptr_fromchar(fgets_result, line); - stringptr_chomp(line); - max_subst = 0; + if(!prog_state.stdin_pipe) + stringptr_chomp(line); + if(prog_state.subst_entries) { + max_subst = 0; uint32_t* index; sblist_iter(prog_state.subst_entries, index) { SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]); @@ -560,15 +608,25 @@ int main(int argc, char** argv) { spinup_counter++; } - add_job(cmd_argv); + if(free_slots()) + launch_job(prog_state.threads_running, cmd_argv); + else if(!prog_state.stdin_pipe) + launch_job(reap_child(), cmd_argv); if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) { write_statefile(lineno, temp_state); } + + if(prog_state.stdin_pipe) + pass_stdin(line); } out: + if(prog_state.stdin_pipe) { + close_pipes(); + } + if(prog_state.delayedflush) write_statefile(lineno - 1, temp_state); -- 2.11.4.GIT