2 * Copyright (C) 2000-2014, Thomas Maier-Komor
4 * This is the source code of mbuffer.
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include <sys/_types.h>
25 #include <cygwin/types.h>
26 #include <cygwin/in.h>
29 #define _GNU_SOURCE 1 /* needed for O_DIRECT */
39 #include <semaphore.h>
46 #include <sys/socket.h>
54 #include <sys/sysctl.h>
58 #ifdef HAVE_SENDFILE_H
59 #include <sys/sendfile.h>
64 #define EBADRQC EINVAL
70 #elif defined HAVE_LIBMD5
72 static MD5_CTX MD5ctxt
;
73 #define MD5_INIT(ctxt) MD5Init(&ctxt);
74 #define MD5_UPDATE(ctxt,at,num) MD5Update(&ctxt,(unsigned char *)(at),(unsigned int)(num))
75 #define MD5_END(hash,ctxt) MD5Final(hash,&(ctxt))
77 #elif defined HAVE_LIBCRYPTO
78 #include <openssl/md5.h>
79 static MD5_CTX MD5ctxt
;
80 #define MD5_INIT(ctxt) MD5_Init(&ctxt);
81 #define MD5_UPDATE(ctxt,at,num) MD5_Update(&ctxt,at,num)
82 #define MD5_END(hash,ctxt) MD5_Final(hash,&(ctxt))
87 * _POSIX_THREADS is only defined if full thread support is available.
88 * We don't need full thread support, so we skip this test...
89 * #ifndef _POSIX_THREADS
90 * #error posix threads are required
95 #ifndef _POSIX_SEMAPHORES
96 #error posix sempahores are required
101 #define LARGEFILE O_LARGEFILE
107 #define DIRECT O_DIRECT
117 /* if this sendfile implementation does not support sending from buffers,
118 disable sendfile support */
121 #warning sendfile is unable to send from buffers
129 In
= -1, WatchdogRaised
= 0;
139 Outsize
= 10240, Pause
= 0, Timeout
= 0;
141 Terminate
= 0, /* abort execution, because of error or signal */
142 EmptyCount
= 0, /* counter incremented when buffer runs empty */
143 FullCount
= 0, /* counter incremented when buffer gets full */
144 NumSenders
= -1,/* number of sender threads */
146 MainOutOK
= 1; /* is the main outputThread still writing or just coordinating senders */
147 static unsigned long long
148 Totalmem
= 0, PgSz
= 0, NumP
= 0, Blocksize
= 10240,
149 MaxReadSpeed
= 0, MaxWriteSpeed
= 0, OutVolsize
= 0;
150 static volatile unsigned long long
151 Rest
= 0, Numin
= 0, Numout
= 0;
153 StartWrite
= 0, StartRead
= 1;
155 *Tmpfile
= 0, **Buffer
;
157 *Infile
= 0, *AutoloadCmd
= 0;
161 Memlock
= 0, TermQ
[2],
162 Memmap
= 0, Quiet
= 0, Status
= 1, StatusLog
= 1,
163 Hashers
= 0, Direct
= 0, SetOutsize
= 0;
165 NumVolumes
= 1, /* number of input volumes, 0 for interactive prompting */
166 Finish
= -1, /* this is for graceful termination */
167 Numblocks
= 512; /* number of buffer blocks */
172 ClockSrc
= CLOCK_REALTIME
;
177 #define sem_init(a,b,c) sema_init(a,c,USYNC_THREAD,0)
178 #define sem_post sema_post
179 #define sem_getvalue(a,b) ((*(b) = (a)->count), 0)
180 #if defined(__SunOS_5_8) || defined(__SunOS_5_9)
181 #define sem_wait SemWait
182 int SemWait(sema_t
*s
)
187 } while (err
== EINTR
);
191 #define sem_wait sema_wait
195 static sem_t Dev2Buf
, Buf2Dev
;
196 static pthread_cond_t
197 PercLow
= PTHREAD_COND_INITIALIZER
, /* low watermark */
198 PercHigh
= PTHREAD_COND_INITIALIZER
, /* high watermark */
199 SendCond
= PTHREAD_COND_INITIALIZER
;
200 static pthread_mutex_t
201 TermMut
= PTHREAD_MUTEX_INITIALIZER
, /* prevents statusThread from interfering with request*Volume */
202 LowMut
= PTHREAD_MUTEX_INITIALIZER
,
203 HighMut
= PTHREAD_MUTEX_INITIALIZER
,
204 SendMut
= PTHREAD_MUTEX_INITIALIZER
;
205 static int Terminal
= 1, Autoloader
= 0;
206 static struct timeval Starttime
;
207 static dest_t
*Dest
= 0;
208 static char *volatile SendAt
= 0;
209 static volatile int SendSize
= 0, ActSenders
= 0;
214 #define assert(x) ((x) || (*(char *) 0 = 1))
219 static int kb2str(char *s
, double v
)
221 const char *dim
= "KMGT", *f
;
223 while (v
> 10000.0) {
227 v
*= 1024.0*1024.0*1024.0*1024.0;
235 else if (v
< 10000) {
240 return sprintf(s
,f
,v
,*dim
);
245 static void summary(unsigned long long numb
, int numthreads
)
249 char buf
[256], *msg
= buf
;
252 (void) gettimeofday(&now
,0);
255 if ((Terminate
== 1) && (numthreads
== 0))
257 secs
= now
.tv_sec
- Starttime
.tv_sec
+ (double) now
.tv_usec
/ 1000000 - (double) Starttime
.tv_usec
/ 1000000;
260 av
= (double)(numb
)/secs
*numthreads
;
262 m
= (int) (secs
- h
* 3600)/60;
263 secs
-= m
* 60 + h
* 3600;
265 msg
+= sprintf(msg
,"summary: %dx ",numthreads
);
267 msg
+= sprintf(msg
,"summary: ");
268 msg
+= kb2str(msg
,numb
);
269 msg
+= sprintf(msg
,"Byte in ");
271 msg
+= sprintf(msg
,"%dh %02dmin %04.1fsec - average of ",h
,m
,secs
);
273 msg
+= sprintf(msg
,"%2dmin %04.1fsec - average of ",m
,secs
);
275 msg
+= sprintf(msg
,"%4.1fsec - average of ",secs
);
276 msg
+= kb2str(msg
,av
);
277 msg
+= sprintf(msg
,"B/s");
279 msg
+= sprintf(msg
,", %dx empty",EmptyCount
);
281 msg
+= sprintf(msg
,", %dx full",FullCount
);
284 if (Log
!= STDERR_FILENO
)
285 (void) write(Log
,buf
,msg
-buf
);
287 (void) write(STDERR_FILENO
,buf
,msg
-buf
);
292 static void cancelAll(void)
296 (void) pthread_cancel(d
->thread
);
298 d
->result
= "canceled";
302 (void) pthread_cancel(Reader
);
307 static RETSIGTYPE
sigHandler(int signr
)
315 if (TermQ
[1] != -1) {
316 (void) write(TermQ
[1],"0",1);
319 (void) pthread_cond_signal(&PercHigh
);
321 (void) pthread_cond_signal(&PercLow
);
324 (void) raise(SIGABRT
);
330 /* Thread-safe replacement for usleep. Argument must be a whole
331 * number of microseconds to sleep.
333 static int mt_usleep(unsigned long sleep_usecs
)
336 tv
.tv_sec
= sleep_usecs
/ 1000000;
337 tv
.tv_nsec
= (sleep_usecs
% 1000000) * 1000;
340 /* Sleep for the time specified in tv. If interrupted by a
341 * signal, place the remaining time left to sleep back into tv.
343 if (0 == nanosleep(&tv
, &tv
))
345 } while (errno
== EINTR
);
351 static void *watchdogThread(void *ignored
)
353 unsigned long ni
= Numin
, no
= Numout
;
356 if ((ni
== Numin
) && (Finish
== -1)) {
357 errormsg("watchdog timeout: input stalled; sending SIGINT\n");
359 kill(getpid(),SIGINT
);
362 errormsg("watchdog timeout: output stalled; sending SIGINT\n");
364 kill(getpid(),SIGINT
);
370 return 0; // suppresses a gcc warning
375 static void statusThread(void)
377 struct timeval last
, now
;
378 double in
= 0, out
= 0, total
, diff
, fill
;
379 unsigned long long lin
= 0, lout
= 0;
380 int unwritten
= 1; /* assumption: initially there is at least one unwritten block */
382 struct timeval timeout
= {0,200000};
387 (void) mt_usleep(1000); /* needed on alpha (stderr fails with fpe on nan) */
391 while ((Numin
== 0) && (Terminate
== 0) && (Finish
== -1)) {
393 timeout
.tv_usec
= 200000;
396 FD_SET(TermQ
[0],&readfds
);
397 switch (select(maxfd
,&readfds
,0,0,&timeout
)) {
409 char buf
[256], *b
= buf
;
412 timeout
.tv_usec
= 500000;
415 FD_SET(TermQ
[0],&readfds
);
416 err
= select(maxfd
,&readfds
,0,0,&timeout
);
425 (void) gettimeofday(&now
,0);
426 diff
= now
.tv_sec
- last
.tv_sec
+ (double) (now
.tv_usec
- last
.tv_usec
) / 1000000;
427 err
= pthread_mutex_lock(&TermMut
);
429 err
= sem_getvalue(&Buf2Dev
,&unwritten
);
431 fill
= (double)unwritten
/ (double)Numblocks
* 100.0;
432 in
= (double)(((Numin
- lin
) * Blocksize
) >> 10);
434 out
= (double)(((Numout
- lout
) * Blocksize
) >> 10);
439 total
= (double)((Numout
* Blocksize
) >> 10);
440 fill
= (fill
< 0.0) ? 0.0 : fill
;
441 b
+= sprintf(b
,"\rin @ ");
443 numsender
= NumSenders
+ MainOutOK
- Hashers
;
444 b
+= sprintf(b
,"B/s, out @ ");
445 b
+= kb2str(b
, out
* numsender
);
447 b
+= sprintf(b
,"B/s, %d x ",numsender
);
449 b
+= sprintf(b
,"B/s, ");
450 b
+= kb2str(b
,total
);
451 b
+= sprintf(b
,"B total, buffer %3.0f%% full",fill
);
453 #ifdef NEED_IO_INTERLOCK
454 if (Log
== STDERR_FILENO
) {
456 e
= pthread_mutex_lock(&LogMut
);
458 nw
= write(STDERR_FILENO
,buf
,strlen(buf
));
459 e
= pthread_mutex_unlock(&LogMut
);
463 nw
= write(STDERR_FILENO
,buf
,strlen(buf
));
465 if ((StatusLog
!= 0) && (Log
!= STDERR_FILENO
))
466 infomsg("%s\n",buf
+1);
467 err
= pthread_mutex_unlock(&TermMut
);
469 if (nw
== -1) /* stop trying to print status messages after a write error */
476 static inline long long timediff(struct timespec
*restrict t1
, struct timespec
*restrict t2
)
479 tdiff
= (t1
->tv_sec
- t2
->tv_sec
) * 1000000;
480 tdiff
+= (t1
->tv_nsec
- t2
->tv_nsec
) / 1000;
488 static long long enforceSpeedLimit(unsigned long long limit
, long long num
, struct timespec
*last
)
493 long self
= (long) pthread_self();
497 debugmsg("enforceSpeedLimit(%lld,%lld): thread %ld\n",limit
,num
,self
);
500 (void) clock_gettime(ClockSrc
,&now
);
501 tdiff
= timediff(&now
,last
);
502 dt
= (double)tdiff
* 1E-6;
503 if (((double)num
/dt
) > (double)limit
) {
504 double req
= (double)num
/limit
- dt
;
505 long long w
= (long long) (req
* 1E6
);
507 long long slept
, ret
;
509 (void) clock_gettime(ClockSrc
,last
);
510 slept
= timediff(last
,&now
);
511 ret
= -(long long)((double)limit
* (double)(slept
-w
) * 1E-6);
512 debugmsg("thread %ld: slept for %lld usec (planned for %lld), ret = %lld\n",self
,slept
,w
,ret
);
515 debugmsg("thread %ld: request for sleeping %lld usec delayed\n",self
,w
);
517 * Sleeping now would cause too much of a slowdown. So
518 * we defer this sleep until the sleeping time is
519 * longer than the tick time. Like this we can stay as
520 * close to the speed limit as possible.
525 debugmsg("thread %ld: %lld/%g (%g) <= %g\n",self
,num
,dt
,num
/dt
,(double)limit
);
531 static int promptInteractive(unsigned at
, unsigned num
)
533 static const char prompt
[] = "\nContinue with next volume? Press 'y' to continue or 'n' to finish...";
534 static const char contmsg
[] = "\nyes - continuing with next volume...\n";
535 static const char donemsg
[] = "\nno - input done, waiting for output to finish...\n";
538 err
= pthread_mutex_lock(&TermMut
);
540 if (-1 == write(STDERR_FILENO
,prompt
,sizeof(prompt
))) {
541 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno
));
543 pthread_exit((void *) -1);
547 if (-1 == read(STDERR_FILENO
,&c
,1) && (errno
!= EINTR
)) {
548 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno
));
550 pthread_exit((void *) -1);
552 debugmsg("prompt input %c\n",c
);
557 (void) write(STDERR_FILENO
,donemsg
,sizeof(donemsg
));
558 err
= pthread_mutex_lock(&HighMut
);
560 err
= sem_post(&Buf2Dev
);
562 err
= pthread_cond_signal(&PercHigh
);
564 err
= pthread_mutex_unlock(&HighMut
);
566 err
= pthread_mutex_unlock(&TermMut
);
574 (void) write(STDERR_FILENO
,contmsg
,sizeof(contmsg
));
575 err
= pthread_mutex_unlock(&TermMut
);
585 static int requestInputVolume(unsigned at
, unsigned num
)
587 static struct timeval volstart
= {0,0};
592 char cmd_buf
[15+strlen(Infile
)];
594 debugmsg("requesting new volume for input\n");
595 (void) gettimeofday(&now
,0);
597 diff
= now
.tv_sec
- volstart
.tv_sec
+ (double) (now
.tv_usec
- volstart
.tv_usec
) * 1E-6;
599 diff
= now
.tv_sec
- Starttime
.tv_sec
+ (double) (now
.tv_usec
- Starttime
.tv_usec
) * 1E-6;
601 hr
= (unsigned) (diff
/ 3600);
603 min
= (unsigned) (diff
/ 60);
605 infomsg("time for reading volume: %u:%02u:%02f\n",hr
,min
,diff
);
606 } else if (diff
> 60) {
607 min
= (unsigned) (diff
/ 60);
609 infomsg("time for reading volume: %02u:%02f\n",min
,diff
);
611 infomsg("time for reading volume: %02fsec.\n",diff
);
613 errormsg("error closing input: %s\n",strerror(errno
));
615 if ((Autoloader
) && (Infile
)) {
620 (void) snprintf(cmd_buf
, sizeof(cmd_buf
), "mt -f %s offline", Infile
);
623 infomsg("requesting new input volume with command '%s'\n",cmd
);
626 warningmsg("error running \"%s\" to change volume in autoloader: exitcode %d\n",cmd
,ret
);
628 pthread_exit((void *) 0);
629 } else if (0 > ret
) {
630 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd
, strerror(errno
));
632 pthread_exit((void *) -1);
635 infomsg("waiting for drive to get ready...\n");
636 (void) sleep(AutoloadTime
);
639 if (0 == promptInteractive(at
,num
))
642 In
= open(Infile
, O_RDONLY
| LARGEFILE
| Direct
);
643 if ((-1 == In
) && (errno
== EINVAL
))
644 In
= open(Infile
, O_RDONLY
| Direct
);
646 errormsg("could not reopen input: %s\n",strerror(errno
));
648 if (-1 == directio(In
,DIRECTIO_ON
))
649 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno
));
652 (void) gettimeofday(&volstart
,0);
653 diff
= volstart
.tv_sec
- now
.tv_sec
+ (double) (volstart
.tv_usec
- now
.tv_usec
) * 1E-6;
654 infomsg("tape-change took %fsec. - continuing with next volume\n",diff
);
656 if (Terminal
&& ! Autoloader
) {
657 char msg
[] = "\nOK - continuing...\n";
658 (void) write(STDERR_FILENO
,msg
,sizeof(msg
));
665 static void releaseLock(void *l
)
667 int err
= pthread_mutex_unlock((pthread_mutex_t
*)l
);
673 static void *inputThread(void *ignored
)
676 unsigned long long num
;
679 const double startread
= StartRead
, startwrite
= StartWrite
;
680 struct timespec last
;
682 int maxfd
= TermQ
[0] > In
? TermQ
[0] + 1 : In
+ 1;
685 assert(TermQ
[0] != -1);
687 (void) clock_gettime(ClockSrc
,&last
);
688 assert(ignored
== 0);
689 infomsg("inputThread: starting with threadid %ld...\n",(long)pthread_self());
694 err
= pthread_mutex_lock(&LowMut
);
696 err
= sem_getvalue(&Buf2Dev
,&fill
);
698 if (fill
== Numblocks
- 1) {
699 debugmsg("inputThread: buffer full, waiting for it to drain.\n");
700 pthread_cleanup_push(releaseLock
,&LowMut
);
701 err
= pthread_cond_wait(&PercLow
,&LowMut
);
703 pthread_cleanup_pop(0);
705 debugmsg("inputThread: low watermark reached, continuing...\n");
707 err
= pthread_mutex_unlock(&LowMut
);
710 if (Terminate
) { /* for async termination requests */
711 debugmsg("inputThread: terminating early upon request...\n");
713 errormsg("error closing input: %s\n",strerror(errno
));
715 pthread_exit((void *)1);
718 err
= sem_wait(&Dev2Buf
); /* Wait for one or more buffer blocks to be free */
727 FD_SET(TermQ
[0],&readfds
);
729 err
= select(maxfd
,&readfds
,0,0,0);
730 debugiomsg("inputThread: select(%d, {%d,%d}, 0, 0, 0) = %d\n", maxfd
,In
,TermQ
[0],err
);
731 assert((err
> 0) || (errno
== EBADF
));
732 if (FD_ISSET(TermQ
[0],&readfds
))
734 assert(FD_ISSET(In
,&readfds
));
737 in
= read(In
,Buffer
[at
] + num
,Blocksize
- num
);
738 debugiomsg("inputThread: read(In, Buffer[%d] + %llu, %llu) = %d\n", at
, num
, Blocksize
- num
, in
);
741 } else if ((0 == in
) && (Terminal
||Autoloader
) && (NumVolumes
!= 1)) {
742 if (0 == requestInputVolume(at
,num
))
744 } else if ((-1 == in
) && (errno
== EIO
) && (Terminal
||Autoloader
) && (NumVolumes
!= 1)) {
745 requestInputVolume(at
,num
);
746 } else if (in
<= 0) {
747 /* error or end-of-file */
748 if ((-1 == in
) && (Terminate
== 0))
749 errormsg("inputThread: error reading at offset 0x%llx: %s\n",Numin
*Blocksize
,strerror(errno
));
752 debugmsg("inputThread: last block has %llu bytes\n",num
);
753 err
= pthread_mutex_lock(&HighMut
);
755 err
= sem_post(&Buf2Dev
);
757 err
= pthread_cond_signal(&PercHigh
);
759 err
= pthread_mutex_unlock(&HighMut
);
761 infomsg("inputThread: exiting...\n");
763 pthread_exit((void *) in
);
766 } while (num
< Blocksize
);
768 xfer
= enforceSpeedLimit(MaxReadSpeed
,xfer
,&last
);
769 err
= sem_post(&Buf2Dev
);
771 if (startwrite
> 0) {
772 err
= pthread_mutex_lock(&HighMut
);
774 err
= sem_getvalue(&Buf2Dev
,&fill
);
776 if (((double) fill
/ (double) Numblocks
) + DBL_EPSILON
>= startwrite
) {
777 err
= pthread_cond_signal(&PercHigh
);
780 err
= pthread_mutex_unlock(&HighMut
);
783 if (++at
== Numblocks
)
791 static inline int syncSenders(char *b
, int s
)
793 static volatile int size
= 0, skipped
= 0;
794 static char *volatile buf
= 0;
797 err
= pthread_mutex_lock(&SendMut
);
806 debugiomsg("syncSenders(%p,%d): ActSenders = %d\n",b
,s
,ActSenders
);
807 pthread_cleanup_push(releaseLock
,&SendMut
);
808 err
= pthread_cond_wait(&SendCond
,&SendMut
);
810 pthread_cleanup_pop(1);
811 debugiomsg("syncSenders(): continue\n");
814 ActSenders
= NumSenders
+ 1;
815 assert((buf
!= 0) || Terminate
);
820 // after the first time, always give a buffer free after sync
821 err
= sem_post(&Dev2Buf
);
824 // the first time no buffer has been given free
827 err
= pthread_mutex_unlock(&SendMut
);
829 debugiomsg("syncSenders(): send %d@%p, BROADCAST\n",SendSize
,SendAt
);
830 err
= pthread_cond_broadcast(&SendCond
);
838 static inline void terminateSender(int fd
, dest_t
*d
, int ret
)
840 debugmsg("terminating operation on %s\n",d
->arg
);
843 infomsg("syncing %s...\n",d
->arg
);
846 while ((err
!= 0) && (errno
== EINTR
));
848 if ((errno
== EINVAL
) || (errno
== EBADRQC
)) {
849 infomsg("syncing unsupported on %s: omitted.\n",d
->arg
);
851 warningmsg("unable to sync %s: %s\n",d
->arg
,strerror(errno
));
855 errormsg("error closing file %s: %s\n",d
->arg
,strerror(errno
));
858 ret
= syncSenders(0,-1);
859 debugmsg("terminateSender(%s): sendSender(0,-1) = %d\n",d
->arg
,ret
);
861 pthread_exit((void *) ret
);
866 static void *senderThread(void *arg
)
868 unsigned long long outsize
= Blocksize
;
869 dest_t
*dest
= (dest_t
*)arg
;
874 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
877 debugmsg("sender(%s): checking output device...\n",dest
->arg
);
878 if (-1 == fstat(out
,&st
))
879 warningmsg("could not stat output %s: %s\n",dest
->arg
,strerror(errno
));
880 else if (S_ISBLK(st
.st_mode
) || S_ISCHR(st
.st_mode
)) {
881 infomsg("blocksize is %d bytes on output device\n",st
.st_blksize
);
882 if ((Blocksize
< st
.st_blksize
) || (Blocksize
% st
.st_blksize
!= 0)) {
883 warningmsg("Blocksize should be a multiple of the blocksize of the output device!\n"
884 "This can cause problems with some device/OS combinations...\n"
885 "Blocksize on output device %s is %d (transfer block size is %lld)\n", dest
->arg
, st
.st_blksize
, Blocksize
);
887 errormsg("unable to set output blocksize\n");
888 dest
->result
= strerror(errno
);
889 terminateSender(out
,dest
,1);
893 infomsg("setting output blocksize to %d\n",st
.st_blksize
);
894 outsize
= st
.st_blksize
;
898 infomsg("no device on output stream %s\n",dest
->arg
);
900 debugmsg("sender(%s): starting...\n",dest
->arg
);
903 (void) syncSenders(0,0);
906 debugmsg("senderThread(\"%s\"): done.\n",dest
->arg
);
907 terminateSender(out
,dest
,0);
908 return 0; /* for lint */
911 infomsg("senderThread(\"%s\"): terminating early upon request...\n",dest
->arg
);
912 dest
->result
= "canceled";
913 terminateSender(out
,dest
,1);
916 unsigned long long rest
= size
- num
;
921 off_t baddr
= (off_t
) (SendAt
+num
);
922 unsigned long long n
= SetOutsize
? (rest
> Outsize
? (rest
/Outsize
)*Outsize
: rest
) : rest
;
923 ret
= sendfile(out
,SFV_FD_SELF
,&baddr
,n
);
924 debugiomsg("sender(%s): sendfile(%d, SFV_FD_SELF, &%p, %llu) = %d\n", dest
->arg
, dest
->fd
, (void*)baddr
, n
, ret
);
925 if ((ret
== -1) && ((errno
== EINVAL
) || (errno
== EOPNOTSUPP
))) {
927 debugmsg("sender(%s): sendfile unsupported - falling back to write\n", dest
->arg
);
933 char *baddr
= SendAt
+num
;
934 ret
= write(out
,baddr
,rest
> outsize
? outsize
:rest
);
935 debugiomsg("sender(%s): writing %llu@0x%p: ret = %d\n",dest
->arg
,rest
,(void*)baddr
,ret
);
938 errormsg("error writing to %s: %s\n",dest
->arg
,strerror(errno
));
939 dest
->result
= strerror(errno
);
940 terminateSender(out
,dest
,1);
943 } while (num
!= size
);
949 static void *hashThread(void *arg
)
952 dest_t
*dest
= (dest_t
*) arg
;
956 MHASH ctxt
= mhash_init(algo
);
957 assert(ctxt
!= MHASH_FAILED
);
961 debugmsg("hashThread(): starting...\n");
965 (void) syncSenders(0,0);
969 unsigned char hashvalue
[128];
976 debugmsg("hashThread(): done.\n");
978 mhash_deinit(ctxt
,hashvalue
);
979 an
= (const char *) mhash_get_hash_name_static(algo
);
980 ds
= mhash_get_block_size(algo
);
982 MD5_END(hashvalue
,MD5ctxt
);
986 assert(sizeof(hashvalue
) >= ds
);
987 m
+= sprintf(m
,"%s hash: ",an
);
988 for (i
= 0; i
< ds
; ++i
)
989 m
+= sprintf(m
,"%02x",(unsigned int)hashvalue
[i
]);
993 pthread_exit((void *) msg
);
994 return 0; /* for lint */
997 (void) syncSenders(0,-1);
998 infomsg("hashThread(): terminating early upon request...\n");
999 pthread_exit((void *) 0);
1001 debugiomsg("hashThread(): hashing %d@0x%p\n",size
,(void*)SendAt
);
1002 #ifdef HAVE_LIBMHASH
1003 mhash(ctxt
,SendAt
,size
);
1005 MD5_UPDATE(MD5ctxt
,SendAt
,size
);
1012 static int requestOutputVolume(int out
, const char *outfile
)
1014 static struct timeval volstart
= {0,0};
1020 errormsg("End of volume, but not end of input:\n"
1021 "Output file must be given (option -o) for multi volume support!\n");
1024 infomsg("end of volume - last block on volume: %lld\n",Numout
);
1025 (void) gettimeofday(&now
,0);
1026 if (volstart
.tv_sec
)
1027 diff
= now
.tv_sec
- volstart
.tv_sec
+ (double) (now
.tv_usec
- volstart
.tv_usec
) * 1E-6;
1029 diff
= now
.tv_sec
- Starttime
.tv_sec
+ (double) (now
.tv_usec
- Starttime
.tv_usec
) * 1E-6;
1031 hr
= (unsigned) (diff
/ 3600);
1033 min
= (unsigned) (diff
/ 60);
1035 infomsg("time for writing volume: %u:%02u:%02f\n",hr
,min
,diff
);
1036 } else if (diff
> 60) {
1037 min
= (unsigned) (diff
/ 60);
1039 infomsg("time for writing volume: %02u:%02f\n",min
,diff
);
1041 infomsg("time for writing volume: %02fsec.\n",diff
);
1042 if (-1 == close(out
))
1043 errormsg("error closing output %s: %s\n",outfile
,strerror(errno
));
1047 const char default_cmd
[] = "mt -f %s offline";
1048 char cmd_buf
[sizeof(default_cmd
)+strlen(outfile
)];
1049 const char *cmd
= AutoloadCmd
;
1053 (void) snprintf(cmd_buf
, sizeof(cmd_buf
), default_cmd
, Infile
);
1056 infomsg("requesting new output volume with command '%s'\n",cmd
);
1059 errormsg("error running \"%s\" to change volume in autoloader - exitcode %d\n", cmd
, err
);
1062 } else if (0 > err
) {
1063 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd
, strerror(errno
));
1068 infomsg("waiting for drive to get ready...\n");
1069 (void) sleep(AutoloadTime
);
1073 char c
= 0, msg
[] = "\nvolume full - insert new media and press return when ready...\n";
1074 if (Terminal
== 0) {
1075 errormsg("End of volume, but not end of input.\n"
1076 "Specify an autoload command, if you are working without terminal.\n");
1079 err
= pthread_mutex_lock(&TermMut
);
1081 if (-1 == write(STDERR_FILENO
,msg
,sizeof(msg
))) {
1082 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno
));
1086 if (-1 == read(STDERR_FILENO
,&c
,1) && (errno
!= EINTR
)) {
1087 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno
));
1090 } while (c
!= '\n');
1091 err
= pthread_mutex_unlock(&TermMut
);
1094 mode
= O_WRONLY
|O_TRUNC
|OptSync
|LARGEFILE
|Direct
;
1095 if (strncmp(outfile
,"/dev/",5))
1097 out
= open(outfile
,mode
,0666);
1099 errormsg("error reopening output file: %s\n",strerror(errno
));
1101 if (-1 == directio(out
,DIRECTIO_ON
))
1102 infomsg("direct I/O hinting failed for output: %s\n",strerror(errno
));
1104 } while (-1 == out
);
1105 (void) gettimeofday(&volstart
,0);
1106 diff
= volstart
.tv_sec
- now
.tv_sec
+ (double) (volstart
.tv_usec
- now
.tv_usec
) * 1E-6;
1107 infomsg("tape-change took %fsec. - continuing with next volume\n",diff
);
1108 if (Terminal
&& ! Autoloader
) {
1109 char msg
[] = "\nOK - continuing...\n";
1110 (void) write(STDERR_FILENO
,msg
,sizeof(msg
));
1117 static void terminateOutputThread(dest_t
*d
, int status
)
1121 infomsg("outputThread: syncing %s...\n",d
->arg
);
1124 while ((err
!= 0) && (errno
== EINTR
));
1126 if ((errno
== EINVAL
) || (errno
== EBADRQC
)) {
1127 infomsg("syncing unsupported on %s: omitted.\n",d
->arg
);
1129 warningmsg("unable to sync %s: %s\n",d
->arg
,strerror(errno
));
1132 infomsg("outputThread: finished - exiting...\n");
1133 if (-1 == close(d
->fd
))
1134 errormsg("error closing %s: %s\n",d
->arg
,strerror(errno
));
1135 if (TermQ
[1] != -1) {
1136 err
= write(TermQ
[1],"0",1);
1138 errormsg("error writing to termination queue: %s\n",strerror(errno
));
1141 (void) sem_post(&Dev2Buf
);
1142 (void) pthread_cond_broadcast(&SendCond
);
1145 pthread_exit((void *)status
);
1150 static void *outputThread(void *arg
)
1152 dest_t
*dest
= (dest_t
*) arg
;
1154 int fill
= 0, haderror
= 0, out
, multipleSenders
;
1155 #ifdef HAVE_SENDFILE
1158 const double startwrite
= StartWrite
, startread
= StartRead
;
1159 unsigned long long blocksize
= Blocksize
;
1161 struct timespec last
;
1163 assert(NumSenders
>= 0);
1166 dest_t
*d
= dest
->next
;
1167 debugmsg("NumSenders = %d\n",NumSenders
);
1168 ActSenders
= NumSenders
+ 1;
1169 ret
= pthread_mutex_init(&SendMut
,0);
1171 ret
= pthread_cond_init(&SendCond
,0);
1175 debugmsg("creating hash thread with algorithm %s\n",d
->name
);
1176 ret
= pthread_create(&d
->thread
,0,hashThread
,d
);
1178 } else if (d
->fd
!= -1) {
1179 debugmsg("creating sender for %s\n",d
->arg
);
1180 ret
= pthread_create(&d
->thread
,0,senderThread
,d
);
1183 debugmsg("outputThread: ignoring destination %s\n",d
->arg
);
1189 multipleSenders
= (NumSenders
> 0);
1192 if (startwrite
> 0) {
1194 err
= pthread_mutex_lock(&HighMut
);
1196 debugmsg("outputThread: delaying start until buffer reaches high watermark\n");
1197 pthread_cleanup_push(releaseLock
,&HighMut
);
1198 err
= pthread_cond_wait(&PercHigh
,&HighMut
);
1200 pthread_cleanup_pop(0);
1201 debugmsg("outputThread: high watermark reached, starting...\n");
1202 err
= pthread_mutex_unlock(&HighMut
);
1205 infomsg("outputThread: starting output on %s...\n",dest
->arg
);
1206 /* initialize last to 0, because we don't want to wait initially */
1207 (void) clock_gettime(ClockSrc
,&last
);
1209 unsigned long long rest
= blocksize
;
1212 if ((startwrite
> 0) && (fill
<= 0)) {
1214 err
= pthread_mutex_lock(&HighMut
);
1216 err
= sem_getvalue(&Buf2Dev
,&fill
);
1219 debugmsg("outputThread: buffer empty, waiting for it to fill\n");
1220 pthread_cleanup_push(releaseLock
,&HighMut
);
1221 err
= pthread_cond_wait(&PercHigh
,&HighMut
);
1223 pthread_cleanup_pop(0);
1225 debugmsg("outputThread: high watermark reached, continuing...\n");
1226 (void) clock_gettime(ClockSrc
,&last
);
1228 err
= pthread_mutex_unlock(&HighMut
);
1232 err
= sem_wait(&Buf2Dev
);
1235 infomsg("outputThread: terminating upon termination request...\n");
1236 dest
->result
= "canceled";
1237 terminateOutputThread(dest
,1);
1240 err
= sem_getvalue(&Buf2Dev
,&fill
);
1242 if ((fill
== 0) && (0 == Rest
)) {
1243 if (multipleSenders
)
1244 (void) syncSenders((char*)0xdeadbeef,0);
1245 infomsg("outputThread: finished - exiting...\n");
1246 terminateOutputThread(dest
,haderror
);
1248 blocksize
= rest
= Rest
;
1249 debugmsg("outputThread: last block has %llu bytes\n",(unsigned long long)Rest
);
1252 if (multipleSenders
)
1253 (void) syncSenders(Buffer
[at
],blocksize
);
1254 /* switch output volume if -D <size> has been reached */
1255 if ( (OutVolsize
!= 0) && (Numout
> 0) && (Numout
% (OutVolsize
/Blocksize
)) == 0 ) {
1256 /* Sleep to let status thread "catch up" so that the displayed total is a multiple of OutVolsize */
1257 (void) mt_usleep(500000);
1258 out
= requestOutputVolume(out
,dest
->name
);
1261 dest
->result
= strerror(errno
);
1265 /* use Outsize which could be the blocksize of the device (option -d) */
1266 unsigned long long n
= rest
> Outsize
? Outsize
: rest
;
1269 if (NumSenders
== 0)
1273 #ifdef HAVE_SENDFILE
1275 off_t baddr
= (off_t
) (Buffer
[at
] + blocksize
- rest
);
1276 num
= sendfile(out
,SFV_FD_SELF
,&baddr
,n
);
1277 debugiomsg("outputThread: sendfile(%d, SFV_FD_SELF, &(Buffer[%d] + %llu), %llu) = %d\n", out
, at
, blocksize
- rest
, n
, num
);
1278 if ((num
== -1) && ((errno
== EOPNOTSUPP
) || (errno
== EINVAL
))) {
1279 infomsg("sendfile not supported - falling back to write...\n");
1286 num
= write(out
,Buffer
[at
] + blocksize
- rest
, n
);
1287 debugiomsg("outputThread: writing %lld@0x%p: ret = %d\n", n
, Buffer
[at
] + blocksize
- rest
, num
);
1289 if (Terminal
||Autoloader
) {
1290 if (((-1 == num
) && ((errno
== ENOMEM
) || (errno
== ENOSPC
)))
1292 /* request a new volume */
1293 out
= requestOutputVolume(out
,dest
->name
);
1298 } else if (-1 == num
) {
1299 dest
->result
= strerror(errno
);
1300 errormsg("outputThread: error writing to %s at offset 0x%llx: %s\n",dest
->arg
,(long long)Blocksize
*Numout
+blocksize
-rest
,strerror(errno
));
1302 if (NumSenders
== 0) {
1303 debugmsg("outputThread: terminating...\n");
1305 err
= sem_post(&Dev2Buf
);
1307 terminateOutputThread(dest
,1);
1309 debugmsg("outputThread: %d senders remaining - continuing...\n",NumSenders
);
1314 if (multipleSenders
== 0) {
1315 err
= sem_post(&Dev2Buf
);
1319 xfer
= enforceSpeedLimit(MaxWriteSpeed
,xfer
,&last
);
1321 (void) mt_usleep(Pause
);
1323 err
= sem_getvalue(&Buf2Dev
,&fill
);
1326 if (multipleSenders
)
1327 (void) syncSenders((char*)0xdeadbeef,0);
1328 terminateOutputThread(dest
,0);
1329 return 0; /* make lint happy */
1332 if (Numblocks
== ++at
)
1334 if (startread
< 1) {
1335 err
= pthread_mutex_lock(&LowMut
);
1337 err
= sem_getvalue(&Buf2Dev
,&fill
);
1339 if (((double)fill
/ (double)Numblocks
) < startread
) {
1340 err
= pthread_cond_signal(&PercLow
);
1343 err
= pthread_mutex_unlock(&LowMut
);
1352 static void version(void)
1354 (void) fprintf(stderr
,
1355 "mbuffer version "PACKAGE_VERSION
"\n"\
1356 "Copyright 2001-2014 - T. Maier-Komor\n"\
1357 "License: GPLv3 - see file LICENSE\n"\
1358 "This program comes with ABSOLUTELY NO WARRANTY!!!\n"
1359 "Donations via PayPal to thomas@maier-komor.de are welcome and support this work!\n"
1367 static void usage(void)
1369 const char *dim
= "bkMGTP";
1370 unsigned long long m
= Numblocks
* Blocksize
;
1371 while (m
>= 10000) {
1375 (void) fprintf(stderr
,
1376 "usage: mbuffer [Options]\n"
1378 "-b <num> : use <num> blocks for buffer (default: %ld)\n"
1379 "-s <size> : use blocks of <size> bytes for processing (default: %llu)\n"
1380 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1381 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: 2%% = %llu%c)\n"
1383 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: %llu%c)\n"
1385 #ifdef _POSIX_MEMLOCK_RANGE
1386 "-L : lock buffer in memory (unusable with file based buffers)\n"
1388 "-d : use blocksize of device for output\n"
1389 "-D <size> : assumed output device size (default: infinite/auto-detect)\n"
1390 "-P <num> : start writing after buffer has been filled more than <num>%%\n"
1391 "-p <num> : start reading after buffer has been filled less than <num>%%\n"
1392 "-i <file> : use <file> for input\n"
1393 "-o <file> : use <file> for output (this option can be passed MULTIPLE times)\n"
1394 "--append : append to output file (must be passed before -o)\n"
1395 "--truncate : truncate next file (must be passed before -o)\n"
1396 "-I <h>:<p> : use network port <port> as input, allow only host <h> to connect\n"
1397 "-I <p> : use network port <port> as input\n"
1398 "-O <h>:<p> : output data to host <h> and port <p> (MUTLIPLE outputs supported)\n"
1399 "-n <num> : <num> volumes for input, '0' to prompt interactively\n"
1400 "-t : use memory mapped temporary file (for huge buffer)\n"
1401 "-T <file> : as -t but uses <file> as buffer\n"
1402 "-l <file> : use <file> for logging messages\n"
1403 "-u <num> : pause <num> milliseconds after each write\n"
1404 "-r <rate> : limit read rate to <rate> B/s, where <rate> can be given in b,k,M,G\n"
1405 "-R <rate> : same as -r for writing; use eiter one, if your tape is too fast\n"
1406 "-f : overwrite existing files\n"
1407 "-a <time> : autoloader which needs <time> seconds to reload\n"
1408 "-A <cmd> : issue command <cmd> to request new volume\n"
1409 "-v <level> : set verbose level to <level> (valid values are 0..6)\n"
1410 "-q : quiet - do not display the status on stderr\n"
1411 "-Q : quiet - do not log the status\n"
1412 "-c : write with synchronous data integrity support\n"
1413 "-e : stop processing on any kind of error\n"
1415 "--direct : open input and output with O_DIRECT\n"
1417 #if defined HAVE_LIBCRYPTO || defined HAVE_LIBMD5 || defined HAVE_LIBMHASH
1419 "--md5 : generate md5 hash of transfered data\n"
1420 "--hash <a> : use alogritm <a>, if <a> is 'list' possible algorithms are listed\n"
1421 "--pid : print PID of this instance\n"
1422 "-W <time> : set watchdog timeout to <time> seconds\n"
1424 "-4 : force use of IPv4\n"
1425 "-6 : force use of IPv6\n"
1426 "-0 : use IPv4 or IPv6\n"
1427 "--tcpbuffer: size for TCP buffer\n"
1429 "--version : print version information\n"
1430 "Unsupported buffer options: -t -Z -B\n"
1440 static unsigned long long calcint(const char **argv
, int c
, unsigned long long def
)
1443 double d
= (double)def
;
1445 switch (sscanf(argv
[c
],"%lf%c",&d
,&ch
)) {
1451 fatal("invalid argument - must be > 0\n");
1456 return (unsigned long long) d
;
1460 return (unsigned long long) d
;
1463 d
*= 1024.0*1024.0*1024.0;
1464 return (unsigned long long) d
;
1467 d
*= 1024.0*1024.0*1024.0*1024.0;
1468 return (unsigned long long) d
;
1470 if ((d
>= 90) || (d
<= 0))
1471 fatal("invalid value for percentage (must be 0..90)\n");
1472 return (unsigned long long) d
;
1476 fatal("invalid value for number of bytes\n");
1477 return (unsigned long long) d
;
1479 if (argv
[c
][-2] == '-')
1480 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch
,&argv
[c
][-2]);
1482 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch
,argv
[c
-1]);
1487 fatal("invalid argument - must be > 0\n");
1489 if (argv
[c
][-2] == '-')
1490 fatal("invalid low value for option \"%s\" - missing suffix?\n",&argv
[c
][-2]);
1492 fatal("invalid low value for option \"%s\" - missing suffix?\n",argv
[c
-1]);
1498 errormsg("unrecognized argument \"%s\" for option \"%s\"\n",argv
[c
],argv
[c
-1]);
1504 static int argcheck(const char *opt
, const char **argv
, int *c
, int argc
)
1506 if (strncmp(opt
,argv
[*c
],strlen(opt
)))
1508 if (strlen(argv
[*c
]) > 2)
1513 fatal("missing argument to option %s\n",opt
);
1520 static void addHashAlgorithm(const char *name
)
1522 const char *algoname
= "";
1525 int numalgo
= mhash_count();
1527 while (algo
<= numalgo
) {
1528 algoname
= (const char *) mhash_get_hash_name_static(algo
);
1529 if (algoname
&& (strcasecmp(algoname
,name
) == 0))
1536 if (strcasecmp(algoname
,name
) == 0) {
1537 dest_t
*dest
= malloc(sizeof(dest_t
));
1538 bzero(dest
,sizeof(dest_t
));
1539 dest
->name
= algoname
;
1542 dest
->next
= Dest
->next
;
1548 debugmsg("enabled hash algorithm %s\n",name
);
1552 fatal("invalid or unsupported hash function %s\n",name
);
1556 static void openDestinationFiles(dest_t
*d
)
1558 unsigned errs
= ErrorOccurred
;
1561 if (0 == strncmp(d
->arg
,"/dev/",5))
1563 d
->fd
= open(d
->arg
,d
->mode
,0666);
1564 if ((-1 == d
->fd
) && (errno
== EINVAL
)) {
1565 d
->mode
&= ~LARGEFILE
;
1566 d
->fd
= open(d
->arg
,d
->mode
,0666);
1568 if ((-1 == d
->fd
) && (errno
== EINVAL
)) {
1569 d
->mode
&= ~O_TRUNC
;
1570 d
->fd
= open(d
->arg
,d
->mode
,0666);
1573 d
->result
= strerror(errno
);
1574 errormsg("unable to open output %s: %s\n",d
->arg
,strerror(errno
));
1576 debugmsg("successfully opened destination file %s with fd %d\n",d
->arg
,d
->fd
);
1580 d
->name
= 0; /* tag destination as unstartable */
1585 if (0 == directio(d
->fd
,DIRECTIO_ON
))
1586 infomsg("direct I/O hinting enabled for output to %s\n",d
->arg
);
1588 infomsg("direct I/O hinting failed for output to %s: %s\n",d
->arg
,strerror(errno
));
1593 if (ErrorOccurred
!= errs
)
1594 fatal("unable to open all outputs\n");
1599 static const char *calcval(const char *arg
, unsigned long long *res
)
1604 switch (sscanf(arg
,"%lf%c",&d
,&ch
)) {
1610 return "negative value out of range";
1624 d
*= 1024.0*1024.0*1024.0;
1629 d
*= 1024.0*1024.0*1024.0*1024.0;
1633 if ((d
>= 90) || (d
<= 0))
1634 return "invalid value for percentage (must be 0..90)";
1640 return "invalid value for number of bytes";
1644 return "invalid dimension";
1648 return "value out of range";
1650 return "value out of range";
1656 return "unrecognized argument";
1660 static void initDefaults()
1663 char dfname
[PATH_MAX
+1];
1668 const char *home
= getenv("HOME");
1675 warningmsg("HOME environment variable not set - unable to find defaults file\n");
1678 strncpy(dfname
,home
,sizeof(dfname
)-1);
1679 dfname
[sizeof(dfname
)-1] = 0;
1681 if (l
+ 12 > PATH_MAX
) {
1682 warningmsg("path to defaults file breaks PATH_MAX\n");
1685 strcat(dfname
,"/.mbuffer.rc");
1686 df
= open(dfname
,O_RDONLY
);
1688 if (errno
== ENOENT
)
1689 infomsg("no defaults file ~/.mbuffer.rc\n");
1691 warningmsg("error opening defaults file %s: %s\n",dfname
,strerror(errno
));
1694 if (-1 == fstat(df
,&st
)) {
1695 warningmsg("unable to stat defaults file %s: %s\n",dfname
,strerror(errno
));
1699 if (getuid() != st
.st_uid
) {
1700 warningmsg("ignoring defaults file from different user\n");
1704 infomsg("reading defaults file %s\n",dfname
);
1705 dfstr
= fdopen(df
,"r");
1707 while (!feof(dfstr
)) {
1708 char key
[64],valuestr
[64];
1709 fscanf(dfstr
,"%255[^\n]\n",line
);
1710 char *pound
= strchr(line
,'#');
1711 unsigned long long value
;
1716 a
= sscanf(line
,"%63[A-Za-z]%*[ \t=:]%63[0-9a-zA-Z]",key
,valuestr
);
1718 warningmsg("unable to parse line '%s' in .mbuffer.rc; %d arguments\n",line
,a
);
1721 debugmsg("parsing key/value pair %s=%s\n",key
,valuestr
);
1722 if (strcasecmp(key
,"numblocks") == 0) {
1723 long nb
= strtol(valuestr
,0,0);
1724 if ((nb
== 0) && (errno
== EINVAL
)) {
1725 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1728 debugmsg("Numblocks = %llu\n",Numblocks
);
1730 } else if (strcasecmp(key
,"pause") == 0) {
1731 long p
= strtol(valuestr
,0,0);
1732 if ((p
== 0) && (errno
== EINVAL
)) {
1733 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1736 debugmsg("Pause = %d\n",Pause
);
1738 } else if (strcasecmp(key
,"autoloadtime") == 0) {
1739 long at
= strtol(valuestr
,0,0) - 1;
1740 if ((at
== 0) && (errno
== EINVAL
))
1741 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1744 debugmsg("Autoloader time = %d\n",AutoloadTime
);
1746 } else if (strcasecmp(key
,"startread") == 0) {
1748 if (1 == sscanf(valuestr
,"%lf",&sr
))
1750 if ((sr
<= 1) && (sr
> 0)) {
1752 debugmsg("StartRead = %1.2lf\n",StartRead
);
1754 } else if (strcasecmp(key
,"startwrite") == 0) {
1756 if (1 == sscanf(valuestr
,"%lf",&sw
))
1758 if ((sw
<= 1) && (sw
> 0)) {
1760 debugmsg("StartWrite = %1.2lf\n",StartWrite
);
1762 } else if (strcasecmp(key
,"timeout") == 0) {
1763 long t
= strtol(valuestr
,0,0);
1764 if (((t
== 0) && (errno
== EINVAL
)) || (t
< 0))
1765 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1768 debugmsg("Timeout = %lu\n",Timeout
);
1770 } else if (strcasecmp(key
,"showstatus") == 0) {
1771 if ((strcasecmp(valuestr
,"yes") == 0) || (strcasecmp(valuestr
,"on") == 0) || (strcmp(valuestr
,"1") == 0)) {
1773 debugmsg("showstatus = yes\n");
1774 } else if ((strcasecmp(valuestr
,"no") == 0) || (strcasecmp(valuestr
,"off") == 0) || (strcmp(valuestr
,"0") == 0)) {
1776 debugmsg("showstatus = no\n");
1778 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1780 } else if (strcasecmp(key
,"logstatus") == 0) {
1781 if ((strcasecmp(valuestr
,"yes") == 0) || (strcasecmp(valuestr
,"on") == 0) || (strcmp(valuestr
,"1") == 0)) {
1783 debugmsg("logstatus = yes\n");
1784 } else if ((strcasecmp(valuestr
,"no") == 0) || (strcasecmp(valuestr
,"off") == 0) || (strcmp(valuestr
,"0") == 0)) {
1786 debugmsg("logstatus = no\n");
1788 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1790 } else if (strcasecmp(key
,"memlock") == 0) {
1791 if ((strcasecmp(valuestr
,"yes") == 0) || (strcasecmp(valuestr
,"on") == 0) || (strcmp(valuestr
,"1") == 0)) {
1793 debugmsg("Memlock = %lu\n",Memlock
);
1794 } else if ((strcasecmp(valuestr
,"no") == 0) || (strcasecmp(valuestr
,"off") == 0) || (strcmp(valuestr
,"0") == 0)) {
1796 debugmsg("Memlock = %lu\n",Memlock
);
1798 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1800 } else if (strcasecmp(key
,"printpid") == 0) {
1801 if ((strcasecmp(valuestr
,"yes") == 0) || (strcasecmp(valuestr
,"on") == 0) || (strcmp(valuestr
,"1") == 0)) {
1802 printmsg("PID is %d\n",getpid());
1803 } else if ((strcasecmp(valuestr
,"no") == 0) || (strcasecmp(valuestr
,"off") == 0) || (strcmp(valuestr
,"0") == 0)) {
1805 warningmsg("invalid argument for %s: \"%s\"\n",key
,valuestr
);
1808 const char *argerror
= calcval(valuestr
,&value
);
1810 warningmsg("ignoring key/value pair from defaults file (%s = %s): %s\n",key
,valuestr
,argerror
);
1813 if (strcasecmp(key
,"blocksize") == 0) {
1815 } else if (strcasecmp(key
,"maxwritespeed") == 0) {
1816 MaxWriteSpeed
= value
;
1817 } else if (strcasecmp(key
,"maxreadspeed") == 0) {
1818 MaxReadSpeed
= value
;
1819 } else if (strcasecmp(key
,"Totalmem") == 0) {
1821 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1822 Totalmem
= ((unsigned long long) NumP
* PgSz
* value
) / 100 ;
1823 debugmsg("Totalmem = %lluk\n",Totalmem
>>10);
1825 warningmsg("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
1828 } else if (strcasecmp(key
,"tcpbuffer") == 0) {
1831 warningmsg("unknown key: %s\n",key
);
1834 infomsg("setting %s to %lld\n",key
,value
);
1841 int main(int argc
, const char **argv
)
1843 int optMset
= 0, optSset
= 0, optBset
= 0, optMode
= O_EXCL
, numOut
= 0;
1844 int numstdout
= 0, numthreads
= 0;
1848 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
1851 unsigned short netPortIn
= 0;
1852 unsigned short netPortOut
= 0;
1853 char *argv0
= strdup(argv
[0]), *progname
, null
;
1854 const char *outfile
= 0;
1855 struct sigaction sig
;
1858 /* setup logging prefix */
1859 progname
= basename(argv0
);
1860 PrefixLen
= strlen(progname
) + 2;
1861 Prefix
= malloc(PrefixLen
);
1862 (void) strcpy(Prefix
,progname
);
1863 Prefix
[PrefixLen
- 2] = ':';
1864 Prefix
[PrefixLen
- 1] = ' ';
1866 /* set verbose level before parsing defaults and options */
1867 for (c
= 1; c
< argc
; c
++) {
1868 const char *arg
= argv
[c
];
1869 if ((arg
[0] == '-') && (arg
[1] == 'v')) {
1872 verb
= strtol(arg
+2,0,0);
1874 verb
= strtol(argv
[++c
],0,0);
1875 if ((verb
== 0) && (errno
== EINVAL
))
1876 errormsg("invalid argument to option -v: \"%s\"\n",argv
[c
]);
1879 debugmsg("Verbose = %d\n",Verbose
);
1883 /* gather system parameters */
1884 TickTime
= 1000000 / sysconf(_SC_CLK_TCK
);
1885 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__)
1886 PgSz
= sysconf(_SC_PAGESIZE
);
1888 NumP
= sysconf(_SC_AVPHYS_PAGES
);
1891 debugmsg("total # of phys pages: %li (pagesize %li)\n",NumP
,PgSz
);
1892 Numblocks
= NumP
/50;
1893 #elif defined(__FreeBSD__)
1894 size_t nump_size
= sizeof(nump_size
);
1895 sysctlbyname("hw.availpages", &NumP
, &nump_size
, NULL
, 0);
1896 PgSz
= sysconf(_SC_PAGESIZE
);
1899 #if defined(_POSIX_MONOTONIC_CLOCK) && (_POSIX_MONOTONIC_CLOCK >= 0) && defined(CLOCK_MONOTONIC)
1900 if (sysconf(_SC_MONOTONIC_CLOCK
) > 0)
1901 ClockSrc
= CLOCK_MONOTONIC
;
1904 /* setup parameters */
1906 debugmsg("default buffer set to %d blocks of %lld bytes\n",Numblocks
,Blocksize
);
1907 for (c
= 1; c
< argc
; c
++) {
1908 if (!argcheck("-s",argv
,&c
,argc
)) {
1909 Blocksize
= Outsize
= calcint(argv
,c
,Blocksize
);
1911 debugmsg("Blocksize = %llu\n",Blocksize
);
1912 if (Blocksize
< 100)
1913 fatal("cannot set blocksize as percentage of total physical memory\n");
1914 } else if (!strcmp("--append",argv
[c
])) {
1915 optMode
|= O_APPEND
;
1916 debugmsg("append to next file\n");
1917 } else if (!strcmp("--truncate",argv
[c
])) {
1919 debugmsg("truncate next file\n");
1920 } else if (!argcheck("-m",argv
,&c
,argc
)) {
1921 Totalmem
= calcint(argv
,c
,Totalmem
);
1923 if (Totalmem
< 100) {
1924 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1925 Totalmem
= ((unsigned long long) NumP
* PgSz
* Totalmem
) / 100 ;
1927 fatal("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
1930 debugmsg("Totalmem = %lluk\n",Totalmem
>>10);
1931 } else if (!argcheck("-b",argv
,&c
,argc
)) {
1932 long nb
= strtol(argv
[c
],0,0);
1933 if ((nb
== 0) && (errno
== EINVAL
)) {
1934 errormsg("invalid argument to option -b: \"%s\"\n",argv
[c
]);
1939 debugmsg("Numblocks = %llu\n",Numblocks
);
1940 } else if (!strcmp("--tcpbuffer",argv
[c
])) {
1941 TCPBufSize
= calcint(argv
,++c
,TCPBufSize
);
1942 debugmsg("TCPBufSize = %lu\n",TCPBufSize
);
1943 } else if (!argcheck("-d",argv
,&c
,argc
)) {
1944 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
1946 debugmsg("setting output size according to the blocksize of the device\n");
1948 fatal("cannot determine blocksize of device (unsupported by OS)\n");
1950 } else if (!argcheck("-v",argv
,&c
,argc
)) {
1951 /* has been parsed already */
1952 } else if (!argcheck("-u",argv
,&c
,argc
)) {
1954 long p
= strtol(argv
[c
],0,0);
1955 if ((p
== 0) && (errno
== EINVAL
))
1956 errormsg("invalid argument to option -u: \"%s\"\n",argv
[c
]);
1959 debugmsg("Pause = %d\n",Pause
);
1960 } else if (!argcheck("-r",argv
,&c
,argc
)) {
1961 MaxReadSpeed
= calcint(argv
,c
,0);
1962 debugmsg("MaxReadSpeed = %lld\n",MaxReadSpeed
);
1963 } else if (!argcheck("-R",argv
,&c
,argc
)) {
1964 MaxWriteSpeed
= calcint(argv
,c
,0);
1965 debugmsg("MaxWriteSpeed = %lld\n",MaxWriteSpeed
);
1966 } else if (!argcheck("-n",argv
,&c
,argc
)) {
1967 long nv
= strtol(argv
[c
],0,0);
1968 if ((nv
< 0) || ((nv
== 0) && (errno
== EINVAL
)))
1969 fatal("invalid argument to option -n: \"%s\"\n",argv
[c
]);
1973 fatal("argument for number of volumes must be > 0\n");
1974 debugmsg("NumVolumes = %d\n",NumVolumes
);
1975 } else if (!argcheck("-i",argv
,&c
,argc
)) {
1976 if (strcmp(argv
[c
],"-")) {
1978 debugmsg("Infile = %s\n",Infile
);
1980 Infile
= STDIN_FILENO
;
1981 debugmsg("Infile is stdin\n");
1983 } else if (!argcheck("-o",argv
,&c
,argc
)) {
1984 dest_t
*dest
= malloc(sizeof(dest_t
));
1985 if (strcmp(argv
[c
],"-")) {
1986 debugmsg("output file: %s\n",argv
[c
]);
1987 dest
->arg
= argv
[c
];
1988 dest
->name
= argv
[c
];
1990 dest
->mode
= O_CREAT
|O_WRONLY
|optMode
|Direct
|LARGEFILE
|OptSync
;
1993 fatal("cannot output multiple times to stdout\n");
1994 debugmsg("output to stdout\n",argv
[c
]);
1995 dest
->fd
= dup(STDOUT_FILENO
);
1996 err
= dup2(STDERR_FILENO
,STDOUT_FILENO
);
1998 dest
->arg
= "<stdout>";
1999 dest
->name
= "<stdout>";
2005 bzero(&dest
->thread
,sizeof(dest
->thread
));
2013 } else if (!strcmp("-0",argv
[c
])) {
2014 AddrFam
= AF_UNSPEC
;
2015 } else if (!strcmp("-4",argv
[c
])) {
2017 } else if (!strcmp("-6",argv
[c
])) {
2020 } else if (!argcheck("-I",argv
,&c
,argc
)) {
2021 initNetworkInput(argv
[c
]);
2022 } else if (!argcheck("-O",argv
,&c
,argc
)) {
2023 dest_t
*d
= createNetworkOutput(argv
[c
]);
2032 } else if (!argcheck("-T",argv
,&c
,argc
)) {
2033 Tmpfile
= malloc(strlen(argv
[c
]) + 1);
2035 fatal("out of memory\n");
2036 (void) strcpy(Tmpfile
, argv
[c
]);
2038 debugmsg("Tmpfile = %s\n",Tmpfile
);
2039 } else if (!strcmp("-t",argv
[c
])) {
2041 debugmsg("Memmap = 1\n");
2042 } else if (!argcheck("-l",argv
,&c
,argc
)) {
2043 Log
= open(argv
[c
],O_WRONLY
|O_APPEND
|O_TRUNC
|O_CREAT
|LARGEFILE
,0666);
2045 Log
= STDERR_FILENO
;
2046 errormsg("error opening log file: %s\n",strerror(errno
));
2048 debugmsg("logFile set to %s\n",argv
[c
]);
2049 } else if (!strcmp("-f",argv
[c
])) {
2051 debugmsg("overwrite = 1\n");
2052 } else if (!strcmp("-q",argv
[c
])) {
2053 debugmsg("disabling display of status\n");
2055 } else if (!strcmp("-Q",argv
[c
])) {
2056 debugmsg("disabling logging of status\n");
2058 } else if (!strcmp("-c",argv
[c
])) {
2059 debugmsg("enabling full synchronous I/O\n");
2061 } else if (!strcmp("-e",argv
[c
])) {
2062 debugmsg("will terminate on any kind of error\n");
2064 } else if (!argcheck("-a",argv
,&c
,argc
)) {
2065 long at
= strtol(argv
[c
],0,0) - 1;
2066 if ((at
== 0) && (errno
== EINVAL
))
2067 errormsg("invalid argument to option -a: \"%s\"\n",argv
[c
]);
2072 debugmsg("Autoloader time = %d\n",AutoloadTime
);
2073 } else if (!argcheck("-A",argv
,&c
,argc
)) {
2075 AutoloadCmd
= argv
[c
];
2076 debugmsg("Autoloader command = \"%s\"\n", AutoloadCmd
);
2077 } else if (!argcheck("-P",argv
,&c
,argc
)) {
2078 if (1 != sscanf(argv
[c
],"%lf",&StartWrite
))
2081 if ((StartWrite
> 1) || (StartWrite
<= 0))
2082 fatal("error in argument -P: must be bigger than 0 and less or equal 100\n");
2083 debugmsg("StartWrite = %1.2lf\n",StartWrite
);
2084 } else if (!argcheck("-p",argv
,&c
,argc
)) {
2085 if (1 == sscanf(argv
[c
],"%lf",&StartRead
))
2089 if ((StartRead
>= 1) || (StartRead
< 0))
2090 fatal("error in argument -p: must be bigger or equal to 0 and less than 100\n");
2091 debugmsg("StartRead = %1.2lf\n",StartRead
);
2092 } else if (!strcmp("-L",argv
[c
])) {
2093 #ifdef _POSIX_MEMLOCK_RANGE
2095 debugmsg("memory locking enabled\n");
2097 warning("POSIX memory locking is unsupported on this system.\n");
2099 } else if (!argcheck("-W",argv
,&c
,argc
)) {
2100 Timeout
= strtol(argv
[c
],0,0);
2102 fatal("invalid argument to option -W\n");
2103 if (Timeout
<= AutoloadTime
)
2104 fatal("timeout must be bigger than autoload time\n");
2105 } else if (!strcmp("--direct",argv
[c
])) {
2107 debugmsg("using O_DIRECT to open file descriptors\n");
2110 warningmsg("--direct is unsupported on this system\n");
2112 } else if (!strcmp("--help",argv
[c
]) || !strcmp("-h",argv
[c
])) {
2114 } else if (!strcmp("--version",argv
[c
]) || !strcmp("-V",argv
[c
])) {
2116 } else if (!strcmp("--md5",argv
[c
]) || !strcmp("-H",argv
[c
])) {
2118 addHashAlgorithm("MD5");
2120 fatal("hash calculation support has not been compiled in!\n");
2122 } else if (!strcmp("--hash",argv
[c
])) {
2125 fatal("missing argument to option --hash\n");
2127 if (!strcmp(argv
[c
],"list")) {
2128 (void) fprintf(stderr
,"valid hash functions are:\n");
2129 int algo
= mhash_count();
2131 const char *algoname
= (const char *) mhash_get_hash_name_static(algo
);
2133 (void) fprintf(stderr
,"\t%s\n",algoname
);
2138 #elif defined HAVE_MD5
2139 if (!strcmp(argv
[c
],"list")) {
2140 (void) fprintf(stderr
,"valid hash functions are:\n");
2141 (void) fprintf(stderr
,"\tMD5\n");
2145 fatal("hash calculation support has not been compiled in!\n");
2147 addHashAlgorithm(argv
[c
]);
2148 } else if (!strcmp("--pid",argv
[c
])) {
2149 printmsg("PID is %d\n",getpid());
2150 } else if (!argcheck("-D",argv
,&c
,argc
)) {
2151 OutVolsize
= calcint(argv
,c
,0);
2152 debugmsg("OutVolsize = %llu\n",OutVolsize
);
2154 fatal("unknown option \"%s\"\n",argv
[c
]);
2157 /* consistency check for options */
2158 if (AutoloadTime
&& Timeout
&& Timeout
<= AutoloadTime
)
2159 fatal("autoload time must be smaller than watchdog timeout\n");
2160 if (optBset
&optSset
&optMset
) {
2161 if (Numblocks
* Blocksize
!= Totalmem
)
2162 fatal("inconsistent options: blocksize * number of blocks != totalsize!\n");
2163 } else if (((!optBset
)&optSset
&optMset
) || (optMset
&(!optBset
)&(!optSset
))) {
2164 if (Totalmem
<= Blocksize
)
2165 fatal("total memory must be larger than block size\n");
2166 Numblocks
= Totalmem
/ Blocksize
;
2167 infomsg("Numblocks = %llu, Blocksize = %llu, Totalmem = %llu\n",(unsigned long long)Numblocks
,(unsigned long long)Blocksize
,(unsigned long long)Totalmem
);
2168 } else if (optBset
&!optSset
&optMset
) {
2170 fatal("blocksize must be greater than 0\n");
2171 if (Totalmem
<= Blocksize
)
2172 fatal("total memory must be larger than block size\n");
2173 Blocksize
= Totalmem
/ Numblocks
;
2174 infomsg("blocksize = %llu\n",(unsigned long long)Blocksize
);
2176 if ((StartRead
< 1) && (StartWrite
> 0))
2177 fatal("setting both low watermark and high watermark doesn't make any sense...\n");
2178 if ((NumSenders
-Hashers
> 0) && (Autoloader
|| OutVolsize
))
2179 fatal("multi-volume support is unsupported with multiple outputs\n");
2181 if ((!outfile
) && (!Infile
))
2182 fatal("Setting autoloader time or command without using a device doesn't make any sense!\n");
2183 if (outfile
&& Infile
) {
2184 fatal("Which one is your autoloader? Input or output? Replace input or output with a pipe.\n");
2187 if (Infile
&& netPortIn
)
2188 fatal("Setting both network input port and input file doesn't make sense!\n");
2189 if (outfile
&& netPortOut
)
2190 fatal("Setting both network output and output file doesn't make sense!\n");
2192 /* multi volume input consistency checking */
2193 if ((NumVolumes
!= 1) && (!Infile
))
2194 fatal("multi volume support for input needs an explicit given input device (option -i)\n");
2196 /* SPW: Volsize consistency checking */
2197 if (OutVolsize
&& !outfile
)
2198 fatal("Setting OutVolsize without an output device doesn't make sense!\n");
2199 if ((OutVolsize
!= 0) && (OutVolsize
< Blocksize
))
2200 /* code assumes we can write at least one block */
2201 fatal("If non-zero, OutVolsize must be at least as large as the buffer blocksize (%llu)!\n",Blocksize
);
2204 /* check that we stay within system limits */
2207 size_t semvmx_size
= sizeof(mxnrsem
);
2208 if (sysctlbyname("kern.ipc.semvmx", &mxnrsem
, &semvmx_size
, 0, 0) == -1)
2212 mxnrsem
= sysconf(_SC_SEM_VALUE_MAX
);
2214 if (-1 == mxnrsem
) {
2215 #ifdef SEM_MAX_VALUE
2216 mxnrsem
= SEM_MAX_VALUE
;
2219 warningmsg("unable to determine maximum value of semaphores\n");
2223 fatal("Minimum block count is 5.\n");
2224 if (Numblocks
> mxnrsem
) {
2225 fatal("cannot allocate more than %d blocks.\nThis is a system dependent limit, depending on the maximum semaphore value.\nPlease choose a bigger block size.\n",mxnrsem
);
2228 if ((Blocksize
* (long long)Numblocks
) > (long long)SSIZE_MAX
)
2229 fatal("Cannot address so much memory (%lld*%d=%lld>%lld).\n",Blocksize
,Numblocks
,Blocksize
*(long long)Numblocks
,(long long)SSIZE_MAX
);
2231 Buffer
= (char **) valloc(Numblocks
* sizeof(char *));
2233 fatal("Could not allocate enough memory (%d requested): %s\n",Numblocks
* sizeof(char *),strerror(errno
));
2235 infomsg("mapping temporary file to memory with %llu blocks with %llu byte (%llu kB total)...\n",(unsigned long long) Numblocks
,(unsigned long long) Blocksize
,(unsigned long long) ((Numblocks
*Blocksize
) >> 10));
2237 char tmplname
[] = "mbuffer-XXXXXX";
2238 char *tmpdir
= getenv("TMPDIR") ? getenv("TMPDIR") : "/var/tmp";
2239 char tfilename
[sizeof(tmplname
) + strlen(tmpdir
) + 1];
2240 (void) strcpy(tfilename
,tmpdir
);
2241 (void) strcat(tfilename
,"/");
2242 (void) strcat(tfilename
,tmplname
);
2243 Tmp
= mkstemp(tfilename
);
2244 Tmpfile
= malloc(strlen(tfilename
));
2246 fatal("out of memory: %s\n",strerror(errno
));
2247 (void) strcpy(Tmpfile
,tfilename
);
2248 infomsg("tmpfile is %s\n",Tmpfile
);
2250 mode_t mode
= O_RDWR
| LARGEFILE
;
2251 if (strncmp(Tmpfile
,"/dev/",5))
2252 mode
|= O_CREAT
|O_EXCL
;
2253 Tmp
= open(Tmpfile
,mode
,0600);
2256 fatal("could not create temporary file (%s): %s\n",Tmpfile
,strerror(errno
));
2257 if (strncmp(Tmpfile
,"/dev/",5))
2258 (void) unlink(Tmpfile
);
2259 /* resize the file. Needed - at least under linux, who knows why? */
2260 if (-1 == lseek(Tmp
,Numblocks
* Blocksize
- sizeof(int),SEEK_SET
))
2261 fatal("could not resize temporary file: %s\n",strerror(errno
));
2262 if (-1 == write(Tmp
,&c
,sizeof(int)))
2263 fatal("could not resize temporary file: %s\n",strerror(errno
));
2264 Buffer
[0] = mmap(0,Blocksize
*Numblocks
,PROT_READ
|PROT_WRITE
,MAP_SHARED
,Tmp
,0);
2265 if (MAP_FAILED
== Buffer
[0])
2266 fatal("could not map buffer-file to memory: %s\n",strerror(errno
));
2267 debugmsg("temporary file mapped to address %p\n",Buffer
[0]);
2269 infomsg("allocating memory for %d blocks with %llu byte (%llu kB total)...\n",Numblocks
,(unsigned long long) Blocksize
,(unsigned long long) ((Numblocks
*Blocksize
) >> 10));
2270 Buffer
[0] = (char *) valloc(Blocksize
* Numblocks
);
2272 fatal("Could not allocate enough memory (%lld requested): %s\n",(unsigned long long)Blocksize
* Numblocks
,strerror(errno
));
2273 #ifdef MADV_DONTFORK
2274 if (-1 == madvise(Buffer
[0],Blocksize
* Numblocks
, MADV_DONTFORK
))
2275 warningmsg("unable to advise memory handling of buffer: %s\n",strerror(errno
));
2278 for (c
= 1; c
< Numblocks
; c
++) {
2279 Buffer
[c
] = Buffer
[0] + Blocksize
* c
;
2280 *Buffer
[c
] = 0; /* touch every block before locking */
2283 #ifdef _POSIX_MEMLOCK_RANGE
2286 #ifndef HAVE_SETEUID
2287 #define seteuid setuid
2290 if (0 != seteuid(0))
2291 warningmsg("could not change to uid 0 to lock memory (is mbuffer setuid root?)\n");
2292 else if ((0 != mlock((char *)Buffer
,Numblocks
* sizeof(char *))) || (0 != mlock(Buffer
[0],Blocksize
* Numblocks
)))
2293 warningmsg("could not lock buffer in memory: %s\n",strerror(errno
));
2295 infomsg("memory locked successfully\n");
2296 err
= seteuid(uid
); /* don't give anyone a chance to attack this program, so giveup uid 0 after locking... */
2301 debugmsg("creating semaphores...\n");
2302 if (0 != sem_init(&Buf2Dev
,0,0))
2303 fatal("Error creating semaphore Buf2Dev: %s\n",strerror(errno
));
2304 if (0 != sem_init(&Dev2Buf
,0,Numblocks
))
2305 fatal("Error creating semaphore Dev2Buf: %s\n",strerror(errno
));
2307 debugmsg("opening input...\n");
2309 int flags
= O_RDONLY
| LARGEFILE
| Direct
;
2310 In
= open(Infile
,flags
);
2312 if (errno
== EINVAL
) {
2313 flags
&= ~LARGEFILE
;
2314 In
= open(Infile
,flags
);
2317 fatal("could not open input file: %s\n",strerror(errno
));
2319 } else if (In
== -1) {
2323 if (0 == directio(In
,DIRECTIO_ON
))
2324 infomsg("direct I/O hinting enabled for input\n");
2326 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno
));
2329 dest_t
*d
= malloc(sizeof(dest_t
));
2330 d
->fd
= dup(STDOUT_FILENO
);
2331 err
= dup2(STDERR_FILENO
,STDOUT_FILENO
);
2333 d
->name
= "<stdout>";
2334 d
->arg
= "<stdout>";
2337 bzero(&d
->thread
,sizeof(d
->thread
));
2342 openDestinationFiles(Dest
);
2343 if (NumSenders
== -1) {
2344 fatal("no output left - nothing to do\n");
2347 debugmsg("checking if we have a controlling terminal...\n");
2348 sig
.sa_handler
= SIG_IGN
;
2349 sigemptyset(&sig
.sa_mask
);
2351 err
= sigaction(SIGTTIN
,&sig
,0);
2353 fl
= fcntl(STDERR_FILENO
,F_GETFL
);
2354 err
= fcntl(STDERR_FILENO
,F_SETFL
,fl
| O_NONBLOCK
);
2356 if ((read(STDERR_FILENO
,&c
,1) == -1) && (errno
!= EAGAIN
)) {
2357 int tty
= open("/dev/tty",O_RDWR
);
2360 if ((Autoloader
== 0) && (outfile
))
2361 warningmsg("No controlling terminal and no autoloader command specified.\n");
2364 err
= dup2(tty
,STDERR_FILENO
);
2368 err
= fcntl(STDERR_FILENO
,F_SETFL
,fl
);
2370 if ((Terminal
== 1) && (NumVolumes
!= 1)) {
2371 struct termios tset
;
2372 if (-1 == tcgetattr(STDERR_FILENO
,&tset
)) {
2373 warningmsg("unable to get terminal attributes: %s\n",strerror(errno
));
2375 tset
.c_lflag
&= (~ICANON
) & (~ECHO
);
2376 tset
.c_cc
[VTIME
] = 0;
2377 tset
.c_cc
[VMIN
] = 1;
2378 if (-1 == tcsetattr(STDERR_FILENO
,TCSANOW
,&tset
))
2379 warningmsg("unable to set terminal attributes: %s\n",strerror(errno
));
2383 debugmsg("registering signals...\n");
2384 sig
.sa_handler
= sigHandler
;
2385 err
= sigemptyset(&sig
.sa_mask
);
2387 err
= sigaddset(&sig
.sa_mask
,SIGINT
);
2389 sig
.sa_flags
= SA_RESTART
;
2390 if (0 != sigaction(SIGINT
,&sig
,0))
2391 warningmsg("error registering new SIGINT handler: %s\n",strerror(errno
));
2392 err
= sigemptyset(&sig
.sa_mask
);
2394 err
= sigaddset(&sig
.sa_mask
,SIGHUP
);
2396 if (0 != sigaction(SIGHUP
,&sig
,0))
2397 warningmsg("error registering new SIGHUP handler: %s\n",strerror(errno
));
2399 debugmsg("starting threads...\n");
2400 (void) gettimeofday(&Starttime
,0);
2401 err
= sigfillset(&signalSet
);
2403 (void) pthread_sigmask(SIG_BLOCK
, &signalSet
, NULL
);
2405 /* select destination for output thread */
2407 while (dest
->fd
== -1) {
2409 debugmsg("skipping destination %s\n",dest
->arg
);
2414 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
2415 debugmsg("checking output device...\n");
2416 if (-1 == fstat(dest
->fd
,&st
))
2417 errormsg("could not stat output: %s\n",strerror(errno
));
2418 else if (S_ISBLK(st
.st_mode
) || S_ISCHR(st
.st_mode
)) {
2419 infomsg("blocksize is %d bytes on output device\n",st
.st_blksize
);
2420 if (Blocksize
% st
.st_blksize
!= 0) {
2421 warningmsg("Blocksize should be a multiple of the blocksize of the output device!\n"
2422 "This can cause problems with some device/OS combinations...\n"
2423 "Blocksize on output device is %d (transfer block size is %lld)\n", st
.st_blksize
, Blocksize
);
2425 fatal("unable to set output blocksize\n");
2428 infomsg("setting output blocksize to %d\n",st
.st_blksize
);
2429 Outsize
= st
.st_blksize
;
2433 infomsg("no device on output stream\n");
2434 debugmsg("checking input device...\n");
2435 if (-1 == fstat(In
,&st
))
2436 warningmsg("could not stat input: %s\n",strerror(errno
));
2437 else if (S_ISBLK(st
.st_mode
) || S_ISCHR(st
.st_mode
)) {
2438 infomsg("blocksize is %d bytes on input device\n",st
.st_blksize
);
2439 if (Blocksize
% st
.st_blksize
!= 0) {
2440 warningmsg("Blocksize should be a multiple of the blocksize of the input device!\n"
2441 "Use option -s to adjust transfer block size if you get an out-of-memory error on input.\n"
2442 "Blocksize on input device is %d (transfer block size is %lld)\n", st
.st_blksize
, Blocksize
);
2445 infomsg("no device on input stream\n");
2447 warningmsg("Could not stat output device (unsupported by system)!\n"
2448 "This can result in incorrect written data when\n"
2449 "using multiple volumes. Continue at your own risk!\n");
2451 if (((Verbose
< 4) || (StatusLog
== 0)) && (Quiet
!= 0))
2454 if (-1 == pipe(TermQ
))
2455 fatal("could not create termination pipe: %s\n",strerror(errno
));
2460 err
= pthread_create(&dest
->thread
,0,&outputThread
,dest
);
2463 err
= pthread_create(&Watchdog
,0,&watchdogThread
,(void*)0);
2467 err
= pthread_create(&Reader
,0,&inputThread
,0);
2469 (void) pthread_sigmask(SIG_UNBLOCK
, &signalSet
, NULL
);
2471 err
= pthread_join(Reader
,0);
2473 errormsg("error joining reader: %s\n",strerror(errno
));
2475 (void) pthread_sigmask(SIG_UNBLOCK
, &signalSet
, NULL
);
2476 (void) inputThread(0);
2477 debugmsg("waiting for output to finish...\n");
2478 if (TermQ
[0] != -1) {
2479 err
= read(TermQ
[0],&null
,1);
2487 infomsg("waiting for senders...\n");
2494 debugmsg("joining sender for %s\n",d
->arg
);
2496 debugmsg("joining hasher for %s\n",d
->name
);
2498 ret
= pthread_join(d
->thread
,&status
);
2500 errormsg("error joining %s: %s\n",d
->arg
,d
->name
,strerror(errno
));
2507 if (Status
|| Log
!= STDERR_FILENO
)
2508 summary(Numout
* Blocksize
+ Rest
, numthreads
);
2510 int ret
= munmap(Buffer
[0],Blocksize
*Numblocks
);
2518 dest_t
*n
= d
->next
;
2521 warningmsg("error during output to %s: %s\n",d
->arg
,d
->result
);
2523 (void) write(STDERR_FILENO
,d
->result
,strlen(d
->result
));
2524 if (Log
!= STDERR_FILENO
)
2525 (void) write(Log
,d
->result
,strlen(d
->result
));