2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
/* version string; printed in the usage banner by syntax() */
19 #define VERSION "1.2.4"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/sblist.h"
32 #include "../lib/include/strlib.h"
33 #include "../lib/include/macros.h"
46 /* process handling */
53 #include <sys/resource.h>
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
59 static int prlimit(int pid
, ...) {
61 dprintf(2, "prlimit() not implemented on this system\n");
69 /* some small helper funcs from libulz */
71 static int msleep(long millisecs
) {
72 struct timespec req
, rem
;
73 req
.tv_sec
= millisecs
/ 1000;
74 req
.tv_nsec
= (millisecs
% 1000) * 1000 * 1000;
76 while((ret
= nanosleep(&req
, &rem
)) == -1 && errno
== EINTR
) req
= rem
;
/* Alphabet that ulz_mkdtemp() draws from when it replaces the trailing six
   template characters: 10 digits, then 26 upper-case and 26 lower-case ASCII
   letters. */
static const char ulz_conv_cypher[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
/* Count of usable alphabet characters, i.e. the array length minus its NUL. */
static const size_t ulz_conv_cypher_len = sizeof ulz_conv_cypher / sizeof ulz_conv_cypher[0] - 1;
82 static char* ulz_mkdtemp(char* templ
) {
83 size_t i
, l
= strlen(templ
);
89 for(i
= l
- 6; i
< l
; i
++) templ
[i
] = ulz_conv_cypher
[rand() % ulz_conv_cypher_len
];
90 if(mkdir(templ
, S_IRWXU
) == -1) {
91 if(errno
== EEXIST
) goto loop
;
97 static size_t gen_fn(char* buf
, const char* prefix
, size_t pl
, const char* tmpdir
) {
98 size_t tl
= strlen(tmpdir
);
100 memcpy(buf
+a
, tmpdir
, tl
);
102 memcpy(buf
+a
,prefix
,pl
);
104 memcpy(buf
+a
,"XXXXXX", 7);
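/* creates a temp directory via ulz_mkdtemp(), trying /dev/shm first and falling
 * back to /tmp on failure, to get the fastest possible storage.
 * returns the length of the string stored in buffer, or 0 if buffer is too
 * small or no directory could be created */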
110 static size_t mktempdir(const char* prefix
, char* buffer
, size_t bufsize
) {
111 size_t ret
, pl
= strlen(prefix
);
112 if(bufsize
< sizeof("/dev/shm/") -1 + pl
+ sizeof("XXXXXX")) return 0;
113 ret
= gen_fn(buffer
, prefix
, pl
, "/dev/shm/");
114 if(!ulz_mkdtemp(buffer
)) {
115 ret
= gen_fn(buffer
, prefix
, pl
, "/tmp/");
116 if(!ulz_mkdtemp(buffer
)) return 0;
125 posix_spawn_file_actions_t fa
;
134 char temp_state
[256];
135 char* cmd_argv
[4096];
136 unsigned long long lineno
;
138 unsigned threads_running
;
141 unsigned long long skip
;
143 sblist
* subst_entries
;
145 unsigned cmd_startarg
;
147 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
148 the top value in ms can be supplied via a command line switch.
this option only makes sense if the interval is somewhat smaller than the
150 expected runtime of the average job.
151 this option is useful to not overload a network app due to hundreds of
152 parallel connection tries on startup.
154 int buffered
:1; /* write stdout and stderr of each task into a file,
155 and print it to stdout once the process ends.
156 this prevents mixing up of the output of multiple tasks. */
157 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
158 this means faster program execution, but could also be imprecise if the number of
159 jobs is small or smaller than the available threadcount. */
160 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
/* global program state: parse_args() and main() fill it in, and the job and
   queue helpers (launch_job, reap_child, dispatch_line, ...) read it. */
165 prog_state_s prog_state
;
/* process environment; handed unchanged to every posix_spawnp() call in launch_job() */
168 extern char** environ
;
170 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
171 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
172 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
173 return ret
> 0 && (size_t) ret
< bufsize
;
176 void launch_job(size_t jobindex
, char** argv
) {
177 char stdout_filename_buf
[256];
178 char stderr_filename_buf
[256];
179 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
181 if(job
->pid
!= -1) return;
183 if(prog_state
.buffered
) {
184 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
185 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
186 dprintf(2, "temp filename too long!\n");
191 errno
= posix_spawn_file_actions_init(&job
->fa
);
192 if(errno
) goto spawn_error
;
194 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
195 if(errno
) goto spawn_error
;
198 if(prog_state
.pipe_mode
) {
203 job
->pipe
= pipes
[1];
204 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
205 if(errno
) goto spawn_error
;
206 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
207 if(errno
) goto spawn_error
;
208 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
209 if(errno
) goto spawn_error
;
212 if(prog_state
.buffered
) {
213 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
214 if(errno
) goto spawn_error
;
215 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
216 if(errno
) goto spawn_error
;
219 if(!prog_state
.pipe_mode
) {
220 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
221 if(errno
) goto spawn_error
;
224 if(prog_state
.buffered
) {
225 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
226 if(errno
) goto spawn_error
;
227 if(prog_state
.join_output
)
228 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
230 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
231 if(errno
) goto spawn_error
;
234 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
238 perror("posix_spawn");
240 prog_state
.threads_running
++;
241 if(prog_state
.limits
) {
243 sblist_iter(prog_state
.limits
, limit
) {
244 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
249 if(prog_state
.pipe_mode
)
253 static void dump_output(size_t job_id
, int is_stderr
) {
254 char out_filename_buf
[256];
256 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
259 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
261 dst
= fopen(out_filename_buf
, "r");
263 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
264 fwrite(buf
, 1, nread
, out_stream
);
265 if(nread
< sizeof(buf
)) break;
269 unlink(out_filename_buf
);
273 static void write_all(int fd
, void* buf
, size_t size
) {
278 ssize_t n
= write(fd
, p
, left
);
281 if(errno
== EINTR
) continue;
293 static void pass_stdin(char *line
, size_t len
) {
294 static size_t next_child
= 0;
295 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
297 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
298 write_all(job
->pipe
, line
, len
);
302 static void close_pipes(void) {
304 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
305 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
310 /* wait till a child exits, reap it, and return its job index for slot reuse */
311 static size_t reap_child(void) {
316 do ret
= waitpid(-1, &retval
, 0);
317 while(ret
== -1 || !WIFEXITED(retval
));
319 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
320 job
= sblist_get(prog_state
.job_infos
, i
);
321 if(job
->pid
== ret
) {
323 posix_spawn_file_actions_destroy(&job
->fa
);
324 prog_state
.threads_running
--;
325 if(prog_state
.buffered
) {
327 if(!prog_state
.join_output
)
337 static size_t free_slots(void) {
338 return prog_state
.numthreads
- prog_state
.threads_running
;
341 __attribute__((noreturn
))
342 static void die(const char* msg
) {
347 static unsigned long parse_human_number(const char* num
) {
348 unsigned long ret
= 0;
349 static const unsigned long mul
[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
350 const char* kmg
= "KMG";
351 const char* kmgind
, *p
;
354 while(isdigit(*(++p
)));
355 if(*p
&& (kmgind
= strchr(kmg
, *p
)))
356 ret
*= mul
[kmgind
- kmg
];
360 static int syntax(void) {
362 "jobflow " VERSION
" (C) rofl0r\n"
363 "------------------------\n"
364 "this program is intended to be used as a recipient of another programs output\n"
365 "it launches processes to which the current line can be passed as an argument\n"
366 "using {} for substitution (as in find -exec).\n"
367 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
368 "stdin of child processes. input will be then evenly distributed to jobs,\n"
369 "until EOF is received. we call this 'pipe mode'.\n"
371 "available options:\n\n"
372 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
373 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
375 "-exec ./mycommand {}\n"
378 " XXX=number of entries to skip\n"
379 "-threads=XXX (alternative: -j=XXX)\n"
380 " XXX=number of parallel processes to spawn\n"
382 " resume from last jobnumber stored in statefile\n"
384 " use XXX as the EOF marker on stdin\n"
385 " if the marker is encountered, behave as if stdin was closed\n"
386 " not compatible with pipe/bulk mode\n"
389 " saves last launched jobnumber into a file\n"
391 " only write to statefile whenever all processes are busy,\n"
392 " and at program end\n"
393 "-delayedspinup=XXX\n"
394 " XXX=maximum amount of milliseconds\n"
395 " ...to wait when spinning up a fresh set of processes\n"
396 " a random value between 0 and the chosen amount is used to delay initial\n"
398 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
399 " activity on program startup\n"
401 " store the stdout and stderr of launched processes into a temporary file\n"
402 " which will be printed after a process has finished.\n"
403 " this prevents mixing up of output of different processes.\n"
405 " if -buffered, write both stdout and stderr into the same file.\n"
406 " this saves the chronological order of the output, and the combined output\n"
407 " will only be printed to stdout.\n"
409 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
410 " this passes (almost) the entire buffer to the next scheduled job.\n"
411 " the passed buffer will be truncated to the last line break boundary,\n"
412 " so jobs always get entire lines to work with.\n"
413 " this option is useful when you have huge input files and relatively short\n"
414 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
415 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
416 " actual memory allocation will be twice the amount passed.\n"
417 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
418 " than that probably doesn't make sense.\n"
419 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
420 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
421 " sets the rlimit of the new created processes.\n"
422 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
423 "-exec command with args\n"
424 " everything past -exec is treated as the command to execute on each line of\n"
425 " stdin received. the line can be passed as an argument using {}.\n"
426 " {.} passes everything before the last dot in a line as an argument.\n"
427 " it is possible to use multiple substitutions inside a single argument,\n"
428 " but currently only of one type.\n"
429 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
/* redirect the strtoll() calls in the code below to strtoint64() (presumably the
   helper from the bundled strlib.h -- confirm). the endptr (b) and base (c)
   arguments are ignored, so input is always parsed as decimal-or-whatever
   strtoint64 accepts, and `a` is evaluated twice: pass only side-effect-free
   expressions. */
436 #define strtoll(a,b,c) strtoint64(a, strlen(a))
437 static int parse_args(int argc
, char** argv
) {
438 op_state op_b
, *op
= &op_b
;
439 op_init(op
, argc
, argv
);
441 if(op_hasflag(op
, SPL("help")))
444 op_temp
= op_get(op
, SPL("threads"));
445 if(!op_temp
) op_temp
= op_get(op
, SPL("j"));
446 long long x
= op_temp
? strtoll(op_temp
,0,10) : 1;
447 if(x
<= 0) die("threadcount must be >= 1\n");
448 prog_state
.numthreads
= x
;
450 op_temp
= op_get(op
, SPL("statefile"));
451 prog_state
.statefile
= op_temp
;
453 op_temp
= op_get(op
, SPL("eof"));
454 prog_state
.eof_marker
= op_temp
;
456 op_temp
= op_get(op
, SPL("skip"));
457 prog_state
.skip
= op_temp
? strtoll(op_temp
,0,10) : 0;
458 if(op_hasflag(op
, SPL("resume"))) {
459 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
460 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
461 FILE *f
= fopen(prog_state
.statefile
, "r");
464 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
470 prog_state
.delayedflush
= 0;
471 if(op_hasflag(op
, SPL("delayedflush"))) {
472 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
473 prog_state
.delayedflush
= 1;
476 prog_state
.pipe_mode
= 0;
478 op_temp
= op_get(op
, SPL("delayedspinup"));
479 prog_state
.delayedspinup_interval
= op_temp
? strtoll(op_temp
,0,10) : 0;
481 prog_state
.cmd_startarg
= 0;
482 prog_state
.subst_entries
= NULL
;
484 if(op_hasflag(op
, SPL("exec"))) {
487 for(i
= 1; i
< (unsigned) argc
; i
++) {
488 if(str_equal(argv
[i
], "-exec") || str_equal(argv
[i
], "--exec")) {
493 if(r
&& r
< (unsigned) argc
) {
494 prog_state
.cmd_startarg
= r
;
497 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
499 // save entries which must be substituted, to save some cycles.
500 for(i
= r
; i
< (unsigned) argc
; i
++) {
502 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) {
503 sblist_add(prog_state
.subst_entries
, &subst_ent
);
506 if(sblist_getsize(prog_state
.subst_entries
) == 0) {
507 prog_state
.pipe_mode
= 1;
508 sblist_free(prog_state
.subst_entries
);
509 prog_state
.subst_entries
= 0;
513 prog_state
.buffered
= 0;
514 if(op_hasflag(op
, SPL("buffered"))) {
515 prog_state
.buffered
= 1;
518 prog_state
.join_output
= 0;
519 if(op_hasflag(op
, SPL("joinoutput"))) {
520 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
521 prog_state
.join_output
= 1;
524 prog_state
.bulk_bytes
= 0;
525 op_temp
= op_get(op
, SPL("bulk"));
527 prog_state
.bulk_bytes
= parse_human_number(op_temp
);
528 if(prog_state
.bulk_bytes
% 4096)
529 die("bulk size must be a multiple of 4096\n");
530 } else if(op_hasflag(op
, SPL("bulk")))
531 prog_state
.bulk_bytes
= 4096;
533 prog_state
.limits
= NULL
;
534 op_temp
= op_get(op
, SPL("limits"));
537 char *limits
= op_temp
;
539 limits
+= strspn(limits
, ",");
540 size_t l
= strcspn(limits
, ",");
542 size_t l2
= strcspn(limits
, "=");
543 if(l2
>= l
) die("syntax error in limits argument");
545 if(!prog_state
.limits
)
546 prog_state
.limits
= sblist_new(sizeof(limit_rec
), 4);
547 static const struct { int lim_val
; const char lim_name
[8]; } lim_tab
[] = {
548 { RLIMIT_AS
, "mem" },
549 { RLIMIT_CPU
, "cpu" },
550 { RLIMIT_STACK
, "stack" },
551 { RLIMIT_FSIZE
, "fsize" },
552 { RLIMIT_NOFILE
, "nofiles" },
554 for(i
=0; i
<ARRAY_SIZE(lim_tab
);++i
)
555 if(!strncmp(limits
, lim_tab
[i
].lim_name
, l2
)) {
556 lim
.limit
= lim_tab
[i
].lim_val
;
559 if(i
>= ARRAY_SIZE(lim_tab
))
560 die("unknown option passed to -limits");
561 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
563 die("could not query rlimits");
565 lim
.rl
.rlim_cur
= parse_human_number(limits
+l2
+1);
566 sblist_add(prog_state
.limits
, &lim
);
573 static void init_queue(void) {
575 job_info ji
= {.pid
= -1};
577 for(i
= 0; i
< prog_state
.numthreads
; i
++)
578 sblist_add(prog_state
.job_infos
, &ji
);
581 static void write_statefile(unsigned long long n
, const char* tempfile
) {
582 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
584 dprintf(fd
, "%llu\n", n
+ 1ULL);
586 if(rename(tempfile
, prog_state
.statefile
) == -1)
// returns the number of substitutions done, or -1 if dest runs out of space.
// dest is always overwritten. if no substitutions were done, it contains a copy of source.
594 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
597 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
598 if(stringptr_here(source
, i
, what
)) {
599 if(dest_size
< (ssize_t
) whit
->size
) return -1;
600 memcpy(dest
, whit
->ptr
, whit
->size
);
602 dest_size
-= whit
->size
;
606 *dest
= source
->ptr
[i
];
612 if(!dest_size
) return -1;
617 static char* mystrnchr(const char *in
, int ch
, size_t end
) {
618 const char *e
= in
+end
;
620 while(p
!= e
&& *p
!= ch
) p
++;
621 if(p
!= e
) return (char*)p
;
624 static char* mystrnrchr(const char *in
, int ch
, size_t end
) {
625 const char *e
= in
+end
-1;
627 while(p
!= e
&& *e
!= ch
) e
--;
628 if(*e
== ch
) return (char*)e
;
631 static char* mystrnrchr_chk(const char *in
, int ch
, size_t end
) {
633 return mystrnrchr(in
, ch
, end
);
636 static int need_linecounter(void) {
637 return !!prog_state
.skip
|| prog_state
.statefile
;
639 static size_t count_linefeeds(const char *buf
, size_t len
) {
640 const char *p
= buf
, *e
= buf
+len
;
643 if(*p
== '\n') cnt
++;
649 static int match_eof(char* inbuf
, size_t len
) {
650 if(!prog_state
.eof_marker
) return 0;
651 size_t l
= strlen(prog_state
.eof_marker
);
652 return l
== len
-1 && !memcmp(prog_state
.eof_marker
, inbuf
, l
);
/* Returns 1 if p is a line-break character ('\n' or '\r'), 0 otherwise. */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}
/* Strips every trailing '\n' / '\r' from the first *len bytes of s in place.
 * Each removed character is overwritten with NUL, and *len is reduced to the
 * remaining length. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;
	while(n > 0 && (s[n - 1] == '\n' || s[n - 1] == '\r')) {
		n--;
		s[n] = 0;
	}
	*len = n;
}
/* cap on how many substitutable -exec arguments dispatch_line() processes per
   input line: it is the row count of dispatch_line()'s subst_buf (one 4096-byte
   buffer per argument) and the limit checked in its substitution loop. */
660 #define MAX_SUBSTS 16
661 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
662 char subst_buf
[MAX_SUBSTS
][4096];
663 static unsigned spinup_counter
= 0;
665 stringptr line_b
, *line
= &line_b
;
667 if(!prog_state
.bulk_bytes
)
669 else if(need_linecounter()) {
670 prog_state
.lineno
+= count_linefeeds(inbuf
, len
);
673 if(prog_state
.skip
) {
674 if(!prog_state
.bulk_bytes
) {
678 while(len
&& prog_state
.skip
) {
679 char *q
= mystrnchr(inbuf
, '\n', len
);
681 ptrdiff_t diff
= (q
- inbuf
) + 1;
692 if(!prog_state
.cmd_startarg
) {
693 write_all(1, inbuf
, len
);
697 if(!prog_state
.pipe_mode
)
700 line
->ptr
= inbuf
; line
->size
= len
;
702 if(prog_state
.subst_entries
) {
703 unsigned max_subst
= 0;
705 sblist_iter(prog_state
.subst_entries
, index
) {
706 if(max_subst
>= MAX_SUBSTS
) break;
707 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
709 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
712 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
715 char* lastdot
= mystrnrchr_chk(line
->ptr
, '.', line
->size
);
716 stringptr tilLastDot
= *line
;
717 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
718 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
719 if(ret
== -1) goto too_long
;
722 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
729 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
730 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
735 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
736 else if(!prog_state
.pipe_mode
)
737 launch_job(reap_child(), prog_state
.cmd_argv
);
739 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
740 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
743 if(prog_state
.pipe_mode
)
744 pass_stdin(line
->ptr
, line
->size
);
749 int main(int argc
, char** argv
) {
752 char tempdir_buf
[256];
756 if(argc
> 4096) argc
= 4096;
758 prog_state
.threads_running
= 0;
760 if(parse_args(argc
, argv
)) return 1;
762 if(prog_state
.statefile
)
763 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
765 prog_state
.tempdir
= NULL
;
767 if(prog_state
.buffered
) {
768 prog_state
.tempdir
= tempdir_buf
;
769 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
771 die("could not create tempdir\n");
774 /* if the stdout/stderr fds are not in O_APPEND mode,
775 the dup()'s of the fds in posix_spawn can cause different
776 file positions, causing the different processes to overwrite each others output.
778 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
780 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
781 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
784 if(prog_state
.cmd_startarg
) {
785 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
786 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
788 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
791 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
794 prog_state
.lineno
= 0;
796 size_t left
= 0, bytes_read
= 0;
797 const size_t chunksize
= prog_state
.bulk_bytes
? prog_state
.bulk_bytes
: 16*1024;
799 char *mem
= mmap(NULL
, chunksize
*2, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANON
, -1, 0);
801 char *buf2
= mem
+chunksize
;
807 inbuf
= buf1
+chunksize
-left
;
808 memcpy(inbuf
, buf2
+bytes_read
-left
, left
);
809 ssize_t n
= read(0, buf2
, chunksize
);
819 if(prog_state
.pipe_mode
&& prog_state
.bulk_bytes
)
820 p
= mystrnrchr(in
, '\n', left
);
822 p
= mystrnchr (in
, '\n', left
);
825 ptrdiff_t diff
= (p
- in
) + 1;
826 if(match_eof(in
, diff
)) {
830 if(!dispatch_line(in
, diff
, argv
))
836 if(left
&& !match_eof(in
, left
)) dispatch_line(in
, left
, argv
);
839 if(left
> chunksize
) {
840 dprintf(2, "error: input line length exceeds buffer size\n");
849 if(prog_state
.pipe_mode
) {
853 if(prog_state
.delayedflush
)
854 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
856 while(prog_state
.threads_running
) reap_child();
858 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
859 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
860 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
862 if(prog_state
.tempdir
)
863 rmdir(prog_state
.tempdir
);