fix missing output in unbuffered mode when redirected to a file
[rofl0r-jobflow.git] / jobflow.c
blobb8cf31ffa66e849d9e7aef24fa1dade2946153e1
1 /*
2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #undef _POSIX_C_SOURCE
19 #define _POSIX_C_SOURCE 200809L
20 #undef _XOPEN_SOURCE
21 #define _XOPEN_SOURCE 700
22 #undef _GNU_SOURCE
23 #define _GNU_SOURCE
25 #include "../lib/include/optparser.h"
26 #include "../lib/include/stringptr.h"
27 #include "../lib/include/stringptrlist.h"
28 #include "../lib/include/sblist.h"
29 #include "../lib/include/strlib.h"
30 #include "../lib/include/timelib.h"
31 #include "../lib/include/filelib.h"
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <stdint.h>
37 #include <stddef.h>
38 #include <errno.h>
39 #include <time.h>
41 /* defines the amount of milliseconds to sleep between each call to the reaper,
42 * once all free slots are exhausted */
43 #define SLEEP_MS 21
45 /* defines after how many milliseconds a reap of the running processes is obligatory. */
46 #define REAP_INTERVAL_MS 100
48 /* process handling */
50 #include <fcntl.h>
51 #include <spawn.h>
52 #include <sys/wait.h>
53 #include <sys/stat.h>
55 #include <sys/resource.h>
57 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
58 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
59 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
60 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Stand-in for C libraries without prlimit(): complains on stderr and
 * fails with errno set to EINVAL. All arguments are ignored. */
static int prlimit(int pid, ...) {
	(void) pid;
	fputs("prlimit() not implemented on this system\n", stderr);
	errno = EINVAL;
	return -1;
}
67 #endif
70 #include <sys/time.h>
/* Bookkeeping for a single job slot. */
typedef struct job_info {
	/* pid of the child occupying the slot; -1 marks the slot as idle */
	pid_t pid;
	/* file actions handed to posix_spawnp() for that child */
	posix_spawn_file_actions_t fa;
} job_info;
/* One resource limit that is applied to every spawned process. */
typedef struct limit_rec {
	/* RLIMIT_* resource identifier */
	int limit;
	/* limit values passed to prlimit() */
	struct rlimit rl;
} limit_rec;
82 /* defines how many slots our free_slots struct can take */
83 #define MAX_SLOTS 128
85 typedef struct {
86 unsigned numthreads;
87 unsigned threads_running;
88 char* statefile;
89 unsigned skip;
90 sblist* job_infos;
91 sblist* subst_entries;
92 sblist* limits;
93 unsigned cmd_startarg;
94 size_t free_slots[MAX_SLOTS];
95 unsigned free_slots_count;
96 char* tempdir;
97 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
98 the top value in ms can be supplied via a command line switch.
99 this option makes only sense if the interval is somewhat smaller than the
100 expected runtime of the average job.
101 this option is useful to not overload a network app due to hundreds of
102 parallel connection tries on startup.
104 int buffered:1; /* write stdout and stderr of each task into a file,
105 and print it to stdout once the process ends.
106 this prevents mixing up of the output of multiple tasks. */
107 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
108 this means faster program execution, but could also be imprecise if the number of
109 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
110 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
111 } prog_state_s;
113 prog_state_s prog_state;
116 extern char** environ;
118 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
119 int ret = snprintf(buf, bufsize,
120 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
121 prog_state.tempdir, (unsigned) jobindex);
122 return ret > 0 && (size_t) ret < bufsize;
125 void launch_job(size_t jobindex, char** argv) {
126 char stdout_filename_buf[256];
127 char stderr_filename_buf[256];
128 job_info* job = sblist_get(prog_state.job_infos, jobindex);
130 if(job->pid != -1) return;
132 if(prog_state.buffered) {
133 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
134 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
135 fprintf(stderr, "temp filename too long!\n");
136 return;
140 errno = posix_spawn_file_actions_init(&job->fa);
141 if(errno) goto spawn_error;
142 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
143 if(errno) goto spawn_error;
145 if(prog_state.buffered) {
146 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
147 if(errno) goto spawn_error;
148 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
149 if(errno) goto spawn_error;
152 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
153 if(errno) goto spawn_error;
155 if(prog_state.buffered) {
156 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
157 if(errno) goto spawn_error;
158 if(prog_state.join_output)
159 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
160 else
161 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
162 if(errno) goto spawn_error;
165 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
166 if(errno) {
167 spawn_error:
168 job->pid = -1;
169 perror("posix_spawn");
170 } else {
171 prog_state.threads_running++;
172 if(prog_state.limits) {
173 limit_rec* limit;
174 sblist_iter(prog_state.limits, limit) {
175 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
176 perror("prlimit");
182 static void releaseJobSlot(size_t job_id) {
183 if(prog_state.free_slots_count < MAX_SLOTS) {
184 prog_state.free_slots[prog_state.free_slots_count] = job_id;
185 prog_state.free_slots_count++;
/* Copies the buffered output file of job slot job_id to our own stdout
 * (or to stderr if is_stderr is set), flushes that stream and removes the
 * file afterwards.
 *
 * Removing the file keeps the temp directory empty, so that the rmdir()
 * at program end can succeed; the next job launched in this slot re-creates
 * the file. Nothing happens if the file name does not fit into the buffer
 * or the file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(!src) return;

	while((nread = fread(buf, 1, sizeof(buf), src))) {
		fwrite(buf, 1, nread, out_stream);
		/* a short read means EOF (or an error): stop copying */
		if(nread < sizeof(buf)) break;
	}
	fclose(src);
	fflush(out_stream);
	unlink(out_filename_buf);
}
208 static void reapChilds(void) {
209 size_t i;
210 job_info* job;
211 int ret, retval;
213 prog_state.free_slots_count = 0;
215 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
216 job = sblist_get(prog_state.job_infos, i);
217 if(job->pid != -1) {
218 ret = waitpid(job->pid, &retval, WNOHANG);
219 if(ret != 0) {
220 // error or changed state.
221 if(ret == -1) {
222 perror("waitpid");
223 continue;
225 if(!retval) {
226 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
227 } else {
228 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
230 job->pid = -1;
231 posix_spawn_file_actions_destroy(&job->fa);
232 //job->passed = 0;
233 releaseJobSlot(i);
234 prog_state.threads_running--;
236 if(prog_state.buffered) {
237 dump_output(i, 0);
238 if(!prog_state.join_output)
239 dump_output(i, 1);
242 } else
243 releaseJobSlot(i);
/* Prints msg verbatim to stderr and terminates the program with exit code 1.
 * msg is written with fputs() so that '%' characters in it are never
 * interpreted as format directives. */
__attribute__((noreturn))
static void die(const char* msg) {
	fputs(msg, stderr);
	exit(1);
}
254 static unsigned long parse_human_number(stringptr* num) {
255 unsigned long ret = 0;
256 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
257 const char* kmg = "KMG";
258 char* kmgind;
259 if(num && num->size) {
260 ret = atol(num->ptr);
261 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
262 ret *= mul[kmgind - kmg];
264 return ret;
/* Prints the usage text to stdout. Always returns 1, which parse_args()
 * hands on to its caller. */
static int syntax(void) {
	static const char usage[] =
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another programs output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn]\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of \n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of the new created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}.\n"
		" {.} passes everything before the last dot in a line as an argument.\n"
		" it is possible to use multiple substitutions inside a single argument,\n"
		" but currently only of one type.\n";

	puts(usage);
	return 1;
}
320 static int parse_args(int argc, char** argv) {
321 op_state op_b, *op = &op_b;
322 op_init(op, argc, argv);
323 char *op_temp;
324 if(argc == 1 || op_hasflag(op, SPL("-help")))
325 return syntax();
326 op_temp = op_get(op, SPL("threads"));
327 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
328 op_temp = op_get(op, SPL("statefile"));
329 prog_state.statefile = op_temp;
331 op_temp = op_get(op, SPL("skip"));
332 prog_state.skip = op_temp ? atoi(op_temp) : 0;
333 if(op_hasflag(op, SPL("resume"))) {
334 if(!prog_state.statefile) die("-resume needs -statefile\n");
335 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
336 stringptr* fc = stringptr_fromfile(prog_state.statefile);
337 prog_state.skip = atoi(fc->ptr);
341 prog_state.delayedflush = 0;
342 if(op_hasflag(op, SPL("delayedflush"))) {
343 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
344 prog_state.delayedflush = 1;
347 op_temp = op_get(op, SPL("delayedspinup"));
348 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
350 prog_state.cmd_startarg = 0;
351 prog_state.subst_entries = NULL;
353 if(op_hasflag(op, SPL("exec"))) {
354 uint32_t subst_ent;
355 unsigned i, r = 0;
356 for(i = 1; i < (unsigned) argc; i++) {
357 if(str_equal(argv[i], "-exec")) {
358 r = i + 1;
359 break;
362 if(r && r < (unsigned) argc) {
363 prog_state.cmd_startarg = r;
366 // save entries which must be substituted, to save some cycles.
367 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
368 for(i = r; i < (unsigned) argc; i++) {
369 subst_ent = i - r;
370 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
374 prog_state.buffered = 0;
375 if(op_hasflag(op, SPL("buffered"))) {
376 prog_state.buffered = 1;
379 prog_state.join_output = 0;
380 if(op_hasflag(op, SPL("joinoutput"))) {
381 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
382 prog_state.join_output = 1;
385 prog_state.limits = NULL;
386 op_temp = op_get(op, SPL("limits"));
387 if(op_temp) {
388 unsigned i;
389 SPDECLAREC(limits, op_temp);
390 stringptrlist* limit_list = stringptr_splitc(limits, ',');
391 stringptrlist* kv;
392 stringptr* key, *value;
393 limit_rec lim;
394 if(stringptrlist_getsize(limit_list)) {
395 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
396 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
397 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
398 if(stringptrlist_getsize(kv) != 2) continue;
399 key = stringptrlist_get(kv, 0);
400 value = stringptrlist_get(kv, 1);
401 if(EQ(key, SPL("mem")))
402 lim.limit = RLIMIT_AS;
403 else if(EQ(key, SPL("cpu")))
404 lim.limit = RLIMIT_CPU;
405 else if(EQ(key, SPL("stack")))
406 lim.limit = RLIMIT_STACK;
407 else if(EQ(key, SPL("fsize")))
408 lim.limit = RLIMIT_FSIZE;
409 else if(EQ(key, SPL("nofiles")))
410 lim.limit = RLIMIT_NOFILE;
411 else
412 die("unknown option passed to -limits");
414 if(getrlimit(lim.limit, &lim.rl) == -1) {
415 perror("getrlimit");
416 die("could not query rlimits");
418 lim.rl.rlim_cur = parse_human_number(value);
419 sblist_add(prog_state.limits, &lim);
420 stringptrlist_free(kv);
422 stringptrlist_free(limit_list);
425 return 0;
428 static void init_queue(void) {
429 unsigned i;
430 job_info ji;
432 ji.pid = -1;
433 memset(&ji.fa, 0, sizeof(ji.fa));
435 for(i = 0; i < prog_state.numthreads; i++) {
436 sblist_add(prog_state.job_infos, &ji);
440 static void write_statefile(uint64_t n, const char* tempfile) {
441 char numbuf[64];
442 stringptr num_b, *num = &num_b;
444 num_b.ptr = uint64ToString(n + 1, numbuf);
445 num_b.size = strlen(numbuf);
446 stringptr_tofile((char*) tempfile, num);
447 if(rename(tempfile, prog_state.statefile) == -1)
448 perror("rename");
451 // returns numbers of substitutions done, -1 on out of buffer.
452 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
453 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
454 size_t i;
455 int ret = 0;
456 for(i = 0; dest_size > 0 && i < source->size; ) {
457 if(stringptr_here(source, i, what)) {
458 if(dest_size < (ssize_t) whit->size) return -1;
459 memcpy(dest, whit->ptr, whit->size);
460 dest += whit->size;
461 dest_size -= whit->size;
462 ret++;
463 i += what->size;
464 } else {
465 *dest = source->ptr[i];
466 dest++;
467 dest_size--;
468 i++;
471 if(!dest_size) return -1;
472 *dest = 0;
473 return ret;
476 int main(int argc, char** argv) {
477 char inbuf[4096]; char* fgets_result;
478 stringptr line_b, *line = &line_b;
479 char* cmd_argv[4096];
480 char subst_buf[16][4096];
481 unsigned max_subst;
483 struct timeval reapTime;
485 uint64_t n = 0;
486 unsigned i;
487 unsigned spinup_counter = 0;
489 char tempdir_buf[256];
490 char temp_state[256];
492 srand(time(NULL));
494 if(argc > 4096) argc = 4096;
495 prog_state.threads_running = 0;
496 prog_state.free_slots_count = 0;
497 gettimestamp(&reapTime);
499 if(parse_args(argc, argv)) return 1;
501 if(prog_state.statefile)
502 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
504 prog_state.tempdir = NULL;
506 if(prog_state.buffered) {
507 prog_state.tempdir = tempdir_buf;
508 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
509 perror("mkdtemp");
510 die("could not create tempdir\n");
512 } else {
513 /* if the stdout/stderr fds are not in O_APPEND mode,
514 the dup()'s of the fds in posix_spawn can cause different
515 file positions, causing the different processes to overwrite each others output.
516 testcase:
517 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
519 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
520 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
523 if(prog_state.cmd_startarg) {
524 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
525 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
527 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
530 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
531 init_queue();
533 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
534 if(prog_state.skip)
535 prog_state.skip--;
536 else {
537 if(!prog_state.cmd_startarg)
538 printf(fgets_result);
539 else {
540 stringptr_fromchar(fgets_result, line);
541 stringptr_chomp(line);
543 max_subst = 0;
544 if(prog_state.subst_entries) {
545 uint32_t* index;
546 sblist_iter(prog_state.subst_entries, index) {
547 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
548 int ret;
549 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
550 if(ret == -1) {
551 too_long:
552 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
553 goto out;
554 } else if(!ret) {
555 char* lastdot = stringptr_rchr(line, '.');
556 stringptr tilLastDot = *line;
557 if(lastdot) tilLastDot.size = lastdot - line->ptr;
558 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
559 if(ret == -1) goto too_long;
561 if(ret) {
562 cmd_argv[*index] = subst_buf[max_subst];
563 max_subst++;
568 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
569 reapChilds();
570 gettimestamp(&reapTime);
571 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
574 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
575 msleep(rand() % (prog_state.delayedspinup_interval + 1));
576 spinup_counter++;
579 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
580 prog_state.free_slots_count--;
582 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
583 write_statefile(n, temp_state);
587 n++;
590 out:
592 if(prog_state.delayedflush)
593 write_statefile(n - 1, temp_state);
595 while(prog_state.threads_running) {
596 reapChilds();
597 if(prog_state.threads_running) msleep(SLEEP_MS);
600 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
601 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
602 if(prog_state.limits) sblist_free(prog_state.limits);
604 if(prog_state.tempdir)
605 rmdir(prog_state.tempdir);
608 fflush(stdout);
609 fflush(stderr);
612 return 0;