Stop apparent error from remote server when read-only client disconnects
[xapian.git] / xapian-core / backends / remote / remote-database.cc
blobc15e60238fd68740b2aa1fbd812a231916648ecc
1 /** @file remote-database.cc
2 * @brief Remote backend database class
3 */
4 /* Copyright (C) 2006,2007,2008,2009,2010,2015 Olly Betts
5 * Copyright (C) 2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "remote-database.h"
26 #include "safeerrno.h"
27 #include <signal.h>
29 #include "autoptr.h"
30 #include "emptypostlist.h"
31 #include "inmemory_positionlist.h"
32 #include "net/length.h"
33 #include "net_postlist.h"
34 #include "net_termlist.h"
35 #include "remote-document.h"
36 #include "omassert.h"
37 #include "realtime.h"
38 #include "serialise.h"
39 #include "serialise-double.h"
40 #include "str.h"
41 #include "stringutils.h" // For STRINGIZE().
42 #include "weightinternal.h"
44 #include <string>
45 #include <vector>
47 #include "xapian/error.h"
48 #include "xapian/matchspy.h"
50 using namespace std;
52 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
53 static void
54 throw_connection_closed_unexpectedly()
56 throw Xapian::NetworkError("Connection closed unexpectedly");
59 RemoteDatabase::RemoteDatabase(int fd, double timeout_,
60 const string & context_, bool writable)
61 : link(fd, fd, context_),
62 context(context_),
63 cached_stats_valid(),
64 mru_valstats(),
65 mru_slot(Xapian::BAD_VALUENO),
66 timeout(timeout_)
68 #ifndef __WIN32__
69 // It's simplest to just ignore SIGPIPE. We'll still know if the
70 // connection dies because we'll get EPIPE back from write().
71 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
72 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
74 #endif
76 if (!writable) {
77 // Transactions only make sense when writing, so flag them as
78 // "unimplemented" so that our destructor doesn't call dtor_called()
79 // since that might try to call commit() which will cause a message to
80 // be sent to the remote server and probably an InvalidOperationError
81 // exception message to be returned.
82 transaction_state = TRANSACTION_UNIMPLEMENTED;
85 string message;
86 char type = get_message(message);
88 if (reply_type(type) != REPLY_GREETING || message.size() < 3) {
89 if (type == 'O' && message.size() == size_t('M') && message[0] == ' ') {
90 // The server reply used to start "OM ", which will now be
91 // interpreted as a type 'O' message of length size_t('M')
92 // with first character ' '.
93 throw Xapian::NetworkError("Server protocol version too old", context);
95 throw Xapian::NetworkError("Handshake failed - is this a Xapian server?", context);
98 const char *p = message.c_str();
99 const char *p_end = p + message.size();
101 // The protocol major versions must match. The protocol minor version of
102 // the server must be >= that of the client.
103 int protocol_major = static_cast<unsigned char>(*p++);
104 int protocol_minor = static_cast<unsigned char>(*p++);
105 if (protocol_major != XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION ||
106 protocol_minor < XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION) {
107 string errmsg("Server supports protocol version");
108 if (protocol_minor) {
109 errmsg += "s ";
110 errmsg += str(protocol_major);
111 errmsg += ".0 to ";
113 errmsg += str(protocol_major);
114 errmsg += '.';
115 errmsg += str(protocol_minor);
116 errmsg +=
117 " - client is using "
118 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION)
120 STRINGIZE(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION);
121 throw Xapian::NetworkError(errmsg, context);
124 apply_stats_update(p, p_end);
126 if (writable) update_stats(MSG_WRITEACCESS);
129 RemoteDatabase *
130 RemoteDatabase::as_remotedatabase()
132 return this;
135 void
136 RemoteDatabase::keep_alive()
138 send_message(MSG_KEEPALIVE, string());
139 string message;
140 get_message(message, REPLY_DONE);
143 TermList *
144 RemoteDatabase::open_metadata_keylist(const std::string &prefix) const
146 // Ensure that total_length and doccount are up-to-date.
147 if (!cached_stats_valid) update_stats();
149 send_message(MSG_METADATAKEYLIST, prefix);
151 string message;
152 AutoPtr<NetworkTermList> tlist(
153 new NetworkTermList(0, doccount,
154 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
155 0));
156 vector<NetworkTermListItem> & items = tlist->items;
158 char type;
159 while ((type = get_message(message)) == REPLY_METADATAKEYLIST) {
160 NetworkTermListItem item;
161 item.tname = message;
162 items.push_back(item);
164 if (type != REPLY_DONE) {
165 throw Xapian::NetworkError("Bad message received", context);
168 tlist->current_position = tlist->items.begin();
169 return tlist.release();
172 TermList *
173 RemoteDatabase::open_term_list(Xapian::docid did) const
175 Assert(did);
177 // Ensure that total_length and doccount are up-to-date.
178 if (!cached_stats_valid) update_stats();
180 send_message(MSG_TERMLIST, encode_length(did));
182 string message;
183 get_message(message, REPLY_DOCLENGTH);
184 const char * p = message.c_str();
185 const char * p_end = p + message.size();
186 Xapian::termcount doclen;
187 decode_length(&p, p_end, doclen);
188 if (p != p_end) {
189 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
192 AutoPtr<NetworkTermList> tlist(
193 new NetworkTermList(doclen, doccount,
194 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
195 did));
196 vector<NetworkTermListItem> & items = tlist->items;
198 char type;
199 while ((type = get_message(message)) == REPLY_TERMLIST) {
200 NetworkTermListItem item;
201 p = message.data();
202 p_end = p + message.size();
203 decode_length(&p, p_end, item.wdf);
204 decode_length(&p, p_end, item.termfreq);
205 item.tname.assign(p, p_end);
206 items.push_back(item);
208 if (type != REPLY_DONE) {
209 throw Xapian::NetworkError("Bad message received", context);
212 tlist->current_position = tlist->items.begin();
213 return tlist.release();
216 TermList *
217 RemoteDatabase::open_allterms(const string & prefix) const {
218 // Ensure that total_length and doccount are up-to-date.
219 if (!cached_stats_valid) update_stats();
221 send_message(MSG_ALLTERMS, prefix);
223 AutoPtr<NetworkTermList> tlist(
224 new NetworkTermList(0, doccount,
225 Xapian::Internal::RefCntPtr<const RemoteDatabase>(this),
226 0));
227 vector<NetworkTermListItem> & items = tlist->items;
229 string message;
230 char type;
231 while ((type = get_message(message)) == REPLY_ALLTERMS) {
232 NetworkTermListItem item;
233 const char * p = message.data();
234 const char * p_end = p + message.size();
235 decode_length(&p, p_end, item.termfreq);
236 item.tname.assign(p, p_end);
237 items.push_back(item);
239 if (type != REPLY_DONE) {
240 throw Xapian::NetworkError("Bad message received", context);
243 tlist->current_position = tlist->items.begin();
244 return tlist.release();
247 LeafPostList *
248 RemoteDatabase::open_post_list(const string &term) const
250 return new NetworkPostList(Xapian::Internal::RefCntPtr<const RemoteDatabase>(this), term);
253 Xapian::doccount
254 RemoteDatabase::read_post_list(const string &term, NetworkPostList & pl) const
256 send_message(MSG_POSTLIST, term);
258 string message;
259 char type;
260 get_message(message, REPLY_POSTLISTSTART);
262 const char * p = message.data();
263 const char * p_end = p + message.size();
264 Xapian::doccount termfreq;
265 decode_length(&p, p_end, termfreq);
267 while ((type = get_message(message)) == REPLY_POSTLISTITEM) {
268 pl.append_posting(message);
270 if (type != REPLY_DONE) {
271 throw Xapian::NetworkError("Bad message received", context);
274 return termfreq;
277 PositionList *
278 RemoteDatabase::open_position_list(Xapian::docid did, const string &term) const
280 send_message(MSG_POSITIONLIST, encode_length(did) + term);
282 vector<Xapian::termpos> positions;
284 string message;
285 char type;
286 Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
287 while ((type = get_message(message)) == REPLY_POSITIONLIST) {
288 const char * p = message.data();
289 const char * p_end = p + message.size();
290 Xapian::termpos inc;
291 decode_length(&p, p_end, inc);
292 lastpos += inc + 1;
293 positions.push_back(lastpos);
295 if (type != REPLY_DONE) {
296 throw Xapian::NetworkError("Bad message received", context);
299 return new InMemoryPositionList(positions);
302 bool
303 RemoteDatabase::has_positions() const
305 if (!cached_stats_valid) update_stats();
306 return has_positional_info;
309 void
310 RemoteDatabase::reopen()
312 update_stats(MSG_REOPEN);
313 mru_slot = Xapian::BAD_VALUENO;
316 void
317 RemoteDatabase::close()
319 do_close();
322 // Currently lazy is used when fetching documents from the MSet, and in three
323 // cases in multimatch.cc. One of the latter is when using a MatchDecider,
324 // which we don't support with the remote backend currently. The others are
325 // for the sort key and collapse key which in the remote cases are fetched
326 // during the remote match and passed across with the MSet. So we can safely
327 // ignore "lazy" here for now without any performance penalty during the match
328 // process.
329 Xapian::Document::Internal *
330 RemoteDatabase::open_document(Xapian::docid did, bool /*lazy*/) const
332 Assert(did);
334 send_message(MSG_DOCUMENT, encode_length(did));
335 string doc_data;
336 map<Xapian::valueno, string> values;
337 get_message(doc_data, REPLY_DOCDATA);
339 reply_type type;
340 string message;
341 while ((type = get_message(message)) == REPLY_VALUE) {
342 const char * p = message.data();
343 const char * p_end = p + message.size();
344 Xapian::valueno slot;
345 decode_length(&p, p_end, slot);
346 values.insert(make_pair(slot, string(p, p_end)));
348 if (type != REPLY_DONE) {
349 throw Xapian::NetworkError("Bad message received", context);
352 return new RemoteDocument(this, did, doc_data, values);
355 void
356 RemoteDatabase::update_stats(message_type msg_code) const
358 send_message(msg_code, string());
359 string message;
360 get_message(message, REPLY_UPDATE);
361 const char * p = message.c_str();
362 const char * p_end = p + message.size();
363 apply_stats_update(p, p_end);
366 void
367 RemoteDatabase::apply_stats_update(const char * p, const char * p_end) const
369 decode_length(&p, p_end, doccount);
370 decode_length(&p, p_end, lastdocid);
371 decode_length(&p, p_end, doclen_lbound);
372 decode_length(&p, p_end, doclen_ubound);
373 if (p == p_end) {
374 throw Xapian::NetworkError("Bad stats update message received", context);
376 has_positional_info = (*p++ == '1');
377 decode_length(&p, p_end, total_length);
378 uuid.assign(p, p_end);
379 cached_stats_valid = true;
382 Xapian::doccount
383 RemoteDatabase::get_doccount() const
385 if (!cached_stats_valid) update_stats();
386 return doccount;
389 Xapian::docid
390 RemoteDatabase::get_lastdocid() const
392 if (!cached_stats_valid) update_stats();
393 return lastdocid;
396 totlen_t
397 RemoteDatabase::get_total_length() const
399 if (!cached_stats_valid) update_stats();
400 return total_length;
403 Xapian::doclength
404 RemoteDatabase::get_avlength() const
406 if (!cached_stats_valid) update_stats();
407 if (rare(doccount == 0)) return 0;
408 return Xapian::doclength(total_length) / doccount;
411 bool
412 RemoteDatabase::term_exists(const string & tname) const
414 Assert(!tname.empty());
415 send_message(MSG_TERMEXISTS, tname);
416 string message;
417 reply_type type = get_message(message);
418 if (type != REPLY_TERMEXISTS && type != REPLY_TERMDOESNTEXIST) {
419 throw Xapian::NetworkError("Bad message received", context);
421 return (type == REPLY_TERMEXISTS);
424 Xapian::doccount
425 RemoteDatabase::get_termfreq(const string & tname) const
427 Assert(!tname.empty());
428 send_message(MSG_TERMFREQ, tname);
429 string message;
430 get_message(message, REPLY_TERMFREQ);
431 const char * p = message.data();
432 const char * p_end = p + message.size();
433 Xapian::doccount r;
434 decode_length(&p, p_end, r);
435 return r;
438 Xapian::termcount
439 RemoteDatabase::get_collection_freq(const string & tname) const
441 Assert(!tname.empty());
442 send_message(MSG_COLLFREQ, tname);
443 string message;
444 get_message(message, REPLY_COLLFREQ);
445 const char * p = message.data();
446 const char * p_end = p + message.size();
447 Xapian::termcount r;
448 decode_length(&p, p_end, r);
449 return r;
453 void
454 RemoteDatabase::read_value_stats(Xapian::valueno slot) const
456 if (mru_slot != slot) {
457 send_message(MSG_VALUESTATS, encode_length(slot));
458 string message;
459 get_message(message, REPLY_VALUESTATS);
460 const char * p = message.data();
461 const char * p_end = p + message.size();
462 mru_slot = slot;
463 decode_length(&p, p_end, mru_valstats.freq);
464 size_t len;
465 decode_length_and_check(&p, p_end, len);
466 mru_valstats.lower_bound.assign(p, len);
467 p += len;
468 decode_length_and_check(&p, p_end, len);
469 mru_valstats.upper_bound.assign(p, len);
470 p += len;
471 if (p != p_end) {
472 throw Xapian::NetworkError("Bad REPLY_VALUESTATS message received", context);
477 Xapian::doccount
478 RemoteDatabase::get_value_freq(Xapian::valueno slot) const
480 read_value_stats(slot);
481 return mru_valstats.freq;
484 std::string
485 RemoteDatabase::get_value_lower_bound(Xapian::valueno slot) const
487 read_value_stats(slot);
488 return mru_valstats.lower_bound;
491 std::string
492 RemoteDatabase::get_value_upper_bound(Xapian::valueno slot) const
494 read_value_stats(slot);
495 return mru_valstats.upper_bound;
498 Xapian::termcount
499 RemoteDatabase::get_doclength_lower_bound() const
501 return doclen_lbound;
504 Xapian::termcount
505 RemoteDatabase::get_doclength_upper_bound() const
507 return doclen_ubound;
510 Xapian::termcount
511 RemoteDatabase::get_wdf_upper_bound(const string &) const
513 // The default implementation returns get_collection_freq(), but we
514 // don't want the overhead of a remote message and reply per query
515 // term, and we can get called in the middle of a remote exchange
516 // too. FIXME: handle this bound in the stats local/remote code...
517 return doclen_ubound;
520 Xapian::termcount
521 RemoteDatabase::get_doclength(Xapian::docid did) const
523 Assert(did != 0);
524 send_message(MSG_DOCLENGTH, encode_length(did));
525 string message;
526 get_message(message, REPLY_DOCLENGTH);
527 const char * p = message.c_str();
528 const char * p_end = p + message.size();
529 Xapian::termcount doclen;
530 decode_length(&p, p_end, doclen);
531 if (p != p_end) {
532 throw Xapian::NetworkError("Bad REPLY_DOCLENGTH message received", context);
534 return doclen;
537 reply_type
538 RemoteDatabase::get_message(string &result, reply_type required_type) const
540 double end_time = RealTime::end_time(timeout);
541 int type_int = link.get_message(result, end_time);
542 if (type_int == EOF)
543 throw_connection_closed_unexpectedly();
544 reply_type type = static_cast<reply_type>(type_int);
545 if (type == REPLY_EXCEPTION) {
546 unserialise_error(result, "REMOTE:", context);
548 if (required_type != REPLY_MAX && type != required_type) {
549 string errmsg("Expecting reply type ");
550 errmsg += str(int(required_type));
551 errmsg += ", got ";
552 errmsg += str(int(type));
553 throw Xapian::NetworkError(errmsg);
556 return type;
559 void
560 RemoteDatabase::send_message(message_type type, const string &message) const
562 double end_time = RealTime::end_time(timeout);
563 link.send_message(static_cast<unsigned char>(type), message, end_time);
566 void
567 RemoteDatabase::do_close()
569 // In the constructor, we set transaction_state to
570 // TRANSACTION_UNIMPLEMENTED if we aren't writable so that we can check
571 // it here.
572 bool writable = (transaction_state != TRANSACTION_UNIMPLEMENTED);
574 // Only call dtor_called() if we're writable.
575 if (writable) dtor_called();
577 // If we're writable, wait for a confirmation of the close, so we know that
578 // changes have been written and flushed, and the database write lock
579 // released. For the non-writable case, there's no need to wait, so don't
580 // slow down searching by waiting here.
581 link.do_close(writable);
584 void
585 RemoteDatabase::set_query(const Xapian::Query::Internal *query,
586 Xapian::termcount qlen,
587 Xapian::doccount collapse_max,
588 Xapian::valueno collapse_key,
589 Xapian::Enquire::docid_order order,
590 Xapian::valueno sort_key,
591 Xapian::Enquire::Internal::sort_setting sort_by,
592 bool sort_value_forward,
593 int percent_cutoff, Xapian::weight weight_cutoff,
594 const Xapian::Weight *wtscheme,
595 const Xapian::RSet &omrset,
596 const vector<Xapian::MatchSpy *> & matchspies)
598 string tmp = query->serialise();
599 string message = encode_length(tmp.size());
600 message += tmp;
602 // Serialise assorted Enquire settings.
603 message += encode_length(qlen);
604 message += encode_length(collapse_max);
605 if (collapse_max) message += encode_length(collapse_key);
606 message += char('0' + order);
607 message += encode_length(sort_key);
608 message += char('0' + sort_by);
609 message += char('0' + sort_value_forward);
610 message += char(percent_cutoff);
611 message += serialise_double(weight_cutoff);
613 tmp = wtscheme->name();
614 message += encode_length(tmp.size());
615 message += tmp;
617 tmp = wtscheme->serialise();
618 message += encode_length(tmp.size());
619 message += tmp;
621 tmp = serialise_rset(omrset);
622 message += encode_length(tmp.size());
623 message += tmp;
625 vector<Xapian::MatchSpy *>::const_iterator i;
626 for (i = matchspies.begin(); i != matchspies.end(); ++i) {
627 tmp = (*i)->name();
628 if (tmp.empty()) {
629 throw Xapian::UnimplementedError("MatchSpy subclass not suitable for use with remote searches - name() method returned empty string");
631 message += encode_length(tmp.size());
632 message += tmp;
634 tmp = (*i)->serialise();
635 message += encode_length(tmp.size());
636 message += tmp;
639 send_message(MSG_QUERY_NEW, message);
642 bool
643 RemoteDatabase::get_remote_stats(bool nowait, Xapian::Weight::Internal &out)
645 if (nowait && !link.ready_to_read()) return false;
647 string message;
648 get_message(message, REPLY_STATS);
649 out = unserialise_stats(message);
651 return true;
654 void
655 RemoteDatabase::send_global_stats(Xapian::doccount first,
656 Xapian::doccount maxitems,
657 Xapian::doccount check_at_least,
658 const Xapian::Weight::Internal &stats)
660 string message = encode_length(first);
661 message += encode_length(maxitems);
662 message += encode_length(check_at_least);
663 message += serialise_stats(stats);
664 send_message(MSG_GETMSET, message);
667 void
668 RemoteDatabase::get_mset(Xapian::MSet &mset,
669 const vector<Xapian::MatchSpy *> & matchspies)
671 string message;
672 get_message(message, REPLY_RESULTS_NEW);
673 const char * p = message.data();
674 const char * p_end = p + message.size();
676 vector<Xapian::MatchSpy *>::const_iterator i;
677 for (i = matchspies.begin(); i != matchspies.end(); ++i) {
678 if (p == p_end)
679 throw Xapian::NetworkError("Expected serialised matchspy");
680 size_t len;
681 decode_length_and_check(&p, p_end, len);
682 string spyresults(p, len);
683 p += len;
684 (*i)->merge_results(spyresults);
686 mset = unserialise_mset_new(p, p_end);
689 void
690 RemoteDatabase::commit()
692 send_message(MSG_COMMIT, string());
694 // We need to wait for a response to ensure documents have been committed.
695 string message;
696 get_message(message, REPLY_DONE);
699 void
700 RemoteDatabase::cancel()
702 cached_stats_valid = false;
703 mru_slot = Xapian::BAD_VALUENO;
705 send_message(MSG_CANCEL, string());
708 Xapian::docid
709 RemoteDatabase::add_document(const Xapian::Document & doc)
711 cached_stats_valid = false;
712 mru_slot = Xapian::BAD_VALUENO;
714 send_message(MSG_ADDDOCUMENT, serialise_document(doc));
716 string message;
717 get_message(message, REPLY_ADDDOCUMENT);
719 const char * p = message.data();
720 const char * p_end = p + message.size();
721 Xapian::docid did;
722 decode_length(&p, p_end, did);
723 return did;
726 void
727 RemoteDatabase::delete_document(Xapian::docid did)
729 cached_stats_valid = false;
730 mru_slot = Xapian::BAD_VALUENO;
732 send_message(MSG_DELETEDOCUMENT, encode_length(did));
733 string dummy;
734 get_message(dummy, REPLY_DONE);
737 void
738 RemoteDatabase::delete_document(const std::string & unique_term)
740 cached_stats_valid = false;
741 mru_slot = Xapian::BAD_VALUENO;
743 send_message(MSG_DELETEDOCUMENTTERM, unique_term);
746 void
747 RemoteDatabase::replace_document(Xapian::docid did,
748 const Xapian::Document & doc)
750 cached_stats_valid = false;
751 mru_slot = Xapian::BAD_VALUENO;
753 string message = encode_length(did);
754 message += serialise_document(doc);
756 send_message(MSG_REPLACEDOCUMENT, message);
759 Xapian::docid
760 RemoteDatabase::replace_document(const std::string & unique_term,
761 const Xapian::Document & doc)
763 cached_stats_valid = false;
764 mru_slot = Xapian::BAD_VALUENO;
766 string message = encode_length(unique_term.size());
767 message += unique_term;
768 message += serialise_document(doc);
770 send_message(MSG_REPLACEDOCUMENTTERM, message);
772 get_message(message, REPLY_ADDDOCUMENT);
774 const char * p = message.data();
775 const char * p_end = p + message.size();
776 Xapian::docid did;
777 decode_length(&p, p_end, did);
778 return did;
781 string
782 RemoteDatabase::get_uuid() const
784 return uuid;
787 string
788 RemoteDatabase::get_metadata(const string & key) const
790 send_message(MSG_GETMETADATA, key);
791 string metadata;
792 get_message(metadata, REPLY_METADATA);
793 return metadata;
796 void
797 RemoteDatabase::set_metadata(const string & key, const string & value)
799 string data = encode_length(key.size());
800 data += key;
801 data += value;
802 send_message(MSG_SETMETADATA, data);
805 void
806 RemoteDatabase::add_spelling(const string & word,
807 Xapian::termcount freqinc) const
809 string data = encode_length(freqinc);
810 data += word;
811 send_message(MSG_ADDSPELLING, data);
814 void
815 RemoteDatabase::remove_spelling(const string & word,
816 Xapian::termcount freqdec) const
818 string data = encode_length(freqdec);
819 data += word;
820 send_message(MSG_REMOVESPELLING, data);