do not show usage screen without args, but use cat mode
[rofl0r-jobflow.git] / jobflow.c
blob9fae67e04119ad4bcdf01f1cacbe0e420bdbd823
1 /*
2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
45 #include <sys/mman.h>
47 /* process handling */
49 #include <fcntl.h>
50 #include <spawn.h>
51 #include <sys/wait.h>
52 #include <sys/stat.h>
54 #include <sys/resource.h>
56 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
57 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
58 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
59 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
60 static int prlimit(int pid, ...) {
61 (void) pid;
62 dprintf(2, "prlimit() not implemented on this system\n");
63 errno = EINVAL;
64 return -1;
66 #endif
69 #include <sys/time.h>
71 typedef struct {
72 pid_t pid;
73 int pipe;
74 posix_spawn_file_actions_t fa;
75 } job_info;
77 typedef struct {
78 int limit;
79 struct rlimit rl;
80 } limit_rec;
82 typedef struct {
83 char temp_state[256];
84 char* cmd_argv[4096];
85 unsigned long long lineno;
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 unsigned long long skip;
90 sblist* job_infos;
91 sblist* subst_entries;
92 sblist* limits;
93 unsigned cmd_startarg;
94 char* tempdir;
95 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
96 the top value in ms can be supplied via a command line switch.
97 this option makes only sense if the interval is somewhat smaller than the
98 expected runtime of the average job.
99 this option is useful to not overload a network app due to hundreds of
100 parallel connection tries on startup.
102 int buffered:1; /* write stdout and stderr of each task into a file,
103 and print it to stdout once the process ends.
104 this prevents mixing up of the output of multiple tasks. */
105 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
106 this means faster program execution, but could also be imprecise if the number of
107 jobs is small or smaller than the available threadcount. */
108 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
109 int pipe_mode:1;
110 } prog_state_s;
112 prog_state_s prog_state;
115 extern char** environ;
117 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
118 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
119 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
120 return ret > 0 && (size_t) ret < bufsize;
123 void launch_job(size_t jobindex, char** argv) {
124 char stdout_filename_buf[256];
125 char stderr_filename_buf[256];
126 job_info* job = sblist_get(prog_state.job_infos, jobindex);
128 if(job->pid != -1) return;
130 if(prog_state.buffered) {
131 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
132 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
133 dprintf(2, "temp filename too long!\n");
134 return;
138 errno = posix_spawn_file_actions_init(&job->fa);
139 if(errno) goto spawn_error;
141 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
142 if(errno) goto spawn_error;
144 int pipes[2];
145 if(prog_state.pipe_mode) {
146 if(pipe(pipes)) {
147 perror("pipe");
148 goto spawn_error;
150 job->pipe = pipes[1];
151 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
152 if(errno) goto spawn_error;
153 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
154 if(errno) goto spawn_error;
155 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
156 if(errno) goto spawn_error;
159 if(prog_state.buffered) {
160 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
161 if(errno) goto spawn_error;
162 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
163 if(errno) goto spawn_error;
166 if(!prog_state.pipe_mode) {
167 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
168 if(errno) goto spawn_error;
171 if(prog_state.buffered) {
172 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
173 if(errno) goto spawn_error;
174 if(prog_state.join_output)
175 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
176 else
177 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
178 if(errno) goto spawn_error;
181 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
182 if(errno) {
183 spawn_error:
184 job->pid = -1;
185 perror("posix_spawn");
186 } else {
187 prog_state.threads_running++;
188 if(prog_state.limits) {
189 limit_rec* limit;
190 sblist_iter(prog_state.limits, limit) {
191 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
192 perror("prlimit");
196 if(prog_state.pipe_mode)
197 close(pipes[0]);
200 static void dump_output(size_t job_id, int is_stderr) {
201 char out_filename_buf[256];
202 char buf[4096];
203 FILE* dst, *out_stream = is_stderr ? stderr : stdout;
204 size_t nread;
206 makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr);
208 dst = fopen(out_filename_buf, "r");
209 if(dst) {
210 while((nread = fread(buf, 1, sizeof(buf), dst))) {
211 fwrite(buf, 1, nread, out_stream);
212 if(nread < sizeof(buf)) break;
214 fclose(dst);
215 fflush(out_stream);
219 static void pass_stdin(stringptr *line) {
220 static size_t next_child = 0;
221 if(next_child >= sblist_getsize(prog_state.job_infos))
222 next_child = 0;
223 job_info *job = sblist_get(prog_state.job_infos, next_child);
224 write(job->pipe, line->ptr, line->size);
225 next_child++;
228 static void close_pipes(void) {
229 size_t i;
230 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
231 job_info *job = sblist_get(prog_state.job_infos, i);
232 close(job->pipe);
236 /* wait till a child exits, reap it, and return its job index for slot reuse */
237 static size_t reap_child(void) {
238 size_t i;
239 job_info* job;
240 int ret, retval;
242 do ret = waitpid(-1, &retval, 0);
243 while(ret == -1 || !WIFEXITED(retval));
245 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
246 job = sblist_get(prog_state.job_infos, i);
247 if(job->pid == ret) {
248 job->pid = -1;
249 posix_spawn_file_actions_destroy(&job->fa);
250 prog_state.threads_running--;
251 if(prog_state.buffered) {
252 dump_output(i, 0);
253 if(!prog_state.join_output)
254 dump_output(i, 1);
256 return i;
259 assert(0);
260 return -1;
263 static size_t free_slots(void) {
264 return prog_state.numthreads - prog_state.threads_running;
267 __attribute__((noreturn))
268 static void die(const char* msg) {
269 dprintf(2, msg);
270 exit(1);
273 static unsigned long parse_human_number(stringptr* num) {
274 unsigned long ret = 0;
275 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
276 const char* kmg = "KMG";
277 char* kmgind;
278 if(num && num->size) {
279 ret = atol(num->ptr);
280 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
281 ret *= mul[kmgind - kmg];
283 return ret;
286 static int syntax(void) {
287 dprintf(2,
288 "jobflow " VERSION " (C) rofl0r\n"
289 "------------------\n"
290 "this program is intended to be used as a recipient of another programs output\n"
291 "it launches processes to which the current line can be passed as an argument\n"
292 "using {} for substitution (as in find -exec).\n"
293 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
294 "stdin of child processes. input will be then evenly distributed to jobs,\n"
295 "until EOF is received.\n"
296 "\n"
297 "available options:\n\n"
298 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
299 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
300 "-exec ./mycommand {}\n"
301 "\n"
302 "-skip=XXX\n"
303 " XXX=number of entries to skip\n"
304 "-threads=XXX\n"
305 " XXX=number of parallel processes to spawn\n"
306 "-resume\n"
307 " resume from last jobnumber stored in statefile\n"
308 "-statefile=XXX\n"
309 " XXX=filename\n"
310 " saves last launched jobnumber into a file\n"
311 "-delayedflush\n"
312 " only write to statefile whenever all processes are busy,\n"
313 " and at program end\n"
314 "-delayedspinup=XXX\n"
315 " XXX=maximum amount of milliseconds\n"
316 " ...to wait when spinning up a fresh set of processes\n"
317 " a random value between 0 and the chosen amount is used to delay initial\n"
318 " spinup.\n"
319 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
320 " activity on program startup\n"
321 "-buffered\n"
322 " store the stdout and stderr of launched processes into a temporary file\n"
323 " which will be printed after a process has finished.\n"
324 " this prevents mixing up of output of different processes.\n"
325 "-joinoutput\n"
326 " if -buffered, write both stdout and stderr into the same file.\n"
327 " this saves the chronological order of the output, and the combined output\n"
328 " will only be printed to stdout.\n"
329 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
330 " sets the rlimit of the new created processes.\n"
331 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
332 "-exec command with args\n"
333 " everything past -exec is treated as the command to execute on each line of\n"
334 " stdin received. the line can be passed as an argument using {}.\n"
335 " {.} passes everything before the last dot in a line as an argument.\n"
336 " it is possible to use multiple substitutions inside a single argument,\n"
337 " but currently only of one type.\n"
338 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
339 "\n"
341 return 1;
344 #undef strtoll
345 #define strtoll(a,b,c) strtoint64(a, strlen(a))
346 static int parse_args(int argc, char** argv) {
347 op_state op_b, *op = &op_b;
348 op_init(op, argc, argv);
349 char *op_temp;
350 if(op_hasflag(op, SPL("-help")))
351 return syntax();
353 op_temp = op_get(op, SPL("threads"));
354 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
355 if(x <= 0) die("threadcount must be >= 1\n");
356 prog_state.numthreads = x;
358 op_temp = op_get(op, SPL("statefile"));
359 prog_state.statefile = op_temp;
361 op_temp = op_get(op, SPL("skip"));
362 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
363 if(op_hasflag(op, SPL("resume"))) {
364 if(!prog_state.statefile) die("-resume needs -statefile\n");
365 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
366 FILE *f = fopen(prog_state.statefile, "r");
367 if(f) {
368 char nb[64];
369 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
370 fclose(f);
375 prog_state.delayedflush = 0;
376 if(op_hasflag(op, SPL("delayedflush"))) {
377 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
378 prog_state.delayedflush = 1;
381 prog_state.pipe_mode = 0;
383 op_temp = op_get(op, SPL("delayedspinup"));
384 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
386 prog_state.cmd_startarg = 0;
387 prog_state.subst_entries = NULL;
389 if(op_hasflag(op, SPL("exec"))) {
390 uint32_t subst_ent;
391 unsigned i, r = 0;
392 for(i = 1; i < (unsigned) argc; i++) {
393 if(str_equal(argv[i], "-exec")) {
394 r = i + 1;
395 break;
398 if(r && r < (unsigned) argc) {
399 prog_state.cmd_startarg = r;
402 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
404 // save entries which must be substituted, to save some cycles.
405 for(i = r; i < (unsigned) argc; i++) {
406 subst_ent = i - r;
407 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
408 sblist_add(prog_state.subst_entries, &subst_ent);
411 if(sblist_getsize(prog_state.subst_entries) == 0) {
412 prog_state.pipe_mode = 1;
413 sblist_free(prog_state.subst_entries);
414 prog_state.subst_entries = 0;
418 prog_state.buffered = 0;
419 if(op_hasflag(op, SPL("buffered"))) {
420 prog_state.buffered = 1;
423 prog_state.join_output = 0;
424 if(op_hasflag(op, SPL("joinoutput"))) {
425 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
426 prog_state.join_output = 1;
429 prog_state.limits = NULL;
430 op_temp = op_get(op, SPL("limits"));
431 if(op_temp) {
432 unsigned i;
433 SPDECLAREC(limits, op_temp);
434 stringptrlist* limit_list = stringptr_splitc(limits, ',');
435 stringptrlist* kv;
436 stringptr* key, *value;
437 limit_rec lim;
438 if(stringptrlist_getsize(limit_list)) {
439 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
440 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
441 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
442 if(stringptrlist_getsize(kv) != 2) continue;
443 key = stringptrlist_get(kv, 0);
444 value = stringptrlist_get(kv, 1);
445 if(EQ(key, SPL("mem")))
446 lim.limit = RLIMIT_AS;
447 else if(EQ(key, SPL("cpu")))
448 lim.limit = RLIMIT_CPU;
449 else if(EQ(key, SPL("stack")))
450 lim.limit = RLIMIT_STACK;
451 else if(EQ(key, SPL("fsize")))
452 lim.limit = RLIMIT_FSIZE;
453 else if(EQ(key, SPL("nofiles")))
454 lim.limit = RLIMIT_NOFILE;
455 else
456 die("unknown option passed to -limits");
458 if(getrlimit(lim.limit, &lim.rl) == -1) {
459 perror("getrlimit");
460 die("could not query rlimits");
462 lim.rl.rlim_cur = parse_human_number(value);
463 sblist_add(prog_state.limits, &lim);
464 stringptrlist_free(kv);
466 stringptrlist_free(limit_list);
469 return 0;
472 static void init_queue(void) {
473 unsigned i;
474 job_info ji = {.pid = -1};
476 for(i = 0; i < prog_state.numthreads; i++)
477 sblist_add(prog_state.job_infos, &ji);
480 static void write_statefile(unsigned long long n, const char* tempfile) {
481 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
482 if(fd != -1) {
483 dprintf(fd, "%llu\n", n + 1ULL);
484 close(fd);
485 if(rename(tempfile, prog_state.statefile) == -1)
486 perror("rename");
487 } else
488 perror("open");
491 // returns numbers of substitutions done, -1 on out of buffer.
492 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
493 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
494 size_t i;
495 int ret = 0;
496 for(i = 0; dest_size > 0 && i < source->size; ) {
497 if(stringptr_here(source, i, what)) {
498 if(dest_size < (ssize_t) whit->size) return -1;
499 memcpy(dest, whit->ptr, whit->size);
500 dest += whit->size;
501 dest_size -= whit->size;
502 ret++;
503 i += what->size;
504 } else {
505 *dest = source->ptr[i];
506 dest++;
507 dest_size--;
508 i++;
511 if(!dest_size) return -1;
512 *dest = 0;
513 return ret;
516 static int dispatch_line(char* inbuf, size_t len, char** argv) {
517 char subst_buf[16][4096];
519 stringptr line_b, *line = &line_b;
521 prog_state.lineno++;
522 static unsigned spinup_counter = 0;
525 if(prog_state.skip) {
526 prog_state.skip--;
527 return 1;
529 if(!prog_state.cmd_startarg) {
530 write(1, inbuf, len);
531 return 1;
534 line->ptr = inbuf; line->size = len;
536 if(!prog_state.pipe_mode)
537 stringptr_chomp(line);
539 if(prog_state.subst_entries) {
540 unsigned max_subst = 0;
541 uint32_t* index;
542 sblist_iter(prog_state.subst_entries, index) {
543 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
544 int ret;
545 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
546 if(ret == -1) {
547 too_long:
548 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
549 return 0;
550 } else if(!ret) {
551 char* lastdot = stringptr_rchr(line, '.');
552 stringptr tilLastDot = *line;
553 if(lastdot) tilLastDot.size = lastdot - line->ptr;
554 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
555 if(ret == -1) goto too_long;
557 if(ret) {
558 prog_state.cmd_argv[*index] = subst_buf[max_subst];
559 max_subst++;
565 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
566 msleep(rand() % (prog_state.delayedspinup_interval + 1));
567 spinup_counter++;
570 if(free_slots())
571 launch_job(prog_state.threads_running, prog_state.cmd_argv);
572 else if(!prog_state.pipe_mode)
573 launch_job(reap_child(), prog_state.cmd_argv);
575 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
576 write_statefile(prog_state.lineno, prog_state.temp_state);
579 if(prog_state.pipe_mode)
580 pass_stdin(line);
582 return 1;
585 static char* mystrnchr(const char *in, int ch, size_t end) {
586 const char *e = in+end;
587 const char *p = in;
588 while(p != e && *p != ch) p++;
589 if(*p == ch) return (char*)p;
590 return 0;
592 static char* mystrnrchr(const char *in, int ch, size_t end) {
593 const char *e = in+end-1;
594 const char *p = in;
595 while(p != e && *e != ch) e--;
596 if(*e == ch) return (char*)e;
597 return 0;
600 #define BULK_KB 16
601 #define BULK_BUFSZ BULK_KB*1024
603 int main(int argc, char** argv) {
604 unsigned i;
606 char tempdir_buf[256];
608 srand(time(NULL));
610 if(argc > 4096) argc = 4096;
612 prog_state.threads_running = 0;
614 if(parse_args(argc, argv)) return 1;
616 if(prog_state.statefile)
617 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
619 prog_state.tempdir = NULL;
621 if(prog_state.buffered) {
622 prog_state.tempdir = tempdir_buf;
623 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
624 perror("mkdtemp");
625 die("could not create tempdir\n");
627 } else {
628 /* if the stdout/stderr fds are not in O_APPEND mode,
629 the dup()'s of the fds in posix_spawn can cause different
630 file positions, causing the different processes to overwrite each others output.
631 testcase:
632 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
634 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
635 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
638 if(prog_state.cmd_startarg) {
639 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
640 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
642 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
645 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
646 init_queue();
648 prog_state.lineno = 0;
650 size_t left = 0;
652 char *mem = mmap(NULL, BULK_BUFSZ*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
653 char *buf1 = mem;
654 char *buf2 = mem+BULK_BUFSZ;
655 char *in, *inbuf;
657 int exitcode = 1;
659 while(1) {
660 inbuf = buf1+BULK_BUFSZ-left;
661 memcpy(inbuf, buf2+BULK_BUFSZ-left, left);
662 ssize_t n = read(0, buf2, BULK_BUFSZ);
663 if(n == -1) {
664 perror("read");
665 goto out;
667 left += n;
668 in = inbuf;
669 while(left) {
670 char *p;
671 if(!prog_state.pipe_mode)
672 p = mystrnchr (in, '\n', left);
673 else
674 p = mystrnrchr(in, '\n', left);
675 if(!p) break;
676 ptrdiff_t diff = (p - in) + 1;
677 if(!dispatch_line(in, diff, argv))
678 goto out;
679 left -= diff;
680 in += diff;
682 if(!n) {
683 if(left) dispatch_line(in, left, argv);
684 break;
686 if(left > BULK_BUFSZ) {
687 dprintf(2, "error: input line length exceeds buffer size\n");
688 goto out;
692 exitcode = 0;
694 out:
696 if(prog_state.pipe_mode) {
697 close_pipes();
700 if(prog_state.delayedflush)
701 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
703 while(prog_state.threads_running) reap_child();
705 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
706 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
707 if(prog_state.limits) sblist_free(prog_state.limits);
709 if(prog_state.tempdir)
710 rmdir(prog_state.tempdir);
713 fflush(stdout);
714 fflush(stderr);
717 return exitcode;