some cleanups and add FIXME comment for issue #1
[rofl0r-jobflow.git] / jobflow.c
blobd01e49a47d181476b98c0f866f6a86092a767e45
1 /*
2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
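/* FIXME
when using more than 1 process without the -buffered option, the output of some processes gets lost.
this happens regardless of dynamic/static linking, the libc used, and whether the target program fflushes before exit.

when the output is piped into cat > file instead, everything arrives.
linux bug?
NOTE(review): possibly the kernel f_pos race that was fixed in Linux 3.14: with "> file" all children
share one open file description without O_APPEND, and older kernels did not update the shared file
offset atomically, so concurrent write()s can overwrite each other. writes of up to PIPE_BUF bytes to a
pipe are atomic, which would explain why "| cat > file" works. redirecting with ">> file" (O_APPEND)
should avoid the problem as well - unverified, please confirm.

test:
fail:
seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
should print 100, but does not always

success:
seq 100 | ./jobflow.out -threads=100 -exec echo {} | cat > test.tmp ; wc -l test.tmp
always prints 100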
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700
#undef _GNU_SOURCE
#define _GNU_SOURCE

#include "../lib/include/optparser.h"
#include "../lib/include/stringptr.h"
#include "../lib/include/stringptrlist.h"
#include "../lib/include/sblist.h"
#include "../lib/include/strlib.h"
#include "../lib/include/timelib.h"
#include "../lib/include/filelib.h"

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <stddef.h>
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <string.h>
/* Milliseconds to sleep between two reaper passes while no job slot is free
 * (and while waiting for the remaining jobs before exit). */
#define SLEEP_MS (21)

/* Maximum number of milliseconds between two reaper passes; once exceeded,
 * the running processes are reaped even if free slots are still available. */
#define REAP_INTERVAL_MS (100)
65 /* process handling */
67 #include <fcntl.h>
68 #include <spawn.h>
69 #include <sys/wait.h>
70 #include <sys/stat.h>
72 #include <sys/resource.h>
#if defined(__GLIBC__) && !((__GLIBC__ >= 3) || (__GLIBC_MINOR__ >= 13))
/* prlimit() is missing from this libc (see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 ).
 * This stand-in prints a notice and always fails with EINVAL, so -limits
 * becomes a no-op that reports an error per job. */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
static int prlimit(int pid, ...) {
	fprintf(stderr, "prlimit() not implemented on this system\n");
	(void) pid;
	errno = EINVAL;
	return -1;
}
#endif
87 #include <sys/time.h>
/* One job slot: the pid of the child currently occupying it (-1 while the
 * slot is idle) and the spawn file actions that were used to start it. */
typedef struct job_info {
	pid_t pid;
	posix_spawn_file_actions_t fa;
} job_info;

/* One -limits entry: the resource id passed to getrlimit()/prlimit()
 * and the rlimit values to apply for it. */
typedef struct limit_rec {
	int limit;
	struct rlimit rl;
} limit_rec;
99 /* defines how many slots our free_slots struct can take */
100 #define MAX_SLOTS 128
102 typedef struct {
103 unsigned numthreads;
104 unsigned threads_running;
105 char* statefile;
106 unsigned skip;
107 sblist* job_infos;
108 sblist* subst_entries;
109 sblist* limits;
110 unsigned cmd_startarg;
111 size_t free_slots[MAX_SLOTS];
112 unsigned free_slots_count;
113 char* tempdir;
114 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
115 the top value in ms can be supplied via a command line switch.
116 this option makes only sense if the interval is somewhat smaller than the
117 expected runtime of the average job.
118 this option is useful to not overload a network app due to hundreds of
119 parallel connection tries on startup.
121 int buffered:1; /* write stdout and stderr of each task into a file,
122 and print it to stdout once the process ends.
123 this prevents mixing up of the output of multiple tasks. */
124 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
125 this means faster program execution, but could also be imprecise if the number of
126 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
127 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
128 } prog_state_s;
130 prog_state_s prog_state;
133 extern char** environ;
135 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
136 int ret = snprintf(buf, bufsize,
137 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
138 prog_state.tempdir, (unsigned) jobindex);
139 return ret > 0 && (size_t) ret < bufsize;
142 void launch_job(size_t jobindex, char** argv) {
143 char stdout_filename_buf[256];
144 char stderr_filename_buf[256];
145 job_info* job = sblist_get(prog_state.job_infos, jobindex);
147 if(job->pid != -1) return;
149 if(prog_state.buffered) {
150 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
151 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
152 fprintf(stderr, "temp filename too long!\n");
153 return;
157 errno = posix_spawn_file_actions_init(&job->fa);
158 if(errno) goto spawn_error;
159 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
160 if(errno) goto spawn_error;
162 if(prog_state.buffered) {
163 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
164 if(errno) goto spawn_error;
165 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
166 if(errno) goto spawn_error;
169 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
170 if(errno) goto spawn_error;
172 if(prog_state.buffered) {
173 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
175 if(prog_state.join_output)
176 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
177 else
178 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
179 if(errno) goto spawn_error;
182 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
183 if(errno) {
184 spawn_error:
185 job->pid = -1;
186 perror("posix_spawn");
187 } else {
188 prog_state.threads_running++;
189 if(prog_state.limits) {
190 limit_rec* limit;
191 sblist_iter(prog_state.limits, limit) {
192 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
193 perror("prlimit");
199 static void releaseJobSlot(size_t job_id) {
200 if(prog_state.free_slots_count < MAX_SLOTS) {
201 prog_state.free_slots[prog_state.free_slots_count] = job_id;
202 prog_state.free_slots_count++;
/* Copy the -buffered log file of job slot job_id to our stdout
 * (is_stderr == 0) or stderr (is_stderr != 0), flush that stream, and
 * delete the log file. Does nothing if the file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(!src) return;

	while((nread = fread(buf, 1, sizeof(buf), src))) {
		fwrite(buf, 1, nread, out_stream);
		if(nread < sizeof(buf)) break; /* short read: EOF or read error */
	}
	fclose(src);
	fflush(out_stream);

	/* remove the log once printed: otherwise the files accumulate and the
	 * final rmdir() of the temp directory fails because it is not empty.
	 * the next job in this slot recreates the file (O_CREAT | O_TRUNC). */
	unlink(out_filename_buf);
}
225 static void reapChilds(void) {
226 size_t i;
227 job_info* job;
228 int ret, retval;
230 prog_state.free_slots_count = 0;
232 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
233 job = sblist_get(prog_state.job_infos, i);
234 if(job->pid != -1) {
235 ret = waitpid(job->pid, &retval, WNOHANG);
236 if(ret != 0) {
237 // error or changed state.
238 if(ret == -1) {
239 perror("waitpid");
240 continue;
242 if(!retval) {
243 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
244 } else {
245 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
247 job->pid = -1;
248 posix_spawn_file_actions_destroy(&job->fa);
249 //job->passed = 0;
250 releaseJobSlot(i);
251 prog_state.threads_running--;
253 if(prog_state.buffered) {
254 dump_output(i, 0);
255 if(!prog_state.join_output)
256 dump_output(i, 1);
259 } else
260 releaseJobSlot(i);
/* Print msg verbatim to stderr and terminate with exit status 1. */
__attribute__((noreturn))
static void die(const char* msg) {
	/* fputs rather than fprintf(stderr, msg): msg is plain text, not a
	 * format string, so a '%' in it must not be interpreted. */
	fputs(msg, stderr);
	exit(1);
}
271 static long parse_human_number(stringptr* num) {
272 long ret = 0;
273 char buf[64];
274 if(num && num->size && num->size < sizeof(buf)) {
275 if(num->ptr[num->size -1] == 'G')
276 ret = 1024 * 1024 * 1024;
277 else if(num->ptr[num->size -1] == 'M')
278 ret = 1024 * 1024;
279 else if(num->ptr[num->size -1] == 'K')
280 ret = 1024;
281 if(ret) {
282 memcpy(buf, num->ptr, num->size);
283 buf[num->size] = 0;
284 return atol(buf) * ret;
286 return atol(num->ptr);
288 return ret;
/* Print the usage text to stdout. Always returns 1, so callers can
 * `return syntax();` to signal that the program should exit. */
static int syntax(void) {
	puts(
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another program's output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of \n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of newly created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}.\n"
		" {.} passes everything before the last dot in a line as an argument.\n"
		" it is possible to use multiple substitutions inside a single argument,\n"
		" but currently only of one type.\n"
	);
	return 1;
}
344 static int parse_args(int argc, char** argv) {
345 op_state op_b, *op = &op_b;
346 op_init(op, argc, argv);
347 char *op_temp;
348 if(argc == 1 || op_hasflag(op, SPL("-help")))
349 return syntax();
350 op_temp = op_get(op, SPL("threads"));
351 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
352 op_temp = op_get(op, SPL("statefile"));
353 prog_state.statefile = op_temp;
355 op_temp = op_get(op, SPL("skip"));
356 prog_state.skip = op_temp ? atoi(op_temp) : 0;
357 if(op_hasflag(op, SPL("resume"))) {
358 if(!prog_state.statefile) die("-resume needs -statefile\n");
359 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
360 stringptr* fc = stringptr_fromfile(prog_state.statefile);
361 prog_state.skip = atoi(fc->ptr);
365 prog_state.delayedflush = 0;
366 if(op_hasflag(op, SPL("delayedflush"))) {
367 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
368 prog_state.delayedflush = 1;
371 op_temp = op_get(op, SPL("delayedspinup"));
372 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
374 prog_state.cmd_startarg = 0;
375 prog_state.subst_entries = NULL;
377 if(op_hasflag(op, SPL("exec"))) {
378 uint32_t subst_ent;
379 unsigned i, r = 0;
380 for(i = 1; i < (unsigned) argc; i++) {
381 if(str_equal(argv[i], "-exec")) {
382 r = i + 1;
383 break;
386 if(r && r < (unsigned) argc) {
387 prog_state.cmd_startarg = r;
390 // save entries which must be substituted, to save some cycles.
391 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
392 for(i = r; i < (unsigned) argc; i++) {
393 subst_ent = i - r;
394 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
398 prog_state.buffered = 0;
399 if(op_hasflag(op, SPL("buffered"))) {
400 prog_state.buffered = 1;
403 prog_state.join_output = 0;
404 if(op_hasflag(op, SPL("joinoutput"))) {
405 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
406 prog_state.join_output = 1;
409 prog_state.limits = NULL;
410 op_temp = op_get(op, SPL("limits"));
411 if(op_temp) {
412 unsigned i;
413 SPDECLAREC(limits, op_temp);
414 stringptrlist* limit_list = stringptr_splitc(limits, ',');
415 stringptrlist* kv;
416 stringptr* key, *value;
417 limit_rec lim;
418 if(stringptrlist_getsize(limit_list)) {
419 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
420 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
421 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
422 if(stringptrlist_getsize(kv) != 2) continue;
423 key = stringptrlist_get(kv, 0);
424 value = stringptrlist_get(kv, 1);
425 if(EQ(key, SPL("mem")))
426 lim.limit = RLIMIT_AS;
427 else if(EQ(key, SPL("cpu")))
428 lim.limit = RLIMIT_CPU;
429 else if(EQ(key, SPL("stack")))
430 lim.limit = RLIMIT_STACK;
431 else if(EQ(key, SPL("fsize")))
432 lim.limit = RLIMIT_FSIZE;
433 else if(EQ(key, SPL("nofiles")))
434 lim.limit = RLIMIT_NOFILE;
435 else
436 die("unknown option passed to -limits");
438 if(getrlimit(lim.limit, &lim.rl) == -1) {
439 perror("getrlimit");
440 die("could not query rlimits");
442 lim.rl.rlim_cur = parse_human_number(value);
443 sblist_add(prog_state.limits, &lim);
444 stringptrlist_free(kv);
446 stringptrlist_free(limit_list);
449 return 0;
452 static void init_queue(void) {
453 unsigned i;
454 job_info ji;
456 ji.pid = -1;
457 memset(&ji.fa, 0, sizeof(ji.fa));
459 for(i = 0; i < prog_state.numthreads; i++) {
460 sblist_add(prog_state.job_infos, &ji);
464 static void write_statefile(uint64_t n, const char* tempfile) {
465 char numbuf[64];
466 stringptr num_b, *num = &num_b;
468 num_b.ptr = uint64ToString(n + 1, numbuf);
469 num_b.size = strlen(numbuf);
470 stringptr_tofile((char*) tempfile, num);
471 if(rename(tempfile, prog_state.statefile) == -1)
472 perror("rename");
475 // returns numbers of substitutions done, -1 on out of buffer.
476 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
477 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
478 size_t i;
479 int ret = 0;
480 for(i = 0; dest_size > 0 && i < source->size; ) {
481 if(stringptr_here(source, i, what)) {
482 if(dest_size < (ssize_t) whit->size) return -1;
483 memcpy(dest, whit->ptr, whit->size);
484 dest += whit->size;
485 dest_size -= whit->size;
486 ret++;
487 i += what->size;
488 } else {
489 *dest = source->ptr[i];
490 dest++;
491 dest_size--;
492 i++;
495 if(!dest_size) return -1;
496 *dest = 0;
497 return ret;
500 int main(int argc, char** argv) {
501 char inbuf[4096]; char* fgets_result;
502 stringptr line_b, *line = &line_b;
503 char* cmd_argv[4096];
504 char subst_buf[16][4096];
505 unsigned max_subst;
507 struct timeval reapTime;
509 uint64_t n = 0;
510 unsigned i;
511 unsigned spinup_counter = 0;
513 char tempdir_buf[256];
514 char temp_state[256];
516 srand(time(NULL));
518 if(argc > 4096) argc = 4096;
519 prog_state.threads_running = 0;
520 prog_state.free_slots_count = 0;
521 gettimestamp(&reapTime);
523 if(parse_args(argc, argv)) return 1;
525 if(prog_state.statefile)
526 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
528 prog_state.tempdir = NULL;
530 if(prog_state.buffered) {
531 prog_state.tempdir = tempdir_buf;
532 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
533 perror("mkdtemp");
534 die("could not create tempdir\n");
538 if(prog_state.cmd_startarg) {
539 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
540 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
542 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
545 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
546 init_queue();
548 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
549 if(prog_state.skip)
550 prog_state.skip--;
551 else {
552 if(!prog_state.cmd_startarg)
553 printf(fgets_result);
554 else {
555 stringptr_fromchar(fgets_result, line);
556 stringptr_chomp(line);
558 max_subst = 0;
559 if(prog_state.subst_entries) {
560 uint32_t* index;
561 sblist_iter(prog_state.subst_entries, index) {
562 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
563 int ret;
564 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
565 if(ret == -1) {
566 too_long:
567 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
568 goto out;
569 } else if(!ret) {
570 char* lastdot = stringptr_rchr(line, '.');
571 stringptr tilLastDot = *line;
572 if(lastdot) tilLastDot.size = lastdot - line->ptr;
573 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
574 if(ret == -1) goto too_long;
576 if(ret) {
577 cmd_argv[*index] = subst_buf[max_subst];
578 max_subst++;
583 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
584 reapChilds();
585 gettimestamp(&reapTime);
586 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
589 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
590 msleep(rand() % (prog_state.delayedspinup_interval + 1));
591 spinup_counter++;
594 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
595 prog_state.free_slots_count--;
597 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
598 write_statefile(n, temp_state);
602 n++;
605 out:
607 if(prog_state.delayedflush)
608 write_statefile(n - 1, temp_state);
610 while(prog_state.threads_running) {
611 reapChilds();
612 if(prog_state.threads_running) msleep(SLEEP_MS);
615 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
616 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
617 if(prog_state.limits) sblist_free(prog_state.limits);
619 if(prog_state.tempdir)
620 rmdir(prog_state.tempdir);
623 fflush(stdout);
624 fflush(stderr);
627 return 0;