2 * Socket and pipe I/O utilities used in rsync.
4 * Copyright (C) 1996-2001 Andrew Tridgell
5 * Copyright (C) 1996 Paul Mackerras
6 * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
7 * Copyright (C) 2003-2008 Wayne Davison
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, visit the http://fsf.org website.
23 /* Rsync provides its own multiplexing system, which is used to send
24 * stderr and stdout over a single socket.
26 * For historical reasons this is off during the start of the
27 * connection, but it's switched on quite early using
28 * io_start_multiplex_out() and io_start_multiplex_in(). */
33 /** If no timeout is specified then use a 60 second select timeout */
34 #define SELECT_TIMEOUT 60
37 extern size_t bwlimit_writemax
;
38 extern int io_timeout
;
39 extern int allowed_lull
;
43 extern int am_generator
;
44 extern int inc_recurse
;
49 extern int read_batch
;
50 extern int csum_length
;
51 extern int protect_args
;
52 extern int checksum_seed
;
53 extern int protocol_version
;
54 extern int remove_source_files
;
55 extern int preserve_hard_links
;
56 extern struct stats stats
;
57 extern struct file_list
*cur_flist
;
59 extern int filesfrom_convert
;
60 extern iconv_t ic_send
, ic_recv
;
63 const char phase_unknown
[] = "unknown";
64 int ignore_timeout
= 0;
68 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
69 int kluge_around_eof
= 0;
76 static int iobuf_f_in
= -1;
77 static char *iobuf_in
;
78 static size_t iobuf_in_siz
;
79 static size_t iobuf_in_ndx
;
80 static size_t iobuf_in_remaining
;
82 static int iobuf_f_out
= -1;
83 static char *iobuf_out
;
84 static int iobuf_out_cnt
;
86 int flist_forward_from
= -1;
88 static int io_multiplexing_out
;
89 static int io_multiplexing_in
;
90 static time_t last_io_in
;
91 static time_t last_io_out
;
94 static int write_batch_monitor_in
= -1;
95 static int write_batch_monitor_out
= -1;
97 static int io_filesfrom_f_in
= -1;
98 static int io_filesfrom_f_out
= -1;
99 static xbuf ff_buf
= EMPTY_XBUF
;
100 static char ff_lastchar
;
102 static xbuf iconv_buf
= EMPTY_XBUF
;
104 static int defer_forwarding_messages
= 0, defer_forwarding_keep
= 0;
105 static int select_timeout
= SELECT_TIMEOUT
;
106 static int active_filecnt
= 0;
107 static OFF_T active_bytecnt
= 0;
108 static int first_message
= 1;
110 static char int_byte_extra
[64] = {
111 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
112 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
113 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
114 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
117 #define REMOTE_OPTION_ERROR "rsync: on remote machine: -"
118 #define REMOTE_OPTION_ERROR2 ": unknown option"
120 enum festatus
{ FES_SUCCESS
, FES_REDO
, FES_NO_SEND
};
122 static void readfd(int fd
, char *buffer
, size_t N
);
123 static void writefd(int fd
, const char *buf
, size_t len
);
124 static void writefd_unbuffered(int fd
, const char *buf
, size_t len
);
125 static void mplex_write(int fd
, enum msgcode code
, const char *buf
, size_t len
, int convert
);
127 struct flist_ndx_item
{
128 struct flist_ndx_item
*next
;
132 struct flist_ndx_list
{
133 struct flist_ndx_item
*head
, *tail
;
136 static struct flist_ndx_list redo_list
, hlink_list
;
138 struct msg_list_item
{
139 struct msg_list_item
*next
;
145 struct msg_list_item
*head
, *tail
;
148 static struct msg_list msg_queue
;
150 static void flist_ndx_push(struct flist_ndx_list
*lp
, int ndx
)
152 struct flist_ndx_item
*item
;
154 if (!(item
= new(struct flist_ndx_item
)))
155 out_of_memory("flist_ndx_push");
159 lp
->tail
->next
= item
;
165 static int flist_ndx_pop(struct flist_ndx_list
*lp
)
167 struct flist_ndx_item
*next
;
174 next
= lp
->head
->next
;
183 static void got_flist_entry_status(enum festatus status
, const char *buf
)
185 int ndx
= IVAL(buf
, 0);
186 struct file_list
*flist
= flist_for_ndx(ndx
);
188 assert(flist
!= NULL
);
190 if (remove_source_files
) {
192 active_bytecnt
-= F_LENGTH(flist
->files
[ndx
- flist
->ndx_start
]);
196 flist
->in_progress
--;
200 if (remove_source_files
)
201 send_msg(MSG_SUCCESS
, buf
, 4, 0);
202 if (preserve_hard_links
) {
203 struct file_struct
*file
= flist
->files
[ndx
- flist
->ndx_start
];
204 if (F_IS_HLINKED(file
)) {
205 flist_ndx_push(&hlink_list
, ndx
);
206 flist
->in_progress
++;
213 flist_ndx_push(&redo_list
, ndx
);
220 static void check_timeout(void)
224 if (!io_timeout
|| ignore_timeout
)
228 last_io_in
= time(NULL
);
234 if (t
- last_io_in
>= io_timeout
) {
235 if (!am_server
&& !am_daemon
) {
236 rprintf(FERROR
, "io timeout after %d seconds -- exiting\n",
237 (int)(t
-last_io_in
));
239 exit_cleanup(RERR_TIMEOUT
);
243 /* Note the fds used for the main socket (which might really be a pipe
244 * for a local transfer, but we can ignore that). */
245 void io_set_sock_fds(int f_in
, int f_out
)
251 void set_io_timeout(int secs
)
255 if (!io_timeout
|| io_timeout
> SELECT_TIMEOUT
)
256 select_timeout
= SELECT_TIMEOUT
;
258 select_timeout
= io_timeout
;
260 allowed_lull
= read_batch
? 0 : (io_timeout
+ 1) / 2;
263 /* Set up the fd used to receive MSG_* messages. Only needed during the
264 * early stages of being a local sender (up through the sending of the
265 * file list) or when we're the generator (to fetch the messages from
266 * the receiver). */
267 void set_msg_fd_in(int fd
)
272 /* Set up the fd used to send our MSG_* messages. Only needed when
273 * we're the receiver (to send our messages to the generator). */
274 void set_msg_fd_out(int fd
)
277 set_nonblocking(msg_fd_out
);
280 /* Add a message to the pending MSG_* list. */
281 static void msg_list_add(struct msg_list
*lst
, int code
, const char *buf
, int len
, int convert
)
283 struct msg_list_item
*m
;
284 int sz
= len
+ 4 + sizeof m
[0] - 1;
286 if (!(m
= (struct msg_list_item
*)new_array(char, sz
)))
287 out_of_memory("msg_list_add");
289 m
->convert
= convert
;
290 SIVAL(m
->buf
, 0, ((code
+MPLEX_BASE
)<<24) | len
);
291 memcpy(m
->buf
+ 4, buf
, len
);
299 static void msg_flush(void)
302 while (msg_queue
.head
&& io_multiplexing_out
) {
303 struct msg_list_item
*m
= msg_queue
.head
;
304 int len
= IVAL(m
->buf
, 0) & 0xFFFFFF;
305 int tag
= *((uchar
*)m
->buf
+3) - MPLEX_BASE
;
306 if (!(msg_queue
.head
= m
->next
))
307 msg_queue
.tail
= NULL
;
308 stats
.total_written
+= len
+ 4;
309 defer_forwarding_messages
++;
310 mplex_write(sock_f_out
, tag
, m
->buf
+ 4, len
, m
->convert
);
311 defer_forwarding_messages
--;
315 while (msg_queue
.head
) {
316 struct msg_list_item
*m
= msg_queue
.head
;
317 int len
= IVAL(m
->buf
, 0) & 0xFFFFFF;
318 int tag
= *((uchar
*)m
->buf
+3) - MPLEX_BASE
;
319 if (!(msg_queue
.head
= m
->next
))
320 msg_queue
.tail
= NULL
;
321 defer_forwarding_messages
++;
322 mplex_write(msg_fd_out
, tag
, m
->buf
+ 4, len
, m
->convert
);
323 defer_forwarding_messages
--;
329 static void check_for_d_option_error(const char *msg
)
331 static char rsync263_opts
[] = "BCDHIKLPRSTWabceghlnopqrtuvxz";
336 || strncmp(msg
, REMOTE_OPTION_ERROR
, sizeof REMOTE_OPTION_ERROR
- 1) != 0)
339 msg
+= sizeof REMOTE_OPTION_ERROR
- 1;
340 if (*msg
== '-' || (colon
= strchr(msg
, ':')) == NULL
341 || strncmp(colon
, REMOTE_OPTION_ERROR2
, sizeof REMOTE_OPTION_ERROR2
- 1) != 0)
344 for ( ; *msg
!= ':'; msg
++) {
347 else if (*msg
== 'e')
349 else if (strchr(rsync263_opts
, *msg
) == NULL
)
355 "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");
359 /* Read a message from the MSG_* fd and handle it. This is called either
360 * during the early stages of being a local sender (up through the sending
361 * of the file list) or when we're the generator (to fetch the messages
362 * from the receiver). */
363 static void read_msg_fd(void)
367 struct file_list
*flist
;
371 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
372 * to this routine from writefd_unbuffered(). */
375 defer_forwarding_messages
++;
380 len
= tag
& 0xFFFFFF;
381 tag
= (tag
>> 24) - MPLEX_BASE
;
385 if (len
< 0 || len
> 1 || !am_generator
) {
387 rprintf(FERROR
, "invalid message %d:%d [%s%s]\n",
388 tag
, len
, who_am_i(),
389 inc_recurse
? "/inc" : "");
390 exit_cleanup(RERR_STREAMIO
);
393 readfd(fd
, buf
, len
);
394 stats
.total_read
= read_varlong(fd
, 3);
399 if (len
!= 4 || !am_generator
)
402 got_flist_entry_status(FES_REDO
, buf
);
405 if (len
!= 4 || !am_generator
|| !inc_recurse
)
408 /* Read extra file list from receiver. */
409 assert(iobuf_in
!= NULL
);
410 assert(iobuf_f_in
== fd
);
412 rprintf(FINFO
, "[%s] receiving flist for dir %d\n",
413 who_am_i(), IVAL(buf
,0));
415 flist
= recv_file_list(fd
);
416 flist
->parent_ndx
= IVAL(buf
,0);
417 #ifdef SUPPORT_HARD_LINKS
418 if (preserve_hard_links
)
419 match_hard_links(flist
);
423 if (len
!= 0 || !am_generator
|| !inc_recurse
)
430 readfd(fd
, buf
, len
);
431 io_error
|= IVAL(buf
, 0);
434 if (len
>= (int)sizeof buf
|| !am_generator
)
436 readfd(fd
, buf
, len
);
437 send_msg(MSG_DELETED
, buf
, len
, 1);
440 if (len
!= 4 || !am_generator
)
443 got_flist_entry_status(FES_SUCCESS
, buf
);
446 if (len
!= 4 || !am_generator
)
449 got_flist_entry_status(FES_NO_SEND
, buf
);
451 case MSG_ERROR_SOCKET
:
455 if (tag
== MSG_ERROR_SOCKET
)
456 io_end_multiplex_out();
468 rwrite((enum logcode
)tag
, buf
, n
, !am_generator
);
473 rprintf(FERROR
, "unknown message %d:%d [%s]\n",
474 tag
, len
, who_am_i());
475 exit_cleanup(RERR_STREAMIO
);
480 if (!--defer_forwarding_messages
&& !no_flush
)
484 /* This is used by the generator to limit how many file transfers can
485 * be active at once when --remove-source-files is specified. Without
486 * this, sender-side deletions were mostly happening at the end. */
487 void increment_active_files(int ndx
, int itemizing
, enum logcode code
)
489 /* TODO: tune these limits? */
490 while (active_filecnt
>= (active_bytecnt
>= 128*1024 ? 10 : 50)) {
491 check_for_finished_files(itemizing
, code
, 0);
493 io_flush(NORMAL_FLUSH
);
499 active_bytecnt
+= F_LENGTH(cur_flist
->files
[ndx
- cur_flist
->ndx_start
]);
502 /* Write a message to a multiplexed stream. If this fails, rsync exits. */
503 static void mplex_write(int fd
, enum msgcode code
, const char *buf
, size_t len
, int convert
)
505 char buffer
[BIGPATHBUFLEN
]; /* Oversized for use by iconv code. */
509 /* We need to convert buf before doing anything else so that we
510 * can include the (converted) byte length in the message header. */
511 if (convert
&& ic_send
!= (iconv_t
)-1) {
514 INIT_XBUF(outbuf
, buffer
+ 4, 0, sizeof buffer
- 4);
515 INIT_XBUF(inbuf
, (char*)buf
, len
, -1);
517 iconvbufs(ic_send
, &inbuf
, &outbuf
,
518 ICB_INCLUDE_BAD
| ICB_INCLUDE_INCOMPLETE
);
520 rprintf(FERROR
, "overflowed conversion buffer in mplex_write");
521 exit_cleanup(RERR_UNSUPPORTED
);
524 n
= len
= outbuf
.len
;
527 if (n
> 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */
528 n
= 0; /* We'd rather do 2 writes than too much memcpy(). */
530 memcpy(buffer
+ 4, buf
, n
);
532 SIVAL(buffer
, 0, ((MPLEX_BASE
+ (int)code
)<<24) + len
);
534 defer_forwarding_keep
= 1; /* defer_forwarding_messages++ on return */
535 writefd_unbuffered(fd
, buffer
, n
+4);
536 defer_forwarding_keep
= 0;
539 writefd_unbuffered(fd
, buf
+n
, len
-n
);
541 if (!--defer_forwarding_messages
&& !no_flush
)
545 int send_msg(enum msgcode code
, const char *buf
, int len
, int convert
)
547 if (msg_fd_out
< 0) {
548 if (!defer_forwarding_messages
)
549 return io_multiplex_write(code
, buf
, len
, convert
);
550 if (!io_multiplexing_out
)
552 msg_list_add(&msg_queue
, code
, buf
, len
, convert
);
555 if (flist_forward_from
>= 0)
556 msg_list_add(&msg_queue
, code
, buf
, len
, convert
);
558 mplex_write(msg_fd_out
, code
, buf
, len
, convert
);
562 void send_msg_int(enum msgcode code
, int num
)
565 SIVAL(numbuf
, 0, num
);
566 send_msg(code
, numbuf
, 4, 0);
569 void wait_for_receiver(void)
572 io_flush(NORMAL_FLUSH
);
577 int get_redo_num(void)
579 return flist_ndx_pop(&redo_list
);
582 int get_hlink_num(void)
584 return flist_ndx_pop(&hlink_list
);
588 * When we're the receiver and we have a local --files-from list of names
589 * that needs to be sent over the socket to the sender, we have to do two
590 * things at the same time: send the sender a list of what files we're
591 * processing and read the incoming file+info list from the sender. We do
592 * this by augmenting the read_timeout() function to copy this data. It
593 * uses ff_buf to read a block of data from f_in (when it is ready, since
594 * it might be a pipe) and then blast it out f_out (when it is ready to
595 * receive more data).
597 void io_set_filesfrom_fds(int f_in
, int f_out
)
599 io_filesfrom_f_in
= f_in
;
600 io_filesfrom_f_out
= f_out
;
601 alloc_xbuf(&ff_buf
, 2048);
604 alloc_xbuf(&iconv_buf
, 1024);
608 /* It's almost always an error to get an EOF when we're trying to read from the
609 * network, because the protocol is (for the most part) self-terminating.
611 * There is one case for the receiver when it is at the end of the transfer
612 * (hanging around reading any keep-alive packets that might come its way): if
613 * the sender dies before the generator's kill-signal comes through, we can end
614 * up here needing to loop until the kill-signal arrives. In this situation,
615 * kluge_around_eof will be < 0.
617 * There is another case for older protocol versions (< 24) where the module
618 * listing was not terminated, so we must ignore an EOF error in that case and
619 * exit. In this situation, kluge_around_eof will be > 0. */
620 static void whine_about_eof(int fd
)
622 if (kluge_around_eof
&& fd
== sock_f_in
) {
624 if (kluge_around_eof
> 0)
626 /* If we're still here after 10 seconds, exit with an error. */
627 for (i
= 10*1000/20; i
--; )
631 rprintf(FERROR
, RSYNC_NAME
": connection unexpectedly closed "
632 "(%.0f bytes received so far) [%s]\n",
633 (double)stats
.total_read
, who_am_i());
635 exit_cleanup(RERR_STREAMIO
);
639 * Read from a socket with I/O timeout. Return the number of bytes
640 * read. If no bytes can be read then exit; never return a number <= 0.
642 * TODO: If the remote shell connection fails, then current versions
643 * actually report an "unexpected EOF" error here. Since it's a
644 * fairly common mistake to try to use rsh when ssh is required, we
645 * should trap that: if we fail to read any data at all, we should
646 * give a better explanation. We can tell whether the connection has
647 * started by looking e.g. at whether the remote version is known yet.
649 static int read_timeout(int fd
, char *buf
, size_t len
)
653 io_flush(FULL_FLUSH
);
656 /* until we manage to read *something* */
665 if (io_filesfrom_f_out
>= 0) {
667 if (ff_buf
.len
== 0) {
668 if (io_filesfrom_f_in
>= 0) {
669 FD_SET(io_filesfrom_f_in
, &r_fds
);
670 new_fd
= io_filesfrom_f_in
;
672 io_filesfrom_f_out
= -1;
676 FD_SET(io_filesfrom_f_out
, &w_fds
);
677 new_fd
= io_filesfrom_f_out
;
683 tv
.tv_sec
= select_timeout
;
688 count
= select(maxfd
+ 1, &r_fds
, &w_fds
, NULL
, &tv
);
691 if (errno
== EBADF
) {
692 defer_forwarding_messages
= 0;
693 exit_cleanup(RERR_SOCKETIO
);
699 if (io_filesfrom_f_out
>= 0) {
701 if (FD_ISSET(io_filesfrom_f_out
, &w_fds
)) {
702 int l
= write(io_filesfrom_f_out
,
703 ff_buf
.buf
+ ff_buf
.pos
,
706 if (!(ff_buf
.len
-= l
))
710 } else if (errno
!= EINTR
) {
711 /* XXX should we complain? */
712 io_filesfrom_f_out
= -1;
715 } else if (io_filesfrom_f_in
>= 0) {
716 if (FD_ISSET(io_filesfrom_f_in
, &r_fds
)) {
718 xbuf
*ibuf
= filesfrom_convert
? &iconv_buf
: &ff_buf
;
720 xbuf
*ibuf
= &ff_buf
;
722 int l
= read(io_filesfrom_f_in
, ibuf
->buf
, ibuf
->size
);
724 if (l
== 0 || errno
!= EINTR
) {
725 /* Send end-of-file marker */
726 memcpy(ff_buf
.buf
, "\0\0", 2);
727 ff_buf
.len
= ff_lastchar
? 2 : 1;
729 io_filesfrom_f_in
= -1;
733 if (filesfrom_convert
) {
736 iconvbufs(ic_send
, &iconv_buf
, &ff_buf
,
737 ICB_EXPAND_OUT
|ICB_INCLUDE_BAD
|ICB_INCLUDE_INCOMPLETE
);
742 char *s
= ff_buf
.buf
+ l
;
743 /* Transform CR and/or LF into '\0' */
744 while (s
-- > ff_buf
.buf
) {
745 if (*s
== '\n' || *s
== '\r')
750 /* Last buf ended with a '\0', so don't
751 * let this buf start with one. */
752 while (l
&& ff_buf
.buf
[ff_buf
.pos
] == '\0')
758 char *f
= ff_buf
.buf
+ ff_buf
.pos
;
761 /* Eliminate any multi-'\0' runs. */
763 if (!(*t
++ = *f
++)) {
764 while (f
!= eob
&& !*f
)
776 if (!FD_ISSET(fd
, &r_fds
))
779 n
= read(fd
, buf
, len
);
783 whine_about_eof(fd
); /* Doesn't return. */
784 if (errno
== EINTR
|| errno
== EWOULDBLOCK
788 /* Don't write errors on a dead socket. */
789 if (fd
== sock_f_in
) {
790 io_end_multiplex_out();
791 rsyserr(FERROR_SOCKET
, errno
, "read error");
793 rsyserr(FERROR
, errno
, "read error");
794 exit_cleanup(RERR_STREAMIO
);
801 if (fd
== sock_f_in
&& io_timeout
)
802 last_io_in
= time(NULL
);
808 /* Read a line into the "buf" buffer. */
809 int read_line(int fd
, char *buf
, size_t bufsiz
, int flags
)
815 if (flags
& RL_CONVERT
&& iconv_buf
.size
< bufsiz
)
816 realloc_xbuf(&iconv_buf
, bufsiz
+ 1024);
821 s
= flags
& RL_CONVERT
? iconv_buf
.buf
: buf
;
825 eob
= s
+ bufsiz
- 1;
827 cnt
= read(fd
, &ch
, 1);
828 if (cnt
< 0 && (errno
== EWOULDBLOCK
829 || errno
== EINTR
|| errno
== EAGAIN
)) {
836 tv
.tv_sec
= select_timeout
;
838 if (!select(fd
+1, &r_fds
, NULL
, &e_fds
, &tv
))
840 /*if (FD_ISSET(fd, &e_fds))
841 rprintf(FINFO, "select exception on fd %d\n", fd); */
846 if (flags
& RL_EOL_NULLS
? ch
== '\0' : (ch
== '\r' || ch
== '\n')) {
847 /* Skip empty lines if dumping comments. */
848 if (flags
& RL_DUMP_COMMENTS
&& s
== buf
)
857 if (flags
& RL_DUMP_COMMENTS
&& (*buf
== '#' || *buf
== ';'))
861 if (flags
& RL_CONVERT
) {
863 INIT_XBUF(outbuf
, buf
, 0, bufsiz
);
865 iconv_buf
.len
= s
- iconv_buf
.buf
;
866 iconvbufs(ic_recv
, &iconv_buf
, &outbuf
,
867 ICB_INCLUDE_BAD
| ICB_INCLUDE_INCOMPLETE
);
868 outbuf
.buf
[outbuf
.len
] = '\0';
876 void read_args(int f_in
, char *mod_name
, char *buf
, size_t bufsiz
, int rl_nulls
,
877 char ***argv_p
, int *argc_p
, char **request_p
)
879 int maxargs
= MAX_ARGS
;
883 int rl_flags
= (rl_nulls
? RL_EOL_NULLS
: 0);
886 rl_flags
|= (protect_args
&& ic_recv
!= (iconv_t
)-1 ? RL_CONVERT
: 0);
889 if (!(argv
= new_array(char *, maxargs
)))
890 out_of_memory("read_args");
891 if (mod_name
&& !protect_args
)
892 argv
[argc
++] = "rsyncd";
895 if (read_line(f_in
, buf
, bufsiz
, rl_flags
) == 0)
898 if (argc
== maxargs
-1) {
900 if (!(argv
= realloc_array(argv
, char *, maxargs
)))
901 out_of_memory("read_args");
906 *request_p
= strdup(buf
);
910 glob_expand_module(mod_name
, buf
, &argv
, &argc
, &maxargs
);
912 glob_expand(buf
, &argv
, &argc
, &maxargs
);
914 if (!(p
= strdup(buf
)))
915 out_of_memory("read_args");
917 if (*p
== '.' && p
[1] == '\0')
923 glob_expand(NULL
, NULL
, NULL
, NULL
);
929 int io_start_buffering_out(int f_out
)
932 assert(f_out
== iobuf_f_out
);
935 if (!(iobuf_out
= new_array(char, IO_BUFFER_SIZE
)))
936 out_of_memory("io_start_buffering_out");
942 int io_start_buffering_in(int f_in
)
945 assert(f_in
== iobuf_f_in
);
948 iobuf_in_siz
= 2 * IO_BUFFER_SIZE
;
949 if (!(iobuf_in
= new_array(char, iobuf_in_siz
)))
950 out_of_memory("io_start_buffering_in");
955 void io_end_buffering_in(void)
962 iobuf_in_remaining
= 0;
966 void io_end_buffering_out(void)
970 io_flush(FULL_FLUSH
);
976 void maybe_flush_socket(int important
)
978 if (iobuf_out
&& iobuf_out_cnt
979 && (important
|| time(NULL
) - last_io_out
>= 5))
980 io_flush(NORMAL_FLUSH
);
983 void maybe_send_keepalive(void)
985 if (time(NULL
) - last_io_out
>= allowed_lull
) {
986 if (!iobuf_out
|| !iobuf_out_cnt
) {
987 if (protocol_version
< 29)
988 return; /* there's nothing we can do */
989 if (protocol_version
>= 30)
990 send_msg(MSG_NOOP
, "", 0, 0);
992 write_int(sock_f_out
, cur_flist
->used
);
993 write_shortint(sock_f_out
, ITEM_IS_NEW
);
997 io_flush(NORMAL_FLUSH
);
1001 void start_flist_forward(int f_in
)
1003 assert(iobuf_out
!= NULL
);
1004 assert(iobuf_f_out
== msg_fd_out
);
1005 flist_forward_from
= f_in
;
1008 void stop_flist_forward()
1010 flist_forward_from
= -1;
1011 io_flush(FULL_FLUSH
);
1015 * Continue trying to read len bytes - don't return until len has been
1016 * read.
1018 static void read_loop(int fd
, char *buf
, size_t len
)
1021 int n
= read_timeout(fd
, buf
, len
);
1029 * Read from the file descriptor, handling multiplexing - return number
1030 * of bytes read.
1032 * Never returns <= 0.
1034 static int readfd_unbuffered(int fd
, char *buf
, size_t len
)
1038 char line
[BIGPATHBUFLEN
];
1040 if (!iobuf_in
|| fd
!= iobuf_f_in
)
1041 return read_timeout(fd
, buf
, len
);
1043 if (!io_multiplexing_in
&& iobuf_in_remaining
== 0) {
1044 iobuf_in_remaining
= read_timeout(fd
, iobuf_in
, iobuf_in_siz
);
1049 if (iobuf_in_remaining
) {
1050 len
= MIN(len
, iobuf_in_remaining
);
1051 memcpy(buf
, iobuf_in
+ iobuf_in_ndx
, len
);
1052 iobuf_in_ndx
+= len
;
1053 iobuf_in_remaining
-= len
;
1058 read_loop(fd
, line
, 4);
1059 tag
= IVAL(line
, 0);
1061 msg_bytes
= tag
& 0xFFFFFF;
1062 tag
= (tag
>> 24) - MPLEX_BASE
;
1066 if (msg_bytes
> iobuf_in_siz
) {
1067 if (!(iobuf_in
= realloc_array(iobuf_in
, char,
1069 out_of_memory("readfd_unbuffered");
1070 iobuf_in_siz
= msg_bytes
;
1072 read_loop(fd
, iobuf_in
, msg_bytes
);
1073 iobuf_in_remaining
= msg_bytes
;
1078 maybe_send_keepalive();
1083 read_loop(fd
, line
, msg_bytes
);
1084 send_msg_int(MSG_IO_ERROR
, IVAL(line
, 0));
1085 io_error
|= IVAL(line
, 0);
1088 if (msg_bytes
>= sizeof line
)
1091 if (ic_recv
!= (iconv_t
)-1) {
1097 INIT_CONST_XBUF(outbuf
, line
);
1098 INIT_XBUF(inbuf
, ibuf
, 0, -1);
1101 inbuf
.len
= msg_bytes
> sizeof ibuf
1102 ? sizeof ibuf
: msg_bytes
;
1103 read_loop(fd
, inbuf
.buf
, inbuf
.len
);
1104 if (!(msg_bytes
-= inbuf
.len
)
1105 && !ibuf
[inbuf
.len
-1])
1106 inbuf
.len
--, add_null
= 1;
1107 if (iconvbufs(ic_send
, &inbuf
, &outbuf
,
1108 ICB_INCLUDE_BAD
| ICB_INCLUDE_INCOMPLETE
) < 0)
1113 if (outbuf
.len
== outbuf
.size
)
1115 outbuf
.buf
[outbuf
.len
++] = '\0';
1117 msg_bytes
= outbuf
.len
;
1120 read_loop(fd
, line
, msg_bytes
);
1121 /* A directory name was sent with the trailing null */
1122 if (msg_bytes
> 0 && !line
[msg_bytes
-1])
1123 log_delete(line
, S_IFDIR
);
1125 line
[msg_bytes
] = '\0';
1126 log_delete(line
, S_IFREG
);
1130 if (msg_bytes
!= 4) {
1132 rprintf(FERROR
, "invalid multi-message %d:%ld [%s]\n",
1133 tag
, (long)msg_bytes
, who_am_i());
1134 exit_cleanup(RERR_STREAMIO
);
1136 read_loop(fd
, line
, msg_bytes
);
1137 successful_send(IVAL(line
, 0));
1142 read_loop(fd
, line
, msg_bytes
);
1143 send_msg_int(MSG_NO_SEND
, IVAL(line
, 0));
1147 case MSG_ERROR_XFER
:
1149 if (msg_bytes
>= sizeof line
) {
1152 "multiplexing overflow %d:%ld [%s]\n",
1153 tag
, (long)msg_bytes
, who_am_i());
1154 exit_cleanup(RERR_STREAMIO
);
1156 read_loop(fd
, line
, msg_bytes
);
1157 rwrite((enum logcode
)tag
, line
, msg_bytes
, 1);
1158 if (first_message
) {
1159 if (list_only
&& !am_sender
&& tag
== 1) {
1160 line
[msg_bytes
] = '\0';
1161 check_for_d_option_error(line
);
1167 rprintf(FERROR
, "unexpected tag %d [%s]\n",
1169 exit_cleanup(RERR_STREAMIO
);
1173 if (iobuf_in_remaining
== 0)
1174 io_flush(NORMAL_FLUSH
);
1179 /* Do a buffered read from fd. Don't return until all N bytes have
1180 * been read. If all N can't be read then exit with an error. */
1181 static void readfd(int fd
, char *buffer
, size_t N
)
1187 cnt
= readfd_unbuffered(fd
, buffer
+ total
, N
-total
);
1191 if (fd
== write_batch_monitor_in
) {
1192 if ((size_t)write(batch_fd
, buffer
, total
) != total
)
1193 exit_cleanup(RERR_FILEIO
);
1196 if (fd
== flist_forward_from
)
1197 writefd(iobuf_f_out
, buffer
, total
);
1199 if (fd
== sock_f_in
)
1200 stats
.total_read
+= total
;
1203 unsigned short read_shortint(int f
)
1207 return (UVAL(b
, 1) << 8) + UVAL(b
, 0);
1210 int32
read_int(int f
)
1217 #if SIZEOF_INT32 > 4
1218 if (num
& (int32
)0x80000000)
1219 num
|= ~(int32
)0xffffffff;
1224 int32
read_varint(int f
)
1234 readfd(f
, (char*)&ch
, 1);
1235 extra
= int_byte_extra
[ch
/ 4];
1237 uchar bit
= ((uchar
)1<<(8-extra
));
1238 if (extra
>= (int)sizeof u
.b
) {
1239 rprintf(FERROR
, "Overflow in read_varint()\n");
1240 exit_cleanup(RERR_STREAMIO
);
1242 readfd(f
, u
.b
, extra
);
1243 u
.b
[extra
] = ch
& (bit
-1);
1246 #if CAREFUL_ALIGNMENT
1249 #if SIZEOF_INT32 > 4
1250 if (u
.x
& (int32
)0x80000000)
1251 u
.x
|= ~(int32
)0xffffffff;
1256 int64
read_varlong(int f
, uchar min_bytes
)
1265 #if SIZEOF_INT64 < 8
1270 readfd(f
, b2
, min_bytes
);
1271 memcpy(u
.b
, b2
+1, min_bytes
-1);
1272 extra
= int_byte_extra
[CVAL(b2
, 0) / 4];
1274 uchar bit
= ((uchar
)1<<(8-extra
));
1275 if (min_bytes
+ extra
> (int)sizeof u
.b
) {
1276 rprintf(FERROR
, "Overflow in read_varlong()\n");
1277 exit_cleanup(RERR_STREAMIO
);
1279 readfd(f
, u
.b
+ min_bytes
- 1, extra
);
1280 u
.b
[min_bytes
+ extra
- 1] = CVAL(b2
, 0) & (bit
-1);
1281 #if SIZEOF_INT64 < 8
1282 if (min_bytes
+ extra
> 5 || u
.b
[4] || CVAL(u
.b
,3) & 0x80) {
1283 rprintf(FERROR
, "Integer overflow: attempted 64-bit offset\n");
1284 exit_cleanup(RERR_UNSUPPORTED
);
1288 u
.b
[min_bytes
+ extra
- 1] = CVAL(b2
, 0);
1289 #if SIZEOF_INT64 < 8
1291 #elif CAREFUL_ALIGNMENT
1292 u
.x
= IVAL(u
.b
,0) | (((int64
)IVAL(u
.b
,4))<<32);
1297 int64
read_longint(int f
)
1299 #if SIZEOF_INT64 >= 8
1302 int32 num
= read_int(f
);
1304 if (num
!= (int32
)0xffffffff)
1307 #if SIZEOF_INT64 < 8
1308 rprintf(FERROR
, "Integer overflow: attempted 64-bit offset\n");
1309 exit_cleanup(RERR_UNSUPPORTED
);
1312 return IVAL(b
,0) | (((int64
)IVAL(b
,4))<<32);
1316 void read_buf(int f
, char *buf
, size_t len
)
1321 void read_sbuf(int f
, char *buf
, size_t len
)
1323 readfd(f
, buf
, len
);
1327 uchar
read_byte(int f
)
1330 readfd(f
, (char *)&c
, 1);
1334 int read_vstring(int f
, char *buf
, int bufsize
)
1336 int len
= read_byte(f
);
1339 len
= (len
& ~0x80) * 0x100 + read_byte(f
);
1341 if (len
>= bufsize
) {
1342 rprintf(FERROR
, "over-long vstring received (%d > %d)\n",
1348 readfd(f
, buf
, len
);
1353 /* Populate a sum_struct with values from the socket. This is
1354 * called by both the sender and the receiver. */
1355 void read_sum_head(int f
, struct sum_struct
*sum
)
1357 sum
->count
= read_int(f
);
1358 if (sum
->count
< 0) {
1359 rprintf(FERROR
, "Invalid checksum count %ld [%s]\n",
1360 (long)sum
->count
, who_am_i());
1361 exit_cleanup(RERR_PROTOCOL
);
1363 sum
->blength
= read_int(f
);
1364 if (sum
->blength
< 0 || sum
->blength
> MAX_BLOCK_SIZE
) {
1365 rprintf(FERROR
, "Invalid block length %ld [%s]\n",
1366 (long)sum
->blength
, who_am_i());
1367 exit_cleanup(RERR_PROTOCOL
);
1369 sum
->s2length
= protocol_version
< 27 ? csum_length
: (int)read_int(f
);
1370 if (sum
->s2length
< 0 || sum
->s2length
> MAX_DIGEST_LEN
) {
1371 rprintf(FERROR
, "Invalid checksum length %d [%s]\n",
1372 sum
->s2length
, who_am_i());
1373 exit_cleanup(RERR_PROTOCOL
);
1375 sum
->remainder
= read_int(f
);
1376 if (sum
->remainder
< 0 || sum
->remainder
> sum
->blength
) {
1377 rprintf(FERROR
, "Invalid remainder length %ld [%s]\n",
1378 (long)sum
->remainder
, who_am_i());
1379 exit_cleanup(RERR_PROTOCOL
);
1383 /* Send the values from a sum_struct over the socket. Set sum to
1384 * NULL if there are no checksums to send. This is called by both
1385 * the generator and the sender. */
1386 void write_sum_head(int f
, struct sum_struct
*sum
)
1388 static struct sum_struct null_sum
;
1393 write_int(f
, sum
->count
);
1394 write_int(f
, sum
->blength
);
1395 if (protocol_version
>= 27)
1396 write_int(f
, sum
->s2length
);
1397 write_int(f
, sum
->remainder
);
1401 * Sleep after writing to limit I/O bandwidth usage.
1403 * @todo Rather than sleeping after each write, it might be better to
1404 * use some kind of averaging. The current algorithm seems to always
1405 * use a bit less bandwidth than specified, because it doesn't make up
1406 * for slow periods. But arguably this is a feature. In addition, we
1407 * ought to take the time used to write the data into account.
1409 * During some phases of big transfers (file FOO is uptodate) this is
1410 * called with a small bytes_written every time. As the kernel has to
1411 * round small waits up to guarantee that we actually wait at least the
1412 * requested number of microseconds, this can become grossly inaccurate.
1413 * We therefore keep track of the bytes we've written over time and only
1414 * sleep when the accumulated delay is at least 1 tenth of a second.
1416 static void sleep_for_bwlimit(int bytes_written
)
1418 static struct timeval prior_tv
;
1419 static long total_written
= 0;
1420 struct timeval tv
, start_tv
;
1421 long elapsed_usec
, sleep_usec
;
1423 #define ONE_SEC 1000000L /* # of microseconds in a second */
1425 if (!bwlimit_writemax
)
1428 total_written
+= bytes_written
;
1430 gettimeofday(&start_tv
, NULL
);
1431 if (prior_tv
.tv_sec
) {
1432 elapsed_usec
= (start_tv
.tv_sec
- prior_tv
.tv_sec
) * ONE_SEC
1433 + (start_tv
.tv_usec
- prior_tv
.tv_usec
);
1434 total_written
-= elapsed_usec
* bwlimit
/ (ONE_SEC
/1024);
1435 if (total_written
< 0)
1439 sleep_usec
= total_written
* (ONE_SEC
/1024) / bwlimit
;
1440 if (sleep_usec
< ONE_SEC
/ 10) {
1441 prior_tv
= start_tv
;
1445 tv
.tv_sec
= sleep_usec
/ ONE_SEC
;
1446 tv
.tv_usec
= sleep_usec
% ONE_SEC
;
1447 select(0, NULL
, NULL
, NULL
, &tv
);
1449 gettimeofday(&prior_tv
, NULL
);
1450 elapsed_usec
= (prior_tv
.tv_sec
- start_tv
.tv_sec
) * ONE_SEC
1451 + (prior_tv
.tv_usec
- start_tv
.tv_usec
);
1452 total_written
= (sleep_usec
- elapsed_usec
) * bwlimit
/ (ONE_SEC
/1024);
1455 /* Write len bytes to the file descriptor fd, looping as necessary to get
1456 * the job done and also (in certain circumstances) reading any data on
1457 * msg_fd_in to avoid deadlock.
1459 * This function underlies the multiplexing system. The body of the
1460 * application never calls this function directly. */
1461 static void writefd_unbuffered(int fd
, const char *buf
, size_t len
)
1463 size_t n
, total
= 0;
1464 fd_set w_fds
, r_fds
, e_fds
;
1465 int maxfd
, count
, cnt
, using_r_fds
;
1470 defer_forwarding_messages
++, defer_inc
++;
1472 while (total
< len
) {
1479 if (msg_fd_in
>= 0) {
1481 FD_SET(msg_fd_in
, &r_fds
);
1482 if (msg_fd_in
> maxfd
)
1488 tv
.tv_sec
= select_timeout
;
1492 count
= select(maxfd
+ 1, using_r_fds
? &r_fds
: NULL
,
1493 &w_fds
, &e_fds
, &tv
);
1496 if (count
< 0 && errno
== EBADF
)
1497 exit_cleanup(RERR_SOCKETIO
);
1502 /*if (FD_ISSET(fd, &e_fds))
1503 rprintf(FINFO, "select exception on fd %d\n", fd); */
1505 if (using_r_fds
&& FD_ISSET(msg_fd_in
, &r_fds
))
1508 if (!FD_ISSET(fd
, &w_fds
))
1512 if (bwlimit_writemax
&& n
> bwlimit_writemax
)
1513 n
= bwlimit_writemax
;
1514 cnt
= write(fd
, buf
+ total
, n
);
1520 if (errno
== EWOULDBLOCK
|| errno
== EAGAIN
) {
1526 /* Don't try to write errors back across the stream. */
1527 if (fd
== sock_f_out
)
1528 io_end_multiplex_out();
1529 /* Don't try to write errors down a failing msg pipe. */
1530 if (am_server
&& fd
== msg_fd_out
)
1531 exit_cleanup(RERR_STREAMIO
);
1532 rsyserr(FERROR
, errno
,
1533 "writefd_unbuffered failed to write %ld bytes [%s]",
1534 (long)len
, who_am_i());
1535 /* If the other side is sending us error messages, try
1536 * to grab any messages they sent before they died. */
1537 while (!am_server
&& fd
== sock_f_out
&& io_multiplexing_in
) {
1541 readfd_unbuffered(sock_f_in
, buf
, sizeof buf
);
1543 exit_cleanup(RERR_STREAMIO
);
1547 defer_forwarding_messages
++, defer_inc
++;
1549 if (fd
== sock_f_out
) {
1550 if (io_timeout
|| am_generator
)
1551 last_io_out
= time(NULL
);
1552 sleep_for_bwlimit(cnt
);
1557 defer_inc
-= defer_forwarding_keep
;
1558 if (!(defer_forwarding_messages
-= defer_inc
) && !no_flush
)
1562 void io_flush(int flush_it_all
)
1564 if (!iobuf_out_cnt
|| no_flush
)
1567 if (io_multiplexing_out
)
1568 mplex_write(sock_f_out
, MSG_DATA
, iobuf_out
, iobuf_out_cnt
, 0);
1570 writefd_unbuffered(iobuf_f_out
, iobuf_out
, iobuf_out_cnt
);
1573 if (flush_it_all
&& !defer_forwarding_messages
)
1577 static void writefd(int fd
, const char *buf
, size_t len
)
1579 if (fd
== sock_f_out
)
1580 stats
.total_written
+= len
;
1582 if (fd
== write_batch_monitor_out
) {
1583 if ((size_t)write(batch_fd
, buf
, len
) != len
)
1584 exit_cleanup(RERR_FILEIO
);
1587 if (!iobuf_out
|| fd
!= iobuf_f_out
) {
1588 writefd_unbuffered(fd
, buf
, len
);
1593 int n
= MIN((int)len
, IO_BUFFER_SIZE
- iobuf_out_cnt
);
1595 memcpy(iobuf_out
+iobuf_out_cnt
, buf
, n
);
1601 if (iobuf_out_cnt
== IO_BUFFER_SIZE
)
1602 io_flush(NORMAL_FLUSH
);
1606 void write_shortint(int f
, unsigned short x
)
1610 b
[1] = (char)(x
>> 8);
1614 void write_int(int f
, int32 x
)
1621 void write_varint(int f
, int32 x
)
1629 while (cnt
> 1 && b
[cnt
] == 0)
1631 bit
= ((uchar
)1<<(7-cnt
+1));
1632 if (CVAL(b
, cnt
) >= bit
) {
1636 *b
= b
[cnt
] | ~(bit
*2-1);
1643 void write_varlong(int f
, int64 x
, uchar min_bytes
)
1650 #if SIZEOF_INT64 >= 8
1651 SIVAL(b
, 5, x
>> 32);
1653 if (x
<= 0x7FFFFFFF && x
>= 0)
1654 memset(b
+ 5, 0, 4);
1656 rprintf(FERROR
, "Integer overflow: attempted 64-bit offset\n");
1657 exit_cleanup(RERR_UNSUPPORTED
);
1661 while (cnt
> min_bytes
&& b
[cnt
] == 0)
1663 bit
= ((uchar
)1<<(7-cnt
+min_bytes
));
1664 if (CVAL(b
, cnt
) >= bit
) {
1667 } else if (cnt
> min_bytes
)
1668 *b
= b
[cnt
] | ~(bit
*2-1);
1676 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1677 * 64-bit types on this platform.
1679 void write_longint(int f
, int64 x
)
1681 char b
[12], * const s
= b
+4;
1684 if (x
<= 0x7FFFFFFF && x
>= 0) {
1689 #if SIZEOF_INT64 < 8
1690 rprintf(FERROR
, "Integer overflow: attempted 64-bit offset\n");
1691 exit_cleanup(RERR_UNSUPPORTED
);
1694 SIVAL(s
, 4, x
>> 32);
1699 void write_buf(int f
, const char *buf
, size_t len
)
1704 /** Write a string to the connection */
1705 void write_sbuf(int f
, const char *buf
)
1707 writefd(f
, buf
, strlen(buf
));
1710 void write_byte(int f
, uchar c
)
1712 writefd(f
, (char *)&c
, 1);
1715 void write_vstring(int f
, const char *str
, int len
)
1717 uchar lenbuf
[3], *lb
= lenbuf
;
1722 "attempting to send over-long vstring (%d > %d)\n",
1724 exit_cleanup(RERR_PROTOCOL
);
1726 *lb
++ = len
/ 0x100 + 0x80;
1730 writefd(f
, (char*)lenbuf
, lb
- lenbuf
+ 1);
1732 writefd(f
, str
, len
);
1735 /* Send a file-list index using a byte-reduction method. */
1736 void write_ndx(int f
, int32 ndx
)
1738 static int32 prev_positive
= -1, prev_negative
= 1;
1739 int32 diff
, cnt
= 0;
1742 if (protocol_version
< 30 || read_batch
) {
1747 /* Send NDX_DONE as a single-byte 0 with no side effects. Send
1748 * negative nums as a positive after sending a leading 0xFF. */
1750 diff
= ndx
- prev_positive
;
1751 prev_positive
= ndx
;
1752 } else if (ndx
== NDX_DONE
) {
1757 b
[cnt
++] = (char)0xFF;
1759 diff
= ndx
- prev_negative
;
1760 prev_negative
= ndx
;
1763 /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
1764 * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
1765 * & all 4 bytes of the (non-negative) num with the high-bit set. */
1766 if (diff
< 0xFE && diff
> 0)
1767 b
[cnt
++] = (char)diff
;
1768 else if (diff
< 0 || diff
> 0x7FFF) {
1769 b
[cnt
++] = (char)0xFE;
1770 b
[cnt
++] = (char)((ndx
>> 24) | 0x80);
1771 b
[cnt
++] = (char)ndx
;
1772 b
[cnt
++] = (char)(ndx
>> 8);
1773 b
[cnt
++] = (char)(ndx
>> 16);
1775 b
[cnt
++] = (char)0xFE;
1776 b
[cnt
++] = (char)(diff
>> 8);
1777 b
[cnt
++] = (char)diff
;
1782 /* Receive a file-list index using a byte-reduction method. */
1783 int32
read_ndx(int f
)
1785 static int32 prev_positive
= -1, prev_negative
= 1;
1786 int32
*prev_ptr
, num
;
1789 if (protocol_version
< 30)
1793 if (CVAL(b
, 0) == 0xFF) {
1795 prev_ptr
= &prev_negative
;
1796 } else if (CVAL(b
, 0) == 0)
1799 prev_ptr
= &prev_positive
;
1800 if (CVAL(b
, 0) == 0xFE) {
1802 if (CVAL(b
, 0) & 0x80) {
1803 b
[3] = CVAL(b
, 0) & ~0x80;
1808 num
= (UVAL(b
,0)<<8) + UVAL(b
,1) + *prev_ptr
;
1810 num
= UVAL(b
, 0) + *prev_ptr
;
1812 if (prev_ptr
== &prev_negative
)
1817 /* Read a line of up to bufsiz-1 characters into buf. Strips
1818 * the (required) trailing newline and all carriage returns.
1819 * Returns 1 for success; 0 for I/O error or truncation. */
1820 int read_line_old(int f
, char *buf
, size_t bufsiz
)
1822 bufsiz
--; /* leave room for the null */
1823 while (bufsiz
> 0) {
1825 read_buf(f
, buf
, 1);
1830 if (buf
[0] != '\r') {
1839 void io_printf(int fd
, const char *format
, ...)
1842 char buf
[BIGPATHBUFLEN
];
1845 va_start(ap
, format
);
1846 len
= vsnprintf(buf
, sizeof buf
, format
, ap
);
1850 exit_cleanup(RERR_STREAMIO
);
1852 if (len
> (int)sizeof buf
) {
1853 rprintf(FERROR
, "io_printf() was too long for the buffer.\n");
1854 exit_cleanup(RERR_STREAMIO
);
1857 write_sbuf(fd
, buf
);
1860 /** Setup for multiplexing a MSG_* stream with the data stream. */
1861 void io_start_multiplex_out(void)
1863 io_flush(NORMAL_FLUSH
);
1864 io_start_buffering_out(sock_f_out
);
1865 io_multiplexing_out
= 1;
1868 /** Setup for multiplexing a MSG_* stream with the data stream. */
1869 void io_start_multiplex_in(void)
1871 io_flush(NORMAL_FLUSH
);
1872 io_start_buffering_in(sock_f_in
);
1873 io_multiplexing_in
= 1;
1876 /** Write a message to the multiplexed data stream. */
1877 int io_multiplex_write(enum msgcode code
, const char *buf
, size_t len
, int convert
)
1879 if (!io_multiplexing_out
)
1881 io_flush(NORMAL_FLUSH
);
1882 stats
.total_written
+= (len
+4);
1883 mplex_write(sock_f_out
, code
, buf
, len
, convert
);
1887 void io_end_multiplex_in(void)
1889 io_multiplexing_in
= 0;
1890 io_end_buffering_in();
1893 /** Stop output multiplexing. */
1894 void io_end_multiplex_out(void)
1896 io_multiplexing_out
= 0;
1897 io_end_buffering_out();
1900 void start_write_batch(int fd
)
1902 /* Some communication has already taken place, but we don't
1903 * enable batch writing until here so that we can write a
1904 * canonical record of the communication even though the
1905 * actual communication so far depends on whether a daemon
1906 * is involved. */
1907 write_int(batch_fd
, protocol_version
);
1908 if (protocol_version
>= 30)
1909 write_byte(batch_fd
, inc_recurse
);
1910 write_int(batch_fd
, checksum_seed
);
1913 write_batch_monitor_out
= fd
;
1915 write_batch_monitor_in
= fd
;
1918 void stop_write_batch(void)
1920 write_batch_monitor_out
= -1;
1921 write_batch_monitor_in
= -1;