fix buffer overflow with huge number of subst args
[rofl0r-jobflow.git] / jobflow.c
blobb6a17362e976b349f322cd411e1541cfc735621b
/*
Copyright (C) 2012,2014,2016,2017,2018 rofl0r

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* version string embedded into the usage banner */
#define VERSION "1.2.2"

/* feature-test macros; each one is reset first so a value passed on the
   compiler command line cannot conflict with the one chosen here. */
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700
#undef _GNU_SOURCE
#define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
45 #include <sys/mman.h>
47 /* process handling */
49 #include <fcntl.h>
50 #include <spawn.h>
51 #include <sys/wait.h>
52 #include <sys/stat.h>
54 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Fallback used when the C library is too old to provide prlimit().
   It never changes any limit: it prints a diagnostic to fd 2, sets errno
   to EINVAL and reports failure with -1. All arguments are ignored. */
static int prlimit(int pid, ...) {
	(void) pid;

	dprintf(2, "prlimit() not implemented on this system\n");

	/* set errno last so that dprintf() cannot overwrite it */
	errno = EINVAL;
	return -1;
}
#endif
69 #include <sys/time.h>
/* bookkeeping for one job slot */
typedef struct {
	/* pid of the running child; -1 marks the slot as unused */
	pid_t pid;
	/* write end of the pipe feeding the child's stdin (pipe mode only) */
	int pipe;
	/* file actions handed to posix_spawnp() for this slot */
	posix_spawn_file_actions_t fa;
} job_info;
/* one resource limit requested through -limits */
typedef struct {
	/* RLIMIT_* resource identifier */
	int limit;
	/* limit values applied to every spawned child via prlimit() */
	struct rlimit rl;
} limit_rec;
82 typedef struct {
83 char temp_state[256];
84 char* cmd_argv[4096];
85 unsigned long long lineno;
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 unsigned long long skip;
90 sblist* job_infos;
91 sblist* subst_entries;
92 sblist* limits;
93 unsigned cmd_startarg;
94 char* tempdir;
95 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
96 the top value in ms can be supplied via a command line switch.
97 this option makes only sense if the interval is somewhat smaller than the
98 expected runtime of the average job.
99 this option is useful to not overload a network app due to hundreds of
100 parallel connection tries on startup.
102 int buffered:1; /* write stdout and stderr of each task into a file,
103 and print it to stdout once the process ends.
104 this prevents mixing up of the output of multiple tasks. */
105 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
106 this means faster program execution, but could also be imprecise if the number of
107 jobs is small or smaller than the available threadcount. */
108 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
109 int pipe_mode:1;
110 size_t bulk_bytes;
111 } prog_state_s;
113 prog_state_s prog_state;
116 extern char** environ;
118 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
119 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
120 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
121 return ret > 0 && (size_t) ret < bufsize;
124 void launch_job(size_t jobindex, char** argv) {
125 char stdout_filename_buf[256];
126 char stderr_filename_buf[256];
127 job_info* job = sblist_get(prog_state.job_infos, jobindex);
129 if(job->pid != -1) return;
131 if(prog_state.buffered) {
132 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
133 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
134 dprintf(2, "temp filename too long!\n");
135 return;
139 errno = posix_spawn_file_actions_init(&job->fa);
140 if(errno) goto spawn_error;
142 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
143 if(errno) goto spawn_error;
145 int pipes[2];
146 if(prog_state.pipe_mode) {
147 if(pipe(pipes)) {
148 perror("pipe");
149 goto spawn_error;
151 job->pipe = pipes[1];
152 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
153 if(errno) goto spawn_error;
154 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
155 if(errno) goto spawn_error;
156 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
157 if(errno) goto spawn_error;
160 if(prog_state.buffered) {
161 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
162 if(errno) goto spawn_error;
163 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
164 if(errno) goto spawn_error;
167 if(!prog_state.pipe_mode) {
168 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
169 if(errno) goto spawn_error;
172 if(prog_state.buffered) {
173 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
175 if(prog_state.join_output)
176 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
177 else
178 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
179 if(errno) goto spawn_error;
182 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
183 if(errno) {
184 spawn_error:
185 job->pid = -1;
186 perror("posix_spawn");
187 } else {
188 prog_state.threads_running++;
189 if(prog_state.limits) {
190 limit_rec* limit;
191 sblist_iter(prog_state.limits, limit) {
192 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
193 perror("prlimit");
197 if(prog_state.pipe_mode)
198 close(pipes[0]);
/* Copies the buffered output file of job slot job_id to this process'
   stdout (is_stderr == 0) or stderr (is_stderr != 0), flushes the stream
   and removes the file. Does nothing if the file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE *sink = stdout;
	FILE *log;
	size_t got;

	if(is_stderr) sink = stderr;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);

	log = fopen(path, "r");
	if(!log) return;

	for(;;) {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(got == 0) break;
		fwrite(chunk, 1, got, sink);
		/* a short read means end of file (or an error): stop copying */
		if(got < sizeof(chunk)) break;
	}

	fclose(log);
	fflush(sink);
	unlink(path);
}
/* Writes all size bytes of buf to fd, restarting after partial writes and
   after interruption by a signal (EINTR). Any other write error is reported
   with perror() and ends the transfer early. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cursor = buf;
	const char *const stop = cursor + size;

	while(cursor != stop) {
		ssize_t n = write(fd, cursor, (size_t) (stop - cursor));
		if(n == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		cursor += n;
	}
}
241 static void pass_stdin(stringptr *line) {
242 static size_t next_child = 0;
243 if(next_child >= sblist_getsize(prog_state.job_infos))
244 next_child = 0;
245 job_info *job = sblist_get(prog_state.job_infos, next_child);
246 write_all(job->pipe, line->ptr, line->size);
247 next_child++;
250 static void close_pipes(void) {
251 size_t i;
252 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
253 job_info *job = sblist_get(prog_state.job_infos, i);
254 close(job->pipe);
258 /* wait till a child exits, reap it, and return its job index for slot reuse */
259 static size_t reap_child(void) {
260 size_t i;
261 job_info* job;
262 int ret, retval;
264 do ret = waitpid(-1, &retval, 0);
265 while(ret == -1 || !WIFEXITED(retval));
267 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
268 job = sblist_get(prog_state.job_infos, i);
269 if(job->pid == ret) {
270 job->pid = -1;
271 posix_spawn_file_actions_destroy(&job->fa);
272 prog_state.threads_running--;
273 if(prog_state.buffered) {
274 dump_output(i, 0);
275 if(!prog_state.join_output)
276 dump_output(i, 1);
278 return i;
281 assert(0);
282 return -1;
285 static size_t free_slots(void) {
286 return prog_state.numthreads - prog_state.threads_running;
/* Prints msg verbatim to fd 2 and terminates the program with exit status 1.
   msg is passed as data, never as a format string, so a '%' inside a
   message cannot make dprintf() read non-existent arguments. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
295 static unsigned long parse_human_number(stringptr* num) {
296 unsigned long ret = 0;
297 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
298 const char* kmg = "KMG";
299 char* kmgind;
300 if(num && num->size) {
301 ret = atol(num->ptr);
302 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
303 ret *= mul[kmgind - kmg];
305 return ret;
308 static int syntax(void) {
309 dprintf(2,
310 "jobflow " VERSION " (C) rofl0r\n"
311 "------------------------\n"
312 "this program is intended to be used as a recipient of another programs output\n"
313 "it launches processes to which the current line can be passed as an argument\n"
314 "using {} for substitution (as in find -exec).\n"
315 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
316 "stdin of child processes. input will be then evenly distributed to jobs,\n"
317 "until EOF is received.\n"
318 "\n"
319 "available options:\n\n"
320 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
321 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
322 "-exec ./mycommand {}\n"
323 "\n"
324 "-skip=XXX\n"
325 " XXX=number of entries to skip\n"
326 "-threads=XXX (alternative: -j=XXX)\n"
327 " XXX=number of parallel processes to spawn\n"
328 "-resume\n"
329 " resume from last jobnumber stored in statefile\n"
330 "-statefile=XXX\n"
331 " XXX=filename\n"
332 " saves last launched jobnumber into a file\n"
333 "-delayedflush\n"
334 " only write to statefile whenever all processes are busy,\n"
335 " and at program end\n"
336 "-delayedspinup=XXX\n"
337 " XXX=maximum amount of milliseconds\n"
338 " ...to wait when spinning up a fresh set of processes\n"
339 " a random value between 0 and the chosen amount is used to delay initial\n"
340 " spinup.\n"
341 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
342 " activity on program startup\n"
343 "-buffered\n"
344 " store the stdout and stderr of launched processes into a temporary file\n"
345 " which will be printed after a process has finished.\n"
346 " this prevents mixing up of output of different processes.\n"
347 "-joinoutput\n"
348 " if -buffered, write both stdout and stderr into the same file.\n"
349 " this saves the chronological order of the output, and the combined output\n"
350 " will only be printed to stdout.\n"
351 "-bulk=XXX\n"
352 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
353 " this passes (almost) the entire buffer to the next scheduled job.\n"
354 " the passed buffer will be truncated to the last line break boundary,\n"
355 " so jobs always get entire lines to work with.\n"
356 " this option is useful when you have huge input files and relatively short\n"
357 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
358 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
359 " actual memory allocation will be twice the amount passed.\n"
360 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
361 " than that probably doesn't make sense.\n"
362 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
363 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
364 " sets the rlimit of the new created processes.\n"
365 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
366 "-exec command with args\n"
367 " everything past -exec is treated as the command to execute on each line of\n"
368 " stdin received. the line can be passed as an argument using {}.\n"
369 " {.} passes everything before the last dot in a line as an argument.\n"
370 " it is possible to use multiple substitutions inside a single argument,\n"
371 " but currently only of one type.\n"
372 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
373 "\n"
375 return 1;
378 #undef strtoll
379 #define strtoll(a,b,c) strtoint64(a, strlen(a))
380 static int parse_args(int argc, char** argv) {
381 op_state op_b, *op = &op_b;
382 op_init(op, argc, argv);
383 char *op_temp;
384 if(op_hasflag(op, SPL("-help")))
385 return syntax();
387 op_temp = op_get(op, SPL("threads"));
388 if(!op_temp) op_temp = op_get(op, SPL("j"));
389 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
390 if(x <= 0) die("threadcount must be >= 1\n");
391 prog_state.numthreads = x;
393 op_temp = op_get(op, SPL("statefile"));
394 prog_state.statefile = op_temp;
396 op_temp = op_get(op, SPL("skip"));
397 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
398 if(op_hasflag(op, SPL("resume"))) {
399 if(!prog_state.statefile) die("-resume needs -statefile\n");
400 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
401 FILE *f = fopen(prog_state.statefile, "r");
402 if(f) {
403 char nb[64];
404 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
405 fclose(f);
410 prog_state.delayedflush = 0;
411 if(op_hasflag(op, SPL("delayedflush"))) {
412 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
413 prog_state.delayedflush = 1;
416 prog_state.pipe_mode = 0;
418 op_temp = op_get(op, SPL("delayedspinup"));
419 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
421 prog_state.cmd_startarg = 0;
422 prog_state.subst_entries = NULL;
424 if(op_hasflag(op, SPL("exec"))) {
425 uint32_t subst_ent;
426 unsigned i, r = 0;
427 for(i = 1; i < (unsigned) argc; i++) {
428 if(str_equal(argv[i], "-exec")) {
429 r = i + 1;
430 break;
433 if(r && r < (unsigned) argc) {
434 prog_state.cmd_startarg = r;
437 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
439 // save entries which must be substituted, to save some cycles.
440 for(i = r; i < (unsigned) argc; i++) {
441 subst_ent = i - r;
442 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
443 sblist_add(prog_state.subst_entries, &subst_ent);
446 if(sblist_getsize(prog_state.subst_entries) == 0) {
447 prog_state.pipe_mode = 1;
448 sblist_free(prog_state.subst_entries);
449 prog_state.subst_entries = 0;
453 prog_state.buffered = 0;
454 if(op_hasflag(op, SPL("buffered"))) {
455 prog_state.buffered = 1;
458 prog_state.join_output = 0;
459 if(op_hasflag(op, SPL("joinoutput"))) {
460 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
461 prog_state.join_output = 1;
464 prog_state.bulk_bytes = 0;
465 op_temp = op_get(op, SPL("bulk"));
466 if(op_temp) {
467 SPDECLAREC(value, op_temp);
468 prog_state.bulk_bytes = parse_human_number(value);
469 if(prog_state.bulk_bytes % 4096)
470 die("bulk size must be a multiple of 4096\n");
471 } else if(op_hasflag(op, SPL("bulk")))
472 prog_state.bulk_bytes = 4096;
474 prog_state.limits = NULL;
475 op_temp = op_get(op, SPL("limits"));
476 if(op_temp) {
477 unsigned i;
478 SPDECLAREC(limits, op_temp);
479 stringptrlist* limit_list = stringptr_splitc(limits, ',');
480 stringptrlist* kv;
481 stringptr* key, *value;
482 limit_rec lim;
483 if(stringptrlist_getsize(limit_list)) {
484 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
485 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
486 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
487 if(stringptrlist_getsize(kv) != 2) continue;
488 key = stringptrlist_get(kv, 0);
489 value = stringptrlist_get(kv, 1);
490 if(EQ(key, SPL("mem")))
491 lim.limit = RLIMIT_AS;
492 else if(EQ(key, SPL("cpu")))
493 lim.limit = RLIMIT_CPU;
494 else if(EQ(key, SPL("stack")))
495 lim.limit = RLIMIT_STACK;
496 else if(EQ(key, SPL("fsize")))
497 lim.limit = RLIMIT_FSIZE;
498 else if(EQ(key, SPL("nofiles")))
499 lim.limit = RLIMIT_NOFILE;
500 else
501 die("unknown option passed to -limits");
503 if(getrlimit(lim.limit, &lim.rl) == -1) {
504 perror("getrlimit");
505 die("could not query rlimits");
507 lim.rl.rlim_cur = parse_human_number(value);
508 sblist_add(prog_state.limits, &lim);
509 stringptrlist_free(kv);
511 stringptrlist_free(limit_list);
514 return 0;
517 static void init_queue(void) {
518 unsigned i;
519 job_info ji = {.pid = -1};
521 for(i = 0; i < prog_state.numthreads; i++)
522 sblist_add(prog_state.job_infos, &ji);
525 static void write_statefile(unsigned long long n, const char* tempfile) {
526 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
527 if(fd != -1) {
528 dprintf(fd, "%llu\n", n + 1ULL);
529 close(fd);
530 if(rename(tempfile, prog_state.statefile) == -1)
531 perror("rename");
532 } else
533 perror("open");
536 // returns numbers of substitutions done, -1 on out of buffer.
537 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
538 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
539 size_t i;
540 int ret = 0;
541 for(i = 0; dest_size > 0 && i < source->size; ) {
542 if(stringptr_here(source, i, what)) {
543 if(dest_size < (ssize_t) whit->size) return -1;
544 memcpy(dest, whit->ptr, whit->size);
545 dest += whit->size;
546 dest_size -= whit->size;
547 ret++;
548 i += what->size;
549 } else {
550 *dest = source->ptr[i];
551 dest++;
552 dest_size--;
553 i++;
556 if(!dest_size) return -1;
557 *dest = 0;
558 return ret;
/* Returns a pointer to the first occurrence of ch within the first end bytes
   of in, or 0 if there is none. A NUL byte does not stop the search. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t i;

	for(i = 0; i < end; i++) {
		if(in[i] == ch) return (char*) (in + i);
	}
	return 0;
}
/* Returns a pointer to the last occurrence of ch within the first end bytes
   of in, or 0 if there is none. A NUL byte does not stop the search.
   An empty range (end == 0) yields 0 without touching memory in front of in. */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	size_t i = end;

	while(i > 0) {
		i--;
		if(in[i] == ch) return (char*) (in + i);
	}
	return 0;
}
576 static int need_linecounter(void) {
577 return !!prog_state.skip || prog_state.statefile;
/* Returns the number of '\n' bytes within the first len bytes of buf. */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t total = 0;
	size_t i;

	for(i = 0; i < len; i++) {
		if(buf[i] == '\n') total++;
	}
	return total;
}
588 #define MAX_SUBSTS 16
589 static int dispatch_line(char* inbuf, size_t len, char** argv) {
590 char subst_buf[MAX_SUBSTS][4096];
591 static unsigned spinup_counter = 0;
593 stringptr line_b, *line = &line_b;
595 if(!prog_state.bulk_bytes)
596 prog_state.lineno++;
597 else if(need_linecounter()) {
598 prog_state.lineno += count_linefeeds(inbuf, len);
601 if(prog_state.skip) {
602 if(!prog_state.bulk_bytes) {
603 prog_state.skip--;
604 return 1;
605 } else {
606 while(len && prog_state.skip) {
607 char *q = mystrnchr(inbuf, '\n', len);
608 if(q) {
609 ptrdiff_t diff = (q - inbuf) + 1;
610 inbuf += diff;
611 len -= diff;
612 prog_state.skip--;
613 } else {
614 return 1;
617 if(!len) return 1;
620 if(!prog_state.cmd_startarg) {
621 write_all(1, inbuf, len);
622 return 1;
625 line->ptr = inbuf; line->size = len;
627 if(!prog_state.pipe_mode)
628 stringptr_chomp(line);
630 if(prog_state.subst_entries) {
631 unsigned max_subst = 0;
632 uint32_t* index;
633 sblist_iter(prog_state.subst_entries, index) {
634 if(max_subst >= MAX_SUBSTS) break;
635 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
636 int ret;
637 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
638 if(ret == -1) {
639 too_long:
640 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
641 return 0;
642 } else if(!ret) {
643 char* lastdot = stringptr_rchr(line, '.');
644 stringptr tilLastDot = *line;
645 if(lastdot) tilLastDot.size = lastdot - line->ptr;
646 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
647 if(ret == -1) goto too_long;
649 if(ret) {
650 prog_state.cmd_argv[*index] = subst_buf[max_subst];
651 max_subst++;
657 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
658 msleep(rand() % (prog_state.delayedspinup_interval + 1));
659 spinup_counter++;
662 if(free_slots())
663 launch_job(prog_state.threads_running, prog_state.cmd_argv);
664 else if(!prog_state.pipe_mode)
665 launch_job(reap_child(), prog_state.cmd_argv);
667 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
668 write_statefile(prog_state.lineno, prog_state.temp_state);
671 if(prog_state.pipe_mode)
672 pass_stdin(line);
674 return 1;
677 int main(int argc, char** argv) {
678 unsigned i;
680 char tempdir_buf[256];
682 srand(time(NULL));
684 if(argc > 4096) argc = 4096;
686 prog_state.threads_running = 0;
688 if(parse_args(argc, argv)) return 1;
690 if(prog_state.statefile)
691 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
693 prog_state.tempdir = NULL;
695 if(prog_state.buffered) {
696 prog_state.tempdir = tempdir_buf;
697 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
698 perror("mkdtemp");
699 die("could not create tempdir\n");
701 } else {
702 /* if the stdout/stderr fds are not in O_APPEND mode,
703 the dup()'s of the fds in posix_spawn can cause different
704 file positions, causing the different processes to overwrite each others output.
705 testcase:
706 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
708 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
709 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
712 if(prog_state.cmd_startarg) {
713 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
714 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
716 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
719 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
720 init_queue();
722 prog_state.lineno = 0;
724 size_t left = 0, bytes_read = 0;
725 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
727 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
728 char *buf1 = mem;
729 char *buf2 = mem+chunksize;
730 char *in, *inbuf;
732 int exitcode = 1;
734 while(1) {
735 inbuf = buf1+chunksize-left;
736 memcpy(inbuf, buf2+bytes_read-left, left);
737 ssize_t n = read(0, buf2, chunksize);
738 if(n == -1) {
739 perror("read");
740 goto out;
742 bytes_read = n;
743 left += n;
744 in = inbuf;
745 while(left) {
746 char *p;
747 if(prog_state.pipe_mode && prog_state.bulk_bytes)
748 p = mystrnrchr(in, '\n', left);
749 else
750 p = mystrnchr (in, '\n', left);
752 if(!p) break;
753 ptrdiff_t diff = (p - in) + 1;
754 if(!dispatch_line(in, diff, argv))
755 goto out;
756 left -= diff;
757 in += diff;
759 if(!n) {
760 if(left) dispatch_line(in, left, argv);
761 break;
763 if(left > chunksize) {
764 dprintf(2, "error: input line length exceeds buffer size\n");
765 goto out;
769 exitcode = 0;
771 out:
773 if(prog_state.pipe_mode) {
774 close_pipes();
777 if(prog_state.delayedflush)
778 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
780 while(prog_state.threads_running) reap_child();
782 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
783 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
784 if(prog_state.limits) sblist_free(prog_state.limits);
786 if(prog_state.tempdir)
787 rmdir(prog_state.tempdir);
790 fflush(stdout);
791 fflush(stderr);
794 return exitcode;