add new -pipe command line option
[rofl0r-jobflow.git] / jobflow.c
blob899121eef011ab5e748a93b6672ba8c327a78de5
/*
Copyright (C) 2012,2014,2016 rofl0r

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* prlimit() support in glibc:
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Stand-in used when the C library is too old to provide prlimit().
 * It ignores all arguments, reports the problem on fd 2 and always
 * fails with errno set to EINVAL. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
68 #include <sys/time.h>
/* bookkeeping record for one job slot */
typedef struct {
	pid_t pid;                     /* pid of the spawned child; -1 marks a free slot */
	int pipe;                      /* write end of the pipe feeding the child's stdin (-pipe mode) */
	posix_spawn_file_actions_t fa; /* file actions handed to posix_spawnp() for this job */
} job_info;
/* one resource limit requested via -limits */
typedef struct {
	int limit;        /* RLIMIT_* resource identifier */
	struct rlimit rl; /* values passed to prlimit() for every spawned job */
} limit_rec;
81 typedef struct {
82 unsigned numthreads;
83 unsigned threads_running;
84 char* statefile;
85 unsigned long long skip;
86 sblist* job_infos;
87 sblist* subst_entries;
88 sblist* limits;
89 unsigned cmd_startarg;
90 char* tempdir;
91 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
92 the top value in ms can be supplied via a command line switch.
93 this option makes only sense if the interval is somewhat smaller than the
94 expected runtime of the average job.
95 this option is useful to not overload a network app due to hundreds of
96 parallel connection tries on startup.
98 int buffered:1; /* write stdout and stderr of each task into a file,
99 and print it to stdout once the process ends.
100 this prevents mixing up of the output of multiple tasks. */
101 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
102 this means faster program execution, but could also be imprecise if the number of
103 jobs is small or smaller than the available threadcount. */
104 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
105 int stdin_pipe:1;
106 } prog_state_s;
108 prog_state_s prog_state;
111 extern char** environ;
113 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
114 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
115 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
116 return ret > 0 && (size_t) ret < bufsize;
119 void launch_job(size_t jobindex, char** argv) {
120 char stdout_filename_buf[256];
121 char stderr_filename_buf[256];
122 job_info* job = sblist_get(prog_state.job_infos, jobindex);
124 if(job->pid != -1) return;
126 if(prog_state.buffered) {
127 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
128 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
129 dprintf(2, "temp filename too long!\n");
130 return;
134 errno = posix_spawn_file_actions_init(&job->fa);
135 if(errno) goto spawn_error;
137 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
138 if(errno) goto spawn_error;
140 int pipes[2];
141 if(prog_state.stdin_pipe) {
142 if(pipe(pipes)) {
143 perror("pipe");
144 goto spawn_error;
146 job->pipe = pipes[1];
147 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
148 if(errno) goto spawn_error;
149 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
150 if(errno) goto spawn_error;
151 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
152 if(errno) goto spawn_error;
155 if(prog_state.buffered) {
156 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
157 if(errno) goto spawn_error;
158 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
159 if(errno) goto spawn_error;
162 if(!prog_state.stdin_pipe) {
163 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
164 if(errno) goto spawn_error;
167 if(prog_state.buffered) {
168 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
169 if(errno) goto spawn_error;
170 if(prog_state.join_output)
171 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
172 else
173 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
177 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
178 if(errno) {
179 spawn_error:
180 job->pid = -1;
181 perror("posix_spawn");
182 } else {
183 prog_state.threads_running++;
184 if(prog_state.limits) {
185 limit_rec* limit;
186 sblist_iter(prog_state.limits, limit) {
187 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
188 perror("prlimit");
192 if(prog_state.stdin_pipe) close(pipes[0]);
/* Copies the buffered output of job slot job_id to our own stdout
 * (is_stderr == 0) or stderr (is_stderr != 0) and removes the log file
 * afterwards, so that the temp directory is empty again and can be
 * removed at program end. A missing log file is silently ignored. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(src) {
		while((nread = fread(buf, 1, sizeof(buf), src))) {
			fwrite(buf, 1, nread, out_stream);
			/* a short read means EOF (or error): nothing more to copy */
			if(nread < sizeof(buf)) break;
		}
		fclose(src);
		fflush(out_stream);
		/* the slot's next job recreates the file with O_CREAT | O_TRUNC */
		unlink(out_filename_buf);
	}
}
214 static void pass_stdin(stringptr *line) {
215 static size_t next_child = 0;
216 if(next_child >= sblist_getsize(prog_state.job_infos))
217 next_child = 0;
218 job_info *job = sblist_get(prog_state.job_infos, next_child);
219 write(job->pipe, line->ptr, line->size);
220 next_child++;
223 static void close_pipes(void) {
224 size_t i;
225 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
226 job_info *job = sblist_get(prog_state.job_infos, i);
227 close(job->pipe);
231 /* wait till a child exits, reap it, and return its job index for slot reuse */
232 static size_t reap_child(void) {
233 size_t i;
234 job_info* job;
235 int ret, retval;
237 do ret = waitpid(-1, &retval, 0);
238 while(ret == -1 || !WIFEXITED(retval));
240 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
241 job = sblist_get(prog_state.job_infos, i);
242 if(job->pid == ret) {
243 job->pid = -1;
244 posix_spawn_file_actions_destroy(&job->fa);
245 prog_state.threads_running--;
246 if(prog_state.buffered) {
247 dump_output(i, 0);
248 if(!prog_state.join_output)
249 dump_output(i, 1);
251 return i;
254 assert(0);
255 return -1;
258 static size_t free_slots(void) {
259 return prog_state.numthreads - prog_state.threads_running;
/* prints msg verbatim to fd 2 and terminates the program with exit status 1.
 * msg is passed as an argument, never as the format string, so that a '%'
 * in it cannot be interpreted as a conversion specification. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
268 static unsigned long parse_human_number(stringptr* num) {
269 unsigned long ret = 0;
270 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
271 const char* kmg = "KMG";
272 char* kmgind;
273 if(num && num->size) {
274 ret = atol(num->ptr);
275 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
276 ret *= mul[kmgind - kmg];
278 return ret;
281 static int syntax(void) {
282 dprintf(2,
283 "jobflow " VERSION " (C) rofl0r\n"
284 "------------------\n"
285 "this program is intended to be used as a recipient of another programs output\n"
286 "it launches processes to which the current line can be passed as an argument\n"
287 "using {} for substitution (as in find -exec).\n"
288 "\n"
289 "available options:\n\n"
290 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
291 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
292 "-pipe -exec ./mycommand {}\n"
293 "\n"
294 "-pipe\n"
295 " child processes receive input on stdin. if this option\n"
296 " is used, input will be evenly distributed to jobs.\n"
297 " all jobs will stay alive until EOF is received.\n"
298 "-skip=XXX\n"
299 " XXX=number of entries to skip\n"
300 "-threads=XXX\n"
301 " XXX=number of parallel processes to spawn\n"
302 "-resume\n"
303 " resume from last jobnumber stored in statefile\n"
304 "-statefile=XXX\n"
305 " XXX=filename\n"
306 " saves last launched jobnumber into a file\n"
307 "-delayedflush\n"
308 " only write to statefile whenever all processes are busy,\n"
309 " and at program end\n"
310 "-delayedspinup=XXX\n"
311 " XXX=maximum amount of milliseconds\n"
312 " ...to wait when spinning up a fresh set of processes\n"
313 " a random value between 0 and the chosen amount is used to delay initial\n"
314 " spinup.\n"
315 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
316 " activity on program startup\n"
317 "-buffered\n"
318 " store the stdout and stderr of launched processes into a temporary file\n"
319 " which will be printed after a process has finished.\n"
320 " this prevents mixing up of output of different processes.\n"
321 "-joinoutput\n"
322 " if -buffered, write both stdout and stderr into the same file.\n"
323 " this saves the chronological order of the output, and the combined output\n"
324 " will only be printed to stdout.\n"
325 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
326 " sets the rlimit of the new created processes.\n"
327 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
328 "-exec command with args\n"
329 " everything past -exec is treated as the command to execute on each line of\n"
330 " stdin received. the line can be passed as an argument using {}.\n"
331 " {.} passes everything before the last dot in a line as an argument.\n"
332 " it is possible to use multiple substitutions inside a single argument,\n"
333 " but currently only of one type.\n"
334 "\n"
336 return 1;
339 #undef strtoll
340 #define strtoll(a,b,c) strtoint64(a, strlen(a))
341 static int parse_args(int argc, char** argv) {
342 op_state op_b, *op = &op_b;
343 op_init(op, argc, argv);
344 char *op_temp;
345 if(argc == 1 || op_hasflag(op, SPL("-help")))
346 return syntax();
348 op_temp = op_get(op, SPL("threads"));
349 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
350 if(x <= 0) die("threadcount must be >= 1\n");
351 prog_state.numthreads = x;
353 op_temp = op_get(op, SPL("statefile"));
354 prog_state.statefile = op_temp;
356 op_temp = op_get(op, SPL("skip"));
357 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
358 if(op_hasflag(op, SPL("resume"))) {
359 if(!prog_state.statefile) die("-resume needs -statefile\n");
360 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
361 FILE *f = fopen(prog_state.statefile, "r");
362 if(f) {
363 char nb[64];
364 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
365 fclose(f);
370 prog_state.delayedflush = 0;
371 if(op_hasflag(op, SPL("delayedflush"))) {
372 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
373 prog_state.delayedflush = 1;
376 prog_state.stdin_pipe = 0;
377 if(op_hasflag(op, SPL("pipe"))) prog_state.stdin_pipe = 1;
379 op_temp = op_get(op, SPL("delayedspinup"));
380 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
382 prog_state.cmd_startarg = 0;
383 prog_state.subst_entries = NULL;
385 if(op_hasflag(op, SPL("exec"))) {
386 uint32_t subst_ent;
387 unsigned i, r = 0;
388 for(i = 1; i < (unsigned) argc; i++) {
389 if(str_equal(argv[i], "-exec")) {
390 r = i + 1;
391 break;
394 if(r && r < (unsigned) argc) {
395 prog_state.cmd_startarg = r;
398 if(!prog_state.stdin_pipe)
399 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
401 // save entries which must be substituted, to save some cycles.
402 for(i = r; i < (unsigned) argc; i++) {
403 subst_ent = i - r;
404 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
405 if(prog_state.stdin_pipe)
406 die("argument substitution must not be used when -pipe option is given\n");
408 sblist_add(prog_state.subst_entries, &subst_ent);
413 prog_state.buffered = 0;
414 if(op_hasflag(op, SPL("buffered"))) {
415 prog_state.buffered = 1;
418 prog_state.join_output = 0;
419 if(op_hasflag(op, SPL("joinoutput"))) {
420 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
421 prog_state.join_output = 1;
424 prog_state.limits = NULL;
425 op_temp = op_get(op, SPL("limits"));
426 if(op_temp) {
427 unsigned i;
428 SPDECLAREC(limits, op_temp);
429 stringptrlist* limit_list = stringptr_splitc(limits, ',');
430 stringptrlist* kv;
431 stringptr* key, *value;
432 limit_rec lim;
433 if(stringptrlist_getsize(limit_list)) {
434 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
435 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
436 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
437 if(stringptrlist_getsize(kv) != 2) continue;
438 key = stringptrlist_get(kv, 0);
439 value = stringptrlist_get(kv, 1);
440 if(EQ(key, SPL("mem")))
441 lim.limit = RLIMIT_AS;
442 else if(EQ(key, SPL("cpu")))
443 lim.limit = RLIMIT_CPU;
444 else if(EQ(key, SPL("stack")))
445 lim.limit = RLIMIT_STACK;
446 else if(EQ(key, SPL("fsize")))
447 lim.limit = RLIMIT_FSIZE;
448 else if(EQ(key, SPL("nofiles")))
449 lim.limit = RLIMIT_NOFILE;
450 else
451 die("unknown option passed to -limits");
453 if(getrlimit(lim.limit, &lim.rl) == -1) {
454 perror("getrlimit");
455 die("could not query rlimits");
457 lim.rl.rlim_cur = parse_human_number(value);
458 sblist_add(prog_state.limits, &lim);
459 stringptrlist_free(kv);
461 stringptrlist_free(limit_list);
464 return 0;
467 static void init_queue(void) {
468 unsigned i;
469 job_info ji = {.pid = -1};
471 for(i = 0; i < prog_state.numthreads; i++)
472 sblist_add(prog_state.job_infos, &ji);
475 static void write_statefile(unsigned long long n, const char* tempfile) {
476 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
477 if(fd != -1) {
478 dprintf(fd, "%llu\n", n + 1ULL);
479 close(fd);
480 if(rename(tempfile, prog_state.statefile) == -1)
481 perror("rename");
482 } else
483 perror("open");
486 // returns numbers of substitutions done, -1 on out of buffer.
487 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
488 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
489 size_t i;
490 int ret = 0;
491 for(i = 0; dest_size > 0 && i < source->size; ) {
492 if(stringptr_here(source, i, what)) {
493 if(dest_size < (ssize_t) whit->size) return -1;
494 memcpy(dest, whit->ptr, whit->size);
495 dest += whit->size;
496 dest_size -= whit->size;
497 ret++;
498 i += what->size;
499 } else {
500 *dest = source->ptr[i];
501 dest++;
502 dest_size--;
503 i++;
506 if(!dest_size) return -1;
507 *dest = 0;
508 return ret;
511 int main(int argc, char** argv) {
512 char inbuf[4096]; char* fgets_result;
513 stringptr line_b, *line = &line_b;
514 char* cmd_argv[4096];
515 char subst_buf[16][4096];
516 unsigned max_subst;
518 unsigned long long lineno = 0;
519 unsigned i;
520 unsigned spinup_counter = 0;
522 char tempdir_buf[256];
523 char temp_state[256];
525 srand(time(NULL));
527 if(argc > 4096) argc = 4096;
529 prog_state.threads_running = 0;
531 if(parse_args(argc, argv)) return 1;
533 if(prog_state.statefile)
534 snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
536 prog_state.tempdir = NULL;
538 if(prog_state.buffered) {
539 prog_state.tempdir = tempdir_buf;
540 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
541 perror("mkdtemp");
542 die("could not create tempdir\n");
544 } else {
545 /* if the stdout/stderr fds are not in O_APPEND mode,
546 the dup()'s of the fds in posix_spawn can cause different
547 file positions, causing the different processes to overwrite each others output.
548 testcase:
549 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
551 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
552 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
555 if(prog_state.cmd_startarg) {
556 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
557 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
559 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
562 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
563 init_queue();
565 for(;(fgets_result = fgets(inbuf, sizeof(inbuf), stdin));lineno++) {
566 if(prog_state.skip) {
567 prog_state.skip--;
568 continue;
570 if(!prog_state.cmd_startarg) {
571 dprintf(1, fgets_result);
572 continue;
575 stringptr_fromchar(fgets_result, line);
577 if(!prog_state.stdin_pipe)
578 stringptr_chomp(line);
580 if(prog_state.subst_entries) {
581 max_subst = 0;
582 uint32_t* index;
583 sblist_iter(prog_state.subst_entries, index) {
584 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
585 int ret;
586 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
587 if(ret == -1) {
588 too_long:
589 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
590 goto out;
591 } else if(!ret) {
592 char* lastdot = stringptr_rchr(line, '.');
593 stringptr tilLastDot = *line;
594 if(lastdot) tilLastDot.size = lastdot - line->ptr;
595 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
596 if(ret == -1) goto too_long;
598 if(ret) {
599 cmd_argv[*index] = subst_buf[max_subst];
600 max_subst++;
606 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
607 msleep(rand() % (prog_state.delayedspinup_interval + 1));
608 spinup_counter++;
611 if(free_slots())
612 launch_job(prog_state.threads_running, cmd_argv);
613 else if(!prog_state.stdin_pipe)
614 launch_job(reap_child(), cmd_argv);
616 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
617 write_statefile(lineno, temp_state);
620 if(prog_state.stdin_pipe)
621 pass_stdin(line);
624 out:
626 if(prog_state.stdin_pipe) {
627 close_pipes();
630 if(prog_state.delayedflush)
631 write_statefile(lineno - 1, temp_state);
633 while(prog_state.threads_running) reap_child();
635 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
636 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
637 if(prog_state.limits) sblist_free(prog_state.limits);
639 if(prog_state.tempdir)
640 rmdir(prog_state.tempdir);
643 fflush(stdout);
644 fflush(stderr);
647 return 0;