2 * @brief Xapian remote backend server base class
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2019 Olly Betts
5 * Copyright (C) 2006,2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "remoteserver.h"
25 #include "xapian/constants.h"
26 #include "xapian/database.h"
27 #include "xapian/enquire.h"
28 #include "xapian/error.h"
29 #include "xapian/matchspy.h"
30 #include "xapian/query.h"
31 #include "xapian/valueiterator.h"
39 #include "matcher/multimatch.h"
43 #include "serialise.h"
44 #include "serialise-double.h"
45 #include "serialise-error.h"
47 #include "stringutils.h"
48 #include "weight/weightinternal.h"
50 XAPIAN_NORETURN(static void throw_read_only());
54 throw Xapian::InvalidOperationError("Server is read-only");
/// Exception type thrown when the connection-closing message is received.
///
/// It has no members: throwing it is purely a signal to the code which
/// catches it.
struct ConnectionClosed {};
// Construct a server talking to the client over fdin_/fdout_.
//
// Opens the first path in dbpaths as a read-only Xapian::Database, adds any
// further paths to it with add_database(), and sends any Xapian::Error raised
// while doing so to the client as REPLY_EXCEPTION.  SIGPIPE is then set to be
// ignored, and finally the greeting is sent by calling msg_update().
//
// NOTE(review): some lines of this constructor are not visible in this view
// (e.g. the parameter which supplies writable_, the opening of the try block
// and the rethrow mentioned in the comments) - confirm against the full file.
60 RemoteServer::RemoteServer(const std::vector
<std::string
> &dbpaths
,
61 int fdin_
, int fdout_
,
62 double active_timeout_
, double idle_timeout_
,
64 : RemoteConnection(fdin_
, fdout_
, std::string()),
65 db(NULL
), wdb(NULL
), writable(writable_
),
66 active_timeout(active_timeout_
), idle_timeout(idle_timeout_
)
68 // Catch errors opening the database and propagate them to the client.
// At least one database path is required.
70 Assert(!dbpaths
.empty());
71 // We always open the database read-only to start with. If we're
72 // writable, the client can ask to be upgraded to write access once
73 // connected if it wants it.
74 db
= new Xapian::Database(dbpaths
[0]);
75 // Build a better description than Database::get_description() gives
76 // in the variable context. FIXME: improve Database::get_description()
77 // and then just use that instead.
// Start from the second path: the first one was opened above.
81 vector
<std::string
>::const_iterator
i(dbpaths
.begin());
82 for (++i
; i
!= dbpaths
.end(); ++i
) {
83 db
->add_database(Xapian::Database(*i
));
// NOTE(review): the condition under which this assertion applies is not
// visible in this view.
88 AssertEq(dbpaths
.size(), 1); // Expecting exactly one database.
90 } catch (const Xapian::Error
&err
) {
91 // Propagate the exception to the client.
92 send_message(REPLY_EXCEPTION
, serialise_error(err
));
93 // And rethrow it so our caller can log it and close the connection.
98 // It's simplest to just ignore SIGPIPE. We'll still know if the
99 // connection dies because we'll get EPIPE back from write().
100 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
)
101 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
104 // Send greeting message.
// msg_update() ignores its argument, so an empty string is passed.
105 msg_update(string());
// Destructor.
// NOTE(review): the statements of the body are not visible in this view; the
// comment below indicates that wdb must not be deleted in addition to db.
108 RemoteServer::~RemoteServer()
111 // wdb is either NULL or equal to db, so we shouldn't delete it too!
// Read the next message from the client into result.
//
// timeout is turned into a deadline with RealTime::end_time() and passed to
// RemoteConnection::get_message().
//
// Throws ConnectionClosed if the message is MSG_SHUTDOWN, or if the type is
// negative while wdb is NULL.  Throws Xapian::NetworkError if the type is
// >= MSG_MAX, or if required_type is not MSG_MAX and the type differs from
// required_type.  Otherwise returns the type converted to message_type.
115 RemoteServer::get_message(double timeout
, string
& result
,
116 message_type required_type
)
118 double end_time
= RealTime::end_time(timeout
);
119 int type
= RemoteConnection::get_message(result
, end_time
);
121 // Handle "shutdown connection" message here. Treat EOF here for a read-only
122 // database the same way since a read-only client just closes the
123 // connection when done.
124 if (type
== MSG_SHUTDOWN
|| (type
< 0 && wdb
== NULL
))
125 throw ConnectionClosed();
// NOTE(review): the condition guarding the next throw is not visible in this
// view - confirm against the full file.
127 throw Xapian::NetworkError("Connection closed unexpectedly");
128 if (type
>= MSG_MAX
) {
129 string
errmsg("Invalid message type ");
131 throw Xapian::NetworkError(errmsg
);
// A required_type of MSG_MAX means any message type is acceptable.
133 if (required_type
!= MSG_MAX
&& type
!= int(required_type
)) {
134 string
errmsg("Expecting message type ");
135 errmsg
+= str(int(required_type
));
138 throw Xapian::NetworkError(errmsg
);
140 return static_cast<message_type
>(type
);
144 RemoteServer::send_message(reply_type type
, const string
&message
)
146 double end_time
= RealTime::end_time(active_timeout
);
147 unsigned char type_as_char
= static_cast<unsigned char>(type
);
148 RemoteConnection::send_message(type_as_char
, message
, end_time
);
151 typedef void (RemoteServer::* dispatch_func
)(const string
&);
// Message dispatch: a table of handler member functions indexed by message
// type, followed by the code which reads a message with get_message(), looks
// up its handler and calls it, and the exception handlers around that call.
// NOTE(review): the header of the enclosing function, the declaration of
// message and several other lines are not visible in this view.
158 /* This list needs to be kept in the same order as the list of
159 * message types in "remoteprotocol.h". Note that messages at the
160 * end of the list in "remoteprotocol.h" can be omitted if they
161 * don't correspond to dispatch actions.
163 static const dispatch_func dispatch
[] = {
164 &RemoteServer::msg_allterms
,
165 &RemoteServer::msg_collfreq
,
166 &RemoteServer::msg_document
,
167 &RemoteServer::msg_termexists
,
168 &RemoteServer::msg_termfreq
,
169 &RemoteServer::msg_valuestats
,
170 &RemoteServer::msg_keepalive
,
171 &RemoteServer::msg_doclength
,
172 &RemoteServer::msg_query
,
173 &RemoteServer::msg_termlist
,
174 &RemoteServer::msg_positionlist
,
175 &RemoteServer::msg_postlist
,
176 &RemoteServer::msg_reopen
,
177 &RemoteServer::msg_update
,
178 &RemoteServer::msg_adddocument
,
179 &RemoteServer::msg_cancel_
,
180 &RemoteServer::msg_deletedocumentterm_
,
181 &RemoteServer::msg_commit
,
182 &RemoteServer::msg_replacedocument_
,
183 &RemoteServer::msg_replacedocumentterm
,
184 &RemoteServer::msg_deletedocument
,
185 &RemoteServer::msg_writeaccess
,
186 &RemoteServer::msg_getmetadata
,
187 &RemoteServer::msg_setmetadata_
,
188 &RemoteServer::msg_addspelling_
,
189 &RemoteServer::msg_removespelling
,
190 0, // MSG_GETMSET - used during a conversation.
191 0, // MSG_SHUTDOWN - handled by get_message().
192 &RemoteServer::msg_openmetadatakeylist
,
193 &RemoteServer::msg_freqs
,
194 &RemoteServer::msg_uniqueterms
,
195 &RemoteServer::msg_deletedocumentterm
,
196 &RemoteServer::msg_replacedocument
,
197 &RemoteServer::msg_cancel
,
198 &RemoteServer::msg_setmetadata
,
199 &RemoteServer::msg_addspelling
,
// Wait for the next message using the idle timeout.
203 size_t type
= get_message(idle_timeout
, message
);
// Reject types beyond the end of the table and types whose entry is null.
204 if (type
>= sizeof(dispatch
) / sizeof(dispatch
[0]) || !dispatch
[type
]) {
205 string
errmsg("Unexpected message type ");
207 throw Xapian::InvalidArgumentError(errmsg
);
// Call the handler for this message type on this object.
209 (this->*(dispatch
[type
]))(message
);
210 } catch (const Xapian::NetworkTimeoutError
& e
) {
212 // We've had a timeout, so the client may not be listening, so
213 // set the end_time to 1 and if we can't send the message right
214 // away, just exit and the client will cope.
215 send_message(REPLY_EXCEPTION
, serialise_error(e
), 1.0);
218 // And rethrow it so our caller can log it and close the
221 } catch (const Xapian::NetworkError
&) {
222 // All other network errors mean we are fatally confused and are
223 // unlikely to be able to communicate further across this
224 // connection. So we don't try to propagate the error to the
225 // client, but instead just rethrow the exception so our caller can
226 // log it and close the connection.
228 } catch (const Xapian::Error
&e
) {
229 // Propagate the exception to the client, then return to the main
230 // message handling loop.
231 send_message(REPLY_EXCEPTION
, serialise_error(e
));
232 } catch (ConnectionClosed
&) {
// NOTE(review): the handling of ConnectionClosed and the start of the handler
// for unknown exceptions are not visible in this view.
235 // Propagate an unknown exception to the client.
236 send_message(REPLY_EXCEPTION
, string());
237 // And rethrow it so our caller can log it and close the
// Handle a request for all terms starting with a prefix.
//
// The whole message is the prefix.  For every term from
// db->allterms_begin(prefix) a REPLY_ALLTERMS is sent holding the encoded term
// frequency, one byte giving the number of leading bytes shared with prev,
// and then the rest of the term.  A REPLY_DONE ends the list.
//
// NOTE(review): the declaration of reply, the statement controlled by the
// size check on prev, and any update of prev inside the loop are not visible
// in this view.
245 RemoteServer::msg_allterms(const string
&message
)
247 string prev
= message
;
250 const string
& prefix
= message
;
251 const Xapian::TermIterator end
= db
->allterms_end(prefix
);
252 for (Xapian::TermIterator t
= db
->allterms_begin(prefix
); t
!= end
; ++t
) {
// The shared-prefix length is sent as a single char below, which is
// presumably why prev is checked against 255 here - confirm.
253 if (rare(prev
.size() > 255))
255 const string
& v
= *t
;
256 size_t reuse
= common_prefix_length(prev
, v
);
257 reply
= encode_length(t
.get_termfreq());
258 reply
.append(1, char(reuse
));
// Only the part of the term after the shared prefix is sent.
259 reply
.append(v
, reuse
, string::npos
);
260 send_message(REPLY_ALLTERMS
, reply
);
264 send_message(REPLY_DONE
, string());
// Handle a request for the term list of a document.
//
// The document id is decoded from the message.  The document length is sent
// first as REPLY_DOCLENGTH, then one REPLY_TERMLIST per term holding the
// encoded wdf, the encoded term frequency, one byte giving the number of
// leading bytes shared with prev, and the rest of the term.  A REPLY_DONE
// ends the list.
//
// NOTE(review): the declarations of did and prev, the statement controlled
// by the size check on prev, and any update of prev inside the loop are not
// visible in this view.
268 RemoteServer::msg_termlist(const string
&message
)
270 const char *p
= message
.data();
271 const char *p_end
= p
+ message
.size();
273 decode_length(&p
, p_end
, did
);
275 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
277 const Xapian::TermIterator end
= db
->termlist_end(did
);
278 for (Xapian::TermIterator t
= db
->termlist_begin(did
); t
!= end
; ++t
) {
// The shared-prefix length is sent as a single char below, which is
// presumably why prev is checked against 255 here - confirm.
279 if (rare(prev
.size() > 255))
281 const string
& v
= *t
;
282 size_t reuse
= common_prefix_length(prev
, v
);
283 string reply
= encode_length(t
.get_wdf());
284 reply
+= encode_length(t
.get_termfreq());
285 reply
.append(1, char(reuse
));
// Only the part of the term after the shared prefix is sent.
286 reply
.append(v
, reuse
, string::npos
);
287 send_message(REPLY_TERMLIST
, reply
);
291 send_message(REPLY_DONE
, string());
// Handle a request for the position list of a term in a document.
//
// The message holds an encoded document id followed by the term, which takes
// up the rest of the message.  Each position is sent as a
// REPLY_POSITIONLIST holding the encoded value pos - lastpos - 1, and a
// REPLY_DONE ends the list.
//
// NOTE(review): the declaration of did, the rest of the for statement and
// any update of lastpos inside the loop are not visible in this view.
295 RemoteServer::msg_positionlist(const string
&message
)
297 const char *p
= message
.data();
298 const char *p_end
= p
+ message
.size();
300 decode_length(&p
, p_end
, did
);
// Everything after the document id is the term.
301 string
term(p
, p_end
- p
);
// lastpos starts as termpos(-1); presumably termpos is unsigned so that the
// first value sent is the first position itself - confirm.
303 Xapian::termpos lastpos
= static_cast<Xapian::termpos
>(-1);
304 const Xapian::PositionIterator end
= db
->positionlist_end(did
, term
);
305 for (Xapian::PositionIterator i
= db
->positionlist_begin(did
, term
);
307 Xapian::termpos pos
= *i
;
308 send_message(REPLY_POSITIONLIST
, encode_length(pos
- lastpos
- 1));
312 send_message(REPLY_DONE
, string());
// Handle a request for the posting list of a term.
//
// The whole message is the term.  A REPLY_POSTLISTSTART holding the encoded
// term frequency and collection frequency is sent first.  Each posting is
// then sent as a REPLY_POSTLISTITEM holding the encoded value
// newdocid - lastdocid - 1 followed by the encoded wdf, and a REPLY_DONE
// ends the list.
//
// NOTE(review): the rest of the for statement is not visible in this view.
316 RemoteServer::msg_postlist(const string
&message
)
318 const string
& term
= message
;
320 Xapian::doccount termfreq
= db
->get_termfreq(term
);
321 Xapian::termcount collfreq
= db
->get_collection_freq(term
);
322 send_message(REPLY_POSTLISTSTART
, encode_length(termfreq
) + encode_length(collfreq
));
// Document ids are sent as gaps relative to the previous one, starting at 0.
324 Xapian::docid lastdocid
= 0;
325 const Xapian::PostingIterator end
= db
->postlist_end(term
);
326 for (Xapian::PostingIterator i
= db
->postlist_begin(term
);
329 Xapian::docid newdocid
= *i
;
330 string reply
= encode_length(newdocid
- lastdocid
- 1);
331 reply
+= encode_length(i
.get_wdf());
333 send_message(REPLY_POSTLISTITEM
, reply
);
334 lastdocid
= newdocid
;
337 send_message(REPLY_DONE
, string());
// Handle a request to open the database for writing.
//
// Flag bits decoded from the message are merged into the open flags, with the
// bits in Xapian::DB_ACTION_MASK_ removed so that the action stays
// Xapian::DB_OPEN.  wdb is then set to a new Xapian::WritableDatabase opened
// on context with those flags.
//
// NOTE(review): many lines of this function are not visible in this view,
// including the declaration of flag_bits, the conditions around the decoding
// and around the "Junk at end" error, and whatever follows the opening of
// wdb - confirm against the full file.
341 RemoteServer::msg_writeaccess(const string
& msg
)
346 int flags
= Xapian::DB_OPEN
;
347 const char *p
= msg
.c_str();
348 const char *p_end
= p
+ msg
.size();
351 decode_length(&p
, p_end
, flag_bits
);
352 flags
|= flag_bits
&~ Xapian::DB_ACTION_MASK_
;
354 throw Xapian::NetworkError("Junk at end of MSG_WRITEACCESS");
358 wdb
= new Xapian::WritableDatabase(context
, flags
);
// Handle a reopen request.
// NOTE(review): only the sending of REPLY_DONE is visible in this view; the
// condition under which it is sent and the rest of the body are not shown.
365 RemoteServer::msg_reopen(const string
& msg
)
368 send_message(REPLY_DONE
, string());
// Send a REPLY_UPDATE message describing the database; the request payload is
// ignored.
//
// The message is built, in order, from: the two protocol version bytes, the
// encoded document count, the encoded difference between the last docid and
// the document count, the encoded document length lower bound, the encoded
// difference between the upper and lower document length bounds, '1' or '0'
// for whether the database has positions, and the encoded total length.
//
// NOTE(review): the use made of uuid is not visible in this view.
375 RemoteServer::msg_update(const string
&)
377 static const char protocol
[2] = {
378 char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
),
379 char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
)
381 string
message(protocol
, 2);
382 Xapian::doccount num_docs
= db
->get_doccount();
383 message
+= encode_length(num_docs
);
// Sent as an offset from the document count rather than as an absolute value.
384 message
+= encode_length(db
->get_lastdocid() - num_docs
);
385 Xapian::termcount doclen_lb
= db
->get_doclength_lower_bound();
386 message
+= encode_length(doclen_lb
);
// Sent as an offset from the lower bound rather than as an absolute value.
387 message
+= encode_length(db
->get_doclength_upper_bound() - doclen_lb
);
388 message
+= (db
->has_positions() ? '1' : '0');
389 message
+= encode_length(db
->get_total_length());
390 string uuid
= db
->get_uuid();
392 send_message(REPLY_UPDATE
, message
);
// Handle a query request, which is a two-step conversation with the client.
//
// Step 1: message_in is decoded into the query, the Enquire settings
// (query length, collapse settings, docid order, sort settings, time limit,
// percent and weight cutoffs), the weighting scheme, the RSet and any match
// spies.  A MultiMatch is built from these and the local statistics are sent
// as REPLY_STATS.
//
// Step 2: a MSG_GETMSET message is read, giving first, maxitems,
// check_at_least and the total statistics.  The MSet is computed and sent as
// REPLY_RESULTS, preceded in the same message by the length-prefixed
// serialised results of each match spy.
//
// Malformed settings are reported by throwing Xapian::NetworkError; an
// unregistered weighting scheme or match spy by throwing
// Xapian::InvalidArgumentError.
//
// NOTE(review): a number of lines are not visible in this view, including the
// declarations of len, message and mset, the advancing of p past each
// length-prefixed item, and the loop around the match spy decoding.
396 RemoteServer::msg_query(const string
&message_in
)
398 const char *p
= message_in
.c_str();
399 const char *p_end
= p
+ message_in
.size();
401 // Unserialise the Query.
403 decode_length_and_check(&p
, p_end
, len
);
404 Xapian::Query
query(Xapian::Query::unserialise(string(p
, len
), reg
));
407 // Unserialise assorted Enquire settings.
408 Xapian::termcount qlen
;
409 decode_length(&p
, p_end
, qlen
);
411 Xapian::valueno collapse_max
;
412 decode_length(&p
, p_end
, collapse_max
);
// NOTE(review): the condition under which collapse_key is decoded is not
// visible in this view; it defaults to BAD_VALUENO.
414 Xapian::valueno collapse_key
= Xapian::BAD_VALUENO
;
416 decode_length(&p
, p_end
, collapse_key
);
// The docid order is a single character in the range '0' to '2'.
418 if (p_end
- p
< 4 || *p
< '0' || *p
> '2') {
419 throw Xapian::NetworkError("bad message (docid_order)");
421 Xapian::Enquire::docid_order order
;
422 order
= static_cast<Xapian::Enquire::docid_order
>(*p
++ - '0');
424 Xapian::valueno sort_key
;
425 decode_length(&p
, p_end
, sort_key
);
// The sort setting is a single character in the range '0' to '3'.
427 if (*p
< '0' || *p
> '3') {
428 throw Xapian::NetworkError("bad message (sort_by)");
430 Xapian::Enquire::Internal::sort_setting sort_by
;
431 sort_by
= static_cast<Xapian::Enquire::Internal::sort_setting
>(*p
++ - '0');
// The sort direction is a single character, '0' or '1'.
433 if (*p
< '0' || *p
> '1') {
434 throw Xapian::NetworkError("bad message (sort_value_forward)");
436 bool sort_value_forward(*p
++ != '0');
438 double time_limit
= unserialise_double(&p
, p_end
);
// The percent cutoff is a single byte which must be in the range 0 to 100.
440 int percent_cutoff
= *p
++;
441 if (percent_cutoff
< 0 || percent_cutoff
> 100) {
442 throw Xapian::NetworkError("bad message (percent_cutoff)");
445 double weight_cutoff
= unserialise_double(&p
, p_end
);
446 if (weight_cutoff
< 0) {
447 throw Xapian::NetworkError("bad message (weight_cutoff)");
450 // Unserialise the Weight object.
451 decode_length_and_check(&p
, p_end
, len
);
452 string
wtname(p
, len
);
// Look the scheme up by name in the registry, then let it unserialise its
// own parameters.
455 const Xapian::Weight
* wttype
= reg
.get_weighting_scheme(wtname
);
456 if (wttype
== NULL
) {
457 // Note: user weighting schemes should be registered by adding them to
458 // a Registry, and setting the context using
459 // RemoteServer::set_registry().
460 throw Xapian::InvalidArgumentError("Weighting scheme " +
461 wtname
+ " not registered");
464 decode_length_and_check(&p
, p_end
, len
);
465 AutoPtr
<Xapian::Weight
> wt(wttype
->unserialise(string(p
, len
)));
468 // Unserialise the RSet object.
469 decode_length_and_check(&p
, p_end
, len
);
470 Xapian::RSet rset
= unserialise_rset(string(p
, len
));
473 // Unserialise any MatchSpy objects.
474 vector
<Xapian::Internal::opt_intrusive_ptr
<Xapian::MatchSpy
>> matchspies
;
476 decode_length_and_check(&p
, p_end
, len
);
477 string
spytype(p
, len
);
478 const Xapian::MatchSpy
* spyclass
= reg
.get_match_spy(spytype
);
479 if (spyclass
== NULL
) {
480 throw Xapian::InvalidArgumentError("Match spy " + spytype
+
485 decode_length_and_check(&p
, p_end
, len
);
486 matchspies
.push_back(spyclass
->unserialise(string(p
, len
), reg
)->release());
// Build the matcher; local_stats is passed to it and then sent to the client.
490 Xapian::Weight::Internal local_stats
;
491 MultiMatch
match(*db
, query
, qlen
, &rset
, collapse_max
, collapse_key
,
492 percent_cutoff
, weight_cutoff
, order
,
493 sort_key
, sort_by
, sort_value_forward
, time_limit
,
494 local_stats
, wt
.get(), matchspies
, false, false);
496 send_message(REPLY_STATS
, serialise_stats(local_stats
));
// Second step: only a MSG_GETMSET message is accepted here.
499 get_message(active_timeout
, message
, MSG_GETMSET
);
501 p_end
= p
+ message
.size();
503 Xapian::termcount first
;
504 decode_length(&p
, p_end
, first
);
505 Xapian::termcount maxitems
;
506 decode_length(&p
, p_end
, maxitems
);
508 Xapian::termcount check_at_least
;
509 decode_length(&p
, p_end
, check_at_least
);
// The rest of the message holds the total statistics.
511 AutoPtr
<Xapian::Weight::Internal
> total_stats(new Xapian::Weight::Internal
);
512 unserialise_stats(p
, p_end
, *(total_stats
.get()));
513 total_stats
->set_bounds_from_db(*db
);
516 match
.get_mset(first
, maxitems
, check_at_least
, mset
, *(total_stats
.get()), 0, 0);
// The pointer is released from the AutoPtr and stored in the MSet internals.
517 mset
.internal
->stats
= total_stats
.release();
// Each spy's results are sent length-prefixed, ahead of the MSet.
520 for (auto i
: matchspies
) {
521 string spy_results
= i
->serialise_results();
522 message
+= encode_length(spy_results
.size());
523 message
+= spy_results
;
525 message
+= serialise_mset(mset
);
526 send_message(REPLY_RESULTS
, message
);
// Handle a request for a document.
//
// The document id is decoded from the message.  The document data is sent as
// REPLY_DOCDATA, then one REPLY_VALUE per value, each starting with the
// encoded value slot number.  A REPLY_DONE ends the reply.
//
// NOTE(review): the declaration of did and part of the construction of item
// are not visible in this view.
530 RemoteServer::msg_document(const string
&message
)
532 const char *p
= message
.data();
533 const char *p_end
= p
+ message
.size();
535 decode_length(&p
, p_end
, did
);
537 Xapian::Document doc
= db
->get_document(did
);
539 send_message(REPLY_DOCDATA
, doc
.get_data());
541 Xapian::ValueIterator i
;
542 for (i
= doc
.values_begin(); i
!= doc
.values_end(); ++i
) {
543 string item
= encode_length(i
.get_valueno());
545 send_message(REPLY_VALUE
, item
);
547 send_message(REPLY_DONE
, string());
// Handle a keep-alive request; the payload is ignored and REPLY_DONE is sent.
// NOTE(review): the statement which the comment below describes is not
// visible in this view.
551 RemoteServer::msg_keepalive(const string
&)
553 // Ensure *our* database stays alive, as it may contain remote databases!
555 send_message(REPLY_DONE
, string());
559 RemoteServer::msg_termexists(const string
&term
)
561 send_message((db
->term_exists(term
) ? REPLY_TERMEXISTS
: REPLY_TERMDOESNTEXIST
), string());
565 RemoteServer::msg_collfreq(const string
&term
)
567 send_message(REPLY_COLLFREQ
, encode_length(db
->get_collection_freq(term
)));
571 RemoteServer::msg_termfreq(const string
&term
)
573 send_message(REPLY_TERMFREQ
, encode_length(db
->get_termfreq(term
)));
577 RemoteServer::msg_freqs(const string
&term
)
579 string msg
= encode_length(db
->get_termfreq(term
));
580 msg
+= encode_length(db
->get_collection_freq(term
));
581 send_message(REPLY_FREQS
, msg
);
// Handle a request for the statistics of a value slot.
//
// The slot number is decoded from the message.  The reply is built from the
// encoded value frequency, then the lower bound and the upper bound, each
// preceded by its encoded length, and is sent as REPLY_VALUESTATS.
//
// NOTE(review): the declaration of message_out and any loop around the
// decoding are not visible in this view.
585 RemoteServer::msg_valuestats(const string
& message
)
587 const char *p
= message
.data();
588 const char *p_end
= p
+ message
.size();
590 Xapian::valueno slot
;
591 decode_length(&p
, p_end
, slot
);
593 message_out
+= encode_length(db
->get_value_freq(slot
));
594 string bound
= db
->get_value_lower_bound(slot
);
595 message_out
+= encode_length(bound
.size());
596 message_out
+= bound
;
// bound is reused for the upper bound.
597 bound
= db
->get_value_upper_bound(slot
);
598 message_out
+= encode_length(bound
.size());
599 message_out
+= bound
;
601 send_message(REPLY_VALUESTATS
, message_out
);
// Handle a request for the length of a document.
//
// The document id is decoded from the message and the encoded document
// length is sent back as REPLY_DOCLENGTH.
//
// NOTE(review): the declaration of did is not visible in this view.
606 RemoteServer::msg_doclength(const string
&message
)
608 const char *p
= message
.data();
609 const char *p_end
= p
+ message
.size();
611 decode_length(&p
, p_end
, did
);
612 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
// Handle a request for the number of unique terms in a document.
//
// The document id is decoded from the message and the encoded result of
// db->get_unique_terms() is sent back as REPLY_UNIQUETERMS.
//
// NOTE(review): the declaration of did is not visible in this view.
616 RemoteServer::msg_uniqueterms(const string
&message
)
618 const char *p
= message
.data();
619 const char *p_end
= p
+ message
.size();
621 decode_length(&p
, p_end
, did
);
622 send_message(REPLY_UNIQUETERMS
, encode_length(db
->get_unique_terms(did
)));
// Handle a commit request; the payload is ignored and REPLY_DONE is sent.
// NOTE(review): the statements before the reply are not visible in this
// view - confirm against the full file.
626 RemoteServer::msg_commit(const string
&)
633 send_message(REPLY_DONE
, string());
637 RemoteServer::msg_cancel(const string
&message
)
639 msg_cancel_(message
);
640 send_message(REPLY_DONE
, string());
// Handle a cancel request without sending a reply; the payload is ignored.
//
// Pending changes are discarded by beginning a transaction on wdb and then
// cancelling it straight away.
//
// NOTE(review): no check that wdb is non-null is visible in this view before
// it is dereferenced - confirm that the full file guards against a read-only
// server here.
644 RemoteServer::msg_cancel_(const string
&)
649 // We can't call cancel since that's an internal method, but this
650 // has the same effect with minimal additional overhead.
651 wdb
->begin_transaction(false);
652 wdb
->cancel_transaction();
// Handle a request to add a document.
//
// The whole message is unserialised as a document and added to wdb; the
// encoded id of the new document is sent back as REPLY_ADDDOCUMENT.
//
// NOTE(review): no check that wdb is non-null is visible in this view before
// it is dereferenced - confirm that the full file guards against a read-only
// server here.
656 RemoteServer::msg_adddocument(const string
& message
)
661 Xapian::docid did
= wdb
->add_document(unserialise_document(message
));
663 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
// Handle a request to delete a document by id.
//
// The document id is decoded from the message, the document is deleted from
// wdb, and REPLY_DONE is sent.
//
// NOTE(review): the declaration of did and any check that wdb is non-null
// are not visible in this view - confirm against the full file.
667 RemoteServer::msg_deletedocument(const string
& message
)
672 const char *p
= message
.data();
673 const char *p_end
= p
+ message
.size();
675 decode_length(&p
, p_end
, did
);
677 wdb
->delete_document(did
);
679 send_message(REPLY_DONE
, string());
683 RemoteServer::msg_deletedocumentterm(const string
& message
)
685 msg_deletedocumentterm_(message
);
686 send_message(REPLY_DONE
, string());
// Handle a delete-by-term request without sending a reply.
//
// The whole message is passed to wdb->delete_document() as the term.
//
// NOTE(review): no check that wdb is non-null is visible in this view before
// it is dereferenced - confirm that the full file guards against a read-only
// server here.
690 RemoteServer::msg_deletedocumentterm_(const string
& message
)
695 wdb
->delete_document(message
);
699 RemoteServer::msg_replacedocument(const string
& message
)
701 msg_replacedocument_(message
);
702 send_message(REPLY_DONE
, string());
// Handle a replace-document request without sending a reply.
//
// The message holds an encoded document id followed by a serialised
// document, which takes up the rest of the message.  The document with that
// id in wdb is replaced.
//
// NOTE(review): the declaration of did and any check that wdb is non-null
// are not visible in this view - confirm against the full file.
706 RemoteServer::msg_replacedocument_(const string
& message
)
711 const char *p
= message
.data();
712 const char *p_end
= p
+ message
.size();
714 decode_length(&p
, p_end
, did
);
716 wdb
->replace_document(did
, unserialise_document(string(p
, p_end
)));
// Handle a request to replace the document(s) indexed by a unique term.
//
// The message holds a length-prefixed term followed by a serialised
// document.  The id returned by wdb->replace_document() is sent back encoded
// as REPLY_ADDDOCUMENT.
//
// NOTE(review): the declaration of len, the advancing of p past the term and
// any check that wdb is non-null are not visible in this view - confirm
// against the full file.
720 RemoteServer::msg_replacedocumentterm(const string
& message
)
725 const char *p
= message
.data();
726 const char *p_end
= p
+ message
.size();
728 decode_length_and_check(&p
, p_end
, len
);
729 string
unique_term(p
, len
);
732 Xapian::docid did
= wdb
->replace_document(unique_term
, unserialise_document(string(p
, p_end
)));
734 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
738 RemoteServer::msg_getmetadata(const string
& message
)
740 send_message(REPLY_METADATA
, db
->get_metadata(message
));
// Handle a request for the metadata keys starting with a prefix.
//
// The whole message is the prefix.  For every key from
// db->metadata_keys_begin(prefix) a REPLY_METADATAKEYLIST is sent holding one
// byte giving the number of leading bytes shared with prev, and then the
// rest of the key.  A REPLY_DONE ends the list.
//
// NOTE(review): the declaration of reply, the statement controlled by the
// size check on prev, and any update of prev inside the loop are not visible
// in this view.
744 RemoteServer::msg_openmetadatakeylist(const string
& message
)
746 string prev
= message
;
749 const string
& prefix
= message
;
750 const Xapian::TermIterator end
= db
->metadata_keys_end(prefix
);
751 Xapian::TermIterator t
= db
->metadata_keys_begin(prefix
);
752 for (; t
!= end
; ++t
) {
// The shared-prefix length is sent as a single char below, which is
// presumably why prev is checked against 255 here - confirm.
753 if (rare(prev
.size() > 255))
755 const string
& v
= *t
;
756 size_t reuse
= common_prefix_length(prev
, v
);
757 reply
.assign(1, char(reuse
));
// Only the part of the key after the shared prefix is sent.
758 reply
.append(v
, reuse
, string::npos
);
759 send_message(REPLY_METADATAKEYLIST
, reply
);
762 send_message(REPLY_DONE
, string());
766 RemoteServer::msg_setmetadata(const string
& message
)
768 msg_setmetadata_(message
);
769 send_message(REPLY_DONE
, string());
// Handle a set-metadata request without sending a reply.
//
// The message holds a length-prefixed key followed by the value, which takes
// up the rest of the message.  The pair is stored with wdb->set_metadata().
//
// NOTE(review): the declaration of keylen, the advancing of p past the key
// and any check that wdb is non-null are not visible in this view - confirm
// against the full file.
773 RemoteServer::msg_setmetadata_(const string
& message
)
777 const char *p
= message
.data();
778 const char *p_end
= p
+ message
.size();
780 decode_length_and_check(&p
, p_end
, keylen
);
781 string
key(p
, keylen
);
783 string
val(p
, p_end
- p
);
784 wdb
->set_metadata(key
, val
);
788 RemoteServer::msg_addspelling(const string
& message
)
790 msg_addspelling_(message
);
791 send_message(REPLY_DONE
, string());
// Handle an add-spelling request without sending a reply.
//
// The message holds an encoded frequency increment followed by the word,
// which takes up the rest of the message.  Both are passed to
// wdb->add_spelling().
//
// NOTE(review): no check that wdb is non-null is visible in this view before
// it is dereferenced - confirm that the full file guards against a read-only
// server here.
795 RemoteServer::msg_addspelling_(const string
& message
)
799 const char *p
= message
.data();
800 const char *p_end
= p
+ message
.size();
801 Xapian::termcount freqinc
;
802 decode_length(&p
, p_end
, freqinc
);
803 wdb
->add_spelling(string(p
, p_end
- p
), freqinc
);
// Handle a remove-spelling request.
//
// The message holds an encoded frequency decrement followed by the word,
// which takes up the rest of the message.  Both are passed to
// wdb->remove_spelling().
//
// NOTE(review): no check that wdb is non-null is visible in this view before
// it is dereferenced, and the end of the function is not visible either, so
// it cannot be told from here whether a reply is sent - confirm against the
// full file.
807 RemoteServer::msg_removespelling(const string
& message
)
811 const char *p
= message
.data();
812 const char *p_end
= p
+ message
.size();
813 Xapian::termcount freqdec
;
814 decode_length(&p
, p_end
, freqdec
);
815 wdb
->remove_spelling(string(p
, p_end
- p
), freqdec
);