2 #define _POSIX_C_SOURCE 200809L
4 #define _XOPEN_SOURCE 700
8 #include "../lib/include/optparser.h"
9 #include "../lib/include/stringptr.h"
10 #include "../lib/include/stringptrlist.h"
11 #include "../lib/include/sblist.h"
12 #include "../lib/include/strlib.h"
13 #include "../lib/include/timelib.h"
14 #include "../lib/include/filelib.h"
24 /* defines the number of milliseconds to sleep between each call to the reaper,
25 * once all free slots are exhausted (main() sleeps SLEEP_MS while free_slots_count is 0) */
/* NOTE(review): the SLEEP_MS #define that the comment above documents is not visible in this view. */
28 /* defines after how many milliseconds a reap of the running processes is obligatory
 * (main() compares mspassed(&reapTime) against it). */
29 #define REAP_INTERVAL_MS 100
31 /* process handling */
38 #include <sys/resource.h>
/* Fallback for glibc 2.x headers older than 2.13, which lack prlimit():
 * a stub whose visible body only prints a diagnostic to stderr, so -limits
 * cannot actually be applied on such systems.
 * NOTE(review): the stub's return statement and the matching #endif are not
 * visible in this view. */
40 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
41 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
42 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
43 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
45 static int prlimit(int pid
, ...) {
47 fprintf(stderr
, "prlimit() not implemented on this system\n");
/* NOTE(review): the opening/closing lines of the struct definitions (job_info,
 * which also has a pid member, and prog_state_s) are not visible in this view. */
/* spawn file actions of a job: built in launch_job(), destroyed in reapChilds() */
58 posix_spawn_file_actions_t fa
;
66 /* defines how many slots the free_slots array can hold */
71 unsigned threads_running
;
/* number of spawned jobs: incremented in launch_job(), decremented in reapChilds() */
75 sblist
* subst_entries
;
/* argv indices, relative to cmd_startarg, of command words that contain "{}";
 * NULL when no -exec command was given */
77 unsigned cmd_startarg
;
/* argv index where the command to execute starts; 0 means no -exec command
 * (input lines are then just echoed) */
78 size_t free_slots
[MAX_SLOTS
];
/* stack of job_infos indices ready to receive a new job (see addJobSlot()) */
79 unsigned free_slots_count
;
/* number of valid entries at the bottom of free_slots */
81 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
82 the top value in ms can be supplied via a command line switch.
83 this option only makes sense if the interval is somewhat smaller than the
84 expected runtime of the average job.
85 this option is useful to avoid overloading a network app with hundreds of
86 parallel connection attempts on startup.
88 int buffered
:1; /* write stdout and stderr of each task into a file,
89 and print it (the stdout log to stdout, the stderr log to stderr) once the process ends.
90 this prevents mixing up of the output of multiple tasks. */
91 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
92 this means faster program execution, but could also be imprecise if the number of
93 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
94 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
/* the single global program state */
97 prog_state_s prog_state
;
/* process environment, handed unchanged to every job by posix_spawnp() */
100 extern char** environ
;
/* Build the path of the log file that captures one output stream of a
 * buffered job, inside prog_state.tempdir.
 * buf/bufsize: destination buffer.
 * jobindex: index of the job in prog_state.job_infos.
 * is_stderr: nonzero selects the ..._stderr.log name, 0 the ..._stdout.log name.
 * Returns 1 if the complete path fit into buf, 0 on snprintf error or truncation. */
102 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
103 int ret
= snprintf(buf
, bufsize
,
104 is_stderr
? "%s/jd_proc_%.5u_stderr.log" : "%s/jd_proc_%.5u_stdout.log",
105 prog_state
.tempdir
, (unsigned) jobindex
);
106 return ret
> 0 && (size_t) ret
< bufsize
;
/* Spawn argv (argv[0] is resolved by posix_spawnp(), i.e. via PATH) as the job
 * stored in prog_state.job_infos[jobindex]; returns immediately unless that
 * job's pid is -1.
 * Child fd setup: fd 0 is closed and reopened read-only on /dev/null; with
 * -buffered, fds 1 and 2 are closed and reopened on the per-job log files named
 * by makeLogfilename() (with -joinoutput, fd 2 is made a duplicate of fd 1).
 * After posix_spawnp(), threads_running is incremented and every -limits entry
 * is applied to the child with prlimit().
 * NOTE(review): several lines of this function (the filename-error exit, the
 * spawn_error label and its error path, closing braces) are not visible in
 * this view; the comments below only describe the visible statements. */
109 void launch_job(size_t jobindex
, char** argv
) {
110 char stdout_filename_buf
[256];
111 char stderr_filename_buf
[256];
112 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
/* only launch into a job entry whose pid is -1 */
114 if(job
->pid
!= -1) return;
/* with -buffered, compute the log names first; the stderr name is only
 * needed when stderr is not joined into stdout */
116 if(prog_state
.buffered
) {
117 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
118 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
119 fprintf(stderr
, "temp filename too long!\n");
/* posix_spawn_file_actions_* return an error number instead of setting errno;
 * it is stored in errno before jumping to spawn_error */
124 errno
= posix_spawn_file_actions_init(&job
->fa
);
125 if(errno
) goto spawn_error
;
126 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
127 if(errno
) goto spawn_error
;
129 if(prog_state
.buffered
) {
130 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
131 if(errno
) goto spawn_error
;
132 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
133 if(errno
) goto spawn_error
;
/* the child's stdin is /dev/null, so jobs cannot consume our input stream */
136 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
137 if(errno
) goto spawn_error
;
139 if(prog_state
.buffered
) {
140 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
141 if(errno
) goto spawn_error
;
/* NOTE(review): the result of adddup2 is not checked in the visible lines,
 * and the line between it and the stderr addopen below is not visible; unless
 * that line is an `else`, the addopen would also run with join_output set,
 * when stderr_filename_buf was never filled in. */
142 if(prog_state
.join_output
)
143 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
145 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
146 if(errno
) goto spawn_error
;
149 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
/* NOTE(review): the error check after posix_spawnp() is not visible */
153 perror("posix_spawn");
155 prog_state
.threads_running
++;
/* apply each configured resource limit to the new child */
156 if(prog_state
.limits
) {
158 sblist_iter(prog_state
.limits
, limit
) {
159 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
/* NOTE(review): the prlimit() failure handling is not visible in this view */
/* Push job_id onto the free_slots stack; main() pops the top entry when it
 * launches the next job. The visible code only stores it while fewer than
 * MAX_SLOTS entries are present.
 * NOTE(review): the closing braces (and any else branch) are not visible here. */
166 static void addJobSlot(size_t job_id
) {
167 if(prog_state
.free_slots_count
< MAX_SLOTS
) {
168 prog_state
.free_slots
[prog_state
.free_slots_count
] = job_id
;
169 prog_state
.free_slots_count
++;
/* Copy the buffered log of job job_id to this process: the stdout log to
 * stdout, or the stderr log to stderr when is_stderr is nonzero. If the log
 * file cannot be opened, nothing is printed.
 * NOTE(review): the declarations of buf and nread, and any fclose()/removal of
 * the log file, are not visible in this view. The result of makeLogfilename()
 * is ignored; on truncation fopen() is attempted on the truncated path. */
173 static void dump_output(size_t job_id
, int is_stderr
) {
174 char out_filename_buf
[256];
176 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
179 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
181 dst
= fopen(out_filename_buf
, "r");
182 while(dst
&& (nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
183 fwrite(buf
, 1, nread
, out_stream
);
/* a short read is treated as end of file */
184 if(nread
< sizeof(buf
)) break;
192 /* reap finished children by polling every job with waitpid(..., WNOHANG); returns nothing */
193 static void reapChilds(void) {
/* free_slots is rebuilt from scratch on every call.
 * NOTE(review): the code that refills it (presumably via addJobSlot()) and the
 * declarations of i, job, ret and retval are not visible in this view. */
198 prog_state
.free_slots_count
= 0;
200 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
201 job
= sblist_get(prog_state
.job_infos
, i
);
/* NOTE(review): if job->pid is -1 here, waitpid(-1, ...) would reap any child;
 * a guard for unused job entries is not visible in this view. */
203 ret
= waitpid(job
->pid
, &retval
, WNOHANG
);
205 // error or changed state.
211 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
214 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
/* release the spawn file actions built in launch_job() */
217 posix_spawn_file_actions_destroy(&job
->fa
);
220 prog_state
.threads_running
--;
/* with -buffered the job's captured logs are handled here; the stderr log only
 * exists when output is not joined. NOTE(review): the calls made here are not visible. */
222 if(prog_state
.buffered
) {
224 if(!prog_state
.join_output
)
/* Print msg verbatim to stderr and terminate (the function is declared
 * noreturn). msg is not interpreted as a printf format, so a '%' in it is
 * printed literally instead of causing undefined behavior.
 * NOTE(review): the terminating call after the fprintf is not visible here. */
234 __attribute__((noreturn
))
235 static void die(const char* msg
) {
236 fprintf(stderr
, "%s", msg
);
/* Convert a decimal string with an optional size suffix into a long, e.g. for
 * -limits values. A trailing 'G' multiplies by 1024*1024*1024; 'M' and 'K' are
 * also recognized (their multipliers are on lines not visible here). The digits
 * are copied into the local buf before atol(), since num->ptr need not be
 * NUL-terminated at num->size.
 * NOTE(review): the buf/ret declarations, the M/K multipliers and the
 * NUL-termination of buf are not visible. The fallback atol(num->ptr) at the
 * end would dereference NULL when num is NULL, and reads num->ptr unbounded
 * when the value is too long for buf; confirm callers never pass such input. */
240 static long parse_human_number(stringptr
* num
) {
243 if(num
&& num
->size
&& num
->size
< sizeof(buf
)) {
244 if(num
->ptr
[num
->size
-1] == 'G')
245 ret
= 1024 * 1024 * 1024;
246 else if(num
->ptr
[num
->size
-1] == 'M')
248 else if(num
->ptr
[num
->size
-1] == 'K')
251 memcpy(buf
, num
->ptr
, num
->size
);
253 return atol(buf
) * ret
;
255 return atol(num
->ptr
);
/* Usage text, presumably shown by parse_args() for -help or when no arguments
 * are given.
 * NOTE(review): the call that outputs these string literals, several
 * option-name lines and the return statement are not visible in this view. */
260 static int syntax(void) {
262 "jobflow (C) rofl0r\n"
263 "------------------\n"
264 "this program is intended to be used as a recipient of another program's output\n"
265 "it launches processes to which the current line can be passed as an argument\n"
266 "using {} for substitution (as in find -exec).\n"
268 "available options:\n\n"
269 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
270 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
271 "-exec ./mycommand {}\n"
274 " XXX=number of entries to skip\n"
276 " XXX=number of parallel processes to spawn\n"
278 " resume from last jobnumber stored in statefile\n"
281 " saves last launched jobnumber into a file\n"
283 " only write to statefile whenever all processes are busy,\n"
284 " and at program end\n"
285 "-delayedspinup=XXX\n"
286 " XXX=maximum amount of milliseconds\n"
287 " ...to wait when spinning up a fresh set of processes\n"
288 " a random value between 0 and the chosen amount is used to delay initial\n"
290 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
291 " activity on program startup\n"
293 " store the stdout and stderr of launched processes into a temporary file\n"
294 " which will be printed after a process has finished.\n"
295 " this prevents mixing up of output of different processes.\n"
297 " if -buffered, write both stdout and stderr into the same file.\n"
298 " this saves the chronological order of the output, and the combined output\n"
299 " will only be printed to stdout.\n"
300 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
301 " sets the rlimit of the newly created processes.\n"
302 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
303 "-exec command with args\n"
304 " everything past -exec is treated as the command to execute on each line of\n"
305 " stdin received. the line can be passed as an argument using {}."
/* Parse the command line into prog_state.
 * Options read via op_get()/op_hasflag(): -threads (default 1), -statefile,
 * -skip (default 0), -resume, -delayedflush, -delayedspinup (default 0),
 * -exec, -buffered, -joinoutput and -limits.
 * Invalid combinations (-resume or -delayedflush without -statefile,
 * -joinoutput without -buffered) and bad -limits entries end the program via
 * die(). With -resume and a readable and writable statefile, the skip count is
 * replaced by the number stored in that file.
 * -limits takes comma-separated key=value pairs (mem, cpu, stack, fsize,
 * nofiles); entries without exactly one '=' are ignored. Each value, parsed by
 * parse_human_number(), becomes the soft limit (rlim_cur); the other fields
 * come from the current getrlimit() values.
 * NOTE(review): several lines (local declarations, the -help branch, the scan
 * that computes r for -exec, closing braces, the return value) are not visible
 * in this view. */
310 static int parse_args(int argc
, char** argv
) {
311 op_state op_b
, *op
= &op_b
;
312 op_init(op
, argc
, argv
);
314 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
316 op_temp
= op_get(op
, SPL("threads"));
317 prog_state
.numthreads
= op_temp
? atoi(op_temp
) : 1;
318 op_temp
= op_get(op
, SPL("statefile"));
319 prog_state
.statefile
= op_temp
;
321 op_temp
= op_get(op
, SPL("skip"));
322 prog_state
.skip
= op_temp
? atoi(op_temp
) : 0;
323 if(op_hasflag(op
, SPL("resume"))) {
324 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
325 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
326 stringptr
* fc
= stringptr_fromfile(prog_state
.statefile
);
327 prog_state
.skip
= atoi(fc
->ptr
);
331 prog_state
.delayedflush
= 0;
332 if(op_hasflag(op
, SPL("delayedflush"))) {
333 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
334 prog_state
.delayedflush
= 1;
337 op_temp
= op_get(op
, SPL("delayedspinup"));
338 prog_state
.delayedspinup_interval
= op_temp
? atoi(op_temp
) : 0;
340 prog_state
.cmd_startarg
= 0;
341 prog_state
.subst_entries
= NULL
;
343 if(op_hasflag(op
, SPL("exec"))) {
346 for(i
= 1; i
< (unsigned) argc
; i
++) {
347 if(str_equal(argv
[i
], "-exec")) {
352 if(r
&& r
< (unsigned) argc
) {
353 prog_state
.cmd_startarg
= r
;
356 // save entries which must be substituted, to save some cycles.
357 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
358 for(i
= r
; i
< (unsigned) argc
; i
++) {
360 if(strstr(argv
[i
], "{}")) sblist_add(prog_state
.subst_entries
, &subst_ent
);
364 prog_state
.buffered
= 0;
365 if(op_hasflag(op
, SPL("buffered"))) {
366 prog_state
.buffered
= 1;
369 prog_state
.join_output
= 0;
370 if(op_hasflag(op
, SPL("joinoutput"))) {
371 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
372 prog_state
.join_output
= 1;
375 prog_state
.limits
= NULL
;
376 op_temp
= op_get(op
, SPL("limits"));
379 SPDECLAREC(limits
, op_temp
);
380 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
382 stringptr
* key
, *value
;
384 if(stringptrlist_getsize(limit_list
)) {
385 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
386 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
387 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
388 if(stringptrlist_getsize(kv
) != 2) continue;
389 key
= stringptrlist_get(kv
, 0);
390 value
= stringptrlist_get(kv
, 1);
391 if(EQ(key
, SPL("mem")))
392 lim
.limit
= RLIMIT_AS
;
393 else if(EQ(key
, SPL("cpu")))
394 lim
.limit
= RLIMIT_CPU
;
395 else if(EQ(key
, SPL("stack")))
396 lim
.limit
= RLIMIT_STACK
;
397 else if(EQ(key
, SPL("fsize")))
398 lim
.limit
= RLIMIT_FSIZE
;
399 else if(EQ(key
, SPL("nofiles")))
400 lim
.limit
= RLIMIT_NOFILE
;
/* die() prints its message verbatim, so the newline must be part of it */
402 die("unknown option passed to -limits\n");
404 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
406 die("could not query rlimits\n");
408 lim
.rl
.rlim_cur
= parse_human_number(value
);
409 sblist_add(prog_state
.limits
, &lim
);
410 stringptrlist_free(kv
);
412 stringptrlist_free(limit_list
);
/* Append prog_state.numthreads copies of the template job_info ji to
 * prog_state.job_infos; ji.fa is zeroed before the copies are made.
 * NOTE(review): the declarations of ji and i and ji's pid initialization are not
 * visible; launch_job() only launches into an entry whose pid is -1, so ji.pid
 * presumably starts as -1 — confirm. */
418 static void init_queue(void) {
423 memset(&ji
.fa
, 0, sizeof(ji
.fa
));
425 for(i
= 0; i
< prog_state
.numthreads
; i
++) {
426 sblist_add(prog_state
.job_infos
, &ji
);
/* Store n + 1 in the statefile; parse_args() reads this number back as the
 * skip count when -resume is given. The number is first written to tempfile
 * and then rename()d over prog_state.statefile. main() names tempfile
 * "<statefile>.<pid>", so both live in the same directory and the rename
 * replaces the old statefile in one step.
 * NOTE(review): numbuf's declaration and the handling of a failed rename() are
 * not visible in this view. */
430 static void write_statefile(uint64_t n
, const char* tempfile
) {
432 stringptr num_b
, *num
= &num_b
;
434 num_b
.ptr
= uint64ToString(n
+ 1, numbuf
);
435 num_b
.size
= strlen(numbuf
);
436 stringptr_tofile((char*) tempfile
, num
);
437 if(rename(tempfile
, prog_state
.statefile
) == -1)
/* Entry point. Parses the options, optionally creates a temp dir for -buffered
 * logs, then reads stdin line by line:
 * - without -exec, each line is echoed to stdout;
 * - with -exec, every command word containing "{}" gets the line substituted
 *   into it, and the command is launched via launch_job() into the slot on top
 *   of the free_slots stack, after waiting (SLEEP_MS naps) for a free slot.
 * The statefile is updated after launches (only when all slots are busy with
 * -delayedflush). At the end it waits for all jobs and releases resources.
 * Returns 1 when option parsing fails.
 * NOTE(review): numerous lines (declarations of i, j, n, max_subst, the reap
 * call inside the wait loop, closing braces, the final return) are not visible
 * in this view. */
441 int main(int argc
, char** argv
) {
442 char inbuf
[4096]; char* fgets_result
, *strstr_result
, *p
;
443 stringptr line_b
, *line
= &line_b
;
444 char* cmd_argv
[4096];
/* one 4096-byte buffer per substituted argument (max_subst stays below 16);
 * the second dimension must be the 4096-byte one, because up to 4096 bytes are
 * written into each subst_buf[max_subst] below */
445 char subst_buf
[16][4096];
448 struct timeval reapTime
;
452 unsigned spinup_counter
= 0;
454 char tempdir_buf
[256];
455 char temp_state
[256];
459 if(argc
> 4096) argc
= 4096;
460 prog_state
.threads_running
= 0;
461 prog_state
.free_slots_count
= 0;
462 gettimestamp(&reapTime
);
464 if(parse_args(argc
, argv
)) return 1;
466 if(prog_state
.statefile
)
467 ulz_snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
469 prog_state
.tempdir
= NULL
;
471 if(prog_state
.buffered
) {
472 prog_state
.tempdir
= tempdir_buf
;
473 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
475 die("could not create tempdir\n");
479 if(prog_state
.cmd_startarg
) {
480 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
481 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
483 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
486 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
489 while((fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
))) {
/* the input line is untrusted data: print it, never use it as a format string */
493 if(!prog_state
.cmd_startarg
)
494 printf("%s", fgets_result
);
496 stringptr_fromchar(fgets_result
, line
);
497 stringptr_chomp(line
);
500 if(prog_state
.subst_entries
) {
502 sblist_iter(prog_state
.subst_entries
, index
) {
503 p
= argv
[*index
+ prog_state
.cmd_startarg
];
504 if((strstr_result
= strstr(p
, "{}"))) {
506 j
= strstr_result
- p
;
507 if(j
) memcpy(subst_buf
[max_subst
], p
, j
);
508 strncpy(&subst_buf
[max_subst
][j
], line
->ptr
, 4096 - j
);
511 fprintf(stderr
, "fatal: line too long for substitution: %s\n", line
->ptr
);
514 strncpy(&subst_buf
[max_subst
][j
], strstr_result
+ 2, 4096 - j
);
516 cmd_argv
[*index
] = subst_buf
[max_subst
];
518 if(max_subst
>= 16) die("too many substitutions!\n");
523 while(prog_state
.free_slots_count
== 0 || mspassed(&reapTime
) > REAP_INTERVAL_MS
) {
525 gettimestamp(&reapTime
);
526 if(!prog_state
.free_slots_count
) msleep(SLEEP_MS
);
529 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
530 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
/* pop the top free slot and launch the job into it */
534 launch_job(prog_state
.free_slots
[prog_state
.free_slots_count
-1], cmd_argv
);
535 prog_state
.free_slots_count
--;
537 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || prog_state
.free_slots_count
== 0)) {
538 write_statefile(n
, temp_state
);
547 if(prog_state
.delayedflush
)
548 write_statefile(n
- 1, temp_state
);
550 while(prog_state
.threads_running
) {
552 if(prog_state
.threads_running
) msleep(SLEEP_MS
);
555 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
556 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
557 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
559 if(prog_state
.tempdir
)
560 rmdir(prog_state
.tempdir
);