2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 When using more than one process without the -buffered option, the output of some processes gets lost.
20 This happens regardless of dynamic/static linking, the libc in use, and whether the target program calls fflush() before exiting.
22 When the output is instead piped into "cat > file", everything arrives.
27 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
28 should print 100, but does not always
31 seq 100 | ./jobflow.out -threads=100 -exec echo {} | cat > test.tmp ; wc -l test.tmp
35 #undef _POSIX_C_SOURCE
36 #define _POSIX_C_SOURCE 200809L
38 #define _XOPEN_SOURCE 700
42 #include "../lib/include/optparser.h"
43 #include "../lib/include/stringptr.h"
44 #include "../lib/include/stringptrlist.h"
45 #include "../lib/include/sblist.h"
46 #include "../lib/include/strlib.h"
47 #include "../lib/include/timelib.h"
48 #include "../lib/include/filelib.h"
58 /* defines the amount of milliseconds to sleep between each call to the reaper,
59 * once all free slots are exhausted */
62 /* defines after how many milliseconds a reap of the running processes is obligatory. */
63 #define REAP_INTERVAL_MS 100
65 /* process handling */
72 #include <sys/resource.h>
/* Fallback for glibc older than 2.13, whose headers do not provide prlimit().
 * The stub only reports on stderr that prlimit() is unavailable. Its return
 * value is on lines not visible here. launch_job treats a result of -1 as
 * failure, so presumably the stub returns -1 (TODO confirm). */
74 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
75 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
76 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
77 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
78 static int prlimit(int pid
, ...) {
80 fprintf(stderr
, "prlimit() not implemented on this system\n");
91 posix_spawn_file_actions_t fa
;
99 /* defines how many slots our free_slots struct can take */
100 #define MAX_SLOTS 128
104 unsigned threads_running
;
108 sblist
* subst_entries
;
110 unsigned cmd_startarg
;
111 size_t free_slots
[MAX_SLOTS
];
112 unsigned free_slots_count
;
114 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
115 the top value in ms can be supplied via a command line switch.
116 this option makes only sense if the interval is somewhat smaller than the
117 expected runtime of the average job.
118 this option is useful to not overload a network app due to hundreds of
119 parallel connection tries on startup.
121 int buffered
:1; /* write stdout and stderr of each task into a file,
122 and print it to stdout once the process ends.
123 this prevents mixing up of the output of multiple tasks. */
124 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
125 this means faster program execution, but could also be imprecise if the number of
126 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
127 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
130 prog_state_s prog_state
;
133 extern char** environ
;
135 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
136 int ret
= snprintf(buf
, bufsize
,
137 is_stderr
? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
138 prog_state
.tempdir
, (unsigned) jobindex
);
139 return ret
> 0 && (size_t) ret
< bufsize
;
/* Spawn the command argv (argv[0] is looked up in PATH by posix_spawnp) in the
 * job slot prog_state.job_infos[jobindex]. A slot whose pid is not -1 is
 * treated as busy, and the call returns without doing anything.
 * - The child's stdin is replaced by /dev/null (read-only).
 * - With -buffered, fd 1 is redirected into the per-job capture file named by
 *   makeLogfilename(..., 0), created with mode 0644 and truncated. fd 2 either
 *   duplicates fd 1 (-joinoutput) or goes to the file from
 *   makeLogfilename(..., 1).
 * - On a successful spawn, threads_running is incremented and every -limits
 *   entry is applied to the child with prlimit().
 * The posix_spawn_* functions return an error number instead of setting
 * errno. Their result is stored in errno so that perror() can report it.
 * job->fa stays alive with the job and is destroyed in reapChilds.
 * NOTE(review): the spawn_error path is on lines not visible here. Verify that
 * it resets job->pid and destroys job->fa when initialisation partly
 * succeeded. */
142 void launch_job(size_t jobindex
, char** argv
) {
143 char stdout_filename_buf
[256];
144 char stderr_filename_buf
[256];
145 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
147 if(job
->pid
!= -1) return;
149 if(prog_state
.buffered
) {
150 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
151 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
152 fprintf(stderr
, "temp filename too long!\n");
/* Build the child's fd layout. stdin is always closed here and reopened on
 * /dev/null below. */
157 errno
= posix_spawn_file_actions_init(&job
->fa
);
158 if(errno
) goto spawn_error
;
159 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
160 if(errno
) goto spawn_error
;
/* In buffered mode, stdout/stderr are closed so they can be reopened on the
 * capture files. */
162 if(prog_state
.buffered
) {
163 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
164 if(errno
) goto spawn_error
;
165 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
166 if(errno
) goto spawn_error
;
169 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
170 if(errno
) goto spawn_error
;
172 if(prog_state
.buffered
) {
173 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
174 if(errno
) goto spawn_error
;
/* -joinoutput: fd 2 shares fd 1's capture file, which keeps the chronological
 * order of both streams. Otherwise stderr gets its own capture file. */
175 if(prog_state
.join_output
)
176 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
178 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
179 if(errno
) goto spawn_error
;
182 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
186 perror("posix_spawn");
188 prog_state
.threads_running
++;
/* Resource limits are applied to the already running child via its pid.
 * NOTE(review): the child runs unrestricted for the short window between
 * posix_spawnp() returning and prlimit() being called. */
189 if(prog_state
.limits
) {
191 sblist_iter(prog_state
.limits
, limit
) {
192 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
199 static void releaseJobSlot(size_t job_id
) {
200 if(prog_state
.free_slots_count
< MAX_SLOTS
) {
201 prog_state
.free_slots
[prog_state
.free_slots_count
] = job_id
;
202 prog_state
.free_slots_count
++;
/* Copy the capture file of a finished buffered job to this process's output.
 * is_stderr selects both the capture file (via makeLogfilename) and the
 * destination stream: stderr when non-zero, stdout otherwise.
 * The declarations of buf/nread and the cleanup after the copy loop are on
 * lines not visible here.
 * NOTE(review): the return value of makeLogfilename is ignored here.
 * Verify that fopen's result is checked for NULL before the fread loop, since
 * no check is visible. */
206 static void dump_output(size_t job_id
, int is_stderr
) {
207 char out_filename_buf
[256];
209 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
212 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
214 dst
= fopen(out_filename_buf
, "r");
/* Copy in chunks. A short read means EOF (or a read error), so the loop
 * stops. */
216 while((nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
217 fwrite(buf
, 1, nread
, out_stream
);
218 if(nread
< sizeof(buf
)) break;
/* Poll every job slot for a finished child without blocking
 * (waitpid with WNOHANG).
 * free_slots_count is reset to 0 on entry, so the free-slot stack is
 * presumably rebuilt from scratch on every call (the releaseJobSlot calls are
 * on lines not visible here; TODO confirm).
 * For a finished job it destroys the job's posix_spawn file actions and
 * decrements threads_running. In -buffered mode it then handles the job's
 * capture files: stderr separately only when -joinoutput is off.
 * Presumably this is done via dump_output; the calls are not visible. */
225 static void reapChilds(void) {
230 prog_state
.free_slots_count
= 0;
232 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
233 job
= sblist_get(prog_state
.job_infos
, i
);
/* NOTE(review): waitpid(-1, ...) waits for ANY child. If an unused slot
 * (pid == -1, see launch_job) can reach this call, it could reap another
 * slot's process. Verify that the hidden lines skip pid == -1 slots. */
235 ret
= waitpid(job
->pid
, &retval
, WNOHANG
);
237 // error or changed state.
243 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
245 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
248 posix_spawn_file_actions_destroy(&job
->fa
);
251 prog_state
.threads_running
--;
253 if(prog_state
.buffered
) {
255 if(!prog_state
.join_output
)
/* Print msg verbatim to stderr and terminate the process with exit status 1.
 * msg is written with fputs rather than being used as a printf format string,
 * so a '%' inside a message can never be interpreted as a conversion.
 * The function is declared noreturn and must therefore never return; it
 * always ends in exit(). */
__attribute__((noreturn))
static void die(const char* msg) {
	fputs(msg, stderr);
	exit(1);
}
/* Convert a -limits value such as "16M" into a number.
 * A trailing 'G' multiplies by 1024*1024*1024. 'M' and 'K' presumably
 * multiply by 1024*1024 and 1024; those assignments are on lines not visible
 * here. Suffixed values are copied into the local buf, which must be larger
 * than num->size, before conversion. Values without a suffix are converted
 * directly with atol(num->ptr).
 * NOTE(review): atol performs no validation (garbage yields 0). The product
 * atol(buf) * ret can overflow a 32-bit long for G values >= 2, and signed
 * overflow is undefined behavior. Consider strtol plus a range check. */
271 static long parse_human_number(stringptr
* num
) {
274 if(num
&& num
->size
&& num
->size
< sizeof(buf
)) {
275 if(num
->ptr
[num
->size
-1] == 'G')
276 ret
= 1024 * 1024 * 1024;
277 else if(num
->ptr
[num
->size
-1] == 'M')
279 else if(num
->ptr
[num
->size
-1] == 'K')
/* Copy into buf so the suffix can be cut off before the atol conversion. */
282 memcpy(buf
, num
->ptr
, num
->size
);
284 return atol(buf
) * ret
;
286 return atol(num
->ptr
);
/* Usage text for jobflow, listing every command-line option.
 * The printing call and the return statement are on lines not visible here.
 * Presumably parse_args uses this for -help or a missing argument list
 * (TODO confirm).
 * NOTE(review): the -threads help line ends with a stray ']'
 * ("...processes to spawn]"). */
291 static int syntax(void) {
293 "jobflow (C) rofl0r\n"
294 "------------------\n"
295 "this program is intended to be used as a recipient of another programs output\n"
296 "it launches processes to which the current line can be passed as an argument\n"
297 "using {} for substitution (as in find -exec).\n"
299 "available options:\n\n"
300 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
301 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
302 "-exec ./mycommand {}\n"
305 " XXX=number of entries to skip\n"
307 " XXX=number of parallel processes to spawn]\n"
309 " resume from last jobnumber stored in statefile\n"
312 " saves last launched jobnumber into a file\n"
314 " only write to statefile whenever all processes are busy,\n"
315 " and at program end\n"
316 "-delayedspinup=XXX\n"
317 " XXX=maximum amount of milliseconds\n"
318 " ...to wait when spinning up a fresh set of processes\n"
319 " a random value between 0 and the chosen amount is used to delay initial\n"
321 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
322 " activity on program startup\n"
324 " store the stdout and stderr of launched processes into a temporary file\n"
325 " which will be printed after a process has finished.\n"
326 " this prevents mixing up of output of different processes.\n"
328 " if -buffered, write both stdout and stderr into the same file.\n"
329 " this saves the chronological order of the output, and the combined output\n"
330 " will only be printed to stdout.\n"
331 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
332 " sets the rlimit of the new created processes.\n"
333 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
334 "-exec command with args\n"
335 " everything past -exec is treated as the command to execute on each line of\n"
336 " stdin received. the line can be passed as an argument using {}.\n"
337 " {.} passes everything before the last dot in a line as an argument.\n"
338 " it is possible to use multiple substitutions inside a single argument,\n"
339 " but currently only of one type.\n"
/* Parse the command line into the global prog_state.
 * main() exits with status 1 when this returns non-zero.
 * Options that depend on other options are checked here with die():
 * -resume and -delayedflush need -statefile, and -joinoutput needs -buffered.
 * Declarations of op_temp, i, r, kv, lim and subst_ent are on lines not
 * visible here. */
344 static int parse_args(int argc
, char** argv
) {
345 op_state op_b
, *op
= &op_b
;
346 op_init(op
, argc
, argv
);
348 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
/* Numeric options use atoi: -threads defaults to 1, -skip to 0.
 * NOTE(review): atoi silently yields 0 for malformed input, e.g. -threads=abc
 * becomes 0 threads. Consider strtol with validation. */
350 op_temp
= op_get(op
, SPL("threads"));
351 prog_state
.numthreads
= op_temp
? atoi(op_temp
) : 1;
352 op_temp
= op_get(op
, SPL("statefile"));
353 prog_state
.statefile
= op_temp
;
355 op_temp
= op_get(op
, SPL("skip"));
356 prog_state
.skip
= op_temp
? atoi(op_temp
) : 0;
/* -resume: if the statefile exists and is readable and writable, its content
 * (the job number written by write_statefile) replaces the -skip value.
 * NOTE(review): fc is dereferenced without a visible NULL check. Verify
 * that stringptr_fromfile cannot fail here. */
357 if(op_hasflag(op
, SPL("resume"))) {
358 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
359 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
360 stringptr
* fc
= stringptr_fromfile(prog_state
.statefile
);
361 prog_state
.skip
= atoi(fc
->ptr
);
365 prog_state
.delayedflush
= 0;
366 if(op_hasflag(op
, SPL("delayedflush"))) {
367 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
368 prog_state
.delayedflush
= 1;
371 op_temp
= op_get(op
, SPL("delayedspinup"));
372 prog_state
.delayedspinup_interval
= op_temp
? atoi(op_temp
) : 0;
374 prog_state
.cmd_startarg
= 0;
375 prog_state
.subst_entries
= NULL
;
/* -exec: cmd_startarg becomes the argv index r where the command starts.
 * r is computed on lines not visible here, presumably from the position of
 * "-exec". The positions of command arguments containing "{}" or "{.}" are
 * recorded in subst_entries, so that main() only rewrites those per input
 * line. */
377 if(op_hasflag(op
, SPL("exec"))) {
380 for(i
= 1; i
< (unsigned) argc
; i
++) {
381 if(str_equal(argv
[i
], "-exec")) {
386 if(r
&& r
< (unsigned) argc
) {
387 prog_state
.cmd_startarg
= r
;
390 // save entries which must be substituted, to save some cycles.
391 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
392 for(i
= r
; i
< (unsigned) argc
; i
++) {
394 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) sblist_add(prog_state
.subst_entries
, &subst_ent
);
398 prog_state
.buffered
= 0;
399 if(op_hasflag(op
, SPL("buffered"))) {
400 prog_state
.buffered
= 1;
403 prog_state
.join_output
= 0;
404 if(op_hasflag(op
, SPL("joinoutput"))) {
405 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
406 prog_state
.join_output
= 1;
/* -limits=key=value,...: every entry becomes a limit_rec in
 * prog_state.limits, applied to each child by launch_job via prlimit().
 * Entries that are not exactly "key=value" are skipped. */
409 prog_state
.limits
= NULL
;
410 op_temp
= op_get(op
, SPL("limits"));
413 SPDECLAREC(limits
, op_temp
);
414 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
416 stringptr
* key
, *value
;
418 if(stringptrlist_getsize(limit_list
)) {
419 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
420 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
421 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
422 if(stringptrlist_getsize(kv
) != 2) continue;
423 key
= stringptrlist_get(kv
, 0);
424 value
= stringptrlist_get(kv
, 1);
425 if(EQ(key
, SPL("mem")))
426 lim
.limit
= RLIMIT_AS
;
427 else if(EQ(key
, SPL("cpu")))
428 lim
.limit
= RLIMIT_CPU
;
429 else if(EQ(key
, SPL("stack")))
430 lim
.limit
= RLIMIT_STACK
;
431 else if(EQ(key
, SPL("fsize")))
432 lim
.limit
= RLIMIT_FSIZE
;
433 else if(EQ(key
, SPL("nofiles")))
434 lim
.limit
= RLIMIT_NOFILE
;
/* NOTE(review): unlike the other die() messages, this one and the
 * "could not query rlimits" one below lack a trailing "\n". */
436 die("unknown option passed to -limits");
/* Start from the current limits so that only the soft limit (rlim_cur) is
 * overridden. The hard limit keeps the value returned by getrlimit(). A soft
 * value above the hard limit makes prlimit() fail with EINVAL in launch_job. */
438 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
440 die("could not query rlimits");
442 lim
.rl
.rlim_cur
= parse_human_number(value
);
443 sblist_add(prog_state
.limits
, &lim
);
444 stringptrlist_free(kv
);
446 stringptrlist_free(limit_list
);
/* Fill prog_state.job_infos with prog_state.numthreads copies of the template
 * job ji, whose posix_spawn file actions are zeroed.
 * ji's other fields are set on lines not visible here. launch_job treats
 * pid != -1 as "slot busy", so presumably ji.pid is initialised to -1
 * (TODO confirm). */
452 static void init_queue(void) {
457 memset(&ji
.fa
, 0, sizeof(ji
.fa
));
459 for(i
= 0; i
< prog_state
.numthreads
; i
++) {
460 sblist_add(prog_state
.job_infos
, &ji
);
/* Persist progress for -resume: write n + 1 as text into tempfile, then
 * rename() it over prog_state.statefile.
 * main() passes "<statefile>.<pid>" as tempfile, so both names live in the
 * same directory. rename() then replaces the statefile atomically, and a
 * reader never sees a half-written file.
 * parse_args -resume reads this number back as the skip count. Presumably n
 * is the index of the last launched line, so a resumed run continues after
 * it (TODO confirm).
 * numbuf is declared on a line not visible here.
 * NOTE(review): stringptr_tofile's result is ignored. The handling of a
 * failed rename() is on lines not visible here. */
464 static void write_statefile(uint64_t n
, const char* tempfile
) {
466 stringptr num_b
, *num
= &num_b
;
468 num_b
.ptr
= uint64ToString(n
+ 1, numbuf
);
469 num_b
.size
= strlen(numbuf
);
470 stringptr_tofile((char*) tempfile
, num
);
471 if(rename(tempfile
, prog_state
.statefile
) == -1)
475 // Copies source into dest, replacing every occurrence of `what` with `whit`.
476 // Returns the number of substitutions done, or -1 if dest runs out of space. dest is always overwritten; if no substitutions were done, it contains a copy of source.
477 int substitute_all(char* dest
, ssize_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
/* dest_size counts the remaining bytes in dest. The advancement of i and dest
 * after each copy is on lines not visible here. */
480 for(i
= 0; dest_size
> 0 && i
< source
->size
; ) {
481 if(stringptr_here(source
, i
, what
)) {
/* The replacement text must fit completely, otherwise fail. */
482 if(dest_size
< (ssize_t
) whit
->size
) return -1;
483 memcpy(dest
, whit
->ptr
, whit
->size
);
485 dest_size
-= whit
->size
;
489 *dest
= source
->ptr
[i
];
/* No room left after the loop: presumably the terminating NUL would not fit,
 * so this is reported as an overflow (TODO confirm against hidden lines). */
495 if(!dest_size
) return -1;
/* Entry point. Reads stdin line by line with fgets into a 4096-byte buffer;
 * longer lines arrive in several pieces.
 * - Without -exec, each line is echoed to stdout.
 * - With -exec, the command's argv is built per line by substituting {} / {.}
 *   into the recorded arguments. The command is launched in a free job slot,
 *   running jobs are reaped periodically, and progress is written to the
 *   statefile.
 * After EOF it waits until no job is running, frees the lists and removes the
 * -buffered temp directory.
 * Declarations of i, n, ret, max_subst and op_temp are on lines not visible
 * here. */
500 int main(int argc
, char** argv
) {
501 char inbuf
[4096]; char* fgets_result
;
502 stringptr line_b
, *line
= &line_b
;
503 char* cmd_argv
[4096];
504 char subst_buf
[16][4096];
507 struct timeval reapTime
;
511 unsigned spinup_counter
= 0;
513 char tempdir_buf
[256];
514 char temp_state
[256];
/* Clamp argc so that cmd_argv (4096 entries) can hold the copied command
 * arguments plus the terminating NULL. */
518 if(argc
> 4096) argc
= 4096;
519 prog_state
.threads_running
= 0;
520 prog_state
.free_slots_count
= 0;
521 gettimestamp(&reapTime
);
523 if(parse_args(argc
, argv
)) return 1;
/* Temp file used by write_statefile: "<statefile>.<pid>", in the same
 * directory as the statefile so the final rename() is atomic. */
525 if(prog_state
.statefile
)
526 ulz_snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
528 prog_state
.tempdir
= NULL
;
/* -buffered needs a private temp directory for the per-job capture files. */
530 if(prog_state
.buffered
) {
531 prog_state
.tempdir
= tempdir_buf
;
532 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
534 die("could not create tempdir\n");
/* Copy the command (everything after -exec) into cmd_argv and NULL-terminate
 * it. Entries listed in subst_entries are replaced per line below. */
538 if(prog_state
.cmd_startarg
) {
539 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
540 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
542 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
545 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
548 while((fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
))) {
/* Without -exec, the input line is echoed unchanged.
 * NOTE(review): the line is used as printf's FORMAT string. A '%' in the
 * input is interpreted as a conversion specifier, which is undefined behavior
 * and a format-string vulnerability. Use fputs(fgets_result, stdout). */
552 if(!prog_state
.cmd_startarg
)
553 printf(fgets_result
);
555 stringptr_fromchar(fgets_result
, line
);
556 stringptr_chomp(line
);
/* Build the substituted arguments for this line. Each recorded argument is
 * rendered into its own row of subst_buf (16 rows of 4096 bytes).
 * NOTE(review): max_subst is managed on lines not visible here. Confirm it
 * cannot exceed 15 when more than 16 command arguments contain placeholders. */
559 if(prog_state
.subst_entries
) {
561 sblist_iter(prog_state
.subst_entries
, index
) {
562 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
564 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
567 fprintf(stderr
, "fatal: line too long for substitution: %s\n", line
->ptr
);
/* {.}: the line truncated before its last '.', or the whole line if it has no
 * dot. This substitution again starts from the original argument (source), so
 * a single argument can only use one placeholder type, as the help text
 * says. */
570 char* lastdot
= stringptr_rchr(line
, '.');
571 stringptr tilLastDot
= *line
;
572 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
573 ret
= substitute_all(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
574 if(ret
== -1) goto too_long
;
577 cmd_argv
[*index
] = subst_buf
[max_subst
];
/* Reap when no slot is free, or when more than REAP_INTERVAL_MS have passed
 * since the last reap. While all slots are busy, sleep SLEEP_MS between
 * polls. */
583 while(prog_state
.free_slots_count
== 0 || mspassed(&reapTime
) > REAP_INTERVAL_MS
) {
585 gettimestamp(&reapTime
);
586 if(!prog_state
.free_slots_count
) msleep(SLEEP_MS
);
/* -delayedspinup: during the initial spin-up (spinup_counter below
 * numthreads * 2; its increment is on lines not visible here), sleep a random
 * 0..delayedspinup_interval ms before each launch. */
589 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
590 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
/* Pop the topmost free slot and launch the command in it. */
594 launch_job(prog_state
.free_slots
[prog_state
.free_slots_count
-1], cmd_argv
);
595 prog_state
.free_slots_count
--;
/* Record progress after every launch. With -delayedflush, record it only once
 * all slots are busy. */
597 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || prog_state
.free_slots_count
== 0)) {
598 write_statefile(n
, temp_state
);
/* With -delayedflush, write the final state once input is exhausted. */
607 if(prog_state
.delayedflush
)
608 write_statefile(n
- 1, temp_state
);
/* Wait for the remaining jobs to finish (the reap call is on lines not
 * visible here). */
610 while(prog_state
.threads_running
) {
612 if(prog_state
.threads_running
) msleep(SLEEP_MS
);
615 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
616 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
617 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
/* rmdir() only removes an empty directory, so any capture files left behind
 * make this fail. */
619 if(prog_state
.tempdir
)
620 rmdir(prog_state
.tempdir
);