fix build of some test progs
[rofl0r-jobflow.git] / jobflow.c
blob91f2a269ad0eb36d6397920a49a70b43eda252c3
1 /*
2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
45 #include <sys/mman.h>
47 /* process handling */
49 #include <fcntl.h>
50 #include <spawn.h>
51 #include <sys/wait.h>
52 #include <sys/stat.h>
54 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* prlimit() support in glibc, see:
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Fallback stub used when the libc lacks prlimit(): complains on stderr
 * and always fails with errno set to EINVAL. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
69 #include <sys/time.h>
/* Bookkeeping for one job slot. */
typedef struct {
	pid_t pid;                     /* pid of the spawned child, -1 while the slot is unused */
	int pipe;                      /* pipe mode: write end of the pipe feeding the child's stdin */
	posix_spawn_file_actions_t fa; /* file actions passed to posix_spawnp() for this job */
} job_info;
/* One resource limit requested via -limits. */
typedef struct {
	int limit;        /* RLIMIT_* resource identifier */
	struct rlimit rl; /* limit values applied to each spawned child via prlimit() */
} limit_rec;
82 typedef struct {
83 char temp_state[256];
84 char* cmd_argv[4096];
85 unsigned long long lineno;
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 unsigned long long skip;
90 sblist* job_infos;
91 sblist* subst_entries;
92 sblist* limits;
93 unsigned cmd_startarg;
94 char* tempdir;
95 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
96 the top value in ms can be supplied via a command line switch.
97 this option makes only sense if the interval is somewhat smaller than the
98 expected runtime of the average job.
99 this option is useful to not overload a network app due to hundreds of
100 parallel connection tries on startup.
102 int buffered:1; /* write stdout and stderr of each task into a file,
103 and print it to stdout once the process ends.
104 this prevents mixing up of the output of multiple tasks. */
105 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
106 this means faster program execution, but could also be imprecise if the number of
107 jobs is small or smaller than the available threadcount. */
108 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
109 int pipe_mode:1;
110 size_t bulk_bytes;
111 } prog_state_s;
113 prog_state_s prog_state;
116 extern char** environ;
118 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
119 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
120 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
121 return ret > 0 && (size_t) ret < bufsize;
124 void launch_job(size_t jobindex, char** argv) {
125 char stdout_filename_buf[256];
126 char stderr_filename_buf[256];
127 job_info* job = sblist_get(prog_state.job_infos, jobindex);
129 if(job->pid != -1) return;
131 if(prog_state.buffered) {
132 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
133 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
134 dprintf(2, "temp filename too long!\n");
135 return;
139 errno = posix_spawn_file_actions_init(&job->fa);
140 if(errno) goto spawn_error;
142 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
143 if(errno) goto spawn_error;
145 int pipes[2];
146 if(prog_state.pipe_mode) {
147 if(pipe(pipes)) {
148 perror("pipe");
149 goto spawn_error;
151 job->pipe = pipes[1];
152 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
153 if(errno) goto spawn_error;
154 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
155 if(errno) goto spawn_error;
156 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
157 if(errno) goto spawn_error;
160 if(prog_state.buffered) {
161 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
162 if(errno) goto spawn_error;
163 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
164 if(errno) goto spawn_error;
167 if(!prog_state.pipe_mode) {
168 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
169 if(errno) goto spawn_error;
172 if(prog_state.buffered) {
173 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
175 if(prog_state.join_output)
176 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
177 else
178 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
179 if(errno) goto spawn_error;
182 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
183 if(errno) {
184 spawn_error:
185 job->pid = -1;
186 perror("posix_spawn");
187 } else {
188 prog_state.threads_running++;
189 if(prog_state.limits) {
190 limit_rec* limit;
191 sblist_iter(prog_state.limits, limit) {
192 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
193 perror("prlimit");
197 if(prog_state.pipe_mode)
198 close(pipes[0]);
/* Copies the buffered log file of job `job_id` to our own stdout
 * (is_stderr == 0) or stderr (is_stderr != 0) and flushes that stream.
 * Silently does nothing if the log file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE* sink = is_stderr ? stderr : stdout;
	FILE* log;
	size_t got;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);

	log = fopen(path, "r");
	if(!log)
		return;

	for(;;) {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(got == 0)
			break;
		fwrite(chunk, 1, got, sink);
		/* a short read means we hit the end of the file */
		if(got < sizeof(chunk))
			break;
	}

	fclose(log);
	fflush(sink);
}
220 static void pass_stdin(stringptr *line) {
221 static size_t next_child = 0;
222 if(next_child >= sblist_getsize(prog_state.job_infos))
223 next_child = 0;
224 job_info *job = sblist_get(prog_state.job_infos, next_child);
225 write(job->pipe, line->ptr, line->size);
226 next_child++;
229 static void close_pipes(void) {
230 size_t i;
231 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
232 job_info *job = sblist_get(prog_state.job_infos, i);
233 close(job->pipe);
237 /* wait till a child exits, reap it, and return its job index for slot reuse */
238 static size_t reap_child(void) {
239 size_t i;
240 job_info* job;
241 int ret, retval;
243 do ret = waitpid(-1, &retval, 0);
244 while(ret == -1 || !WIFEXITED(retval));
246 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
247 job = sblist_get(prog_state.job_infos, i);
248 if(job->pid == ret) {
249 job->pid = -1;
250 posix_spawn_file_actions_destroy(&job->fa);
251 prog_state.threads_running--;
252 if(prog_state.buffered) {
253 dump_output(i, 0);
254 if(!prog_state.join_output)
255 dump_output(i, 1);
257 return i;
260 assert(0);
261 return -1;
264 static size_t free_slots(void) {
265 return prog_state.numthreads - prog_state.threads_running;
/* Prints msg to stderr and terminates the program with exit status 1.
 * msg is printed verbatim; it is never interpreted as a format string. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
274 static unsigned long parse_human_number(stringptr* num) {
275 unsigned long ret = 0;
276 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
277 const char* kmg = "KMG";
278 char* kmgind;
279 if(num && num->size) {
280 ret = atol(num->ptr);
281 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
282 ret *= mul[kmgind - kmg];
284 return ret;
287 static int syntax(void) {
288 dprintf(2,
289 "jobflow " VERSION " (C) rofl0r\n"
290 "------------------\n"
291 "this program is intended to be used as a recipient of another programs output\n"
292 "it launches processes to which the current line can be passed as an argument\n"
293 "using {} for substitution (as in find -exec).\n"
294 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
295 "stdin of child processes. input will be then evenly distributed to jobs,\n"
296 "until EOF is received.\n"
297 "\n"
298 "available options:\n\n"
299 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
300 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
301 "-exec ./mycommand {}\n"
302 "\n"
303 "-skip=XXX\n"
304 " XXX=number of entries to skip\n"
305 "-threads=XXX\n"
306 " XXX=number of parallel processes to spawn\n"
307 "-resume\n"
308 " resume from last jobnumber stored in statefile\n"
309 "-statefile=XXX\n"
310 " XXX=filename\n"
311 " saves last launched jobnumber into a file\n"
312 "-delayedflush\n"
313 " only write to statefile whenever all processes are busy,\n"
314 " and at program end\n"
315 "-delayedspinup=XXX\n"
316 " XXX=maximum amount of milliseconds\n"
317 " ...to wait when spinning up a fresh set of processes\n"
318 " a random value between 0 and the chosen amount is used to delay initial\n"
319 " spinup.\n"
320 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
321 " activity on program startup\n"
322 "-buffered\n"
323 " store the stdout and stderr of launched processes into a temporary file\n"
324 " which will be printed after a process has finished.\n"
325 " this prevents mixing up of output of different processes.\n"
326 "-joinoutput\n"
327 " if -buffered, write both stdout and stderr into the same file.\n"
328 " this saves the chronological order of the output, and the combined output\n"
329 " will only be printed to stdout.\n"
330 "-bulk=XXX\n"
331 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
332 " this passes (almost) the entire buffer to the next scheduled job.\n"
333 " the passed buffer will be truncated to the last line break boundary,\n"
334 " so jobs always get entire lines to work with.\n"
335 " this option is useful when you have huge input files and relatively short\n"
336 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
337 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
338 " actual memory allocation will be twice the amount passed.\n"
339 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
340 " sets the rlimit of the new created processes.\n"
341 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
342 "-exec command with args\n"
343 " everything past -exec is treated as the command to execute on each line of\n"
344 " stdin received. the line can be passed as an argument using {}.\n"
345 " {.} passes everything before the last dot in a line as an argument.\n"
346 " it is possible to use multiple substitutions inside a single argument,\n"
347 " but currently only of one type.\n"
348 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
349 "\n"
351 return 1;
354 #undef strtoll
355 #define strtoll(a,b,c) strtoint64(a, strlen(a))
356 static int parse_args(int argc, char** argv) {
357 op_state op_b, *op = &op_b;
358 op_init(op, argc, argv);
359 char *op_temp;
360 if(op_hasflag(op, SPL("-help")))
361 return syntax();
363 op_temp = op_get(op, SPL("threads"));
364 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
365 if(x <= 0) die("threadcount must be >= 1\n");
366 prog_state.numthreads = x;
368 op_temp = op_get(op, SPL("statefile"));
369 prog_state.statefile = op_temp;
371 op_temp = op_get(op, SPL("skip"));
372 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
373 if(op_hasflag(op, SPL("resume"))) {
374 if(!prog_state.statefile) die("-resume needs -statefile\n");
375 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
376 FILE *f = fopen(prog_state.statefile, "r");
377 if(f) {
378 char nb[64];
379 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
380 fclose(f);
385 prog_state.delayedflush = 0;
386 if(op_hasflag(op, SPL("delayedflush"))) {
387 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
388 prog_state.delayedflush = 1;
391 prog_state.pipe_mode = 0;
393 op_temp = op_get(op, SPL("delayedspinup"));
394 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
396 prog_state.cmd_startarg = 0;
397 prog_state.subst_entries = NULL;
399 if(op_hasflag(op, SPL("exec"))) {
400 uint32_t subst_ent;
401 unsigned i, r = 0;
402 for(i = 1; i < (unsigned) argc; i++) {
403 if(str_equal(argv[i], "-exec")) {
404 r = i + 1;
405 break;
408 if(r && r < (unsigned) argc) {
409 prog_state.cmd_startarg = r;
412 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
414 // save entries which must be substituted, to save some cycles.
415 for(i = r; i < (unsigned) argc; i++) {
416 subst_ent = i - r;
417 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
418 sblist_add(prog_state.subst_entries, &subst_ent);
421 if(sblist_getsize(prog_state.subst_entries) == 0) {
422 prog_state.pipe_mode = 1;
423 sblist_free(prog_state.subst_entries);
424 prog_state.subst_entries = 0;
428 prog_state.buffered = 0;
429 if(op_hasflag(op, SPL("buffered"))) {
430 prog_state.buffered = 1;
433 prog_state.join_output = 0;
434 if(op_hasflag(op, SPL("joinoutput"))) {
435 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
436 prog_state.join_output = 1;
439 prog_state.bulk_bytes = 0;
440 op_temp = op_get(op, SPL("bulk"));
441 if(op_temp) {
442 SPDECLAREC(value, op_temp);
443 prog_state.bulk_bytes = parse_human_number(value);
444 if(prog_state.bulk_bytes % 4096)
445 die("bulk size must be a multiple of 4096\n");
448 prog_state.limits = NULL;
449 op_temp = op_get(op, SPL("limits"));
450 if(op_temp) {
451 unsigned i;
452 SPDECLAREC(limits, op_temp);
453 stringptrlist* limit_list = stringptr_splitc(limits, ',');
454 stringptrlist* kv;
455 stringptr* key, *value;
456 limit_rec lim;
457 if(stringptrlist_getsize(limit_list)) {
458 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
459 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
460 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
461 if(stringptrlist_getsize(kv) != 2) continue;
462 key = stringptrlist_get(kv, 0);
463 value = stringptrlist_get(kv, 1);
464 if(EQ(key, SPL("mem")))
465 lim.limit = RLIMIT_AS;
466 else if(EQ(key, SPL("cpu")))
467 lim.limit = RLIMIT_CPU;
468 else if(EQ(key, SPL("stack")))
469 lim.limit = RLIMIT_STACK;
470 else if(EQ(key, SPL("fsize")))
471 lim.limit = RLIMIT_FSIZE;
472 else if(EQ(key, SPL("nofiles")))
473 lim.limit = RLIMIT_NOFILE;
474 else
475 die("unknown option passed to -limits");
477 if(getrlimit(lim.limit, &lim.rl) == -1) {
478 perror("getrlimit");
479 die("could not query rlimits");
481 lim.rl.rlim_cur = parse_human_number(value);
482 sblist_add(prog_state.limits, &lim);
483 stringptrlist_free(kv);
485 stringptrlist_free(limit_list);
488 return 0;
491 static void init_queue(void) {
492 unsigned i;
493 job_info ji = {.pid = -1};
495 for(i = 0; i < prog_state.numthreads; i++)
496 sblist_add(prog_state.job_infos, &ji);
499 static void write_statefile(unsigned long long n, const char* tempfile) {
500 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
501 if(fd != -1) {
502 dprintf(fd, "%llu\n", n + 1ULL);
503 close(fd);
504 if(rename(tempfile, prog_state.statefile) == -1)
505 perror("rename");
506 } else
507 perror("open");
510 // returns numbers of substitutions done, -1 on out of buffer.
511 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
512 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
513 size_t i;
514 int ret = 0;
515 for(i = 0; dest_size > 0 && i < source->size; ) {
516 if(stringptr_here(source, i, what)) {
517 if(dest_size < (ssize_t) whit->size) return -1;
518 memcpy(dest, whit->ptr, whit->size);
519 dest += whit->size;
520 dest_size -= whit->size;
521 ret++;
522 i += what->size;
523 } else {
524 *dest = source->ptr[i];
525 dest++;
526 dest_size--;
527 i++;
530 if(!dest_size) return -1;
531 *dest = 0;
532 return ret;
535 static int dispatch_line(char* inbuf, size_t len, char** argv) {
536 char subst_buf[16][4096];
538 stringptr line_b, *line = &line_b;
540 prog_state.lineno++;
541 static unsigned spinup_counter = 0;
544 if(prog_state.skip) {
545 prog_state.skip--;
546 return 1;
548 if(!prog_state.cmd_startarg) {
549 write(1, inbuf, len);
550 return 1;
553 line->ptr = inbuf; line->size = len;
555 if(!prog_state.pipe_mode)
556 stringptr_chomp(line);
558 if(prog_state.subst_entries) {
559 unsigned max_subst = 0;
560 uint32_t* index;
561 sblist_iter(prog_state.subst_entries, index) {
562 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
563 int ret;
564 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
565 if(ret == -1) {
566 too_long:
567 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
568 return 0;
569 } else if(!ret) {
570 char* lastdot = stringptr_rchr(line, '.');
571 stringptr tilLastDot = *line;
572 if(lastdot) tilLastDot.size = lastdot - line->ptr;
573 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
574 if(ret == -1) goto too_long;
576 if(ret) {
577 prog_state.cmd_argv[*index] = subst_buf[max_subst];
578 max_subst++;
584 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
585 msleep(rand() % (prog_state.delayedspinup_interval + 1));
586 spinup_counter++;
589 if(free_slots())
590 launch_job(prog_state.threads_running, prog_state.cmd_argv);
591 else if(!prog_state.pipe_mode)
592 launch_job(reap_child(), prog_state.cmd_argv);
594 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
595 write_statefile(prog_state.lineno, prog_state.temp_state);
598 if(prog_state.pipe_mode)
599 pass_stdin(line);
601 return 1;
/* Returns a pointer to the first occurrence of ch within the first `end`
 * bytes of `in`, or NULL if there is none. Bytes at or beyond in+end are
 * never read. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	const char *e = in+end;
	const char *p = in;
	while(p != e && *p != ch) p++;
	/* p == e means the range was exhausted; in[end] is out of bounds */
	if(p != e) return (char*)p;
	return NULL;
}
/* Returns a pointer to the last occurrence of ch within the first `end`
 * bytes of `in`, or NULL if there is none (including end == 0). */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	const char *e = in+end;
	/* scan backwards over [in, in+end) without ever stepping before `in` */
	while(e != in) {
		e--;
		if(*e == ch) return (char*)e;
	}
	return NULL;
}
619 int main(int argc, char** argv) {
620 unsigned i;
622 char tempdir_buf[256];
624 srand(time(NULL));
626 if(argc > 4096) argc = 4096;
628 prog_state.threads_running = 0;
630 if(parse_args(argc, argv)) return 1;
632 if(prog_state.statefile)
633 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
635 prog_state.tempdir = NULL;
637 if(prog_state.buffered) {
638 prog_state.tempdir = tempdir_buf;
639 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
640 perror("mkdtemp");
641 die("could not create tempdir\n");
643 } else {
644 /* if the stdout/stderr fds are not in O_APPEND mode,
645 the dup()'s of the fds in posix_spawn can cause different
646 file positions, causing the different processes to overwrite each others output.
647 testcase:
648 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
650 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
651 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
654 if(prog_state.cmd_startarg) {
655 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
656 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
658 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
661 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
662 init_queue();
664 prog_state.lineno = 0;
666 size_t left = 0;
667 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
669 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
670 char *buf1 = mem;
671 char *buf2 = mem+chunksize;
672 char *in, *inbuf;
674 int exitcode = 1;
676 while(1) {
677 inbuf = buf1+chunksize-left;
678 memcpy(inbuf, buf2+chunksize-left, left);
679 ssize_t n = read(0, buf2, chunksize);
680 if(n == -1) {
681 perror("read");
682 goto out;
684 left += n;
685 in = inbuf;
686 while(left) {
687 char *p;
688 if(prog_state.pipe_mode && prog_state.bulk_bytes)
689 p = mystrnrchr(in, '\n', left);
690 else
691 p = mystrnchr (in, '\n', left);
693 if(!p) break;
694 ptrdiff_t diff = (p - in) + 1;
695 if(!dispatch_line(in, diff, argv))
696 goto out;
697 left -= diff;
698 in += diff;
700 if(!n) {
701 if(left) dispatch_line(in, left, argv);
702 break;
704 if(left > chunksize) {
705 dprintf(2, "error: input line length exceeds buffer size\n");
706 goto out;
710 exitcode = 0;
712 out:
714 if(prog_state.pipe_mode) {
715 close_pipes();
718 if(prog_state.delayedflush)
719 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
721 while(prog_state.threads_running) reap_child();
723 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
724 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
725 if(prog_state.limits) sblist_free(prog_state.limits);
727 if(prog_state.tempdir)
728 rmdir(prog_state.tempdir);
731 fflush(stdout);
732 fflush(stderr);
735 return exitcode;