2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.0"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
25 #define _XOPEN_SOURCE 700
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
46 /* process handling */
53 #include <sys/resource.h>
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
59 static int prlimit(int pid
, ...) {
61 dprintf(2, "prlimit() not implemented on this system\n");
72 posix_spawn_file_actions_t fa
;
82 unsigned threads_running
;
86 sblist
* subst_entries
;
88 unsigned cmd_startarg
;
90 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
91 the top value in ms can be supplied via a command line switch.
92 this option makes only sense if the interval is somewhat smaller than the
93 expected runtime of the average job.
94 this option is useful to not overload a network app due to hundreds of
95 parallel connection tries on startup.
97 int buffered
:1; /* write stdout and stderr of each task into a file,
98 and print it to stdout once the process ends.
99 this prevents mixing up of the output of multiple tasks. */
100 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
101 this means faster program execution, but could also be imprecise if the number of
102 jobs is small or smaller than the available threadcount. */
103 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
106 prog_state_s prog_state
;
109 extern char** environ
;
111 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
112 int ret
= snprintf(buf
, bufsize
,
113 is_stderr
? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
114 prog_state
.tempdir
, (unsigned) jobindex
);
115 return ret
> 0 && (size_t) ret
< bufsize
;
118 void launch_job(size_t jobindex
, char** argv
) {
119 char stdout_filename_buf
[256];
120 char stderr_filename_buf
[256];
121 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
123 if(job
->pid
!= -1) return;
125 if(prog_state
.buffered
) {
126 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
127 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
128 dprintf(2, "temp filename too long!\n");
133 errno
= posix_spawn_file_actions_init(&job
->fa
);
134 if(errno
) goto spawn_error
;
135 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
136 if(errno
) goto spawn_error
;
138 if(prog_state
.buffered
) {
139 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
140 if(errno
) goto spawn_error
;
141 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
142 if(errno
) goto spawn_error
;
145 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
146 if(errno
) goto spawn_error
;
148 if(prog_state
.buffered
) {
149 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
150 if(errno
) goto spawn_error
;
151 if(prog_state
.join_output
)
152 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
154 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
155 if(errno
) goto spawn_error
;
158 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
162 perror("posix_spawn");
164 prog_state
.threads_running
++;
165 if(prog_state
.limits
) {
167 sblist_iter(prog_state
.limits
, limit
) {
168 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
175 static void dump_output(size_t job_id
, int is_stderr
) {
176 char out_filename_buf
[256];
178 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
181 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
183 dst
= fopen(out_filename_buf
, "r");
185 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
186 fwrite(buf
, 1, nread
, out_stream
);
187 if(nread
< sizeof(buf
)) break;
194 /* wait till a child exits, reap it, and return its job index for slot reuse */
195 static size_t reap_child(void) {
200 do ret
= waitpid(-1, &retval
, 0);
201 while(ret
== -1 || !WIFEXITED(retval
));
203 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
204 job
= sblist_get(prog_state
.job_infos
, i
);
205 if(job
->pid
== ret
) {
207 posix_spawn_file_actions_destroy(&job
->fa
);
208 prog_state
.threads_running
--;
209 if(prog_state
.buffered
) {
211 if(!prog_state
.join_output
)
221 static size_t free_slots(void) {
222 return prog_state
.numthreads
- prog_state
.threads_running
;
225 static void add_job(char **argv
) {
227 launch_job(prog_state
.threads_running
, argv
);
229 launch_job(reap_child(), argv
);
232 __attribute__((noreturn
))
233 static void die(const char* msg
) {
238 static unsigned long parse_human_number(stringptr
* num
) {
239 unsigned long ret
= 0;
240 static const unsigned long mul
[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
241 const char* kmg
= "KMG";
243 if(num
&& num
->size
) {
244 ret
= atol(num
->ptr
);
245 if((kmgind
= strchr(kmg
, num
->ptr
[num
->size
-1])))
246 ret
*= mul
[kmgind
- kmg
];
251 static int syntax(void) {
253 "jobflow " VERSION
" (C) rofl0r\n"
254 "------------------\n"
255 "this program is intended to be used as a recipient of another programs output\n"
256 "it launches processes to which the current line can be passed as an argument\n"
257 "using {} for substitution (as in find -exec).\n"
259 "available options:\n\n"
260 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
261 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
262 "-exec ./mycommand {}\n"
265 " XXX=number of entries to skip\n"
267 " XXX=number of parallel processes to spawn]\n"
269 " resume from last jobnumber stored in statefile\n"
272 " saves last launched jobnumber into a file\n"
274 " only write to statefile whenever all processes are busy,\n"
275 " and at program end\n"
276 "-delayedspinup=XXX\n"
277 " XXX=maximum amount of milliseconds\n"
278 " ...to wait when spinning up a fresh set of processes\n"
279 " a random value between 0 and the chosen amount is used to delay initial\n"
281 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
282 " activity on program startup\n"
284 " store the stdout and stderr of launched processes into a temporary file\n"
285 " which will be printed after a process has finished.\n"
286 " this prevents mixing up of output of different processes.\n"
288 " if -buffered, write both stdout and stderr into the same file.\n"
289 " this saves the chronological order of the output, and the combined output\n"
290 " will only be printed to stdout.\n"
291 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
292 " sets the rlimit of the new created processes.\n"
293 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
294 "-exec command with args\n"
295 " everything past -exec is treated as the command to execute on each line of\n"
296 " stdin received. the line can be passed as an argument using {}.\n"
297 " {.} passes everything before the last dot in a line as an argument.\n"
298 " it is possible to use multiple substitutions inside a single argument,\n"
299 " but currently only of one type.\n"
305 static int parse_args(int argc
, char** argv
) {
306 op_state op_b
, *op
= &op_b
;
307 op_init(op
, argc
, argv
);
309 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
311 op_temp
= op_get(op
, SPL("threads"));
312 prog_state
.numthreads
= op_temp
? atoi(op_temp
) : 1;
313 if(prog_state
.numthreads
<= 0) die("threadcount must be >= 1\n");
314 op_temp
= op_get(op
, SPL("statefile"));
315 prog_state
.statefile
= op_temp
;
317 op_temp
= op_get(op
, SPL("skip"));
318 prog_state
.skip
= op_temp
? atoi(op_temp
) : 0;
319 if(op_hasflag(op
, SPL("resume"))) {
320 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
321 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
322 stringptr
* fc
= stringptr_fromfile(prog_state
.statefile
);
323 prog_state
.skip
= atoi(fc
->ptr
);
327 prog_state
.delayedflush
= 0;
328 if(op_hasflag(op
, SPL("delayedflush"))) {
329 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
330 prog_state
.delayedflush
= 1;
333 op_temp
= op_get(op
, SPL("delayedspinup"));
334 prog_state
.delayedspinup_interval
= op_temp
? atoi(op_temp
) : 0;
336 prog_state
.cmd_startarg
= 0;
337 prog_state
.subst_entries
= NULL
;
339 if(op_hasflag(op
, SPL("exec"))) {
342 for(i
= 1; i
< (unsigned) argc
; i
++) {
343 if(str_equal(argv
[i
], "-exec")) {
348 if(r
&& r
< (unsigned) argc
) {
349 prog_state
.cmd_startarg
= r
;
352 // save entries which must be substituted, to save some cycles.
353 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
354 for(i
= r
; i
< (unsigned) argc
; i
++) {
356 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) sblist_add(prog_state
.subst_entries
, &subst_ent
);
360 prog_state
.buffered
= 0;
361 if(op_hasflag(op
, SPL("buffered"))) {
362 prog_state
.buffered
= 1;
365 prog_state
.join_output
= 0;
366 if(op_hasflag(op
, SPL("joinoutput"))) {
367 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
368 prog_state
.join_output
= 1;
371 prog_state
.limits
= NULL
;
372 op_temp
= op_get(op
, SPL("limits"));
375 SPDECLAREC(limits
, op_temp
);
376 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
378 stringptr
* key
, *value
;
380 if(stringptrlist_getsize(limit_list
)) {
381 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
382 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
383 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
384 if(stringptrlist_getsize(kv
) != 2) continue;
385 key
= stringptrlist_get(kv
, 0);
386 value
= stringptrlist_get(kv
, 1);
387 if(EQ(key
, SPL("mem")))
388 lim
.limit
= RLIMIT_AS
;
389 else if(EQ(key
, SPL("cpu")))
390 lim
.limit
= RLIMIT_CPU
;
391 else if(EQ(key
, SPL("stack")))
392 lim
.limit
= RLIMIT_STACK
;
393 else if(EQ(key
, SPL("fsize")))
394 lim
.limit
= RLIMIT_FSIZE
;
395 else if(EQ(key
, SPL("nofiles")))
396 lim
.limit
= RLIMIT_NOFILE
;
398 die("unknown option passed to -limits");
400 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
402 die("could not query rlimits");
404 lim
.rl
.rlim_cur
= parse_human_number(value
);
405 sblist_add(prog_state
.limits
, &lim
);
406 stringptrlist_free(kv
);
408 stringptrlist_free(limit_list
);
414 static void init_queue(void) {
416 job_info ji
= {.pid
= -1};
418 for(i
= 0; i
< prog_state
.numthreads
; i
++)
419 sblist_add(prog_state
.job_infos
, &ji
);
422 static void write_statefile(uint64_t n
, const char* tempfile
) {
424 stringptr num_b
, *num
= &num_b
;
426 num_b
.ptr
= uint64ToString(n
+ 1, numbuf
);
427 num_b
.size
= strlen(numbuf
);
428 stringptr_tofile((char*) tempfile
, num
);
429 if(rename(tempfile
, prog_state
.statefile
) == -1)
433 // returns numbers of substitutions done, -1 on out of buffer.
434 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
435 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
438 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
439 if(stringptr_here(source
, i
, what
)) {
440 if(dest_size
< (ssize_t
) whit
->size
) return -1;
441 memcpy(dest
, whit
->ptr
, whit
->size
);
443 dest_size
-= whit
->size
;
447 *dest
= source
->ptr
[i
];
453 if(!dest_size
) return -1;
458 int main(int argc
, char** argv
) {
459 char inbuf
[4096]; char* fgets_result
;
460 stringptr line_b
, *line
= &line_b
;
461 char* cmd_argv
[4096];
462 char subst_buf
[16][4096];
467 unsigned spinup_counter
= 0;
469 char tempdir_buf
[256];
470 char temp_state
[256];
474 if(argc
> 4096) argc
= 4096;
476 prog_state
.threads_running
= 0;
478 if(parse_args(argc
, argv
)) return 1;
480 if(prog_state
.statefile
)
481 snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
483 prog_state
.tempdir
= NULL
;
485 if(prog_state
.buffered
) {
486 prog_state
.tempdir
= tempdir_buf
;
487 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
489 die("could not create tempdir\n");
492 /* if the stdout/stderr fds are not in O_APPEND mode,
493 the dup()'s of the fds in posix_spawn can cause different
494 file positions, causing the different processes to overwrite each others output.
496 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
498 if(fcntl(1, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
499 if(fcntl(2, F_SETFL
, O_APPEND
) == -1) perror("fcntl");
502 if(prog_state
.cmd_startarg
) {
503 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
504 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
506 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
509 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
512 for(;(fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
));lineno
++) {
513 if(prog_state
.skip
) {
517 if(!prog_state
.cmd_startarg
)
518 dprintf(1, fgets_result
);
520 stringptr_fromchar(fgets_result
, line
);
521 stringptr_chomp(line
);
524 if(prog_state
.subst_entries
) {
526 sblist_iter(prog_state
.subst_entries
, index
) {
527 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
529 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
532 dprintf(2, "fatal: line too long for substitution: %s\n", line
->ptr
);
535 char* lastdot
= stringptr_rchr(line
, '.');
536 stringptr tilLastDot
= *line
;
537 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
538 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
539 if(ret
== -1) goto too_long
;
542 cmd_argv
[*index
] = subst_buf
[max_subst
];
549 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
550 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
556 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || free_slots() == 0)) {
557 write_statefile(lineno
, temp_state
);
564 if(prog_state
.delayedflush
)
565 write_statefile(lineno
- 1, temp_state
);
567 while(prog_state
.threads_running
) reap_child();
569 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
570 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
571 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
573 if(prog_state
.tempdir
)
574 rmdir(prog_state
.tempdir
);