2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
46 /* process handling */
53 #include <sys/resource.h>
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
59 static int prlimit(int pid
, ...) {
61 dprintf(2, "prlimit() not implemented on this system\n");
73 posix_spawn_file_actions_t fa
;
83 unsigned threads_running
;
85 unsigned long long skip
;
87 sblist
* subst_entries
;
89 unsigned cmd_startarg
;
91 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
92 the top value in ms can be supplied via a command line switch.
93 this option makes only sense if the interval is somewhat smaller than the
94 expected runtime of the average job.
95 this option is useful to not overload a network app due to hundreds of
96 parallel connection tries on startup.
98 int buffered
:1; /* write stdout and stderr of each task into a file,
99 and print it to stdout once the process ends.
100 this prevents mixing up of the output of multiple tasks. */
101 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
102 this means faster program execution, but could also be imprecise if the number of
103 jobs is small or smaller than the available threadcount. */
104 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
108 prog_state_s prog_state
;
111 extern char** environ
;
113 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
114 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
115 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
116 return ret
> 0 && (size_t) ret
< bufsize
;
119 void launch_job(size_t jobindex
, char** argv
) {
120 char stdout_filename_buf
[256];
121 char stderr_filename_buf
[256];
122 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
124 if(job
->pid
!= -1) return;
126 if(prog_state
.buffered
) {
127 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
128 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
129 dprintf(2, "temp filename too long!\n");
134 errno
= posix_spawn_file_actions_init(&job
->fa
);
135 if(errno
) goto spawn_error
;
137 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
138 if(errno
) goto spawn_error
;
141 if(prog_state
.stdin_pipe
) {
146 job
->pipe
= pipes
[1];
147 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
148 if(errno
) goto spawn_error
;
149 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
150 if(errno
) goto spawn_error
;
151 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
152 if(errno
) goto spawn_error
;
155 if(prog_state
.buffered
) {
156 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
157 if(errno
) goto spawn_error
;
158 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
159 if(errno
) goto spawn_error
;
162 if(!prog_state
.stdin_pipe
) {
163 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
164 if(errno
) goto spawn_error
;
167 if(prog_state
.buffered
) {
168 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
169 if(errno
) goto spawn_error
;
170 if(prog_state
.join_output
)
171 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
173 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
174 if(errno
) goto spawn_error
;
177 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
181 perror("posix_spawn");
183 prog_state
.threads_running
++;
184 if(prog_state
.limits
) {
186 sblist_iter(prog_state
.limits
, limit
) {
187 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
192 if(prog_state
.stdin_pipe
) close(pipes
[0]);
195 static void dump_output(size_t job_id
, int is_stderr
) {
196 char out_filename_buf
[256];
198 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
201 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
203 dst
= fopen(out_filename_buf
, "r");
205 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
206 fwrite(buf
, 1, nread
, out_stream
);
207 if(nread
< sizeof(buf
)) break;
214 static void pass_stdin(stringptr
*line
) {
215 static size_t next_child
= 0;
216 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
218 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
219 write(job
->pipe
, line
->ptr
, line
->size
);
223 static void close_pipes(void) {
225 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
226 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
231 /* wait till a child exits, reap it, and return its job index for slot reuse */
232 static size_t reap_child(void) {
237 do ret
= waitpid(-1, &retval
, 0);
238 while(ret
== -1 || !WIFEXITED(retval
));
240 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
241 job
= sblist_get(prog_state
.job_infos
, i
);
242 if(job
->pid
== ret
) {
244 posix_spawn_file_actions_destroy(&job
->fa
);
245 prog_state
.threads_running
--;
246 if(prog_state
.buffered
) {
248 if(!prog_state
.join_output
)
258 static size_t free_slots(void) {
259 return prog_state
.numthreads
- prog_state
.threads_running
;
262 __attribute__((noreturn
))
263 static void die(const char* msg
) {
268 static unsigned long parse_human_number(stringptr
* num
) {
269 unsigned long ret
= 0;
270 static const unsigned long mul
[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
271 const char* kmg
= "KMG";
273 if(num
&& num
->size
) {
274 ret
= atol(num
->ptr
);
275 if((kmgind
= strchr(kmg
, num
->ptr
[num
->size
-1])))
276 ret
*= mul
[kmgind
- kmg
];
281 static int syntax(void) {
283 "jobflow " VERSION
" (C) rofl0r\n"
284 "------------------\n"
285 "this program is intended to be used as a recipient of another programs output\n"
286 "it launches processes to which the current line can be passed as an argument\n"
287 "using {} for substitution (as in find -exec).\n"
289 "available options:\n\n"
290 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
291 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
292 "-pipe -exec ./mycommand {}\n"
295 " child processes receive input on stdin. if this option\n"
296 " is used, input will be evenly distributed to jobs.\n"
297 " all jobs will stay alive until EOF is received.\n"
299 " XXX=number of entries to skip\n"
301 " XXX=number of parallel processes to spawn\n"
303 " resume from last jobnumber stored in statefile\n"
306 " saves last launched jobnumber into a file\n"
308 " only write to statefile whenever all processes are busy,\n"
309 " and at program end\n"
310 "-delayedspinup=XXX\n"
311 " XXX=maximum amount of milliseconds\n"
312 " ...to wait when spinning up a fresh set of processes\n"
313 " a random value between 0 and the chosen amount is used to delay initial\n"
315 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
316 " activity on program startup\n"
318 " store the stdout and stderr of launched processes into a temporary file\n"
319 " which will be printed after a process has finished.\n"
320 " this prevents mixing up of output of different processes.\n"
322 " if -buffered, write both stdout and stderr into the same file.\n"
323 " this saves the chronological order of the output, and the combined output\n"
324 " will only be printed to stdout.\n"
325 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
326 " sets the rlimit of the new created processes.\n"
327 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
328 "-exec command with args\n"
329 " everything past -exec is treated as the command to execute on each line of\n"
330 " stdin received. the line can be passed as an argument using {}.\n"
331 " {.} passes everything before the last dot in a line as an argument.\n"
332 " it is possible to use multiple substitutions inside a single argument,\n"
333 " but currently only of one type.\n"
340 #define strtoll(a,b,c) strtoint64(a, strlen(a))
341 static int parse_args(int argc
, char** argv
) {
342 op_state op_b
, *op
= &op_b
;
343 op_init(op
, argc
, argv
);
345 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
348 op_temp
= op_get(op
, SPL("threads"));
349 long long x
= op_temp
? strtoll(op_temp
,0,10) : 1;
350 if(x
<= 0) die("threadcount must be >= 1\n");
351 prog_state
.numthreads
= x
;
353 op_temp
= op_get(op
, SPL("statefile"));
354 prog_state
.statefile
= op_temp
;
356 op_temp
= op_get(op
, SPL("skip"));
357 prog_state
.skip
= op_temp
? strtoll(op_temp
,0,10) : 0;
358 if(op_hasflag(op
, SPL("resume"))) {
359 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
360 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
361 FILE *f
= fopen(prog_state
.statefile
, "r");
364 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
370 prog_state
.delayedflush
= 0;
371 if(op_hasflag(op
, SPL("delayedflush"))) {
372 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
373 prog_state
.delayedflush
= 1;
376 prog_state
.stdin_pipe
= 0;
377 if(op_hasflag(op
, SPL("pipe"))) prog_state
.stdin_pipe
= 1;
379 op_temp
= op_get(op
, SPL("delayedspinup"));
380 prog_state
.delayedspinup_interval
= op_temp
? strtoll(op_temp
,0,10) : 0;
382 prog_state
.cmd_startarg
= 0;
383 prog_state
.subst_entries
= NULL
;
385 if(op_hasflag(op
, SPL("exec"))) {
388 for(i
= 1; i
< (unsigned) argc
; i
++) {
389 if(str_equal(argv
[i
], "-exec")) {
394 if(r
&& r
< (unsigned) argc
) {
395 prog_state
.cmd_startarg
= r
;
398 if(!prog_state
.stdin_pipe
)
399 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
401 // save entries which must be substituted, to save some cycles.
402 for(i
= r
; i
< (unsigned) argc
; i
++) {
404 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) {
405 if(prog_state
.stdin_pipe
)
406 die("argument substitution must not be used when -pipe option is given\n");
408 sblist_add(prog_state
.subst_entries
, &subst_ent
);
413 prog_state
.buffered
= 0;
414 if(op_hasflag(op
, SPL("buffered"))) {
415 prog_state
.buffered
= 1;
418 prog_state
.join_output
= 0;
419 if(op_hasflag(op
, SPL("joinoutput"))) {
420 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
421 prog_state
.join_output
= 1;
424 prog_state
.limits
= NULL
;
425 op_temp
= op_get(op
, SPL("limits"));
428 SPDECLAREC(limits
, op_temp
);
429 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
431 stringptr
* key
, *value
;
433 if(stringptrlist_getsize(limit_list
)) {
434 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
435 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
436 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
437 if(stringptrlist_getsize(kv
) != 2) continue;
438 key
= stringptrlist_get(kv
, 0);
439 value
= stringptrlist_get(kv
, 1);
440 if(EQ(key
, SPL("mem")))
441 lim
.limit
= RLIMIT_AS
;
442 else if(EQ(key
, SPL("cpu")))
443 lim
.limit
= RLIMIT_CPU
;
444 else if(EQ(key
, SPL("stack")))
445 lim
.limit
= RLIMIT_STACK
;
446 else if(EQ(key
, SPL("fsize")))
447 lim
.limit
= RLIMIT_FSIZE
;
448 else if(EQ(key
, SPL("nofiles")))
449 lim
.limit
= RLIMIT_NOFILE
;
451 die("unknown option passed to -limits");
453 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
455 die("could not query rlimits");
457 lim
.rl
.rlim_cur
= parse_human_number(value
);
458 sblist_add(prog_state
.limits
, &lim
);
459 stringptrlist_free(kv
);
461 stringptrlist_free(limit_list
);
467 static void init_queue(void) {
469 job_info ji
= {.pid
= -1};
471 for(i
= 0; i
< prog_state
.numthreads
; i
++)
472 sblist_add(prog_state
.job_infos
, &ji
);
475 static void write_statefile(unsigned long long n
, const char* tempfile
) {
476 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
478 dprintf(fd
, "%llu\n", n
+ 1ULL);
480 if(rename(tempfile
, prog_state
.statefile
) == -1)
486 // returns numbers of substitutions done, -1 on out of buffer.
487 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
488 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
491 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
492 if(stringptr_here(source
, i
, what
)) {
493 if(dest_size
< (ssize_t
) whit
->size
) return -1;
494 memcpy(dest
, whit
->ptr
, whit
->size
);
496 dest_size
-= whit
->size
;
500 *dest
= source
->ptr
[i
];
506 if(!dest_size
) return -1;
511 int main(int argc
, char** argv
) {
512 char inbuf
[4096]; char* fgets_result
;
513 stringptr line_b
, *line
= &line_b
;
514 char* cmd_argv
[4096];
515 char subst_buf
[16][4096];
518 unsigned long long lineno
= 0;
520 unsigned spinup_counter
= 0;
522 char tempdir_buf
[256];
523 char temp_state
[256];
527 if(argc
> 4096) argc
= 4096;
529 prog_state
.threads_running
= 0;
531 if(parse_args(argc
, argv
)) return 1;
533 if(prog_state
.statefile
)
534 snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
536 prog_state
.tempdir
= NULL
;
538 if(prog_state
.buffered
) {
539 prog_state
.tempdir
= tempdir_buf
;
540 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
542 die("could not create tempdir\n");
545 /* if the stdout/stderr fds are not in O_APPEND mode,
546 the dup()'s of the fds in posix_spawn can cause different
547 file positions, causing the different processes to overwrite each others output.
549 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
551 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
552 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
555 if(prog_state
.cmd_startarg
) {
556 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
557 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
559 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
562 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
565 for(;(fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
));lineno
++) {
566 if(prog_state
.skip
) {
570 if(!prog_state
.cmd_startarg
) {
571 dprintf(1, fgets_result
);
575 stringptr_fromchar(fgets_result
, line
);
577 if(!prog_state
.stdin_pipe
)
578 stringptr_chomp(line
);
580 if(prog_state
.subst_entries
) {
583 sblist_iter(prog_state
.subst_entries
, index
) {
584 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
586 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
589 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
592 char* lastdot
= stringptr_rchr(line
, '.');
593 stringptr tilLastDot
= *line
;
594 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
595 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
596 if(ret
== -1) goto too_long
;
599 cmd_argv
[*index
] = subst_buf
[max_subst
];
606 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
607 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
612 launch_job(prog_state
.threads_running
, cmd_argv
);
613 else if(!prog_state
.stdin_pipe
)
614 launch_job(reap_child(), cmd_argv
);
616 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
617 write_statefile(lineno
, temp_state
);
620 if(prog_state
.stdin_pipe
)
626 if(prog_state
.stdin_pipe
) {
630 if(prog_state
.delayedflush
)
631 write_statefile(lineno
- 1, temp_state
);
633 while(prog_state
.threads_running
) reap_child();
635 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
636 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
637 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
639 if(prog_state
.tempdir
)
640 rmdir(prog_state
.tempdir
);