2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
47 /* process handling */
54 #include <sys/resource.h>
56 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
57 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
58 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
59 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
60 static int prlimit(int pid
, ...) {
62 dprintf(2, "prlimit() not implemented on this system\n");
74 posix_spawn_file_actions_t fa
;
85 unsigned long long lineno
;
87 unsigned threads_running
;
89 unsigned long long skip
;
91 sblist
* subst_entries
;
93 unsigned cmd_startarg
;
95 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
96 the top value in ms can be supplied via a command line switch.
97 this option makes only sense if the interval is somewhat smaller than the
98 expected runtime of the average job.
99 this option is useful to not overload a network app due to hundreds of
100 parallel connection tries on startup.
102 int buffered
:1; /* write stdout and stderr of each task into a file,
103 and print it to stdout once the process ends.
104 this prevents mixing up of the output of multiple tasks. */
105 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
106 this means faster program execution, but could also be imprecise if the number of
107 jobs is small or smaller than the available threadcount. */
108 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
112 prog_state_s prog_state
;
115 extern char** environ
;
117 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
118 int ret
= snprintf(buf
, bufsize
, "%s/jd_proc_%.5lu_std%s.log",
119 prog_state
.tempdir
, (unsigned long) jobindex
, is_stderr
? "err" : "out");
120 return ret
> 0 && (size_t) ret
< bufsize
;
123 void launch_job(size_t jobindex
, char** argv
) {
124 char stdout_filename_buf
[256];
125 char stderr_filename_buf
[256];
126 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
128 if(job
->pid
!= -1) return;
130 if(prog_state
.buffered
) {
131 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
132 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
133 dprintf(2, "temp filename too long!\n");
138 errno
= posix_spawn_file_actions_init(&job
->fa
);
139 if(errno
) goto spawn_error
;
141 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
142 if(errno
) goto spawn_error
;
145 if(prog_state
.pipe_mode
) {
150 job
->pipe
= pipes
[1];
151 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, pipes
[0], 0);
152 if(errno
) goto spawn_error
;
153 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[0]);
154 if(errno
) goto spawn_error
;
155 errno
= posix_spawn_file_actions_addclose(&job
->fa
, pipes
[1]);
156 if(errno
) goto spawn_error
;
159 if(prog_state
.buffered
) {
160 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
161 if(errno
) goto spawn_error
;
162 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
163 if(errno
) goto spawn_error
;
166 if(!prog_state
.pipe_mode
) {
167 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
168 if(errno
) goto spawn_error
;
171 if(prog_state
.buffered
) {
172 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
173 if(errno
) goto spawn_error
;
174 if(prog_state
.join_output
)
175 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
177 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
178 if(errno
) goto spawn_error
;
181 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
185 perror("posix_spawn");
187 prog_state
.threads_running
++;
188 if(prog_state
.limits
) {
190 sblist_iter(prog_state
.limits
, limit
) {
191 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
196 if(prog_state
.pipe_mode
)
200 static void dump_output(size_t job_id
, int is_stderr
) {
201 char out_filename_buf
[256];
203 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
206 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
208 dst
= fopen(out_filename_buf
, "r");
210 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
211 fwrite(buf
, 1, nread
, out_stream
);
212 if(nread
< sizeof(buf
)) break;
219 static void pass_stdin(stringptr
*line
) {
220 static size_t next_child
= 0;
221 if(next_child
>= sblist_getsize(prog_state
.job_infos
))
223 job_info
*job
= sblist_get(prog_state
.job_infos
, next_child
);
224 write(job
->pipe
, line
->ptr
, line
->size
);
228 static void close_pipes(void) {
230 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
231 job_info
*job
= sblist_get(prog_state
.job_infos
, i
);
236 /* wait till a child exits, reap it, and return its job index for slot reuse */
237 static size_t reap_child(void) {
242 do ret
= waitpid(-1, &retval
, 0);
243 while(ret
== -1 || !WIFEXITED(retval
));
245 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
246 job
= sblist_get(prog_state
.job_infos
, i
);
247 if(job
->pid
== ret
) {
249 posix_spawn_file_actions_destroy(&job
->fa
);
250 prog_state
.threads_running
--;
251 if(prog_state
.buffered
) {
253 if(!prog_state
.join_output
)
263 static size_t free_slots(void) {
264 return prog_state
.numthreads
- prog_state
.threads_running
;
267 __attribute__((noreturn
))
268 static void die(const char* msg
) {
273 static unsigned long parse_human_number(stringptr
* num
) {
274 unsigned long ret
= 0;
275 static const unsigned long mul
[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
276 const char* kmg
= "KMG";
278 if(num
&& num
->size
) {
279 ret
= atol(num
->ptr
);
280 if((kmgind
= strchr(kmg
, num
->ptr
[num
->size
-1])))
281 ret
*= mul
[kmgind
- kmg
];
286 static int syntax(void) {
288 "jobflow " VERSION
" (C) rofl0r\n"
289 "------------------\n"
290 "this program is intended to be used as a recipient of another programs output\n"
291 "it launches processes to which the current line can be passed as an argument\n"
292 "using {} for substitution (as in find -exec).\n"
293 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
294 "stdin of child processes. input will be then evenly distributed to jobs,\n"
295 "until EOF is received.\n"
297 "available options:\n\n"
298 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
299 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
300 "-exec ./mycommand {}\n"
303 " XXX=number of entries to skip\n"
305 " XXX=number of parallel processes to spawn\n"
307 " resume from last jobnumber stored in statefile\n"
310 " saves last launched jobnumber into a file\n"
312 " only write to statefile whenever all processes are busy,\n"
313 " and at program end\n"
314 "-delayedspinup=XXX\n"
315 " XXX=maximum amount of milliseconds\n"
316 " ...to wait when spinning up a fresh set of processes\n"
317 " a random value between 0 and the chosen amount is used to delay initial\n"
319 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
320 " activity on program startup\n"
322 " store the stdout and stderr of launched processes into a temporary file\n"
323 " which will be printed after a process has finished.\n"
324 " this prevents mixing up of output of different processes.\n"
326 " if -buffered, write both stdout and stderr into the same file.\n"
327 " this saves the chronological order of the output, and the combined output\n"
328 " will only be printed to stdout.\n"
329 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
330 " sets the rlimit of the new created processes.\n"
331 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
332 "-exec command with args\n"
333 " everything past -exec is treated as the command to execute on each line of\n"
334 " stdin received. the line can be passed as an argument using {}.\n"
335 " {.} passes everything before the last dot in a line as an argument.\n"
336 " it is possible to use multiple substitutions inside a single argument,\n"
337 " but currently only of one type.\n"
344 #define strtoll(a,b,c) strtoint64(a, strlen(a))
345 static int parse_args(int argc
, char** argv
) {
346 op_state op_b
, *op
= &op_b
;
347 op_init(op
, argc
, argv
);
349 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
352 op_temp
= op_get(op
, SPL("threads"));
353 long long x
= op_temp
? strtoll(op_temp
,0,10) : 1;
354 if(x
<= 0) die("threadcount must be >= 1\n");
355 prog_state
.numthreads
= x
;
357 op_temp
= op_get(op
, SPL("statefile"));
358 prog_state
.statefile
= op_temp
;
360 op_temp
= op_get(op
, SPL("skip"));
361 prog_state
.skip
= op_temp
? strtoll(op_temp
,0,10) : 0;
362 if(op_hasflag(op
, SPL("resume"))) {
363 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
364 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
365 FILE *f
= fopen(prog_state
.statefile
, "r");
368 if(fgets(nb
, sizeof nb
, f
)) prog_state
.skip
= strtoll(nb
,0,10);
374 prog_state
.delayedflush
= 0;
375 if(op_hasflag(op
, SPL("delayedflush"))) {
376 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
377 prog_state
.delayedflush
= 1;
380 prog_state
.pipe_mode
= 0;
382 op_temp
= op_get(op
, SPL("delayedspinup"));
383 prog_state
.delayedspinup_interval
= op_temp
? strtoll(op_temp
,0,10) : 0;
385 prog_state
.cmd_startarg
= 0;
386 prog_state
.subst_entries
= NULL
;
388 if(op_hasflag(op
, SPL("exec"))) {
391 for(i
= 1; i
< (unsigned) argc
; i
++) {
392 if(str_equal(argv
[i
], "-exec")) {
397 if(r
&& r
< (unsigned) argc
) {
398 prog_state
.cmd_startarg
= r
;
401 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
403 // save entries which must be substituted, to save some cycles.
404 for(i
= r
; i
< (unsigned) argc
; i
++) {
406 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) {
407 sblist_add(prog_state
.subst_entries
, &subst_ent
);
410 if(sblist_getsize(prog_state
.subst_entries
) == 0) {
411 prog_state
.pipe_mode
= 1;
412 sblist_free(prog_state
.subst_entries
);
413 prog_state
.subst_entries
= 0;
417 prog_state
.buffered
= 0;
418 if(op_hasflag(op
, SPL("buffered"))) {
419 prog_state
.buffered
= 1;
422 prog_state
.join_output
= 0;
423 if(op_hasflag(op
, SPL("joinoutput"))) {
424 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
425 prog_state
.join_output
= 1;
428 prog_state
.limits
= NULL
;
429 op_temp
= op_get(op
, SPL("limits"));
432 SPDECLAREC(limits
, op_temp
);
433 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
435 stringptr
* key
, *value
;
437 if(stringptrlist_getsize(limit_list
)) {
438 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
439 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
440 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
441 if(stringptrlist_getsize(kv
) != 2) continue;
442 key
= stringptrlist_get(kv
, 0);
443 value
= stringptrlist_get(kv
, 1);
444 if(EQ(key
, SPL("mem")))
445 lim
.limit
= RLIMIT_AS
;
446 else if(EQ(key
, SPL("cpu")))
447 lim
.limit
= RLIMIT_CPU
;
448 else if(EQ(key
, SPL("stack")))
449 lim
.limit
= RLIMIT_STACK
;
450 else if(EQ(key
, SPL("fsize")))
451 lim
.limit
= RLIMIT_FSIZE
;
452 else if(EQ(key
, SPL("nofiles")))
453 lim
.limit
= RLIMIT_NOFILE
;
455 die("unknown option passed to -limits");
457 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
459 die("could not query rlimits");
461 lim
.rl
.rlim_cur
= parse_human_number(value
);
462 sblist_add(prog_state
.limits
, &lim
);
463 stringptrlist_free(kv
);
465 stringptrlist_free(limit_list
);
471 static void init_queue(void) {
473 job_info ji
= {.pid
= -1};
475 for(i
= 0; i
< prog_state
.numthreads
; i
++)
476 sblist_add(prog_state
.job_infos
, &ji
);
479 static void write_statefile(unsigned long long n
, const char* tempfile
) {
480 int fd
= open(tempfile
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
482 dprintf(fd
, "%llu\n", n
+ 1ULL);
484 if(rename(tempfile
, prog_state
.statefile
) == -1)
490 // returns numbers of substitutions done, -1 on out of buffer.
491 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
492 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
495 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
496 if(stringptr_here(source
, i
, what
)) {
497 if(dest_size
< (ssize_t
) whit
->size
) return -1;
498 memcpy(dest
, whit
->ptr
, whit
->size
);
500 dest_size
-= whit
->size
;
504 *dest
= source
->ptr
[i
];
510 if(!dest_size
) return -1;
515 static int dispatch_line(char* inbuf
, size_t len
, char** argv
) {
516 char subst_buf
[16][4096];
518 stringptr line_b
, *line
= &line_b
;
521 static unsigned spinup_counter
= 0;
524 if(prog_state
.skip
) {
528 if(!prog_state
.cmd_startarg
) {
529 write(1, inbuf
, len
);
533 line
->ptr
= inbuf
; line
->size
= len
;
535 if(!prog_state
.pipe_mode
)
536 stringptr_chomp(line
);
538 if(prog_state
.subst_entries
) {
539 unsigned max_subst
= 0;
541 sblist_iter(prog_state
.subst_entries
, index
) {
542 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
544 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
547 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
550 char* lastdot
= stringptr_rchr(line
, '.');
551 stringptr tilLastDot
= *line
;
552 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
553 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
554 if(ret
== -1) goto too_long
;
557 prog_state
.cmd_argv
[*index
] = subst_buf
[max_subst
];
564 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
565 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
570 launch_job(prog_state
.threads_running
, prog_state
.cmd_argv
);
571 else if(!prog_state
.pipe_mode
)
572 launch_job(reap_child(), prog_state
.cmd_argv
);
574 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
575 write_statefile(prog_state
.lineno
, prog_state
.temp_state
);
578 if(prog_state
.pipe_mode
)
584 static char* mystrnchr(const char *in
, int ch
, size_t end
) {
585 const char *e
= in
+end
;
587 while(p
!= e
&& *p
!= ch
) p
++;
588 if(*p
== ch
) return (char*)p
;
591 static char* mystrnrchr(const char *in
, int ch
, size_t end
) {
592 const char *e
= in
+end
-1;
594 while(p
!= e
&& *e
!= ch
) e
--;
595 if(*e
== ch
) return (char*)e
;
600 #define BULK_BUFSZ BULK_KB*1024
602 int main(int argc
, char** argv
) {
605 char tempdir_buf
[256];
609 if(argc
> 4096) argc
= 4096;
611 prog_state
.threads_running
= 0;
613 if(parse_args(argc
, argv
)) return 1;
615 if(prog_state
.statefile
)
616 snprintf(prog_state
.temp_state
, sizeof(prog_state
.temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
618 prog_state
.tempdir
= NULL
;
620 if(prog_state
.buffered
) {
621 prog_state
.tempdir
= tempdir_buf
;
622 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
624 die("could not create tempdir\n");
627 /* if the stdout/stderr fds are not in O_APPEND mode,
628 the dup()'s of the fds in posix_spawn can cause different
629 file positions, causing the different processes to overwrite each others output.
631 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
633 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
634 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
637 if(prog_state
.cmd_startarg
) {
638 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
639 prog_state
.cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
641 prog_state
.cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
644 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
647 prog_state
.lineno
= 0;
651 char *mem
= mmap(NULL
, BULK_BUFSZ
*2, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANON
, -1, 0);
653 char *buf2
= mem
+BULK_BUFSZ
;
657 inbuf
= buf1
+BULK_BUFSZ
-left
;
658 memcpy(inbuf
, buf2
+BULK_BUFSZ
-left
, left
);
659 ssize_t n
= read(0, buf2
, BULK_BUFSZ
);
668 if(!prog_state
.pipe_mode
)
669 p
= mystrnchr (in
, '\n', left
);
671 p
= mystrnrchr(in
, '\n', left
);
673 ptrdiff_t diff
= (p
- in
) + 1;
674 if(!dispatch_line(in
, diff
, argv
)) goto out
;
679 if(left
) dispatch_line(in
, left
, argv
);
686 if(prog_state
.pipe_mode
) {
690 if(prog_state
.delayedflush
)
691 write_statefile(prog_state
.lineno
- 1, prog_state
.temp_state
);
693 while(prog_state
.threads_running
) reap_child();
695 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
696 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
697 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
699 if(prog_state
.tempdir
)
700 rmdir(prog_state
.tempdir
);