2 * @brief RemoteConnection class used by the remote backend.
4 /* Copyright (C) 2006-2024 Olly Betts
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "remoteconnection.h"
25 #include <xapian/error.h>
27 #include "safefcntl.h"
28 #include "safeunistd.h"
33 # include "safesysselect.h"
42 # include <type_traits>
47 #include "filetests.h"
51 #include "posixy_wrapper.h"
53 #include "socket_utils.h"
57 #define CHUNKSIZE 4096
// Raise Xapian::DatabaseClosedError with the fixed message "Database has been
// closed".  The RemoteConnection member functions call this helper instead
// of constructing the exception themselves.
61 throw_database_closed()
63 throw Xapian::DatabaseClosedError("Database has been closed");
// Raise Xapian::NetworkError to report a message length which can't be
// accepted.  get_message_chunked() calls this when the decoded length does
// not survive conversion to off_t.
68 throw_network_error_insane_message_length()
70 throw Xapian::NetworkError("Insane message length specified!");
// Raise Xapian::NetworkTimeoutError.
//
// msg is the error message and context is passed through unchanged as the
// second constructor argument.
75 throw_timeout(const char* msg
, const string
& context
)
77 throw Xapian::NetworkTimeoutError(msg
, context
);
// Advance the file offset stored in a WSAOVERLAPPED by n bytes.
//
// add_overflows() is given Offset as both an operand and the destination
// (presumably it stores the sum there and returns true on wrap-around -
// confirm against its definition).  When it returns true, OffsetHigh is
// incremented to carry into the high part of the offset.
82 update_overlapped_offset(WSAOVERLAPPED
& overlapped
, DWORD n
)
84 if (add_overflows(overlapped
.Offset
, n
, overlapped
.Offset
))
85 ++overlapped
.OffsetHigh
;
// Construct a connection which reads from fdin_ and writes to fdout_;
// context_ is kept in the context member and is passed along when errors
// are thrown.
//
// The body then does one of several alternative setups: creating the event
// used with the OVERLAPPED structure, setting SO_NOSIGPIPE on fdout,
// relying on send(..., MSG_NOSIGNAL), or ignoring SIGPIPE.
//
// Throws Xapian::NetworkError if the chosen setup step fails.
89 RemoteConnection::RemoteConnection(int fdin_
, int fdout_
,
90 const string
& context_
)
91 : fdin(fdin_
), fdout(fdout_
), context(context_
)
// Zero the OVERLAPPED state and create the event which the read and write
// code later waits on with WaitForSingleObject().
94 memset(&overlapped
, 0, sizeof(overlapped
));
95 overlapped
.hEvent
= CreateEvent(NULL
, FALSE
, FALSE
, NULL
);
96 if (!overlapped
.hEvent
)
97 throw Xapian::NetworkError("Failed to setup OVERLAPPED",
98 context
, -int(GetLastError()));
100 #elif defined USE_SO_NOSIGPIPE
101 // SO_NOSIGPIPE is a non-standardised socket option supported by a number
102 // of platforms - at least DragonFlyBSD, FreeBSD, macOS (not older
103 // versions, e.g. 10.15 apparently lacks it), Solaris; notably not
104 // supported by Linux or OpenBSD though.
106 // We use it where supported due to one big advantage over POSIX's
107 // MSG_NOSIGNAL which is that we can just set it once for a socket whereas
108 // with MSG_NOSIGNAL we need to call send(..., MSG_NOSIGNAL) instead of
109 // write(...), but send() only works on sockets, so with MSG_NOSIGNAL any
110 // code which might be working with files or pipes as well as sockets needs
111 // conditional handling depending on whether the fd is a socket or not.
113 // SO_NOSIGPIPE is present on NetBSD, but it seems when using it we still
114 // get SIGPIPE (reproduced on NetBSD 9.3 and 10.0) so we avoid using it
117 if (setsockopt(fdout
, SOL_SOCKET
, SO_NOSIGPIPE
,
118 reinterpret_cast<char*>(&on
), sizeof(on
)) < 0) {
119 // Some platforms (including FreeBSD, macOS, DragonflyBSD) seem to
120 // fail with EBADF instead of ENOTSOCK when passed a non-socket so
121 // allow either. If the descriptor is actually not valid we'll report
122 // it the next time we try to use it (as we would when not trying to
123 // use SO_NOSIGPIPE so this actually gives a more consistent error
125 if (errno
!= ENOTSOCK
&& errno
!= EBADF
) {
126 throw Xapian::NetworkError("Couldn't set SO_NOSIGPIPE on socket",
130 #elif defined USE_MSG_NOSIGNAL
131 // We can use send(..., MSG_NOSIGNAL) to avoid generating SIGPIPE
132 // (MSG_NOSIGNAL was added in POSIX.1-2008). This seems to be pretty much
133 // universally supported by current Unix-like platforms, but older macOS
134 // and Solaris apparently didn't have it.
136 // If fdout is not a socket, we'll set send_flags = 0 when the first send()
137 // fails with ENOTSOCK and use write() instead from then on.
139 // It's simplest to just ignore SIGPIPE. Not ideal, but it seems only old
140 // versions of macOS and of Solaris will end up here so let's not bother
141 // trying to do any clever trickery.
142 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
) {
143 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
// Destructor: closes the event handle held in overlapped.hEvent when one
// was created.
149 RemoteConnection::~RemoteConnection()
151 if (overlapped
.hEvent
)
152 CloseHandle(overlapped
.hEvent
);
// Read from fdin, appending to buffer, until buffer holds at least min_len
// bytes.
//
// min_len   Number of bytes which must be available in buffer.
// end_time  Absolute time by which the data must have arrived; 0.0 means
//           there is no timeout and blocking I/O is used.
//
// Returns true immediately when buffer already holds min_len bytes, and
// true once enough data has been read.
//
// Throws Xapian::NetworkTimeoutError (via throw_timeout()) when end_time
// passes first, and Xapian::NetworkError when a read or a wait fails.
//
// Two implementations follow: one issuing overlapped ReadFile() calls and
// waiting on overlapped.hEvent, the other calling read() and waiting with
// poll() or select().
157 RemoteConnection::read_at_least(size_t min_len
, double end_time
)
159 LOGCALL(REMOTE
, bool, "RemoteConnection::read_at_least", min_len
| end_time
);
// Nothing to do if enough data has already been buffered.
161 if (buffer
.length() >= min_len
) RETURN(true);
164 HANDLE hin
= fd_to_handle(fdin
);
168 BOOL ok
= ReadFile(hin
, buf
, sizeof(buf
), &received
, &overlapped
);
// Any error other than ERROR_IO_PENDING is a genuine failure.
170 int errcode
= GetLastError();
171 if (errcode
!= ERROR_IO_PENDING
)
172 throw Xapian::NetworkError("read failed", context
, -errcode
);
173 // Is asynch - just wait for the data to be received or a timeout.
175 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
176 if (waitrc
!= WAIT_OBJECT_0
) {
177 LOGLINE(REMOTE
, "read: timeout has expired");
178 throw_timeout("Timeout expired while trying to read", context
);
180 // Get the final result of the read.
181 if (!GetOverlappedResult(hin
, &overlapped
, &received
, FALSE
))
182 throw Xapian::NetworkError("Failed to get overlapped result",
183 context
, -int(GetLastError()));
190 buffer
.append(buf
, received
);
192 // We must update the offset in the OVERLAPPED structure manually.
193 update_overlapped_offset(overlapped
, received
);
194 } while (buffer
.length() < min_len
);
196 // If there's no end_time, just use blocking I/O.
197 if (fcntl(fdin
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
198 throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
204 ssize_t received
= read(fdin
, buf
, sizeof(buf
));
207 buffer
.append(buf
, received
);
208 if (buffer
.length() >= min_len
) RETURN(true);
216 LOGLINE(REMOTE
, "read gave errno = " << errno
);
// An interrupted read() is simply retried.
217 if (errno
== EINTR
) continue;
220 throw Xapian::NetworkError("read failed", context
, errno
);
222 Assert(end_time
!= 0.0);
224 // Calculate how far in the future end_time is.
225 double now
= RealTime::now();
226 double time_diff
= end_time
- now
;
227 // Check if the timeout has expired.
229 LOGLINE(REMOTE
, "read: timeout has expired");
230 throw_timeout("Timeout expired while trying to read", context
);
233 // Wait until there is data, an error, or the timeout is reached.
// poll() takes its timeout in milliseconds, hence the scaling by 1000.
238 int poll_result
= poll(&fds
, 1, int(time_diff
* 1000));
239 if (poll_result
> 0) break;
241 if (poll_result
== 0)
242 throw_timeout("Timeout expired while trying to read", context
);
244 // EINTR means poll was interrupted by a signal. EAGAIN means that
245 // allocation of internal data structures failed.
246 if (errno
!= EINTR
&& errno
!= EAGAIN
)
247 throw Xapian::NetworkError("poll failed during read",
// An fd_set can't represent descriptors numbered FD_SETSIZE or above.
250 if (fdin
>= FD_SETSIZE
) {
251 // We can't block with a timeout, so just sleep and retry.
252 RealTime::sleep(now
+ min(0.001, time_diff
/ 4));
257 FD_SET(fdin
, &fdset
);
260 RealTime::to_timeval(time_diff
, &tv
);
261 int select_result
= select(fdin
+ 1, &fdset
, 0, 0, &tv
);
262 if (select_result
> 0) break;
264 if (select_result
== 0)
265 throw_timeout("Timeout expired while trying to read", context
);
267 // EINTR means select was interrupted by a signal. The Linux
268 // select(2) man page says: "Portable programs may wish to check
269 // for EAGAIN and loop, just as with EINTR" and that seems to be
270 // necessary for cygwin at least.
271 if (errno
!= EINTR
&& errno
!= EAGAIN
)
272 throw Xapian::NetworkError("select failed during read",
// Write up to len bytes starting at p to fdout and return the result of the
// underlying call.
//
// When USE_MSG_NOSIGNAL is defined, send() is tried with send_flags; its
// result is returned unless it failed with ENOTSOCK.  In that case (and in
// builds without USE_MSG_NOSIGNAL) write() is used instead.
283 RemoteConnection::send_or_write(const void* p
, size_t len
)
285 # ifdef USE_MSG_NOSIGNAL
287 ssize_t n
= send(fdout
, p
, len
, send_flags
);
288 if (usual(n
>= 0 || errno
!= ENOTSOCK
)) return n
;
289 // In some testcases in the testsuite and in xapian-progsrv (in some
290 // cases) fdout won't be a socket. Clear send_flags so we only try
291 // send() once in this case.
295 return write(fdout
, p
, len
);
// Send one message to fdout.
//
// type      Message type.
// message   Message payload; its size is appended to the header with
//           pack_uint().
// end_time  Absolute time by which sending must finish; 0.0 means there is
//           no timeout and blocking I/O is used.
//
// The header is written first and then the payload: str points at whichever
// of the two is being sent and count tracks progress through it.  The
// function returns once the payload has been written in full, or straight
// after the header when the payload is empty.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed(),
// Xapian::NetworkTimeoutError via throw_timeout(), and Xapian::NetworkError
// when a write or a wait fails.
300 RemoteConnection::send_message(char type
, string_view message
, double end_time
)
302 LOGCALL_VOID(REMOTE
, "RemoteConnection::send_message", type
| message
| end_time
);
304 throw_database_closed();
308 pack_uint(header
, message
.size());
309 string_view header_view
= header
;
312 HANDLE hout
= fd_to_handle(fdout
);
313 const string_view
* str
= &header_view
;
318 BOOL ok
= WriteFile(hout
, str
->data() + count
, str
->size() - count
, &n
, &overlapped
);
// Any error other than ERROR_IO_PENDING is a genuine failure.
320 int errcode
= GetLastError();
321 if (errcode
!= ERROR_IO_PENDING
)
322 throw Xapian::NetworkError("write failed", context
, -errcode
);
323 // Just wait for the data to be sent, or a timeout.
325 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
326 if (waitrc
!= WAIT_OBJECT_0
) {
327 LOGLINE(REMOTE
, "write: timeout has expired");
328 throw_timeout("Timeout expired while trying to write", context
);
330 // Get the final result.
331 if (!GetOverlappedResult(hout
, &overlapped
, &n
, FALSE
))
332 throw Xapian::NetworkError("Failed to get overlapped result",
333 context
, -int(GetLastError()));
338 // We must update the offset in the OVERLAPPED structure manually.
339 update_overlapped_offset(overlapped
, n
);
// Finished with the current piece: done if it was the payload or there is
// no payload to send.
341 if (count
== str
->size()) {
342 if (str
== &message
|| message
.empty()) return;
348 // If there's no end_time, just use blocking I/O.
349 if (fcntl(fdout
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
350 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
354 const string_view
* str
= &header_view
;
358 // We've set write to non-blocking, so just try writing as there
359 // will usually be space.
360 ssize_t n
= send_or_write(str
->data() + count
, str
->size() - count
);
// Finished with the current piece: done if it was the payload or there is
// no payload to send.
364 if (count
== str
->size()) {
365 if (str
== &message
|| message
.empty()) return;
372 LOGLINE(REMOTE
, "write gave errno = " << errno
);
// An interrupted write is simply retried.
373 if (errno
== EINTR
) continue;
376 throw Xapian::NetworkError("write failed", context
, errno
);
378 double now
= RealTime::now();
379 double time_diff
= end_time
- now
;
381 LOGLINE(REMOTE
, "write: timeout has expired");
382 throw_timeout("Timeout expired while trying to write", context
);
385 // Wait until there is space or the timeout is reached.
389 fds
.events
= POLLOUT
;
390 int result
= poll(&fds
, 1, int(time_diff
* 1000));
// POLLSELECT names the call used for waiting so the error message thrown
// below can mention it.
391 # define POLLSELECT "poll"
393 if (fdout
>= FD_SETSIZE
) {
394 // We can't block with a timeout, so just sleep and retry.
395 RealTime::sleep(now
+ min(0.001, time_diff
/ 4));
401 FD_SET(fdout
, &fdset
);
404 RealTime::to_timeval(time_diff
, &tv
);
405 int result
= select(fdout
+ 1, 0, &fdset
, 0, &tv
);
406 # define POLLSELECT "select"
410 if (errno
== EINTR
|| errno
== EAGAIN
) {
411 // EINTR/EAGAIN means select was interrupted by a signal.
412 // We could just retry the poll/select, but it's easier to just
416 throw Xapian::NetworkError(POLLSELECT
" failed during write",
422 throw_timeout("Timeout expired while trying to write", context
);
// Send the contents of an open file to fdout as a single message.
//
// type      Message type.
// fd        Descriptor of the file to send; its size is obtained with
//           file_size() and the data is fetched with read().
// end_time  Absolute time by which sending must finish; 0.0 means there is
//           no timeout and blocking I/O is used.
//
// buf is used first for the header (the pack_uint()-encoded size is copied
// in starting at buf + 1) and then refilled from fd as each buffer-load is
// written out.  The function returns once size has dropped to zero.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed(),
// Xapian::NetworkTimeoutError via throw_timeout(), and Xapian::NetworkError
// when the size can't be determined or a read, write or wait fails.
428 RemoteConnection::send_file(char type
, int fd
, double end_time
)
430 LOGCALL_VOID(REMOTE
, "RemoteConnection::send_file", type
| fd
| end_time
);
432 throw_database_closed();
434 auto size
= file_size(fd
);
436 throw Xapian::NetworkError("Couldn't stat file to send", errno
);
437 // FIXME: Use sendfile() or similar if available?
444 pack_uint(enc_size
, size
);
445 c
+= enc_size
.size();
446 // An encoded length should be just a few bytes.
447 AssertRel(c
, <=, sizeof(buf
));
448 memcpy(buf
+ 1, enc_size
.data(), enc_size
.size());
452 HANDLE hout
= fd_to_handle(fdout
);
456 BOOL ok
= WriteFile(hout
, buf
+ count
, c
- count
, &n
, &overlapped
);
// Any error other than ERROR_IO_PENDING is a genuine failure.
458 int errcode
= GetLastError();
459 if (errcode
!= ERROR_IO_PENDING
)
460 throw Xapian::NetworkError("write failed", context
, -errcode
);
461 // Just wait for the data to be sent, or a timeout.
463 waitrc
= WaitForSingleObject(overlapped
.hEvent
, calc_read_wait_msecs(end_time
));
464 if (waitrc
!= WAIT_OBJECT_0
) {
465 LOGLINE(REMOTE
, "write: timeout has expired");
466 throw_timeout("Timeout expired while trying to write", context
);
468 // Get the final result.
469 if (!GetOverlappedResult(hout
, &overlapped
, &n
, FALSE
))
470 throw Xapian::NetworkError("Failed to get overlapped result",
471 context
, -int(GetLastError()));
476 // We must update the offset in the OVERLAPPED structure manually.
477 update_overlapped_offset(overlapped
, n
);
// All of the file has been sent.
480 if (size
== 0) return;
// Refill buf from the file, retrying if read() is interrupted.
484 res
= read(fd
, buf
, sizeof(buf
));
485 } while (res
< 0 && errno
== EINTR
);
486 if (res
< 0) throw Xapian::NetworkError("read failed", errno
);
494 // If there's no end_time, just use blocking I/O.
495 if (fcntl(fdout
, F_SETFL
, (end_time
!= 0.0) ? O_NONBLOCK
: 0) < 0) {
496 throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
502 // We've set write to non-blocking, so just try writing as there
503 // will usually be space.
504 ssize_t n
= send_or_write(buf
+ count
, c
- count
);
// All of the file has been sent.
509 if (size
== 0) return;
// Refill buf from the file, retrying if read() is interrupted.
513 res
= read(fd
, buf
, sizeof(buf
));
514 } while (res
< 0 && errno
== EINTR
);
515 if (res
< 0) throw Xapian::NetworkError("read failed", errno
);
524 LOGLINE(REMOTE
, "write gave errno = " << errno
);
// An interrupted write is simply retried.
525 if (errno
== EINTR
) continue;
528 throw Xapian::NetworkError("write failed", context
, errno
);
530 double now
= RealTime::now();
531 double time_diff
= end_time
- now
;
533 LOGLINE(REMOTE
, "write: timeout has expired");
534 throw_timeout("Timeout expired while trying to write", context
);
537 // Wait until there is space or the timeout is reached.
541 fds
.events
= POLLOUT
;
542 int result
= poll(&fds
, 1, int(time_diff
* 1000));
// POLLSELECT names the call used for waiting so the error message thrown
// below can mention it.
543 # define POLLSELECT "poll"
545 if (fdout
>= FD_SETSIZE
) {
546 // We can't block with a timeout, so just sleep and retry.
547 RealTime::sleep(now
+ min(0.001, time_diff
/ 4));
553 FD_SET(fdout
, &fdset
);
556 RealTime::to_timeval(time_diff
, &tv
);
557 int result
= select(fdout
+ 1, 0, &fdset
, 0, &tv
);
558 # define POLLSELECT "select"
562 if (errno
== EINTR
|| errno
== EAGAIN
) {
563 // EINTR/EAGAIN means select was interrupted by a signal.
564 // We could just retry the poll/select, but it's easier to just
568 throw Xapian::NetworkError(POLLSELECT
" failed during write",
574 throw_timeout("Timeout expired while trying to write", context
);
// Look at the type of the next message.
//
// end_time is passed to read_at_least(), which is asked for a single byte;
// the type is then taken from buffer[0].  Nothing in this function removes
// data from buffer.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed().
580 RemoteConnection::sniff_next_message_type(double end_time
)
582 LOGCALL(REMOTE
, int, "RemoteConnection::sniff_next_message_type", end_time
);
584 throw_database_closed();
586 if (!read_at_least(1, end_time
))
588 unsigned char type
= buffer
[0];
// Read one complete message, storing its payload in result.
//
// result    Receives the message payload (any previous contents are
//           replaced by assign()).
// end_time  Passed to read_at_least() as the deadline for each read.
//
// The message type is taken from buffer[0].  Two decoding paths are
// visible: one where the byte in buffer[1] is used directly as the payload
// length, and one where the length is decoded with unpack_uint().  Either
// way the header and payload are erased from buffer once result has been
// filled in.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed().
593 RemoteConnection::get_message(string
&result
, double end_time
)
595 LOGCALL(REMOTE
, int, "RemoteConnection::get_message", result
| end_time
);
597 throw_database_closed();
599 if (!read_at_least(2, end_time
))
601 // This code assumes things about the pack_uint() encoding in order to
602 // handle partial reads.
603 size_t len
= static_cast<unsigned char>(buffer
[1]);
// The "+ 2" covers the type byte and the single length byte.
605 if (!read_at_least(len
+ 2, end_time
))
607 result
.assign(buffer
.data() + 2, len
);
608 unsigned char type
= buffer
[0];
609 buffer
.erase(0, len
+ 2);
613 // We know the message payload is at least 128 bytes of data, and if we
614 // read that much we'll definitely have the whole of the length.
615 if (!read_at_least(128 + 2, end_time
))
617 const char* p
= buffer
.data();
618 const char* p_end
= p
+ buffer
.size();
620 if (!unpack_uint(&p
, p_end
, &len
)) {
// p now points just past the encoded length, so this is the header size.
623 size_t header_len
= (p
- buffer
.data());
624 if (!read_at_least(header_len
+ len
, end_time
))
626 result
.assign(buffer
.data() + header_len
, len
);
627 unsigned char type
= buffer
[0];
628 buffer
.erase(0, header_len
+ len
);
// Begin reading a message whose payload will be fetched in chunks.
//
// end_time is passed to read_at_least() as the deadline for each read.
//
// The message header is decoded and the payload length is stored in
// chunked_data_left; the message type is taken from buffer[0].  In the
// unpack_uint() path the header is erased from buffer, leaving the payload
// to be read afterwards.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed(), and
// Xapian::NetworkError via throw_network_error_insane_message_length() if
// the length doesn't fit in an off_t.
633 RemoteConnection::get_message_chunked(double end_time
)
635 LOGCALL(REMOTE
, int, "RemoteConnection::get_message_chunked", end_time
);
638 throw_database_closed();
640 if (!read_at_least(2, end_time
))
642 // This code assumes things about the pack_uint() encoding in order to
643 // handle partial reads.
644 uint_least64_t len
= static_cast<unsigned char>(buffer
[1]);
646 chunked_data_left
= off_t(len
);
647 char type
= buffer
[0];
652 // We know the message payload is at least 128 bytes of data, and if we
653 // read that much we'll definitely have the whole of the length.
654 if (!read_at_least(128 + 2, end_time
))
656 const char* p
= buffer
.data();
657 const char* p_end
= p
+ buffer
.size();
659 if (!unpack_uint(&p
, p_end
, &len
)) {
662 chunked_data_left
= off_t(len
);
663 // Check that the value of len fits in an off_t without loss.
664 if (rare(uint_least64_t(chunked_data_left
) != len
)) {
665 throw_network_error_insane_message_length();
// p now points just past the encoded length, so this is the header size.
667 size_t header_len
= (p
- buffer
.data());
668 unsigned char type
= buffer
[0];
669 buffer
.erase(0, header_len
);
// Fetch more of the current chunked message, appending it to result.
//
// result    String to which payload data is appended.
// at_least  Number of bytes result should hold afterwards.
// end_time  Passed to read_at_least() as the deadline for the read.
//
// Returns true at once if result already holds at_least bytes.  Otherwise
// the amount requested is capped at chunked_data_left, the data is moved
// from buffer into result, and the return value reports whether the full
// amount asked for was still available in the message (read_enough).
//
// Throws Xapian::DatabaseClosedError via throw_database_closed().
674 RemoteConnection::get_message_chunk(string
&result
, size_t at_least
,
677 LOGCALL(REMOTE
, int, "RemoteConnection::get_message_chunk", result
| at_least
| end_time
);
679 throw_database_closed();
681 if (at_least
<= result
.size()) RETURN(true);
// From here on at_least is the number of additional bytes wanted.
682 at_least
-= result
.size();
684 bool read_enough
= (off_t(at_least
) <= chunked_data_left
);
685 if (!read_enough
) at_least
= size_t(chunked_data_left
);
687 if (!read_at_least(at_least
, end_time
))
// Never hand over more than remains of this message, even if buffer holds
// more than that.
690 size_t retlen
= min(off_t(buffer
.size()), chunked_data_left
);
691 result
.append(buffer
, 0, retlen
);
692 buffer
.erase(0, retlen
);
693 chunked_data_left
-= retlen
;
695 RETURN(int(read_enough
));
698 /** Write n bytes from block pointed to by p to file descriptor fd. */
// A write() interrupted by a signal (EINTR) is retried; any other failure
// is reported by throwing Xapian::NetworkError.
700 write_all(int fd
, const char * p
, size_t n
)
703 ssize_t c
= write(fd
, p
, n
);
705 if (errno
== EINTR
) continue;
706 throw Xapian::NetworkError("Error writing to file", errno
);
// Receive a chunked message and write its payload to a file.
//
// file      Path of the file to write; it is opened write-only, created if
//           necessary (mode 0666) and truncated.
// end_time  Passed on as the deadline for each read.
//
// The message is started with get_message_chunked(), then the payload is
// copied to the file in pieces of at most CHUNKSIZE bytes until
// chunked_data_left reaches zero.
//
// Throws Xapian::DatabaseClosedError via throw_database_closed(), and
// Xapian::NetworkError if the file can't be opened or written.
714 RemoteConnection::receive_file(const string
&file
, double end_time
)
716 LOGCALL(REMOTE
, int, "RemoteConnection::receive_file", file
| end_time
);
718 throw_database_closed();
720 // FIXME: Do we want to be able to delete the file during writing?
721 FD
fd(posixy_open(file
.c_str(), O_WRONLY
|O_CREAT
|O_TRUNC
|O_CLOEXEC
, 0666));
723 throw Xapian::NetworkError("Couldn't open file for writing: " + file
, errno
);
725 int type
= get_message_chunked(end_time
);
// Handle whichever is smaller: what's left of the message or one chunk.
727 off_t min_read
= min(chunked_data_left
, off_t(CHUNKSIZE
));
728 if (!read_at_least(min_read
, end_time
))
730 write_all(fd
, buffer
.data(), min_read
);
731 chunked_data_left
-= min_read
;
732 buffer
.erase(0, min_read
);
733 } while (chunked_data_left
);
// Tell the other end we are shutting down and wait for it to close the
// connection.
//
// Does nothing if fdin is negative.  Otherwise MSG_SHUTDOWN is sent with an
// empty payload and no timeout, and the function then waits without a
// timeout: either for an overlapped one-byte ReadFile() to complete, or for
// poll()/select() on fdin to return.
738 RemoteConnection::shutdown()
740 LOGCALL_VOID(REMOTE
, "RemoteConnection::shutdown", NO_ARGS
);
742 if (fdin
< 0) return;
744 // We can be called from a destructor, so we can't throw an exception.
746 send_message(MSG_SHUTDOWN
, {}, 0.0);
748 HANDLE hin
= fd_to_handle(fdin
);
751 BOOL ok
= ReadFile(hin
, &dummy
, 1, &received
, &overlapped
);
752 if (!ok
&& GetLastError() == ERROR_IO_PENDING
) {
753 // Wait for asynchronous read to complete.
754 (void)WaitForSingleObject(overlapped
.hEvent
, INFINITE
);
757 // Wait for the connection to be closed - when this happens
758 // poll()/select() will report that a read won't block.
// A timeout of -1 makes poll() wait indefinitely; EINTR and EAGAIN just
// cause another attempt.
765 res
= poll(&fds
, 1, -1);
766 } while (res
< 0 && (errno
== EINTR
|| errno
== EAGAIN
));
// select() is only used when fdin can be represented in an fd_set.
768 if (fdin
< FD_SETSIZE
) {
771 FD_SET(fdin
, &fdset
);
774 res
= select(fdin
+ 1, &fdset
, 0, 0, NULL
);
775 } while (res
< 0 && (errno
== EINTR
|| errno
== EAGAIN
));
// Close the descriptors used by this connection: fdin is closed with
// close_fd_or_socket(), and fdout likewise unless it is the same descriptor
// as fdin.
784 RemoteConnection::do_close()
786 LOGCALL_VOID(REMOTE
, "RemoteConnection::do_close", NO_ARGS
);
789 close_fd_or_socket(fdin
);
791 // If the same fd is used in both directions, don't close it twice.
792 if (fdin
== fdout
) fdout
= -1;
798 close_fd_or_socket(fdout
);
// Convert an absolute deadline into a wait duration in milliseconds.
//
// end_time is the deadline; the time remaining until it is computed with
// RealTime::now() and returned as a DWORD, which is what the callers pass
// to WaitForSingleObject().
//
// Throws Xapian::NetworkTimeoutError via throw_timeout() if the deadline
// has already passed.
805 RemoteConnection::calc_read_wait_msecs(double end_time
)
810 // Calculate how far in the future end_time is.
811 double time_diff
= end_time
- RealTime::now();
813 // DWORD is unsigned, so we mustn't try and return a negative value.
814 if (time_diff
< 0.0) {
815 throw_timeout("Timeout expired before starting read", context
);
// time_diff is in seconds, so scale to milliseconds.
817 return static_cast<DWORD
>(time_diff
* 1000.0);