remove confusing usage of slot list
[rofl0r-jobflow.git] / jobflow.c
blob031594b07746ad1de0ccb1c76abf6688ccc24540
1 /*
2 Copyright (C) 2012 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #undef _POSIX_C_SOURCE
19 #define _POSIX_C_SOURCE 200809L
20 #undef _XOPEN_SOURCE
21 #define _XOPEN_SOURCE 700
22 #undef _GNU_SOURCE
23 #define _GNU_SOURCE
25 #include "../lib/include/optparser.h"
26 #include "../lib/include/stringptr.h"
27 #include "../lib/include/stringptrlist.h"
28 #include "../lib/include/sblist.h"
29 #include "../lib/include/strlib.h"
30 #include "../lib/include/timelib.h"
31 #include "../lib/include/filelib.h"
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <stdint.h>
37 #include <stddef.h>
38 #include <errno.h>
39 #include <time.h>
40 #include <assert.h>
42 /* process handling */
44 #include <fcntl.h>
45 #include <spawn.h>
46 #include <sys/wait.h>
47 #include <sys/stat.h>
49 #include <sys/resource.h>
#if defined(__GLIBC__) && __GLIBC__ < 3 && __GLIBC_MINOR__ < 13
/* glibc before 2.13 does not provide prlimit(), see
 * http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01
 * This stub only keeps the program building there: every call reports the
 * missing feature on stderr and fails with EINVAL, so -limits are not applied. */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
static int prlimit(int pid, ...) {
	(void) pid;
	fputs("prlimit() not implemented on this system\n", stderr);
	/* set errno last so the stdio call above cannot clobber it */
	errno = EINVAL;
	return -1;
}
#endif
64 #include <sys/time.h>
/* bookkeeping for one parallel-job slot */
typedef struct job_info {
	pid_t pid;                     /* pid of the running child, or -1 while the slot is idle */
	posix_spawn_file_actions_t fa; /* fd setup handed to posix_spawnp(); destroyed once the child is reaped */
} job_info;
/* one resource limit requested via -limits */
typedef struct limit_rec {
	int limit;        /* RLIMIT_* resource id */
	struct rlimit rl; /* value applied with prlimit() to every spawned job */
} limit_rec;
76 typedef struct {
77 int numthreads;
78 unsigned threads_running;
79 char* statefile;
80 unsigned skip;
81 sblist* job_infos;
82 sblist* subst_entries;
83 sblist* limits;
84 unsigned cmd_startarg;
85 char* tempdir;
86 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
87 the top value in ms can be supplied via a command line switch.
88 this option makes only sense if the interval is somewhat smaller than the
89 expected runtime of the average job.
90 this option is useful to not overload a network app due to hundreds of
91 parallel connection tries on startup.
93 int buffered:1; /* write stdout and stderr of each task into a file,
94 and print it to stdout once the process ends.
95 this prevents mixing up of the output of multiple tasks. */
96 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
97 this means faster program execution, but could also be imprecise if the number of
98 jobs is small or smaller than the available threadcount. */
99 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
100 } prog_state_s;
102 prog_state_s prog_state;
105 extern char** environ;
107 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
108 int ret = snprintf(buf, bufsize,
109 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
110 prog_state.tempdir, (unsigned) jobindex);
111 return ret > 0 && (size_t) ret < bufsize;
114 void launch_job(size_t jobindex, char** argv) {
115 char stdout_filename_buf[256];
116 char stderr_filename_buf[256];
117 job_info* job = sblist_get(prog_state.job_infos, jobindex);
119 if(job->pid != -1) return;
121 if(prog_state.buffered) {
122 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
123 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
124 fprintf(stderr, "temp filename too long!\n");
125 return;
129 errno = posix_spawn_file_actions_init(&job->fa);
130 if(errno) goto spawn_error;
131 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
132 if(errno) goto spawn_error;
134 if(prog_state.buffered) {
135 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
136 if(errno) goto spawn_error;
137 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
138 if(errno) goto spawn_error;
141 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
142 if(errno) goto spawn_error;
144 if(prog_state.buffered) {
145 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
146 if(errno) goto spawn_error;
147 if(prog_state.join_output)
148 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
149 else
150 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
151 if(errno) goto spawn_error;
154 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
155 if(errno) {
156 spawn_error:
157 job->pid = -1;
158 perror("posix_spawn");
159 } else {
160 prog_state.threads_running++;
161 if(prog_state.limits) {
162 limit_rec* limit;
163 sblist_iter(prog_state.limits, limit) {
164 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
165 perror("prlimit");
/* copy the stdout (or, if is_stderr, the stderr) log of job slot job_id to our
   own stdout/stderr and delete the log afterwards.
   deleting it allows the final rmdir() of the temp directory to succeed; the
   slot's next job recreates the file anyway (it is opened O_CREAT | O_TRUNC). */
static void dump_output(size_t job_id, int is_stderr) {
	char log_filename[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(!makeLogfilename(log_filename, sizeof(log_filename), job_id, is_stderr))
		return;

	src = fopen(log_filename, "r");
	if(!src) return;

	while((nread = fread(buf, 1, sizeof(buf), src))) {
		fwrite(buf, 1, nread, out_stream);
		if(nread < sizeof(buf)) break;
	}
	fclose(src);
	fflush(out_stream);
	unlink(log_filename);
}
190 /* wait till a child exits, reap it, and return its job index for slot reuse */
191 static size_t reap_child(void) {
192 size_t i;
193 job_info* job;
194 int ret, retval;
196 do ret = waitpid(-1, &retval, 0);
197 while(ret == -1 || !WIFEXITED(retval));
199 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
200 job = sblist_get(prog_state.job_infos, i);
201 if(job->pid == ret) {
202 job->pid = -1;
203 posix_spawn_file_actions_destroy(&job->fa);
204 prog_state.threads_running--;
205 if(prog_state.buffered) {
206 dump_output(i, 0);
207 if(!prog_state.join_output)
208 dump_output(i, 1);
210 return i;
213 assert(0);
214 return -1;
217 static size_t free_slots(void) {
218 return prog_state.numthreads - prog_state.threads_running;
221 static void add_job(char **argv) {
222 if(free_slots())
223 launch_job(prog_state.threads_running, argv);
224 else
225 launch_job(reap_child(), argv);
__attribute__((noreturn))
static void die(const char* msg) {
	/* print msg verbatim: used as a format string, any '%' in it would be
	   interpreted as a conversion (undefined behavior without arguments). */
	fputs(msg, stderr);
	exit(1);
}
234 static unsigned long parse_human_number(stringptr* num) {
235 unsigned long ret = 0;
236 static const unsigned long mul[] = {1024, 1024 * 1024, 1024 * 1024 * 1024};
237 const char* kmg = "KMG";
238 char* kmgind;
239 if(num && num->size) {
240 ret = atol(num->ptr);
241 if((kmgind = strchr(kmg, num->ptr[num->size -1])))
242 ret *= mul[kmgind - kmg];
244 return ret;
/* print the usage text to stdout; always returns 1 so callers can return it as an error code */
static int syntax(void) {
	puts(
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another program's output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of \n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of the newly created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}.\n"
		" {.} passes everything before the last dot in a line as an argument.\n"
		" it is possible to use multiple substitutions inside a single argument,\n"
		" but currently only of one type.\n"
	);
	return 1;
}
300 static int parse_args(int argc, char** argv) {
301 op_state op_b, *op = &op_b;
302 op_init(op, argc, argv);
303 char *op_temp;
304 if(argc == 1 || op_hasflag(op, SPL("-help")))
305 return syntax();
306 op_temp = op_get(op, SPL("threads"));
307 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
308 if(prog_state.numthreads <= 0) die("threadcount must be >= 1\n");
309 op_temp = op_get(op, SPL("statefile"));
310 prog_state.statefile = op_temp;
312 op_temp = op_get(op, SPL("skip"));
313 prog_state.skip = op_temp ? atoi(op_temp) : 0;
314 if(op_hasflag(op, SPL("resume"))) {
315 if(!prog_state.statefile) die("-resume needs -statefile\n");
316 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
317 stringptr* fc = stringptr_fromfile(prog_state.statefile);
318 prog_state.skip = atoi(fc->ptr);
322 prog_state.delayedflush = 0;
323 if(op_hasflag(op, SPL("delayedflush"))) {
324 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
325 prog_state.delayedflush = 1;
328 op_temp = op_get(op, SPL("delayedspinup"));
329 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
331 prog_state.cmd_startarg = 0;
332 prog_state.subst_entries = NULL;
334 if(op_hasflag(op, SPL("exec"))) {
335 uint32_t subst_ent;
336 unsigned i, r = 0;
337 for(i = 1; i < (unsigned) argc; i++) {
338 if(str_equal(argv[i], "-exec")) {
339 r = i + 1;
340 break;
343 if(r && r < (unsigned) argc) {
344 prog_state.cmd_startarg = r;
347 // save entries which must be substituted, to save some cycles.
348 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
349 for(i = r; i < (unsigned) argc; i++) {
350 subst_ent = i - r;
351 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) sblist_add(prog_state.subst_entries, &subst_ent);
355 prog_state.buffered = 0;
356 if(op_hasflag(op, SPL("buffered"))) {
357 prog_state.buffered = 1;
360 prog_state.join_output = 0;
361 if(op_hasflag(op, SPL("joinoutput"))) {
362 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
363 prog_state.join_output = 1;
366 prog_state.limits = NULL;
367 op_temp = op_get(op, SPL("limits"));
368 if(op_temp) {
369 unsigned i;
370 SPDECLAREC(limits, op_temp);
371 stringptrlist* limit_list = stringptr_splitc(limits, ',');
372 stringptrlist* kv;
373 stringptr* key, *value;
374 limit_rec lim;
375 if(stringptrlist_getsize(limit_list)) {
376 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
377 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
378 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
379 if(stringptrlist_getsize(kv) != 2) continue;
380 key = stringptrlist_get(kv, 0);
381 value = stringptrlist_get(kv, 1);
382 if(EQ(key, SPL("mem")))
383 lim.limit = RLIMIT_AS;
384 else if(EQ(key, SPL("cpu")))
385 lim.limit = RLIMIT_CPU;
386 else if(EQ(key, SPL("stack")))
387 lim.limit = RLIMIT_STACK;
388 else if(EQ(key, SPL("fsize")))
389 lim.limit = RLIMIT_FSIZE;
390 else if(EQ(key, SPL("nofiles")))
391 lim.limit = RLIMIT_NOFILE;
392 else
393 die("unknown option passed to -limits");
395 if(getrlimit(lim.limit, &lim.rl) == -1) {
396 perror("getrlimit");
397 die("could not query rlimits");
399 lim.rl.rlim_cur = parse_human_number(value);
400 sblist_add(prog_state.limits, &lim);
401 stringptrlist_free(kv);
403 stringptrlist_free(limit_list);
406 return 0;
409 static void init_queue(void) {
410 int i;
411 job_info ji = {.pid = -1};
413 for(i = 0; i < prog_state.numthreads; i++)
414 sblist_add(prog_state.job_infos, &ji);
417 static void write_statefile(uint64_t n, const char* tempfile) {
418 char numbuf[64];
419 stringptr num_b, *num = &num_b;
421 num_b.ptr = uint64ToString(n + 1, numbuf);
422 num_b.size = strlen(numbuf);
423 stringptr_tofile((char*) tempfile, num);
424 if(rename(tempfile, prog_state.statefile) == -1)
425 perror("rename");
428 // returns numbers of substitutions done, -1 on out of buffer.
429 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
430 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
431 size_t i;
432 int ret = 0;
433 for(i = 0; dest_size > 0 && i < source->size; ) {
434 if(stringptr_here(source, i, what)) {
435 if(dest_size < (ssize_t) whit->size) return -1;
436 memcpy(dest, whit->ptr, whit->size);
437 dest += whit->size;
438 dest_size -= whit->size;
439 ret++;
440 i += what->size;
441 } else {
442 *dest = source->ptr[i];
443 dest++;
444 dest_size--;
445 i++;
448 if(!dest_size) return -1;
449 *dest = 0;
450 return ret;
453 int main(int argc, char** argv) {
454 char inbuf[4096]; char* fgets_result;
455 stringptr line_b, *line = &line_b;
456 char* cmd_argv[4096];
457 char subst_buf[16][4096];
458 unsigned max_subst;
460 uint64_t n = 0;
461 unsigned i;
462 unsigned spinup_counter = 0;
464 char tempdir_buf[256];
465 char temp_state[256];
467 srand(time(NULL));
469 if(argc > 4096) argc = 4096;
471 prog_state.threads_running = 0;
473 if(parse_args(argc, argv)) return 1;
475 if(prog_state.statefile)
476 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
478 prog_state.tempdir = NULL;
480 if(prog_state.buffered) {
481 prog_state.tempdir = tempdir_buf;
482 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
483 perror("mkdtemp");
484 die("could not create tempdir\n");
486 } else {
487 /* if the stdout/stderr fds are not in O_APPEND mode,
488 the dup()'s of the fds in posix_spawn can cause different
489 file positions, causing the different processes to overwrite each others output.
490 testcase:
491 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
493 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
494 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
497 if(prog_state.cmd_startarg) {
498 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
499 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
501 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
504 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
505 init_queue();
507 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
508 if(prog_state.skip)
509 prog_state.skip--;
510 else {
511 if(!prog_state.cmd_startarg)
512 printf(fgets_result);
513 else {
514 stringptr_fromchar(fgets_result, line);
515 stringptr_chomp(line);
517 max_subst = 0;
518 if(prog_state.subst_entries) {
519 uint32_t* index;
520 sblist_iter(prog_state.subst_entries, index) {
521 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
522 int ret;
523 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
524 if(ret == -1) {
525 too_long:
526 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
527 goto out;
528 } else if(!ret) {
529 char* lastdot = stringptr_rchr(line, '.');
530 stringptr tilLastDot = *line;
531 if(lastdot) tilLastDot.size = lastdot - line->ptr;
532 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
533 if(ret == -1) goto too_long;
535 if(ret) {
536 cmd_argv[*index] = subst_buf[max_subst];
537 max_subst++;
543 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
544 msleep(rand() % (prog_state.delayedspinup_interval + 1));
545 spinup_counter++;
548 add_job(cmd_argv);
550 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
551 write_statefile(n, temp_state);
555 n++;
558 out:
560 if(prog_state.delayedflush)
561 write_statefile(n - 1, temp_state);
563 while(prog_state.threads_running) reap_child();
565 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
566 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
567 if(prog_state.limits) sblist_free(prog_state.limits);
569 if(prog_state.tempdir)
570 rmdir(prog_state.tempdir);
573 fflush(stdout);
574 fflush(stderr);
577 return 0;