factor main loop out of main()
[rofl0r-jobflow.git] / jobflow.c
blobb5f9e9af4c61ab45d2f2993328a70ee7010cf2f5
1 /*
2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* stand-in for C libraries without prlimit().
   it ignores all arguments, reports the missing support on stderr and
   always fails with errno = EINVAL. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	/* errno is set only after dprintf(), which may itself change it */
	errno = EINVAL;
	return -1;
}
#endif
68 #include <sys/time.h>
/* bookkeeping for one job slot */
typedef struct {
	pid_t pid;                      /* pid of the running process, -1 while the slot is free */
	int pipe;                       /* write end of the child's stdin pipe (-pipe mode) */
	posix_spawn_file_actions_t fa;  /* file actions used for the spawn; destroyed when the child is reaped */
} job_info;
/* one entry of -limits: the resource and the rlimit applied to every job */
typedef struct {
	int limit;          /* RLIMIT_* resource identifier */
	struct rlimit rl;   /* soft limit from the command line, hard limit as queried via getrlimit() */
} limit_rec;
81 typedef struct {
82 char temp_state[256];
83 char* cmd_argv[4096];
84 unsigned long long lineno;
85 unsigned numthreads;
86 unsigned threads_running;
87 char* statefile;
88 unsigned long long skip;
89 sblist* job_infos;
90 sblist* subst_entries;
91 sblist* limits;
92 unsigned cmd_startarg;
93 char* tempdir;
94 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
95 the top value in ms can be supplied via a command line switch.
96 this option makes only sense if the interval is somewhat smaller than the
97 expected runtime of the average job.
98 this option is useful to not overload a network app due to hundreds of
99 parallel connection tries on startup.
101 int buffered:1; /* write stdout and stderr of each task into a file,
102 and print it to stdout once the process ends.
103 this prevents mixing up of the output of multiple tasks. */
104 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
105 this means faster program execution, but could also be imprecise if the number of
106 jobs is small or smaller than the available threadcount. */
107 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
108 int stdin_pipe:1;
109 } prog_state_s;
111 prog_state_s prog_state;
114 extern char** environ;
116 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
117 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
118 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
119 return ret > 0 && (size_t) ret < bufsize;
122 void launch_job(size_t jobindex, char** argv) {
123 char stdout_filename_buf[256];
124 char stderr_filename_buf[256];
125 job_info* job = sblist_get(prog_state.job_infos, jobindex);
127 if(job->pid != -1) return;
129 if(prog_state.buffered) {
130 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
131 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
132 dprintf(2, "temp filename too long!\n");
133 return;
137 errno = posix_spawn_file_actions_init(&job->fa);
138 if(errno) goto spawn_error;
140 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
141 if(errno) goto spawn_error;
143 int pipes[2];
144 if(prog_state.stdin_pipe) {
145 if(pipe(pipes)) {
146 perror("pipe");
147 goto spawn_error;
149 job->pipe = pipes[1];
150 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
151 if(errno) goto spawn_error;
152 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
153 if(errno) goto spawn_error;
154 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
155 if(errno) goto spawn_error;
158 if(prog_state.buffered) {
159 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
160 if(errno) goto spawn_error;
161 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
162 if(errno) goto spawn_error;
165 if(!prog_state.stdin_pipe) {
166 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
167 if(errno) goto spawn_error;
170 if(prog_state.buffered) {
171 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
172 if(errno) goto spawn_error;
173 if(prog_state.join_output)
174 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
175 else
176 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
177 if(errno) goto spawn_error;
180 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
181 if(errno) {
182 spawn_error:
183 job->pid = -1;
184 perror("posix_spawn");
185 } else {
186 prog_state.threads_running++;
187 if(prog_state.limits) {
188 limit_rec* limit;
189 sblist_iter(prog_state.limits, limit) {
190 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
191 perror("prlimit");
195 if(prog_state.stdin_pipe) close(pipes[0]);
/* copy the stdout (is_stderr == 0) or stderr log file of job slot job_id to
   our own stdout/stderr, then delete it.
   the next job in this slot recreates the file (O_CREAT | O_TRUNC in
   launch_job), and removing it here is what lets main()'s rmdir() of the
   temp dir succeed at exit. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(!src) {
		perror("fopen");
		return;
	}
	while((nread = fread(buf, 1, sizeof(buf), src))) {
		fwrite(buf, 1, nread, out_stream);
		if(nread < sizeof(buf)) break; /* short read: EOF or error */
	}
	fclose(src);
	fflush(out_stream);
	unlink(out_filename_buf);
}
217 static void pass_stdin(stringptr *line) {
218 static size_t next_child = 0;
219 if(next_child >= sblist_getsize(prog_state.job_infos))
220 next_child = 0;
221 job_info *job = sblist_get(prog_state.job_infos, next_child);
222 write(job->pipe, line->ptr, line->size);
223 next_child++;
226 static void close_pipes(void) {
227 size_t i;
228 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
229 job_info *job = sblist_get(prog_state.job_infos, i);
230 close(job->pipe);
234 /* wait till a child exits, reap it, and return its job index for slot reuse */
235 static size_t reap_child(void) {
236 size_t i;
237 job_info* job;
238 int ret, retval;
240 do ret = waitpid(-1, &retval, 0);
241 while(ret == -1 || !WIFEXITED(retval));
243 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
244 job = sblist_get(prog_state.job_infos, i);
245 if(job->pid == ret) {
246 job->pid = -1;
247 posix_spawn_file_actions_destroy(&job->fa);
248 prog_state.threads_running--;
249 if(prog_state.buffered) {
250 dump_output(i, 0);
251 if(!prog_state.join_output)
252 dump_output(i, 1);
254 return i;
257 assert(0);
258 return -1;
261 static size_t free_slots(void) {
262 return prog_state.numthreads - prog_state.threads_running;
/* print msg verbatim to stderr and terminate with exit status 1.
   msg is printed through "%s": passing it as the format string would
   misinterpret any '%' it contains (undefined behavior). */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
271 static unsigned long parse_human_number(stringptr* num) {
272 unsigned long ret = 0;
273 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
274 const char* kmg = "KMG";
275 char* kmgind;
276 if(num && num->size) {
277 ret = atol(num->ptr);
278 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
279 ret *= mul[kmgind - kmg];
281 return ret;
284 static int syntax(void) {
285 dprintf(2,
286 "jobflow " VERSION " (C) rofl0r\n"
287 "------------------\n"
288 "this program is intended to be used as a recipient of another programs output\n"
289 "it launches processes to which the current line can be passed as an argument\n"
290 "using {} for substitution (as in find -exec).\n"
291 "\n"
292 "available options:\n\n"
293 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
294 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
295 "-pipe -exec ./mycommand {}\n"
296 "\n"
297 "-pipe\n"
298 " child processes receive input on stdin. if this option\n"
299 " is used, input will be evenly distributed to jobs.\n"
300 " all jobs will stay alive until EOF is received.\n"
301 "-skip=XXX\n"
302 " XXX=number of entries to skip\n"
303 "-threads=XXX\n"
304 " XXX=number of parallel processes to spawn\n"
305 "-resume\n"
306 " resume from last jobnumber stored in statefile\n"
307 "-statefile=XXX\n"
308 " XXX=filename\n"
309 " saves last launched jobnumber into a file\n"
310 "-delayedflush\n"
311 " only write to statefile whenever all processes are busy,\n"
312 " and at program end\n"
313 "-delayedspinup=XXX\n"
314 " XXX=maximum amount of milliseconds\n"
315 " ...to wait when spinning up a fresh set of processes\n"
316 " a random value between 0 and the chosen amount is used to delay initial\n"
317 " spinup.\n"
318 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
319 " activity on program startup\n"
320 "-buffered\n"
321 " store the stdout and stderr of launched processes into a temporary file\n"
322 " which will be printed after a process has finished.\n"
323 " this prevents mixing up of output of different processes.\n"
324 "-joinoutput\n"
325 " if -buffered, write both stdout and stderr into the same file.\n"
326 " this saves the chronological order of the output, and the combined output\n"
327 " will only be printed to stdout.\n"
328 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
329 " sets the rlimit of the new created processes.\n"
330 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
331 "-exec command with args\n"
332 " everything past -exec is treated as the command to execute on each line of\n"
333 " stdin received. the line can be passed as an argument using {}.\n"
334 " {.} passes everything before the last dot in a line as an argument.\n"
335 " it is possible to use multiple substitutions inside a single argument,\n"
336 " but currently only of one type.\n"
337 "\n"
339 return 1;
342 #undef strtoll
343 #define strtoll(a,b,c) strtoint64(a, strlen(a))
344 static int parse_args(int argc, char** argv) {
345 op_state op_b, *op = &op_b;
346 op_init(op, argc, argv);
347 char *op_temp;
348 if(argc == 1 || op_hasflag(op, SPL("-help")))
349 return syntax();
351 op_temp = op_get(op, SPL("threads"));
352 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
353 if(x <= 0) die("threadcount must be >= 1\n");
354 prog_state.numthreads = x;
356 op_temp = op_get(op, SPL("statefile"));
357 prog_state.statefile = op_temp;
359 op_temp = op_get(op, SPL("skip"));
360 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
361 if(op_hasflag(op, SPL("resume"))) {
362 if(!prog_state.statefile) die("-resume needs -statefile\n");
363 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
364 FILE *f = fopen(prog_state.statefile, "r");
365 if(f) {
366 char nb[64];
367 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
368 fclose(f);
373 prog_state.delayedflush = 0;
374 if(op_hasflag(op, SPL("delayedflush"))) {
375 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
376 prog_state.delayedflush = 1;
379 prog_state.stdin_pipe = 0;
380 if(op_hasflag(op, SPL("pipe"))) prog_state.stdin_pipe = 1;
382 op_temp = op_get(op, SPL("delayedspinup"));
383 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
385 prog_state.cmd_startarg = 0;
386 prog_state.subst_entries = NULL;
388 if(op_hasflag(op, SPL("exec"))) {
389 uint32_t subst_ent;
390 unsigned i, r = 0;
391 for(i = 1; i < (unsigned) argc; i++) {
392 if(str_equal(argv[i], "-exec")) {
393 r = i + 1;
394 break;
397 if(r && r < (unsigned) argc) {
398 prog_state.cmd_startarg = r;
401 if(!prog_state.stdin_pipe)
402 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
404 // save entries which must be substituted, to save some cycles.
405 for(i = r; i < (unsigned) argc; i++) {
406 subst_ent = i - r;
407 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
408 if(prog_state.stdin_pipe)
409 die("argument substitution must not be used when -pipe option is given\n");
411 sblist_add(prog_state.subst_entries, &subst_ent);
416 prog_state.buffered = 0;
417 if(op_hasflag(op, SPL("buffered"))) {
418 prog_state.buffered = 1;
421 prog_state.join_output = 0;
422 if(op_hasflag(op, SPL("joinoutput"))) {
423 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
424 prog_state.join_output = 1;
427 prog_state.limits = NULL;
428 op_temp = op_get(op, SPL("limits"));
429 if(op_temp) {
430 unsigned i;
431 SPDECLAREC(limits, op_temp);
432 stringptrlist* limit_list = stringptr_splitc(limits, ',');
433 stringptrlist* kv;
434 stringptr* key, *value;
435 limit_rec lim;
436 if(stringptrlist_getsize(limit_list)) {
437 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
438 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
439 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
440 if(stringptrlist_getsize(kv) != 2) continue;
441 key = stringptrlist_get(kv, 0);
442 value = stringptrlist_get(kv, 1);
443 if(EQ(key, SPL("mem")))
444 lim.limit = RLIMIT_AS;
445 else if(EQ(key, SPL("cpu")))
446 lim.limit = RLIMIT_CPU;
447 else if(EQ(key, SPL("stack")))
448 lim.limit = RLIMIT_STACK;
449 else if(EQ(key, SPL("fsize")))
450 lim.limit = RLIMIT_FSIZE;
451 else if(EQ(key, SPL("nofiles")))
452 lim.limit = RLIMIT_NOFILE;
453 else
454 die("unknown option passed to -limits");
456 if(getrlimit(lim.limit, &lim.rl) == -1) {
457 perror("getrlimit");
458 die("could not query rlimits");
460 lim.rl.rlim_cur = parse_human_number(value);
461 sblist_add(prog_state.limits, &lim);
462 stringptrlist_free(kv);
464 stringptrlist_free(limit_list);
467 return 0;
470 static void init_queue(void) {
471 unsigned i;
472 job_info ji = {.pid = -1};
474 for(i = 0; i < prog_state.numthreads; i++)
475 sblist_add(prog_state.job_infos, &ji);
478 static void write_statefile(unsigned long long n, const char* tempfile) {
479 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
480 if(fd != -1) {
481 dprintf(fd, "%llu\n", n + 1ULL);
482 close(fd);
483 if(rename(tempfile, prog_state.statefile) == -1)
484 perror("rename");
485 } else
486 perror("open");
489 // returns numbers of substitutions done, -1 on out of buffer.
490 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
491 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
492 size_t i;
493 int ret = 0;
494 for(i = 0; dest_size > 0 && i < source->size; ) {
495 if(stringptr_here(source, i, what)) {
496 if(dest_size < (ssize_t) whit->size) return -1;
497 memcpy(dest, whit->ptr, whit->size);
498 dest += whit->size;
499 dest_size -= whit->size;
500 ret++;
501 i += what->size;
502 } else {
503 *dest = source->ptr[i];
504 dest++;
505 dest_size--;
506 i++;
509 if(!dest_size) return -1;
510 *dest = 0;
511 return ret;
514 static int dispatch_line(char* inbuf, size_t len, char** argv) {
515 char subst_buf[16][4096];
517 stringptr line_b, *line = &line_b;
519 prog_state.lineno++;
520 static unsigned spinup_counter = 0;
523 if(prog_state.skip) {
524 prog_state.skip--;
525 return 1;
527 if(!prog_state.cmd_startarg) {
528 dprintf(1, "%s", inbuf);
529 return 1;
532 line->ptr = inbuf; line->size = len;
534 if(!prog_state.stdin_pipe)
535 stringptr_chomp(line);
537 if(prog_state.subst_entries) {
538 unsigned max_subst = 0;
539 uint32_t* index;
540 sblist_iter(prog_state.subst_entries, index) {
541 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
542 int ret;
543 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
544 if(ret == -1) {
545 too_long:
546 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
547 return 0;
548 } else if(!ret) {
549 char* lastdot = stringptr_rchr(line, '.');
550 stringptr tilLastDot = *line;
551 if(lastdot) tilLastDot.size = lastdot - line->ptr;
552 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
553 if(ret == -1) goto too_long;
555 if(ret) {
556 prog_state.cmd_argv[*index] = subst_buf[max_subst];
557 max_subst++;
563 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
564 msleep(rand() % (prog_state.delayedspinup_interval + 1));
565 spinup_counter++;
568 if(free_slots())
569 launch_job(prog_state.threads_running, prog_state.cmd_argv);
570 else if(!prog_state.stdin_pipe)
571 launch_job(reap_child(), prog_state.cmd_argv);
573 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
574 write_statefile(prog_state.lineno, prog_state.temp_state);
577 if(prog_state.stdin_pipe)
578 pass_stdin(line);
580 return 1;
583 int main(int argc, char** argv) {
584 unsigned i;
586 char tempdir_buf[256];
588 srand(time(NULL));
590 if(argc > 4096) argc = 4096;
592 prog_state.threads_running = 0;
594 if(parse_args(argc, argv)) return 1;
596 if(prog_state.statefile)
597 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
599 prog_state.tempdir = NULL;
601 if(prog_state.buffered) {
602 prog_state.tempdir = tempdir_buf;
603 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
604 perror("mkdtemp");
605 die("could not create tempdir\n");
607 } else {
608 /* if the stdout/stderr fds are not in O_APPEND mode,
609 the dup()'s of the fds in posix_spawn can cause different
610 file positions, causing the different processes to overwrite each others output.
611 testcase:
612 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
614 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
615 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
618 if(prog_state.cmd_startarg) {
619 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
620 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
622 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
625 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
626 init_queue();
628 prog_state.lineno = 0;
630 char inbuf[4096];
631 while(fgets(inbuf, sizeof(inbuf), stdin)) {
632 if(!dispatch_line(inbuf, strlen(inbuf), argv)) break;
635 if(prog_state.stdin_pipe) {
636 close_pipes();
639 if(prog_state.delayedflush)
640 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
642 while(prog_state.threads_running) reap_child();
644 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
645 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
646 if(prog_state.limits) sblist_free(prog_state.limits);
648 if(prog_state.tempdir)
649 rmdir(prog_state.tempdir);
652 fflush(stdout);
653 fflush(stderr);
656 return 0;