simplify parse_human_number
[rofl0r-jobflow.git] / jobflow.c
blob9d316997bbb1efee84a2e91237c4449a19c5674c
1 /*
2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 /* FIXME
19 when using more than 1 process, and not the -buffered option, the output of some processes gets lost.
20 this happens regardless of dynamic/static linking, libc, and whether the target program fflushes before exit.
22 piping the output into cat > file instead, everything arrives.
23 linux bug ?
25 test:
26 fail:
27 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
28 should print 100, but does not always
30 success:
31 seq 100 | ./jobflow.out -threads=100 -exec echo {} | cat > test.tmp ; wc -l test.tmp
32 always prints 100
35 #undef _POSIX_C_SOURCE
36 #define _POSIX_C_SOURCE 200809L
37 #undef _XOPEN_SOURCE
38 #define _XOPEN_SOURCE 700
39 #undef _GNU_SOURCE
40 #define _GNU_SOURCE
42 #include "../lib/include/optparser.h"
43 #include "../lib/include/stringptr.h"
44 #include "../lib/include/stringptrlist.h"
45 #include "../lib/include/sblist.h"
46 #include "../lib/include/strlib.h"
47 #include "../lib/include/timelib.h"
48 #include "../lib/include/filelib.h"
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include <stdint.h>
54 #include <stddef.h>
55 #include <errno.h>
56 #include <time.h>
enum {
	/* milliseconds to sleep between calls to the reaper while every
	 * job slot is busy (no free slot available). */
	SLEEP_MS = 21,
	/* upper bound in milliseconds between two reaps of the running
	 * processes; once exceeded, a reap is forced. */
	REAP_INTERVAL_MS = 100,
};
65 /* process handling */
67 #include <fcntl.h>
68 #include <spawn.h>
69 #include <sys/wait.h>
70 #include <sys/stat.h>
72 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* this glibc predates prlimit(); see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* fallback stub: reports the missing feature and always fails with EINVAL,
 * so the caller's perror("prlimit") shows the error for every limit. */
static int prlimit(int pid, ...) {
	(void) pid;
	fputs("prlimit() not implemented on this system\n", stderr);
	errno = EINVAL;
	return -1;
}
#endif
87 #include <sys/time.h>
/* bookkeeping for one job slot. */
typedef struct job_info {
	pid_t pid;                     /* pid of the running job, -1 while the slot is idle */
	posix_spawn_file_actions_t fa; /* file actions used to spawn the job in this slot */
} job_info;
/* one -limits entry: which resource to restrict and the rlimit pair
 * that is handed to prlimit() for every spawned job. */
typedef struct limit_rec {
	int limit;        /* RLIMIT_* resource identifier */
	struct rlimit rl; /* soft (rlim_cur) and hard (rlim_max) limit */
} limit_rec;
99 /* defines how many slots our free_slots struct can take */
100 #define MAX_SLOTS 128
102 typedef struct {
103 unsigned numthreads;
104 unsigned threads_running;
105 char* statefile;
106 unsigned skip;
107 sblist* job_infos;
108 sblist* subst_entries;
109 sblist* limits;
110 unsigned cmd_startarg;
111 size_t free_slots[MAX_SLOTS];
112 unsigned free_slots_count;
113 char* tempdir;
114 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
115 the top value in ms can be supplied via a command line switch.
116 this option makes only sense if the interval is somewhat smaller than the
117 expected runtime of the average job.
118 this option is useful to not overload a network app due to hundreds of
119 parallel connection tries on startup.
121 int buffered:1; /* write stdout and stderr of each task into a file,
122 and print it to stdout once the process ends.
123 this prevents mixing up of the output of multiple tasks. */
124 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
125 this means faster program execution, but could also be imprecise if the number of
126 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
127 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
128 } prog_state_s;
130 prog_state_s prog_state;
133 extern char** environ;
135 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
136 int ret = snprintf(buf, bufsize,
137 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
138 prog_state.tempdir, (unsigned) jobindex);
139 return ret > 0 && (size_t) ret < bufsize;
142 void launch_job(size_t jobindex, char** argv) {
143 char stdout_filename_buf[256];
144 char stderr_filename_buf[256];
145 job_info* job = sblist_get(prog_state.job_infos, jobindex);
147 if(job->pid != -1) return;
149 if(prog_state.buffered) {
150 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
151 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
152 fprintf(stderr, "temp filename too long!\n");
153 return;
157 errno = posix_spawn_file_actions_init(&job->fa);
158 if(errno) goto spawn_error;
159 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
160 if(errno) goto spawn_error;
162 if(prog_state.buffered) {
163 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
164 if(errno) goto spawn_error;
165 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
166 if(errno) goto spawn_error;
169 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
170 if(errno) goto spawn_error;
172 if(prog_state.buffered) {
173 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
174 if(errno) goto spawn_error;
175 if(prog_state.join_output)
176 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
177 else
178 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
179 if(errno) goto spawn_error;
182 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
183 if(errno) {
184 spawn_error:
185 job->pid = -1;
186 perror("posix_spawn");
187 } else {
188 prog_state.threads_running++;
189 if(prog_state.limits) {
190 limit_rec* limit;
191 sblist_iter(prog_state.limits, limit) {
192 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
193 perror("prlimit");
199 static void releaseJobSlot(size_t job_id) {
200 if(prog_state.free_slots_count < MAX_SLOTS) {
201 prog_state.free_slots[prog_state.free_slots_count] = job_id;
202 prog_state.free_slots_count++;
/* copy the buffered stdout log (is_stderr == 0) or stderr log of job slot
 * job_id to our own stdout/stderr, then delete the log file.
 * deleting it lets the final rmdir() of the temp directory in main()
 * succeed; the next job in this slot recreates the file (O_CREAT | O_TRUNC). */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	/* a name that does not fit was already rejected when the job was launched */
	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(!src) return;

	while((nread = fread(buf, 1, sizeof(buf), src)) > 0)
		fwrite(buf, 1, nread, out_stream);

	fclose(src);
	fflush(out_stream);
	unlink(out_filename_buf);
}
225 static void reapChilds(void) {
226 size_t i;
227 job_info* job;
228 int ret, retval;
230 prog_state.free_slots_count = 0;
232 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
233 job = sblist_get(prog_state.job_infos, i);
234 if(job->pid != -1) {
235 ret = waitpid(job->pid, &retval, WNOHANG);
236 if(ret != 0) {
237 // error or changed state.
238 if(ret == -1) {
239 perror("waitpid");
240 continue;
242 if(!retval) {
243 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
244 } else {
245 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
247 job->pid = -1;
248 posix_spawn_file_actions_destroy(&job->fa);
249 //job->passed = 0;
250 releaseJobSlot(i);
251 prog_state.threads_running--;
253 if(prog_state.buffered) {
254 dump_output(i, 0);
255 if(!prog_state.join_output)
256 dump_output(i, 1);
259 } else
260 releaseJobSlot(i);
/* print msg to stderr and terminate the program with exit status 1.
 * msg is written verbatim. the old fprintf(stderr, msg) treated it as a
 * format string, which is undefined behaviour if it ever contains '%'. */
__attribute__((noreturn))
static void die(const char* msg) {
	fputs(msg, stderr);
	exit(1);
}
271 static unsigned long parse_human_number(stringptr* num) {
272 unsigned long ret = 0;
273 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
274 const char* kmg = "KMG";
275 char* kmgind;
276 if(num && num->size) {
277 ret = atol(num->ptr);
278 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
279 ret *= mul[kmgind - kmg];
281 return ret;
/* print the usage text to stdout.
 * always returns 1, so parse_args() can `return syntax();` and main()
 * exits with a failure status. */
static int syntax(void) {
	puts(
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another program's output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of \n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of the newly created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}.\n"
		" {.} passes everything before the last dot in a line as an argument.\n"
		" it is possible to use multiple substitutions inside a single argument,\n"
		" but currently only of one type.\n"
	);
	return 1;
}
337 static int parse_args(int argc, char** argv) {
338 op_state op_b, *op = &op_b;
339 op_init(op, argc, argv);
340 char *op_temp;
341 if(argc == 1 || op_hasflag(op, SPL("-help")))
342 return syntax();
343 op_temp = op_get(op, SPL("threads"));
344 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
345 op_temp = op_get(op, SPL("statefile"));
346 prog_state.statefile = op_temp;
348 op_temp = op_get(op, SPL("skip"));
349 prog_state.skip = op_temp ? atoi(op_temp) : 0;
350 if(op_hasflag(op, SPL("resume"))) {
351 if(!prog_state.statefile) die("-resume needs -statefile\n");
352 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
353 stringptr* fc = stringptr_fromfile(prog_state.statefile);
354 prog_state.skip = atoi(fc->ptr);
358 prog_state.delayedflush = 0;
359 if(op_hasflag(op, SPL("delayedflush"))) {
360 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
361 prog_state.delayedflush = 1;
364 op_temp = op_get(op, SPL("delayedspinup"));
365 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
367 prog_state.cmd_startarg = 0;
368 prog_state.subst_entries = NULL;
370 if(op_hasflag(op, SPL("exec"))) {
371 uint32_t subst_ent;
372 unsigned i, r = 0;
373 for(i = 1; i < (unsigned) argc; i++) {
374 if(str_equal(argv[i], "-exec")) {
375 r = i + 1;
376 break;
379 if(r && r < (unsigned) argc) {
380 prog_state.cmd_startarg = r;
383 // save entries which must be substituted, to save some cycles.
384 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
385 for(i = r; i < (unsigned) argc; i++) {
386 subst_ent = i - r;
387 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
391 prog_state.buffered = 0;
392 if(op_hasflag(op, SPL("buffered"))) {
393 prog_state.buffered = 1;
396 prog_state.join_output = 0;
397 if(op_hasflag(op, SPL("joinoutput"))) {
398 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
399 prog_state.join_output = 1;
402 prog_state.limits = NULL;
403 op_temp = op_get(op, SPL("limits"));
404 if(op_temp) {
405 unsigned i;
406 SPDECLAREC(limits, op_temp);
407 stringptrlist* limit_list = stringptr_splitc(limits, ',');
408 stringptrlist* kv;
409 stringptr* key, *value;
410 limit_rec lim;
411 if(stringptrlist_getsize(limit_list)) {
412 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
413 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
414 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
415 if(stringptrlist_getsize(kv) != 2) continue;
416 key = stringptrlist_get(kv, 0);
417 value = stringptrlist_get(kv, 1);
418 if(EQ(key, SPL("mem")))
419 lim.limit = RLIMIT_AS;
420 else if(EQ(key, SPL("cpu")))
421 lim.limit = RLIMIT_CPU;
422 else if(EQ(key, SPL("stack")))
423 lim.limit = RLIMIT_STACK;
424 else if(EQ(key, SPL("fsize")))
425 lim.limit = RLIMIT_FSIZE;
426 else if(EQ(key, SPL("nofiles")))
427 lim.limit = RLIMIT_NOFILE;
428 else
429 die("unknown option passed to -limits");
431 if(getrlimit(lim.limit, &lim.rl) == -1) {
432 perror("getrlimit");
433 die("could not query rlimits");
435 lim.rl.rlim_cur = parse_human_number(value);
436 sblist_add(prog_state.limits, &lim);
437 stringptrlist_free(kv);
439 stringptrlist_free(limit_list);
442 return 0;
445 static void init_queue(void) {
446 unsigned i;
447 job_info ji;
449 ji.pid = -1;
450 memset(&ji.fa, 0, sizeof(ji.fa));
452 for(i = 0; i < prog_state.numthreads; i++) {
453 sblist_add(prog_state.job_infos, &ji);
457 static void write_statefile(uint64_t n, const char* tempfile) {
458 char numbuf[64];
459 stringptr num_b, *num = &num_b;
461 num_b.ptr = uint64ToString(n + 1, numbuf);
462 num_b.size = strlen(numbuf);
463 stringptr_tofile((char*) tempfile, num);
464 if(rename(tempfile, prog_state.statefile) == -1)
465 perror("rename");
468 // returns numbers of substitutions done, -1 on out of buffer.
469 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
470 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
471 size_t i;
472 int ret = 0;
473 for(i = 0; dest_size > 0 && i < source->size; ) {
474 if(stringptr_here(source, i, what)) {
475 if(dest_size < (ssize_t) whit->size) return -1;
476 memcpy(dest, whit->ptr, whit->size);
477 dest += whit->size;
478 dest_size -= whit->size;
479 ret++;
480 i += what->size;
481 } else {
482 *dest = source->ptr[i];
483 dest++;
484 dest_size--;
485 i++;
488 if(!dest_size) return -1;
489 *dest = 0;
490 return ret;
493 int main(int argc, char** argv) {
494 char inbuf[4096]; char* fgets_result;
495 stringptr line_b, *line = &line_b;
496 char* cmd_argv[4096];
497 char subst_buf[16][4096];
498 unsigned max_subst;
500 struct timeval reapTime;
502 uint64_t n = 0;
503 unsigned i;
504 unsigned spinup_counter = 0;
506 char tempdir_buf[256];
507 char temp_state[256];
509 srand(time(NULL));
511 if(argc > 4096) argc = 4096;
512 prog_state.threads_running = 0;
513 prog_state.free_slots_count = 0;
514 gettimestamp(&reapTime);
516 if(parse_args(argc, argv)) return 1;
518 if(prog_state.statefile)
519 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
521 prog_state.tempdir = NULL;
523 if(prog_state.buffered) {
524 prog_state.tempdir = tempdir_buf;
525 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
526 perror("mkdtemp");
527 die("could not create tempdir\n");
531 if(prog_state.cmd_startarg) {
532 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
533 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
535 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
538 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
539 init_queue();
541 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
542 if(prog_state.skip)
543 prog_state.skip--;
544 else {
545 if(!prog_state.cmd_startarg)
546 printf(fgets_result);
547 else {
548 stringptr_fromchar(fgets_result, line);
549 stringptr_chomp(line);
551 max_subst = 0;
552 if(prog_state.subst_entries) {
553 uint32_t* index;
554 sblist_iter(prog_state.subst_entries, index) {
555 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
556 int ret;
557 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
558 if(ret == -1) {
559 too_long:
560 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
561 goto out;
562 } else if(!ret) {
563 char* lastdot = stringptr_rchr(line, '.');
564 stringptr tilLastDot = *line;
565 if(lastdot) tilLastDot.size = lastdot - line->ptr;
566 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
567 if(ret == -1) goto too_long;
569 if(ret) {
570 cmd_argv[*index] = subst_buf[max_subst];
571 max_subst++;
576 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
577 reapChilds();
578 gettimestamp(&reapTime);
579 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
582 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
583 msleep(rand() % (prog_state.delayedspinup_interval + 1));
584 spinup_counter++;
587 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
588 prog_state.free_slots_count--;
590 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
591 write_statefile(n, temp_state);
595 n++;
598 out:
600 if(prog_state.delayedflush)
601 write_statefile(n - 1, temp_state);
603 while(prog_state.threads_running) {
604 reapChilds();
605 if(prog_state.threads_running) msleep(SLEEP_MS);
608 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
609 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
610 if(prog_state.limits) sblist_free(prog_state.limits);
612 if(prog_state.tempdir)
613 rmdir(prog_state.tempdir);
616 fflush(stdout);
617 fflush(stderr);
620 return 0;