replace use of stringptr_rchr()
[rofl0r-jobflow.git] / jobflow.c
blob2362b7e4c2c2bb8298cbdad5332f268a87452a71
1 /*
2 Copyright (C) 2012,2014,2016,2017,2018 rofl0r
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #define VERSION "1.2.4"
22 #undef _POSIX_C_SOURCE
23 #define _POSIX_C_SOURCE 200809L
24 #undef _XOPEN_SOURCE
25 #define _XOPEN_SOURCE 700
26 #undef _GNU_SOURCE
27 #define _GNU_SOURCE
29 #include "../lib/include/optparser.h"
30 #include "../lib/include/stringptr.h"
31 #include "../lib/include/sblist.h"
32 #include "../lib/include/strlib.h"
33 #include "../lib/include/macros.h"
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <stdint.h>
39 #include <stddef.h>
40 #include <errno.h>
41 #include <time.h>
42 #include <assert.h>
43 #include <ctype.h>
44 #include <sys/mman.h>
46 /* process handling */
48 #include <fcntl.h>
49 #include <spawn.h>
50 #include <sys/wait.h>
51 #include <sys/stat.h>
53 #include <sys/resource.h>
55 #if defined(__GLIBC__) && (__GLIBC__ < 3) && (__GLIBC_MINOR__ < 13)
56 /* http://repo.or.cz/w/glibc.git/commitdiff/c08fb0d7bba4015078406b28d3906ccc5fda9d5a ,
57 * http://repo.or.cz/w/glibc.git/commitdiff/052fa7b33ef5deeb4987e5264cf397b3161d8a01 */
58 #warning to use prlimit() you have to use musl libc 0.8.4+ or glibc 2.13+
/* Fallback used when the C library lacks prlimit(): always fails.
 * Prints a diagnostic on fd 2, sets errno to EINVAL and returns -1.
 * All arguments are ignored. */
static int prlimit(int pid, ...) {
	static const char notice[] = "prlimit() not implemented on this system\n";
	(void) pid;
	dprintf(2, "%s", notice);
	/* errno is assigned last so the diagnostic write cannot clobber it */
	errno = EINVAL;
	return -1;
}
65 #endif
67 #include <sys/time.h>
69 /* some small helper funcs from libulz */
/* Sleep for the given number of milliseconds.
 * If the sleep is interrupted by a signal it is resumed with the remaining
 * time. Returns the result of the final nanosleep() call (0 on success,
 * -1 on any error other than EINTR). */
static int msleep(long millisecs) {
	struct timespec wanted = {
		.tv_sec = millisecs / 1000,
		.tv_nsec = (millisecs % 1000) * 1000 * 1000,
	};
	struct timespec remaining;
	for(;;) {
		int rc = nanosleep(&wanted, &remaining);
		if(rc != -1 || errno != EINTR) return rc;
		/* interrupted: continue with whatever time is left */
		wanted = remaining;
	}
}
/* alphabet used to fill in the random part of a temp directory name */
static const char ulz_conv_cypher[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
static const size_t ulz_conv_cypher_len = sizeof(ulz_conv_cypher) - 1;

/* mkdtemp() replacement: overwrites the last 6 characters of templ with
 * random characters and creates a directory of that name (mode 0700).
 * Retries with a new random name for as long as mkdir() reports EEXIST.
 * Returns templ on success; NULL on failure, with errno set to EINVAL
 * when templ is shorter than 6 characters, else as left by mkdir(). */
static char* ulz_mkdtemp(char* templ) {
	size_t len = strlen(templ);
	char *suffix;
	size_t k;

	if(len < 6) {
		errno = EINVAL;
		return NULL;
	}
	suffix = templ + len - 6;
	for(;;) {
		for(k = 0; k < 6; k++)
			suffix[k] = ulz_conv_cypher[rand() % ulz_conv_cypher_len];
		if(mkdir(templ, S_IRWXU) != -1) return templ;
		if(errno != EEXIST) return NULL;
	}
}
/* Assemble "<tmpdir><prefix>XXXXXX" (NUL-terminated) in buf.
 * pl is the length of prefix. The caller must provide a buffer of at least
 * strlen(tmpdir) + pl + 7 bytes. Returns the length of the resulting string. */
static size_t gen_fn(char* buf, const char* prefix, size_t pl, const char* tmpdir) {
	static const char placeholder[] = "XXXXXX";
	size_t dirlen = strlen(tmpdir);
	char *cursor = buf;

	memcpy(cursor, tmpdir, dirlen);
	cursor += dirlen;
	memcpy(cursor, prefix, pl);
	cursor += pl;
	/* sizeof includes the terminating NUL */
	memcpy(cursor, placeholder, sizeof placeholder);
	return (size_t)(cursor - buf) + (sizeof placeholder - 1);
}
/* Create a uniquely named temp directory, trying /dev/shm first and falling
 * back to /tmp, to get the fastest possible storage.
 * The directory name ("<dir><prefix>XXXXXX" with the X's randomized) is left
 * in buffer. Returns the length of that name, or 0 if the buffer is too
 * small or no directory could be created. */
static size_t mktempdir(const char* prefix, char* buffer, size_t bufsize) {
	static const char *const candidates[] = { "/dev/shm/", "/tmp/" };
	size_t pl = strlen(prefix);
	size_t c;

	/* size check is done against the longer of the two base directories */
	if(bufsize < sizeof("/dev/shm/") - 1 + pl + sizeof("XXXXXX")) return 0;

	for(c = 0; c < sizeof(candidates) / sizeof(candidates[0]); c++) {
		size_t namelen = gen_fn(buffer, prefix, pl, candidates[c]);
		if(ulz_mkdtemp(buffer)) return namelen;
	}
	return 0;
}
/* bookkeeping for one job slot */
typedef struct {
	/* pid of the child occupying this slot, -1 while the slot is free */
	pid_t pid;
	/* pipe mode only: write end of the pipe feeding the child's stdin */
	int pipe;
	/* file actions handed to posix_spawnp() for this child */
	posix_spawn_file_actions_t fa;
} job_info;
/* one resource limit to apply to every spawned child */
typedef struct {
	/* which resource (an RLIMIT_* constant) */
	int limit;
	/* the limit values passed to prlimit() */
	struct rlimit rl;
} limit_rec;
133 typedef struct {
134 char temp_state[256];
135 char* cmd_argv[4096];
136 unsigned long long lineno;
137 unsigned numthreads;
138 unsigned threads_running;
139 char* statefile;
140 char* eof_marker;
141 unsigned long long skip;
142 sblist* job_infos;
143 sblist* subst_entries;
144 sblist* limits;
145 unsigned cmd_startarg;
146 char* tempdir;
147 int delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
148 the top value in ms can be supplied via a command line switch.
149 this option makes only sense if the interval is somewhat smaller than the
150 expected runtime of the average job.
151 this option is useful to not overload a network app due to hundreds of
152 parallel connection tries on startup.
154 int buffered:1; /* write stdout and stderr of each task into a file,
155 and print it to stdout once the process ends.
156 this prevents mixing up of the output of multiple tasks. */
157 int delayedflush:1; /* only write to statefile whenever all processes are busy, and at program end.
158 this means faster program execution, but could also be imprecise if the number of
159 jobs is small or smaller than the available threadcount. */
160 int join_output:1; /* join stdout and stderr of launched jobs into stdout */
161 int pipe_mode:1;
162 size_t bulk_bytes;
163 } prog_state_s;
165 prog_state_s prog_state;
168 extern char** environ;
170 int makeLogfilename(char* buf, size_t bufsize, size_t jobindex, int is_stderr) {
171 int ret = snprintf(buf, bufsize, "%s/jd_proc_%.5lu_std%s.log",
172 prog_state.tempdir, (unsigned long) jobindex, is_stderr ? "err" : "out");
173 return ret > 0 && (size_t) ret < bufsize;
176 void launch_job(size_t jobindex, char** argv) {
177 char stdout_filename_buf[256];
178 char stderr_filename_buf[256];
179 job_info* job = sblist_get(prog_state.job_infos, jobindex);
181 if(job->pid != -1) return;
183 if(prog_state.buffered) {
184 if((!makeLogfilename(stdout_filename_buf, sizeof(stdout_filename_buf), jobindex, 0)) ||
185 ((!prog_state.join_output) && !makeLogfilename(stderr_filename_buf, sizeof(stderr_filename_buf), jobindex, 1)) ) {
186 dprintf(2, "temp filename too long!\n");
187 return;
191 errno = posix_spawn_file_actions_init(&job->fa);
192 if(errno) goto spawn_error;
194 errno = posix_spawn_file_actions_addclose(&job->fa, 0);
195 if(errno) goto spawn_error;
197 int pipes[2];
198 if(prog_state.pipe_mode) {
199 if(pipe(pipes)) {
200 perror("pipe");
201 goto spawn_error;
203 job->pipe = pipes[1];
204 errno = posix_spawn_file_actions_adddup2(&job->fa, pipes[0], 0);
205 if(errno) goto spawn_error;
206 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[0]);
207 if(errno) goto spawn_error;
208 errno = posix_spawn_file_actions_addclose(&job->fa, pipes[1]);
209 if(errno) goto spawn_error;
212 if(prog_state.buffered) {
213 errno = posix_spawn_file_actions_addclose(&job->fa, 1);
214 if(errno) goto spawn_error;
215 errno = posix_spawn_file_actions_addclose(&job->fa, 2);
216 if(errno) goto spawn_error;
219 if(!prog_state.pipe_mode) {
220 errno = posix_spawn_file_actions_addopen(&job->fa, 0, "/dev/null", O_RDONLY, 0);
221 if(errno) goto spawn_error;
224 if(prog_state.buffered) {
225 errno = posix_spawn_file_actions_addopen(&job->fa, 1, stdout_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
226 if(errno) goto spawn_error;
227 if(prog_state.join_output)
228 errno = posix_spawn_file_actions_adddup2(&job->fa, 1, 2);
229 else
230 errno = posix_spawn_file_actions_addopen(&job->fa, 2, stderr_filename_buf, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
231 if(errno) goto spawn_error;
234 errno = posix_spawnp(&job->pid, argv[0], &job->fa, NULL, argv, environ);
235 if(errno) {
236 spawn_error:
237 job->pid = -1;
238 perror("posix_spawn");
239 } else {
240 prog_state.threads_running++;
241 if(prog_state.limits) {
242 limit_rec* limit;
243 sblist_iter(prog_state.limits, limit) {
244 if(prlimit(job->pid, limit->limit, &limit->rl, NULL) == -1)
245 perror("prlimit");
249 if(prog_state.pipe_mode)
250 close(pipes[0]);
/* Copy the buffered output log of job slot job_id to our own stdout
 * (or stderr, if is_stderr is set), then delete the log file.
 * Silently does nothing if the log file cannot be opened. */
static void dump_output(size_t job_id, int is_stderr) {
	char path[256];
	char chunk[4096];
	FILE *sink = is_stderr ? stderr : stdout;
	FILE *log;
	size_t got;

	makeLogfilename(path, sizeof(path), job_id, is_stderr);

	log = fopen(path, "r");
	if(!log) return;

	for(;;) {
		got = fread(chunk, 1, sizeof(chunk), log);
		if(!got) break;
		fwrite(chunk, 1, got, sink);
		/* a short read means end of file (or error): stop */
		if(got < sizeof(chunk)) break;
	}
	fclose(log);
	fflush(sink);
	unlink(path);
}
/* Write all size bytes of buf to fd, continuing after partial writes and
 * retrying when interrupted by a signal.
 * On any other error a diagnostic is printed and the rest is dropped. */
static void write_all(int fd, void* buf, size_t size) {
	const char *cursor = buf;
	size_t remaining = size;

	while(remaining != 0) {
		ssize_t written = write(fd, cursor, remaining);
		if(written == -1) {
			if(errno == EINTR) continue;
			perror("write");
			return;
		}
		cursor += written;
		remaining -= written;
	}
}
293 static void pass_stdin(char *line, size_t len) {
294 static size_t next_child = 0;
295 if(next_child >= sblist_getsize(prog_state.job_infos))
296 next_child = 0;
297 job_info *job = sblist_get(prog_state.job_infos, next_child);
298 write_all(job->pipe, line, len);
299 next_child++;
302 static void close_pipes(void) {
303 size_t i;
304 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
305 job_info *job = sblist_get(prog_state.job_infos, i);
306 close(job->pipe);
310 /* wait till a child exits, reap it, and return its job index for slot reuse */
311 static size_t reap_child(void) {
312 size_t i;
313 job_info* job;
314 int ret, retval;
316 do ret = waitpid(-1, &retval, 0);
317 while(ret == -1 || !WIFEXITED(retval));
319 for(i = 0; i < sblist_getsize(prog_state.job_infos); i++) {
320 job = sblist_get(prog_state.job_infos, i);
321 if(job->pid == ret) {
322 job->pid = -1;
323 posix_spawn_file_actions_destroy(&job->fa);
324 prog_state.threads_running--;
325 if(prog_state.buffered) {
326 dump_output(i, 0);
327 if(!prog_state.join_output)
328 dump_output(i, 1);
330 return i;
333 assert(0);
334 return -1;
337 static size_t free_slots(void) {
338 return prog_state.numthreads - prog_state.threads_running;
/* print msg to fd 2 and terminate the program with exit status 1.
 * msg is printed verbatim; it is never interpreted as a format string. */
__attribute__((noreturn))
static void die(const char* msg) {
	dprintf(2, "%s", msg);
	exit(1);
}
/* Parse a decimal number with an optional K, M or G suffix (powers of 1024).
 * Parsing stops at the first character that is neither part of the number
 * nor a suffix, so "16M,cpu=10" yields 16 * 1024 * 1024.
 * Returns 0 if num does not start with a number (including the empty
 * string). */
static unsigned long parse_human_number(const char* num) {
	static const unsigned long mul[] = {1024UL, 1024UL * 1024UL, 1024UL * 1024UL * 1024UL};
	static const char kmg[] = "KMG";
	const char* kmgind;
	char* end;
	/* strtoul reports where the digits end, so the suffix can be located
	   without scanning past the terminator of a short or empty string */
	unsigned long ret = strtoul(num, &end, 10);

	if(*end && (kmgind = strchr(kmg, *end)))
		ret *= mul[kmgind - kmg];
	return ret;
}
360 static int syntax(void) {
361 dprintf(2,
362 "jobflow " VERSION " (C) rofl0r\n"
363 "------------------------\n"
364 "this program is intended to be used as a recipient of another programs output\n"
365 "it launches processes to which the current line can be passed as an argument\n"
366 "using {} for substitution (as in find -exec).\n"
367 "if no substitution argument ({} or {.}) is provided, input is piped into\n"
368 "stdin of child processes. input will be then evenly distributed to jobs,\n"
369 "until EOF is received. we call this 'pipe mode'.\n"
370 "\n"
371 "available options:\n\n"
372 "-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
373 "-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
374 "-eof=XXX\n"
375 "-exec ./mycommand {}\n"
376 "\n"
377 "-skip=XXX\n"
378 " XXX=number of entries to skip\n"
379 "-threads=XXX (alternative: -j=XXX)\n"
380 " XXX=number of parallel processes to spawn\n"
381 "-resume\n"
382 " resume from last jobnumber stored in statefile\n"
383 "-eof=XXX\n"
384 " use XXX as the EOF marker on stdin\n"
385 " if the marker is encountered, behave as if stdin was closed\n"
386 " not compatible with pipe/bulk mode\n"
387 "-statefile=XXX\n"
388 " XXX=filename\n"
389 " saves last launched jobnumber into a file\n"
390 "-delayedflush\n"
391 " only write to statefile whenever all processes are busy,\n"
392 " and at program end\n"
393 "-delayedspinup=XXX\n"
394 " XXX=maximum amount of milliseconds\n"
395 " ...to wait when spinning up a fresh set of processes\n"
396 " a random value between 0 and the chosen amount is used to delay initial\n"
397 " spinup.\n"
398 " this can be handy to circumvent an I/O lockdown because of a burst of \n"
399 " activity on program startup\n"
400 "-buffered\n"
401 " store the stdout and stderr of launched processes into a temporary file\n"
402 " which will be printed after a process has finished.\n"
403 " this prevents mixing up of output of different processes.\n"
404 "-joinoutput\n"
405 " if -buffered, write both stdout and stderr into the same file.\n"
406 " this saves the chronological order of the output, and the combined output\n"
407 " will only be printed to stdout.\n"
408 "-bulk=XXX\n"
409 " do bulk copies with a buffer of XXX bytes. only usable in pipe mode.\n"
410 " this passes (almost) the entire buffer to the next scheduled job.\n"
411 " the passed buffer will be truncated to the last line break boundary,\n"
412 " so jobs always get entire lines to work with.\n"
413 " this option is useful when you have huge input files and relatively short\n"
414 " task runtimes. by using it, syscall overhead can be reduced to a minimum.\n"
415 " XXX must be a multiple of 4KB. the suffixes G/M/K are detected.\n"
416 " actual memory allocation will be twice the amount passed.\n"
417 " note that pipe buffer size is limited to 64K on linux, so anything higher\n"
418 " than that probably doesn't make sense.\n"
419 " if no size is passed (i.e. only -bulk), a default of 4K will be used.\n"
420 "-limits=[mem=XXX,cpu=XXX,stack=XXX,fsize=XXX,nofiles=XXX]\n"
421 " sets the rlimit of the new created processes.\n"
422 " see \"man setrlimit\" for an explanation. the suffixes G/M/K are detected.\n"
423 "-exec command with args\n"
424 " everything past -exec is treated as the command to execute on each line of\n"
425 " stdin received. the line can be passed as an argument using {}.\n"
426 " {.} passes everything before the last dot in a line as an argument.\n"
427 " it is possible to use multiple substitutions inside a single argument,\n"
428 " but currently only of one type.\n"
429 " if -exec is omitted, input will merely be dumped to stdout (like cat).\n"
430 "\n"
432 return 1;
435 #undef strtoll
436 #define strtoll(a,b,c) strtoint64(a, strlen(a))
437 static int parse_args(int argc, char** argv) {
438 op_state op_b, *op = &op_b;
439 op_init(op, argc, argv);
440 char *op_temp;
441 if(op_hasflag(op, SPL("help")))
442 return syntax();
444 op_temp = op_get(op, SPL("threads"));
445 if(!op_temp) op_temp = op_get(op, SPL("j"));
446 long long x = op_temp ? strtoll(op_temp,0,10) : 1;
447 if(x <= 0) die("threadcount must be >= 1\n");
448 prog_state.numthreads = x;
450 op_temp = op_get(op, SPL("statefile"));
451 prog_state.statefile = op_temp;
453 op_temp = op_get(op, SPL("eof"));
454 prog_state.eof_marker = op_temp;
456 op_temp = op_get(op, SPL("skip"));
457 prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
458 if(op_hasflag(op, SPL("resume"))) {
459 if(!prog_state.statefile) die("-resume needs -statefile\n");
460 if(access(prog_state.statefile, W_OK | R_OK) != -1) {
461 FILE *f = fopen(prog_state.statefile, "r");
462 if(f) {
463 char nb[64];
464 if(fgets(nb, sizeof nb, f)) prog_state.skip = strtoll(nb,0,10);
465 fclose(f);
470 prog_state.delayedflush = 0;
471 if(op_hasflag(op, SPL("delayedflush"))) {
472 if(!prog_state.statefile) die("-delayedflush needs -statefile\n");
473 prog_state.delayedflush = 1;
476 prog_state.pipe_mode = 0;
478 op_temp = op_get(op, SPL("delayedspinup"));
479 prog_state.delayedspinup_interval = op_temp ? strtoll(op_temp,0,10) : 0;
481 prog_state.cmd_startarg = 0;
482 prog_state.subst_entries = NULL;
484 if(op_hasflag(op, SPL("exec"))) {
485 uint32_t subst_ent;
486 unsigned i, r = 0;
487 for(i = 1; i < (unsigned) argc; i++) {
488 if(str_equal(argv[i], "-exec") || str_equal(argv[i], "--exec")) {
489 r = i + 1;
490 break;
493 if(r && r < (unsigned) argc) {
494 prog_state.cmd_startarg = r;
497 prog_state.subst_entries = sblist_new(sizeof(uint32_t), 16);
499 // save entries which must be substituted, to save some cycles.
500 for(i = r; i < (unsigned) argc; i++) {
501 subst_ent = i - r;
502 if(strstr(argv[i], "{}") || strstr(argv[i], "{.}")) {
503 sblist_add(prog_state.subst_entries, &subst_ent);
506 if(sblist_getsize(prog_state.subst_entries) == 0) {
507 prog_state.pipe_mode = 1;
508 sblist_free(prog_state.subst_entries);
509 prog_state.subst_entries = 0;
513 prog_state.buffered = 0;
514 if(op_hasflag(op, SPL("buffered"))) {
515 prog_state.buffered = 1;
518 prog_state.join_output = 0;
519 if(op_hasflag(op, SPL("joinoutput"))) {
520 if(!prog_state.buffered) die("-joinoutput needs -buffered\n");
521 prog_state.join_output = 1;
524 prog_state.bulk_bytes = 0;
525 op_temp = op_get(op, SPL("bulk"));
526 if(op_temp) {
527 prog_state.bulk_bytes = parse_human_number(op_temp);
528 if(prog_state.bulk_bytes % 4096)
529 die("bulk size must be a multiple of 4096\n");
530 } else if(op_hasflag(op, SPL("bulk")))
531 prog_state.bulk_bytes = 4096;
533 prog_state.limits = NULL;
534 op_temp = op_get(op, SPL("limits"));
535 if(op_temp) {
536 unsigned i;
537 char *limits = op_temp;
538 while(1) {
539 limits += strspn(limits, ",");
540 size_t l = strcspn(limits, ",");
541 if(!l) break;
542 size_t l2 = strcspn(limits, "=");
543 if(l2 >= l) die("syntax error in limits argument");
544 limit_rec lim;
545 if(!prog_state.limits)
546 prog_state.limits = sblist_new(sizeof(limit_rec), 4);
547 static const struct { int lim_val; const char lim_name[8]; } lim_tab[] = {
548 { RLIMIT_AS, "mem" },
549 { RLIMIT_CPU, "cpu" },
550 { RLIMIT_STACK, "stack" },
551 { RLIMIT_FSIZE, "fsize" },
552 { RLIMIT_NOFILE, "nofiles" },
554 for(i=0; i<ARRAY_SIZE(lim_tab);++i)
555 if(!strncmp(limits, lim_tab[i].lim_name, l2)) {
556 lim.limit = lim_tab[i].lim_val;
557 break;
559 if(i >= ARRAY_SIZE(lim_tab))
560 die("unknown option passed to -limits");
561 if(getrlimit(lim.limit, &lim.rl) == -1) {
562 perror("getrlimit");
563 die("could not query rlimits");
565 lim.rl.rlim_cur = parse_human_number(limits+l2+1);
566 sblist_add(prog_state.limits, &lim);
567 limits += l;
570 return 0;
573 static void init_queue(void) {
574 unsigned i;
575 job_info ji = {.pid = -1};
577 for(i = 0; i < prog_state.numthreads; i++)
578 sblist_add(prog_state.job_infos, &ji);
581 static void write_statefile(unsigned long long n, const char* tempfile) {
582 int fd = open(tempfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
583 if(fd != -1) {
584 dprintf(fd, "%llu\n", n + 1ULL);
585 close(fd);
586 if(rename(tempfile, prog_state.statefile) == -1)
587 perror("rename");
588 } else
589 perror("open");
592 // returns numbers of substitutions done, -1 on out of buffer.
593 // dest is always overwritten. if not substitutions were done, it contains a copy of source.
594 int substitute_all(char* dest, ssize_t dest_size, stringptr* source, stringptr* what, stringptr* whit) {
595 size_t i;
596 int ret = 0;
597 for(i = 0; dest_size > 0 && i < source->size; ) {
598 if(stringptr_here(source, i, what)) {
599 if(dest_size < (ssize_t) whit->size) return -1;
600 memcpy(dest, whit->ptr, whit->size);
601 dest += whit->size;
602 dest_size -= whit->size;
603 ret++;
604 i += what->size;
605 } else {
606 *dest = source->ptr[i];
607 dest++;
608 dest_size--;
609 i++;
612 if(!dest_size) return -1;
613 *dest = 0;
614 return ret;
/* find the first occurrence of ch within the first end bytes of in.
 * the search does not stop at 0 bytes. returns 0 if ch is not found. */
static char* mystrnchr(const char *in, int ch, size_t end) {
	size_t i;
	for(i = 0; i < end; i++)
		if(in[i] == ch) return (char*)(in + i);
	return 0;
}
/* find the last occurrence of ch within the first end bytes of in.
 * the search does not stop at 0 bytes. returns 0 if ch is not found,
 * which includes the case end == 0 (nothing is read then). */
static char* mystrnrchr(const char *in, int ch, size_t end) {
	/* walk backwards by index so that no pointer before in is ever formed */
	while(end > 0) {
		end--;
		if(in[end] == ch) return (char*)(in + end);
	}
	return 0;
}
/* like mystrnrchr(), but never calls it with an empty range:
 * returns 0 right away when end is 0. */
static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
	return end ? mystrnrchr(in, ch, end) : 0;
}
636 static int need_linecounter(void) {
637 return !!prog_state.skip || prog_state.statefile;
/* count the '\n' bytes among the first len bytes of buf */
static size_t count_linefeeds(const char *buf, size_t len) {
	size_t total = 0;
	size_t i;
	for(i = 0; i < len; i++)
		total += (buf[i] == '\n');
	return total;
}
649 static int match_eof(char* inbuf, size_t len) {
650 if(!prog_state.eof_marker) return 0;
651 size_t l = strlen(prog_state.eof_marker);
652 return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
/* is p a line break character ('\n' or '\r')? */
static inline int islb(int p) {
	switch(p) {
	case '\n':
	case '\r':
		return 1;
	default:
		return 0;
	}
}

/* strip all trailing line break characters from the first *len bytes of s,
 * overwriting each with 0, and store the new length in *len. */
static void chomp(char *s, size_t *len) {
	size_t n = *len;
	while(n > 0 && islb(s[n-1])) {
		n--;
		s[n] = 0;
	}
	*len = n;
}
660 #define MAX_SUBSTS 16
661 static int dispatch_line(char* inbuf, size_t len, char** argv) {
662 char subst_buf[MAX_SUBSTS][4096];
663 static unsigned spinup_counter = 0;
665 stringptr line_b, *line = &line_b;
667 if(!prog_state.bulk_bytes)
668 prog_state.lineno++;
669 else if(need_linecounter()) {
670 prog_state.lineno += count_linefeeds(inbuf, len);
673 if(prog_state.skip) {
674 if(!prog_state.bulk_bytes) {
675 prog_state.skip--;
676 return 1;
677 } else {
678 while(len && prog_state.skip) {
679 char *q = mystrnchr(inbuf, '\n', len);
680 if(q) {
681 ptrdiff_t diff = (q - inbuf) + 1;
682 inbuf += diff;
683 len -= diff;
684 prog_state.skip--;
685 } else {
686 return 1;
689 if(!len) return 1;
692 if(!prog_state.cmd_startarg) {
693 write_all(1, inbuf, len);
694 return 1;
697 if(!prog_state.pipe_mode)
698 chomp(inbuf, &len);
700 line->ptr = inbuf; line->size = len;
702 if(prog_state.subst_entries) {
703 unsigned max_subst = 0;
704 uint32_t* index;
705 sblist_iter(prog_state.subst_entries, index) {
706 if(max_subst >= MAX_SUBSTS) break;
707 SPDECLAREC(source, argv[*index + prog_state.cmd_startarg]);
708 int ret;
709 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{}"), line);
710 if(ret == -1) {
711 too_long:
712 dprintf(2, "fatal: line too long for substitution: %s\n", line->ptr);
713 return 0;
714 } else if(!ret) {
715 char* lastdot = mystrnrchr_chk(line->ptr, '.', line->size);
716 stringptr tilLastDot = *line;
717 if(lastdot) tilLastDot.size = lastdot - line->ptr;
718 ret = substitute_all(subst_buf[max_subst], 4096, source, SPL("{.}"), &tilLastDot);
719 if(ret == -1) goto too_long;
721 if(ret) {
722 prog_state.cmd_argv[*index] = subst_buf[max_subst];
723 max_subst++;
729 if(prog_state.delayedspinup_interval && spinup_counter < (prog_state.numthreads * 2)) {
730 msleep(rand() % (prog_state.delayedspinup_interval + 1));
731 spinup_counter++;
734 if(free_slots())
735 launch_job(prog_state.threads_running, prog_state.cmd_argv);
736 else if(!prog_state.pipe_mode)
737 launch_job(reap_child(), prog_state.cmd_argv);
739 if(prog_state.statefile && (prog_state.delayedflush == 0 || free_slots() == 0)) {
740 write_statefile(prog_state.lineno, prog_state.temp_state);
743 if(prog_state.pipe_mode)
744 pass_stdin(line->ptr, line->size);
746 return 1;
749 int main(int argc, char** argv) {
750 unsigned i;
752 char tempdir_buf[256];
754 srand(time(NULL));
756 if(argc > 4096) argc = 4096;
758 prog_state.threads_running = 0;
760 if(parse_args(argc, argv)) return 1;
762 if(prog_state.statefile)
763 snprintf(prog_state.temp_state, sizeof(prog_state.temp_state), "%s.%u", prog_state.statefile, (unsigned) getpid());
765 prog_state.tempdir = NULL;
767 if(prog_state.buffered) {
768 prog_state.tempdir = tempdir_buf;
769 if(mktempdir("jobflow", tempdir_buf, sizeof(tempdir_buf)) == 0) {
770 perror("mkdtemp");
771 die("could not create tempdir\n");
773 } else {
774 /* if the stdout/stderr fds are not in O_APPEND mode,
775 the dup()'s of the fds in posix_spawn can cause different
776 file positions, causing the different processes to overwrite each others output.
777 testcase:
778 seq 100 | ./jobflow.out -threads=100 -exec echo {} > test.tmp ; wc -l test.tmp
780 if(fcntl(1, F_SETFL, O_APPEND) == -1) perror("fcntl");
781 if(fcntl(2, F_SETFL, O_APPEND) == -1) perror("fcntl");
784 if(prog_state.cmd_startarg) {
785 for(i = prog_state.cmd_startarg; i < (unsigned) argc; i++) {
786 prog_state.cmd_argv[i - prog_state.cmd_startarg] = argv[i];
788 prog_state.cmd_argv[argc - prog_state.cmd_startarg] = NULL;
791 prog_state.job_infos = sblist_new(sizeof(job_info), prog_state.numthreads);
792 init_queue();
794 prog_state.lineno = 0;
796 size_t left = 0, bytes_read = 0;
797 const size_t chunksize = prog_state.bulk_bytes ? prog_state.bulk_bytes : 16*1024;
799 char *mem = mmap(NULL, chunksize*2, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0);
800 char *buf1 = mem;
801 char *buf2 = mem+chunksize;
802 char *in, *inbuf;
804 int exitcode = 1;
806 while(1) {
807 inbuf = buf1+chunksize-left;
808 memcpy(inbuf, buf2+bytes_read-left, left);
809 ssize_t n = read(0, buf2, chunksize);
810 if(n == -1) {
811 perror("read");
812 goto out;
814 bytes_read = n;
815 left += n;
816 in = inbuf;
817 while(left) {
818 char *p;
819 if(prog_state.pipe_mode && prog_state.bulk_bytes)
820 p = mystrnrchr(in, '\n', left);
821 else
822 p = mystrnchr (in, '\n', left);
824 if(!p) break;
825 ptrdiff_t diff = (p - in) + 1;
826 if(match_eof(in, diff)) {
827 exitcode = 0;
828 goto out;
830 if(!dispatch_line(in, diff, argv))
831 goto out;
832 left -= diff;
833 in += diff;
835 if(!n) {
836 if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
837 break;
839 if(left > chunksize) {
840 dprintf(2, "error: input line length exceeds buffer size\n");
841 goto out;
845 exitcode = 0;
847 out:
849 if(prog_state.pipe_mode) {
850 close_pipes();
853 if(prog_state.delayedflush)
854 write_statefile(prog_state.lineno - 1, prog_state.temp_state);
856 while(prog_state.threads_running) reap_child();
858 if(prog_state.subst_entries) sblist_free(prog_state.subst_entries);
859 if(prog_state.job_infos) sblist_free(prog_state.job_infos);
860 if(prog_state.limits) sblist_free(prog_state.limits);
862 if(prog_state.tempdir)
863 rmdir(prog_state.tempdir);
866 fflush(stdout);
867 fflush(stderr);
870 return exitcode;