1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 socket and pipe IO utilities used in rsync
29 /* if no timeout is specified then use a 60 second select timeout */
30 #define SELECT_TIMEOUT 60
32 static int io_multiplexing_out
;
33 static int io_multiplexing_in
;
34 static int multiplex_in_fd
;
35 static int multiplex_out_fd
;
36 static time_t last_io
;
41 extern int io_timeout
;
42 extern struct stats stats
;
45 /** Ignore EOF errors while reading a module listing if the remote
46 version is 24 or less. */
47 int kludge_around_eof
= False
;
50 static int io_error_fd
= -1;
52 static void read_loop(int fd
, char *buf
, int len
);
54 static void check_timeout(void)
56 extern int am_server
, am_daemon
;
61 if (!io_timeout
) return;
70 if (last_io
&& io_timeout
&& (t
-last_io
) >= io_timeout
) {
71 if (!am_server
&& !am_daemon
) {
72 rprintf(FERROR
,"io timeout after %d seconds - exiting\n",
75 exit_cleanup(RERR_TIMEOUT
);
79 /* setup the fd used to propogate errors */
80 void io_set_error_fd(int fd
)
85 /* read some data from the error fd and write it to the write log code */
86 static void read_error_fd(void)
93 /* io_error_fd is temporarily disabled -- is this meant to
94 * prevent indefinite recursion? */
97 read_loop(fd
, buf
, 4);
100 len
= tag
& 0xFFFFFF;
106 if (n
> (sizeof(buf
)-1)) n
= sizeof(buf
)-1;
107 read_loop(fd
, buf
, n
);
108 rwrite((enum logcode
)tag
, buf
, n
);
116 static void whine_about_eof (void)
119 It's almost always an error to get an EOF when we're trying
120 to read from the network, because the protocol is
123 However, there is one unfortunate cases where it is not,
124 which is rsync <2.4.6 sending a list of modules on a
125 server, since the list is terminated by closing the socket.
126 So, for the section of the program where that is a problem
127 (start_socket_client), kludge_around_eof is True and we
131 if (kludge_around_eof
)
135 "%s: connection unexpectedly closed "
136 "(%.0f bytes read so far)\n",
137 RSYNC_NAME
, (double)stats
.total_read
);
139 exit_cleanup (RERR_STREAMIO
);
144 static void die_from_readerr (int err
)
146 /* this prevents us trying to write errors on a dead socket */
147 io_multiplexing_close();
149 rprintf(FERROR
, "%s: read error: %s\n",
150 RSYNC_NAME
, strerror (err
));
151 exit_cleanup(RERR_STREAMIO
);
156 * Read from a socket with IO timeout. return the number of bytes
157 * read. If no bytes can be read then exit, never return a number <= 0.
159 * TODO: If the remote shell connection fails, then current versions
160 * actually report an "unexpected EOF" error here. Since it's a
161 * fairly common mistake to try to use rsh when ssh is required, we
162 * should trap that: if we fail to read any data at all, we should
163 * give a better explanation. We can tell whether the connection has
164 * started by looking e.g. at whether the remote version is known yet.
166 static int read_timeout (int fd
, char *buf
, int len
)
173 /* until we manage to read *something* */
181 if (io_error_fd
!= -1) {
182 FD_SET(io_error_fd
, &fds
);
183 if (io_error_fd
> fd
) fd_count
= io_error_fd
+1;
186 tv
.tv_sec
= io_timeout
?io_timeout
:SELECT_TIMEOUT
;
191 count
= select(fd_count
, &fds
, NULL
, NULL
, &tv
);
198 if (errno
== EBADF
) {
199 exit_cleanup(RERR_SOCKETIO
);
204 if (io_error_fd
!= -1 && FD_ISSET(io_error_fd
, &fds
)) {
208 if (!FD_ISSET(fd
, &fds
)) continue;
210 n
= read(fd
, buf
, len
);
217 last_io
= time(NULL
);
221 return -1; /* doesn't return */
222 } else if (n
== -1) {
223 if (errno
== EINTR
|| errno
== EWOULDBLOCK
||
227 die_from_readerr (errno
);
237 /*! Continue trying to read len bytes - don't return until len has
239 static void read_loop (int fd
, char *buf
, int len
)
242 int n
= read_timeout(fd
, buf
, len
);
251 * Read from the file descriptor handling multiplexing - return number
254 * Never returns <= 0.
256 static int read_unbuffered(int fd
, char *buf
, int len
)
258 static int remaining
;
262 if (!io_multiplexing_in
|| fd
!= multiplex_in_fd
)
263 return read_timeout(fd
, buf
, len
);
267 len
= MIN(len
, remaining
);
268 read_loop(fd
, buf
, len
);
274 read_loop (fd
, line
, 4);
277 remaining
= tag
& 0xFFFFFF;
280 if (tag
== MPLEX_BASE
) continue;
284 if (tag
!= FERROR
&& tag
!= FINFO
) {
285 rprintf(FERROR
,"unexpected tag %d\n", tag
);
286 exit_cleanup(RERR_STREAMIO
);
289 if (remaining
> sizeof(line
)-1) {
290 rprintf(FERROR
,"multiplexing overflow %d\n\n",
292 exit_cleanup(RERR_STREAMIO
);
295 read_loop(fd
, line
, remaining
);
298 rprintf((enum logcode
)tag
,"%s", line
);
306 /* do a buffered read from fd. don't return until all N bytes
307 have been read. If all N can't be read then exit with an error */
308 static void readfd (int fd
, char *buffer
, int N
)
316 ret
= read_unbuffered (fd
, buffer
+ total
, N
-total
);
320 stats
.total_read
+= total
;
324 int32
read_int(int f
)
331 if (ret
== (int32
)0xffffffff) return -1;
335 int64
read_longint(int f
)
337 extern int remote_version
;
342 if ((int32
)ret
!= (int32
)0xffffffff) {
347 rprintf(FERROR
,"Integer overflow - attempted 64 bit offset\n");
348 exit_cleanup(RERR_UNSUPPORTED
);
350 if (remote_version
>= 16) {
352 ret
= IVAL(b
,0) | (((int64
)IVAL(b
,4))<<32);
359 void read_buf(int f
,char *buf
,int len
)
364 void read_sbuf(int f
,char *buf
,int len
)
366 read_buf (f
,buf
,len
);
370 unsigned char read_byte(int f
)
373 read_buf (f
, (char *)&c
, 1);
377 /* write len bytes to fd */
378 static void writefd_unbuffered(int fd
,char *buf
,int len
)
389 while (total
< len
) {
395 if (io_error_fd
!= -1) {
396 FD_SET(io_error_fd
,&r_fds
);
397 if (io_error_fd
> fd_count
)
398 fd_count
= io_error_fd
;
401 tv
.tv_sec
= io_timeout
?io_timeout
:SELECT_TIMEOUT
;
406 count
= select(fd_count
+1,
407 io_error_fd
!= -1?&r_fds
:NULL
,
416 if (errno
== EBADF
) {
417 exit_cleanup(RERR_SOCKETIO
);
422 if (io_error_fd
!= -1 && FD_ISSET(io_error_fd
, &r_fds
)) {
426 if (FD_ISSET(fd
, &w_fds
)) {
427 int ret
, n
= len
-total
;
428 ret
= write(fd
,buf
+total
,n
);
430 if (ret
== -1 && errno
== EINTR
) {
435 (errno
== EWOULDBLOCK
|| errno
== EAGAIN
)) {
442 "error writing %d unbuffered bytes"
443 " - exiting: %s\n", len
,
445 exit_cleanup(RERR_STREAMIO
);
448 /* Sleep after writing to limit I/O bandwidth */
452 tv
.tv_usec
= ret
* 1000 / bwlimit
;
453 while (tv
.tv_usec
> 1000000)
456 tv
.tv_usec
-= 1000000;
458 select(0, NULL
, NULL
, NULL
, &tv
);
464 last_io
= time(NULL
);
472 static char *io_buffer
;
473 static int io_buffer_count
;
475 void io_start_buffering(int fd
)
477 if (io_buffer
) return;
478 multiplex_out_fd
= fd
;
479 io_buffer
= (char *)malloc(IO_BUFFER_SIZE
);
480 if (!io_buffer
) out_of_memory("writefd");
484 /* write an message to a multiplexed stream. If this fails then rsync
486 static void mplex_write(int fd
, enum logcode code
, char *buf
, int len
)
491 SIVAL(buffer
, 0, ((MPLEX_BASE
+ (int)code
)<<24) + len
);
493 if (n
> (sizeof(buffer
)-4)) {
494 n
= sizeof(buffer
)-4;
497 memcpy(&buffer
[4], buf
, n
);
498 writefd_unbuffered(fd
, buffer
, n
+4);
504 writefd_unbuffered(fd
, buf
, len
);
511 int fd
= multiplex_out_fd
;
515 if (!io_buffer_count
|| no_flush
) return;
517 if (io_multiplexing_out
) {
518 mplex_write(fd
, FNONE
, io_buffer
, io_buffer_count
);
520 writefd_unbuffered(fd
, io_buffer
, io_buffer_count
);
526 /* XXX: fd is ignored, which seems a little strange. */
527 void io_end_buffering(int fd
)
530 if (!io_multiplexing_out
) {
536 static void writefd(int fd
,char *buf
,int len
)
538 stats
.total_written
+= len
;
542 if (!io_buffer
|| fd
!= multiplex_out_fd
) {
543 writefd_unbuffered(fd
, buf
, len
);
548 int n
= MIN(len
, IO_BUFFER_SIZE
-io_buffer_count
);
550 memcpy(io_buffer
+io_buffer_count
, buf
, n
);
553 io_buffer_count
+= n
;
556 if (io_buffer_count
== IO_BUFFER_SIZE
) io_flush();
561 void write_int(int f
,int32 x
)
570 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
571 * 64-bit types on this platform.
573 void write_longint(int f
, int64 x
)
575 extern int remote_version
;
578 if (remote_version
< 16 || x
<= 0x7FFFFFFF) {
579 write_int(f
, (int)x
);
583 write_int(f
, (int32
)0xFFFFFFFF);
584 SIVAL(b
,0,(x
&0xFFFFFFFF));
585 SIVAL(b
,4,((x
>>32)&0xFFFFFFFF));
590 void write_buf(int f
,char *buf
,int len
)
595 /* write a string to the connection */
596 static void write_sbuf(int f
,char *buf
)
598 write_buf(f
, buf
, strlen(buf
));
602 void write_byte(int f
,unsigned char c
)
604 write_buf(f
,(char *)&c
,1);
609 int read_line(int f
, char *buf
, int maxlen
)
614 if (buf
[0] == 0) return 0;
615 if (buf
[0] == '\n') {
619 if (buf
[0] != '\r') {
633 void io_printf(int fd
, const char *format
, ...)
639 va_start(ap
, format
);
640 len
= vsnprintf(buf
, sizeof(buf
), format
, ap
);
643 if (len
< 0) exit_cleanup(RERR_STREAMIO
);
649 /* setup for multiplexing an error stream with the data stream */
650 void io_start_multiplex_out(int fd
)
652 multiplex_out_fd
= fd
;
654 io_start_buffering(fd
);
655 io_multiplexing_out
= 1;
658 /* setup for multiplexing an error stream with the data stream */
659 void io_start_multiplex_in(int fd
)
661 multiplex_in_fd
= fd
;
663 io_multiplexing_in
= 1;
666 /* write an message to the multiplexed error stream */
667 int io_multiplex_write(enum logcode code
, char *buf
, int len
)
669 if (!io_multiplexing_out
) return 0;
672 stats
.total_written
+= (len
+4);
673 mplex_write(multiplex_out_fd
, code
, buf
, len
);
677 /* stop output multiplexing */
678 void io_multiplexing_close(void)
680 io_multiplexing_out
= 0;