2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.2.4"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/sblist.h"
32 #include "../lib/include/strlib.h"
33 #include "../lib/include/macros.h"
46 /* process handling */
53 #include <sys/resource.h>
/* Fallback for C libraries that lack prlimit() (glibc older than 2.13, per
 * the #warning below): a stub with the same name that reports on fd 2 that
 * prlimit() is not available. launch_job() calls prlimit() for every
 * -limits entry.
 * NOTE(review): the stub's return statement and the matching #endif are not
 * visible in this view -- confirm it returns -1 so callers see a failure. */
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
59 static int prlimit(int pid
, ...) {
61 dprintf(2, "prlimit() not implemented on this system\n");
69 /* some small helper funcs from libulz */
/* Sleep for `millisecs` milliseconds. When nanosleep() is interrupted by a
 * signal (EINTR) the not-yet-slept remainder is slept again, so the whole
 * interval elapses. Returns 0 on success, or -1 with errno set by
 * nanosleep() on any other failure (e.g. EINVAL for a negative interval). */
static int msleep(long millisecs) {
	struct timespec left;
	struct timespec want = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000 * 1000,
	};
	int rc;

	for(;;) {
		rc = nanosleep(&want, &left);
		if(rc != -1 || errno != EINTR)
			return rc;
		/* interrupted: continue with the time that is still outstanding */
		want = left;
	}
}
/* Alphabet used to fill the random part of a temp-directory template:
 * digits plus upper-case and lower-case ASCII letters (62 characters). */
80 static const char ulz_conv_cypher
[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
/* Number of usable characters in ulz_conv_cypher (the NUL is excluded). */
81 static const size_t ulz_conv_cypher_len
= sizeof(ulz_conv_cypher
) - 1;
/* mkdtemp() replacement: overwrite the last 6 characters of templ with
 * random characters from ulz_conv_cypher and create that directory with
 * mode S_IRWXU (owner read/write/execute only). If a directory with that
 * name already exists (EEXIST), a new name is generated and tried.
 * NOTE(review): the names come from rand(), so they are predictable unless
 * srand() was seeded; the `loop` label, any length check on templ and the
 * return paths are not visible in this view -- presumably returns templ on
 * success and NULL on failure, as mktempdir()'s `!ulz_mkdtemp(...)` checks
 * suggest. */
82 static char* ulz_mkdtemp(char* templ
) {
83 size_t i
, l
= strlen(templ
);
/* randomise the trailing six template characters */
89 for(i
= l
- 6; i
< l
; i
++) templ
[i
] = ulz_conv_cypher
[rand() % ulz_conv_cypher_len
];
/* owner-only directory; on a name collision pick a new name and retry */
90 if(mkdir(templ
, S_IRWXU
) == -1) {
91 if(errno
== EEXIST
) goto loop
;
/* Compose "<tmpdir><prefix>XXXXXX" into buf. The function copies
 * strlen(tmpdir) bytes of tmpdir, then pl bytes of prefix, then "XXXXXX"
 * including its NUL terminator (7 bytes). buf therefore needs room for
 * strlen(tmpdir) + pl + 7 bytes; mktempdir() checks this for "/dev/shm/",
 * the longer of the two directories it uses.
 * NOTE(review): the running offset `a` and the return statement are not
 * visible in this view; mktempdir()'s comment says the result is the length
 * of the generated string -- confirm. */
97 static size_t gen_fn(char* buf
, const char* prefix
, size_t pl
, const char* tmpdir
) {
98 size_t tl
= strlen(tmpdir
);
100 memcpy(buf
+a
, tmpdir
, tl
);
102 memcpy(buf
+a
,prefix
,pl
);
/* 7 bytes: the six placeholder characters plus the terminating NUL */
104 memcpy(buf
+a
,"XXXXXX", 7);
108 /* calls mkdtemp on /dev/shm and on failure on /tmp, to get the fastest possible
109 * storage. returns size of the string returned in buffer, or 0 on failure */
110 static size_t mktempdir(const char* prefix
, char* buffer
, size_t bufsize
) {
111 size_t ret
, pl
= strlen(prefix
);
/* "/dev/shm/" is longer than "/tmp/", so this single size check also
   covers the /tmp fallback below */
112 if(bufsize
< sizeof("/dev/shm/") -1 + pl
+ sizeof("XXXXXX")) return 0;
113 ret
= gen_fn(buffer
, prefix
, pl
, "/dev/shm/");
/* fall back to /tmp when the directory cannot be created in /dev/shm;
   give up (return 0) if that fails too */
114 if(!ulz_mkdtemp(buffer
)) {
115 ret
= gen_fn(buffer
, prefix
, pl
, "/tmp/");
116 if(!ulz_mkdtemp(buffer
)) return 0;
/* job_info: spawn file actions for this slot's child; initialised in
 * launch_job() and destroyed in reap_child() after the child is reaped. */
125 posix_spawn_file_actions_t fa
;
/* Path of the temporary statefile ("<statefile>.<pid>", built in main());
 * write_statefile() writes it and renames it over the real statefile. */
134 char temp_state
[256];
/* argv for spawned jobs: the words after -exec (copied in main()) followed
 * by NULL; entries containing {} or {.} are replaced per input line by
 * dispatch_line(). */
135 char* cmd_argv
[4096];
/* Input line counter; this is the value passed to write_statefile(). */
136 unsigned long long lineno
;
/* Number of children currently running: incremented in launch_job(),
 * decremented in reap_child(). */
138 unsigned threads_running
;
/* Input lines still to be skipped (-skip, or the number read from the
 * statefile with -resume). */
141 unsigned long long skip
;
/* Indices (relative to cmd_startarg) of the -exec arguments that contain
 * {} or {.}; NULL without -exec and in pipe mode. */
143 sblist
* subst_entries
;
/* argv index of the first word after -exec; 0 when no -exec was given. */
145 unsigned cmd_startarg
;
/* NOTE(review): in this view the descriptive comment that follows this field
 * has no closing marker before the `buffered` field (a line appears to be
 * missing), so `int buffered:1;` falls inside that comment -- confirm
 * against the full source. */
147 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
148 the top value in ms can be supplied via a command line switch.
149 this option only makes sense if the interval is somewhat smaller than the
150 expected runtime of the average job.
151 this option is useful to not overload a network app due to hundreds of
152 parallel connection tries on startup.
154 int buffered
:1; /* write stdout and stderr of each task into a file,
155 and print it to stdout once the process ends.
156 this prevents mixing up of the output of multiple tasks. */
/* NOTE(review): these 1-bit flags are plain-int bit-fields. Whether such a
 * field is signed is implementation-defined, and a signed 1-bit field can
 * only hold 0 and -1. The visible code only tests them for truth or
 * compares them with 0, but `unsigned` would be the safer type. */
157 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
158 this means faster program execution, but could also be imprecise if the number of
159 jobs is small or smaller than the available threadcount. */
160 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
/* The single global program state, filled in by parse_args() and main(). */
165 prog_state_s prog_state
;
/* Process environment; handed to posix_spawnp() so jobs inherit it. */
168 extern char** environ
;
170 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
171 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
172 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
173 return ret
> 0 && (size_t) ret
< bufsize
;
/* Start argv as the child process for job slot `jobindex`; nothing happens
 * if that slot already has a child (pid != -1).
 * The child's fd 0 is closed and then reopened on /dev/null or, in pipe mode,
 * replaced by pipes[0] (pipes[1] is kept in job->pipe for feeding input;
 * the pipe creation itself is not visible here). With -buffered, fd 1 and
 * fd 2 are redirected to per-job log files named by makeLogfilename(), and
 * with -joinoutput fd 2 is dup'ed onto fd 1 instead. After spawning,
 * threads_running is incremented and every -limits entry is applied to the
 * child with prlimit(). */
176 void launch_job(size_t jobindex
, char** argv
) {
177 char stdout_filename_buf
[256];
178 char stderr_filename_buf
[256];
179 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
/* slot still busy with a previous child */
181 if(job
->pid
!= -1) return;
183 if(prog_state
.buffered
) {
184 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
185 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
186 dprintf(2, "temp filename too long!\n");
/* The posix_spawn_file_actions_*() functions return an error number rather
 * than setting errno; it is stored in errno here (presumably so the
 * spawn_error path can report it). */
191 errno
= posix_spawn_file_actions_init(&job
->fa
);
192 if(errno
) goto spawn_error
;
194 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
195 if(errno
) goto spawn_error
;
/* pipe mode: the read end becomes the child's stdin and both original pipe
   fds are closed in the child */
198 if(prog_state
.pipe_mode
) {
203 job
->pipe
= pipes
[1];
204 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
205 if(errno
) goto spawn_error
;
206 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
207 if(errno
) goto spawn_error
;
208 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
209 if(errno
) goto spawn_error
;
212 if(prog_state
.buffered
) {
213 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
214 if(errno
) goto spawn_error
;
215 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
216 if(errno
) goto spawn_error
;
/* outside pipe mode the child reads from /dev/null */
219 if(!prog_state
.pipe_mode
) {
220 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
221 if(errno
) goto spawn_error
;
224 if(prog_state
.buffered
) {
225 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
226 if(errno
) goto spawn_error
;
/* NOTE(review): in the visible lines the result of this adddup2 is assigned
 * to errno but never checked before the stderr addopen below -- confirm the
 * missing lines handle it. */
227 if(prog_state
.join_output
)
228 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
230 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
231 if(errno
) goto spawn_error
;
/* spawn with PATH lookup, the prepared file actions and our environment;
   the child's pid is stored in job->pid */
234 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
238 perror("posix_spawn");
240 prog_state
.threads_running
++;
/* apply each -limits entry to the freshly spawned child */
241 if(prog_state
.limits
) {
243 sblist_iter(prog_state
.limits
, limit
) {
244 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
249 if(prog_state
.pipe_mode
)
/* Copy the captured log of job `job_id` (its stdout log, or its stderr log
 * when is_stderr is set) to our own stdout or stderr, then delete the log
 * file with unlink().
 * NOTE(review): the return value of makeLogfilename() is ignored, and no
 * check that fopen() succeeded is visible before fread() -- confirm the
 * hidden lines guard against a NULL dst. */
253 static void dump_output(size_t job_id
, int is_stderr
) {
254 char out_filename_buf
[256];
256 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
259 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
261 dst
= fopen(out_filename_buf
, "r");
/* a short fread() means EOF or an error, so stop after it */
263 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
264 fwrite(buf
, 1, nread
, out_stream
);
265 if(nread
< sizeof(buf
)) break;
269 unlink(out_filename_buf
);
/* Intended to write all `size` bytes of buf to fd; a write() interrupted by
 * a signal (EINTR) is retried.
 * NOTE(review): the pointer/remaining-count bookkeeping (p, left) and the
 * handling of other write() errors are not visible in this view. */
273 static void write_all(int fd
, void* buf
, size_t size
) {
278 ssize_t n
= write(fd
, p
, left
);
281 if(errno
== EINTR
) continue;
/* Pipe mode: write `line` (len bytes) to the stdin pipe of the job slot at
 * index next_child. next_child keeps its value across calls and is compared
 * with the number of slots before use; presumably it is wrapped back to 0
 * there and advanced after each write, giving round-robin distribution
 * (those lines are not visible here). */
293 static void pass_stdin(char *line
, size_t len
) {
294 static size_t next_child
= 0;
295 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
297 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
298 write_all(job
->pipe
, line
, len
);
/* Visit every job slot -- presumably to close each job's stdin pipe so the
 * children see EOF at the end of input; the loop body is not fully visible
 * in this view. */
302 static void close_pipes(void) {
304 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
305 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
310 /* wait till a child exits, reap it, and return its job index for slot reuse */
311 static size_t reap_child(void) {
/* NOTE(review): this loop only ends once waitpid() reports a child that
 * exited normally (WIFEXITED).
 * - A child terminated by a signal is reaped, but its pid is discarded, so
 *   its slot is never released and threads_running is not decremented.
 * - If waitpid() keeps failing (e.g. ECHILD once no children are left), the
 *   loop spins forever.
 * Together these make main()'s final `while(threads_running) reap_child();`
 * hang after any job dies from a signal. Consider also accepting
 * WIFSIGNALED, and bailing out on errors other than EINTR. */
316 do ret
= waitpid(-1, &retval
, 0);
317 while(ret
== -1 || !WIFEXITED(retval
));
/* find the slot that owned the reaped pid */
319 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
320 job
= sblist_get(prog_state
.job_infos
, i
);
321 if(job
->pid
== ret
) {
/* release the spawn file actions set up by launch_job() */
323 posix_spawn_file_actions_destroy(&job
->fa
);
324 prog_state
.threads_running
--;
/* with -buffered, the job's captured output is printed here (stderr
   separately unless -joinoutput) -- presumably via dump_output() */
325 if(prog_state
.buffered
) {
327 if(!prog_state
.join_output
)
337 static size_t free_slots(void) {
338 return prog_state
.numthreads
- prog_state
.threads_running
;
/* Fatal-error helper, declared noreturn. Callers pass a complete message
 * (most end with "\n").
 * NOTE(review): the body is not visible in this view -- presumably it prints
 * msg to stderr and exits with a failure status; confirm. */
341 __attribute__((noreturn
))
342 static void die(const char* msg
) {
/* Convert a size string such as "4096", "64K", "16M" or "1G" to a number.
 * The leading decimal number is parsed with strtoul(). If the character
 * right after it is 'K', 'M' or 'G' (upper case only), the value is
 * multiplied by 1024, 1024^2 or 1024^3 respectively. Anything else after the
 * number is ignored, which lets -limits pass "16M,cpu=10" straight in.
 * Input without digits yields 0. A result too large for unsigned long
 * saturates at its maximum instead of silently wrapping around. */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
	static const char kmg[] = "KMG";
	const unsigned long max = (unsigned long) -1;
	const char *kmgind;
	char *end;
	unsigned long ret;

	/* strtoul() reports where the digits stop, so the suffix is read from
	   there. Nothing is ever read past the string's terminator, even for an
	   empty or digit-less argument such as a bare "-limits=mem=". */
	ret = strtoul(num, &end, 10);
	if(*end && (kmgind = strchr(kmg, *end))) {
		unsigned long factor = mul[kmgind - kmg];
		ret = ret > max / factor ? max : ret * factor;
	}
	return ret;
}
/* Print the usage/help text. The output call that consumes these
 * concatenated literals is not visible in this view.
 * NOTE(review): typos in the help text worth fixing in the literals below:
 * "another programs output" -> "another program's output" and
 * "the new created processes" -> "newly created processes". */
360 static int syntax(void) {
362 "jobflow " VERSION
" (C) rofl0r\n"
363 "------------------------\n"
364 "this program is intended to be used as a recipient of another programs output\n"
365 "it launches processes to which the current line can be passed as an argument\n"
366 "using {} for substitution (as in find -exec).\n"
367 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
368 "stdin of child processes. input will be then evenly distributed to jobs,\n"
369 "until EOF is received. we call this 'pipe mode'.\n"
371 "available options:\n\n"
372 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
373 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
375 "-exec ./mycommand {}\n"
378 " XXX=number of entries to skip\n"
379 "-threads=XXX (alternative: -j=XXX)\n"
380 " XXX=number of parallel processes to spawn\n"
382 " resume from last jobnumber stored in statefile\n"
384 " use XXX as the EOF marker on stdin\n"
385 " if the marker is encountered, behave as if stdin was closed\n"
386 " not compatible with pipe/bulk mode\n"
389 " saves last launched jobnumber into a file\n"
391 " only write to statefile whenever all processes are busy,\n"
392 " and at program end\n"
393 "-delayedspinup=XXX\n"
394 " XXX=maximum amount of milliseconds\n"
395 " ...to wait when spinning up a fresh set of processes\n"
396 " a random value between 0 and the chosen amount is used to delay initial\n"
398 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
399 " activity on program startup\n"
401 " store the stdout and stderr of launched processes into a temporary file\n"
402 " which will be printed after a process has finished.\n"
403 " this prevents mixing up of output of different processes.\n"
405 " if -buffered, write both stdout and stderr into the same file.\n"
406 " this saves the chronological order of the output, and the combined output\n"
407 " will only be printed to stdout.\n"
409 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
410 " this passes (almost) the entire buffer to the next scheduled job.\n"
411 " the passed buffer will be truncated to the last line break boundary,\n"
412 " so jobs always get entire lines to work with.\n"
413 " this option is useful when you have huge input files and relatively short\n"
414 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
415 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
416 " actual memory allocation will be twice the amount passed.\n"
417 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
418 " than that probably doesn't make sense.\n"
419 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
420 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
421 " sets the rlimit of the new created processes.\n"
422 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
423 "-exec command with args\n"
424 " everything past -exec is treated as the command to execute on each line of\n"
425 " stdin received. the line can be passed as an argument using {}.\n"
426 " {.} passes everything before the last dot in a line as an argument.\n"
427 " it is possible to use multiple substitutions inside a single argument,\n"
428 " but currently only of one type.\n"
429 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
/* Parse the command line into prog_state. Options are read through the
 * optparser helpers: op_get() for options that carry a value, op_hasflag()
 * for plain flags. Invalid combinations end the program via die(). The
 * -exec command is found by scanning argv for "-exec" or "--exec"; if none
 * of its arguments contains {} or {.}, pipe mode is enabled.
 * NOTE(review): the return statements are not visible in this view;
 * main() treats a nonzero result as failure. */
435 static int parse_args(int argc
, char** argv
) {
436 op_state op_b
, *op
= &op_b
;
437 op_init(op
, argc
, argv
);
439 if(op_hasflag(op
, SPL("help")))
/* -threads (alias -j): number of parallel jobs, default 1, must be >= 1 */
442 op_temp
= op_get(op
, SPL("threads"));
443 if(!op_temp
) op_temp
= op_get(op
, SPL("j"));
444 long long x
= op_temp
? strtoll(op_temp
,0,10) : 1;
445 if(x
<= 0) die("threadcount must be >= 1\n");
446 prog_state
.numthreads
= x
;
448 op_temp
= op_get(op
, SPL("statefile"));
449 prog_state
.statefile
= op_temp
;
451 op_temp
= op_get(op
, SPL("eof"));
452 prog_state
.eof_marker
= op_temp
;
454 op_temp
= op_get(op
, SPL("skip"));
455 prog_state
.skip
= op_temp
? strtoll(op_temp
,0,10) : 0;
/* -resume: read the skip count back from an existing statefile.
 * NOTE(review): no check that fopen() succeeded is visible; a passing
 * access() check does not guarantee that the later open works. */
456 if(op_hasflag(op
, SPL("resume"))) {
457 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
458 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
459 FILE *f
= fopen(prog_state
.statefile
, "r");
462 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
468 prog_state
.delayedflush
= 0;
469 if(op_hasflag(op
, SPL("delayedflush"))) {
470 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
471 prog_state
.delayedflush
= 1;
474 prog_state
.pipe_mode
= 0;
476 op_temp
= op_get(op
, SPL("delayedspinup"));
477 prog_state
.delayedspinup_interval
= op_temp
? strtoll(op_temp
,0,10) : 0;
479 prog_state
.cmd_startarg
= 0;
480 prog_state
.subst_entries
= NULL
;
/* -exec: locate the command that follows it, and record which of its
   arguments need per-line {} or {.} substitution */
482 if(op_hasflag(op
, SPL("exec"))) {
485 for(i
= 1; i
< (unsigned) argc
; i
++) {
486 if(str_equal(argv
[i
], "-exec") || str_equal(argv
[i
], "--exec")) {
491 if(r
&& r
< (unsigned) argc
) {
492 prog_state
.cmd_startarg
= r
;
495 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
497 // save entries which must be substituted, to save some cycles.
498 for(i
= r
; i
< (unsigned) argc
; i
++) {
500 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) {
501 sblist_add(prog_state
.subst_entries
, &subst_ent
);
/* no placeholder anywhere: input is piped to the children's stdin */
504 if(sblist_getsize(prog_state
.subst_entries
) == 0) {
505 prog_state
.pipe_mode
= 1;
506 sblist_free(prog_state
.subst_entries
);
507 prog_state
.subst_entries
= 0;
511 prog_state
.buffered
= 0;
512 if(op_hasflag(op
, SPL("buffered"))) {
513 prog_state
.buffered
= 1;
516 prog_state
.join_output
= 0;
517 if(op_hasflag(op
, SPL("joinoutput"))) {
518 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
519 prog_state
.join_output
= 1;
/* -bulk[=SIZE]: chunk size for pipe mode; must be a multiple of 4096, and
   defaults to 4096 when the flag is given without a value */
522 prog_state
.bulk_bytes
= 0;
523 op_temp
= op_get(op
, SPL("bulk"));
525 prog_state
.bulk_bytes
= parse_human_number(op_temp
);
526 if(prog_state
.bulk_bytes
% 4096)
527 die("bulk size must be a multiple of 4096\n");
528 } else if(op_hasflag(op
, SPL("bulk")))
529 prog_state
.bulk_bytes
= 4096;
/* -limits=name=value[,name=value...]: rlimits applied to every child */
531 prog_state
.limits
= NULL
;
532 op_temp
= op_get(op
, SPL("limits"));
535 char *limits
= op_temp
;
537 limits
+= strspn(limits
, ",");
/* l: length of this name=value item; l2: length of its name part */
538 size_t l
= strcspn(limits
, ",");
540 size_t l2
= strcspn(limits
, "=");
541 if(l2
>= l
) die("syntax error in limits argument");
543 if(!prog_state
.limits
)
544 prog_state
.limits
= sblist_new(sizeof(limit_rec
), 4);
545 static const struct { int lim_val
; const char lim_name
[8]; } lim_tab
[] = {
546 { RLIMIT_AS
, "mem" },
547 { RLIMIT_CPU
, "cpu" },
548 { RLIMIT_STACK
, "stack" },
549 { RLIMIT_FSIZE
, "fsize" },
550 { RLIMIT_NOFILE
, "nofiles" },
/* NOTE(review): strncmp() compares only the first l2 characters, so any
 * prefix of a name is accepted (e.g. "m=" selects "mem", "c=" selects
 * "cpu"), and an empty name (l2 == 0) matches the first table entry.
 * Requiring strlen(lim_name) == l2 as well would make the match exact. */
552 for(i
=0; i
<ARRAY_SIZE(lim_tab
);++i
)
553 if(!strncmp(limits
, lim_tab
[i
].lim_name
, l2
)) {
554 lim
.limit
= lim_tab
[i
].lim_val
;
557 if(i
>= ARRAY_SIZE(lim_tab
))
558 die("unknown option passed to -limits");
559 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
561 die("could not query rlimits");
/* Only the soft limit is replaced; the hard limit stays at the value
 * getrlimit() returned.
 * NOTE(review): a value above rlim_max makes the later prlimit() call
 * fail. */
563 lim
.rl
.rlim_cur
= parse_human_number(limits
+l2
+1);
564 sblist_add(prog_state
.limits
, &lim
);
571 static void init_queue(void) {
573 job_info ji
= {.pid
= -1};
575 for(i
= 0; i
< prog_state
.numthreads
; i
++)
576 sblist_add(prog_state
.job_infos
, &ji
);
/* Record progress for -resume: write n + 1 followed by a newline into
 * tempfile, then rename() it over prog_state.statefile, so the statefile is
 * replaced in a single step and never appears half-written.
 * NOTE(review): no check that open() succeeded is visible before dprintf(),
 * and the close() and rename-failure handling are not shown in this view. */
579 static void write_statefile(unsigned long long n
, const char* tempfile
) {
580 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
582 dprintf(fd
, "%llu\n", n
+ 1ULL);
584 if(rename(tempfile
, prog_state
.statefile
) == -1)
/* Return 1 if the needle_size bytes of needle occur in haystack starting at
 * offset bufpos and lie entirely inside hay_size, otherwise 0.
 * bufpos must not exceed hay_size. */
static int str_here(char* haystack, size_t hay_size, size_t bufpos,
                    char* needle, size_t needle_size) {
	size_t room = hay_size - bufpos;
	if(needle_size > room)
		return 0;
	return memcmp(needle, haystack + bufpos, needle_size) == 0;
}
598 // returns the number of substitutions done, or -1 if dest runs out of space.
599 // dest is always overwritten. if no substitutions were done, it contains a copy of src.
// Every occurrence of what[0..what_size) in src[0..src_size) is replaced by
// whit[0..whit_size), and the result is written to dest (at most dest_size
// bytes).
// NOTE(review): copying of non-matching bytes, advancing past a match and the
// final NUL termination are not visible in this view.
600 int substitute_all(char *dest
, ssize_t dest_size
,
601 char *src
, size_t src_size
,
602 char *what
, size_t what_size
,
603 char *whit
, size_t whit_size
) {
606 for(i
= 0; dest_size
> 0 && i
< src_size
; ) {
607 if(str_here(src
, src_size
, i
, what
, what_size
)) {
// the replacement must fit completely
608 if(dest_size
< (ssize_t
) whit_size
) return -1;
609 memcpy(dest
, whit
, whit_size
);
611 dest_size
-= whit_size
;
// no byte left for the terminating NUL
621 if(!dest_size
) return -1;
/* Return a pointer to the first character equal to ch within in[0..end), or
 * NULL if there is none. Unlike strchr(), the search is bounded by `end`
 * and does not stop at NUL bytes. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	const char *p = in, *e = in + end;
	while(p != e && *p != ch) p++;
	if(p != e) return (char*)p;
	return 0;
}

/* Return a pointer to the last character equal to ch within in[0..end), or
 * NULL if there is none (including when end is 0). */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	const char *e;
	/* with end == 0, in + end - 1 would point before the buffer and the
	   final comparison would read in[-1]; main() calls this unguarded */
	if(!end) return 0;
	e = in + end - 1;
	while(e != in && *e != ch) e--;
	if(*e == ch) return (char*)e;
	return 0;
}

/* Checked variant of mystrnrchr(): an empty range yields NULL. Kept for
 * existing callers; mystrnrchr() itself now handles end == 0 too. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	if(!end) return 0;
	return mystrnrchr(in, ch, end);
}
645 static int need_linecounter(void) {
646 return !!prog_state
.skip
|| prog_state
.statefile
;
/* Count the '\n' characters in buf[0..len). */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t idx, linefeeds = 0;
	for(idx = 0; idx < len; ++idx)
		linefeeds += (buf[idx] == '\n');
	return linefeeds;
}
658 static int match_eof(char* inbuf
, size_t len
) {
659 if(!prog_state
.eof_marker
) return 0;
660 size_t l
= strlen(prog_state
.eof_marker
);
661 return l
== len
-1 && !memcmp(prog_state
.eof_marker
, inbuf
, l
);
/* Nonzero if p is a line-break character (LF or CR). */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}

/* Strip every trailing CR/LF from s in place: each one is overwritten with
 * a NUL byte and *len shrinks accordingly. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;
	for(; n > 0 && islb(s[n - 1]); n--)
		s[n - 1] = 0;
	*len = n;
}
/* Maximum number of -exec arguments that receive per-line substitution; each
 * gets its own 4096-byte buffer in dispatch_line(). */
669 #define MAX_SUBSTS 16
/* Handle one unit of input: a single line or, with -bulk, a chunk of whole
 * lines. It first honours -skip, then does one of the following:
 * - without -exec, echoes the data to stdout;
 * - in pipe mode, hands it to a child's stdin;
 * - otherwise, builds cmd_argv with {} or {.} replaced and launches a job.
 * A new job goes into slot threads_running, or into the slot freed by
 * reap_child() (the condition that chooses between them is not visible
 * here). With -statefile the progress is written out, either every time or,
 * with -delayedflush, only when no slot is free.
 * NOTE(review): subst_buf occupies MAX_SUBSTS * 4096 = 64 KiB of stack, and
 * the cmd_argv entries pointing into it are only valid until this function
 * returns. */
670 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
671 char subst_buf
[MAX_SUBSTS
][4096];
672 static unsigned spinup_counter
= 0;
674 stringptr line_b
, *line
= &line_b
;
676 if(!prog_state
.bulk_bytes
)
678 else if(need_linecounter()) {
679 prog_state
.lineno
+= count_linefeeds(inbuf
, len
);
/* -skip: drop input until the requested number of lines has been consumed;
   in bulk mode whole lines are cut off the front of the chunk */
682 if(prog_state
.skip
) {
683 if(!prog_state
.bulk_bytes
) {
687 while(len
&& prog_state
.skip
) {
688 char *q
= mystrnchr(inbuf
, '\n', len
);
690 ptrdiff_t diff
= (q
- inbuf
) + 1;
/* no -exec: behave like cat */
701 if(!prog_state
.cmd_startarg
) {
702 write_all(1, inbuf
, len
);
706 if(!prog_state
.pipe_mode
)
709 line
->ptr
= inbuf
; line
->size
= len
;
711 if(prog_state
.subst_entries
) {
712 unsigned max_subst
= 0;
714 sblist_iter(prog_state
.subst_entries
, index
) {
/* NOTE(review): arguments beyond the first MAX_SUBSTS substitution entries
 * are silently skipped, so their cmd_argv entry keeps the unsubstituted
 * original argument. */
715 if(max_subst
>= MAX_SUBSTS
) break;
716 char *source
= argv
[*index
+ prog_state
.cmd_startarg
];
717 size_t source_len
= strlen(source
);
719 ret
= substitute_all(subst_buf
[max_subst
], 4096,
722 line
->ptr
, line
->size
);
725 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
/* {.}: substitute the line up to (not including) its last dot */
728 char* lastdot
= mystrnrchr_chk(line
->ptr
, '.', line
->size
);
729 stringptr tilLastDot
= *line
;
730 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
731 ret
= substitute_all(subst_buf
[max_subst
], 4096,
734 tilLastDot
.ptr
, tilLastDot
.size
);
735 if(ret
== -1) goto too_long
;
738 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
/* -delayedspinup: sleep a random 0..interval ms while spinup_counter is
   below numthreads * 2 */
745 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
746 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
751 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
752 else if(!prog_state
.pipe_mode
)
753 launch_job(reap_child(), prog_state
.cmd_argv
);
755 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
756 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
759 if(prog_state
.pipe_mode
)
760 pass_stdin(line
->ptr
, line
->size
);
/* Entry point. It proceeds in four stages:
 * 1. Parse the options (returns 1 if parse_args() fails).
 * 2. Prepare the statefile temp name, the -buffered temp dir and the child
 *    argv.
 * 3. Read stdin in chunks and pass each line to dispatch_line(); with -bulk
 *    in pipe mode, each run of whole lines is passed instead.
 * 4. Wait for all children and release the resources. */
765 int main(int argc
, char** argv
) {
768 char tempdir_buf
[256];
/* cmd_argv has 4096 entries: clamp so the copied -exec words plus their NULL
   terminator always fit */
772 if(argc
> 4096) argc
= 4096;
774 prog_state
.threads_running
= 0;
776 if(parse_args(argc
, argv
)) return 1;
/* NOTE(review): truncation of the 256-byte temp_state by snprintf() is not
   checked */
778 if(prog_state
.statefile
)
779 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
781 prog_state
.tempdir
= NULL
;
783 if(prog_state
.buffered
) {
784 prog_state
.tempdir
= tempdir_buf
;
785 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
787 die("could not create tempdir\n");
/* NOTE(review): F_SETFL replaces the file status flags instead of adding to
 * them, so the fcntl() calls below also clear flags such as O_NONBLOCK; OR
 * O_APPEND into the value returned by F_GETFL instead.
 * NOTE(review): in this view the comment opened on the next line has no
 * visible closing line, so the rest of main() falls inside it; confirm
 * against the full source. */
790 /* if the stdout/stderr fds are not in O_APPEND mode,
791 the dup()'s of the fds in posix_spawn can cause different
792 file positions, causing the different processes to overwrite each other's output.
794 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
796 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
797 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
// copy the words after -exec into cmd_argv and NULL-terminate it
800 if(prog_state
.cmd_startarg
) {
801 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
802 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
804 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
807 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
810 prog_state
.lineno
= 0;
812 size_t left
= 0, bytes_read
= 0;
// read size: the -bulk size if given, otherwise 16 KiB
813 const size_t chunksize
= prog_state
.bulk_bytes
? prog_state
.bulk_bytes
: 16*1024;
// two adjacent chunks: buf2 receives read() data, and the unconsumed tail of
// the previous read is copied to the end of the first chunk so that it joins
// up with the new data
// NOTE(review): no MAP_FAILED check on mmap() is visible in this view
815 char *mem
= mmap(NULL
, chunksize
*2, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANON
, -1, 0);
817 char *buf2
= mem
+chunksize
;
823 inbuf
= buf1
+chunksize
-left
;
824 memcpy(inbuf
, buf2
+bytes_read
-left
, left
);
825 ssize_t n
= read(0, buf2
, chunksize
);
// bulk pipe mode cuts at the last newline, otherwise at the first one
835 if(prog_state
.pipe_mode
&& prog_state
.bulk_bytes
)
836 p
= mystrnrchr(in
, '\n', left
);
838 p
= mystrnchr (in
, '\n', left
);
841 ptrdiff_t diff
= (p
- in
) + 1;
842 if(match_eof(in
, diff
)) {
846 if(!dispatch_line(in
, diff
, argv
))
// at EOF: dispatch a final line that has no trailing newline
852 if(left
&& !match_eof(in
, left
)) dispatch_line(in
, left
, argv
);
855 if(left
> chunksize
) {
856 dprintf(2, "error: input line length exceeds buffer size\n");
865 if(prog_state
.pipe_mode
) {
869 if(prog_state
.delayedflush
)
870 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
// wait for every remaining child before cleaning up
872 while(prog_state
.threads_running
) reap_child();
874 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
875 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
876 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
878 if(prog_state
.tempdir
)
879 rmdir(prog_state
.tempdir
);