1 /** @file remote-database.cc
2 * @brief Remote backend database class
4 /* Copyright (C) 2006,2007,2008,2009,2010,2015 Olly Betts
5 * Copyright (C) 2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 #include "remote-database.h"
26 #include "safeerrno.h"
30 #include "emptypostlist.h"
31 #include "inmemory_positionlist.h"
32 #include "net/length.h"
33 #include "net_postlist.h"
34 #include "net_termlist.h"
35 #include "remote-document.h"
38 #include "serialise.h"
39 #include "serialise-double.h"
41 #include "stringutils.h" // For STRINGIZE().
42 #include "weightinternal.h"
47 #include "xapian/error.h"
48 #include "xapian/matchspy.h"
// Throw a Xapian::NetworkError reporting that the connection to the remote
// server closed before a reply was read.  Declared XAPIAN_NORETURN because
// the only thing the definition does is throw.
// NOTE(review): the definition's return-type line and braces are missing from
// this extract -- restore from upstream before building.
52 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
54 throw_connection_closed_unexpectedly()
56 throw Xapian::NetworkError("Connection closed unexpectedly");
// Construct a RemoteDatabase which talks to a server over descriptor fd.
//
// The constructor does the following, in order:
//  - ignores SIGPIPE;
//  - reads the server's greeting;
//  - checks the protocol version bytes at the start of the greeting;
//  - applies the database statistics that follow those bytes;
//  - if `writable` is set, sends MSG_WRITEACCESS through update_stats().
//
// @param fd        Connection descriptor; passed to `link` as both the read
//                  and the write descriptor.
// @param timeout_  Timeout; get_message()/send_message() read a `timeout`
//                  member, presumably initialised from this on a line missing
//                  from this extract (TODO confirm).
// @param context_  Context string passed to `link`.
// @param writable  Whether to ask the server for write access.
// @throws Xapian::NetworkError if SIGPIPE can't be ignored, the handshake
//         fails, or the protocol versions are incompatible.
59 RemoteDatabase::RemoteDatabase(int fd
, double timeout_
,
60 const string
& context_
, bool writable
)
61 : link(fd
, fd
, context_
),
65 mru_slot(Xapian::BAD_VALUENO
),
69 // It's simplest to just ignore SIGPIPE. We'll still know if the
70 // connection dies because we'll get EPIPE back from write().
71 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
) {
72 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
// NOTE(review): do_close() says transaction_state is set to
// TRANSACTION_UNIMPLEMENTED only when not writable.  The guarding
// `if (!writable)` is presumably on a line missing from this extract.
// As shown, the assignment below is unconditional -- TODO confirm.
77 // Transactions only make sense when writing, so flag them as
78 // "unimplemented" so that our destructor doesn't call dtor_called()
79 // since that might try to call commit() which will cause a message to
80 // be sent to the remote server and probably an InvalidOperationError
81 // exception message to be returned.
82 transaction_state
= TRANSACTION_UNIMPLEMENTED
;
// Read the greeting.  A valid greeting is a REPLY_GREETING of at least 3
// bytes: a major version byte, a minor version byte, then statistics.
86 char type
= get_message(message
);
88 if (reply_type(type
) != REPLY_GREETING
|| message
.size() < 3) {
89 if (type
== 'O' && message
.size() == size_t('M') && message
[0] == ' ') {
90 // The server reply used to start "OM ", which will now be
91 // interpreted as a type 'O' message of length size_t('M')
92 // with first character ' '.
93 throw Xapian::NetworkError("Server protocol version too old", context
);
95 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context
);
98 const char *p
= message
.c_str();
99 const char *p_end
= p
+ message
.size();
101 // The protocol major versions must match. The protocol minor version of
102 // the server must be >= that of the client.
103 int protocol_major
= static_cast<unsigned char>(*p
++);
104 int protocol_minor
= static_cast<unsigned char>(*p
++);
105 if (protocol_major
!= XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
||
106 protocol_minor
< XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
) {
// Build a message naming both the server's and the client's versions.
107 string
errmsg("Server supports protocol version");
108 if (protocol_minor
) {
110 errmsg
+= str(protocol_major
);
113 errmsg
+= str(protocol_major
);
115 errmsg
+= str(protocol_minor
);
117 " - client is using "
118 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
)
120 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
);
121 throw Xapian::NetworkError(errmsg
, context
);
// The rest of the greeting has the same layout as a stats update.
124 apply_stats_update(p
, p_end
);
126 if (writable
) update_stats(MSG_WRITEACCESS
);
// Downcast hook for RemoteDatabase.
// NOTE(review): the return type and body are missing from this extract.
// Presumably this returns `this` -- TODO confirm against upstream.
130 RemoteDatabase::as_remotedatabase()
// Send MSG_KEEPALIVE with an empty payload, then wait for REPLY_DONE.
// get_message() throws Xapian::NetworkError if a different reply type arrives.
136 RemoteDatabase::keep_alive()
138 send_message(MSG_KEEPALIVE
, string());
140 get_message(message
, REPLY_DONE
);
// Fetch the list of user metadata keys from the server.
//
// Sends MSG_METADATAKEYLIST with `prefix` as the payload.  It then reads
// REPLY_METADATAKEYLIST messages, each carrying one key as its whole payload,
// until some other reply arrives.  That reply must be REPLY_DONE.
//
// @param prefix  Sent to the server; presumably restricts the keys returned to
//                those starting with it (server-side, TODO confirm).
// @return A new NetworkTermList released from an AutoPtr, so the caller owns it.
// @throws Xapian::NetworkError if the stream ends with anything but REPLY_DONE.
144 RemoteDatabase::open_metadata_keylist(const std::string
&prefix
) const
146 // Ensure that total_length and doccount are up-to-date.
147 if (!cached_stats_valid
) update_stats();
149 send_message(MSG_METADATAKEYLIST
, prefix
);
152 AutoPtr
<NetworkTermList
> tlist(
153 new NetworkTermList(0, doccount
,
154 Xapian::Internal::RefCntPtr
<const RemoteDatabase
>(this),
156 vector
<NetworkTermListItem
> & items
= tlist
->items
;
159 while ((type
= get_message(message
)) == REPLY_METADATAKEYLIST
) {
160 NetworkTermListItem item
;
161 item
.tname
= message
;
162 items
.push_back(item
);
164 if (type
!= REPLY_DONE
) {
165 throw Xapian::NetworkError("Bad message received", context
);
168 tlist
->current_position
= tlist
->items
.begin();
169 return tlist
.release();
// Fetch the term list of document `did` from the server.
//
// Sends MSG_TERMLIST.  The first reply must be REPLY_DOCLENGTH, holding the
// document length.  That is followed by REPLY_TERMLIST messages, each encoding
// wdf, termfreq and the term name.  The sequence ends with REPLY_DONE.
//
// @param did  Document id to fetch terms for.
// @return A new NetworkTermList owned by the caller.
// @throws Xapian::NetworkError on a malformed REPLY_DOCLENGTH, or if the
//         stream ends with anything but REPLY_DONE.
173 RemoteDatabase::open_term_list(Xapian::docid did
) const
177 // Ensure that total_length and doccount are up-to-date.
178 if (!cached_stats_valid
) update_stats();
180 send_message(MSG_TERMLIST
, encode_length(did
));
183 get_message(message
, REPLY_DOCLENGTH
);
184 const char * p
= message
.c_str();
185 const char * p_end
= p
+ message
.size();
186 Xapian::termcount doclen
;
187 decode_length(&p
, p_end
, doclen
);
// NOTE(review): presumably guarded by a check that the whole reply was
// consumed (original line 188, missing from this extract) -- TODO confirm.
189 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context
);
192 AutoPtr
<NetworkTermList
> tlist(
193 new NetworkTermList(doclen
, doccount
,
194 Xapian::Internal::RefCntPtr
<const RemoteDatabase
>(this),
196 vector
<NetworkTermListItem
> & items
= tlist
->items
;
199 while ((type
= get_message(message
)) == REPLY_TERMLIST
) {
200 NetworkTermListItem item
;
// NOTE(review): `p` must be reset to the new message's data before p_end is
// computed.  That reset was original line 201, which is missing from this
// extract.  Without it, p_end is derived from a stale pointer -- TODO confirm.
202 p_end
= p
+ message
.size();
203 decode_length(&p
, p_end
, item
.wdf
);
204 decode_length(&p
, p_end
, item
.termfreq
);
// Any remaining bytes are the term name.
205 item
.tname
.assign(p
, p_end
);
206 items
.push_back(item
);
208 if (type
!= REPLY_DONE
) {
209 throw Xapian::NetworkError("Bad message received", context
);
212 tlist
->current_position
= tlist
->items
.begin();
213 return tlist
.release();
// Fetch the list of all terms, filtered by `prefix`, from the server.
//
// Sends MSG_ALLTERMS with `prefix` as the payload.  It then reads
// REPLY_ALLTERMS messages, each encoding a termfreq followed by the term name,
// until some other reply arrives.  That reply must be REPLY_DONE.
//
// @param prefix  Sent to the server; presumably only terms with this prefix
//                are returned (server-side, TODO confirm).
// @return A new NetworkTermList owned by the caller.
// @throws Xapian::NetworkError if the stream ends with anything but REPLY_DONE.
217 RemoteDatabase::open_allterms(const string
& prefix
) const {
218 // Ensure that total_length and doccount are up-to-date.
219 if (!cached_stats_valid
) update_stats();
221 send_message(MSG_ALLTERMS
, prefix
);
223 AutoPtr
<NetworkTermList
> tlist(
224 new NetworkTermList(0, doccount
,
225 Xapian::Internal::RefCntPtr
<const RemoteDatabase
>(this),
227 vector
<NetworkTermListItem
> & items
= tlist
->items
;
231 while ((type
= get_message(message
)) == REPLY_ALLTERMS
) {
232 NetworkTermListItem item
;
233 const char * p
= message
.data();
234 const char * p_end
= p
+ message
.size();
235 decode_length(&p
, p_end
, item
.termfreq
);
// The rest of the payload is the term name.
236 item
.tname
.assign(p
, p_end
);
237 items
.push_back(item
);
239 if (type
!= REPLY_DONE
) {
240 throw Xapian::NetworkError("Bad message received", context
);
243 tlist
->current_position
= tlist
->items
.begin();
244 return tlist
.release();
// Create a NetworkPostList for `term`.  No message is sent here.  The list
// keeps a reference-counted pointer back to this database.  It presumably
// fetches its postings later through read_post_list() -- TODO confirm.
248 RemoteDatabase::open_post_list(const string
&term
) const
250 return new NetworkPostList(Xapian::Internal::RefCntPtr
<const RemoteDatabase
>(this), term
);
// Fill `pl` with the postings for `term` from the server.
//
// Sends MSG_POSTLIST.  The first reply must be REPLY_POSTLISTSTART, whose
// payload encodes the term frequency.  Each following REPLY_POSTLISTITEM
// payload is passed unparsed to pl.append_posting().  The sequence must end
// with REPLY_DONE.
//
// NOTE(review): `termfreq` is decoded, but the line using it (presumably the
// return) is missing from this extract, as is the return type -- TODO confirm.
// @throws Xapian::NetworkError if the stream ends with anything but REPLY_DONE.
254 RemoteDatabase::read_post_list(const string
&term
, NetworkPostList
& pl
) const
256 send_message(MSG_POSTLIST
, term
);
260 get_message(message
, REPLY_POSTLISTSTART
);
262 const char * p
= message
.data();
263 const char * p_end
= p
+ message
.size();
264 Xapian::doccount termfreq
;
265 decode_length(&p
, p_end
, termfreq
);
267 while ((type
= get_message(message
)) == REPLY_POSTLISTITEM
) {
268 pl
.append_posting(message
);
270 if (type
!= REPLY_DONE
) {
271 throw Xapian::NetworkError("Bad message received", context
);
// Fetch the positions of `term` in document `did` from the server.
//
// Sends MSG_POSITIONLIST with the encoded docid followed by the term.  Each
// REPLY_POSITIONLIST payload encodes one increment `inc`.  The sequence must
// end with REPLY_DONE.
//
// @return A new InMemoryPositionList built from the collected positions.
// @throws Xapian::NetworkError if the stream ends with anything but REPLY_DONE.
278 RemoteDatabase::open_position_list(Xapian::docid did
, const string
&term
) const
280 send_message(MSG_POSITIONLIST
, encode_length(did
) + term
);
282 vector
<Xapian::termpos
> positions
;
// lastpos starts as (termpos)-1, i.e. all bits set.
286 Xapian::termpos lastpos
= static_cast<Xapian::termpos
>(-1);
287 while ((type
= get_message(message
)) == REPLY_POSITIONLIST
) {
288 const char * p
= message
.data();
289 const char * p_end
= p
+ message
.size();
291 decode_length(&p
, p_end
, inc
);
// NOTE(review): nothing visible here updates lastpos from inc, so as shown
// every pushed position would be (termpos)-1.  The update (presumably
// `lastpos += inc + 1;`, original line 292) is missing from this extract --
// TODO confirm.
293 positions
.push_back(lastpos
);
295 if (type
!= REPLY_DONE
) {
296 throw Xapian::NetworkError("Bad message received", context
);
299 return new InMemoryPositionList(positions
);
// Return the cached positional-information flag, refreshing the cached stats
// first if they are stale.  apply_stats_update() sets the flag.
303 RemoteDatabase::has_positions() const
305 if (!cached_stats_valid
) update_stats();
306 return has_positional_info
;
// Send MSG_REOPEN through update_stats(), which also applies the fresh
// statistics in the reply.  Then forget the cached value-stats slot so the
// next value-stats query goes back to the server.
310 RemoteDatabase::reopen()
312 update_stats(MSG_REOPEN
);
313 mru_slot
= Xapian::BAD_VALUENO
;
// Close the database.
// NOTE(review): the return type and body are missing from this extract.
// Presumably this calls do_close() -- TODO confirm against upstream.
317 RemoteDatabase::close()
322 // Currently lazy is used when fetching documents from the MSet, and in three
323 // cases in multimatch.cc. One of the latter is when using a MatchDecider,
324 // which we don't support with the remote backend currently. The others are
325 // for the sort key and collapse key which in the remote cases are fetched
326 // during the remote match and passed across with the MSet. So we can safely
327 // ignore "lazy" here for now without any performance penalty during the match
//
// Fetch document `did` from the server.
//
// Sends MSG_DOCUMENT.  The first reply must be REPLY_DOCDATA, holding the
// document data.  Each following REPLY_VALUE payload encodes a value slot
// followed by the value bytes.  The sequence must end with REPLY_DONE.
//
// @return A new RemoteDocument holding the data and the slot->value map.
// @throws Xapian::NetworkError if the stream ends with anything but REPLY_DONE.
329 Xapian::Document::Internal
*
330 RemoteDatabase::open_document(Xapian::docid did
, bool /*lazy*/) const
334 send_message(MSG_DOCUMENT
, encode_length(did
));
336 map
<Xapian::valueno
, string
> values
;
337 get_message(doc_data
, REPLY_DOCDATA
);
341 while ((type
= get_message(message
)) == REPLY_VALUE
) {
342 const char * p
= message
.data();
343 const char * p_end
= p
+ message
.size();
344 Xapian::valueno slot
;
345 decode_length(&p
, p_end
, slot
);
// After the slot number, the rest of the payload is the value.
346 values
.insert(make_pair(slot
, string(p
, p_end
)));
348 if (type
!= REPLY_DONE
) {
349 throw Xapian::NetworkError("Bad message received", context
);
352 return new RemoteDocument(this, did
, doc_data
, values
);
// Send `msg_code` with an empty payload, wait for REPLY_UPDATE, and apply the
// statistics it carries through apply_stats_update().
// NOTE(review): several callers use update_stats() with no argument, so the
// header presumably declares a default msg_code -- TODO confirm.
356 RemoteDatabase::update_stats(message_type msg_code
) const
358 send_message(msg_code
, string());
360 get_message(message
, REPLY_UPDATE
);
361 const char * p
= message
.c_str();
362 const char * p_end
= p
+ message
.size();
363 apply_stats_update(p
, p_end
);
// Decode a statistics block into the cached members and mark them valid.
//
// Layout, in order:
//   doccount, lastdocid, doclen_lbound, doclen_ubound   (decode_length)
//   one byte: '1' means the database has positional information
//   total_length                                        (decode_length)
//   the remaining bytes: uuid
//
// @throws Xapian::NetworkError "Bad stats update message received".
// NOTE(review): the throw is presumably guarded by `p == p_end` on a line
// missing from this extract.  That guard is needed before *p++ reads the
// positional flag byte -- TODO confirm.
367 RemoteDatabase::apply_stats_update(const char * p
, const char * p_end
) const
369 decode_length(&p
, p_end
, doccount
);
370 decode_length(&p
, p_end
, lastdocid
);
371 decode_length(&p
, p_end
, doclen_lbound
);
372 decode_length(&p
, p_end
, doclen_ubound
);
374 throw Xapian::NetworkError("Bad stats update message received", context
);
376 has_positional_info
= (*p
++ == '1');
377 decode_length(&p
, p_end
, total_length
);
378 uuid
.assign(p
, p_end
);
379 cached_stats_valid
= true;
// Document count.  Refreshes the cached stats first if they are stale.
// NOTE(review): the return statement is missing from this extract; it
// presumably returns the cached `doccount` -- TODO confirm.
383 RemoteDatabase::get_doccount() const
385 if (!cached_stats_valid
) update_stats();
// Last document id.  Refreshes the cached stats first if they are stale.
// NOTE(review): the return statement is missing from this extract; it
// presumably returns the cached `lastdocid` -- TODO confirm.
390 RemoteDatabase::get_lastdocid() const
392 if (!cached_stats_valid
) update_stats();
// Total length of all documents.  Refreshes the cached stats first if stale.
// NOTE(review): the return statement is missing from this extract; it
// presumably returns the cached `total_length` -- TODO confirm.
397 RemoteDatabase::get_total_length() const
399 if (!cached_stats_valid
) update_stats();
// Average document length, computed as total_length / doccount from the cached
// stats (refreshed first if stale).  Returns 0 for an empty database, which
// avoids dividing by zero.
404 RemoteDatabase::get_avlength() const
406 if (!cached_stats_valid
) update_stats();
407 if (rare(doccount
== 0)) return 0;
408 return Xapian::doclength(total_length
) / doccount
;
// Ask the server whether `tname` exists.  `tname` must be non-empty (asserted).
// @return true for REPLY_TERMEXISTS, false for REPLY_TERMDOESNTEXIST.
// @throws Xapian::NetworkError for any other reply type.
412 RemoteDatabase::term_exists(const string
& tname
) const
414 Assert(!tname
.empty());
415 send_message(MSG_TERMEXISTS
, tname
);
417 reply_type type
= get_message(message
);
418 if (type
!= REPLY_TERMEXISTS
&& type
!= REPLY_TERMDOESNTEXIST
) {
419 throw Xapian::NetworkError("Bad message received", context
);
421 return (type
== REPLY_TERMEXISTS
);
// Fetch the term frequency of `tname` from the server (MSG_TERMFREQ ->
// REPLY_TERMFREQ).  `tname` must be non-empty (asserted).
// NOTE(review): the declaration of `r` and the return are missing from this
// extract.  Unlike get_doclength(), there is no visible check that the reply
// was fully consumed -- TODO confirm.
425 RemoteDatabase::get_termfreq(const string
& tname
) const
427 Assert(!tname
.empty());
428 send_message(MSG_TERMFREQ
, tname
);
430 get_message(message
, REPLY_TERMFREQ
);
431 const char * p
= message
.data();
432 const char * p_end
= p
+ message
.size();
434 decode_length(&p
, p_end
, r
);
// Fetch the collection frequency of `tname` from the server (MSG_COLLFREQ ->
// REPLY_COLLFREQ).  `tname` must be non-empty (asserted).
// NOTE(review): the declaration of `r` and the return are missing from this
// extract.  There is no visible check that the reply was fully consumed --
// TODO confirm.
439 RemoteDatabase::get_collection_freq(const string
& tname
) const
441 Assert(!tname
.empty());
442 send_message(MSG_COLLFREQ
, tname
);
444 get_message(message
, REPLY_COLLFREQ
);
445 const char * p
= message
.data();
446 const char * p_end
= p
+ message
.size();
448 decode_length(&p
, p_end
, r
);
// Load the statistics for value `slot` into `mru_valstats`.  A round trip
// happens only when `slot` differs from `mru_slot`, so the stats of the most
// recently used slot act as a one-entry cache.
//
// The REPLY_VALUESTATS payload holds:
//   - freq (decode_length);
//   - the lower bound as a length-prefixed string;
//   - the upper bound as a length-prefixed string.
//
// NOTE(review): several lines are missing from this extract:
//   - the update of mru_slot;
//   - the `p += len` advances after each assign -- without them the upper
//     bound would re-read the lower bound's bytes;
//   - the guard on the final throw.
// TODO confirm against upstream.
454 RemoteDatabase::read_value_stats(Xapian::valueno slot
) const
456 if (mru_slot
!= slot
) {
457 send_message(MSG_VALUESTATS
, encode_length(slot
));
459 get_message(message
, REPLY_VALUESTATS
);
460 const char * p
= message
.data();
461 const char * p_end
= p
+ message
.size();
463 decode_length(&p
, p_end
, mru_valstats
.freq
);
465 decode_length_and_check(&p
, p_end
, len
);
466 mru_valstats
.lower_bound
.assign(p
, len
);
468 decode_length_and_check(&p
, p_end
, len
);
469 mru_valstats
.upper_bound
.assign(p
, len
);
472 throw Xapian::NetworkError("Bad REPLY_VALUESTATS message received", context
);
// Number of documents with a value in `slot`, via the read_value_stats() cache.
478 RemoteDatabase::get_value_freq(Xapian::valueno slot
) const
480 read_value_stats(slot
);
481 return mru_valstats
.freq
;
// Lower bound of the values in `slot`, via the read_value_stats() cache.
485 RemoteDatabase::get_value_lower_bound(Xapian::valueno slot
) const
487 read_value_stats(slot
);
488 return mru_valstats
.lower_bound
;
// Upper bound of the values in `slot`, via the read_value_stats() cache.
492 RemoteDatabase::get_value_upper_bound(Xapian::valueno slot
) const
494 read_value_stats(slot
);
495 return mru_valstats
.upper_bound
;
// Cached document-length lower bound from the last stats update.  Unlike
// get_doccount() and similar, this does not refresh stale stats.
499 RemoteDatabase::get_doclength_lower_bound() const
501 return doclen_lbound
;
// Cached document-length upper bound from the last stats update.  This does
// not refresh stale stats.
505 RemoteDatabase::get_doclength_upper_bound() const
507 return doclen_ubound
;
// Upper bound on a term's wdf.  The term argument is ignored and the cached
// document-length upper bound is returned.  That is presumably valid because
// a term's wdf cannot exceed its document's length -- TODO confirm.
511 RemoteDatabase::get_wdf_upper_bound(const string
&) const
513 // The default implementation returns get_collection_freq(), but we
514 // don't want the overhead of a remote message and reply per query
515 // term, and we can get called in the middle of a remote exchange
516 // too. FIXME: handle this bound in the stats local/remote code...
517 return doclen_ubound
;
// Fetch the length of document `did` from the server (MSG_DOCLENGTH ->
// REPLY_DOCLENGTH).
// @throws Xapian::NetworkError "Bad REPLY_DOCLENGTH message received".
// NOTE(review): the throw's guard (presumably `p != p_end`) and the return
// of `doclen` are on lines missing from this extract -- TODO confirm.
521 RemoteDatabase::get_doclength(Xapian::docid did
) const
524 send_message(MSG_DOCLENGTH
, encode_length(did
));
526 get_message(message
, REPLY_DOCLENGTH
);
527 const char * p
= message
.c_str();
528 const char * p_end
= p
+ message
.size();
529 Xapian::termcount doclen
;
530 decode_length(&p
, p_end
, doclen
);
532 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context
);
// Read one reply from the server into `result`, with a deadline of `timeout`
// from now.
//
// A REPLY_EXCEPTION reply is passed to unserialise_error() with the "REMOTE:"
// prefix.  If `required_type` is not REPLY_MAX, any other reply type causes a
// NetworkError; REPLY_MAX accepts any type.
//
// NOTE(review): the guard before throw_connection_closed_unexpectedly()
// (presumably a negative type_int) and the return of `type` are missing from
// this extract.  The mismatch error below is thrown without `context`, unlike
// the other NetworkErrors in this file.
538 RemoteDatabase::get_message(string
&result
, reply_type required_type
) const
540 double end_time
= RealTime::end_time(timeout
);
541 int type_int
= link
.get_message(result
, end_time
);
543 throw_connection_closed_unexpectedly();
544 reply_type type
= static_cast<reply_type
>(type_int
);
545 if (type
== REPLY_EXCEPTION
) {
546 unserialise_error(result
, "REMOTE:", context
);
548 if (required_type
!= REPLY_MAX
&& type
!= required_type
) {
549 string
errmsg("Expecting reply type ");
550 errmsg
+= str(int(required_type
));
552 errmsg
+= str(int(type
));
553 throw Xapian::NetworkError(errmsg
);
// Send a message of `type` with payload `message` over `link`, with a
// deadline of `timeout` from now.  The type goes on the wire as one unsigned char.
560 RemoteDatabase::send_message(message_type type
, const string
&message
) const
562 double end_time
= RealTime::end_time(timeout
);
563 link
.send_message(static_cast<unsigned char>(type
), message
, end_time
);
// Shut down the connection.  Writability is recovered from transaction_state.
// Writable databases call dtor_called() and then have link.do_close() wait for
// the server to confirm; read-only ones skip both.
567 RemoteDatabase::do_close()
569 // In the constructor, we set transaction_state to
570 // TRANSACTION_UNIMPLEMENTED if we aren't writable so that we can check
572 bool writable
= (transaction_state
!= TRANSACTION_UNIMPLEMENTED
);
574 // Only call dtor_called() if we're writable.
575 if (writable
) dtor_called();
577 // If we're writable, wait for a confirmation of the close, so we know that
578 // changes have been written and flushed, and the database write lock
579 // released. For the non-writable case, there's no need to wait, so don't
580 // slow down searching by waiting here.
581 link
.do_close(writable
);
// Serialise a query and its Enquire settings and send them as MSG_QUERY_NEW.
//
// Payload order (as visible):
//   - the query (length-prefixed);
//   - qlen;
//   - collapse_max, plus collapse_key only if collapse_max is non-zero;
//   - order, as '0' + order;
//   - sort_key;
//   - sort_by, as '0' + sort_by;
//   - sort_value_forward, as '0' + flag;
//   - percent_cutoff, as one char;
//   - weight_cutoff, via serialise_double;
//   - the weighting scheme name and its serialised parameters;
//   - the RSet;
//   - for each match spy, its name and serialised form.
//
// NOTE(review): the lines appending the bytes after each encode_length(size)
// (presumably `message += tmp;`) are missing from this extract, as is the
// per-spy name fetch and empty check -- TODO confirm.
// @throws Xapian::UnimplementedError if a MatchSpy's name() is empty.
585 RemoteDatabase::set_query(const Xapian::Query::Internal
*query
,
586 Xapian::termcount qlen
,
587 Xapian::doccount collapse_max
,
588 Xapian::valueno collapse_key
,
589 Xapian::Enquire::docid_order order
,
590 Xapian::valueno sort_key
,
591 Xapian::Enquire::Internal::sort_setting sort_by
,
592 bool sort_value_forward
,
593 int percent_cutoff
, Xapian::weight weight_cutoff
,
594 const Xapian::Weight
*wtscheme
,
595 const Xapian::RSet
&omrset
,
596 const vector
<Xapian::MatchSpy
*> & matchspies
)
598 string tmp
= query
->serialise();
599 string message
= encode_length(tmp
.size());
602 // Serialise assorted Enquire settings.
603 message
+= encode_length(qlen
);
604 message
+= encode_length(collapse_max
);
605 if (collapse_max
) message
+= encode_length(collapse_key
);
606 message
+= char('0' + order
);
607 message
+= encode_length(sort_key
);
608 message
+= char('0' + sort_by
);
609 message
+= char('0' + sort_value_forward
);
610 message
+= char(percent_cutoff
);
611 message
+= serialise_double(weight_cutoff
);
613 tmp
= wtscheme
->name();
614 message
+= encode_length(tmp
.size());
617 tmp
= wtscheme
->serialise();
618 message
+= encode_length(tmp
.size());
621 tmp
= serialise_rset(omrset
);
622 message
+= encode_length(tmp
.size());
625 vector
<Xapian::MatchSpy
*>::const_iterator i
;
626 for (i
= matchspies
.begin(); i
!= matchspies
.end(); ++i
) {
629 throw Xapian::UnimplementedError("MatchSpy subclass not suitable for use with remote searches - name() method returned empty string");
631 message
+= encode_length(tmp
.size());
634 tmp
= (*i
)->serialise();
635 message
+= encode_length(tmp
.size());
639 send_message(MSG_QUERY_NEW
, message
);
// Read the server's REPLY_STATS into `out`.  If `nowait` is set and no data is
// ready to read on the link, returns false immediately without blocking.
// NOTE(review): the success return (presumably `true`) is missing from this
// extract -- TODO confirm.
643 RemoteDatabase::get_remote_stats(bool nowait
, Xapian::Weight::Internal
&out
)
645 if (nowait
&& !link
.ready_to_read()) return false;
648 get_message(message
, REPLY_STATS
);
649 out
= unserialise_stats(message
);
// Send MSG_GETMSET carrying first, maxitems and check_at_least (each
// encode_length), followed by the serialised combined statistics.
655 RemoteDatabase::send_global_stats(Xapian::doccount first
,
656 Xapian::doccount maxitems
,
657 Xapian::doccount check_at_least
,
658 const Xapian::Weight::Internal
&stats
)
660 string message
= encode_length(first
);
661 message
+= encode_length(maxitems
);
662 message
+= encode_length(check_at_least
);
663 message
+= serialise_stats(stats
);
664 send_message(MSG_GETMSET
, message
);
// Read REPLY_RESULTS_NEW.  For each match spy in order, a length-prefixed
// results string is passed to merge_results().  The rest of the payload is
// unserialised into `mset`.
// @throws Xapian::NetworkError "Expected serialised matchspy".
// NOTE(review): the guard on that throw (presumably `p == p_end`) and the
// `p += len` advance after each spy are missing from this extract -- TODO
// confirm.
668 RemoteDatabase::get_mset(Xapian::MSet
&mset
,
669 const vector
<Xapian::MatchSpy
*> & matchspies
)
672 get_message(message
, REPLY_RESULTS_NEW
);
673 const char * p
= message
.data();
674 const char * p_end
= p
+ message
.size();
676 vector
<Xapian::MatchSpy
*>::const_iterator i
;
677 for (i
= matchspies
.begin(); i
!= matchspies
.end(); ++i
) {
679 throw Xapian::NetworkError("Expected serialised matchspy");
681 decode_length_and_check(&p
, p_end
, len
);
682 string
spyresults(p
, len
);
684 (*i
)->merge_results(spyresults
);
686 mset
= unserialise_mset_new(p
, p_end
);
// Send MSG_COMMIT and block until the server replies REPLY_DONE.
690 RemoteDatabase::commit()
692 send_message(MSG_COMMIT
, string());
694 // We need to wait for a response to ensure documents have been committed.
696 get_message(message
, REPLY_DONE
);
// Invalidate the cached stats and value-stats slot, then send MSG_CANCEL.
// As shown, this does not wait for a reply.
700 RemoteDatabase::cancel()
702 cached_stats_valid
= false;
703 mru_slot
= Xapian::BAD_VALUENO
;
705 send_message(MSG_CANCEL
, string());
// Add `doc` on the server.  This invalidates the cached stats and value-stats
// slot, sends the serialised document, and decodes the new docid from
// REPLY_ADDDOCUMENT.
// NOTE(review): the `did` declaration and return are missing from this
// extract -- TODO confirm.
709 RemoteDatabase::add_document(const Xapian::Document
& doc
)
711 cached_stats_valid
= false;
712 mru_slot
= Xapian::BAD_VALUENO
;
714 send_message(MSG_ADDDOCUMENT
, serialise_document(doc
));
717 get_message(message
, REPLY_ADDDOCUMENT
);
719 const char * p
= message
.data();
720 const char * p_end
= p
+ message
.size();
722 decode_length(&p
, p_end
, did
);
// Delete document `did` on the server.  This invalidates the cached stats and
// value-stats slot, then waits for REPLY_DONE.
727 RemoteDatabase::delete_document(Xapian::docid did
)
729 cached_stats_valid
= false;
730 mru_slot
= Xapian::BAD_VALUENO
;
732 send_message(MSG_DELETEDOCUMENTTERM_PLACEHOLDER_NEVER_USED_GUARD
// Delete documents indexed by `unique_term` on the server.  This invalidates
// the cached stats and value-stats slot.  As shown, no reply is awaited.
738 RemoteDatabase::delete_document(const std::string
& unique_term
)
740 cached_stats_valid
= false;
741 mru_slot
= Xapian::BAD_VALUENO
;
743 send_message(MSG_DELETEDOCUMENTTERM
, unique_term
);
// Replace document `did` with `doc` on the server.  This invalidates the
// cached stats and value-stats slot.  Sends the encoded docid followed by the
// serialised document.  As shown, no reply is awaited.
747 RemoteDatabase::replace_document(Xapian::docid did
,
748 const Xapian::Document
& doc
)
750 cached_stats_valid
= false;
751 mru_slot
= Xapian::BAD_VALUENO
;
753 string message
= encode_length(did
);
754 message
+= serialise_document(doc
);
756 send_message(MSG_REPLACEDOCUMENT
, message
);
// Replace the document indexed by `unique_term` with `doc` on the server.
// This invalidates the cached stats and value-stats slot.  Sends the
// length-prefixed term followed by the serialised document, then decodes a
// docid from REPLY_ADDDOCUMENT.
// NOTE(review): the `did` declaration and return are missing from this
// extract -- TODO confirm.
760 RemoteDatabase::replace_document(const std::string
& unique_term
,
761 const Xapian::Document
& doc
)
763 cached_stats_valid
= false;
764 mru_slot
= Xapian::BAD_VALUENO
;
766 string message
= encode_length(unique_term
.size());
767 message
+= unique_term
;
768 message
+= serialise_document(doc
);
770 send_message(MSG_REPLACEDOCUMENTTERM
, message
);
772 get_message(message
, REPLY_ADDDOCUMENT
);
774 const char * p
= message
.data();
775 const char * p_end
= p
+ message
.size();
777 decode_length(&p
, p_end
, did
);
// Database UUID.
// NOTE(review): the return type and body are missing from this extract.  This
// presumably returns the `uuid` member set by apply_stats_update() -- TODO
// confirm.
782 RemoteDatabase::get_uuid() const
// Fetch the metadata value for `key` from the server (MSG_GETMETADATA ->
// REPLY_METADATA).  The reply payload is read into `metadata`, which is
// presumably returned on a line missing from this extract.
788 RemoteDatabase::get_metadata(const string
& key
) const
790 send_message(MSG_GETMETADATA
, key
);
792 get_message(metadata
, REPLY_METADATA
);
// Send MSG_SETMETADATA whose payload starts with the encoded key length.
// NOTE(review): the lines appending `key` and `value` (original lines
// 800-801) are missing from this extract -- TODO confirm.
797 RemoteDatabase::set_metadata(const string
& key
, const string
& value
)
799 string data
= encode_length(key
.size());
802 send_message(MSG_SETMETADATA
, data
);
// Send MSG_ADDSPELLING whose payload starts with the encoded `freqinc`.
// NOTE(review): the line appending `word` (original line 810) is missing from
// this extract -- TODO confirm.  As shown, no reply is awaited.
806 RemoteDatabase::add_spelling(const string
& word
,
807 Xapian::termcount freqinc
) const
809 string data
= encode_length(freqinc
);
811 send_message(MSG_ADDSPELLING
, data
);
// Send MSG_REMOVESPELLING whose payload starts with the encoded `freqdec`.
// NOTE(review): the line appending `word` (original line 819) is missing from
// this extract -- TODO confirm.  As shown, no reply is awaited.
815 RemoteDatabase::remove_spelling(const string
& word
,
816 Xapian::termcount freqdec
) const
818 string data
= encode_length(freqdec
);
820 send_message(MSG_REMOVESPELLING
, data
);