From a4942ceaa6f50e4d8ebb5bf7eb0b5bb685e0679a Mon Sep 17 00:00:00 2001 From: rofl0r Date: Wed, 8 Jun 2016 17:17:48 +0100 Subject: [PATCH] use blocking wait() instead of sleeping when jobflow was created, it was mainly used for network jobs whose runtimes were dominated by network latency, so it went unnoticed that the polling loop with an arbitrary sleeptime is a very suboptimal design especially when running many short-running jobs with a small process count. an example is bulk conversion of the posix manpages package[0] with gzip. converting the contained ~1100 manpages with a single-process shell script took like 0.7s, but jobflow with -threads=1 wasted a horrible 25 seconds for the same task. the old code looped through all jobs and tested each one in a non-blocking way wether it terminated yet, then went to sleep. now we just wait until any subprocess terminates. this has the desired effect that we never sleep a nanosecond longer than necessary, and indeed the gzip conversion with a single thread now takes exactly the same time than the shell script, and with two threads (and 2 physical CPUs) takes exactly the half of the time. [0] http://www.kernel.org/pub/linux/docs/man-pages/man-pages-posix/ --- jobflow.c | 74 ++++++++++++++++++--------------------------------------------- 1 file changed, 21 insertions(+), 53 deletions(-) diff --git a/jobflow.c b/jobflow.c index fdc4140..6fdde05 100644 --- a/jobflow.c +++ b/jobflow.c @@ -38,13 +38,6 @@ along with this program. If not, see . #include #include -/* defines the amount of milliseconds to sleep between each call to the reaper, - * once all free slots are exhausted */ -#define SLEEP_MS 21 - -/* defines after how many milliseconds a reap of the running processes is obligatory. */ -#define REAP_INTERVAL_MS 100 - /* process handling */ #include @@ -83,7 +76,7 @@ typedef struct { #define MAX_SLOTS 128 typedef struct { - unsigned numthreads; + int numthreads; unsigned threads_running; char* statefile; unsigned skip; @@ -210,37 +203,22 @@ static void reapChilds(void) { job_info* job; int ret, retval; - prog_state.free_slots_count = 0; - + ret = waitpid(-1, &retval, 0); + if(ret == -1) return; + if(!WIFEXITED(retval)) return; for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) { job = sblist_get(prog_state.job_infos, i); - if(job->pid != -1) { - ret = waitpid(job->pid, &retval, WNOHANG); - if(ret != 0) { - // error or changed state. - if(ret == -1) { - perror("waitpid"); - continue; - } - if(!retval) { - //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL); - } else { - //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL); - } - job->pid = -1; - posix_spawn_file_actions_destroy(&job->fa); - //job->passed = 0; - releaseJobSlot(i); - prog_state.threads_running--; - - if(prog_state.buffered) { - dump_output(i, 0); - if(!prog_state.join_output) - dump_output(i, 1); - } - } - } else + if(job->pid == ret) { + job->pid = -1; + posix_spawn_file_actions_destroy(&job->fa); releaseJobSlot(i); + prog_state.threads_running--; + if(prog_state.buffered) { + dump_output(i, 0); + if(!prog_state.join_output) + dump_output(i, 1); + } + } } } @@ -325,6 +303,7 @@ static int parse_args(int argc, char** argv) { return syntax(); op_temp = op_get(op, SPL("threads")); prog_state.numthreads = op_temp ? atoi(op_temp) : 1; + if(prog_state.numthreads <= 0) die("threadcount must be >= 1\n"); op_temp = op_get(op, SPL("statefile")); prog_state.statefile = op_temp; @@ -426,14 +405,12 @@ static int parse_args(int argc, char** argv) { } static void init_queue(void) { - unsigned i; - job_info ji; - - ji.pid = -1; - memset(&ji.fa, 0, sizeof(ji.fa)); + int i; + job_info ji = {.pid = -1}; for(i = 0; i < prog_state.numthreads; i++) { sblist_add(prog_state.job_infos, &ji); + releaseJobSlot(i); } } @@ -480,8 +457,6 @@ int main(int argc, char** argv) { char subst_buf[16][4096]; unsigned max_subst; - struct timeval reapTime; - uint64_t n = 0; unsigned i; unsigned spinup_counter = 0; @@ -492,9 +467,9 @@ int main(int argc, char** argv) { srand(time(NULL)); if(argc > 4096) argc = 4096; + prog_state.threads_running = 0; prog_state.free_slots_count = 0; - gettimestamp(&reapTime); if(parse_args(argc, argv)) return 1; @@ -565,11 +540,7 @@ int main(int argc, char** argv) { } } - while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) { - reapChilds(); - gettimestamp(&reapTime); - if(!prog_state.free_slots_count) msleep(SLEEP_MS); - } + while(!prog_state.free_slots_count) reapChilds(); if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) { msleep(rand() % (prog_state.delayedspinup_interval + 1)); @@ -592,10 +563,7 @@ int main(int argc, char** argv) { if(prog_state.delayedflush) write_statefile(n - 1, temp_state); - while(prog_state.threads_running) { - reapChilds(); - if(prog_state.threads_running) msleep(SLEEP_MS); - } + while(prog_state.threads_running) reapChilds(); if(prog_state.subst_entries) sblist_free(prog_state.subst_entries); if(prog_state.job_infos) sblist_free(prog_state.job_infos); -- 2.11.4.GIT