2 * @brief Xapian remote backend server base class
4 /* Copyright (C) 2006-2023 Olly Betts
5 * Copyright (C) 2006,2007,2009,2010 Lemur Consulting Ltd
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "remoteserver.h"
25 #include "xapian/constants.h"
26 #include "xapian/database.h"
27 #include "xapian/enquire.h"
28 #include "xapian/error.h"
29 #include "xapian/matchspy.h"
30 #include "xapian/query.h"
31 #include "xapian/valueiterator.h"
39 #include "matcher/multimatch.h"
43 #include "serialise.h"
44 #include "serialise-double.h"
45 #include "serialise-error.h"
47 #include "stringutils.h"
48 #include "weight/weightinternal.h"
50 XAPIAN_NORETURN(static void throw_read_only());
54 throw Xapian::InvalidOperationError("Server is read-only");
/** Exception-style tag type signalling that the connection is finished.
 *
 *  It carries no data.  get_message() throws it when it reads MSG_SHUTDOWN
 *  (or hits EOF while no writable database is open), and the message loop
 *  has a dedicated catch clause for it.
 */
struct ConnectionClosed { };
60 RemoteServer::RemoteServer(const std::vector
<std::string
> &dbpaths
,
61 int fdin_
, int fdout_
,
62 double active_timeout_
, double idle_timeout_
,
64 : RemoteConnection(fdin_
, fdout_
, std::string()),
65 db(NULL
), wdb(NULL
), writable(writable_
),
66 active_timeout(active_timeout_
), idle_timeout(idle_timeout_
)
68 // Catch errors opening the database and propagate them to the client.
70 Assert(!dbpaths
.empty());
71 // We always open the database read-only to start with. If we're
72 // writable, the client can ask to be upgraded to write access once
73 // connected if it wants it.
74 db
= new Xapian::Database(dbpaths
[0]);
75 // Build a better description than Database::get_description() gives
76 // in the variable context. FIXME: improve Database::get_description()
77 // and then just use that instead.
80 vector
<std::string
>::const_iterator
i(dbpaths
.begin());
81 for (++i
; i
!= dbpaths
.end(); ++i
) {
82 db
->add_database(Xapian::Database(*i
));
86 } catch (const Xapian::Error
&err
) {
87 // Propagate the exception to the client.
88 send_message(REPLY_EXCEPTION
, serialise_error(err
));
89 // And rethrow it so our caller can log it and close the connection.
94 // It's simplest to just ignore SIGPIPE. We'll still know if the
95 // connection dies because we'll get EPIPE back from write().
97 // This is OK because RemoteServer subclasses are only used in
98 // specialised programs - if we expose any of them as API classes
99 // then we should use SO_NOSIGPIPE/MSG_NOSIGNAL instead like we do
100 // on the client side.
101 if (signal(SIGPIPE
, SIG_IGN
) == SIG_ERR
)
102 throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno
);
105 // Send greeting message.
106 msg_update(string());
109 RemoteServer::~RemoteServer()
112 // wdb is either NULL or equal to db, so we shouldn't delete it too!
116 RemoteServer::get_message(double timeout
, string
& result
,
117 message_type required_type
)
119 double end_time
= RealTime::end_time(timeout
);
120 int type
= RemoteConnection::get_message(result
, end_time
);
122 // Handle "shutdown connection" message here. Treat EOF here for a read-only
123 // database the same way since a read-only client just closes the
124 // connection when done.
125 if (type
== MSG_SHUTDOWN
|| (type
< 0 && wdb
== NULL
))
126 throw ConnectionClosed();
128 throw Xapian::NetworkError("Connection closed unexpectedly");
129 if (type
>= MSG_MAX
) {
130 string
errmsg("Invalid message type ");
132 throw Xapian::NetworkError(errmsg
);
134 if (required_type
!= MSG_MAX
&& type
!= int(required_type
)) {
135 string
errmsg("Expecting message type ");
136 errmsg
+= str(int(required_type
));
139 throw Xapian::NetworkError(errmsg
);
141 return static_cast<message_type
>(type
);
145 RemoteServer::send_message(reply_type type
, const string
&message
)
147 double end_time
= RealTime::end_time(active_timeout
);
148 unsigned char type_as_char
= static_cast<unsigned char>(type
);
149 RemoteConnection::send_message(type_as_char
, message
, end_time
);
152 typedef void (RemoteServer::* dispatch_func
)(const string
&);
159 /* This list needs to be kept in the same order as the list of
160 * message types in "remoteprotocol.h". Note that messages at the
161 * end of the list in "remoteprotocol.h" can be omitted if they
162 * don't correspond to dispatch actions.
164 static const dispatch_func dispatch
[] = {
165 &RemoteServer::msg_allterms
,
166 &RemoteServer::msg_collfreq
,
167 &RemoteServer::msg_document
,
168 &RemoteServer::msg_termexists
,
169 &RemoteServer::msg_termfreq
,
170 &RemoteServer::msg_valuestats
,
171 &RemoteServer::msg_keepalive
,
172 &RemoteServer::msg_doclength
,
173 &RemoteServer::msg_query
,
174 &RemoteServer::msg_termlist
,
175 &RemoteServer::msg_positionlist
,
176 &RemoteServer::msg_postlist
,
177 &RemoteServer::msg_reopen
,
178 &RemoteServer::msg_update
,
179 &RemoteServer::msg_adddocument
,
180 &RemoteServer::msg_cancel_
,
181 &RemoteServer::msg_deletedocumentterm_
,
182 &RemoteServer::msg_commit
,
183 &RemoteServer::msg_replacedocument_
,
184 &RemoteServer::msg_replacedocumentterm
,
185 &RemoteServer::msg_deletedocument
,
186 &RemoteServer::msg_writeaccess
,
187 &RemoteServer::msg_getmetadata
,
188 &RemoteServer::msg_setmetadata_
,
189 &RemoteServer::msg_addspelling_
,
190 &RemoteServer::msg_removespelling
,
191 0, // MSG_GETMSET - used during a conversation.
192 0, // MSG_SHUTDOWN - handled by get_message().
193 &RemoteServer::msg_openmetadatakeylist
,
194 &RemoteServer::msg_freqs
,
195 &RemoteServer::msg_uniqueterms
,
196 &RemoteServer::msg_deletedocumentterm
,
197 &RemoteServer::msg_replacedocument
,
198 &RemoteServer::msg_cancel
,
199 &RemoteServer::msg_setmetadata
,
200 &RemoteServer::msg_addspelling
,
204 size_t type
= get_message(idle_timeout
, message
);
205 if (type
>= sizeof(dispatch
) / sizeof(dispatch
[0]) || !dispatch
[type
]) {
206 string
errmsg("Unexpected message type ");
208 throw Xapian::InvalidArgumentError(errmsg
);
210 (this->*(dispatch
[type
]))(message
);
211 } catch (const Xapian::NetworkTimeoutError
& e
) {
213 // We've had a timeout, so the client may not be listening, so
214 // set the end_time to 1 and if we can't send the message right
215 // away, just exit and the client will cope.
216 send_message(REPLY_EXCEPTION
, serialise_error(e
), 1.0);
219 // And rethrow it so our caller can log it and close the
222 } catch (const Xapian::NetworkError
&) {
223 // All other network errors mean we are fatally confused and are
224 // unlikely to be able to communicate further across this
225 // connection. So we don't try to propagate the error to the
226 // client, but instead just rethrow the exception so our caller can
227 // log it and close the connection.
229 } catch (const Xapian::Error
&e
) {
230 // Propagate the exception to the client, then return to the main
231 // message handling loop.
232 send_message(REPLY_EXCEPTION
, serialise_error(e
));
233 } catch (ConnectionClosed
&) {
236 // Propagate an unknown exception to the client.
237 send_message(REPLY_EXCEPTION
, string());
238 // And rethrow it so our caller can log it and close the
246 RemoteServer::msg_allterms(const string
&message
)
248 string prev
= message
;
251 const string
& prefix
= message
;
252 const Xapian::TermIterator end
= db
->allterms_end(prefix
);
253 for (Xapian::TermIterator t
= db
->allterms_begin(prefix
); t
!= end
; ++t
) {
254 const string
& v
= *t
;
255 size_t reuse
= common_prefix_length(prev
, v
, 255);
256 reply
= encode_length(t
.get_termfreq());
257 reply
.append(1, char(reuse
));
258 reply
.append(v
, reuse
, string::npos
);
259 send_message(REPLY_ALLTERMS
, reply
);
263 send_message(REPLY_DONE
, string());
267 RemoteServer::msg_termlist(const string
&message
)
269 const char *p
= message
.data();
270 const char *p_end
= p
+ message
.size();
272 decode_length(&p
, p_end
, did
);
274 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
276 const Xapian::TermIterator end
= db
->termlist_end(did
);
277 for (Xapian::TermIterator t
= db
->termlist_begin(did
); t
!= end
; ++t
) {
278 const string
& v
= *t
;
279 size_t reuse
= common_prefix_length(prev
, v
, 255);
280 string reply
= encode_length(t
.get_wdf());
281 reply
+= encode_length(t
.get_termfreq());
282 reply
.append(1, char(reuse
));
283 reply
.append(v
, reuse
, string::npos
);
284 send_message(REPLY_TERMLIST
, reply
);
288 send_message(REPLY_DONE
, string());
292 RemoteServer::msg_positionlist(const string
&message
)
294 const char *p
= message
.data();
295 const char *p_end
= p
+ message
.size();
297 decode_length(&p
, p_end
, did
);
298 string
term(p
, p_end
- p
);
300 Xapian::termpos lastpos
= static_cast<Xapian::termpos
>(-1);
301 const Xapian::PositionIterator end
= db
->positionlist_end(did
, term
);
302 for (Xapian::PositionIterator i
= db
->positionlist_begin(did
, term
);
304 Xapian::termpos pos
= *i
;
305 send_message(REPLY_POSITIONLIST
, encode_length(pos
- lastpos
- 1));
309 send_message(REPLY_DONE
, string());
313 RemoteServer::msg_postlist(const string
&message
)
315 const string
& term
= message
;
317 Xapian::doccount termfreq
= db
->get_termfreq(term
);
318 Xapian::termcount collfreq
= db
->get_collection_freq(term
);
319 send_message(REPLY_POSTLISTSTART
, encode_length(termfreq
) + encode_length(collfreq
));
321 Xapian::docid lastdocid
= 0;
322 const Xapian::PostingIterator end
= db
->postlist_end(term
);
323 for (Xapian::PostingIterator i
= db
->postlist_begin(term
);
326 Xapian::docid newdocid
= *i
;
327 string reply
= encode_length(newdocid
- lastdocid
- 1);
328 reply
+= encode_length(i
.get_wdf());
330 send_message(REPLY_POSTLISTITEM
, reply
);
331 lastdocid
= newdocid
;
334 send_message(REPLY_DONE
, string());
338 RemoteServer::msg_writeaccess(const string
& msg
)
343 int flags
= Xapian::DB_OPEN
;
344 const char *p
= msg
.c_str();
345 const char *p_end
= p
+ msg
.size();
348 decode_length(&p
, p_end
, flag_bits
);
349 flags
|= flag_bits
&~ Xapian::DB_ACTION_MASK_
;
351 throw Xapian::NetworkError("Junk at end of MSG_WRITEACCESS");
355 wdb
= new Xapian::WritableDatabase(context
, flags
);
362 RemoteServer::msg_reopen(const string
& msg
)
365 send_message(REPLY_DONE
, string());
372 RemoteServer::msg_update(const string
&)
374 static const char protocol
[2] = {
375 char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION
),
376 char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION
)
378 string
message(protocol
, 2);
379 Xapian::doccount num_docs
= db
->get_doccount();
380 message
+= encode_length(num_docs
);
381 message
+= encode_length(db
->get_lastdocid() - num_docs
);
382 Xapian::termcount doclen_lb
= db
->get_doclength_lower_bound();
383 message
+= encode_length(doclen_lb
);
384 message
+= encode_length(db
->get_doclength_upper_bound() - doclen_lb
);
385 message
+= (db
->has_positions() ? '1' : '0');
386 message
+= encode_length(db
->get_total_length());
387 string uuid
= db
->get_uuid();
389 send_message(REPLY_UPDATE
, message
);
393 RemoteServer::msg_query(const string
&message_in
)
395 const char *p
= message_in
.c_str();
396 const char *p_end
= p
+ message_in
.size();
398 // Unserialise the Query.
400 decode_length_and_check(&p
, p_end
, len
);
401 Xapian::Query
query(Xapian::Query::unserialise(string(p
, len
), reg
));
404 // Unserialise assorted Enquire settings.
405 Xapian::termcount qlen
;
406 decode_length(&p
, p_end
, qlen
);
408 Xapian::valueno collapse_max
;
409 decode_length(&p
, p_end
, collapse_max
);
411 Xapian::valueno collapse_key
= Xapian::BAD_VALUENO
;
413 decode_length(&p
, p_end
, collapse_key
);
415 if (p_end
- p
< 4 || *p
< '0' || *p
> '2') {
416 throw Xapian::NetworkError("bad message (docid_order)");
418 Xapian::Enquire::docid_order order
;
419 order
= static_cast<Xapian::Enquire::docid_order
>(*p
++ - '0');
421 Xapian::valueno sort_key
;
422 decode_length(&p
, p_end
, sort_key
);
424 if (*p
< '0' || *p
> '3') {
425 throw Xapian::NetworkError("bad message (sort_by)");
427 Xapian::Enquire::Internal::sort_setting sort_by
;
428 sort_by
= static_cast<Xapian::Enquire::Internal::sort_setting
>(*p
++ - '0');
430 if (*p
< '0' || *p
> '1') {
431 throw Xapian::NetworkError("bad message (sort_value_forward)");
433 bool sort_value_forward(*p
++ != '0');
435 double time_limit
= unserialise_double(&p
, p_end
);
437 int percent_cutoff
= *p
++;
438 if (percent_cutoff
< 0 || percent_cutoff
> 100) {
439 throw Xapian::NetworkError("bad message (percent_cutoff)");
442 double weight_cutoff
= unserialise_double(&p
, p_end
);
443 if (weight_cutoff
< 0) {
444 throw Xapian::NetworkError("bad message (weight_cutoff)");
447 // Unserialise the Weight object.
448 decode_length_and_check(&p
, p_end
, len
);
449 string
wtname(p
, len
);
452 const Xapian::Weight
* wttype
= reg
.get_weighting_scheme(wtname
);
453 if (wttype
== NULL
) {
454 // Note: user weighting schemes should be registered by adding them to
455 // a Registry, and setting the context using
456 // RemoteServer::set_registry().
457 throw Xapian::InvalidArgumentError("Weighting scheme " +
458 wtname
+ " not registered");
461 decode_length_and_check(&p
, p_end
, len
);
462 AutoPtr
<Xapian::Weight
> wt(wttype
->unserialise(string(p
, len
)));
465 // Unserialise the RSet object.
466 decode_length_and_check(&p
, p_end
, len
);
467 Xapian::RSet rset
= unserialise_rset(string(p
, len
));
470 // Unserialise any MatchSpy objects.
471 vector
<Xapian::Internal::opt_intrusive_ptr
<Xapian::MatchSpy
>> matchspies
;
473 decode_length_and_check(&p
, p_end
, len
);
474 string
spytype(p
, len
);
475 const Xapian::MatchSpy
* spyclass
= reg
.get_match_spy(spytype
);
476 if (spyclass
== NULL
) {
477 throw Xapian::InvalidArgumentError("Match spy " + spytype
+
482 decode_length_and_check(&p
, p_end
, len
);
483 matchspies
.push_back(spyclass
->unserialise(string(p
, len
), reg
)->release());
487 Xapian::Weight::Internal local_stats
;
488 MultiMatch
match(*db
, query
, qlen
, &rset
, collapse_max
, collapse_key
,
489 percent_cutoff
, weight_cutoff
, order
,
490 sort_key
, sort_by
, sort_value_forward
, time_limit
,
491 local_stats
, wt
.get(), matchspies
, false, false);
493 send_message(REPLY_STATS
, serialise_stats(local_stats
));
496 get_message(active_timeout
, message
, MSG_GETMSET
);
498 p_end
= p
+ message
.size();
500 Xapian::termcount first
;
501 decode_length(&p
, p_end
, first
);
502 Xapian::termcount maxitems
;
503 decode_length(&p
, p_end
, maxitems
);
505 Xapian::termcount check_at_least
;
506 decode_length(&p
, p_end
, check_at_least
);
508 AutoPtr
<Xapian::Weight::Internal
> total_stats(new Xapian::Weight::Internal
);
509 unserialise_stats(p
, p_end
, *(total_stats
.get()));
510 total_stats
->set_bounds_from_db(*db
);
513 match
.get_mset(first
, maxitems
, check_at_least
, mset
, *(total_stats
.get()), 0, 0);
514 mset
.internal
->stats
= total_stats
.release();
517 for (auto i
: matchspies
) {
518 string spy_results
= i
->serialise_results();
519 message
+= encode_length(spy_results
.size());
520 message
+= spy_results
;
522 message
+= serialise_mset(mset
);
523 send_message(REPLY_RESULTS
, message
);
527 RemoteServer::msg_document(const string
&message
)
529 const char *p
= message
.data();
530 const char *p_end
= p
+ message
.size();
532 decode_length(&p
, p_end
, did
);
534 Xapian::Document doc
= db
->get_document(did
);
536 send_message(REPLY_DOCDATA
, doc
.get_data());
538 Xapian::ValueIterator i
;
539 for (i
= doc
.values_begin(); i
!= doc
.values_end(); ++i
) {
540 string item
= encode_length(i
.get_valueno());
542 send_message(REPLY_VALUE
, item
);
544 send_message(REPLY_DONE
, string());
548 RemoteServer::msg_keepalive(const string
&)
550 // Ensure *our* database stays alive, as it may contain remote databases!
552 send_message(REPLY_DONE
, string());
556 RemoteServer::msg_termexists(const string
&term
)
558 send_message((db
->term_exists(term
) ? REPLY_TERMEXISTS
: REPLY_TERMDOESNTEXIST
), string());
562 RemoteServer::msg_collfreq(const string
&term
)
564 send_message(REPLY_COLLFREQ
, encode_length(db
->get_collection_freq(term
)));
568 RemoteServer::msg_termfreq(const string
&term
)
570 send_message(REPLY_TERMFREQ
, encode_length(db
->get_termfreq(term
)));
574 RemoteServer::msg_freqs(const string
&term
)
576 string msg
= encode_length(db
->get_termfreq(term
));
577 msg
+= encode_length(db
->get_collection_freq(term
));
578 send_message(REPLY_FREQS
, msg
);
582 RemoteServer::msg_valuestats(const string
& message
)
584 const char *p
= message
.data();
585 const char *p_end
= p
+ message
.size();
587 Xapian::valueno slot
;
588 decode_length(&p
, p_end
, slot
);
590 message_out
+= encode_length(db
->get_value_freq(slot
));
591 string bound
= db
->get_value_lower_bound(slot
);
592 message_out
+= encode_length(bound
.size());
593 message_out
+= bound
;
594 bound
= db
->get_value_upper_bound(slot
);
595 message_out
+= encode_length(bound
.size());
596 message_out
+= bound
;
598 send_message(REPLY_VALUESTATS
, message_out
);
603 RemoteServer::msg_doclength(const string
&message
)
605 const char *p
= message
.data();
606 const char *p_end
= p
+ message
.size();
608 decode_length(&p
, p_end
, did
);
609 send_message(REPLY_DOCLENGTH
, encode_length(db
->get_doclength(did
)));
613 RemoteServer::msg_uniqueterms(const string
&message
)
615 const char *p
= message
.data();
616 const char *p_end
= p
+ message
.size();
618 decode_length(&p
, p_end
, did
);
619 send_message(REPLY_UNIQUETERMS
, encode_length(db
->get_unique_terms(did
)));
623 RemoteServer::msg_commit(const string
&)
630 send_message(REPLY_DONE
, string());
634 RemoteServer::msg_cancel(const string
&message
)
636 msg_cancel_(message
);
637 send_message(REPLY_DONE
, string());
641 RemoteServer::msg_cancel_(const string
&)
646 // We can't call cancel since that's an internal method, but this
647 // has the same effect with minimal additional overhead.
648 wdb
->begin_transaction(false);
649 wdb
->cancel_transaction();
653 RemoteServer::msg_adddocument(const string
& message
)
658 Xapian::docid did
= wdb
->add_document(unserialise_document(message
));
660 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
664 RemoteServer::msg_deletedocument(const string
& message
)
669 const char *p
= message
.data();
670 const char *p_end
= p
+ message
.size();
672 decode_length(&p
, p_end
, did
);
674 wdb
->delete_document(did
);
676 send_message(REPLY_DONE
, string());
680 RemoteServer::msg_deletedocumentterm(const string
& message
)
682 msg_deletedocumentterm_(message
);
683 send_message(REPLY_DONE
, string());
687 RemoteServer::msg_deletedocumentterm_(const string
& message
)
692 wdb
->delete_document(message
);
696 RemoteServer::msg_replacedocument(const string
& message
)
698 msg_replacedocument_(message
);
699 send_message(REPLY_DONE
, string());
703 RemoteServer::msg_replacedocument_(const string
& message
)
708 const char *p
= message
.data();
709 const char *p_end
= p
+ message
.size();
711 decode_length(&p
, p_end
, did
);
713 wdb
->replace_document(did
, unserialise_document(string(p
, p_end
)));
717 RemoteServer::msg_replacedocumentterm(const string
& message
)
722 const char *p
= message
.data();
723 const char *p_end
= p
+ message
.size();
725 decode_length_and_check(&p
, p_end
, len
);
726 string
unique_term(p
, len
);
729 Xapian::docid did
= wdb
->replace_document(unique_term
, unserialise_document(string(p
, p_end
)));
731 send_message(REPLY_ADDDOCUMENT
, encode_length(did
));
735 RemoteServer::msg_getmetadata(const string
& message
)
737 send_message(REPLY_METADATA
, db
->get_metadata(message
));
741 RemoteServer::msg_openmetadatakeylist(const string
& message
)
743 string prev
= message
;
746 const string
& prefix
= message
;
747 const Xapian::TermIterator end
= db
->metadata_keys_end(prefix
);
748 Xapian::TermIterator t
= db
->metadata_keys_begin(prefix
);
749 for (; t
!= end
; ++t
) {
750 const string
& v
= *t
;
751 size_t reuse
= common_prefix_length(prev
, v
, 255);
752 reply
.assign(1, char(reuse
));
753 reply
.append(v
, reuse
, string::npos
);
754 send_message(REPLY_METADATAKEYLIST
, reply
);
757 send_message(REPLY_DONE
, string());
761 RemoteServer::msg_setmetadata(const string
& message
)
763 msg_setmetadata_(message
);
764 send_message(REPLY_DONE
, string());
768 RemoteServer::msg_setmetadata_(const string
& message
)
772 const char *p
= message
.data();
773 const char *p_end
= p
+ message
.size();
775 decode_length_and_check(&p
, p_end
, keylen
);
776 string
key(p
, keylen
);
778 string
val(p
, p_end
- p
);
779 wdb
->set_metadata(key
, val
);
783 RemoteServer::msg_addspelling(const string
& message
)
785 msg_addspelling_(message
);
786 send_message(REPLY_DONE
, string());
790 RemoteServer::msg_addspelling_(const string
& message
)
794 const char *p
= message
.data();
795 const char *p_end
= p
+ message
.size();
796 Xapian::termcount freqinc
;
797 decode_length(&p
, p_end
, freqinc
);
798 wdb
->add_spelling(string(p
, p_end
- p
), freqinc
);
802 RemoteServer::msg_removespelling(const string
& message
)
806 const char *p
= message
.data();
807 const char *p_end
= p
+ message
.size();
808 Xapian::termcount freqdec
;
809 decode_length(&p
, p_end
, freqdec
);
810 wdb
->remove_spelling(string(p
, p_end
- p
), freqdec
);