2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #undef _POSIX_C_SOURCE
19 #define _POSIX_C_SOURCE 200809L
21 #define _XOPEN_SOURCE 700
25 #include "../lib/include/optparser.h"
26 #include "../lib/include/stringptr.h"
27 #include "../lib/include/stringptrlist.h"
28 #include "../lib/include/sblist.h"
29 #include "../lib/include/strlib.h"
30 #include "../lib/include/timelib.h"
31 #include "../lib/include/filelib.h"
41 /* defines the amount of milliseconds to sleep between each call to the reaper,
42 * once all free slots are exhausted */
45 /* defines after how many milliseconds a reap of the running processes is obligatory. */
46 #define REAP_INTERVAL_MS 100
48 /* process handling */
55 #include <sys/resource.h>
57 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
58 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
59 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
60 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
62 static int prlimit(int pid
, ...) {
64 fprintf(stderr
, "prlimit() not implemented on this system\n");
75 posix_spawn_file_actions_t fa
;
83 /* defines how many entries the free_slots array can hold */
88 unsigned threads_running
;
92 sblist
* subst_entries
;
94 unsigned cmd_startarg
;
95 size_t free_slots
[MAX_SLOTS
];
96 unsigned free_slots_count
;
98 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
99 the top value in ms can be supplied via a command line switch.
100 this option only makes sense if the interval is somewhat smaller than the
101 expected runtime of the average job.
102 this option is useful to not overload a network app due to hundreds of
103 parallel connection attempts on startup.
105 int buffered
:1; /* write stdout and stderr of each task into a file,
106 and print it once the process ends (captured stderr goes to stderr unless join_output is set).
107 this prevents mixing up of the output of multiple tasks. */
108 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
109 this means faster program execution, but could also be imprecise if the number of
110 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
111 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
114 prog_state_s prog_state
;
117 extern char** environ
;
119 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
120 int ret
= snprintf(buf
, bufsize
,
121 is_stderr
? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
122 prog_state
.tempdir
, (unsigned) jobindex
);
123 return ret
> 0 && (size_t) ret
< bufsize
;
126 void launch_job(size_t jobindex
, char** argv
) {
127 char stdout_filename_buf
[256];
128 char stderr_filename_buf
[256];
129 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
131 if(job
->pid
!= -1) return;
133 if(prog_state
.buffered
) {
134 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
135 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
136 fprintf(stderr
, "temp filename too long!\n");
141 errno
= posix_spawn_file_actions_init(&job
->fa
);
142 if(errno
) goto spawn_error
;
143 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
144 if(errno
) goto spawn_error
;
146 if(prog_state
.buffered
) {
147 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
148 if(errno
) goto spawn_error
;
149 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
150 if(errno
) goto spawn_error
;
153 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
154 if(errno
) goto spawn_error
;
156 if(prog_state
.buffered
) {
157 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
158 if(errno
) goto spawn_error
;
159 if(prog_state
.join_output
)
160 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
162 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
163 if(errno
) goto spawn_error
;
166 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
170 perror("posix_spawn");
172 prog_state
.threads_running
++;
173 if(prog_state
.limits
) {
175 sblist_iter(prog_state
.limits
, limit
) {
176 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
183 static void addJobSlot(size_t job_id
) {
184 if(prog_state
.free_slots_count
< MAX_SLOTS
) {
185 prog_state
.free_slots
[prog_state
.free_slots_count
] = job_id
;
186 prog_state
.free_slots_count
++;
190 static void dump_output(size_t job_id
, int is_stderr
) {
191 char out_filename_buf
[256];
193 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
196 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
198 dst
= fopen(out_filename_buf
, "r");
199 while(dst
&& (nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
200 fwrite(buf
, 1, nread
, out_stream
);
201 if(nread
< sizeof(buf
)) break;
209 /* reap exited children (non-blocking waitpid) and update the free "slots" in prog_state; returns nothing */
210 static void reapChilds(void) {
215 prog_state
.free_slots_count
= 0;
217 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
218 job
= sblist_get(prog_state
.job_infos
, i
);
220 ret
= waitpid(job
->pid
, &retval
, WNOHANG
);
222 // error or changed state.
228 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
231 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
234 posix_spawn_file_actions_destroy(&job
->fa
);
237 prog_state
.threads_running
--;
239 if(prog_state
.buffered
) {
241 if(!prog_state
.join_output
)
251 __attribute__((noreturn
))
252 static void die(const char* msg
) {
253 fprintf(stderr
, msg
);
257 static long parse_human_number(stringptr
* num
) {
260 if(num
&& num
->size
&& num
->size
< sizeof(buf
)) {
261 if(num
->ptr
[num
->size
-1] == 'G')
262 ret
= 1024 * 1024 * 1024;
263 else if(num
->ptr
[num
->size
-1] == 'M')
265 else if(num
->ptr
[num
->size
-1] == 'K')
268 memcpy(buf
, num
->ptr
, num
->size
);
270 return atol(buf
) * ret
;
272 return atol(num
->ptr
);
277 static int syntax(void) {
279 "jobflow (C) rofl0r\n"
280 "------------------\n"
281 "this program is intended to be used as a recipient of another programs output\n"
282 "it launches processes to which the current line can be passed as an argument\n"
283 "using {} for substitution (as in find -exec).\n"
285 "available options:\n\n"
286 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
287 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
288 "-exec ./mycommand {}\n"
291 " XXX=number of entries to skip\n"
293 " XXX=number of parallel processes to spawn]\n"
295 " resume from last jobnumber stored in statefile\n"
298 " saves last launched jobnumber into a file\n"
300 " only write to statefile whenever all processes are busy,\n"
301 " and at program end\n"
302 "-delayedspinup=XXX\n"
303 " XXX=maximum amount of milliseconds\n"
304 " ...to wait when spinning up a fresh set of processes\n"
305 " a random value between 0 and the chosen amount is used to delay initial\n"
307 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
308 " activity on program startup\n"
310 " store the stdout and stderr of launched processes into a temporary file\n"
311 " which will be printed after a process has finished.\n"
312 " this prevents mixing up of output of different processes.\n"
314 " if -buffered, write both stdout and stderr into the same file.\n"
315 " this saves the chronological order of the output, and the combined output\n"
316 " will only be printed to stdout.\n"
317 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
318 " sets the rlimit of the new created processes.\n"
319 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
320 "-exec command with args\n"
321 " everything past -exec is treated as the command to execute on each line of\n"
322 " stdin received. the line can be passed as an argument using {}.\n"
323 " {.} passes everything before the last dot in a line as an argument.\n"
328 static int parse_args(int argc
, char** argv
) {
329 op_state op_b
, *op
= &op_b
;
330 op_init(op
, argc
, argv
);
332 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
334 op_temp
= op_get(op
, SPL("threads"));
335 prog_state
.numthreads
= op_temp
? atoi(op_temp
) : 1;
336 op_temp
= op_get(op
, SPL("statefile"));
337 prog_state
.statefile
= op_temp
;
339 op_temp
= op_get(op
, SPL("skip"));
340 prog_state
.skip
= op_temp
? atoi(op_temp
) : 0;
341 if(op_hasflag(op
, SPL("resume"))) {
342 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
343 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
344 stringptr
* fc
= stringptr_fromfile(prog_state
.statefile
);
345 prog_state
.skip
= atoi(fc
->ptr
);
349 prog_state
.delayedflush
= 0;
350 if(op_hasflag(op
, SPL("delayedflush"))) {
351 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
352 prog_state
.delayedflush
= 1;
355 op_temp
= op_get(op
, SPL("delayedspinup"));
356 prog_state
.delayedspinup_interval
= op_temp
? atoi(op_temp
) : 0;
358 prog_state
.cmd_startarg
= 0;
359 prog_state
.subst_entries
= NULL
;
361 if(op_hasflag(op
, SPL("exec"))) {
364 for(i
= 1; i
< (unsigned) argc
; i
++) {
365 if(str_equal(argv
[i
], "-exec")) {
370 if(r
&& r
< (unsigned) argc
) {
371 prog_state
.cmd_startarg
= r
;
374 // save entries which must be substituted, to save some cycles.
375 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
376 for(i
= r
; i
< (unsigned) argc
; i
++) {
378 if(strstr(argv
[i
], "{}") || strstr(argv
[i
], "{.}")) sblist_add(prog_state
.subst_entries
, &subst_ent
);
382 prog_state
.buffered
= 0;
383 if(op_hasflag(op
, SPL("buffered"))) {
384 prog_state
.buffered
= 1;
387 prog_state
.join_output
= 0;
388 if(op_hasflag(op
, SPL("joinoutput"))) {
389 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
390 prog_state
.join_output
= 1;
393 prog_state
.limits
= NULL
;
394 op_temp
= op_get(op
, SPL("limits"));
397 SPDECLAREC(limits
, op_temp
);
398 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
400 stringptr
* key
, *value
;
402 if(stringptrlist_getsize(limit_list
)) {
403 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
404 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
405 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
406 if(stringptrlist_getsize(kv
) != 2) continue;
407 key
= stringptrlist_get(kv
, 0);
408 value
= stringptrlist_get(kv
, 1);
409 if(EQ(key
, SPL("mem")))
410 lim
.limit
= RLIMIT_AS
;
411 else if(EQ(key
, SPL("cpu")))
412 lim
.limit
= RLIMIT_CPU
;
413 else if(EQ(key
, SPL("stack")))
414 lim
.limit
= RLIMIT_STACK
;
415 else if(EQ(key
, SPL("fsize")))
416 lim
.limit
= RLIMIT_FSIZE
;
417 else if(EQ(key
, SPL("nofiles")))
418 lim
.limit
= RLIMIT_NOFILE
;
420 die("unknown option passed to -limits");
422 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
424 die("could not query rlimits");
426 lim
.rl
.rlim_cur
= parse_human_number(value
);
427 sblist_add(prog_state
.limits
, &lim
);
428 stringptrlist_free(kv
);
430 stringptrlist_free(limit_list
);
436 static void init_queue(void) {
441 memset(&ji
.fa
, 0, sizeof(ji
.fa
));
443 for(i
= 0; i
< prog_state
.numthreads
; i
++) {
444 sblist_add(prog_state
.job_infos
, &ji
);
448 static void write_statefile(uint64_t n
, const char* tempfile
) {
450 stringptr num_b
, *num
= &num_b
;
452 num_b
.ptr
= uint64ToString(n
+ 1, numbuf
);
453 num_b
.size
= strlen(numbuf
);
454 stringptr_tofile((char*) tempfile
, num
);
455 if(rename(tempfile
, prog_state
.statefile
) == -1)
459 // returns 0 if no substitution took place, 1 if successful, -1 on out of buffer.
460 // parameters "source" and "what" have to be zero-terminated
461 int substitute(char* dest
, size_t dest_size
, stringptr
* source
, stringptr
* what
, stringptr
* whit
) {
464 if(!(source
->size
) || (source
->size
< what
->size
) || (!(strstr_result
= strstr(source
->ptr
, what
->ptr
)))) return 0;
465 if(dest_size
< (source
->size
- what
->size
) + whit
->size
+ 1) return -1;
466 len
= strstr_result
- source
->ptr
;
467 memcpy(dest
, source
->ptr
, len
);
469 memcpy(dest
, whit
->ptr
, whit
->size
);
471 if((dest_size
= source
->size
- len
- what
->size
)) {
472 memcpy(dest
, source
->ptr
+ len
+ what
->size
, dest_size
);
479 int main(int argc
, char** argv
) {
480 char inbuf
[4096]; char* fgets_result
;
481 stringptr line_b
, *line
= &line_b
;
482 char* cmd_argv
[4096];
483 char subst_buf
[16][4096];
486 struct timeval reapTime
;
490 unsigned spinup_counter
= 0;
492 char tempdir_buf
[256];
493 char temp_state
[256];
497 if(argc
> 4096) argc
= 4096;
498 prog_state
.threads_running
= 0;
499 prog_state
.free_slots_count
= 0;
500 gettimestamp(&reapTime
);
502 if(parse_args(argc
, argv
)) return 1;
504 if(prog_state
.statefile
)
505 ulz_snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
507 prog_state
.tempdir
= NULL
;
509 if(prog_state
.buffered
) {
510 prog_state
.tempdir
= tempdir_buf
;
511 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
513 die("could not create tempdir\n");
517 if(prog_state
.cmd_startarg
) {
518 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
519 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
521 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
524 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
527 while((fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
))) {
531 if(!prog_state
.cmd_startarg
)
532 printf(fgets_result
);
534 stringptr_fromchar(fgets_result
, line
);
535 stringptr_chomp(line
);
538 if(prog_state
.subst_entries
) {
540 sblist_iter(prog_state
.subst_entries
, index
) {
541 SPDECLAREC(source
, argv
[*index
+ prog_state
.cmd_startarg
]);
543 ret
= substitute(subst_buf
[max_subst
], 4096, source
, SPL("{}"), line
);
546 fprintf(stderr
, "fatal: line too long for substitution: %s\n", line
->ptr
);
549 char* lastdot
= stringptr_rchr(line
, '.');
550 stringptr tilLastDot
= *line
;
551 if(lastdot
) tilLastDot
.size
= lastdot
- line
->ptr
;
552 ret
= substitute(subst_buf
[max_subst
], 4096, source
, SPL("{.}"), &tilLastDot
);
553 if(ret
== -1) goto too_long
;
556 cmd_argv
[*index
] = subst_buf
[max_subst
];
562 while(prog_state
.free_slots_count
== 0 || mspassed(&reapTime
) > REAP_INTERVAL_MS
) {
564 gettimestamp(&reapTime
);
565 if(!prog_state
.free_slots_count
) msleep(SLEEP_MS
);
568 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
569 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
573 launch_job(prog_state
.free_slots
[prog_state
.free_slots_count
-1], cmd_argv
);
574 prog_state
.free_slots_count
--;
576 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || prog_state
.free_slots_count
== 0)) {
577 write_statefile(n
, temp_state
);
586 if(prog_state
.delayedflush
)
587 write_statefile(n
- 1, temp_state
);
589 while(prog_state
.threads_running
) {
591 if(prog_state
.threads_running
) msleep(SLEEP_MS
);
594 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
595 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
596 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
598 if(prog_state
.tempdir
)
599 rmdir(prog_state
.tempdir
);