make multiple substitutions possible
[rofl0r-jobflow.git] / jobflow.c
blobbc4d4b9e8c2f5b12f9dbb26c020392b2d1c7bff8
1 /*
2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #undef _POSIX_C_SOURCE
19 #define _POSIX_C_SOURCE 200809L
20 #undef _XOPEN_SOURCE
21 #define _XOPEN_SOURCE 700
22 #undef _GNU_SOURCE
23 #define _GNU_SOURCE
25 #include "../lib/include/optparser.h"
26 #include "../lib/include/stringptr.h"
27 #include "../lib/include/stringptrlist.h"
28 #include "../lib/include/sblist.h"
29 #include "../lib/include/strlib.h"
30 #include "../lib/include/timelib.h"
31 #include "../lib/include/filelib.h"
#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
/* timing knobs of the main scheduling loop, in milliseconds */
enum {
	/* how long to sleep between calls to the reaper once all free slots are exhausted */
	SLEEP_MS = 21,
	/* after this long, a reap of the running processes is obligatory */
	REAP_INTERVAL_MS = 100,
};
48 /* process handling */
50 #include <fcntl.h>
51 #include <spawn.h>
52 #include <sys/wait.h>
53 #include <sys/stat.h>
55 #include <sys/resource.h>
/* prlimit() first appeared in glibc 2.13:
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01
 * The version test compares major and minor together; testing the minor number on
 * its own would misclassify other major versions. */
#if defined(__GLIBC__) && ((__GLIBC__ < 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ < 13)))
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
#include <errno.h>
/* Fallback with the same parameters as the real prlimit(). It always fails, so
 * callers (which perror() on -1) report the missing feature. ENOSYS
 * ("function not implemented") describes that condition accurately. */
static int prlimit(pid_t pid, int resource, const struct rlimit* new_limit, struct rlimit* old_limit) {
	(void) pid;
	(void) resource;
	(void) new_limit;
	(void) old_limit;
	fprintf(stderr, "prlimit() not implemented on this system\n");
	errno = ENOSYS;
	return -1;
}
#endif
71 #include <sys/time.h>
/* per-slot bookkeeping for one (possibly running) job */
typedef struct job_info {
	pid_t pid;                     /* child pid, or -1 while the slot is idle */
	posix_spawn_file_actions_t fa; /* file actions the child was spawned with */
} job_info;
/* one entry of -limits: an RLIMIT_* resource id and the rlimit applied to each job */
typedef struct limit_rec {
	int limit;
	struct rlimit rl;
} limit_rec;
83 /* defines how many slots our free_slots struct can take */
84 #define MAX_SLOTS 128
86 typedef struct {
87 unsigned numthreads;
88 unsigned threads_running;
89 char* statefile;
90 unsigned skip;
91 sblist* job_infos;
92 sblist* subst_entries;
93 sblist* limits;
94 unsigned cmd_startarg;
95 size_t free_slots[MAX_SLOTS];
96 unsigned free_slots_count;
97 char* tempdir;
98 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
99 the top value in ms can be supplied via a command line switch.
100 this option makes only sense if the interval is somewhat smaller than the
101 expected runtime of the average job.
102 this option is useful to not overload a network app due to hundreds of
103 parallel connection tries on startup.
105 int buffered:1; /* write stdout and stderr of each task into a file,
106 and print it to stdout once the process ends.
107 this prevents mixing up of the output of multiple tasks. */
108 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
109 this means faster program execution, but could also be imprecise if the number of
110 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
111 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
112 } prog_state_s;
114 prog_state_s prog_state;
117 extern char** environ;
119 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
120 int ret = snprintf(buf, bufsize,
121 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
122 prog_state.tempdir, (unsigned) jobindex);
123 return ret > 0 && (size_t) ret < bufsize;
126 void launch_job(size_t jobindex, char** argv) {
127 char stdout_filename_buf[256];
128 char stderr_filename_buf[256];
129 job_info* job = sblist_get(prog_state.job_infos, jobindex);
131 if(job->pid != -1) return;
133 if(prog_state.buffered) {
134 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
135 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
136 fprintf(stderr, "temp filename too long!\n");
137 return;
141 errno = posix_spawn_file_actions_init(&job->fa);
142 if(errno) goto spawn_error;
143 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
144 if(errno) goto spawn_error;
146 if(prog_state.buffered) {
147 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
148 if(errno) goto spawn_error;
149 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
150 if(errno) goto spawn_error;
153 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
154 if(errno) goto spawn_error;
156 if(prog_state.buffered) {
157 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
158 if(errno) goto spawn_error;
159 if(prog_state.join_output)
160 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
161 else
162 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
163 if(errno) goto spawn_error;
166 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
167 if(errno) {
168 spawn_error:
169 job->pid = -1;
170 perror("posix_spawn");
171 } else {
172 prog_state.threads_running++;
173 if(prog_state.limits) {
174 limit_rec* limit;
175 sblist_iter(prog_state.limits, limit) {
176 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
177 perror("prlimit");
183 static void addJobSlot(size_t job_id) {
184 if(prog_state.free_slots_count < MAX_SLOTS) {
185 prog_state.free_slots[prog_state.free_slots_count] = job_id;
186 prog_state.free_slots_count++;
/* Copy the captured stdout (is_stderr == 0) or stderr log of job slot `job_id`
 * to our own stdout/stderr, then delete the log file.
 * Deleting it keeps the temp directory from filling up and lets the final
 * rmdir() of the temp directory succeed; launch_job recreates the file
 * (O_CREAT | O_TRUNC) whenever the slot is reused. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src;
	FILE* out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	/* a truncated name would point at the wrong file; skip instead */
	if(makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr)) {
		src = fopen(out_filename_buf, "r");
		if(src) {
			while((nread = fread(buf, 1, sizeof(buf), src))) {
				fwrite(buf, 1, nread, out_stream);
				if(nread < sizeof(buf)) break; /* short read: EOF or error */
			}
			fclose(src);
			unlink(out_filename_buf);
		}
	}

	fflush(out_stream);
}
209 /* reap childs and return pointer to a free "slot" or NULL */
210 static void reapChilds(void) {
211 size_t i;
212 job_info* job;
213 int ret, retval;
215 prog_state.free_slots_count = 0;
217 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
218 job = sblist_get(prog_state.job_infos, i);
219 if(job->pid != -1) {
220 ret = waitpid(job->pid, &retval, WNOHANG);
221 if(ret != 0) {
222 // error or changed state.
223 if(ret == -1) {
224 perror("waitpid");
225 continue;
227 if(!retval) {
228 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
230 else {
231 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
233 job->pid = -1;
234 posix_spawn_file_actions_destroy(&job->fa);
235 //job->passed = 0;
236 addJobSlot(i);
237 prog_state.threads_running--;
239 if(prog_state.buffered) {
240 dump_output(i, 0);
241 if(!prog_state.join_output)
242 dump_output(i, 1);
245 } else
246 addJobSlot(i);
/* Print msg verbatim to stderr and terminate with exit status 1.
 * fputs is used so that a '%' in msg is never interpreted as a format directive. */
__attribute__((noreturn))
static void die(const char* msg) {
	fputs(msg, stderr);
	exit(1);
}
257 static long parse_human_number(stringptr* num) {
258 long ret = 0;
259 char buf[64];
260 if(num && num->size && num->size < sizeof(buf)) {
261 if(num->ptr[num->size -1] == 'G')
262 ret = 1024 * 1024 * 1024;
263 else if(num->ptr[num->size -1] == 'M')
264 ret = 1024 * 1024;
265 else if(num->ptr[num->size -1] == 'K')
266 ret = 1024;
267 if(ret) {
268 memcpy(buf, num->ptr, num->size);
269 buf[num->size] = 0;
270 return atol(buf) * ret;
272 return atol(num->ptr);
274 return ret;
/* Print the usage text to stdout. Always returns 1, so that parse_args can
 * `return syntax();` to signal main to exit with a failure status. */
static int syntax(void) {
	puts(
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another program's output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of \n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of the newly created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}.\n"
		" {.} passes everything before the last dot in a line as an argument.\n"
		" it is possible to use multiple substitutions inside a single argument,\n"
		" but currently only of one type.\n"
	);
	return 1;
}
330 static int parse_args(int argc, char** argv) {
331 op_state op_b, *op = &op_b;
332 op_init(op, argc, argv);
333 char *op_temp;
334 if(argc == 1 || op_hasflag(op, SPL("-help")))
335 return syntax();
336 op_temp = op_get(op, SPL("threads"));
337 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
338 op_temp = op_get(op, SPL("statefile"));
339 prog_state.statefile = op_temp;
341 op_temp = op_get(op, SPL("skip"));
342 prog_state.skip = op_temp ? atoi(op_temp) : 0;
343 if(op_hasflag(op, SPL("resume"))) {
344 if(!prog_state.statefile) die("-resume needs -statefile\n");
345 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
346 stringptr* fc = stringptr_fromfile(prog_state.statefile);
347 prog_state.skip = atoi(fc->ptr);
351 prog_state.delayedflush = 0;
352 if(op_hasflag(op, SPL("delayedflush"))) {
353 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
354 prog_state.delayedflush = 1;
357 op_temp = op_get(op, SPL("delayedspinup"));
358 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
360 prog_state.cmd_startarg = 0;
361 prog_state.subst_entries = NULL;
363 if(op_hasflag(op, SPL("exec"))) {
364 uint32_t subst_ent;
365 unsigned i, r = 0;
366 for(i = 1; i < (unsigned) argc; i++) {
367 if(str_equal(argv[i], "-exec")) {
368 r = i + 1;
369 break;
372 if(r && r < (unsigned) argc) {
373 prog_state.cmd_startarg = r;
376 // save entries which must be substituted, to save some cycles.
377 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
378 for(i = r; i < (unsigned) argc; i++) {
379 subst_ent = i - r;
380 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
384 prog_state.buffered = 0;
385 if(op_hasflag(op, SPL("buffered"))) {
386 prog_state.buffered = 1;
389 prog_state.join_output = 0;
390 if(op_hasflag(op, SPL("joinoutput"))) {
391 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
392 prog_state.join_output = 1;
395 prog_state.limits = NULL;
396 op_temp = op_get(op, SPL("limits"));
397 if(op_temp) {
398 unsigned i;
399 SPDECLAREC(limits, op_temp);
400 stringptrlist* limit_list = stringptr_splitc(limits, ',');
401 stringptrlist* kv;
402 stringptr* key, *value;
403 limit_rec lim;
404 if(stringptrlist_getsize(limit_list)) {
405 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
406 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
407 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
408 if(stringptrlist_getsize(kv) != 2) continue;
409 key = stringptrlist_get(kv, 0);
410 value = stringptrlist_get(kv, 1);
411 if(EQ(key, SPL("mem")))
412 lim.limit = RLIMIT_AS;
413 else if(EQ(key, SPL("cpu")))
414 lim.limit = RLIMIT_CPU;
415 else if(EQ(key, SPL("stack")))
416 lim.limit = RLIMIT_STACK;
417 else if(EQ(key, SPL("fsize")))
418 lim.limit = RLIMIT_FSIZE;
419 else if(EQ(key, SPL("nofiles")))
420 lim.limit = RLIMIT_NOFILE;
421 else
422 die("unknown option passed to -limits");
424 if(getrlimit(lim.limit, &lim.rl) == -1) {
425 perror("getrlimit");
426 die("could not query rlimits");
428 lim.rl.rlim_cur = parse_human_number(value);
429 sblist_add(prog_state.limits, &lim);
430 stringptrlist_free(kv);
432 stringptrlist_free(limit_list);
435 return 0;
438 static void init_queue(void) {
439 unsigned i;
440 job_info ji;
442 ji.pid = -1;
443 memset(&ji.fa, 0, sizeof(ji.fa));
445 for(i = 0; i < prog_state.numthreads; i++) {
446 sblist_add(prog_state.job_infos, &ji);
450 static void write_statefile(uint64_t n, const char* tempfile) {
451 char numbuf[64];
452 stringptr num_b, *num = &num_b;
454 num_b.ptr = uint64ToString(n + 1, numbuf);
455 num_b.size = strlen(numbuf);
456 stringptr_tofile((char*) tempfile, num);
457 if(rename(tempfile, prog_state.statefile) == -1)
458 perror("rename");
461 // returns numbers of substitutions done, -1 on out of buffer.
462 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
463 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
464 size_t i;
465 int ret = 0;
466 for(i = 0; dest_size > 0 && i < source->size; ) {
467 if(stringptr_here(source, i, what)) {
468 if(dest_size < (ssize_t) whit->size) return -1;
469 memcpy(dest, whit->ptr, whit->size);
470 dest += whit->size;
471 dest_size -= whit->size;
472 ret++;
473 i += what->size;
474 } else {
475 *dest = source->ptr[i];
476 dest++;
477 dest_size--;
478 i++;
481 if(!dest_size) return -1;
482 *dest = 0;
483 return ret;
486 int main(int argc, char** argv) {
487 char inbuf[4096]; char* fgets_result;
488 stringptr line_b, *line = &line_b;
489 char* cmd_argv[4096];
490 char subst_buf[16][4096];
491 unsigned max_subst;
493 struct timeval reapTime;
495 uint64_t n = 0;
496 unsigned i;
497 unsigned spinup_counter = 0;
499 char tempdir_buf[256];
500 char temp_state[256];
502 srand(time(NULL));
504 if(argc > 4096) argc = 4096;
505 prog_state.threads_running = 0;
506 prog_state.free_slots_count = 0;
507 gettimestamp(&reapTime);
509 if(parse_args(argc, argv)) return 1;
511 if(prog_state.statefile)
512 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
514 prog_state.tempdir = NULL;
516 if(prog_state.buffered) {
517 prog_state.tempdir = tempdir_buf;
518 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
519 perror("mkdtemp");
520 die("could not create tempdir\n");
524 if(prog_state.cmd_startarg) {
525 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
526 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
528 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
531 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
532 init_queue();
534 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
535 if(prog_state.skip)
536 prog_state.skip--;
537 else {
538 if(!prog_state.cmd_startarg)
539 printf(fgets_result);
540 else {
541 stringptr_fromchar(fgets_result, line);
542 stringptr_chomp(line);
544 max_subst = 0;
545 if(prog_state.subst_entries) {
546 uint32_t* index;
547 sblist_iter(prog_state.subst_entries, index) {
548 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
549 int ret;
550 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
551 if(ret == -1) {
552 too_long:
553 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
554 goto out;
555 } else if(!ret) {
556 char* lastdot = stringptr_rchr(line, '.');
557 stringptr tilLastDot = *line;
558 if(lastdot) tilLastDot.size = lastdot - line->ptr;
559 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
560 if(ret == -1) goto too_long;
562 if(ret) {
563 cmd_argv[*index] = subst_buf[max_subst];
564 max_subst++;
569 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
570 reapChilds();
571 gettimestamp(&reapTime);
572 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
575 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
576 msleep(rand() % (prog_state.delayedspinup_interval + 1));
577 spinup_counter++;
580 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
581 prog_state.free_slots_count--;
583 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
584 write_statefile(n, temp_state);
588 n++;
591 out:
593 if(prog_state.delayedflush)
594 write_statefile(n - 1, temp_state);
596 while(prog_state.threads_running) {
597 reapChilds();
598 if(prog_state.threads_running) msleep(SLEEP_MS);
601 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
602 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
603 if(prog_state.limits) sblist_free(prog_state.limits);
605 if(prog_state.tempdir)
606 rmdir(prog_state.tempdir);
609 fflush(stdout);
610 fflush(stderr);
613 return 0;