flush on exit
[rofl0r-jobflow.git] / jobflow.c
blobe3996a956f997a1626666d05b3040102d31a0781
1 #undef _POSIX_C_SOURCE
2 #define _POSIX_C_SOURCE 200809L
3 #undef _XOPEN_SOURCE
4 #define _XOPEN_SOURCE 700
5 #undef _GNU_SOURCE
6 #define _GNU_SOURCE
#include "../lib/include/optparser.h"
#include "../lib/include/stringptr.h"
#include "../lib/include/stringptrlist.h"
#include "../lib/include/sblist.h"
#include "../lib/include/strlib.h"
#include "../lib/include/timelib.h"
#include "../lib/include/filelib.h"

#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
/* timing knobs of the scheduler loop (all values in milliseconds) */
enum {
	/* pause between two reaper calls while every job slot is busy */
	SLEEP_MS = 21,
	/* upper bound on the time between two reaps of the running processes */
	REAP_INTERVAL_MS = 100,
};
31 /* process handling */
33 #include <fcntl.h>
34 #include <spawn.h>
35 #include <sys/wait.h>
36 #include <sys/stat.h>
38 #include <sys/resource.h>
#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 13))
/* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
#warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* fallback for C libraries without prlimit(): always fails with EINVAL, so
 * callers just report that the requested limits could not be applied.
 * the parameters mirror the real prlimit() so call sites stay type-checked. */
static int prlimit(pid_t pid, int resource, const struct rlimit* new_limit, struct rlimit* old_limit) {
	(void) pid;
	(void) resource;
	(void) new_limit;
	(void) old_limit;
	fprintf(stderr, "prlimit() not implemented on this system\n");
	errno = EINVAL;
	return -1;
}
#endif
54 #include <sys/time.h>
/* one job slot: the pid of its child process (-1 while the slot is idle)
 * and the posix_spawn file actions that were used to start that child. */
struct job_info {
	pid_t pid;
	posix_spawn_file_actions_t fa;
};
typedef struct job_info job_info;
/* one -limits entry: the RLIMIT_* resource id and the rlimit to apply to it */
struct limit_rec {
	int limit;
	struct rlimit rl;
};
typedef struct limit_rec limit_rec;
/* capacity of prog_state_s.free_slots, i.e. the most idle slots remembered per reap */
enum { MAX_SLOTS = 128 };
69 typedef struct {
70 unsigned numthreads;
71 unsigned threads_running;
72 char* statefile;
73 unsigned skip;
74 sblist* job_infos;
75 sblist* subst_entries;
76 sblist* limits;
77 unsigned cmd_startarg;
78 size_t free_slots[MAX_SLOTS];
79 unsigned free_slots_count;
80 char* tempdir;
81 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
82 the top value in ms can be supplied via a command line switch.
83 this option makes only sense if the interval is somewhat smaller than the
84 expected runtime of the average job.
85 this option is useful to not overload a network app due to hundreds of
86 parallel connection tries on startup.
88 int buffered:1; /* write stdout and stderr of each task into a file,
89 and print it to stdout once the process ends.
90 this prevents mixing up of the output of multiple tasks. */
91 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
92 this means faster program execution, but could also be imprecise if the number of
93 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
94 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
95 } prog_state_s;
97 prog_state_s prog_state;
100 extern char** environ;
102 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
103 int ret = snprintf(buf, bufsize,
104 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
105 prog_state.tempdir, (unsigned) jobindex);
106 return ret > 0 && (size_t) ret < bufsize;
109 void launch_job(size_t jobindex, char** argv) {
110 char stdout_filename_buf[256];
111 char stderr_filename_buf[256];
112 job_info* job = sblist_get(prog_state.job_infos, jobindex);
114 if(job->pid != -1) return;
116 if(prog_state.buffered) {
117 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
118 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
119 fprintf(stderr, "temp filename too long!\n");
120 return;
124 errno = posix_spawn_file_actions_init(&job->fa);
125 if(errno) goto spawn_error;
126 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
127 if(errno) goto spawn_error;
129 if(prog_state.buffered) {
130 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
131 if(errno) goto spawn_error;
132 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
133 if(errno) goto spawn_error;
136 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
137 if(errno) goto spawn_error;
139 if(prog_state.buffered) {
140 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
141 if(errno) goto spawn_error;
142 if(prog_state.join_output)
143 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
144 else
145 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
146 if(errno) goto spawn_error;
149 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
150 if(errno) {
151 spawn_error:
152 job->pid = -1;
153 perror("posix_spawn");
154 } else {
155 prog_state.threads_running++;
156 if(prog_state.limits) {
157 limit_rec* limit;
158 sblist_iter(prog_state.limits, limit) {
159 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
160 perror("prlimit");
166 static void addJobSlot(size_t job_id) {
167 if(prog_state.free_slots_count < MAX_SLOTS) {
168 prog_state.free_slots[prog_state.free_slots_count] = job_id;
169 prog_state.free_slots_count++;
/* copy the captured stdout (is_stderr == 0) or stderr (is_stderr != 0) log of
 * the finished job in slot job_id to jobflow's own stdout / stderr, then
 * delete the log file. deleting it lets the temp directory be removed with
 * rmdir() at program end; the slot's next job recreates the file anyway. */
static void dump_output(size_t job_id, int is_stderr) {
	char out_filename_buf[256];
	char buf[4096];
	FILE* src, *out_stream = is_stderr ? stderr : stdout;
	size_t nread;

	if(makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr)) {
		src = fopen(out_filename_buf, "r");
		if(src) {
			while((nread = fread(buf, 1, sizeof(buf), src))) {
				fwrite(buf, 1, nread, out_stream);
				if(nread < sizeof(buf)) break;
			}
			fclose(src);
		}
		unlink(out_filename_buf);
	}

	fflush(out_stream);
}
192 /* reap childs and return pointer to a free "slot" or NULL */
193 static void reapChilds(void) {
194 size_t i;
195 job_info* job;
196 int ret, retval;
198 prog_state.free_slots_count = 0;
200 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
201 job = sblist_get(prog_state.job_infos, i);
202 if(job->pid != -1) {
203 ret = waitpid(job->pid, &retval, WNOHANG);
204 if(ret != 0) {
205 // error or changed state.
206 if(ret == -1) {
207 perror("waitpid");
208 continue;
210 if(!retval) {
211 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
213 else {
214 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
216 job->pid = -1;
217 posix_spawn_file_actions_destroy(&job->fa);
218 //job->passed = 0;
219 addJobSlot(i);
220 prog_state.threads_running--;
222 if(prog_state.buffered) {
223 dump_output(i, 0);
224 if(!prog_state.join_output)
225 dump_output(i, 1);
228 } else
229 addJobSlot(i);
/* print msg verbatim to stderr and terminate with exit status 1.
 * msg is NOT a format string (a '%' in it is printed as-is); callers
 * supply their own trailing newline. */
__attribute__((noreturn))
static void die(const char* msg) {
	fputs(msg, stderr);
	exit(1);
}
240 static long parse_human_number(stringptr* num) {
241 long ret = 0;
242 char buf[64];
243 if(num && num->size && num->size < sizeof(buf)) {
244 if(num->ptr[num->size -1] == 'G')
245 ret = 1024 * 1024 * 1024;
246 else if(num->ptr[num->size -1] == 'M')
247 ret = 1024 * 1024;
248 else if(num->ptr[num->size -1] == 'K')
249 ret = 1024;
250 if(ret) {
251 memcpy(buf, num->ptr, num->size);
252 buf[num->size] = 0;
253 return atol(buf) * ret;
255 return atol(num->ptr);
257 return ret;
/* print the usage text to stdout; returns 1 so callers can return it as an error code */
static int syntax(void) {
	puts(
		"jobflow (C) rofl0r\n"
		"------------------\n"
		"this program is intended to be used as a recipient of another program's output\n"
		"it launches processes to which the current line can be passed as an argument\n"
		"using {} for substitution (as in find -exec).\n"
		"\n"
		"available options:\n\n"
		"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
		"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
		"-exec ./mycommand {}\n"
		"\n"
		"-skip=XXX\n"
		" XXX=number of entries to skip\n"
		"-threads=XXX\n"
		" XXX=number of parallel processes to spawn\n"
		"-resume\n"
		" resume from last jobnumber stored in statefile\n"
		"-statefile=XXX\n"
		" XXX=filename\n"
		" saves last launched jobnumber into a file\n"
		"-delayedflush\n"
		" only write to statefile whenever all processes are busy,\n"
		" and at program end\n"
		"-delayedspinup=XXX\n"
		" XXX=maximum amount of milliseconds\n"
		" ...to wait when spinning up a fresh set of processes\n"
		" a random value between 0 and the chosen amount is used to delay initial\n"
		" spinup.\n"
		" this can be handy to circumvent an I/O lockdown because of a burst of\n"
		" activity on program startup\n"
		"-buffered\n"
		" store the stdout and stderr of launched processes into a temporary file\n"
		" which will be printed after a process has finished.\n"
		" this prevents mixing up of output of different processes.\n"
		"-joinoutput\n"
		" if -buffered, write both stdout and stderr into the same file.\n"
		" this saves the chronological order of the output, and the combined output\n"
		" will only be printed to stdout.\n"
		"-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
		" sets the rlimit of the new created processes.\n"
		" see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
		"-exec command with args\n"
		" everything past -exec is treated as the command to execute on each line of\n"
		" stdin received. the line can be passed as an argument using {}."
	);
	return 1;
}
310 static int parse_args(int argc, char** argv) {
311 op_state op_b, *op = &op_b;
312 op_init(op, argc, argv);
313 char *op_temp;
314 if(argc == 1 || op_hasflag(op, SPL("-help")))
315 return syntax();
316 op_temp = op_get(op, SPL("threads"));
317 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
318 op_temp = op_get(op, SPL("statefile"));
319 prog_state.statefile = op_temp;
321 op_temp = op_get(op, SPL("skip"));
322 prog_state.skip = op_temp ? atoi(op_temp) : 0;
323 if(op_hasflag(op, SPL("resume"))) {
324 if(!prog_state.statefile) die("-resume needs -statefile\n");
325 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
326 stringptr* fc = stringptr_fromfile(prog_state.statefile);
327 prog_state.skip = atoi(fc->ptr);
331 prog_state.delayedflush = 0;
332 if(op_hasflag(op, SPL("delayedflush"))) {
333 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
334 prog_state.delayedflush = 1;
337 op_temp = op_get(op, SPL("delayedspinup"));
338 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
340 prog_state.cmd_startarg = 0;
341 prog_state.subst_entries = NULL;
343 if(op_hasflag(op, SPL("exec"))) {
344 uint32_t subst_ent;
345 unsigned i, r = 0;
346 for(i = 1; i < (unsigned) argc; i++) {
347 if(str_equal(argv[i], "-exec")) {
348 r = i + 1;
349 break;
352 if(r && r < (unsigned) argc) {
353 prog_state.cmd_startarg = r;
356 // save entries which must be substituted, to save some cycles.
357 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
358 for(i = r; i < (unsigned) argc; i++) {
359 subst_ent = i - r;
360 if(strstr(argv[i], "{}")) sblist_add(prog_state.subst_entries, &subst_ent);
364 prog_state.buffered = 0;
365 if(op_hasflag(op, SPL("buffered"))) {
366 prog_state.buffered = 1;
369 prog_state.join_output = 0;
370 if(op_hasflag(op, SPL("joinoutput"))) {
371 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
372 prog_state.join_output = 1;
375 prog_state.limits = NULL;
376 op_temp = op_get(op, SPL("limits"));
377 if(op_temp) {
378 unsigned i;
379 SPDECLAREC(limits, op_temp);
380 stringptrlist* limit_list = stringptr_splitc(limits, ',');
381 stringptrlist* kv;
382 stringptr* key, *value;
383 limit_rec lim;
384 if(stringptrlist_getsize(limit_list)) {
385 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
386 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
387 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
388 if(stringptrlist_getsize(kv) != 2) continue;
389 key = stringptrlist_get(kv, 0);
390 value = stringptrlist_get(kv, 1);
391 if(EQ(key, SPL("mem")))
392 lim.limit = RLIMIT_AS;
393 else if(EQ(key, SPL("cpu")))
394 lim.limit = RLIMIT_CPU;
395 else if(EQ(key, SPL("stack")))
396 lim.limit = RLIMIT_STACK;
397 else if(EQ(key, SPL("fsize")))
398 lim.limit = RLIMIT_FSIZE;
399 else if(EQ(key, SPL("nofiles")))
400 lim.limit = RLIMIT_NOFILE;
401 else
402 die("unknown option passed to -limits");
404 if(getrlimit(lim.limit, &lim.rl) == -1) {
405 perror("getrlimit");
406 die("could not query rlimits");
408 lim.rl.rlim_cur = parse_human_number(value);
409 sblist_add(prog_state.limits, &lim);
410 stringptrlist_free(kv);
412 stringptrlist_free(limit_list);
415 return 0;
418 static void init_queue(void) {
419 unsigned i;
420 job_info ji;
422 ji.pid = -1;
423 memset(&ji.fa, 0, sizeof(ji.fa));
425 for(i = 0; i < prog_state.numthreads; i++) {
426 sblist_add(prog_state.job_infos, &ji);
430 static void write_statefile(uint64_t n, const char* tempfile) {
431 char numbuf[64];
432 stringptr num_b, *num = &num_b;
434 num_b.ptr = uint64ToString(n + 1, numbuf);
435 num_b.size = strlen(numbuf);
436 stringptr_tofile((char*) tempfile, num);
437 if(rename(tempfile, prog_state.statefile) == -1)
438 perror("rename");
441 int main(int argc, char** argv) {
442 char inbuf[4096]; char* fgets_result, *strstr_result, *p;
443 stringptr line_b, *line = &line_b;
444 char* cmd_argv[4096];
445 char subst_buf[4096][16];
446 unsigned max_subst;
448 struct timeval reapTime;
450 uint64_t n = 0;
451 unsigned i, j;
452 unsigned spinup_counter = 0;
454 char tempdir_buf[256];
455 char temp_state[256];
457 srand(time(NULL));
459 if(argc > 4096) argc = 4096;
460 prog_state.threads_running = 0;
461 prog_state.free_slots_count = 0;
462 gettimestamp(&reapTime);
464 if(parse_args(argc, argv)) return 1;
466 if(prog_state.statefile)
467 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
469 prog_state.tempdir = NULL;
471 if(prog_state.buffered) {
472 prog_state.tempdir = tempdir_buf;
473 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
474 perror("mkdtemp");
475 die("could not create tempdir\n");
479 if(prog_state.cmd_startarg) {
480 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
481 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
483 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
486 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
487 init_queue();
489 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
490 if(prog_state.skip)
491 prog_state.skip--;
492 else {
493 if(!prog_state.cmd_startarg)
494 printf(fgets_result);
495 else {
496 stringptr_fromchar(fgets_result, line);
497 stringptr_chomp(line);
499 max_subst = 0;
500 if(prog_state.subst_entries) {
501 uint32_t* index;
502 sblist_iter(prog_state.subst_entries, index) {
503 p = argv[*index + prog_state.cmd_startarg];
504 if((strstr_result = strstr(p, "{}"))) {
505 j = 0;
506 j = strstr_result - p;
507 if(j) memcpy(subst_buf[max_subst], p, j);
508 strncpy(&subst_buf[max_subst][j], line->ptr, 4096 - j);
509 j += line->size;
510 if(j > 4096) {
511 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
512 goto out;
514 strncpy(&subst_buf[max_subst][j], strstr_result + 2, 4096 - j);
516 cmd_argv[*index] = subst_buf[max_subst];
517 max_subst++;
518 if(max_subst >= 16) die("too many substitutions!\n");
523 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
524 reapChilds();
525 gettimestamp(&reapTime);
526 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
529 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
530 msleep(rand() % (prog_state.delayedspinup_interval + 1));
531 spinup_counter++;
534 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
535 prog_state.free_slots_count--;
537 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
538 write_statefile(n, temp_state);
542 n++;
545 out:
547 if(prog_state.delayedflush)
548 write_statefile(n - 1, temp_state);
550 while(prog_state.threads_running) {
551 reapChilds();
552 if(prog_state.threads_running) msleep(SLEEP_MS);
555 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
556 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
557 if(prog_state.limits) sblist_free(prog_state.limits);
559 if(prog_state.tempdir)
560 rmdir(prog_state.tempdir);
563 fflush(stdout);
564 fflush(stderr);
567 return 0;