bump version to 1.1.1
[rofl0r-jobflow.git] / jobflow.c
blobb1b731277fc0e707863060862629ad3b79ead991
/*
Copyright (C) 2012,2014,2016 rofl0r

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
19 #define VERSION "1.1.1"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 13))
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Fallback for C libraries without prlimit(): ignores all arguments, prints a
 * diagnostic to fd 2 and always fails with errno set to EINVAL.
 * The version test is a real "older than 2.13" comparison (major first, then
 * minor) instead of testing major and minor independently. */
static int prlimit(int pid, ...) {
	(void) pid;
	dprintf(2, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
68 #include <sys/time.h>
/* bookkeeping record for one job slot of the queue. */
typedef struct {
	/* pid of the child occupying this slot; -1 marks the slot as unused */
	pid_t pid;
	/* file actions (stdin/stdout/stderr redirections) handed to posix_spawnp() */
	posix_spawn_file_actions_t fa;
} job_info;
/* one resource limit requested through -limits. */
typedef struct {
	/* RLIMIT_* resource identifier, passed as the resource argument of prlimit() */
	int limit;
	/* limit values applied to each spawned job */
	struct rlimit rl;
} limit_rec;
80 typedef struct {
81 unsigned numthreads;
82 unsigned threads_running;
83 char* statefile;
84 unsigned long long skip;
85 sblist* job_infos;
86 sblist* subst_entries;
87 sblist* limits;
88 unsigned cmd_startarg;
89 char* tempdir;
90 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
91 the top value in ms can be supplied via a command line switch.
92 this option makes only sense if the interval is somewhat smaller than the
93 expected runtime of the average job.
94 this option is useful to not overload a network app due to hundreds of
95 parallel connection tries on startup.
97 int buffered:1; /* write stdout and stderr of each task into a file,
98 and print it to stdout once the process ends.
99 this prevents mixing up of the output of multiple tasks. */
100 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
101 this means faster program execution, but could also be imprecise if the number of
102 jobs is small or smaller than the available threadcount. */
103 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
104 } prog_state_s;
106 prog_state_s prog_state;
109 extern char** environ;
111 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
112 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
113 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
114 return ret > 0 && (size_t) ret < bufsize;
117 void launch_job(size_t jobindex, char** argv) {
118 char stdout_filename_buf[256];
119 char stderr_filename_buf[256];
120 job_info* job = sblist_get(prog_state.job_infos, jobindex);
122 if(job->pid != -1) return;
124 if(prog_state.buffered) {
125 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
126 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
127 dprintf(2, "temp filename too long!\n");
128 return;
132 errno = posix_spawn_file_actions_init(&job->fa);
133 if(errno) goto spawn_error;
134 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
135 if(errno) goto spawn_error;
137 if(prog_state.buffered) {
138 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
139 if(errno) goto spawn_error;
140 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
141 if(errno) goto spawn_error;
144 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
145 if(errno) goto spawn_error;
147 if(prog_state.buffered) {
148 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
149 if(errno) goto spawn_error;
150 if(prog_state.join_output)
151 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
152 else
153 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
154 if(errno) goto spawn_error;
157 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
158 if(errno) {
159 spawn_error:
160 job->pid = -1;
161 perror("posix_spawn");
162 } else {
163 prog_state.threads_running++;
164 if(prog_state.limits) {
165 limit_rec* limit;
166 sblist_iter(prog_state.limits, limit) {
167 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
168 perror("prlimit");
/* copies the buffered output file of job slot `job_id` to stdout (is_stderr == 0)
   or stderr (is_stderr != 0) and removes the file afterwards.
   the file is unlinked so that the temp directory is empty when main() finally
   calls rmdir() on it; launch_job() re-creates the file (O_CREAT) on slot reuse.
   a missing or unreadable file is silently skipped. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	src = fopen(out_filename_buf, "r");
	if(src) {
		while((nread = fread(buf, 1, sizeof(buf), src))) {
			if(fwrite(buf, 1, nread, out_stream) != nread) {
				perror("fwrite");
				break;
			}
			/* a short read means EOF (or error): nothing more to copy */
			if(nread < sizeof(buf)) break;
		}
		fclose(src);
		fflush(out_stream);
	}
	/* best effort: the file may not exist if the child never got to create it */
	unlink(out_filename_buf);
}
193 /* wait till a child exits, reap it, and return its job index for slot reuse */
194 static size_t reap_child(void) {
195 size_t i;
196 job_info* job;
197 int ret, retval;
199 do ret = waitpid(-1, &retval, 0);
200 while(ret == -1 || !WIFEXITED(retval));
202 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
203 job = sblist_get(prog_state.job_infos, i);
204 if(job->pid == ret) {
205 job->pid = -1;
206 posix_spawn_file_actions_destroy(&job->fa);
207 prog_state.threads_running--;
208 if(prog_state.buffered) {
209 dump_output(i, 0);
210 if(!prog_state.join_output)
211 dump_output(i, 1);
213 return i;
216 assert(0);
217 return -1;
220 static size_t free_slots(void) {
221 return prog_state.numthreads - prog_state.threads_running;
224 static void add_job(char **argv) {
225 if(free_slots())
226 launch_job(prog_state.threads_running, argv);
227 else
228 launch_job(reap_child(), argv);
/* prints msg verbatim to stderr (fd 2) and terminates the program with exit status 1.
   msg is passed as an argument, not as the format string, so a '%' inside the
   message cannot be interpreted as a conversion specification. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
237 static unsigned long parse_human_number(stringptr* num) {
238 unsigned long ret = 0;
239 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
240 const char* kmg = "KMG";
241 char* kmgind;
242 if(num && num->size) {
243 ret = atol(num->ptr);
244 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
245 ret *= mul[kmgind - kmg];
247 return ret;
250 static int syntax(void) {
251 dprintf(2,
252 "jobflow " VERSION " (C) rofl0r\n"
253 "------------------\n"
254 "this program is intended to be used as a recipient of another programs output\n"
255 "it launches processes to which the current line can be passed as an argument\n"
256 "using {} for substitution (as in find -exec).\n"
257 "\n"
258 "available options:\n\n"
259 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
260 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
261 "-exec ./mycommand {}\n"
262 "\n"
263 "-skip=XXX\n"
264 " XXX=number of entries to skip\n"
265 "-threads=XXX\n"
266 " XXX=number of parallel processes to spawn\n"
267 "-resume\n"
268 " resume from last jobnumber stored in statefile\n"
269 "-statefile=XXX\n"
270 " XXX=filename\n"
271 " saves last launched jobnumber into a file\n"
272 "-delayedflush\n"
273 " only write to statefile whenever all processes are busy,\n"
274 " and at program end\n"
275 "-delayedspinup=XXX\n"
276 " XXX=maximum amount of milliseconds\n"
277 " ...to wait when spinning up a fresh set of processes\n"
278 " a random value between 0 and the chosen amount is used to delay initial\n"
279 " spinup.\n"
280 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
281 " activity on program startup\n"
282 "-buffered\n"
283 " store the stdout and stderr of launched processes into a temporary file\n"
284 " which will be printed after a process has finished.\n"
285 " this prevents mixing up of output of different processes.\n"
286 "-joinoutput\n"
287 " if -buffered, write both stdout and stderr into the same file.\n"
288 " this saves the chronological order of the output, and the combined output\n"
289 " will only be printed to stdout.\n"
290 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
291 " sets the rlimit of the new created processes.\n"
292 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
293 "-exec command with args\n"
294 " everything past -exec is treated as the command to execute on each line of\n"
295 " stdin received. the line can be passed as an argument using {}.\n"
296 " {.} passes everything before the last dot in a line as an argument.\n"
297 " it is possible to use multiple substitutions inside a single argument,\n"
298 " but currently only of one type.\n"
299 "\n"
301 return 1;
304 #undef strtoll
305 #define strtoll(a,b,c) strtoint64(a, strlen(a))
306 static int parse_args(int argc, char** argv) {
307 op_state op_b, *op = &op_b;
308 op_init(op, argc, argv);
309 char *op_temp;
310 if(argc == 1 || op_hasflag(op, SPL("-help")))
311 return syntax();
313 op_temp = op_get(op, SPL("threads"));
314 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
315 if(x <= 0) die("threadcount must be >= 1\n");
316 prog_state.numthreads = x;
318 op_temp = op_get(op, SPL("statefile"));
319 prog_state.statefile = op_temp;
321 op_temp = op_get(op, SPL("skip"));
322 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
323 if(op_hasflag(op, SPL("resume"))) {
324 if(!prog_state.statefile) die("-resume needs -statefile\n");
325 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
326 FILE *f = fopen(prog_state.statefile, "r");
327 if(f) {
328 char nb[64];
329 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
330 fclose(f);
335 prog_state.delayedflush = 0;
336 if(op_hasflag(op, SPL("delayedflush"))) {
337 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
338 prog_state.delayedflush = 1;
341 op_temp = op_get(op, SPL("delayedspinup"));
342 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
344 prog_state.cmd_startarg = 0;
345 prog_state.subst_entries = NULL;
347 if(op_hasflag(op, SPL("exec"))) {
348 uint32_t subst_ent;
349 unsigned i, r = 0;
350 for(i = 1; i < (unsigned) argc; i++) {
351 if(str_equal(argv[i], "-exec")) {
352 r = i + 1;
353 break;
356 if(r && r < (unsigned) argc) {
357 prog_state.cmd_startarg = r;
360 // save entries which must be substituted, to save some cycles.
361 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
362 for(i = r; i < (unsigned) argc; i++) {
363 subst_ent = i - r;
364 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
368 prog_state.buffered = 0;
369 if(op_hasflag(op, SPL("buffered"))) {
370 prog_state.buffered = 1;
373 prog_state.join_output = 0;
374 if(op_hasflag(op, SPL("joinoutput"))) {
375 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
376 prog_state.join_output = 1;
379 prog_state.limits = NULL;
380 op_temp = op_get(op, SPL("limits"));
381 if(op_temp) {
382 unsigned i;
383 SPDECLAREC(limits, op_temp);
384 stringptrlist* limit_list = stringptr_splitc(limits, ',');
385 stringptrlist* kv;
386 stringptr* key, *value;
387 limit_rec lim;
388 if(stringptrlist_getsize(limit_list)) {
389 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
390 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
391 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
392 if(stringptrlist_getsize(kv) != 2) continue;
393 key = stringptrlist_get(kv, 0);
394 value = stringptrlist_get(kv, 1);
395 if(EQ(key, SPL("mem")))
396 lim.limit = RLIMIT_AS;
397 else if(EQ(key, SPL("cpu")))
398 lim.limit = RLIMIT_CPU;
399 else if(EQ(key, SPL("stack")))
400 lim.limit = RLIMIT_STACK;
401 else if(EQ(key, SPL("fsize")))
402 lim.limit = RLIMIT_FSIZE;
403 else if(EQ(key, SPL("nofiles")))
404 lim.limit = RLIMIT_NOFILE;
405 else
406 die("unknown option passed to -limits");
408 if(getrlimit(lim.limit, &lim.rl) == -1) {
409 perror("getrlimit");
410 die("could not query rlimits");
412 lim.rl.rlim_cur = parse_human_number(value);
413 sblist_add(prog_state.limits, &lim);
414 stringptrlist_free(kv);
416 stringptrlist_free(limit_list);
419 return 0;
422 static void init_queue(void) {
423 unsigned i;
424 job_info ji = {.pid = -1};
426 for(i = 0; i < prog_state.numthreads; i++)
427 sblist_add(prog_state.job_infos, &ji);
430 static void write_statefile(unsigned long long n, const char* tempfile) {
431 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
432 if(fd != -1) {
433 dprintf(fd, "%llu\n", n + 1ULL);
434 close(fd);
435 if(rename(tempfile, prog_state.statefile) == -1)
436 perror("rename");
437 } else
438 perror("open");
441 // returns numbers of substitutions done, -1 on out of buffer.
442 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
443 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
444 size_t i;
445 int ret = 0;
446 for(i = 0; dest_size > 0 && i < source->size; ) {
447 if(stringptr_here(source, i, what)) {
448 if(dest_size < (ssize_t) whit->size) return -1;
449 memcpy(dest, whit->ptr, whit->size);
450 dest += whit->size;
451 dest_size -= whit->size;
452 ret++;
453 i += what->size;
454 } else {
455 *dest = source->ptr[i];
456 dest++;
457 dest_size--;
458 i++;
461 if(!dest_size) return -1;
462 *dest = 0;
463 return ret;
466 int main(int argc, char** argv) {
467 char inbuf[4096]; char* fgets_result;
468 stringptr line_b, *line = &line_b;
469 char* cmd_argv[4096];
470 char subst_buf[16][4096];
471 unsigned max_subst;
473 unsigned long long lineno = 0;
474 unsigned i;
475 unsigned spinup_counter = 0;
477 char tempdir_buf[256];
478 char temp_state[256];
480 srand(time(NULL));
482 if(argc > 4096) argc = 4096;
484 prog_state.threads_running = 0;
486 if(parse_args(argc, argv)) return 1;
488 if(prog_state.statefile)
489 snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
491 prog_state.tempdir = NULL;
493 if(prog_state.buffered) {
494 prog_state.tempdir = tempdir_buf;
495 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
496 perror("mkdtemp");
497 die("could not create tempdir\n");
499 } else {
500 /* if the stdout/stderr fds are not in O_APPEND mode,
501 the dup()'s of the fds in posix_spawn can cause different
502 file positions, causing the different processes to overwrite each others output.
503 testcase:
504 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
506 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
507 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
510 if(prog_state.cmd_startarg) {
511 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
512 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
514 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
517 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
518 init_queue();
520 for(;(fgets_result = fgets(inbuf, sizeof(inbuf), stdin));lineno++) {
521 if(prog_state.skip) {
522 prog_state.skip--;
523 continue;
525 if(!prog_state.cmd_startarg) {
526 dprintf(1, fgets_result);
527 continue;
529 stringptr_fromchar(fgets_result, line);
530 stringptr_chomp(line);
532 max_subst = 0;
533 if(prog_state.subst_entries) {
534 uint32_t* index;
535 sblist_iter(prog_state.subst_entries, index) {
536 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
537 int ret;
538 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
539 if(ret == -1) {
540 too_long:
541 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
542 goto out;
543 } else if(!ret) {
544 char* lastdot = stringptr_rchr(line, '.');
545 stringptr tilLastDot = *line;
546 if(lastdot) tilLastDot.size = lastdot - line->ptr;
547 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
548 if(ret == -1) goto too_long;
550 if(ret) {
551 cmd_argv[*index] = subst_buf[max_subst];
552 max_subst++;
558 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
559 msleep(rand() % (prog_state.delayedspinup_interval + 1));
560 spinup_counter++;
563 add_job(cmd_argv);
565 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
566 write_statefile(lineno, temp_state);
570 out:
572 if(prog_state.delayedflush)
573 write_statefile(lineno - 1, temp_state);
575 while(prog_state.threads_running) reap_child();
577 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
578 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
579 if(prog_state.limits) sblist_free(prog_state.limits);
581 if(prog_state.tempdir)
582 rmdir(prog_state.tempdir);
585 fflush(stdout);
586 fflush(stderr);
589 return 0;