1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 socket and pipe IO utilities used in rsync
29 /* if no timeout is specified then use a 60 second select timeout */
30 #define SELECT_TIMEOUT 60
/* Nonzero once io_start_multiplex_out() has enabled multiplexed output;
   reset to 0 by io_multiplexing_close(). */
32 static int io_multiplexing_out
;
/* Nonzero once io_start_multiplex_in() has enabled demultiplexing of input. */
33 static int io_multiplexing_in
;
/* fd whose input is demultiplexed; set by io_start_multiplex_in(). */
34 static int multiplex_in_fd
;
/* fd used for buffered/multiplexed output; set by io_start_buffering()
   and io_start_multiplex_out(). */
35 static int multiplex_out_fd
;
/* Set to time(NULL) by the read and write paths; check_timeout() compares
   it against io_timeout. */
36 static time_t last_io
;
/* Defined elsewhere. When zero, check_timeout() returns immediately and
   the select() calls in this file use SELECT_TIMEOUT instead. */
41 extern int io_timeout
;
/* Defined elsewhere; this file updates its total_read and total_written
   counters. */
42 extern struct stats stats
;
45 /** Ignore EOF errors while reading a module listing if the remote
46 version is 24 or less. */
47 int kludge_around_eof
= False
;
/* fd from which error messages are read (see read_error_fd());
   -1 means no error fd is set. */
50 static int io_error_fd
= -1;
/* Forward declaration: read_error_fd() calls read_loop() before its
   definition further down the file. */
52 static void read_loop(int fd
, char *buf
, int len
);
54 static void check_timeout(void)
56 extern int am_server
, am_daemon
;
61 if (!io_timeout
) return;
70 if (last_io
&& io_timeout
&& (t
-last_io
) >= io_timeout
) {
71 if (!am_server
&& !am_daemon
) {
72 rprintf(FERROR
,"io timeout after %d seconds - exiting\n",
75 exit_cleanup(RERR_TIMEOUT
);
79 /* set up the fd used to propagate errors */
80 void io_set_error_fd(int fd
)
85 /* read some data from the error fd and pass it to the logging code (rwrite) */
86 static void read_error_fd(void)
93 /* io_error_fd is temporarily disabled -- is this meant to
94 * prevent indefinite recursion? */
97 read_loop(fd
, buf
, 4);
100 len
= tag
& 0xFFFFFF;
106 if (n
> (sizeof(buf
)-1)) n
= sizeof(buf
)-1;
107 read_loop(fd
, buf
, n
);
108 rwrite((enum logcode
)tag
, buf
, n
);
116 static void whine_about_eof (void)
119 It's almost always an error to get an EOF when we're trying
120 to read from the network, because the protocol is
123 However, there is one unfortunate case where it is not,
124 which is rsync <2.4.6 sending a list of modules on a
125 server, since the list is terminated by closing the socket.
126 So, for the section of the program where that is a problem
127 (start_socket_client), kludge_around_eof is True and we
131 if (kludge_around_eof
)
135 "%s: connection unexpectedly closed "
136 "(%.0f bytes read so far)\n",
137 RSYNC_NAME
, (double)stats
.total_read
);
139 exit_cleanup (RERR_STREAMIO
);
144 static void die_from_readerr (int err
)
146 /* this prevents us trying to write errors on a dead socket */
147 io_multiplexing_close();
149 rprintf(FERROR
, "%s: read error: %s\n",
150 RSYNC_NAME
, strerror (err
));
151 exit_cleanup(RERR_STREAMIO
);
156 * Read from a socket with IO timeout. return the number of bytes
157 * read. If no bytes can be read then exit, never return a number <= 0.
159 * TODO: If the remote shell connection fails, then current versions
160 * actually report an "unexpected EOF" error here. Since it's a
161 * fairly common mistake to try to use rsh when ssh is required, we
162 * should trap that: if we fail to read any data at all, we should
163 * give a better explanation. We can tell whether the connection has
164 * started by looking e.g. at whether the remote version is known yet.
166 static int read_timeout (int fd
, char *buf
, int len
)
173 /* until we manage to read *something* */
180 if (io_error_fd
!= -1) {
181 FD_SET(io_error_fd
, &fds
);
182 if (io_error_fd
> fd
) fd_count
= io_error_fd
+1;
185 tv
.tv_sec
= io_timeout
?io_timeout
:SELECT_TIMEOUT
;
190 if (select(fd_count
, &fds
, NULL
, NULL
, &tv
) < 1) {
191 if (errno
== EBADF
) {
192 exit_cleanup(RERR_SOCKETIO
);
198 if (io_error_fd
!= -1 && FD_ISSET(io_error_fd
, &fds
)) {
202 if (!FD_ISSET(fd
, &fds
)) continue;
204 n
= read(fd
, buf
, len
);
211 last_io
= time(NULL
);
215 return -1; /* doesn't return */
216 } else if (n
== -1) {
217 if (errno
== EINTR
|| errno
== EWOULDBLOCK
||
221 die_from_readerr (errno
);
231 /*! Continue trying to read len bytes - don't return until len has
233 static void read_loop (int fd
, char *buf
, int len
)
236 int n
= read_timeout(fd
, buf
, len
);
245 * Read from the file descriptor handling multiplexing - return number
248 * Never returns <= 0.
250 static int read_unbuffered(int fd
, char *buf
, int len
)
252 static int remaining
;
256 if (!io_multiplexing_in
|| fd
!= multiplex_in_fd
)
257 return read_timeout(fd
, buf
, len
);
261 len
= MIN(len
, remaining
);
262 read_loop(fd
, buf
, len
);
268 read_loop (fd
, line
, 4);
271 remaining
= tag
& 0xFFFFFF;
274 if (tag
== MPLEX_BASE
) continue;
278 if (tag
!= FERROR
&& tag
!= FINFO
) {
279 rprintf(FERROR
,"unexpected tag %d\n", tag
);
280 exit_cleanup(RERR_STREAMIO
);
283 if (remaining
> sizeof(line
)-1) {
284 rprintf(FERROR
,"multiplexing overflow %d\n\n",
286 exit_cleanup(RERR_STREAMIO
);
289 read_loop(fd
, line
, remaining
);
292 rprintf((enum logcode
)tag
,"%s", line
);
300 /* do a buffered read from fd. don't return until all N bytes
301 have been read. If all N can't be read then exit with an error */
302 static void readfd (int fd
, char *buffer
, int N
)
310 ret
= read_unbuffered (fd
, buffer
+ total
, N
-total
);
314 stats
.total_read
+= total
;
318 int32
read_int(int f
)
325 if (ret
== (int32
)0xffffffff) return -1;
329 int64
read_longint(int f
)
331 extern int remote_version
;
336 if ((int32
)ret
!= (int32
)0xffffffff) {
341 rprintf(FERROR
,"Integer overflow - attempted 64 bit offset\n");
342 exit_cleanup(RERR_UNSUPPORTED
);
344 if (remote_version
>= 16) {
346 ret
= IVAL(b
,0) | (((int64
)IVAL(b
,4))<<32);
353 void read_buf(int f
,char *buf
,int len
)
358 void read_sbuf(int f
,char *buf
,int len
)
360 read_buf (f
,buf
,len
);
364 unsigned char read_byte(int f
)
367 read_buf (f
, (char *)&c
, 1);
371 /* write len bytes to fd */
372 static void writefd_unbuffered(int fd
,char *buf
,int len
)
383 while (total
< len
) {
389 if (io_error_fd
!= -1) {
390 FD_SET(io_error_fd
,&r_fds
);
391 if (io_error_fd
> fd_count
)
392 fd_count
= io_error_fd
;
395 tv
.tv_sec
= io_timeout
?io_timeout
:SELECT_TIMEOUT
;
400 count
= select(fd_count
+1,
401 io_error_fd
!= -1?&r_fds
:NULL
,
406 if (errno
== EBADF
) {
407 exit_cleanup(RERR_SOCKETIO
);
413 if (io_error_fd
!= -1 && FD_ISSET(io_error_fd
, &r_fds
)) {
417 if (FD_ISSET(fd
, &w_fds
)) {
418 int ret
, n
= len
-total
;
419 ret
= write(fd
,buf
+total
,n
);
421 if (ret
== -1 && errno
== EINTR
) {
426 (errno
== EWOULDBLOCK
|| errno
== EAGAIN
)) {
433 "error writing %d unbuffered bytes"
434 " - exiting: %s\n", len
,
436 exit_cleanup(RERR_STREAMIO
);
439 /* Sleep after writing to limit I/O bandwidth */
443 tv
.tv_usec
= ret
* 1000 / bwlimit
;
444 while (tv
.tv_usec
> 1000000)
447 tv
.tv_usec
-= 1000000;
449 select(0, NULL
, NULL
, NULL
, &tv
);
455 last_io
= time(NULL
);
/* Output buffer of IO_BUFFER_SIZE bytes allocated by io_start_buffering();
   NULL until buffering has been started. */
463 static char *io_buffer
;
/* Number of bytes currently held in io_buffer; writefd() appends to it
   and triggers io_flush() when it reaches IO_BUFFER_SIZE. */
464 static int io_buffer_count
;
466 void io_start_buffering(int fd
)
468 if (io_buffer
) return;
469 multiplex_out_fd
= fd
;
470 io_buffer
= (char *)malloc(IO_BUFFER_SIZE
);
471 if (!io_buffer
) out_of_memory("writefd");
475 /* write a message to a multiplexed stream. If this fails then rsync
477 static void mplex_write(int fd
, enum logcode code
, char *buf
, int len
)
482 SIVAL(buffer
, 0, ((MPLEX_BASE
+ (int)code
)<<24) + len
);
484 if (n
> (sizeof(buffer
)-4)) {
485 n
= sizeof(buffer
)-4;
488 memcpy(&buffer
[4], buf
, n
);
489 writefd_unbuffered(fd
, buffer
, n
+4);
495 writefd_unbuffered(fd
, buf
, len
);
502 int fd
= multiplex_out_fd
;
506 if (!io_buffer_count
|| no_flush
) return;
508 if (io_multiplexing_out
) {
509 mplex_write(fd
, FNONE
, io_buffer
, io_buffer_count
);
511 writefd_unbuffered(fd
, io_buffer
, io_buffer_count
);
517 /* XXX: fd is ignored, which seems a little strange. */
518 void io_end_buffering(int fd
)
521 if (!io_multiplexing_out
) {
527 static void writefd(int fd
,char *buf
,int len
)
529 stats
.total_written
+= len
;
533 if (!io_buffer
|| fd
!= multiplex_out_fd
) {
534 writefd_unbuffered(fd
, buf
, len
);
539 int n
= MIN(len
, IO_BUFFER_SIZE
-io_buffer_count
);
541 memcpy(io_buffer
+io_buffer_count
, buf
, n
);
544 io_buffer_count
+= n
;
547 if (io_buffer_count
== IO_BUFFER_SIZE
) io_flush();
552 void write_int(int f
,int32 x
)
561 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
562 * 64-bit types on this platform.
564 void write_longint(int f
, int64 x
)
566 extern int remote_version
;
569 if (remote_version
< 16 || x
<= 0x7FFFFFFF) {
570 write_int(f
, (int)x
);
574 write_int(f
, (int32
)0xFFFFFFFF);
575 SIVAL(b
,0,(x
&0xFFFFFFFF));
576 SIVAL(b
,4,((x
>>32)&0xFFFFFFFF));
581 void write_buf(int f
,char *buf
,int len
)
586 /* write a string to the connection */
587 static void write_sbuf(int f
,char *buf
)
589 write_buf(f
, buf
, strlen(buf
));
593 void write_byte(int f
,unsigned char c
)
595 write_buf(f
,(char *)&c
,1);
600 int read_line(int f
, char *buf
, int maxlen
)
605 if (buf
[0] == 0) return 0;
606 if (buf
[0] == '\n') {
610 if (buf
[0] != '\r') {
624 void io_printf(int fd
, const char *format
, ...)
630 va_start(ap
, format
);
631 len
= vsnprintf(buf
, sizeof(buf
), format
, ap
);
634 if (len
< 0) exit_cleanup(RERR_STREAMIO
);
640 /* set up for multiplexing an error stream with the data stream */
641 void io_start_multiplex_out(int fd
)
643 multiplex_out_fd
= fd
;
645 io_start_buffering(fd
);
646 io_multiplexing_out
= 1;
649 /* set up for demultiplexing an error stream from the incoming data stream */
650 void io_start_multiplex_in(int fd
)
652 multiplex_in_fd
= fd
;
654 io_multiplexing_in
= 1;
657 /* write a message to the multiplexed error stream */
658 int io_multiplex_write(enum logcode code
, char *buf
, int len
)
660 if (!io_multiplexing_out
) return 0;
663 stats
.total_written
+= (len
+4);
664 mplex_write(multiplex_out_fd
, code
, buf
, len
);
668 /* stop output multiplexing */
669 void io_multiplexing_close(void)
671 io_multiplexing_out
= 0;