fix typo in usage text
[rofl0r-jobflow.git] / jobflow.c
blob770f538d0e5f81f5dbe8be9b84f5c652bc6d0ca0
1 /*
2 Copyright (C) 2012,2014,2016 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.1.0"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/stringptrlist.h"
32 #include "../lib/include/sblist.h"
33 #include "../lib/include/strlib.h"
34 #include "../lib/include/timelib.h"
35 #include "../lib/include/filelib.h"
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdint.h>
41 #include <stddef.h>
42 #include <errno.h>
43 #include <time.h>
44 #include <assert.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
/* Fallback for glibc versions that predate prlimit(); see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Stand-in that always fails: prints a diagnostic to fd 2, sets errno to
 * EINVAL and returns -1. All arguments are ignored. */
static int prlimit(int pid, ...) {
	static const char not_implemented[] = "prlimit() not implemented on this system\n";

	(void) pid;
	dprintf(2, "%s", not_implemented);
	/* set errno last so the diagnostic output cannot clobber it */
	errno = EINVAL;
	return -1;
}
#endif
68 #include <sys/time.h>
/* One slot of the job queue. */
typedef struct {
	/* pid of the process occupying this slot, or -1 while the slot is free */
	pid_t pid;
	/* file actions handed to posix_spawnp() when the slot's process is launched */
	posix_spawn_file_actions_t fa;
} job_info;
/* One resource limit requested through -limits. */
typedef struct {
	/* resource identifier (one of the RLIMIT_* constants) */
	int limit;
	/* limit values passed to prlimit() for every launched process */
	struct rlimit rl;
} limit_rec;
80 typedef struct {
81 unsigned numthreads;
82 unsigned threads_running;
83 char* statefile;
84 unsigned skip;
85 sblist* job_infos;
86 sblist* subst_entries;
87 sblist* limits;
88 unsigned cmd_startarg;
89 char* tempdir;
90 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
91 the top value in ms can be supplied via a command line switch.
92 this option makes only sense if the interval is somewhat smaller than the
93 expected runtime of the average job.
94 this option is useful to not overload a network app due to hundreds of
95 parallel connection tries on startup.
97 int buffered:1; /* write stdout and stderr of each task into a file,
98 and print it to stdout once the process ends.
99 this prevents mixing up of the output of multiple tasks. */
100 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
101 this means faster program execution, but could also be imprecise if the number of
102 jobs is small or smaller than the available threadcount. */
103 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
104 } prog_state_s;
106 prog_state_s prog_state;
109 extern char** environ;
111 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
112 int ret = snprintf(buf, bufsize,
113 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
114 prog_state.tempdir, (unsigned) jobindex);
115 return ret > 0 && (size_t) ret < bufsize;
118 void launch_job(size_t jobindex, char** argv) {
119 char stdout_filename_buf[256];
120 char stderr_filename_buf[256];
121 job_info* job = sblist_get(prog_state.job_infos, jobindex);
123 if(job->pid != -1) return;
125 if(prog_state.buffered) {
126 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
127 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
128 dprintf(2, "temp filename too long!\n");
129 return;
133 errno = posix_spawn_file_actions_init(&job->fa);
134 if(errno) goto spawn_error;
135 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
136 if(errno) goto spawn_error;
138 if(prog_state.buffered) {
139 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
140 if(errno) goto spawn_error;
141 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
142 if(errno) goto spawn_error;
145 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
146 if(errno) goto spawn_error;
148 if(prog_state.buffered) {
149 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
150 if(errno) goto spawn_error;
151 if(prog_state.join_output)
152 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
153 else
154 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
155 if(errno) goto spawn_error;
158 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
159 if(errno) {
160 spawn_error:
161 job->pid = -1;
162 perror("posix_spawn");
163 } else {
164 prog_state.threads_running++;
165 if(prog_state.limits) {
166 limit_rec* limit;
167 sblist_iter(prog_state.limits, limit) {
168 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
169 perror("prlimit");
/* Copy the buffered output file of job slot job_id to our own stdout
 * (is_stderr == 0) or stderr (is_stderr != 0), then delete the file.
 * A missing or unreadable file is silently skipped. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* dst, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr))
		return;

	dst = fopen(out_filename_buf, "r");
	if(dst) {
		while((nread = fread(buf, 1, sizeof(buf), dst))) {
			fwrite(buf, 1, nread, out_stream);
			/* a short read means end of file (or an error): stop */
			if(nread < sizeof(buf)) break;
		}
		fclose(dst);
		fflush(out_stream);
	}

	/* best effort: remove the file so that it does not pile up in the temp
	   dir, which can only be removed by rmdir() when it is empty */
	unlink(out_filename_buf);
}
194 /* wait till a child exits, reap it, and return its job index for slot reuse */
195 static size_t reap_child(void) {
196 size_t i;
197 job_info* job;
198 int ret, retval;
200 do ret = waitpid(-1, &retval, 0);
201 while(ret == -1 || !WIFEXITED(retval));
203 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
204 job = sblist_get(prog_state.job_infos, i);
205 if(job->pid == ret) {
206 job->pid = -1;
207 posix_spawn_file_actions_destroy(&job->fa);
208 prog_state.threads_running--;
209 if(prog_state.buffered) {
210 dump_output(i, 0);
211 if(!prog_state.join_output)
212 dump_output(i, 1);
214 return i;
217 assert(0);
218 return -1;
221 static size_t free_slots(void) {
222 return prog_state.numthreads - prog_state.threads_running;
225 static void add_job(char **argv) {
226 if(free_slots())
227 launch_job(prog_state.threads_running, argv);
228 else
229 launch_job(reap_child(), argv);
/* Print msg verbatim to fd 2 and terminate the program with exit status 1. */
__attribute__((noreturn))
static void die(const char* msg) {
	/* never use the message itself as format string: a '%' in it would be
	   interpreted as a conversion specification */
	dprintf(2, "%s", msg);
	exit(1);
}
238 static unsigned long parse_human_number(stringptr* num) {
239 unsigned long ret = 0;
240 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
241 const char* kmg = "KMG";
242 char* kmgind;
243 if(num && num->size) {
244 ret = atol(num->ptr);
245 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
246 ret *= mul[kmgind - kmg];
248 return ret;
251 static int syntax(void) {
252 dprintf(2,
253 "jobflow " VERSION " (C) rofl0r\n"
254 "------------------\n"
255 "this program is intended to be used as a recipient of another programs output\n"
256 "it launches processes to which the current line can be passed as an argument\n"
257 "using {} for substitution (as in find -exec).\n"
258 "\n"
259 "available options:\n\n"
260 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
261 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
262 "-exec ./mycommand {}\n"
263 "\n"
264 "-skip=XXX\n"
265 " XXX=number of entries to skip\n"
266 "-threads=XXX\n"
267 " XXX=number of parallel processes to spawn\n"
268 "-resume\n"
269 " resume from last jobnumber stored in statefile\n"
270 "-statefile=XXX\n"
271 " XXX=filename\n"
272 " saves last launched jobnumber into a file\n"
273 "-delayedflush\n"
274 " only write to statefile whenever all processes are busy,\n"
275 " and at program end\n"
276 "-delayedspinup=XXX\n"
277 " XXX=maximum amount of milliseconds\n"
278 " ...to wait when spinning up a fresh set of processes\n"
279 " a random value between 0 and the chosen amount is used to delay initial\n"
280 " spinup.\n"
281 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
282 " activity on program startup\n"
283 "-buffered\n"
284 " store the stdout and stderr of launched processes into a temporary file\n"
285 " which will be printed after a process has finished.\n"
286 " this prevents mixing up of output of different processes.\n"
287 "-joinoutput\n"
288 " if -buffered, write both stdout and stderr into the same file.\n"
289 " this saves the chronological order of the output, and the combined output\n"
290 " will only be printed to stdout.\n"
291 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
292 " sets the rlimit of the new created processes.\n"
293 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
294 "-exec command with args\n"
295 " everything past -exec is treated as the command to execute on each line of\n"
296 " stdin received. the line can be passed as an argument using {}.\n"
297 " {.} passes everything before the last dot in a line as an argument.\n"
298 " it is possible to use multiple substitutions inside a single argument,\n"
299 " but currently only of one type.\n"
300 "\n"
302 return 1;
305 static int parse_args(int argc, char** argv) {
306 op_state op_b, *op = &op_b;
307 op_init(op, argc, argv);
308 char *op_temp;
309 if(argc == 1 || op_hasflag(op, SPL("-help")))
310 return syntax();
312 op_temp = op_get(op, SPL("threads"));
313 int x = op_temp ? atoi(op_temp) : 1;
314 if(x <= 0) die("threadcount must be >= 1\n");
315 prog_state.numthreads = x;
317 op_temp = op_get(op, SPL("statefile"));
318 prog_state.statefile = op_temp;
320 op_temp = op_get(op, SPL("skip"));
321 prog_state.skip = op_temp ? atoi(op_temp) : 0;
322 if(op_hasflag(op, SPL("resume"))) {
323 if(!prog_state.statefile) die("-resume needs -statefile\n");
324 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
325 stringptr* fc = stringptr_fromfile(prog_state.statefile);
326 prog_state.skip = atoi(fc->ptr);
330 prog_state.delayedflush = 0;
331 if(op_hasflag(op, SPL("delayedflush"))) {
332 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
333 prog_state.delayedflush = 1;
336 op_temp = op_get(op, SPL("delayedspinup"));
337 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
339 prog_state.cmd_startarg = 0;
340 prog_state.subst_entries = NULL;
342 if(op_hasflag(op, SPL("exec"))) {
343 uint32_t subst_ent;
344 unsigned i, r = 0;
345 for(i = 1; i < (unsigned) argc; i++) {
346 if(str_equal(argv[i], "-exec")) {
347 r = i + 1;
348 break;
351 if(r && r < (unsigned) argc) {
352 prog_state.cmd_startarg = r;
355 // save entries which must be substituted, to save some cycles.
356 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
357 for(i = r; i < (unsigned) argc; i++) {
358 subst_ent = i - r;
359 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
363 prog_state.buffered = 0;
364 if(op_hasflag(op, SPL("buffered"))) {
365 prog_state.buffered = 1;
368 prog_state.join_output = 0;
369 if(op_hasflag(op, SPL("joinoutput"))) {
370 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
371 prog_state.join_output = 1;
374 prog_state.limits = NULL;
375 op_temp = op_get(op, SPL("limits"));
376 if(op_temp) {
377 unsigned i;
378 SPDECLAREC(limits, op_temp);
379 stringptrlist* limit_list = stringptr_splitc(limits, ',');
380 stringptrlist* kv;
381 stringptr* key, *value;
382 limit_rec lim;
383 if(stringptrlist_getsize(limit_list)) {
384 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
385 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
386 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
387 if(stringptrlist_getsize(kv) != 2) continue;
388 key = stringptrlist_get(kv, 0);
389 value = stringptrlist_get(kv, 1);
390 if(EQ(key, SPL("mem")))
391 lim.limit = RLIMIT_AS;
392 else if(EQ(key, SPL("cpu")))
393 lim.limit = RLIMIT_CPU;
394 else if(EQ(key, SPL("stack")))
395 lim.limit = RLIMIT_STACK;
396 else if(EQ(key, SPL("fsize")))
397 lim.limit = RLIMIT_FSIZE;
398 else if(EQ(key, SPL("nofiles")))
399 lim.limit = RLIMIT_NOFILE;
400 else
401 die("unknown option passed to -limits");
403 if(getrlimit(lim.limit, &lim.rl) == -1) {
404 perror("getrlimit");
405 die("could not query rlimits");
407 lim.rl.rlim_cur = parse_human_number(value);
408 sblist_add(prog_state.limits, &lim);
409 stringptrlist_free(kv);
411 stringptrlist_free(limit_list);
414 return 0;
417 static void init_queue(void) {
418 unsigned i;
419 job_info ji = {.pid = -1};
421 for(i = 0; i < prog_state.numthreads; i++)
422 sblist_add(prog_state.job_infos, &ji);
425 static void write_statefile(uint64_t n, const char* tempfile) {
426 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
427 if(fd != -1) {
428 dprintf(fd, "%llu\n", n + 1ULL);
429 close(fd);
430 if(rename(tempfile, prog_state.statefile) == -1)
431 perror("rename");
432 } else
433 perror("open");
436 // returns numbers of substitutions done, -1 on out of buffer.
437 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
438 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
439 size_t i;
440 int ret = 0;
441 for(i = 0; dest_size > 0 && i < source->size; ) {
442 if(stringptr_here(source, i, what)) {
443 if(dest_size < (ssize_t) whit->size) return -1;
444 memcpy(dest, whit->ptr, whit->size);
445 dest += whit->size;
446 dest_size -= whit->size;
447 ret++;
448 i += what->size;
449 } else {
450 *dest = source->ptr[i];
451 dest++;
452 dest_size--;
453 i++;
456 if(!dest_size) return -1;
457 *dest = 0;
458 return ret;
461 int main(int argc, char** argv) {
462 char inbuf[4096]; char* fgets_result;
463 stringptr line_b, *line = &line_b;
464 char* cmd_argv[4096];
465 char subst_buf[16][4096];
466 unsigned max_subst;
468 uint64_t lineno = 0;
469 unsigned i;
470 unsigned spinup_counter = 0;
472 char tempdir_buf[256];
473 char temp_state[256];
475 srand(time(NULL));
477 if(argc > 4096) argc = 4096;
479 prog_state.threads_running = 0;
481 if(parse_args(argc, argv)) return 1;
483 if(prog_state.statefile)
484 snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
486 prog_state.tempdir = NULL;
488 if(prog_state.buffered) {
489 prog_state.tempdir = tempdir_buf;
490 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
491 perror("mkdtemp");
492 die("could not create tempdir\n");
494 } else {
495 /* if the stdout/stderr fds are not in O_APPEND mode,
496 the dup()'s of the fds in posix_spawn can cause different
497 file positions, causing the different processes to overwrite each others output.
498 testcase:
499 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
501 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
502 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
505 if(prog_state.cmd_startarg) {
506 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
507 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
509 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
512 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
513 init_queue();
515 for(;(fgets_result = fgets(inbuf, sizeof(inbuf), stdin));lineno++) {
516 if(prog_state.skip) {
517 prog_state.skip--;
518 continue;
520 if(!prog_state.cmd_startarg) {
521 dprintf(1, fgets_result);
522 continue;
524 stringptr_fromchar(fgets_result, line);
525 stringptr_chomp(line);
527 max_subst = 0;
528 if(prog_state.subst_entries) {
529 uint32_t* index;
530 sblist_iter(prog_state.subst_entries, index) {
531 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
532 int ret;
533 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
534 if(ret == -1) {
535 too_long:
536 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
537 goto out;
538 } else if(!ret) {
539 char* lastdot = stringptr_rchr(line, '.');
540 stringptr tilLastDot = *line;
541 if(lastdot) tilLastDot.size = lastdot - line->ptr;
542 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
543 if(ret == -1) goto too_long;
545 if(ret) {
546 cmd_argv[*index] = subst_buf[max_subst];
547 max_subst++;
553 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
554 msleep(rand() % (prog_state.delayedspinup_interval + 1));
555 spinup_counter++;
558 add_job(cmd_argv);
560 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
561 write_statefile(lineno, temp_state);
565 out:
567 if(prog_state.delayedflush)
568 write_statefile(lineno - 1, temp_state);
570 while(prog_state.threads_running) reap_child();
572 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
573 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
574 if(prog_state.limits) sblist_free(prog_state.limits);
576 if(prog_state.tempdir)
577 rmdir(prog_state.tempdir);
580 fflush(stdout);
581 fflush(stderr);
584 return 0;