2 #define _POSIX_C_SOURCE 200809L
4 #define _XOPEN_SOURCE 700
8 #include "../lib/include/optparser.h"
9 #include "../lib/include/stringptr.h"
10 #include "../lib/include/stringptrlist.h"
11 #include "../lib/include/sblist.h"
12 #include "../lib/include/strlib.h"
13 #include "../lib/include/timelib.h"
14 #include "../lib/include/filelib.h"
24 /* defines the amount of milliseconds to sleep between each call to the reaper,
25 * once all free slots are exhausted */
28 /* defines after how many milliseconds a reap of the running processes is obligatory. */
29 #define REAP_INTERVAL_MS 100
31 /* process handling */
38 #include <sys/resource.h>
40 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
41 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
42 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
43 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
45 static int prlimit(int pid
, ...) {
47 fprintf(stderr
, "prlimit() not implemented on this system\n");
58 posix_spawn_file_actions_t fa
;
66 /* defines how many slots our free_slots struct can take */
71 unsigned threads_running
;
75 sblist
* subst_entries
;
77 unsigned cmd_startarg
;
78 size_t free_slots
[MAX_SLOTS
];
79 unsigned free_slots_count
;
81 int delayedspinup_interval
; /* use a random delay until the queue gets filled for the first time.
82 the top value in ms can be supplied via a command line switch.
83 this option makes only sense if the interval is somewhat smaller than the
84 expected runtime of the average job.
85 this option is useful to not overload a network app due to hundreds of
86 parallel connection tries on startup.
88 int buffered
:1; /* write stdout and stderr of each task into a file,
89 and print it to stdout once the process ends.
90 this prevents mixing up of the output of multiple tasks. */
91 int delayedflush
:1; /* only write to statefile whenever all processes are busy, and at program end.
92 this means faster program execution, but could also be imprecise if the number of
93 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
94 int join_output
:1; /* join stdout and stderr of launched jobs into stdout */
97 prog_state_s prog_state
;
100 extern char** environ
;
102 int makeLogfilename(char* buf
, size_t bufsize
, size_t jobindex
, int is_stderr
) {
103 int ret
= snprintf(buf
, bufsize
,
104 is_stderr
? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
105 prog_state
.tempdir
, (unsigned) jobindex
);
106 return ret
> 0 && (size_t) ret
< bufsize
;
109 void launch_job(size_t jobindex
, char** argv
) {
110 char stdout_filename_buf
[256];
111 char stderr_filename_buf
[256];
112 job_info
* job
= sblist_get(prog_state
.job_infos
, jobindex
);
114 if(job
->pid
!= -1) return;
116 if(prog_state
.buffered
) {
117 if((!makeLogfilename(stdout_filename_buf
, sizeof(stdout_filename_buf
), jobindex
, 0)) ||
118 ((!prog_state
.join_output
) && !makeLogfilename(stderr_filename_buf
, sizeof(stderr_filename_buf
), jobindex
, 1)) ) {
119 fprintf(stderr
, "temp filename too long!\n");
124 errno
= posix_spawn_file_actions_init(&job
->fa
);
125 if(errno
) goto spawn_error
;
126 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 0);
127 if(errno
) goto spawn_error
;
129 if(prog_state
.buffered
) {
130 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 1);
131 if(errno
) goto spawn_error
;
132 errno
= posix_spawn_file_actions_addclose(&job
->fa
, 2);
133 if(errno
) goto spawn_error
;
136 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 0, "/dev/null", O_RDONLY
, 0);
137 if(errno
) goto spawn_error
;
139 if(prog_state
.buffered
) {
140 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 1, stdout_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
141 if(errno
) goto spawn_error
;
142 if(prog_state
.join_output
)
143 errno
= posix_spawn_file_actions_adddup2(&job
->fa
, 1, 2);
145 errno
= posix_spawn_file_actions_addopen(&job
->fa
, 2, stderr_filename_buf
, O_WRONLY
| O_CREAT
| O_TRUNC
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
);
146 if(errno
) goto spawn_error
;
149 errno
= posix_spawnp(&job
->pid
, argv
[0], &job
->fa
, NULL
, argv
, environ
);
153 perror("posix_spawn");
155 prog_state
.threads_running
++;
156 if(prog_state
.limits
) {
158 sblist_iter(prog_state
.limits
, limit
) {
159 if(prlimit(job
->pid
, limit
->limit
, &limit
->rl
, NULL
) == -1)
166 static void addJobSlot(size_t job_id
) {
167 if(prog_state
.free_slots_count
< MAX_SLOTS
) {
168 prog_state
.free_slots
[prog_state
.free_slots_count
] = job_id
;
169 prog_state
.free_slots_count
++;
173 static void dump_output(size_t job_id
, int is_stderr
) {
174 char out_filename_buf
[256];
176 FILE* dst
, *out_stream
= is_stderr
? stderr
: stdout
;
179 makeLogfilename(out_filename_buf
, sizeof(out_filename_buf
), job_id
, is_stderr
);
181 dst
= fopen(out_filename_buf
, "r");
182 while(dst
&& (nread
= fread(buf
, 1, sizeof(buf
), dst
))) {
183 fwrite(buf
, 1, nread
, out_stream
);
184 if(nread
< sizeof(buf
)) break;
192 /* reap childs and return pointer to a free "slot" or NULL */
193 static void reapChilds(void) {
198 prog_state
.free_slots_count
= 0;
200 for(i
= 0; i
< sblist_getsize(prog_state
.job_infos
); i
++) {
201 job
= sblist_get(prog_state
.job_infos
, i
);
203 ret
= waitpid(job
->pid
, &retval
, WNOHANG
);
205 // error or changed state.
211 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
214 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
217 posix_spawn_file_actions_destroy(&job
->fa
);
220 prog_state
.threads_running
--;
222 if(prog_state
.buffered
) {
224 if(!prog_state
.join_output
)
235 __attribute__((noreturn
))
236 static void die(const char* msg
) {
237 fprintf(stderr
, msg
);
241 static long parse_human_number(stringptr
* num
) {
244 if(num
&& num
->size
&& num
->size
< sizeof(buf
)) {
245 if(num
->ptr
[num
->size
-1] == 'G')
246 ret
= 1024 * 1024 * 1024;
247 else if(num
->ptr
[num
->size
-1] == 'M')
249 else if(num
->ptr
[num
->size
-1] == 'K')
252 memcpy(buf
, num
->ptr
, num
->size
);
254 return atol(buf
) * ret
;
256 return atol(num
->ptr
);
261 static int syntax(void) {
263 "jobflow (C) rofl0r\n"
264 "------------------\n"
265 "this program is intended to be used as a recipient of another programs output\n"
266 "it launches processes to which the current line can be passed as an argument\n"
267 "using {} for substitution (as in find -exec).\n"
269 "available options:\n\n"
270 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
271 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
272 "-exec ./mycommand {}\n"
275 " XXX=number of entries to skip\n"
277 " XXX=number of parallel processes to spawn]\n"
279 " resume from last jobnumber stored in statefile\n"
282 " saves last launched jobnumber into a file\n"
284 " only write to statefile whenever all processes are busy,\n"
285 " and at program end\n"
286 "-delayedspinup=XXX\n"
287 " XXX=maximum amount of milliseconds\n"
288 " ...to wait when spinning up a fresh set of processes\n"
289 " a random value between 0 and the chosen amount is used to delay initial\n"
291 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
292 " activity on program startup\n"
294 " store the stdout and stderr of launched processes into a temporary file\n"
295 " which will be printed after a process has finished.\n"
296 " this prevents mixing up of output of different processes.\n"
298 " if -buffered, write both stdout and stderr into the same file.\n"
299 " this saves the chronological order of the output, and the combined output\n"
300 " will only be printed to stdout.\n"
301 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
302 " sets the rlimit of the new created processes.\n"
303 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
304 "-exec command with args\n"
305 " everything past -exec is treated as the command to execute on each line of\n"
306 " stdin received. the line can be passed as an argument using {}."
311 static int parse_args(int argc
, char** argv
) {
312 op_state op_b
, *op
= &op_b
;
313 op_init(op
, argc
, argv
);
315 if(argc
== 1 || op_hasflag(op
, SPL("-help")))
317 op_temp
= op_get(op
, SPL("threads"));
318 prog_state
.numthreads
= op_temp
? atoi(op_temp
) : 1;
319 op_temp
= op_get(op
, SPL("statefile"));
320 prog_state
.statefile
= op_temp
;
322 op_temp
= op_get(op
, SPL("skip"));
323 prog_state
.skip
= op_temp
? atoi(op_temp
) : 0;
324 if(op_hasflag(op
, SPL("resume"))) {
325 if(!prog_state
.statefile
) die("-resume needs -statefile\n");
326 if(access(prog_state
.statefile
, W_OK
| R_OK
) != -1) {
327 stringptr
* fc
= stringptr_fromfile(prog_state
.statefile
);
328 prog_state
.skip
= atoi(fc
->ptr
);
332 prog_state
.delayedflush
= 0;
333 if(op_hasflag(op
, SPL("delayedflush"))) {
334 if(!prog_state
.statefile
) die("-delayedflush needs -statefile\n");
335 prog_state
.delayedflush
= 1;
338 op_temp
= op_get(op
, SPL("delayedspinup"));
339 prog_state
.delayedspinup_interval
= op_temp
? atoi(op_temp
) : 0;
341 prog_state
.cmd_startarg
= 0;
342 prog_state
.subst_entries
= NULL
;
344 if(op_hasflag(op
, SPL("exec"))) {
347 for(i
= 1; i
< (unsigned) argc
; i
++) {
348 if(str_equal(argv
[i
], "-exec")) {
353 if(r
&& r
< (unsigned) argc
) {
354 prog_state
.cmd_startarg
= r
;
357 // save entries which must be substituted, to save some cycles.
358 prog_state
.subst_entries
= sblist_new(sizeof(uint32_t), 16);
359 for(i
= r
; i
< (unsigned) argc
; i
++) {
361 if(strstr(argv
[i
], "{}")) sblist_add(prog_state
.subst_entries
, &subst_ent
);
365 prog_state
.buffered
= 0;
366 if(op_hasflag(op
, SPL("buffered"))) {
367 prog_state
.buffered
= 1;
370 prog_state
.join_output
= 0;
371 if(op_hasflag(op
, SPL("joinoutput"))) {
372 if(!prog_state
.buffered
) die("-joinoutput needs -buffered\n");
373 prog_state
.join_output
= 1;
376 prog_state
.limits
= NULL
;
377 op_temp
= op_get(op
, SPL("limits"));
380 SPDECLAREC(limits
, op_temp
);
381 stringptrlist
* limit_list
= stringptr_splitc(limits
, ',');
383 stringptr
* key
, *value
;
385 if(stringptrlist_getsize(limit_list
)) {
386 prog_state
.limits
= sblist_new(sizeof(limit_rec
), stringptrlist_getsize(limit_list
));
387 for(i
= 0; i
< stringptrlist_getsize(limit_list
); i
++) {
388 kv
= stringptr_splitc(stringptrlist_get(limit_list
, i
), '=');
389 if(stringptrlist_getsize(kv
) != 2) continue;
390 key
= stringptrlist_get(kv
, 0);
391 value
= stringptrlist_get(kv
, 1);
392 if(EQ(key
, SPL("mem")))
393 lim
.limit
= RLIMIT_AS
;
394 else if(EQ(key
, SPL("cpu")))
395 lim
.limit
= RLIMIT_CPU
;
396 else if(EQ(key
, SPL("stack")))
397 lim
.limit
= RLIMIT_STACK
;
398 else if(EQ(key
, SPL("fsize")))
399 lim
.limit
= RLIMIT_FSIZE
;
400 else if(EQ(key
, SPL("nofiles")))
401 lim
.limit
= RLIMIT_NOFILE
;
403 die("unknown option passed to -limits");
405 if(getrlimit(lim
.limit
, &lim
.rl
) == -1) {
407 die("could not query rlimits");
409 lim
.rl
.rlim_cur
= parse_human_number(value
);
410 sblist_add(prog_state
.limits
, &lim
);
411 stringptrlist_free(kv
);
413 stringptrlist_free(limit_list
);
419 static void init_queue(void) {
424 memset(&ji
.fa
, 0, sizeof(ji
.fa
));
426 for(i
= 0; i
< prog_state
.numthreads
; i
++) {
427 sblist_add(prog_state
.job_infos
, &ji
);
431 static void write_statefile(uint64_t n
, const char* tempfile
) {
433 stringptr num_b
, *num
= &num_b
;
435 num_b
.ptr
= uint64ToString(n
+ 1, numbuf
);
436 num_b
.size
= strlen(numbuf
);
437 stringptr_tofile((char*) tempfile
, num
);
438 if(rename(tempfile
, prog_state
.statefile
) == -1)
442 int main(int argc
, char** argv
) {
443 char inbuf
[4096]; char* fgets_result
, *strstr_result
, *p
;
444 stringptr line_b
, *line
= &line_b
;
445 char* cmd_argv
[4096];
446 char subst_buf
[4096][16];
449 struct timeval reapTime
;
453 unsigned spinup_counter
= 0;
455 char tempdir_buf
[256];
456 char temp_state
[256];
460 if(argc
> 4096) argc
= 4096;
461 prog_state
.threads_running
= 0;
462 prog_state
.free_slots_count
= 0;
463 gettimestamp(&reapTime
);
465 if(parse_args(argc
, argv
)) return 1;
467 if(prog_state
.statefile
)
468 ulz_snprintf(temp_state
, sizeof(temp_state
), "%s.%u", prog_state
.statefile
, (unsigned) getpid());
470 prog_state
.tempdir
= NULL
;
472 if(prog_state
.buffered
) {
473 prog_state
.tempdir
= tempdir_buf
;
474 if(mktempdir("jobflow", tempdir_buf
, sizeof(tempdir_buf
)) == 0) {
476 die("could not create tempdir\n");
480 if(prog_state
.cmd_startarg
) {
481 for(i
= prog_state
.cmd_startarg
; i
< (unsigned) argc
; i
++) {
482 cmd_argv
[i
- prog_state
.cmd_startarg
] = argv
[i
];
484 cmd_argv
[argc
- prog_state
.cmd_startarg
] = NULL
;
487 prog_state
.job_infos
= sblist_new(sizeof(job_info
), prog_state
.numthreads
);
490 while((fgets_result
= fgets(inbuf
, sizeof(inbuf
), stdin
))) {
494 if(!prog_state
.cmd_startarg
)
495 printf(fgets_result
);
497 stringptr_fromchar(fgets_result
, line
);
498 stringptr_chomp(line
);
501 if(prog_state
.subst_entries
) {
503 sblist_iter(prog_state
.subst_entries
, index
) {
504 p
= argv
[*index
+ prog_state
.cmd_startarg
];
505 if((strstr_result
= strstr(p
, "{}"))) {
507 j
= strstr_result
- p
;
508 if(j
) memcpy(subst_buf
[max_subst
], p
, j
);
509 strncpy(&subst_buf
[max_subst
][j
], line
->ptr
, 4096 - j
);
512 fprintf(stderr
, "fatal: line too long for substitution: %s\n", line
->ptr
);
515 strncpy(&subst_buf
[max_subst
][j
], strstr_result
+ 2, 4096 - j
);
517 cmd_argv
[*index
] = subst_buf
[max_subst
];
519 if(max_subst
>= 16) die("too many substitutions!\n");
524 while(prog_state
.free_slots_count
== 0 || mspassed(&reapTime
) > REAP_INTERVAL_MS
) {
526 gettimestamp(&reapTime
);
527 if(!prog_state
.free_slots_count
) msleep(SLEEP_MS
);
530 if(prog_state
.delayedspinup_interval
&& spinup_counter
< (prog_state
.numthreads
* 2)) {
531 msleep(rand() % (prog_state
.delayedspinup_interval
+ 1));
535 launch_job(prog_state
.free_slots
[prog_state
.free_slots_count
-1], cmd_argv
);
536 prog_state
.free_slots_count
--;
538 if(prog_state
.statefile
&& (prog_state
.delayedflush
== 0 || prog_state
.free_slots_count
== 0)) {
539 write_statefile(n
, temp_state
);
548 if(prog_state
.delayedflush
)
549 write_statefile(n
- 1, temp_state
);
551 while(prog_state
.threads_running
) {
553 if(prog_state
.threads_running
) msleep(SLEEP_MS
);
556 if(prog_state
.subst_entries
) sblist_free(prog_state
.subst_entries
);
557 if(prog_state
.job_infos
) sblist_free(prog_state
.job_infos
);
558 if(prog_state
.limits
) sblist_free(prog_state
.limits
);
560 if(prog_state
.tempdir
)
561 rmdir(prog_state
.tempdir
);