modified: SpatialOmicsCoord.py
[GalaxyCodeBases.git] / c_cpp / etc / mbuffer / mbuffer.c
blobf70395e28a2930b06a34d32730155cce3ff43f5e
1 /*
2 * Copyright (C) 2000-2014, Thomas Maier-Komor
4 * This is the source code of mbuffer.
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "config.h"
22 #ifdef S_SPLINT_S
23 typedef int caddr_t;
24 #include <sys/_types.h>
25 #include <cygwin/types.h>
26 #include <cygwin/in.h>
27 #endif
29 #define _GNU_SOURCE 1 /* needed for O_DIRECT */
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <float.h>
34 #include <libgen.h>
35 #include <limits.h>
36 #include <math.h>
37 #include <netdb.h>
38 #include <pthread.h>
39 #include <semaphore.h>
40 #include <signal.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <strings.h>
44 #include <string.h>
45 #include <sys/mman.h>
46 #include <sys/socket.h>
47 #include <sys/stat.h>
48 #include <sys/time.h>
49 #include <termios.h>
50 #include <unistd.h>
53 #ifdef __FreeBSD__
54 #include <sys/sysctl.h>
55 #endif
57 #ifdef HAVE_SENDFILE
58 #ifdef HAVE_SENDFILE_H
59 #include <sys/sendfile.h>
60 #endif
61 #endif
63 #ifndef EBADRQC
64 #define EBADRQC EINVAL
65 #endif
67 #ifdef HAVE_LIBMHASH
68 #include <mhash.h>
69 #define HAVE_MD5 1
70 #elif defined HAVE_LIBMD5
71 #include <md5.h>
72 static MD5_CTX MD5ctxt;
73 #define MD5_INIT(ctxt) MD5Init(&ctxt);
74 #define MD5_UPDATE(ctxt,at,num) MD5Update(&ctxt,(unsigned char *)(at),(unsigned int)(num))
75 #define MD5_END(hash,ctxt) MD5Final(hash,&(ctxt))
76 #define HAVE_MD5 1
77 #elif defined HAVE_LIBCRYPTO
78 #include <openssl/md5.h>
79 static MD5_CTX MD5ctxt;
80 #define MD5_INIT(ctxt) MD5_Init(&ctxt);
81 #define MD5_UPDATE(ctxt,at,num) MD5_Update(&ctxt,at,num)
82 #define MD5_END(hash,ctxt) MD5_Final(hash,&(ctxt))
83 #define HAVE_MD5 1
84 #endif
87 * _POSIX_THREADS is only defined if full thread support is available.
88 * We don't need full thread support, so we skip this test...
89 * #ifndef _POSIX_THREADS
90 * #error posix threads are required
91 * #endif
94 #ifndef S_SPLINT_S
95 #ifndef _POSIX_SEMAPHORES
96 #error posix sempahores are required
97 #endif
98 #endif
100 #ifdef O_LARGEFILE
101 #define LARGEFILE O_LARGEFILE
102 #else
103 #define LARGEFILE 0
104 #endif
106 #ifdef O_DIRECT
107 #define DIRECT O_DIRECT
108 #else
109 #define DIRECT 0
110 #endif
113 #include "dest.h"
114 #include "network.h"
115 #include "log.h"
117 /* if this sendfile implementation does not support sending from buffers,
118 disable sendfile support */
119 #ifndef SFV_FD_SELF
120 #ifdef __GNUC__
121 #warning sendfile is unable to send from buffers
122 #endif
123 #undef HAVE_SENDFILE
124 #endif
126 char
127 *Prefix;
128 int
129 In = -1, WatchdogRaised = 0;
130 size_t
131 PrefixLen = 0;
133 static pthread_t
134 Reader, Watchdog;
135 static long
136 Tmp = -1,
137 OptSync = 0;
138 static unsigned long
139 Outsize = 10240, Pause = 0, Timeout = 0;
140 static volatile int
141 Terminate = 0, /* abort execution, because of error or signal */
142 EmptyCount = 0, /* counter incremented when buffer runs empty */
143 FullCount = 0, /* counter incremented when buffer gets full */
144 NumSenders = -1,/* number of sender threads */
145 Done = 0,
146 MainOutOK = 1; /* is the main outputThread still writing or just coordinating senders */
147 static unsigned long long
148 Totalmem = 0, PgSz = 0, NumP = 0, Blocksize = 10240,
149 MaxReadSpeed = 0, MaxWriteSpeed = 0, OutVolsize = 0;
150 static volatile unsigned long long
151 Rest = 0, Numin = 0, Numout = 0;
152 static double
153 StartWrite = 0, StartRead = 1;
154 static char
155 *Tmpfile = 0, **Buffer;
156 static const char
157 *Infile = 0, *AutoloadCmd = 0;
158 static unsigned int
159 AutoloadTime = 0;
160 static int
161 Memlock = 0, TermQ[2],
162 Memmap = 0, Quiet = 0, Status = 1, StatusLog = 1,
163 Hashers = 0, Direct = 0, SetOutsize = 0;
164 static long
165 NumVolumes = 1, /* number of input volumes, 0 for interactive prompting */
166 Finish = -1, /* this is for graceful termination */
167 Numblocks = 512; /* number of buffer blocks */
168 static long long
169 TickTime = 0;
171 static clockid_t
172 ClockSrc = CLOCK_REALTIME;
174 #ifdef __sun
175 #include <synch.h>
176 #define sem_t sema_t
177 #define sem_init(a,b,c) sema_init(a,c,USYNC_THREAD,0)
178 #define sem_post sema_post
179 #define sem_getvalue(a,b) ((*(b) = (a)->count), 0)
180 #if defined(__SunOS_5_8) || defined(__SunOS_5_9)
181 #define sem_wait SemWait
182 int SemWait(sema_t *s)
184 int err;
185 do {
186 err = sema_wait(s);
187 } while (err == EINTR);
188 return err;
190 #else
191 #define sem_wait sema_wait
192 #endif
193 #endif
195 static sem_t Dev2Buf, Buf2Dev;
196 static pthread_cond_t
197 PercLow = PTHREAD_COND_INITIALIZER, /* low watermark */
198 PercHigh = PTHREAD_COND_INITIALIZER, /* high watermark */
199 SendCond = PTHREAD_COND_INITIALIZER;
200 static pthread_mutex_t
201 TermMut = PTHREAD_MUTEX_INITIALIZER, /* prevents statusThread from interfering with request*Volume */
202 LowMut = PTHREAD_MUTEX_INITIALIZER,
203 HighMut = PTHREAD_MUTEX_INITIALIZER,
204 SendMut = PTHREAD_MUTEX_INITIALIZER;
205 static int Terminal = 1, Autoloader = 0;
206 static struct timeval Starttime;
207 static dest_t *Dest = 0;
208 static char *volatile SendAt = 0;
209 static volatile int SendSize = 0, ActSenders = 0;
211 #ifdef __CYGWIN__
212 #include <malloc.h>
213 #undef assert
214 #define assert(x) ((x) || (*(char *) 0 = 1))
215 #endif
219 static int kb2str(char *s, double v)
221 const char *dim = "KMGT", *f;
223 while (v > 10000.0) {
224 v /= 1024.0;
225 ++dim;
226 if (*dim == 0) {
227 v *= 1024.0*1024.0*1024.0*1024.0;
228 break;
231 if (v < 0)
232 f = " ??? ";
233 else if (v < 100)
234 f = "%4.1f %ci";
235 else if (v < 10000) {
236 v = rint(v);
237 f = "%4.0f %ci";
238 } else
239 f = "%5.lg ";
240 return sprintf(s,f,v,*dim);
245 static void summary(unsigned long long numb, int numthreads)
247 int h,m;
248 double secs,av;
249 char buf[256], *msg = buf;
250 struct timeval now;
252 (void) gettimeofday(&now,0);
253 if (Status)
254 *msg++ = '\n';
255 if ((Terminate == 1) && (numthreads == 0))
256 numthreads = 1;
257 secs = now.tv_sec - Starttime.tv_sec + (double) now.tv_usec / 1000000 - (double) Starttime.tv_usec / 1000000;
258 assert(secs > 0);
259 numb >>= 10;
260 av = (double)(numb)/secs*numthreads;
261 h = (int) secs/3600;
262 m = (int) (secs - h * 3600)/60;
263 secs -= m * 60 + h * 3600;
264 if (numthreads > 1)
265 msg += sprintf(msg,"summary: %dx ",numthreads);
266 else
267 msg += sprintf(msg,"summary: ");
268 msg += kb2str(msg,numb);
269 msg += sprintf(msg,"Byte in ");
270 if (h > 0)
271 msg += sprintf(msg,"%dh %02dmin %04.1fsec - average of ",h,m,secs);
272 else if (m > 0)
273 msg += sprintf(msg,"%2dmin %04.1fsec - average of ",m,secs);
274 else
275 msg += sprintf(msg,"%4.1fsec - average of ",secs);
276 msg += kb2str(msg,av);
277 msg += sprintf(msg,"B/s");
278 if (EmptyCount != 0)
279 msg += sprintf(msg,", %dx empty",EmptyCount);
280 if (FullCount != 0)
281 msg += sprintf(msg,", %dx full",FullCount);
282 *msg++ = '\n';
283 *msg = '\0';
284 if (Log != STDERR_FILENO)
285 (void) write(Log,buf,msg-buf);
286 if (Status)
287 (void) write(STDERR_FILENO,buf,msg-buf);
292 static void cancelAll(void)
294 dest_t *d = Dest;
295 do {
296 (void) pthread_cancel(d->thread);
297 if (d->result == 0)
298 d->result = "canceled";
299 d = d->next;
300 } while (d);
301 if (Status)
302 (void) pthread_cancel(Reader);
307 static RETSIGTYPE sigHandler(int signr)
309 switch (signr) {
310 case SIGHUP:
311 case SIGINT:
312 ErrorOccurred = 1;
313 Terminate = 1;
314 (void) close(In);
315 if (TermQ[1] != -1) {
316 (void) write(TermQ[1],"0",1);
318 if (StartWrite > 0)
319 (void) pthread_cond_signal(&PercHigh);
320 if (StartRead < 1)
321 (void) pthread_cond_signal(&PercLow);
322 break;
323 default:
324 (void) raise(SIGABRT);
330 /* Thread-safe replacement for usleep. Argument must be a whole
331 * number of microseconds to sleep.
333 static int mt_usleep(unsigned long sleep_usecs)
335 struct timespec tv;
336 tv.tv_sec = sleep_usecs / 1000000;
337 tv.tv_nsec = (sleep_usecs % 1000000) * 1000;
339 do {
340 /* Sleep for the time specified in tv. If interrupted by a
341 * signal, place the remaining time left to sleep back into tv.
343 if (0 == nanosleep(&tv, &tv))
344 return 0;
345 } while (errno == EINTR);
346 return -1;
351 static void *watchdogThread(void *ignored)
353 unsigned long ni = Numin, no = Numout;
354 for (;;) {
355 sleep(Timeout);
356 if ((ni == Numin) && (Finish == -1)) {
357 errormsg("watchdog timeout: input stalled; sending SIGINT\n");
358 WatchdogRaised = 1;
359 kill(getpid(),SIGINT);
361 if (no == Numout) {
362 errormsg("watchdog timeout: output stalled; sending SIGINT\n");
363 WatchdogRaised = 1;
364 kill(getpid(),SIGINT);
366 ni = Numin;
367 no = Numout;
369 #ifdef __GNUC__
370 return 0; // suppresses a gcc warning
371 #endif
375 static void statusThread(void)
377 struct timeval last, now;
378 double in = 0, out = 0, total, diff, fill;
379 unsigned long long lin = 0, lout = 0;
380 int unwritten = 1; /* assumption: initially there is at least one unwritten block */
381 fd_set readfds;
382 struct timeval timeout = {0,200000};
383 int maxfd = 0;
385 last = Starttime;
386 #ifdef __alpha
387 (void) mt_usleep(1000); /* needed on alpha (stderr fails with fpe on nan) */
388 #endif
389 if (TermQ[0] != -1)
390 maxfd = TermQ[0]+1;
391 while ((Numin == 0) && (Terminate == 0) && (Finish == -1)) {
392 timeout.tv_sec = 0;
393 timeout.tv_usec = 200000;
394 FD_ZERO(&readfds);
395 if (TermQ[0] != -1)
396 FD_SET(TermQ[0],&readfds);
397 switch (select(maxfd,&readfds,0,0,&timeout)) {
398 case 0: continue;
399 case 1: break;
400 case -1:
401 if (errno == EINTR)
402 break;
403 default: abort();
406 while (!Done) {
407 int err,numsender;
408 ssize_t nw = 0;
409 char buf[256], *b = buf;
411 timeout.tv_sec = 0;
412 timeout.tv_usec = 500000;
413 FD_ZERO(&readfds);
414 if (TermQ[0] != -1)
415 FD_SET(TermQ[0],&readfds);
416 err = select(maxfd,&readfds,0,0,&timeout);
417 switch (err) {
418 case 0: break;
419 case 1: return;
420 case -1:
421 if (errno == EINTR)
422 break;
423 default: abort();
425 (void) gettimeofday(&now,0);
426 diff = now.tv_sec - last.tv_sec + (double) (now.tv_usec - last.tv_usec) / 1000000;
427 err = pthread_mutex_lock(&TermMut);
428 assert(0 == err);
429 err = sem_getvalue(&Buf2Dev,&unwritten);
430 assert(0 == err);
431 fill = (double)unwritten / (double)Numblocks * 100.0;
432 in = (double)(((Numin - lin) * Blocksize) >> 10);
433 in /= diff;
434 out = (double)(((Numout - lout) * Blocksize) >> 10);
435 out /= diff;
436 lin = Numin;
437 lout = Numout;
438 last = now;
439 total = (double)((Numout * Blocksize) >> 10);
440 fill = (fill < 0.0) ? 0.0 : fill;
441 b += sprintf(b,"\rin @ ");
442 b += kb2str(b,in);
443 numsender = NumSenders + MainOutOK - Hashers;
444 b += sprintf(b,"B/s, out @ ");
445 b += kb2str(b, out * numsender);
446 if (numsender != 1)
447 b += sprintf(b,"B/s, %d x ",numsender);
448 else
449 b += sprintf(b,"B/s, ");
450 b += kb2str(b,total);
451 b += sprintf(b,"B total, buffer %3.0f%% full",fill);
452 if (Quiet == 0) {
453 #ifdef NEED_IO_INTERLOCK
454 if (Log == STDERR_FILENO) {
455 int e;
456 e = pthread_mutex_lock(&LogMut);
457 assert(e == 0);
458 nw = write(STDERR_FILENO,buf,strlen(buf));
459 e = pthread_mutex_unlock(&LogMut);
460 assert(e == 0);
461 } else
462 #endif
463 nw = write(STDERR_FILENO,buf,strlen(buf));
465 if ((StatusLog != 0) && (Log != STDERR_FILENO))
466 infomsg("%s\n",buf+1);
467 err = pthread_mutex_unlock(&TermMut);
468 assert(0 == err);
469 if (nw == -1) /* stop trying to print status messages after a write error */
470 break;
476 static inline long long timediff(struct timespec *restrict t1, struct timespec *restrict t2)
478 long long tdiff;
479 tdiff = (t1->tv_sec - t2->tv_sec) * 1000000;
480 tdiff += (t1->tv_nsec - t2->tv_nsec) / 1000;
481 if (tdiff < 0)
482 tdiff = 0;
483 return tdiff;
488 static long long enforceSpeedLimit(unsigned long long limit, long long num, struct timespec *last)
490 struct timespec now;
491 long long tdiff;
492 double dt;
493 long self = (long) pthread_self();
495 num += Blocksize;
496 if (num < 0) {
497 debugmsg("enforceSpeedLimit(%lld,%lld): thread %ld\n",limit,num,self);
498 return num;
500 (void) clock_gettime(ClockSrc,&now);
501 tdiff = timediff(&now,last);
502 dt = (double)tdiff * 1E-6;
503 if (((double)num/dt) > (double)limit) {
504 double req = (double)num/limit - dt;
505 long long w = (long long) (req * 1E6);
506 if (w >= TickTime) {
507 long long slept, ret;
508 (void) mt_usleep(w);
509 (void) clock_gettime(ClockSrc,last);
510 slept = timediff(last,&now);
511 ret = -(long long)((double)limit * (double)(slept-w) * 1E-6);
512 debugmsg("thread %ld: slept for %lld usec (planned for %lld), ret = %lld\n",self,slept,w,ret);
513 return ret;
514 } else {
515 debugmsg("thread %ld: request for sleeping %lld usec delayed\n",self,w);
517 * Sleeping now would cause too much of a slowdown. So
518 * we defer this sleep until the sleeping time is
519 * longer than the tick time. Like this we can stay as
520 * close to the speed limit as possible.
522 return num;
525 debugmsg("thread %ld: %lld/%g (%g) <= %g\n",self,num,dt,num/dt,(double)limit);
526 return num;
531 static int promptInteractive(unsigned at, unsigned num)
533 static const char prompt[] = "\nContinue with next volume? Press 'y' to continue or 'n' to finish...";
534 static const char contmsg[] = "\nyes - continuing with next volume...\n";
535 static const char donemsg[] = "\nno - input done, waiting for output to finish...\n";
536 int err;
538 err = pthread_mutex_lock(&TermMut);
539 assert(0 == err);
540 if (-1 == write(STDERR_FILENO,prompt,sizeof(prompt))) {
541 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
542 Terminate = 1;
543 pthread_exit((void *) -1);
545 for (;;) {
546 char c = 0;
547 if (-1 == read(STDERR_FILENO,&c,1) && (errno != EINTR)) {
548 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
549 Terminate = 1;
550 pthread_exit((void *) -1);
552 debugmsg("prompt input %c\n",c);
553 switch (c) {
554 case 'n':
555 case 'N':
556 Rest = num;
557 (void) write(STDERR_FILENO,donemsg,sizeof(donemsg));
558 err = pthread_mutex_lock(&HighMut);
559 assert(err == 0);
560 err = sem_post(&Buf2Dev);
561 assert(err == 0);
562 err = pthread_cond_signal(&PercHigh);
563 assert(err == 0);
564 err = pthread_mutex_unlock(&HighMut);
565 assert(err == 0);
566 err = pthread_mutex_unlock(&TermMut);
567 assert(0 == err);
568 Finish = at;
569 if (Status)
570 pthread_exit(0);
571 return 0;
572 case 'y':
573 case 'Y':
574 (void) write(STDERR_FILENO,contmsg,sizeof(contmsg));
575 err = pthread_mutex_unlock(&TermMut);
576 assert(0 == err);
577 return 1;
578 default:;
585 static int requestInputVolume(unsigned at, unsigned num)
587 static struct timeval volstart = {0,0};
588 const char *cmd;
589 struct timeval now;
590 double diff;
591 unsigned min,hr;
592 char cmd_buf[15+strlen(Infile)];
594 debugmsg("requesting new volume for input\n");
595 (void) gettimeofday(&now,0);
596 if (volstart.tv_sec)
597 diff = now.tv_sec - volstart.tv_sec + (double) (now.tv_usec - volstart.tv_usec) * 1E-6;
598 else
599 diff = now.tv_sec - Starttime.tv_sec + (double) (now.tv_usec - Starttime.tv_usec) * 1E-6;
600 if (diff > 3600) {
601 hr = (unsigned) (diff / 3600);
602 diff -= hr * 3600;
603 min = (unsigned) (diff / 60);
604 diff -= min * 60;
605 infomsg("time for reading volume: %u:%02u:%02f\n",hr,min,diff);
606 } else if (diff > 60) {
607 min = (unsigned) (diff / 60);
608 diff -= min * 60;
609 infomsg("time for reading volume: %02u:%02f\n",min,diff);
610 } else
611 infomsg("time for reading volume: %02fsec.\n",diff);
612 if (-1 == close(In))
613 errormsg("error closing input: %s\n",strerror(errno));
614 do {
615 if ((Autoloader) && (Infile)) {
616 int ret;
617 if (AutoloadCmd) {
618 cmd = AutoloadCmd;
619 } else {
620 (void) snprintf(cmd_buf, sizeof(cmd_buf), "mt -f %s offline", Infile);
621 cmd = cmd_buf;
623 infomsg("requesting new input volume with command '%s'\n",cmd);
624 ret = system(cmd);
625 if (0 < ret) {
626 warningmsg("error running \"%s\" to change volume in autoloader: exitcode %d\n",cmd,ret);
627 Terminate = 1;
628 pthread_exit((void *) 0);
629 } else if (0 > ret) {
630 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd, strerror(errno));
631 Terminate = 1;
632 pthread_exit((void *) -1);
634 if (AutoloadTime) {
635 infomsg("waiting for drive to get ready...\n");
636 (void) sleep(AutoloadTime);
638 } else {
639 if (0 == promptInteractive(at,num))
640 return 0;
642 In = open(Infile, O_RDONLY | LARGEFILE | Direct);
643 if ((-1 == In) && (errno == EINVAL))
644 In = open(Infile, O_RDONLY | Direct);
645 if (-1 == In)
646 errormsg("could not reopen input: %s\n",strerror(errno));
647 #ifdef __sun
648 if (-1 == directio(In,DIRECTIO_ON))
649 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
650 #endif
651 } while (In == -1);
652 (void) gettimeofday(&volstart,0);
653 diff = volstart.tv_sec - now.tv_sec + (double) (volstart.tv_usec - now.tv_usec) * 1E-6;
654 infomsg("tape-change took %fsec. - continuing with next volume\n",diff);
655 NumVolumes--;
656 if (Terminal && ! Autoloader) {
657 char msg[] = "\nOK - continuing...\n";
658 (void) write(STDERR_FILENO,msg,sizeof(msg));
660 return 1;
665 static void releaseLock(void *l)
667 int err = pthread_mutex_unlock((pthread_mutex_t *)l);
668 assert(err == 0);
673 static void *inputThread(void *ignored)
675 int fill = 0;
676 unsigned long long num;
677 int at = 0;
678 long long xfer = 0;
679 const double startread = StartRead, startwrite = StartWrite;
680 struct timespec last;
681 #ifndef __sun
682 int maxfd = TermQ[0] > In ? TermQ[0] + 1 : In + 1;
684 if (Status != 0)
685 assert(TermQ[0] != -1);
686 #endif
687 (void) clock_gettime(ClockSrc,&last);
688 assert(ignored == 0);
689 infomsg("inputThread: starting with threadid %ld...\n",(long)pthread_self());
690 for (;;) {
691 int err;
693 if (startread < 1) {
694 err = pthread_mutex_lock(&LowMut);
695 assert(err == 0);
696 err = sem_getvalue(&Buf2Dev,&fill);
697 assert(err == 0);
698 if (fill == Numblocks - 1) {
699 debugmsg("inputThread: buffer full, waiting for it to drain.\n");
700 pthread_cleanup_push(releaseLock,&LowMut);
701 err = pthread_cond_wait(&PercLow,&LowMut);
702 assert(err == 0);
703 pthread_cleanup_pop(0);
704 ++FullCount;
705 debugmsg("inputThread: low watermark reached, continuing...\n");
707 err = pthread_mutex_unlock(&LowMut);
708 assert(err == 0);
710 if (Terminate) { /* for async termination requests */
711 debugmsg("inputThread: terminating early upon request...\n");
712 if (-1 == close(In))
713 errormsg("error closing input: %s\n",strerror(errno));
714 if (Status)
715 pthread_exit((void *)1);
716 return (void *) 1;
718 err = sem_wait(&Dev2Buf); /* Wait for one or more buffer blocks to be free */
719 assert(err == 0);
720 num = 0;
721 do {
722 int in;
723 #ifndef __sun
724 if (Status != 0) {
725 fd_set readfds;
726 FD_ZERO(&readfds);
727 FD_SET(TermQ[0],&readfds);
728 FD_SET(In,&readfds);
729 err = select(maxfd,&readfds,0,0,0);
730 debugiomsg("inputThread: select(%d, {%d,%d}, 0, 0, 0) = %d\n", maxfd,In,TermQ[0],err);
731 assert((err > 0) || (errno == EBADF));
732 if (FD_ISSET(TermQ[0],&readfds))
733 return (void *)-1;
734 assert(FD_ISSET(In,&readfds));
736 #endif
737 in = read(In,Buffer[at] + num,Blocksize - num);
738 debugiomsg("inputThread: read(In, Buffer[%d] + %llu, %llu) = %d\n", at, num, Blocksize - num, in);
739 if (in > 0) {
740 num += in;
741 } else if ((0 == in) && (Terminal||Autoloader) && (NumVolumes != 1)) {
742 if (0 == requestInputVolume(at,num))
743 return 0;
744 } else if ((-1 == in) && (errno == EIO) && (Terminal||Autoloader) && (NumVolumes != 1)) {
745 requestInputVolume(at,num);
746 } else if (in <= 0) {
747 /* error or end-of-file */
748 if ((-1 == in) && (Terminate == 0))
749 errormsg("inputThread: error reading at offset 0x%llx: %s\n",Numin*Blocksize,strerror(errno));
750 Rest = num;
751 Finish = at;
752 debugmsg("inputThread: last block has %llu bytes\n",num);
753 err = pthread_mutex_lock(&HighMut);
754 assert(err == 0);
755 err = sem_post(&Buf2Dev);
756 assert(err == 0);
757 err = pthread_cond_signal(&PercHigh);
758 assert(err == 0);
759 err = pthread_mutex_unlock(&HighMut);
760 assert(err == 0);
761 infomsg("inputThread: exiting...\n");
762 if (Status)
763 pthread_exit((void *) in);
764 return (void *) in;
766 } while (num < Blocksize);
767 if (MaxReadSpeed)
768 xfer = enforceSpeedLimit(MaxReadSpeed,xfer,&last);
769 err = sem_post(&Buf2Dev);
770 assert(err == 0);
771 if (startwrite > 0) {
772 err = pthread_mutex_lock(&HighMut);
773 assert(err == 0);
774 err = sem_getvalue(&Buf2Dev,&fill);
775 assert(err == 0);
776 if (((double) fill / (double) Numblocks) + DBL_EPSILON >= startwrite) {
777 err = pthread_cond_signal(&PercHigh);
778 assert(err == 0);
780 err = pthread_mutex_unlock(&HighMut);
781 assert(err == 0);
783 if (++at == Numblocks)
784 at = 0;
785 Numin++;
791 static inline int syncSenders(char *b, int s)
793 static volatile int size = 0, skipped = 0;
794 static char *volatile buf = 0;
795 int err;
797 err = pthread_mutex_lock(&SendMut);
798 assert(err == 0);
799 if (b) {
800 buf = b;
801 size = s;
803 if (s < 0)
804 --NumSenders;
805 if (--ActSenders) {
806 debugiomsg("syncSenders(%p,%d): ActSenders = %d\n",b,s,ActSenders);
807 pthread_cleanup_push(releaseLock,&SendMut);
808 err = pthread_cond_wait(&SendCond,&SendMut);
809 assert(err == 0);
810 pthread_cleanup_pop(1);
811 debugiomsg("syncSenders(): continue\n");
812 return 0;
813 } else {
814 ActSenders = NumSenders + 1;
815 assert((buf != 0) || Terminate);
816 SendAt = buf;
817 SendSize = size;
818 buf = 0;
819 if (skipped) {
820 // after the first time, always give a buffer free after sync
821 err = sem_post(&Dev2Buf);
822 assert(err == 0);
823 } else {
824 // the first time no buffer has been given free
825 skipped = 1;
827 err = pthread_mutex_unlock(&SendMut);
828 assert(err == 0);
829 debugiomsg("syncSenders(): send %d@%p, BROADCAST\n",SendSize,SendAt);
830 err = pthread_cond_broadcast(&SendCond);
831 assert(err == 0);
832 return 1;
838 static inline void terminateSender(int fd, dest_t *d, int ret)
840 debugmsg("terminating operation on %s\n",d->arg);
841 if (-1 != fd) {
842 int err;
843 infomsg("syncing %s...\n",d->arg);
845 err = fsync(fd);
846 while ((err != 0) && (errno == EINTR));
847 if (err != 0) {
848 if ((errno == EINVAL) || (errno == EBADRQC)) {
849 infomsg("syncing unsupported on %s: omitted.\n",d->arg);
850 } else {
851 warningmsg("unable to sync %s: %s\n",d->arg,strerror(errno));
854 if (-1 == close(fd))
855 errormsg("error closing file %s: %s\n",d->arg,strerror(errno));
857 if (ret != 0) {
858 ret = syncSenders(0,-1);
859 debugmsg("terminateSender(%s): sendSender(0,-1) = %d\n",d->arg,ret);
861 pthread_exit((void *) ret);
866 static void *senderThread(void *arg)
868 unsigned long long outsize = Blocksize;
869 dest_t *dest = (dest_t *)arg;
870 int out = dest->fd;
871 #ifdef HAVE_SENDFILE
872 int sendout = 1;
873 #endif
874 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
875 struct stat st;
877 debugmsg("sender(%s): checking output device...\n",dest->arg);
878 if (-1 == fstat(out,&st))
879 warningmsg("could not stat output %s: %s\n",dest->arg,strerror(errno));
880 else if (S_ISBLK(st.st_mode) || S_ISCHR(st.st_mode)) {
881 infomsg("blocksize is %d bytes on output device\n",st.st_blksize);
882 if ((Blocksize < st.st_blksize) || (Blocksize % st.st_blksize != 0)) {
883 warningmsg("Blocksize should be a multiple of the blocksize of the output device!\n"
884 "This can cause problems with some device/OS combinations...\n"
885 "Blocksize on output device %s is %d (transfer block size is %lld)\n", dest->arg, st.st_blksize, Blocksize);
886 if (SetOutsize) {
887 errormsg("unable to set output blocksize\n");
888 dest->result = strerror(errno);
889 terminateSender(out,dest,1);
891 } else {
892 if (SetOutsize) {
893 infomsg("setting output blocksize to %d\n",st.st_blksize);
894 outsize = st.st_blksize;
897 } else
898 infomsg("no device on output stream %s\n",dest->arg);
899 #endif
900 debugmsg("sender(%s): starting...\n",dest->arg);
901 for (;;) {
902 int size, num = 0;
903 (void) syncSenders(0,0);
904 size = SendSize;
905 if (0 == size) {
906 debugmsg("senderThread(\"%s\"): done.\n",dest->arg);
907 terminateSender(out,dest,0);
908 return 0; /* for lint */
910 if (Terminate) {
911 infomsg("senderThread(\"%s\"): terminating early upon request...\n",dest->arg);
912 dest->result = "canceled";
913 terminateSender(out,dest,1);
915 do {
916 unsigned long long rest = size - num;
917 int ret;
918 assert(size >= num);
919 #ifdef HAVE_SENDFILE
920 if (sendout) {
921 off_t baddr = (off_t) (SendAt+num);
922 unsigned long long n = SetOutsize ? (rest > Outsize ? (rest/Outsize)*Outsize : rest) : rest;
923 ret = sendfile(out,SFV_FD_SELF,&baddr,n);
924 debugiomsg("sender(%s): sendfile(%d, SFV_FD_SELF, &%p, %llu) = %d\n", dest->arg, dest->fd, (void*)baddr, n, ret);
925 if ((ret == -1) && ((errno == EINVAL) || (errno == EOPNOTSUPP))) {
926 sendout = 0;
927 debugmsg("sender(%s): sendfile unsupported - falling back to write\n", dest->arg);
928 continue;
930 } else
931 #endif
933 char *baddr = SendAt+num;
934 ret = write(out,baddr,rest > outsize ? outsize :rest);
935 debugiomsg("sender(%s): writing %llu@0x%p: ret = %d\n",dest->arg,rest,(void*)baddr,ret);
937 if (-1 == ret) {
938 errormsg("error writing to %s: %s\n",dest->arg,strerror(errno));
939 dest->result = strerror(errno);
940 terminateSender(out,dest,1);
942 num += ret;
943 } while (num != size);
949 static void *hashThread(void *arg)
951 #ifdef HAVE_MD5
952 dest_t *dest = (dest_t *) arg;
953 #ifdef HAVE_LIBMHASH
954 int algo = dest->fd;
956 MHASH ctxt = mhash_init(algo);
957 assert(ctxt != MHASH_FAILED);
958 #else
959 MD5_INIT(MD5ctxt);
960 #endif
961 debugmsg("hashThread(): starting...\n");
962 for (;;) {
963 int size;
965 (void) syncSenders(0,0);
966 size = SendSize;
967 if (0 == size) {
968 size_t ds;
969 unsigned char hashvalue[128];
970 char *msg, *m;
971 const char *an;
972 int i;
974 msg = malloc(300);
975 m = msg;
976 debugmsg("hashThread(): done.\n");
977 #ifdef HAVE_LIBMHASH
978 mhash_deinit(ctxt,hashvalue);
979 an = (const char *) mhash_get_hash_name_static(algo);
980 ds = mhash_get_block_size(algo);
981 #else
982 MD5_END(hashvalue,MD5ctxt);
983 an = "md5";
984 ds = 16;
985 #endif
986 assert(sizeof(hashvalue) >= ds);
987 m += sprintf(m,"%s hash: ",an);
988 for (i = 0; i < ds; ++i)
989 m += sprintf(m,"%02x",(unsigned int)hashvalue[i]);
990 *m++ = '\n';
991 *m = 0;
992 dest->result = msg;
993 pthread_exit((void *) msg);
994 return 0; /* for lint */
996 if (Terminate) {
997 (void) syncSenders(0,-1);
998 infomsg("hashThread(): terminating early upon request...\n");
999 pthread_exit((void *) 0);
1001 debugiomsg("hashThread(): hashing %d@0x%p\n",size,(void*)SendAt);
1002 #ifdef HAVE_LIBMHASH
1003 mhash(ctxt,SendAt,size);
1004 #else
1005 MD5_UPDATE(MD5ctxt,SendAt,size);
1006 #endif
1008 #endif
1012 static int requestOutputVolume(int out, const char *outfile)
1014 static struct timeval volstart = {0,0};
1015 struct timeval now;
1016 double diff;
1017 unsigned min,hr;
1019 if (!outfile) {
1020 errormsg("End of volume, but not end of input:\n"
1021 "Output file must be given (option -o) for multi volume support!\n");
1022 return -1;
1024 infomsg("end of volume - last block on volume: %lld\n",Numout);
1025 (void) gettimeofday(&now,0);
1026 if (volstart.tv_sec)
1027 diff = now.tv_sec - volstart.tv_sec + (double) (now.tv_usec - volstart.tv_usec) * 1E-6;
1028 else
1029 diff = now.tv_sec - Starttime.tv_sec + (double) (now.tv_usec - Starttime.tv_usec) * 1E-6;
1030 if (diff > 3600) {
1031 hr = (unsigned) (diff / 3600);
1032 diff -= hr * 3600;
1033 min = (unsigned) (diff / 60);
1034 diff -= min * 60;
1035 infomsg("time for writing volume: %u:%02u:%02f\n",hr,min,diff);
1036 } else if (diff > 60) {
1037 min = (unsigned) (diff / 60);
1038 diff -= min * 60;
1039 infomsg("time for writing volume: %02u:%02f\n",min,diff);
1040 } else
1041 infomsg("time for writing volume: %02fsec.\n",diff);
1042 if (-1 == close(out))
1043 errormsg("error closing output %s: %s\n",outfile,strerror(errno));
1044 do {
1045 mode_t mode;
1046 if (Autoloader) {
1047 const char default_cmd[] = "mt -f %s offline";
1048 char cmd_buf[sizeof(default_cmd)+strlen(outfile)];
1049 const char *cmd = AutoloadCmd;
1050 int err;
1052 if (cmd == 0) {
1053 (void) snprintf(cmd_buf, sizeof(cmd_buf), default_cmd, Infile);
1054 cmd = cmd_buf;
1056 infomsg("requesting new output volume with command '%s'\n",cmd);
1057 err = system(cmd);
1058 if (0 < err) {
1059 errormsg("error running \"%s\" to change volume in autoloader - exitcode %d\n", cmd, err);
1060 Autoloader = 0;
1061 return -1;
1062 } else if (0 > err) {
1063 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd, strerror(errno));
1064 Autoloader = 0;
1065 return -1;
1067 if (AutoloadTime) {
1068 infomsg("waiting for drive to get ready...\n");
1069 (void) sleep(AutoloadTime);
1071 } else {
1072 int err;
1073 char c = 0, msg[] = "\nvolume full - insert new media and press return when ready...\n";
1074 if (Terminal == 0) {
1075 errormsg("End of volume, but not end of input.\n"
1076 "Specify an autoload command, if you are working without terminal.\n");
1077 return -1;
1079 err = pthread_mutex_lock(&TermMut);
1080 assert(0 == err);
1081 if (-1 == write(STDERR_FILENO,msg,sizeof(msg))) {
1082 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
1083 return -1;
1085 do {
1086 if (-1 == read(STDERR_FILENO,&c,1) && (errno != EINTR)) {
1087 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
1088 return -1;
1090 } while (c != '\n');
1091 err = pthread_mutex_unlock(&TermMut);
1092 assert(0 == err);
1094 mode = O_WRONLY|O_TRUNC|OptSync|LARGEFILE|Direct;
1095 if (strncmp(outfile,"/dev/",5))
1096 mode |= O_CREAT;
1097 out = open(outfile,mode,0666);
1098 if (-1 == out)
1099 errormsg("error reopening output file: %s\n",strerror(errno));
1100 #ifdef __sun
1101 if (-1 == directio(out,DIRECTIO_ON))
1102 infomsg("direct I/O hinting failed for output: %s\n",strerror(errno));
1103 #endif
1104 } while (-1 == out);
1105 (void) gettimeofday(&volstart,0);
1106 diff = volstart.tv_sec - now.tv_sec + (double) (volstart.tv_usec - now.tv_usec) * 1E-6;
1107 infomsg("tape-change took %fsec. - continuing with next volume\n",diff);
1108 if (Terminal && ! Autoloader) {
1109 char msg[] = "\nOK - continuing...\n";
1110 (void) write(STDERR_FILENO,msg,sizeof(msg));
1112 return out;
1117 static void terminateOutputThread(dest_t *d, int status)
1119 int err;
1121 infomsg("outputThread: syncing %s...\n",d->arg);
1123 err = fsync(d->fd);
1124 while ((err != 0) && (errno == EINTR));
1125 if (err != 0) {
1126 if ((errno == EINVAL) || (errno == EBADRQC)) {
1127 infomsg("syncing unsupported on %s: omitted.\n",d->arg);
1128 } else {
1129 warningmsg("unable to sync %s: %s\n",d->arg,strerror(errno));
1132 infomsg("outputThread: finished - exiting...\n");
1133 if (-1 == close(d->fd))
1134 errormsg("error closing %s: %s\n",d->arg,strerror(errno));
1135 if (TermQ[1] != -1) {
1136 err = write(TermQ[1],"0",1);
1137 if (err == -1)
1138 errormsg("error writing to termination queue: %s\n",strerror(errno));
1140 if (status) {
1141 (void) sem_post(&Dev2Buf);
1142 (void) pthread_cond_broadcast(&SendCond);
1144 Done = 1;
1145 pthread_exit((void *)status);
1150 static void *outputThread(void *arg)
1152 dest_t *dest = (dest_t *) arg;
1153 unsigned at = 0;
1154 int fill = 0, haderror = 0, out, multipleSenders;
1155 #ifdef HAVE_SENDFILE
1156 int sendout = 1;
1157 #endif
1158 const double startwrite = StartWrite, startread = StartRead;
1159 unsigned long long blocksize = Blocksize;
1160 long long xfer = 0;
1161 struct timespec last;
1163 assert(NumSenders >= 0);
1164 if (dest->next) {
1165 int ret;
1166 dest_t *d = dest->next;
1167 debugmsg("NumSenders = %d\n",NumSenders);
1168 ActSenders = NumSenders + 1;
1169 ret = pthread_mutex_init(&SendMut,0);
1170 assert(ret == 0);
1171 ret = pthread_cond_init(&SendCond,0);
1172 assert(ret == 0);
1173 do {
1174 if (d->arg == 0) {
1175 debugmsg("creating hash thread with algorithm %s\n",d->name);
1176 ret = pthread_create(&d->thread,0,hashThread,d);
1177 assert(ret == 0);
1178 } else if (d->fd != -1) {
1179 debugmsg("creating sender for %s\n",d->arg);
1180 ret = pthread_create(&d->thread,0,senderThread,d);
1181 assert(ret == 0);
1182 } else {
1183 debugmsg("outputThread: ignoring destination %s\n",d->arg);
1184 d->name = 0;
1186 d = d->next;
1187 } while (d);
1189 multipleSenders = (NumSenders > 0);
1190 dest->result = 0;
1191 out = dest->fd;
1192 if (startwrite > 0) {
1193 int err;
1194 err = pthread_mutex_lock(&HighMut);
1195 assert(err == 0);
1196 debugmsg("outputThread: delaying start until buffer reaches high watermark\n");
1197 pthread_cleanup_push(releaseLock,&HighMut);
1198 err = pthread_cond_wait(&PercHigh,&HighMut);
1199 assert(err == 0);
1200 pthread_cleanup_pop(0);
1201 debugmsg("outputThread: high watermark reached, starting...\n");
1202 err = pthread_mutex_unlock(&HighMut);
1203 assert(err == 0);
1204 } else
1205 infomsg("outputThread: starting output on %s...\n",dest->arg);
1206 /* initialize last to 0, because we don't want to wait initially */
1207 (void) clock_gettime(ClockSrc,&last);
1208 for (;;) {
1209 unsigned long long rest = blocksize;
1210 int err;
1212 if ((startwrite > 0) && (fill <= 0)) {
1213 assert(fill == 0);
1214 err = pthread_mutex_lock(&HighMut);
1215 assert(err == 0);
1216 err = sem_getvalue(&Buf2Dev,&fill);
1217 assert(err == 0);
1218 if (fill == 0) {
1219 debugmsg("outputThread: buffer empty, waiting for it to fill\n");
1220 pthread_cleanup_push(releaseLock,&HighMut);
1221 err = pthread_cond_wait(&PercHigh,&HighMut);
1222 assert(err == 0);
1223 pthread_cleanup_pop(0);
1224 ++EmptyCount;
1225 debugmsg("outputThread: high watermark reached, continuing...\n");
1226 (void) clock_gettime(ClockSrc,&last);
1228 err = pthread_mutex_unlock(&HighMut);
1229 assert(err == 0);
1230 } else
1231 --fill;
1232 err = sem_wait(&Buf2Dev);
1233 assert(err == 0);
1234 if (Terminate) {
1235 infomsg("outputThread: terminating upon termination request...\n");
1236 dest->result = "canceled";
1237 terminateOutputThread(dest,1);
1239 if (Finish == at) {
1240 err = sem_getvalue(&Buf2Dev,&fill);
1241 assert(err == 0);
1242 if ((fill == 0) && (0 == Rest)) {
1243 if (multipleSenders)
1244 (void) syncSenders((char*)0xdeadbeef,0);
1245 infomsg("outputThread: finished - exiting...\n");
1246 terminateOutputThread(dest,haderror);
1247 } else {
1248 blocksize = rest = Rest;
1249 debugmsg("outputThread: last block has %llu bytes\n",(unsigned long long)Rest);
1252 if (multipleSenders)
1253 (void) syncSenders(Buffer[at],blocksize);
1254 /* switch output volume if -D <size> has been reached */
1255 if ( (OutVolsize != 0) && (Numout > 0) && (Numout % (OutVolsize/Blocksize)) == 0 ) {
1256 /* Sleep to let status thread "catch up" so that the displayed total is a multiple of OutVolsize */
1257 (void) mt_usleep(500000);
1258 out = requestOutputVolume(out,dest->name);
1259 if (out == -1) {
1260 haderror = 1;
1261 dest->result = strerror(errno);
1264 do {
1265 /* use Outsize which could be the blocksize of the device (option -d) */
1266 unsigned long long n = rest > Outsize ? Outsize : rest;
1267 int num;
1268 if (haderror) {
1269 if (NumSenders == 0)
1270 Terminate = 1;
1271 num = (int)rest;
1272 } else
1273 #ifdef HAVE_SENDFILE
1274 if (sendout) {
1275 off_t baddr = (off_t) (Buffer[at] + blocksize - rest);
1276 num = sendfile(out,SFV_FD_SELF,&baddr,n);
1277 debugiomsg("outputThread: sendfile(%d, SFV_FD_SELF, &(Buffer[%d] + %llu), %llu) = %d\n", out, at, blocksize - rest, n, num);
1278 if ((num == -1) && ((errno == EOPNOTSUPP) || (errno == EINVAL))) {
1279 infomsg("sendfile not supported - falling back to write...\n");
1280 sendout = 0;
1281 continue;
1283 } else
1284 #endif
1286 num = write(out,Buffer[at] + blocksize - rest, n);
1287 debugiomsg("outputThread: writing %lld@0x%p: ret = %d\n", n, Buffer[at] + blocksize - rest, num);
1289 if (Terminal||Autoloader) {
1290 if (((-1 == num) && ((errno == ENOMEM) || (errno == ENOSPC)))
1291 || (0 == num)) {
1292 /* request a new volume */
1293 out = requestOutputVolume(out,dest->name);
1294 if (out == -1)
1295 haderror = 1;
1296 continue;
1298 } else if (-1 == num) {
1299 dest->result = strerror(errno);
1300 errormsg("outputThread: error writing to %s at offset 0x%llx: %s\n",dest->arg,(long long)Blocksize*Numout+blocksize-rest,strerror(errno));
1301 MainOutOK = 0;
1302 if (NumSenders == 0) {
1303 debugmsg("outputThread: terminating...\n");
1304 Terminate = 1;
1305 err = sem_post(&Dev2Buf);
1306 assert(err == 0);
1307 terminateOutputThread(dest,1);
1309 debugmsg("outputThread: %d senders remaining - continuing...\n",NumSenders);
1310 haderror = 1;
1312 rest -= num;
1313 } while (rest > 0);
1314 if (multipleSenders == 0) {
1315 err = sem_post(&Dev2Buf);
1316 assert(err == 0);
1318 if (MaxWriteSpeed)
1319 xfer = enforceSpeedLimit(MaxWriteSpeed,xfer,&last);
1320 if (Pause)
1321 (void) mt_usleep(Pause);
1322 if (Finish == at) {
1323 err = sem_getvalue(&Buf2Dev,&fill);
1324 assert(err == 0);
1325 if (fill == 0) {
1326 if (multipleSenders)
1327 (void) syncSenders((char*)0xdeadbeef,0);
1328 terminateOutputThread(dest,0);
1329 return 0; /* make lint happy */
1332 if (Numblocks == ++at)
1333 at = 0;
1334 if (startread < 1) {
1335 err = pthread_mutex_lock(&LowMut);
1336 assert(err == 0);
1337 err = sem_getvalue(&Buf2Dev,&fill);
1338 assert(err == 0);
1339 if (((double)fill / (double)Numblocks) < startread) {
1340 err = pthread_cond_signal(&PercLow);
1341 assert(err == 0);
1343 err = pthread_mutex_unlock(&LowMut);
1344 assert(err == 0);
1346 Numout++;
1352 static void version(void)
1354 (void) fprintf(stderr,
1355 "mbuffer version "PACKAGE_VERSION"\n"\
1356 "Copyright 2001-2014 - T. Maier-Komor\n"\
1357 "License: GPLv3 - see file LICENSE\n"\
1358 "This program comes with ABSOLUTELY NO WARRANTY!!!\n"
1359 "Donations via PayPal to thomas@maier-komor.de are welcome and support this work!\n"
1360 "\n"
1362 exit(EXIT_SUCCESS);
1367 static void usage(void)
1369 const char *dim = "bkMGTP";
1370 unsigned long long m = Numblocks * Blocksize;
1371 while (m >= 10000) {
1372 m >>= 10;
1373 ++dim;
1375 (void) fprintf(stderr,
1376 "usage: mbuffer [Options]\n"
1377 "Options:\n"
1378 "-b <num> : use <num> blocks for buffer (default: %ld)\n"
1379 "-s <size> : use blocks of <size> bytes for processing (default: %llu)\n"
1380 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1381 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: 2%% = %llu%c)\n"
1382 #else
1383 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: %llu%c)\n"
1384 #endif
1385 #ifdef _POSIX_MEMLOCK_RANGE
1386 "-L : lock buffer in memory (unusable with file based buffers)\n"
1387 #endif
1388 "-d : use blocksize of device for output\n"
1389 "-D <size> : assumed output device size (default: infinite/auto-detect)\n"
1390 "-P <num> : start writing after buffer has been filled more than <num>%%\n"
1391 "-p <num> : start reading after buffer has been filled less than <num>%%\n"
1392 "-i <file> : use <file> for input\n"
1393 "-o <file> : use <file> for output (this option can be passed MULTIPLE times)\n"
1394 "--append : append to output file (must be passed before -o)\n"
1395 "--truncate : truncate next file (must be passed before -o)\n"
1396 "-I <h>:<p> : use network port <port> as input, allow only host <h> to connect\n"
1397 "-I <p> : use network port <port> as input\n"
1398 "-O <h>:<p> : output data to host <h> and port <p> (MUTLIPLE outputs supported)\n"
1399 "-n <num> : <num> volumes for input, '0' to prompt interactively\n"
1400 "-t : use memory mapped temporary file (for huge buffer)\n"
1401 "-T <file> : as -t but uses <file> as buffer\n"
1402 "-l <file> : use <file> for logging messages\n"
1403 "-u <num> : pause <num> milliseconds after each write\n"
1404 "-r <rate> : limit read rate to <rate> B/s, where <rate> can be given in b,k,M,G\n"
1405 "-R <rate> : same as -r for writing; use eiter one, if your tape is too fast\n"
1406 "-f : overwrite existing files\n"
1407 "-a <time> : autoloader which needs <time> seconds to reload\n"
1408 "-A <cmd> : issue command <cmd> to request new volume\n"
1409 "-v <level> : set verbose level to <level> (valid values are 0..6)\n"
1410 "-q : quiet - do not display the status on stderr\n"
1411 "-Q : quiet - do not log the status\n"
1412 "-c : write with synchronous data integrity support\n"
1413 "-e : stop processing on any kind of error\n"
1414 #ifdef O_DIRECT
1415 "--direct : open input and output with O_DIRECT\n"
1416 #endif
1417 #if defined HAVE_LIBCRYPTO || defined HAVE_LIBMD5 || defined HAVE_LIBMHASH
1418 "-H\n"
1419 "--md5 : generate md5 hash of transfered data\n"
1420 "--hash <a> : use alogritm <a>, if <a> is 'list' possible algorithms are listed\n"
1421 "--pid : print PID of this instance\n"
1422 "-W <time> : set watchdog timeout to <time> seconds\n"
1423 #endif
1424 "-4 : force use of IPv4\n"
1425 "-6 : force use of IPv6\n"
1426 "-0 : use IPv4 or IPv6\n"
1427 "--tcpbuffer: size for TCP buffer\n"
1428 "-V\n"
1429 "--version : print version information\n"
1430 "Unsupported buffer options: -t -Z -B\n"
1431 ,Numblocks
1432 ,Blocksize
1434 ,*dim
1436 exit(EXIT_SUCCESS);
1440 static unsigned long long calcint(const char **argv, int c, unsigned long long def)
1442 char ch;
1443 double d = (double)def;
1445 switch (sscanf(argv[c],"%lf%c",&d,&ch)) {
1446 default:
1447 assert(0);
1448 break;
1449 case 2:
1450 if (d <= 0)
1451 fatal("invalid argument - must be > 0\n");
1452 switch (ch) {
1453 case 'k':
1454 case 'K':
1455 d *= 1024.0;
1456 return (unsigned long long) d;
1457 case 'm':
1458 case 'M':
1459 d *= 1024.0*1024.0;
1460 return (unsigned long long) d;
1461 case 'g':
1462 case 'G':
1463 d *= 1024.0*1024.0*1024.0;
1464 return (unsigned long long) d;
1465 case 't':
1466 case 'T':
1467 d *= 1024.0*1024.0*1024.0*1024.0;
1468 return (unsigned long long) d;
1469 case '%':
1470 if ((d >= 90) || (d <= 0))
1471 fatal("invalid value for percentage (must be 0..90)\n");
1472 return (unsigned long long) d;
1473 case 'b':
1474 case 'B':
1475 if (d < 128)
1476 fatal("invalid value for number of bytes\n");
1477 return (unsigned long long) d;
1478 default:
1479 if (argv[c][-2] == '-')
1480 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,&argv[c][-2]);
1481 else
1482 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,argv[c-1]);
1483 return d;
1485 case 1:
1486 if (d <= 0)
1487 fatal("invalid argument - must be > 0\n");
1488 if (d <= 100) {
1489 if (argv[c][-2] == '-')
1490 fatal("invalid low value for option \"%s\" - missing suffix?\n",&argv[c][-2]);
1491 else
1492 fatal("invalid low value for option \"%s\" - missing suffix?\n",argv[c-1]);
1494 return d;
1495 case 0:
1496 break;
1498 errormsg("unrecognized argument \"%s\" for option \"%s\"\n",argv[c],argv[c-1]);
1499 return d;
1504 static int argcheck(const char *opt, const char **argv, int *c, int argc)
1506 if (strncmp(opt,argv[*c],strlen(opt)))
1507 return 1;
1508 if (strlen(argv[*c]) > 2)
1509 argv[*c] += 2;
1510 else {
1511 (*c)++;
1512 if (*c == argc)
1513 fatal("missing argument to option %s\n",opt);
1515 return 0;
1520 static void addHashAlgorithm(const char *name)
1522 const char *algoname = "";
1523 int algo = 0;
1524 #if HAVE_LIBMHASH
1525 int numalgo = mhash_count();
1527 while (algo <= numalgo) {
1528 algoname = (const char *) mhash_get_hash_name_static(algo);
1529 if (algoname && (strcasecmp(algoname,name) == 0))
1530 break;
1531 ++algo;
1533 #else
1534 algoname = "MD5";
1535 #endif
1536 if (strcasecmp(algoname,name) == 0) {
1537 dest_t *dest = malloc(sizeof(dest_t));
1538 bzero(dest,sizeof(dest_t));
1539 dest->name = algoname;
1540 dest->fd = algo;
1541 if (Dest) {
1542 dest->next = Dest->next;
1543 Dest->next = dest;
1544 } else {
1545 Dest = dest;
1546 dest->next = 0;
1548 debugmsg("enabled hash algorithm %s\n",name);
1549 ++NumSenders;
1550 ++Hashers;
1551 } else
1552 fatal("invalid or unsupported hash function %s\n",name);
1556 static void openDestinationFiles(dest_t *d)
1558 unsigned errs = ErrorOccurred;
1559 while (d) {
1560 if (d->fd == -1) {
1561 if (0 == strncmp(d->arg,"/dev/",5))
1562 d->mode &= ~O_EXCL;
1563 d->fd = open(d->arg,d->mode,0666);
1564 if ((-1 == d->fd) && (errno == EINVAL)) {
1565 d->mode &= ~LARGEFILE;
1566 d->fd = open(d->arg,d->mode,0666);
1568 if ((-1 == d->fd) && (errno == EINVAL)) {
1569 d->mode &= ~O_TRUNC;
1570 d->fd = open(d->arg,d->mode,0666);
1572 if (-1 == d->fd) {
1573 d->result = strerror(errno);
1574 errormsg("unable to open output %s: %s\n",d->arg,strerror(errno));
1575 } else {
1576 debugmsg("successfully opened destination file %s with fd %d\n",d->arg,d->fd);
1579 if (-1 == d->fd) {
1580 d->name = 0; /* tag destination as unstartable */
1581 --NumSenders;
1583 #ifdef __sun
1584 else if (d->arg) {
1585 if (0 == directio(d->fd,DIRECTIO_ON))
1586 infomsg("direct I/O hinting enabled for output to %s\n",d->arg);
1587 else
1588 infomsg("direct I/O hinting failed for output to %s: %s\n",d->arg,strerror(errno));
1590 #endif
1591 d = d->next;
1593 if (ErrorOccurred != errs)
1594 fatal("unable to open all outputs\n");
1599 static const char *calcval(const char *arg, unsigned long long *res)
1601 char ch;
1602 double d;
1604 switch (sscanf(arg,"%lf%c",&d,&ch)) {
1605 default:
1606 assert(0);
1607 break;
1608 case 2:
1609 if (d <= 0)
1610 return "negative value out of range";
1611 switch (ch) {
1612 case 'k':
1613 case 'K':
1614 d *= 1024.0;
1615 *res = d;
1616 return 0;
1617 case 'm':
1618 case 'M':
1619 d *= 1024.0*1024.0;
1620 *res = d;
1621 return 0;
1622 case 'g':
1623 case 'G':
1624 d *= 1024.0*1024.0*1024.0;
1625 *res = d;
1626 return 0;
1627 case 't':
1628 case 'T':
1629 d *= 1024.0*1024.0*1024.0*1024.0;
1630 *res = d;
1631 return 0;
1632 case '%':
1633 if ((d >= 90) || (d <= 0))
1634 return "invalid value for percentage (must be 0..90)";
1635 *res = d;
1636 return 0;
1637 case 'b':
1638 case 'B':
1639 if (d < 128)
1640 return "invalid value for number of bytes";
1641 *res = d;
1642 return 0;
1643 default:
1644 return "invalid dimension";
1646 case 1:
1647 if (d <= 0)
1648 return "value out of range";
1649 if (d <= 100)
1650 return "value out of range";
1651 *res = d;
1652 return 0;
1653 case 0:
1654 break;
1656 return "unrecognized argument";
1660 static void initDefaults()
1662 #ifdef PATH_MAX
1663 char dfname[PATH_MAX+1];
1664 #else
1665 char dfname[1024];
1666 #endif
1667 char line[256];
1668 const char *home = getenv("HOME");
1669 size_t l;
1670 int df;
1671 FILE *dfstr;
1672 struct stat st;
1674 if (home == 0) {
1675 warningmsg("HOME environment variable not set - unable to find defaults file\n");
1676 return;
1678 strncpy(dfname,home,sizeof(dfname)-1);
1679 dfname[sizeof(dfname)-1] = 0;
1680 l = strlen(dfname);
1681 if (l + 12 > PATH_MAX) {
1682 warningmsg("path to defaults file breaks PATH_MAX\n");
1683 return;
1685 strcat(dfname,"/.mbuffer.rc");
1686 df = open(dfname,O_RDONLY);
1687 if (df == -1) {
1688 if (errno == ENOENT)
1689 infomsg("no defaults file ~/.mbuffer.rc\n");
1690 else
1691 warningmsg("error opening defaults file %s: %s\n",dfname,strerror(errno));
1692 return;
1694 if (-1 == fstat(df,&st)) {
1695 warningmsg("unable to stat defaults file %s: %s\n",dfname,strerror(errno));
1696 close(df);
1697 return;
1699 if (getuid() != st.st_uid) {
1700 warningmsg("ignoring defaults file from different user\n");
1701 close(df);
1702 return;
1704 infomsg("reading defaults file %s\n",dfname);
1705 dfstr = fdopen(df,"r");
1706 assert(dfstr);
1707 while (!feof(dfstr)) {
1708 char key[64],valuestr[64];
1709 fscanf(dfstr,"%255[^\n]\n",line);
1710 char *pound = strchr(line,'#');
1711 unsigned long long value;
1712 int a;
1714 if (pound)
1715 *pound = 0;
1716 a = sscanf(line,"%63[A-Za-z]%*[ \t=:]%63[0-9a-zA-Z]",key,valuestr);
1717 if (a != 2) {
1718 warningmsg("unable to parse line '%s' in .mbuffer.rc; %d arguments\n",line,a);
1719 continue;
1721 debugmsg("parsing key/value pair %s=%s\n",key,valuestr);
1722 if (strcasecmp(key,"numblocks") == 0) {
1723 long nb = strtol(valuestr,0,0);
1724 if ((nb == 0) && (errno == EINVAL)) {
1725 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1726 } else {
1727 Numblocks = nb;
1728 debugmsg("Numblocks = %llu\n",Numblocks);
1730 } else if (strcasecmp(key,"pause") == 0) {
1731 long p = strtol(valuestr,0,0);
1732 if ((p == 0) && (errno == EINVAL)) {
1733 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1734 } else {
1735 Pause = p;
1736 debugmsg("Pause = %d\n",Pause);
1738 } else if (strcasecmp(key,"autoloadtime") == 0) {
1739 long at = strtol(valuestr,0,0) - 1;
1740 if ((at == 0) && (errno == EINVAL))
1741 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1742 else {
1743 AutoloadTime = at;
1744 debugmsg("Autoloader time = %d\n",AutoloadTime);
1746 } else if (strcasecmp(key,"startread") == 0) {
1747 double sr = 0;
1748 if (1 == sscanf(valuestr,"%lf",&sr))
1749 sr /= 100;
1750 if ((sr <= 1) && (sr > 0)) {
1751 StartRead = sr;
1752 debugmsg("StartRead = %1.2lf\n",StartRead);
1754 } else if (strcasecmp(key,"startwrite") == 0) {
1755 double sw = 0;
1756 if (1 == sscanf(valuestr,"%lf",&sw))
1757 sw /= 100;
1758 if ((sw <= 1) && (sw > 0)) {
1759 StartWrite = sw;
1760 debugmsg("StartWrite = %1.2lf\n",StartWrite);
1762 } else if (strcasecmp(key,"timeout") == 0) {
1763 long t = strtol(valuestr,0,0);
1764 if (((t == 0) && (errno == EINVAL)) || (t < 0))
1765 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1766 else {
1767 Timeout = t;
1768 debugmsg("Timeout = %lu\n",Timeout);
1770 } else if (strcasecmp(key,"showstatus") == 0) {
1771 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0)) {
1772 Quiet = 0;
1773 debugmsg("showstatus = yes\n");
1774 } else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0)) {
1775 Quiet = 1;
1776 debugmsg("showstatus = no\n");
1777 } else
1778 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1779 continue;
1780 } else if (strcasecmp(key,"logstatus") == 0) {
1781 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0)) {
1782 StatusLog = 1;
1783 debugmsg("logstatus = yes\n");
1784 } else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0)) {
1785 StatusLog = 0;
1786 debugmsg("logstatus = no\n");
1787 } else
1788 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1789 continue;
1790 } else if (strcasecmp(key,"memlock") == 0) {
1791 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0)) {
1792 Memlock = 1;
1793 debugmsg("Memlock = %lu\n",Memlock);
1794 } else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0)) {
1795 Memlock = 0;
1796 debugmsg("Memlock = %lu\n",Memlock);
1797 } else
1798 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1799 continue;
1800 } else if (strcasecmp(key,"printpid") == 0) {
1801 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0)) {
1802 printmsg("PID is %d\n",getpid());
1803 } else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0)) {
1804 } else
1805 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1806 continue;
1808 const char *argerror = calcval(valuestr,&value);
1809 if (argerror) {
1810 warningmsg("ignoring key/value pair from defaults file (%s = %s): %s\n",key,valuestr,argerror);
1811 continue;
1813 if (strcasecmp(key,"blocksize") == 0) {
1814 Blocksize = value;
1815 } else if (strcasecmp(key,"maxwritespeed") == 0) {
1816 MaxWriteSpeed = value;
1817 } else if (strcasecmp(key,"maxreadspeed") == 0) {
1818 MaxReadSpeed = value;
1819 } else if (strcasecmp(key,"Totalmem") == 0) {
1820 if (value < 100) {
1821 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1822 Totalmem = ((unsigned long long) NumP * PgSz * value) / 100 ;
1823 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
1824 #else
1825 warningmsg("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
1826 #endif
1828 } else if (strcasecmp(key,"tcpbuffer") == 0) {
1829 TCPBufSize = value;
1830 } else {
1831 warningmsg("unknown key: %s\n",key);
1832 continue;
1834 infomsg("setting %s to %lld\n",key,value);
1836 fclose(dfstr);
1837 close(df);
1841 int main(int argc, const char **argv)
1843 int optMset = 0, optSset = 0, optBset = 0, optMode = O_EXCL, numOut = 0;
1844 int numstdout = 0, numthreads = 0;
1845 long mxnrsem;
1846 int c, fl, err;
1847 sigset_t signalSet;
1848 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
1849 struct stat st;
1850 #endif
1851 unsigned short netPortIn = 0;
1852 unsigned short netPortOut = 0;
1853 char *argv0 = strdup(argv[0]), *progname, null;
1854 const char *outfile = 0;
1855 struct sigaction sig;
1856 dest_t *dest = 0;
1858 /* setup logging prefix */
1859 progname = basename(argv0);
1860 PrefixLen = strlen(progname) + 2;
1861 Prefix = malloc(PrefixLen);
1862 (void) strcpy(Prefix,progname);
1863 Prefix[PrefixLen - 2] = ':';
1864 Prefix[PrefixLen - 1] = ' ';
1866 /* set verbose level before parsing defaults and options */
1867 for (c = 1; c < argc; c++) {
1868 const char *arg = argv[c];
1869 if ((arg[0] == '-') && (arg[1] == 'v')) {
1870 long verb;
1871 if (arg[2])
1872 verb = strtol(arg+2,0,0);
1873 else
1874 verb = strtol(argv[++c],0,0);
1875 if ((verb == 0) && (errno == EINVAL))
1876 errormsg("invalid argument to option -v: \"%s\"\n",argv[c]);
1877 else
1878 Verbose = verb;
1879 debugmsg("Verbose = %d\n",Verbose);
1883 /* gather system parameters */
1884 TickTime = 1000000 / sysconf(_SC_CLK_TCK);
1885 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__)
1886 PgSz = sysconf(_SC_PAGESIZE);
1887 assert(PgSz > 0);
1888 NumP = sysconf(_SC_AVPHYS_PAGES);
1889 assert(NumP > 0);
1890 Blocksize = PgSz;
1891 debugmsg("total # of phys pages: %li (pagesize %li)\n",NumP,PgSz);
1892 Numblocks = NumP/50;
1893 #elif defined(__FreeBSD__)
1894 size_t nump_size = sizeof(nump_size);
1895 sysctlbyname("hw.availpages", &NumP, &nump_size, NULL, 0);
1896 PgSz = sysconf(_SC_PAGESIZE);
1897 assert(PgSz > 0);
1898 #endif
1899 #if defined(_POSIX_MONOTONIC_CLOCK) && (_POSIX_MONOTONIC_CLOCK >= 0) && defined(CLOCK_MONOTONIC)
1900 if (sysconf(_SC_MONOTONIC_CLOCK) > 0)
1901 ClockSrc = CLOCK_MONOTONIC;
1902 #endif
1904 /* setup parameters */
1905 initDefaults();
1906 debugmsg("default buffer set to %d blocks of %lld bytes\n",Numblocks,Blocksize);
1907 for (c = 1; c < argc; c++) {
1908 if (!argcheck("-s",argv,&c,argc)) {
1909 Blocksize = Outsize = calcint(argv,c,Blocksize);
1910 optSset = 1;
1911 debugmsg("Blocksize = %llu\n",Blocksize);
1912 if (Blocksize < 100)
1913 fatal("cannot set blocksize as percentage of total physical memory\n");
1914 } else if (!strcmp("--append",argv[c])) {
1915 optMode |= O_APPEND;
1916 debugmsg("append to next file\n");
1917 } else if (!strcmp("--truncate",argv[c])) {
1918 optMode |= O_TRUNC;
1919 debugmsg("truncate next file\n");
1920 } else if (!argcheck("-m",argv,&c,argc)) {
1921 Totalmem = calcint(argv,c,Totalmem);
1922 optMset = 1;
1923 if (Totalmem < 100) {
1924 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1925 Totalmem = ((unsigned long long) NumP * PgSz * Totalmem) / 100 ;
1926 #else
1927 fatal("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
1928 #endif
1930 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
1931 } else if (!argcheck("-b",argv,&c,argc)) {
1932 long nb = strtol(argv[c],0,0);
1933 if ((nb == 0) && (errno == EINVAL)) {
1934 errormsg("invalid argument to option -b: \"%s\"\n",argv[c]);
1935 } else {
1936 Numblocks = nb;
1937 optBset = 1;
1939 debugmsg("Numblocks = %llu\n",Numblocks);
1940 } else if (!strcmp("--tcpbuffer",argv[c])) {
1941 TCPBufSize = calcint(argv,++c,TCPBufSize);
1942 debugmsg("TCPBufSize = %lu\n",TCPBufSize);
1943 } else if (!argcheck("-d",argv,&c,argc)) {
1944 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
1945 SetOutsize = 1;
1946 debugmsg("setting output size according to the blocksize of the device\n");
1947 #else
1948 fatal("cannot determine blocksize of device (unsupported by OS)\n");
1949 #endif
1950 } else if (!argcheck("-v",argv,&c,argc)) {
1951 /* has been parsed already */
1952 } else if (!argcheck("-u",argv,&c,argc)) {
1954 long p = strtol(argv[c],0,0);
1955 if ((p == 0) && (errno == EINVAL))
1956 errormsg("invalid argument to option -u: \"%s\"\n",argv[c]);
1957 else
1958 Pause = p;
1959 debugmsg("Pause = %d\n",Pause);
1960 } else if (!argcheck("-r",argv,&c,argc)) {
1961 MaxReadSpeed = calcint(argv,c,0);
1962 debugmsg("MaxReadSpeed = %lld\n",MaxReadSpeed);
1963 } else if (!argcheck("-R",argv,&c,argc)) {
1964 MaxWriteSpeed = calcint(argv,c,0);
1965 debugmsg("MaxWriteSpeed = %lld\n",MaxWriteSpeed);
1966 } else if (!argcheck("-n",argv,&c,argc)) {
1967 long nv = strtol(argv[c],0,0);
1968 if ((nv < 0) || ((nv == 0) && (errno == EINVAL)))
1969 fatal("invalid argument to option -n: \"%s\"\n",argv[c]);
1970 else
1971 NumVolumes = nv;
1972 if (NumVolumes < 0)
1973 fatal("argument for number of volumes must be > 0\n");
1974 debugmsg("NumVolumes = %d\n",NumVolumes);
1975 } else if (!argcheck("-i",argv,&c,argc)) {
1976 if (strcmp(argv[c],"-")) {
1977 Infile = argv[c];
1978 debugmsg("Infile = %s\n",Infile);
1979 } else {
1980 Infile = STDIN_FILENO;
1981 debugmsg("Infile is stdin\n");
1983 } else if (!argcheck("-o",argv,&c,argc)) {
1984 dest_t *dest = malloc(sizeof(dest_t));
1985 if (strcmp(argv[c],"-")) {
1986 debugmsg("output file: %s\n",argv[c]);
1987 dest->arg = argv[c];
1988 dest->name = argv[c];
1989 dest->fd = -1;
1990 dest->mode = O_CREAT|O_WRONLY|optMode|Direct|LARGEFILE|OptSync;
1991 } else {
1992 if (numstdout++)
1993 fatal("cannot output multiple times to stdout\n");
1994 debugmsg("output to stdout\n",argv[c]);
1995 dest->fd = dup(STDOUT_FILENO);
1996 err = dup2(STDERR_FILENO,STDOUT_FILENO);
1997 assert(err != -1);
1998 dest->arg = "<stdout>";
1999 dest->name = "<stdout>";
2000 dest->mode = 0;
2002 optMode = O_EXCL;
2003 dest->port = 0;
2004 dest->result = 0;
2005 bzero(&dest->thread,sizeof(dest->thread));
2006 dest->next = Dest;
2007 Dest = dest;
2008 if (outfile == 0)
2009 outfile = argv[c];
2010 ++numOut;
2011 ++NumSenders;
2012 #ifdef AF_INET6
2013 } else if (!strcmp("-0",argv[c])) {
2014 AddrFam = AF_UNSPEC;
2015 } else if (!strcmp("-4",argv[c])) {
2016 AddrFam = AF_INET;
2017 } else if (!strcmp("-6",argv[c])) {
2018 AddrFam = AF_INET6;
2019 #endif
2020 } else if (!argcheck("-I",argv,&c,argc)) {
2021 initNetworkInput(argv[c]);
2022 } else if (!argcheck("-O",argv,&c,argc)) {
2023 dest_t *d = createNetworkOutput(argv[c]);
2024 if (d->fd == -1) {
2025 free(d);
2026 } else {
2027 d->next = Dest;
2028 Dest = d;
2029 ++NumSenders;
2031 ++numOut;
2032 } else if (!argcheck("-T",argv,&c,argc)) {
2033 Tmpfile = malloc(strlen(argv[c]) + 1);
2034 if (!Tmpfile)
2035 fatal("out of memory\n");
2036 (void) strcpy(Tmpfile, argv[c]);
2037 Memmap = 1;
2038 debugmsg("Tmpfile = %s\n",Tmpfile);
2039 } else if (!strcmp("-t",argv[c])) {
2040 Memmap = 1;
2041 debugmsg("Memmap = 1\n");
2042 } else if (!argcheck("-l",argv,&c,argc)) {
2043 Log = open(argv[c],O_WRONLY|O_APPEND|O_TRUNC|O_CREAT|LARGEFILE,0666);
2044 if (-1 == Log) {
2045 Log = STDERR_FILENO;
2046 errormsg("error opening log file: %s\n",strerror(errno));
2048 debugmsg("logFile set to %s\n",argv[c]);
2049 } else if (!strcmp("-f",argv[c])) {
2050 optMode &= ~O_EXCL;
2051 debugmsg("overwrite = 1\n");
2052 } else if (!strcmp("-q",argv[c])) {
2053 debugmsg("disabling display of status\n");
2054 Quiet = 1;
2055 } else if (!strcmp("-Q",argv[c])) {
2056 debugmsg("disabling logging of status\n");
2057 StatusLog = 0;
2058 } else if (!strcmp("-c",argv[c])) {
2059 debugmsg("enabling full synchronous I/O\n");
2060 OptSync = O_SYNC;
2061 } else if (!strcmp("-e",argv[c])) {
2062 debugmsg("will terminate on any kind of error\n");
2063 ErrorsFatal = 1;
2064 } else if (!argcheck("-a",argv,&c,argc)) {
2065 long at = strtol(argv[c],0,0) - 1;
2066 if ((at == 0) && (errno == EINVAL))
2067 errormsg("invalid argument to option -a: \"%s\"\n",argv[c]);
2068 else {
2069 Autoloader = 1;
2070 AutoloadTime = at;
2072 debugmsg("Autoloader time = %d\n",AutoloadTime);
2073 } else if (!argcheck("-A",argv,&c,argc)) {
2074 Autoloader = 1;
2075 AutoloadCmd = argv[c];
2076 debugmsg("Autoloader command = \"%s\"\n", AutoloadCmd);
2077 } else if (!argcheck("-P",argv,&c,argc)) {
2078 if (1 != sscanf(argv[c],"%lf",&StartWrite))
2079 StartWrite = 0;
2080 StartWrite /= 100;
2081 if ((StartWrite > 1) || (StartWrite <= 0))
2082 fatal("error in argument -P: must be bigger than 0 and less or equal 100\n");
2083 debugmsg("StartWrite = %1.2lf\n",StartWrite);
2084 } else if (!argcheck("-p",argv,&c,argc)) {
2085 if (1 == sscanf(argv[c],"%lf",&StartRead))
2086 StartRead /= 100;
2087 else
2088 StartRead = 1.0;
2089 if ((StartRead >= 1) || (StartRead < 0))
2090 fatal("error in argument -p: must be bigger or equal to 0 and less than 100\n");
2091 debugmsg("StartRead = %1.2lf\n",StartRead);
2092 } else if (!strcmp("-L",argv[c])) {
2093 #ifdef _POSIX_MEMLOCK_RANGE
2094 Memlock = 1;
2095 debugmsg("memory locking enabled\n");
2096 #else
2097 warning("POSIX memory locking is unsupported on this system.\n");
2098 #endif
2099 } else if (!argcheck("-W",argv,&c,argc)) {
2100 Timeout = strtol(argv[c],0,0);
2101 if (Timeout <= 0)
2102 fatal("invalid argument to option -W\n");
2103 if (Timeout <= AutoloadTime)
2104 fatal("timeout must be bigger than autoload time\n");
2105 } else if (!strcmp("--direct",argv[c])) {
2106 #ifdef O_DIRECT
2107 debugmsg("using O_DIRECT to open file descriptors\n");
2108 Direct = O_DIRECT;
2109 #else
2110 warningmsg("--direct is unsupported on this system\n");
2111 #endif
2112 } else if (!strcmp("--help",argv[c]) || !strcmp("-h",argv[c])) {
2113 usage();
2114 } else if (!strcmp("--version",argv[c]) || !strcmp("-V",argv[c])) {
2115 version();
2116 } else if (!strcmp("--md5",argv[c]) || !strcmp("-H",argv[c])) {
2117 #ifdef HAVE_MD5
2118 addHashAlgorithm("MD5");
2119 #else
2120 fatal("hash calculation support has not been compiled in!\n");
2121 #endif
2122 } else if (!strcmp("--hash",argv[c])) {
2123 ++c;
2124 if (c == argc)
2125 fatal("missing argument to option --hash\n");
2126 #if HAVE_LIBMHASH
2127 if (!strcmp(argv[c],"list")) {
2128 (void) fprintf(stderr,"valid hash functions are:\n");
2129 int algo = mhash_count();
2130 while (algo >= 0) {
2131 const char *algoname = (const char *) mhash_get_hash_name_static(algo);
2132 if (algoname)
2133 (void) fprintf(stderr,"\t%s\n",algoname);
2134 --algo;
2136 exit(EXIT_SUCCESS);
2138 #elif defined HAVE_MD5
2139 if (!strcmp(argv[c],"list")) {
2140 (void) fprintf(stderr,"valid hash functions are:\n");
2141 (void) fprintf(stderr,"\tMD5\n");
2142 exit(EXIT_SUCCESS);
2144 #else
2145 fatal("hash calculation support has not been compiled in!\n");
2146 #endif
2147 addHashAlgorithm(argv[c]);
2148 } else if (!strcmp("--pid",argv[c])) {
2149 printmsg("PID is %d\n",getpid());
2150 } else if (!argcheck("-D",argv,&c,argc)) {
2151 OutVolsize = calcint(argv,c,0);
2152 debugmsg("OutVolsize = %llu\n",OutVolsize);
2153 } else
2154 fatal("unknown option \"%s\"\n",argv[c]);
2157 /* consistency check for options */
2158 if (AutoloadTime && Timeout && Timeout <= AutoloadTime)
2159 fatal("autoload time must be smaller than watchdog timeout\n");
2160 if (optBset&optSset&optMset) {
2161 if (Numblocks * Blocksize != Totalmem)
2162 fatal("inconsistent options: blocksize * number of blocks != totalsize!\n");
2163 } else if (((!optBset)&optSset&optMset) || (optMset&(!optBset)&(!optSset))) {
2164 if (Totalmem <= Blocksize)
2165 fatal("total memory must be larger than block size\n");
2166 Numblocks = Totalmem / Blocksize;
2167 infomsg("Numblocks = %llu, Blocksize = %llu, Totalmem = %llu\n",(unsigned long long)Numblocks,(unsigned long long)Blocksize,(unsigned long long)Totalmem);
2168 } else if (optBset&!optSset&optMset) {
2169 if (Blocksize == 0)
2170 fatal("blocksize must be greater than 0\n");
2171 if (Totalmem <= Blocksize)
2172 fatal("total memory must be larger than block size\n");
2173 Blocksize = Totalmem / Numblocks;
2174 infomsg("blocksize = %llu\n",(unsigned long long)Blocksize);
2176 if ((StartRead < 1) && (StartWrite > 0))
2177 fatal("setting both low watermark and high watermark doesn't make any sense...\n");
2178 if ((NumSenders-Hashers > 0) && (Autoloader || OutVolsize))
2179 fatal("multi-volume support is unsupported with multiple outputs\n");
2180 if (Autoloader) {
2181 if ((!outfile) && (!Infile))
2182 fatal("Setting autoloader time or command without using a device doesn't make any sense!\n");
2183 if (outfile && Infile) {
2184 fatal("Which one is your autoloader? Input or output? Replace input or output with a pipe.\n");
2187 if (Infile && netPortIn)
2188 fatal("Setting both network input port and input file doesn't make sense!\n");
2189 if (outfile && netPortOut)
2190 fatal("Setting both network output and output file doesn't make sense!\n");
2192 /* multi volume input consistency checking */
2193 if ((NumVolumes != 1) && (!Infile))
2194 fatal("multi volume support for input needs an explicit given input device (option -i)\n");
2196 /* SPW: Volsize consistency checking */
2197 if (OutVolsize && !outfile)
2198 fatal("Setting OutVolsize without an output device doesn't make sense!\n");
2199 if ((OutVolsize != 0) && (OutVolsize < Blocksize))
2200 /* code assumes we can write at least one block */
2201 fatal("If non-zero, OutVolsize must be at least as large as the buffer blocksize (%llu)!\n",Blocksize);
2202 /* SPW END */
2204 /* check that we stay within system limits */
2205 #ifdef __FreeBSD__
2207 size_t semvmx_size = sizeof(mxnrsem);
2208 if (sysctlbyname("kern.ipc.semvmx", &mxnrsem, &semvmx_size, 0, 0) == -1)
2209 mxnrsem = -1;
2211 #else
2212 mxnrsem = sysconf(_SC_SEM_VALUE_MAX);
2213 #endif
2214 if (-1 == mxnrsem) {
2215 #ifdef SEM_MAX_VALUE
2216 mxnrsem = SEM_MAX_VALUE;
2217 #else
2218 mxnrsem = LONG_MAX;
2219 warningmsg("unable to determine maximum value of semaphores\n");
2220 #endif
2222 if (Numblocks < 5)
2223 fatal("Minimum block count is 5.\n");
2224 if (Numblocks > mxnrsem) {
2225 fatal("cannot allocate more than %d blocks.\nThis is a system dependent limit, depending on the maximum semaphore value.\nPlease choose a bigger block size.\n",mxnrsem);
2228 if ((Blocksize * (long long)Numblocks) > (long long)SSIZE_MAX)
2229 fatal("Cannot address so much memory (%lld*%d=%lld>%lld).\n",Blocksize,Numblocks,Blocksize*(long long)Numblocks,(long long)SSIZE_MAX);
2230 /* create buffer */
2231 Buffer = (char **) valloc(Numblocks * sizeof(char *));
2232 if (!Buffer)
2233 fatal("Could not allocate enough memory (%d requested): %s\n",Numblocks * sizeof(char *),strerror(errno));
2234 if (Memmap) {
2235 infomsg("mapping temporary file to memory with %llu blocks with %llu byte (%llu kB total)...\n",(unsigned long long) Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
2236 if (!Tmpfile) {
2237 char tmplname[] = "mbuffer-XXXXXX";
2238 char *tmpdir = getenv("TMPDIR") ? getenv("TMPDIR") : "/var/tmp";
2239 char tfilename[sizeof(tmplname) + strlen(tmpdir) + 1];
2240 (void) strcpy(tfilename,tmpdir);
2241 (void) strcat(tfilename,"/");
2242 (void) strcat(tfilename,tmplname);
2243 Tmp = mkstemp(tfilename);
2244 Tmpfile = malloc(strlen(tfilename));
2245 if (!Tmpfile)
2246 fatal("out of memory: %s\n",strerror(errno));
2247 (void) strcpy(Tmpfile,tfilename);
2248 infomsg("tmpfile is %s\n",Tmpfile);
2249 } else {
2250 mode_t mode = O_RDWR | LARGEFILE;
2251 if (strncmp(Tmpfile,"/dev/",5))
2252 mode |= O_CREAT|O_EXCL;
2253 Tmp = open(Tmpfile,mode,0600);
2255 if (-1 == Tmp)
2256 fatal("could not create temporary file (%s): %s\n",Tmpfile,strerror(errno));
2257 if (strncmp(Tmpfile,"/dev/",5))
2258 (void) unlink(Tmpfile);
2259 /* resize the file. Needed - at least under linux, who knows why? */
2260 if (-1 == lseek(Tmp,Numblocks * Blocksize - sizeof(int),SEEK_SET))
2261 fatal("could not resize temporary file: %s\n",strerror(errno));
2262 if (-1 == write(Tmp,&c,sizeof(int)))
2263 fatal("could not resize temporary file: %s\n",strerror(errno));
2264 Buffer[0] = mmap(0,Blocksize*Numblocks,PROT_READ|PROT_WRITE,MAP_SHARED,Tmp,0);
2265 if (MAP_FAILED == Buffer[0])
2266 fatal("could not map buffer-file to memory: %s\n",strerror(errno));
2267 debugmsg("temporary file mapped to address %p\n",Buffer[0]);
2268 } else {
2269 infomsg("allocating memory for %d blocks with %llu byte (%llu kB total)...\n",Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
2270 Buffer[0] = (char *) valloc(Blocksize * Numblocks);
2271 if (Buffer[0] == 0)
2272 fatal("Could not allocate enough memory (%lld requested): %s\n",(unsigned long long)Blocksize * Numblocks,strerror(errno));
2273 #ifdef MADV_DONTFORK
2274 if (-1 == madvise(Buffer[0],Blocksize * Numblocks, MADV_DONTFORK))
2275 warningmsg("unable to advise memory handling of buffer: %s\n",strerror(errno));
2276 #endif
2278 for (c = 1; c < Numblocks; c++) {
2279 Buffer[c] = Buffer[0] + Blocksize * c;
2280 *Buffer[c] = 0; /* touch every block before locking */
2283 #ifdef _POSIX_MEMLOCK_RANGE
2284 if (Memlock) {
2285 uid_t uid;
2286 #ifndef HAVE_SETEUID
2287 #define seteuid setuid
2288 #endif
2289 uid = geteuid();
2290 if (0 != seteuid(0))
2291 warningmsg("could not change to uid 0 to lock memory (is mbuffer setuid root?)\n");
2292 else if ((0 != mlock((char *)Buffer,Numblocks * sizeof(char *))) || (0 != mlock(Buffer[0],Blocksize * Numblocks)))
2293 warningmsg("could not lock buffer in memory: %s\n",strerror(errno));
2294 else
2295 infomsg("memory locked successfully\n");
2296 err = seteuid(uid); /* don't give anyone a chance to attack this program, so giveup uid 0 after locking... */
2297 assert(err == 0);
2299 #endif
2301 debugmsg("creating semaphores...\n");
2302 if (0 != sem_init(&Buf2Dev,0,0))
2303 fatal("Error creating semaphore Buf2Dev: %s\n",strerror(errno));
2304 if (0 != sem_init(&Dev2Buf,0,Numblocks))
2305 fatal("Error creating semaphore Dev2Buf: %s\n",strerror(errno));
2307 debugmsg("opening input...\n");
2308 if (Infile) {
2309 int flags = O_RDONLY | LARGEFILE | Direct;
2310 In = open(Infile,flags);
2311 if (-1 == In) {
2312 if (errno == EINVAL) {
2313 flags &= ~LARGEFILE;
2314 In = open(Infile,flags);
2316 if (-1 == In)
2317 fatal("could not open input file: %s\n",strerror(errno));
2319 } else if (In == -1) {
2320 In = STDIN_FILENO;
2322 #ifdef __sun
2323 if (0 == directio(In,DIRECTIO_ON))
2324 infomsg("direct I/O hinting enabled for input\n");
2325 else
2326 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
2327 #endif
2328 if (numOut == 0) {
2329 dest_t *d = malloc(sizeof(dest_t));
2330 d->fd = dup(STDOUT_FILENO);
2331 err = dup2(STDERR_FILENO,STDOUT_FILENO);
2332 assert(err != -1);
2333 d->name = "<stdout>";
2334 d->arg = "<stdout>";
2335 d->port = 0;
2336 d->result = 0;
2337 bzero(&d->thread,sizeof(d->thread));
2338 d->next = Dest;
2339 Dest = d;
2340 ++NumSenders;
2342 openDestinationFiles(Dest);
2343 if (NumSenders == -1) {
2344 fatal("no output left - nothing to do\n");
2347 debugmsg("checking if we have a controlling terminal...\n");
2348 sig.sa_handler = SIG_IGN;
2349 sigemptyset(&sig.sa_mask);
2350 sig.sa_flags = 0;
2351 err = sigaction(SIGTTIN,&sig,0);
2352 assert(err == 0);
2353 fl = fcntl(STDERR_FILENO,F_GETFL);
2354 err = fcntl(STDERR_FILENO,F_SETFL,fl | O_NONBLOCK);
2355 assert(err == 0);
2356 if ((read(STDERR_FILENO,&c,1) == -1) && (errno != EAGAIN)) {
2357 int tty = open("/dev/tty",O_RDWR);
2358 if (-1 == tty) {
2359 Terminal = 0;
2360 if ((Autoloader == 0) && (outfile))
2361 warningmsg("No controlling terminal and no autoloader command specified.\n");
2362 } else {
2363 Terminal = 1;
2364 err = dup2(tty,STDERR_FILENO);
2365 assert(err != -1);
2368 err = fcntl(STDERR_FILENO,F_SETFL,fl);
2369 assert(err == 0);
2370 if ((Terminal == 1) && (NumVolumes != 1)) {
2371 struct termios tset;
2372 if (-1 == tcgetattr(STDERR_FILENO,&tset)) {
2373 warningmsg("unable to get terminal attributes: %s\n",strerror(errno));
2374 } else {
2375 tset.c_lflag &= (~ICANON) & (~ECHO);
2376 tset.c_cc[VTIME] = 0;
2377 tset.c_cc[VMIN] = 1;
2378 if (-1 == tcsetattr(STDERR_FILENO,TCSANOW,&tset))
2379 warningmsg("unable to set terminal attributes: %s\n",strerror(errno));
2383 debugmsg("registering signals...\n");
2384 sig.sa_handler = sigHandler;
2385 err = sigemptyset(&sig.sa_mask);
2386 assert(err == 0);
2387 err = sigaddset(&sig.sa_mask,SIGINT);
2388 assert(err == 0);
2389 sig.sa_flags = SA_RESTART;
2390 if (0 != sigaction(SIGINT,&sig,0))
2391 warningmsg("error registering new SIGINT handler: %s\n",strerror(errno));
2392 err = sigemptyset(&sig.sa_mask);
2393 assert(err == 0);
2394 err = sigaddset(&sig.sa_mask,SIGHUP);
2395 assert(err == 0);
2396 if (0 != sigaction(SIGHUP,&sig,0))
2397 warningmsg("error registering new SIGHUP handler: %s\n",strerror(errno));
2399 debugmsg("starting threads...\n");
2400 (void) gettimeofday(&Starttime,0);
2401 err = sigfillset(&signalSet);
2402 assert(0 == err);
2403 (void) pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
2405 /* select destination for output thread */
2406 dest = Dest;
2407 while (dest->fd == -1) {
2408 dest->name = 0;
2409 debugmsg("skipping destination %s\n",dest->arg);
2410 assert(dest->next);
2411 dest = dest->next;
2414 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
2415 debugmsg("checking output device...\n");
2416 if (-1 == fstat(dest->fd,&st))
2417 errormsg("could not stat output: %s\n",strerror(errno));
2418 else if (S_ISBLK(st.st_mode) || S_ISCHR(st.st_mode)) {
2419 infomsg("blocksize is %d bytes on output device\n",st.st_blksize);
2420 if (Blocksize % st.st_blksize != 0) {
2421 warningmsg("Blocksize should be a multiple of the blocksize of the output device!\n"
2422 "This can cause problems with some device/OS combinations...\n"
2423 "Blocksize on output device is %d (transfer block size is %lld)\n", st.st_blksize, Blocksize);
2424 if (SetOutsize)
2425 fatal("unable to set output blocksize\n");
2426 } else {
2427 if (SetOutsize) {
2428 infomsg("setting output blocksize to %d\n",st.st_blksize);
2429 Outsize = st.st_blksize;
2432 } else
2433 infomsg("no device on output stream\n");
2434 debugmsg("checking input device...\n");
2435 if (-1 == fstat(In,&st))
2436 warningmsg("could not stat input: %s\n",strerror(errno));
2437 else if (S_ISBLK(st.st_mode) || S_ISCHR(st.st_mode)) {
2438 infomsg("blocksize is %d bytes on input device\n",st.st_blksize);
2439 if (Blocksize % st.st_blksize != 0) {
2440 warningmsg("Blocksize should be a multiple of the blocksize of the input device!\n"
2441 "Use option -s to adjust transfer block size if you get an out-of-memory error on input.\n"
2442 "Blocksize on input device is %d (transfer block size is %lld)\n", st.st_blksize, Blocksize);
2444 } else
2445 infomsg("no device on input stream\n");
2446 #else
2447 warningmsg("Could not stat output device (unsupported by system)!\n"
2448 "This can result in incorrect written data when\n"
2449 "using multiple volumes. Continue at your own risk!\n");
2450 #endif
2451 if (((Verbose < 4) || (StatusLog == 0)) && (Quiet != 0))
2452 Status = 0;
2453 if (Status) {
2454 if (-1 == pipe(TermQ))
2455 fatal("could not create termination pipe: %s\n",strerror(errno));
2456 } else {
2457 TermQ[0] = -1;
2458 TermQ[1] = -1;
2460 err = pthread_create(&dest->thread,0,&outputThread,dest);
2461 assert(0 == err);
2462 if (Timeout) {
2463 err = pthread_create(&Watchdog,0,&watchdogThread,(void*)0);
2464 assert(0 == err);
2466 if (Status) {
2467 err = pthread_create(&Reader,0,&inputThread,0);
2468 assert(0 == err);
2469 (void) pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL);
2470 statusThread();
2471 err = pthread_join(Reader,0);
2472 if (err != 0)
2473 errormsg("error joining reader: %s\n",strerror(errno));
2474 } else {
2475 (void) pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL);
2476 (void) inputThread(0);
2477 debugmsg("waiting for output to finish...\n");
2478 if (TermQ[0] != -1) {
2479 err = read(TermQ[0],&null,1);
2480 assert(err == 1);
2483 if (Dest) {
2484 dest_t *d = Dest;
2485 int ret;
2487 infomsg("waiting for senders...\n");
2488 if (Terminate)
2489 cancelAll();
2490 do {
2491 if (d->name) {
2492 void *status;
2493 if (d->arg) {
2494 debugmsg("joining sender for %s\n",d->arg);
2495 } else {
2496 debugmsg("joining hasher for %s\n",d->name);
2498 ret = pthread_join(d->thread,&status);
2499 if (ret != 0)
2500 errormsg("error joining %s: %s\n",d->arg,d->name,strerror(errno));
2501 if (status == 0)
2502 ++numthreads;
2504 d = d->next;
2505 } while (d);
2507 if (Status || Log != STDERR_FILENO)
2508 summary(Numout * Blocksize + Rest, numthreads);
2509 if (Memmap) {
2510 int ret = munmap(Buffer[0],Blocksize*Numblocks);
2511 assert(ret == 0);
2513 if (Tmp != -1)
2514 (void) close(Tmp);
2515 if (Dest) {
2516 dest_t *d = Dest;
2517 do {
2518 dest_t *n = d->next;
2519 if (d->result) {
2520 if (d->arg) {
2521 warningmsg("error during output to %s: %s\n",d->arg,d->result);
2522 } else {
2523 (void) write(STDERR_FILENO,d->result,strlen(d->result));
2524 if (Log != STDERR_FILENO)
2525 (void) write(Log,d->result,strlen(d->result));
2528 free(d);
2529 d = n;
2530 } while (d);
2532 if (ErrorOccurred)
2533 exit(EXIT_FAILURE);
2534 exit(EXIT_SUCCESS);
2537 /* vim:tw=0