newline
[rofl0r-jobflow.git] / jobflow.c
blobfaf695d0c0ab5e4302f36dc6bbfd1047f7e255b1
1 #undef _POSIX_C_SOURCE
2 #define _POSIX_C_SOURCE 200809L
3 #undef _XOPEN_SOURCE
4 #define _XOPEN_SOURCE 700
5 #undef _GNU_SOURCE
6 #define _GNU_SOURCE
8 #include "../lib/include/optparser.h"
9 #include "../lib/include/stringptr.h"
10 #include "../lib/include/stringptrlist.h"
11 #include "../lib/include/sblist.h"
12 #include "../lib/include/strlib.h"
13 #include "../lib/include/timelib.h"
14 #include "../lib/include/filelib.h"
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <stdint.h>
20 #include <stddef.h>
21 #include <errno.h>
22 #include <time.h>
24 /* defines the amount of milliseconds to sleep between each call to the reaper,
25 * once all free slots are exhausted */
26 #define SLEEP_MS 21
28 /* defines after how many milliseconds a reap of the running processes is obligatory. */
29 #define REAP_INTERVAL_MS 100
31 /* process handling */
33 #include <fcntl.h>
34 #include <spawn.h>
35 #include <sys/wait.h>
36 #include <sys/stat.h>
38 #include <sys/resource.h>
40 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
41 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
42 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
43 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
44 #include <errno.h>
45 static int prlimit(int pid, ...) {
46 (void) pid;
47 fprintf(stderr, "prlimit() not implemented on this system\n");
48 errno = EINVAL;
49 return -1;
51 #endif
54 #include <sys/time.h>
56 typedef struct {
57 pid_t pid;
58 posix_spawn_file_actions_t fa;
59 } job_info;
61 typedef struct {
62 int limit;
63 struct rlimit rl;
64 } limit_rec;
66 /* defines how many slots our free_slots struct can take */
67 #define MAX_SLOTS 128
69 typedef struct {
70 unsigned numthreads;
71 unsigned threads_running;
72 char* statefile;
73 unsigned skip;
74 sblist* job_infos;
75 sblist* subst_entries;
76 sblist* limits;
77 unsigned cmd_startarg;
78 size_t free_slots[MAX_SLOTS];
79 unsigned free_slots_count;
80 char* tempdir;
81 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
82 the top value in ms can be supplied via a command line switch.
83 this option makes only sense if the interval is somewhat smaller than the
84 expected runtime of the average job.
85 this option is useful to not overload a network app due to hundreds of
86 parallel connection tries on startup.
88 int buffered:1; /* write stdout and stderr of each task into a file,
89 and print it to stdout once the process ends.
90 this prevents mixing up of the output of multiple tasks. */
91 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
92 this means faster program execution, but could also be imprecise if the number of
93 jobs is small or smaller than the available threadcount / MAX_SLOTS. */
94 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
95 } prog_state_s;
97 prog_state_s prog_state;
100 extern char** environ;
102 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
103 int ret = snprintf(buf, bufsize,
104 is_stderr ? "%s/jd_proc_%.5u_stdout.log" : "%s/jd_proc_%.5u_stderr.log",
105 prog_state.tempdir, (unsigned) jobindex);
106 return ret > 0 && (size_t) ret < bufsize;
109 void launch_job(size_t jobindex, char** argv) {
110 char stdout_filename_buf[256];
111 char stderr_filename_buf[256];
112 job_info* job = sblist_get(prog_state.job_infos, jobindex);
114 if(job->pid != -1) return;
116 if(prog_state.buffered) {
117 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
118 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
119 fprintf(stderr, "temp filename too long!\n");
120 return;
124 errno = posix_spawn_file_actions_init(&job->fa);
125 if(errno) goto spawn_error;
126 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
127 if(errno) goto spawn_error;
129 if(prog_state.buffered) {
130 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
131 if(errno) goto spawn_error;
132 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
133 if(errno) goto spawn_error;
136 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
137 if(errno) goto spawn_error;
139 if(prog_state.buffered) {
140 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
141 if(errno) goto spawn_error;
142 if(prog_state.join_output)
143 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
144 else
145 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
146 if(errno) goto spawn_error;
149 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
150 if(errno) {
151 spawn_error:
152 job->pid = -1;
153 perror("posix_spawn");
154 } else {
155 prog_state.threads_running++;
156 if(prog_state.limits) {
157 limit_rec* limit;
158 sblist_iter(prog_state.limits, limit) {
159 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
160 perror("prlimit");
166 static void addJobSlot(size_t job_id) {
167 if(prog_state.free_slots_count < MAX_SLOTS) {
168 prog_state.free_slots[prog_state.free_slots_count] = job_id;
169 prog_state.free_slots_count++;
173 static void dump_output(size_t job_id, int is_stderr) {
174 char out_filename_buf[256];
175 char buf[4096];
176 FILE* dst, *out_stream = is_stderr ? stderr : stdout;
177 size_t nread;
179 makeLogfilename(out_filename_buf, sizeof(out_filename_buf), job_id, is_stderr);
181 dst = fopen(out_filename_buf, "r");
182 while(dst && (nread = fread(buf, 1, sizeof(buf), dst))) {
183 fwrite(buf, 1, nread, out_stream);
184 if(nread < sizeof(buf)) break;
186 if(dst)
187 fclose(dst);
189 fflush(out_stream);
192 /* reap childs and return pointer to a free "slot" or NULL */
193 static void reapChilds(void) {
194 size_t i;
195 job_info* job;
196 int ret, retval;
198 prog_state.free_slots_count = 0;
200 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
201 job = sblist_get(prog_state.job_infos, i);
202 if(job->pid != -1) {
203 ret = waitpid(job->pid, &retval, WNOHANG);
204 if(ret != 0) {
205 // error or changed state.
206 if(ret == -1) {
207 perror("waitpid");
208 continue;
210 if(!retval) {
211 //log_put(js->log_fd, VARISL(" job finished: "), VARIS(job->prog), NULL);
213 else {
214 //log_put(js->log_fd, VARISL(" got error "), VARII(WEXITSTATUS(retval)), VARISL(" from "), VARIS(job->prog), NULL);
216 job->pid = -1;
217 posix_spawn_file_actions_destroy(&job->fa);
218 //job->passed = 0;
219 addJobSlot(i);
220 prog_state.threads_running--;
222 if(prog_state.buffered) {
223 dump_output(i, 0);
224 if(!prog_state.join_output)
225 dump_output(i, 1);
229 } else
230 addJobSlot(i);
235 __attribute__((noreturn))
236 static void die(const char* msg) {
237 fprintf(stderr, msg);
238 exit(1);
241 static long parse_human_number(stringptr* num) {
242 long ret = 0;
243 char buf[64];
244 if(num && num->size && num->size < sizeof(buf)) {
245 if(num->ptr[num->size -1] == 'G')
246 ret = 1024 * 1024 * 1024;
247 else if(num->ptr[num->size -1] == 'M')
248 ret = 1024 * 1024;
249 else if(num->ptr[num->size -1] == 'K')
250 ret = 1024;
251 if(ret) {
252 memcpy(buf, num->ptr, num->size);
253 buf[num->size] = 0;
254 return atol(buf) * ret;
256 return atol(num->ptr);
258 return ret;
261 static int syntax(void) {
262 puts(
263 "jobflow (C) rofl0r\n"
264 "------------------\n"
265 "this program is intended to be used as a recipient of another programs output\n"
266 "it launches processes to which the current line can be passed as an argument\n"
267 "using {} for substitution (as in find -exec).\n"
268 "\n"
269 "available options:\n\n"
270 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
271 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
272 "-exec ./mycommand {}\n"
273 "\n"
274 "-skip=XXX\n"
275 " XXX=number of entries to skip\n"
276 "-threads=XXX\n"
277 " XXX=number of parallel processes to spawn]\n"
278 "-resume\n"
279 " resume from last jobnumber stored in statefile\n"
280 "-statefile=XXX\n"
281 " XXX=filename\n"
282 " saves last launched jobnumber into a file\n"
283 "-delayedflush\n"
284 " only write to statefile whenever all processes are busy,\n"
285 " and at program end\n"
286 "-delayedspinup=XXX\n"
287 " XXX=maximum amount of milliseconds\n"
288 " ...to wait when spinning up a fresh set of processes\n"
289 " a random value between 0 and the chosen amount is used to delay initial\n"
290 " spinup.\n"
291 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
292 " activity on program startup\n"
293 "-buffered\n"
294 " store the stdout and stderr of launched processes into a temporary file\n"
295 " which will be printed after a process has finished.\n"
296 " this prevents mixing up of output of different processes.\n"
297 "-joinoutput\n"
298 " if -buffered, write both stdout and stderr into the same file.\n"
299 " this saves the chronological order of the output, and the combined output\n"
300 " will only be printed to stdout.\n"
301 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
302 " sets the rlimit of the new created processes.\n"
303 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
304 "-exec command with args\n"
305 " everything past -exec is treated as the command to execute on each line of\n"
306 " stdin received. the line can be passed as an argument using {}."
308 return 1;
311 static int parse_args(int argc, char** argv) {
312 op_state op_b, *op = &op_b;
313 op_init(op, argc, argv);
314 char *op_temp;
315 if(argc == 1 || op_hasflag(op, SPL("-help")))
316 return syntax();
317 op_temp = op_get(op, SPL("threads"));
318 prog_state.numthreads = op_temp ? atoi(op_temp) : 1;
319 op_temp = op_get(op, SPL("statefile"));
320 prog_state.statefile = op_temp;
322 op_temp = op_get(op, SPL("skip"));
323 prog_state.skip = op_temp ? atoi(op_temp) : 0;
324 if(op_hasflag(op, SPL("resume"))) {
325 if(!prog_state.statefile) die("-resume needs -statefile\n");
326 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
327 stringptr* fc = stringptr_fromfile(prog_state.statefile);
328 prog_state.skip = atoi(fc->ptr);
332 prog_state.delayedflush = 0;
333 if(op_hasflag(op, SPL("delayedflush"))) {
334 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
335 prog_state.delayedflush = 1;
338 op_temp = op_get(op, SPL("delayedspinup"));
339 prog_state.delayedspinup_interval = op_temp ? atoi(op_temp) : 0;
341 prog_state.cmd_startarg = 0;
342 prog_state.subst_entries = NULL;
344 if(op_hasflag(op, SPL("exec"))) {
345 uint32_t subst_ent;
346 unsigned i, r = 0;
347 for(i = 1; i < (unsigned) argc; i++) {
348 if(str_equal(argv[i], "-exec")) {
349 r = i + 1;
350 break;
353 if(r && r < (unsigned) argc) {
354 prog_state.cmd_startarg = r;
357 // save entries which must be substituted, to save some cycles.
358 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
359 for(i = r; i < (unsigned) argc; i++) {
360 subst_ent = i - r;
361 if(strstr(argv[i], "{}")) sblist_add(prog_state.subst_entries, &subst_ent);
365 prog_state.buffered = 0;
366 if(op_hasflag(op, SPL("buffered"))) {
367 prog_state.buffered = 1;
370 prog_state.join_output = 0;
371 if(op_hasflag(op, SPL("joinoutput"))) {
372 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
373 prog_state.join_output = 1;
376 prog_state.limits = NULL;
377 op_temp = op_get(op, SPL("limits"));
378 if(op_temp) {
379 unsigned i;
380 SPDECLAREC(limits, op_temp);
381 stringptrlist* limit_list = stringptr_splitc(limits, ',');
382 stringptrlist* kv;
383 stringptr* key, *value;
384 limit_rec lim;
385 if(stringptrlist_getsize(limit_list)) {
386 prog_state.limits = sblist_new(sizeof(limit_rec), stringptrlist_getsize(limit_list));
387 for(i = 0; i < stringptrlist_getsize(limit_list); i++) {
388 kv = stringptr_splitc(stringptrlist_get(limit_list, i), '=');
389 if(stringptrlist_getsize(kv) != 2) continue;
390 key = stringptrlist_get(kv, 0);
391 value = stringptrlist_get(kv, 1);
392 if(EQ(key, SPL("mem")))
393 lim.limit = RLIMIT_AS;
394 else if(EQ(key, SPL("cpu")))
395 lim.limit = RLIMIT_CPU;
396 else if(EQ(key, SPL("stack")))
397 lim.limit = RLIMIT_STACK;
398 else if(EQ(key, SPL("fsize")))
399 lim.limit = RLIMIT_FSIZE;
400 else if(EQ(key, SPL("nofiles")))
401 lim.limit = RLIMIT_NOFILE;
402 else
403 die("unknown option passed to -limits");
405 if(getrlimit(lim.limit, &lim.rl) == -1) {
406 perror("getrlimit");
407 die("could not query rlimits");
409 lim.rl.rlim_cur = parse_human_number(value);
410 sblist_add(prog_state.limits, &lim);
411 stringptrlist_free(kv);
413 stringptrlist_free(limit_list);
416 return 0;
419 static void init_queue(void) {
420 unsigned i;
421 job_info ji;
423 ji.pid = -1;
424 memset(&ji.fa, 0, sizeof(ji.fa));
426 for(i = 0; i < prog_state.numthreads; i++) {
427 sblist_add(prog_state.job_infos, &ji);
431 static void write_statefile(uint64_t n, const char* tempfile) {
432 char numbuf[64];
433 stringptr num_b, *num = &num_b;
435 num_b.ptr = uint64ToString(n + 1, numbuf);
436 num_b.size = strlen(numbuf);
437 stringptr_tofile((char*) tempfile, num);
438 if(rename(tempfile, prog_state.statefile) == -1)
439 perror("rename");
442 int main(int argc, char** argv) {
443 char inbuf[4096]; char* fgets_result, *strstr_result, *p;
444 stringptr line_b, *line = &line_b;
445 char* cmd_argv[4096];
446 char subst_buf[4096][16];
447 unsigned max_subst;
449 struct timeval reapTime;
451 uint64_t n = 0;
452 unsigned i, j;
453 unsigned spinup_counter = 0;
455 char tempdir_buf[256];
456 char temp_state[256];
458 srand(time(NULL));
460 if(argc > 4096) argc = 4096;
461 prog_state.threads_running = 0;
462 prog_state.free_slots_count = 0;
463 gettimestamp(&reapTime);
465 if(parse_args(argc, argv)) return 1;
467 if(prog_state.statefile)
468 ulz_snprintf(temp_state, sizeof(temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
470 prog_state.tempdir = NULL;
472 if(prog_state.buffered) {
473 prog_state.tempdir = tempdir_buf;
474 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
475 perror("mkdtemp");
476 die("could not create tempdir\n");
480 if(prog_state.cmd_startarg) {
481 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
482 cmd_argv[i - prog_state.cmd_startarg] = argv[i];
484 cmd_argv[argc - prog_state.cmd_startarg] = NULL;
487 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
488 init_queue();
490 while((fgets_result = fgets(inbuf, sizeof(inbuf), stdin))) {
491 if(prog_state.skip)
492 prog_state.skip--;
493 else {
494 if(!prog_state.cmd_startarg)
495 printf(fgets_result);
496 else {
497 stringptr_fromchar(fgets_result, line);
498 stringptr_chomp(line);
500 max_subst = 0;
501 if(prog_state.subst_entries) {
502 uint32_t* index;
503 sblist_iter(prog_state.subst_entries, index) {
504 p = argv[*index + prog_state.cmd_startarg];
505 if((strstr_result = strstr(p, "{}"))) {
506 j = 0;
507 j = strstr_result - p;
508 if(j) memcpy(subst_buf[max_subst], p, j);
509 strncpy(&subst_buf[max_subst][j], line->ptr, 4096 - j);
510 j += line->size;
511 if(j > 4096) {
512 fprintf(stderr, "fatal: line too long for substitution: %s\n", line->ptr);
513 goto out;
515 strncpy(&subst_buf[max_subst][j], strstr_result + 2, 4096 - j);
517 cmd_argv[*index] = subst_buf[max_subst];
518 max_subst++;
519 if(max_subst >= 16) die("too many substitutions!\n");
524 while(prog_state.free_slots_count == 0 || mspassed(&reapTime) > REAP_INTERVAL_MS) {
525 reapChilds();
526 gettimestamp(&reapTime);
527 if(!prog_state.free_slots_count) msleep(SLEEP_MS);
530 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
531 msleep(rand() % (prog_state.delayedspinup_interval + 1));
532 spinup_counter++;
535 launch_job(prog_state.free_slots[prog_state.free_slots_count-1], cmd_argv);
536 prog_state.free_slots_count--;
538 if(prog_state.statefile && (prog_state.delayedflush == 0 || prog_state.free_slots_count == 0)) {
539 write_statefile(n, temp_state);
543 n++;
546 out:
548 if(prog_state.delayedflush)
549 write_statefile(n - 1, temp_state);
551 while(prog_state.threads_running) {
552 reapChilds();
553 if(prog_state.threads_running) msleep(SLEEP_MS);
556 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
557 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
558 if(prog_state.limits) sblist_free(prog_state.limits);
560 if(prog_state.tempdir)
561 rmdir(prog_state.tempdir);
563 return 0;