2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
46 /* process handling */
53 #include <sys/resource.h>
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
59 static int prlimit(int pid
, ...) {
61 dprintf(2, "prlimit() not implemented on this system\n");
73 posix_spawn_file_actions_t fa
;
84 unsigned long long lineno
;
86 unsigned threads_running
;
88 unsigned long long skip
;
90 sblist
* subst_entries
;
92 unsigned cmd_startarg
;
94 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
95 the top value in ms can be supplied via a command line switch.
96 this option makes only sense if the interval is somewhat smaller than the
97 expected runtime of the average job.
98 this option is useful to not overload a network app due to hundreds of
99 parallel connection tries on startup.
101 int buffered
:1; /* write stdout and stderr of each task into a file,
102 and print it to stdout once the process ends.
103 this prevents mixing up of the output of multiple tasks. */
104 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
105 this means faster program execution, but could also be imprecise if the number of
106 jobs is small or smaller than the available threadcount. */
107 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
111 prog_state_s prog_state
;
114 extern char** environ
;
116 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
117 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
118 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
119 return ret
> 0 && (size_t) ret
< bufsize
;
122 void launch_job(size_t jobindex
, char** argv
) {
123 char stdout_filename_buf
[256];
124 char stderr_filename_buf
[256];
125 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
127 if(job
->pid
!= -1) return;
129 if(prog_state
.buffered
) {
130 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
131 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
132 dprintf(2, "temp filename too long!\n");
137 errno
= posix_spawn_file_actions_init(&job
->fa
);
138 if(errno
) goto spawn_error
;
140 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
141 if(errno
) goto spawn_error
;
144 if(prog_state
.stdin_pipe
) {
149 job
->pipe
= pipes
[1];
150 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
151 if(errno
) goto spawn_error
;
152 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
153 if(errno
) goto spawn_error
;
154 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
155 if(errno
) goto spawn_error
;
158 if(prog_state
.buffered
) {
159 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
160 if(errno
) goto spawn_error
;
161 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
162 if(errno
) goto spawn_error
;
165 if(!prog_state
.stdin_pipe
) {
166 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
167 if(errno
) goto spawn_error
;
170 if(prog_state
.buffered
) {
171 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
172 if(errno
) goto spawn_error
;
173 if(prog_state
.join_output
)
174 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
176 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
177 if(errno
) goto spawn_error
;
180 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
184 perror("posix_spawn");
186 prog_state
.threads_running
++;
187 if(prog_state
.limits
) {
189 sblist_iter(prog_state
.limits
, limit
) {
190 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
195 if(prog_state
.stdin_pipe
) close(pipes
[0]);
198 static void dump_output(size_t job_id
, int is_stderr
) {
199 char out_filename_buf
[256];
201 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
204 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
206 dst
= fopen(out_filename_buf
, "r");
208 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
209 fwrite(buf
, 1, nread
, out_stream
);
210 if(nread
< sizeof(buf
)) break;
217 static void pass_stdin(stringptr
*line
) {
218 static size_t next_child
= 0;
219 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
221 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
222 write(job
->pipe
, line
->ptr
, line
->size
);
226 static void close_pipes(void) {
228 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
229 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
234 /* wait till a child exits, reap it, and return its job index for slot reuse */
235 static size_t reap_child(void) {
240 do ret
= waitpid(-1, &retval
, 0);
241 while(ret
== -1 || !WIFEXITED(retval
));
243 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
244 job
= sblist_get(prog_state
.job_infos
, i
);
245 if(job
->pid
== ret
) {
247 posix_spawn_file_actions_destroy(&job
->fa
);
248 prog_state
.threads_running
--;
249 if(prog_state
.buffered
) {
251 if(!prog_state
.join_output
)
261 static size_t free_slots(void) {
262 return prog_state
.numthreads
- prog_state
.threads_running
;
265 __attribute__((noreturn
))
266 static void die(const char* msg
) {
271 static unsigned long parse_human_number(stringptr
* num
) {
272 unsigned long ret
= 0;
273 static const unsigned long mul
[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
274 const char* kmg
= "KMG";
276 if(num
&& num
->size
) {
277 ret
= atol(num
->ptr
);
278 if((kmgind
= strchr(kmg
, num
->ptr
[num
->size
-1])))
279 ret
*= mul
[kmgind
- kmg
];
284 static int syntax(void) {
286 "jobflow " VERSION
" (C) rofl0r\n"
287 "------------------\n"
288 "this program is intended to be used as a recipient of another programs output\n"
289 "it launches processes to which the current line can be passed as an argument\n"
290 "using {} for substitution (as in find -exec).\n"
292 "available options:\n\n"
293 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
294 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
295 "-pipe -exec ./mycommand {}\n"
298 " child processes receive input on stdin. if this option\n"
299 " is used, input will be evenly distributed to jobs.\n"
300 " all jobs will stay alive until EOF is received.\n"
302 " XXX=number of entries to skip\n"
304 " XXX=number of parallel processes to spawn\n"
306 " resume from last jobnumber stored in statefile\n"
309 " saves last launched jobnumber into a file\n"
311 " only write to statefile whenever all processes are busy,\n"
312 " and at program end\n"
313 "-delayedspinup=XXX\n"
314 " XXX=maximum amount of milliseconds\n"
315 " ...to wait when spinning up a fresh set of processes\n"
316 " a random value between 0 and the chosen amount is used to delay initial\n"
318 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
319 " activity on program startup\n"
321 " store the stdout and stderr of launched processes into a temporary file\n"
322 " which will be printed after a process has finished.\n"
323 " this prevents mixing up of output of different processes.\n"
325 " if -buffered, write both stdout and stderr into the same file.\n"
326 " this saves the chronological order of the output, and the combined output\n"
327 " will only be printed to stdout.\n"
328 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
329 " sets the rlimit of the new created processes.\n"
330 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
331 "-exec command with args\n"
332 " everything past -exec is treated as the command to execute on each line of\n"
333 " stdin received. the line can be passed as an argument using {}.\n"
334 " {.} passes everything before the last dot in a line as an argument.\n"
335 " it is possible to use multiple substitutions inside a single argument,\n"
336 " but currently only of one type.\n"
343 #define strtoll(a,b,c) strtoint64(a, strlen(a))
344 static int parse_args(int argc
, char** argv
) {
345 op_state op_b
, *op
= &op_b
;
346 op_init(op
, argc
, argv
);
348 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
351 op_temp
= op_get(op
, SPL("threads"));
352 long long x
= op_temp
? strtoll(op_temp
,0,10) : 1;
353 if(x
<= 0) die("threadcount must be >= 1\n");
354 prog_state
.numthreads
= x
;
356 op_temp
= op_get(op
, SPL("statefile"));
357 prog_state
.statefile
= op_temp
;
359 op_temp
= op_get(op
, SPL("skip"));
360 prog_state
.skip
= op_temp
? strtoll(op_temp
,0,10) : 0;
361 if(op_hasflag(op
, SPL("resume"))) {
362 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
363 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
364 FILE *f
= fopen(prog_state
.statefile
, "r");
367 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
373 prog_state
.delayedflush
= 0;
374 if(op_hasflag(op
, SPL("delayedflush"))) {
375 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
376 prog_state
.delayedflush
= 1;
379 prog_state
.stdin_pipe
= 0;
380 if(op_hasflag(op
, SPL("pipe"))) prog_state
.stdin_pipe
= 1;
382 op_temp
= op_get(op
, SPL("delayedspinup"));
383 prog_state
.delayedspinup_interval
= op_temp
? strtoll(op_temp
,0,10) : 0;
385 prog_state
.cmd_startarg
= 0;
386 prog_state
.subst_entries
= NULL
;
388 if(op_hasflag(op
, SPL("exec"))) {
391 for(i
= 1; i
< (unsigned) argc
; i
++) {
392 if(str_equal(argv
[i
], "-exec")) {
397 if(r
&& r
< (unsigned) argc
) {
398 prog_state
.cmd_startarg
= r
;
401 if(!prog_state
.stdin_pipe
)
402 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
404 // save entries which must be substituted, to save some cycles.
405 for(i
= r
; i
< (unsigned) argc
; i
++) {
407 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) {
408 if(prog_state
.stdin_pipe
)
409 die("argument substitution must not be used when -pipe option is given\n");
411 sblist_add(prog_state
.subst_entries
, &subst_ent
);
416 prog_state
.buffered
= 0;
417 if(op_hasflag(op
, SPL("buffered"))) {
418 prog_state
.buffered
= 1;
421 prog_state
.join_output
= 0;
422 if(op_hasflag(op
, SPL("joinoutput"))) {
423 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
424 prog_state
.join_output
= 1;
427 prog_state
.limits
= NULL
;
428 op_temp
= op_get(op
, SPL("limits"));
431 SPDECLAREC(limits
, op_temp
);
432 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
434 stringptr
* key
, *value
;
436 if(stringptrlist_getsize(limit_list
)) {
437 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
438 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
439 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
440 if(stringptrlist_getsize(kv
) != 2) continue;
441 key
= stringptrlist_get(kv
, 0);
442 value
= stringptrlist_get(kv
, 1);
443 if(EQ(key
, SPL("mem")))
444 lim
.limit
= RLIMIT_AS
;
445 else if(EQ(key
, SPL("cpu")))
446 lim
.limit
= RLIMIT_CPU
;
447 else if(EQ(key
, SPL("stack")))
448 lim
.limit
= RLIMIT_STACK
;
449 else if(EQ(key
, SPL("fsize")))
450 lim
.limit
= RLIMIT_FSIZE
;
451 else if(EQ(key
, SPL("nofiles")))
452 lim
.limit
= RLIMIT_NOFILE
;
454 die("unknown option passed to -limits");
456 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
458 die("could not query rlimits");
460 lim
.rl
.rlim_cur
= parse_human_number(value
);
461 sblist_add(prog_state
.limits
, &lim
);
462 stringptrlist_free(kv
);
464 stringptrlist_free(limit_list
);
470 static void init_queue(void) {
472 job_info ji
= {.pid
= -1};
474 for(i
= 0; i
< prog_state
.numthreads
; i
++)
475 sblist_add(prog_state
.job_infos
, &ji
);
478 static void write_statefile(unsigned long long n
, const char* tempfile
) {
479 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
481 dprintf(fd
, "%llu\n", n
+ 1ULL);
483 if(rename(tempfile
, prog_state
.statefile
) == -1)
489 // returns numbers of substitutions done, -1 on out of buffer.
490 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
491 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
494 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
495 if(stringptr_here(source
, i
, what
)) {
496 if(dest_size
< (ssize_t
) whit
->size
) return -1;
497 memcpy(dest
, whit
->ptr
, whit
->size
);
499 dest_size
-= whit
->size
;
503 *dest
= source
->ptr
[i
];
509 if(!dest_size
) return -1;
514 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
515 char subst_buf
[16][4096];
517 stringptr line_b
, *line
= &line_b
;
520 static unsigned spinup_counter
= 0;
523 if(prog_state
.skip
) {
527 if(!prog_state
.cmd_startarg
) {
528 dprintf(1, "%s", inbuf
);
532 line
->ptr
= inbuf
; line
->size
= len
;
534 if(!prog_state
.stdin_pipe
)
535 stringptr_chomp(line
);
537 if(prog_state
.subst_entries
) {
538 unsigned max_subst
= 0;
540 sblist_iter(prog_state
.subst_entries
, index
) {
541 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
543 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
546 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
549 char* lastdot
= stringptr_rchr(line
, '.');
550 stringptr tilLastDot
= *line
;
551 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
552 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
553 if(ret
== -1) goto too_long
;
556 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
563 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
564 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
569 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
570 else if(!prog_state
.stdin_pipe
)
571 launch_job(reap_child(), prog_state
.cmd_argv
);
573 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
574 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
577 if(prog_state
.stdin_pipe
)
583 int main(int argc
, char** argv
) {
586 char tempdir_buf
[256];
590 if(argc
> 4096) argc
= 4096;
592 prog_state
.threads_running
= 0;
594 if(parse_args(argc
, argv
)) return 1;
596 if(prog_state
.statefile
)
597 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
599 prog_state
.tempdir
= NULL
;
601 if(prog_state
.buffered
) {
602 prog_state
.tempdir
= tempdir_buf
;
603 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
605 die("could not create tempdir\n");
608 /* if the stdout/stderr fds are not in O_APPEND mode,
609 the dup()'s of the fds in posix_spawn can cause different
610 file positions, causing the different processes to overwrite each others output.
612 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
614 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
615 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
618 if(prog_state
.cmd_startarg
) {
619 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
620 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
622 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
625 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
628 prog_state
.lineno
= 0;
631 while(fgets(inbuf
, sizeof(inbuf
), stdin
)) {
632 if(!dispatch_line(inbuf
, strlen(inbuf
), argv
)) break;
635 if(prog_state
.stdin_pipe
) {
639 if(prog_state
.delayedflush
)
640 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
642 while(prog_state
.threads_running
) reap_child();
644 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
645 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
646 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
648 if(prog_state
.tempdir
)
649 rmdir(prog_state
.tempdir
);