Add constant FLINT_MAX_DOCID
[xapian.git] / xapian-core / api / replication.cc
blob728350be6e0e3d5e391648bcbae2336360ebfb9c
1 /** @file replication.cc
2 * @brief Replication support for Xapian databases.
3 */
4 /* Copyright (C) 2008 Lemur Consulting Ltd
5 * Copyright (C) 2008,2009,2010,2011,2015 Olly Betts
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <config.h>
24 #include "replication.h"
26 #include "xapian/base.h"
27 #include "xapian/dbfactory.h"
28 #include "xapian/error.h"
29 #include "xapian/version.h"
31 #include "database.h"
32 #include "databasereplicator.h"
33 #include "debuglog.h"
34 #include "fileutils.h"
35 #ifdef __WIN32__
36 # include "msvc_posix_wrapper.h"
37 #endif
38 #include "net/length.h"
39 #include "omassert.h"
40 #include "realtime.h"
41 #include "remoteconnection.h"
42 #include "noreturn.h"
43 #include "replicationprotocol.h"
44 #include "safeerrno.h"
45 #include "safesysstat.h"
46 #include "safeunistd.h"
47 #include "serialise.h"
48 #include "str.h"
49 #include "utils.h"
51 #include "autoptr.h"
52 #include <cstdio> // For rename().
53 #include <fstream>
54 #include <string>
56 using namespace std;
57 using namespace Xapian;
// Text placed at the very top of the stub database file which a replica
// directory uses to point at its live database; it warns that the file is
// machine-generated.
#define REPLICA_STUB_BANNER \
    "# Automatically generated by Xapian::DatabaseReplica v" XAPIAN_VERSION ".\n" \
    "# Do not manually edit - replication operations may regenerate this file.\n"
64 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
65 static void
66 throw_connection_closed_unexpectedly()
68 throw Xapian::NetworkError("Connection closed unexpectedly");
71 void
72 DatabaseMaster::write_changesets_to_fd(int fd,
73 const string & start_revision,
74 ReplicationInfo * info) const
76 LOGCALL_VOID(REPLICA, "DatabaseMaster::write_changesets_to_fd", fd | start_revision | info);
77 if (info != NULL)
78 info->clear();
79 Database db;
80 try {
81 db = Database(path);
82 } catch (const Xapian::DatabaseError & e) {
83 RemoteConnection conn(-1, fd);
84 conn.send_message(REPL_REPLY_FAIL,
85 "Can't open database: " + e.get_msg(),
86 0.0);
87 return;
89 if (db.internal.size() != 1) {
90 throw Xapian::InvalidOperationError("DatabaseMaster needs to be pointed at exactly one subdatabase");
93 // Extract the UUID from start_revision and compare it to the database.
94 bool need_whole_db = false;
95 string revision;
96 if (start_revision.empty()) {
97 need_whole_db = true;
98 } else {
99 const char * ptr = start_revision.data();
100 const char * end = ptr + start_revision.size();
101 size_t uuid_length;
102 decode_length_and_check(&ptr, end, uuid_length);
103 string request_uuid(ptr, uuid_length);
104 ptr += uuid_length;
105 string db_uuid = db.internal[0]->get_uuid();
106 if (request_uuid != db_uuid) {
107 need_whole_db = true;
109 revision.assign(ptr, end - ptr);
112 db.internal[0]->write_changesets_to_fd(fd, revision, need_whole_db, info);
115 string
116 DatabaseMaster::get_description() const
118 return "DatabaseMaster(" + path + ")";
121 /// Internal implementation of DatabaseReplica
122 class DatabaseReplica::Internal : public Xapian::Internal::RefCntBase {
123 /// Don't allow assignment.
124 void operator=(const Internal &);
126 /// Don't allow copying.
127 Internal(const Internal &);
129 /// The path to the replica directory.
130 string path;
132 /// The id of the currently live database in the replica (0 or 1).
133 int live_id;
135 /** The live database being replicated.
137 * This needs to be mutable because it is sometimes lazily opened.
139 mutable WritableDatabase live_db;
141 /** Do we have an offline database currently?
143 * The offline database is a new copy of the database we're bringing up
144 * to the required revision, which can't yet be made live.
146 bool have_offline_db;
148 /** Flag to indicate that the only valid operation next is a full copy.
150 bool need_copy_next;
152 /** The revision that the secondary database has been updated to.
154 string offline_revision;
156 /** The UUID of the secondary database.
158 string offline_uuid;
160 /** The revision that the secondary database must reach before it can be
161 * made live.
163 string offline_needed_revision;
165 /** The time at which a changeset was last applied to the live database.
167 * Set to 0 if no changeset applied to the live database so far.
169 double last_live_changeset_time;
171 /// The remote connection we're using.
172 RemoteConnection * conn;
174 /** Update the stub database which points to a single database.
176 * The stub database file is created at a separate path, and then
177 * atomically moved into place to replace the old stub database. This
178 * should allow searches to continue uninterrupted.
180 void update_stub_database() const;
182 /** Delete the offline database. */
183 void remove_offline_db();
185 /** Apply a set of DB copy messages from the connection.
187 void apply_db_copy(double end_time);
189 /** Check that a message type is as expected.
191 * Throws a NetworkError if the type is not the expected one.
193 void check_message_type(int type, int expected) const;
195 /** Check if the offline database has reached the required version.
197 * If so, make it live, and remove the old live database.
199 * @return true iff the offline database is made live
201 bool possibly_make_offline_live();
203 string get_replica_path(int id) const {
204 string p = path;
205 p += "/replica_";
206 p += char('0' + id);
207 return p;
210 public:
211 /// Open a new DatabaseReplica::Internal for the specified path.
212 Internal(const string & path_);
214 /// Destructor.
215 ~Internal() { delete conn; }
217 /// Get a string describing the current revision of the replica.
218 string get_revision_info() const;
220 /// Set the file descriptor to read changesets from.
221 void set_read_fd(int fd);
223 /// Read and apply the next changeset.
224 bool apply_next_changeset(ReplicationInfo * info,
225 double reader_close_time);
227 /// Return a string describing this object.
228 string get_description() const { return path; }
231 // Methods of DatabaseReplica
233 DatabaseReplica::DatabaseReplica(const DatabaseReplica & other)
234 : internal(other.internal)
236 LOGCALL_CTOR(REPLICA, "DatabaseReplica", other);
239 void
240 DatabaseReplica::operator=(const DatabaseReplica & other)
242 LOGCALL_VOID(REPLICA, "DatabaseReplica::operator=", other);
243 internal = other.internal;
246 DatabaseReplica::DatabaseReplica()
247 : internal(0)
249 LOGCALL_CTOR(REPLICA, "DatabaseReplica", NO_ARGS);
252 DatabaseReplica::DatabaseReplica(const string & path)
253 : internal(new DatabaseReplica::Internal(path))
255 LOGCALL_CTOR(REPLICA, "DatabaseReplica", path);
258 DatabaseReplica::~DatabaseReplica()
260 LOGCALL_DTOR(REPLICA, "DatabaseReplica");
263 string
264 DatabaseReplica::get_revision_info() const
266 LOGCALL(REPLICA, string, "DatabaseReplica::get_revision_info", NO_ARGS);
267 if (internal.get() == NULL)
268 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::get_revision_info on a closed replica.");
269 RETURN(internal->get_revision_info());
272 void
273 DatabaseReplica::set_read_fd(int fd)
275 LOGCALL_VOID(REPLICA, "DatabaseReplica::set_read_fd", fd);
276 if (internal.get() == NULL)
277 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::set_read_fd on a closed replica.");
278 internal->set_read_fd(fd);
281 bool
282 DatabaseReplica::apply_next_changeset(ReplicationInfo * info,
283 double reader_close_time)
285 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
286 if (info != NULL)
287 info->clear();
288 if (internal.get() == NULL)
289 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::apply_next_changeset on a closed replica.");
290 RETURN(internal->apply_next_changeset(info, reader_close_time));
293 void
294 DatabaseReplica::close()
296 LOGCALL(REPLICA, bool, "DatabaseReplica::close", NO_ARGS);
297 internal = NULL;
300 string
301 DatabaseReplica::get_description() const
303 string desc("DatabaseReplica(");
304 if (internal.get()) {
305 desc += internal->get_description();
307 desc += ')';
308 return desc;
311 // Methods of DatabaseReplica::Internal
313 void
314 DatabaseReplica::Internal::update_stub_database() const
316 string stub_path = path;
317 stub_path += "/XAPIANDB";
318 string tmp_path = stub_path;
319 tmp_path += ".tmp";
321 ofstream stub(tmp_path.c_str());
322 stub << REPLICA_STUB_BANNER
323 "auto replica_" << live_id << endl;
325 int result;
326 #ifdef __WIN32__
327 result = msvc_posix_rename(tmp_path.c_str(), stub_path.c_str());
328 #else
329 result = rename(tmp_path.c_str(), stub_path.c_str());
330 #endif
331 if (result == -1) {
332 string msg("Failed to update stub db file for replica: ");
333 msg += path;
334 throw Xapian::DatabaseOpeningError(msg);
338 DatabaseReplica::Internal::Internal(const string & path_)
339 : path(path_), live_id(0), live_db(), have_offline_db(false),
340 need_copy_next(false), offline_revision(), offline_needed_revision(),
341 last_live_changeset_time(), conn(NULL)
343 LOGCALL_CTOR(REPLICA, "DatabaseReplica::Internal", path_);
344 #if ! defined XAPIAN_HAS_FLINT_BACKEND && ! defined XAPIAN_HAS_CHERT_BACKEND
345 throw FeatureUnavailableError("Replication requires the Flint or Chert backend to be enabled");
346 #else
347 if (mkdir(path, 0777) == 0) {
348 // The database doesn't already exist - make a directory, containing a
349 // stub database, and point it to a new database.
351 // Create an empty database - the backend doesn't matter as if the
352 // master is a different type, then the replica will become that type
353 // automatically.
354 live_db = WritableDatabase(get_replica_path(live_id),
355 Xapian::DB_CREATE);
356 update_stub_database();
357 } else {
358 if (errno != EEXIST) {
359 throw DatabaseOpeningError("Couldn't create directory '" + path + "'", errno);
361 if (!dir_exists(path)) {
362 throw DatabaseOpeningError("Replica path must be a directory");
364 string stub_path = path;
365 stub_path += "/XAPIANDB";
366 live_db = Auto::open_stub(stub_path, Xapian::DB_OPEN);
367 // FIXME: simplify all this?
368 ifstream stub(stub_path.c_str());
369 string line;
370 while (getline(stub, line)) {
371 if (!line.empty() && line[0] != '#') {
372 live_id = line[line.size() - 1] - '0';
373 break;
377 #endif
380 string
381 DatabaseReplica::Internal::get_revision_info() const
383 LOGCALL(REPLICA, string, "DatabaseReplica::Internal::get_revision_info", NO_ARGS);
384 if (live_db.internal.empty())
385 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
386 if (live_db.internal.size() != 1)
387 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
389 string uuid = (live_db.internal[0])->get_uuid();
390 string buf = encode_length(uuid.size());
391 buf += uuid;
392 buf += (live_db.internal[0])->get_revision_info();
393 RETURN(buf);
396 void
397 DatabaseReplica::Internal::remove_offline_db()
399 // Delete the offline database.
400 removedir(get_replica_path(live_id ^ 1));
401 have_offline_db = false;
404 void
405 DatabaseReplica::Internal::apply_db_copy(double end_time)
407 have_offline_db = true;
408 last_live_changeset_time = 0;
409 string offline_path = get_replica_path(live_id ^ 1);
410 // If there's already an offline database, discard it. This happens if one
411 // copy of the database was sent, but further updates were needed before it
412 // could be made live, and the remote end was then unable to send those
413 // updates (probably due to not having changesets available, or the remote
414 // database being replaced by a new database).
415 removedir(offline_path);
416 if (mkdir(offline_path, 0777)) {
417 throw Xapian::DatabaseError("Cannot make directory '" +
418 offline_path + "'", errno);
422 string buf;
423 int type = conn->get_message(buf, end_time);
424 check_message_type(type, REPL_REPLY_DB_HEADER);
425 const char * ptr = buf.data();
426 const char * end = ptr + buf.size();
427 size_t uuid_length;
428 decode_length_and_check(&ptr, end, uuid_length);
429 offline_uuid.assign(ptr, uuid_length);
430 offline_revision.assign(buf, ptr + uuid_length - buf.data(), buf.npos);
433 // Now, read the files for the database from the connection and create it.
434 while (true) {
435 string filename;
436 int type = conn->sniff_next_message_type(end_time);
437 if (type < 0 || type == REPL_REPLY_FAIL)
438 return;
439 if (type == REPL_REPLY_DB_FOOTER)
440 break;
442 type = conn->get_message(filename, end_time);
443 check_message_type(type, REPL_REPLY_DB_FILENAME);
445 // Check that the filename doesn't contain '..'. No valid database
446 // file contains .., so we don't need to check that the .. is a path.
447 if (filename.find("..") != string::npos) {
448 throw NetworkError("Filename in database contains '..'");
451 type = conn->sniff_next_message_type(end_time);
452 if (type < 0 || type == REPL_REPLY_FAIL)
453 return;
455 string filepath = offline_path + "/" + filename;
456 type = conn->receive_file(filepath, end_time);
457 if (type < 0)
458 throw_connection_closed_unexpectedly();
459 check_message_type(type, REPL_REPLY_DB_FILEDATA);
461 int type = conn->get_message(offline_needed_revision, end_time);
462 check_message_type(type, REPL_REPLY_DB_FOOTER);
463 need_copy_next = false;
466 void
467 DatabaseReplica::Internal::check_message_type(int type, int expected) const
469 if (type != expected) {
470 if (type < 0)
471 throw_connection_closed_unexpectedly();
472 string m = "Expected replication protocol message type #";
473 m += str(expected);
474 m += ", got #";
475 m += str(type);
476 throw NetworkError(m);
480 bool
481 DatabaseReplica::Internal::possibly_make_offline_live()
483 string replica_path(get_replica_path(live_id ^ 1));
484 AutoPtr<DatabaseReplicator> replicator;
485 try {
486 replicator.reset(DatabaseReplicator::open(replica_path));
487 } catch (const Xapian::DatabaseError &) {
488 return false;
490 if (offline_needed_revision.empty()) {
491 return false;
493 if (!replicator->check_revision_at_least(offline_revision,
494 offline_needed_revision)) {
495 return false;
498 string replicated_uuid = replicator->get_uuid();
499 if (replicated_uuid.empty()) {
500 return false;
503 if (replicated_uuid != offline_uuid) {
504 return false;
507 live_id ^= 1;
508 // Open the database first, so that if there's a problem, an exception
509 // will be thrown before we make the new database live.
510 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
511 update_stub_database();
512 remove_offline_db();
513 return true;
516 void
517 DatabaseReplica::Internal::set_read_fd(int fd)
519 delete conn;
520 conn = NULL;
521 conn = new RemoteConnection(fd, -1);
524 bool
525 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,
526 double reader_close_time)
528 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
529 if (live_db.internal.empty())
530 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
531 if (live_db.internal.size() != 1)
532 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
534 while (true) {
535 int type = conn->sniff_next_message_type(0.0);
536 switch (type) {
537 case REPL_REPLY_END_OF_CHANGES: {
538 string buf;
539 type = conn->get_message(buf, 0.0);
540 check_message_type(type, REPL_REPLY_END_OF_CHANGES);
541 RETURN(false);
543 case REPL_REPLY_DB_HEADER:
544 // Apply the copy - remove offline db in case of any error.
545 try {
546 apply_db_copy(0.0);
547 if (info != NULL)
548 ++(info->fullcopy_count);
549 string replica_uuid;
551 AutoPtr<DatabaseReplicator> replicator(
552 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
553 replica_uuid = replicator->get_uuid();
555 if (replica_uuid != offline_uuid) {
556 remove_offline_db();
557 // We've been sent an database with the wrong uuid,
558 // which only happens if the database at the server
559 // got changed during the copy, so the only safe
560 // action next is a new copy. Set a flag to ensure
561 // that this happens, or we're at risk of database
562 // corruption.
563 need_copy_next = true;
565 } catch (...) {
566 remove_offline_db();
567 throw;
569 if (possibly_make_offline_live()) {
570 if (info != NULL)
571 info->changed = true;
573 break;
574 case REPL_REPLY_CHANGESET:
575 if (need_copy_next) {
576 throw NetworkError("Needed a database copy next");
578 if (!have_offline_db) {
579 // Close the live db.
580 string replica_path(get_replica_path(live_id));
581 live_db = WritableDatabase();
583 if (last_live_changeset_time != 0.0) {
584 // Wait until at least "reader_close_time" seconds have
585 // passed since the last changeset was applied, to
586 // allow any active readers to finish and be reopened.
587 double until;
588 until = last_live_changeset_time + reader_close_time;
589 RealTime::sleep(until);
592 // Open a replicator for the live path, and apply the
593 // changeset.
595 AutoPtr<DatabaseReplicator> replicator(
596 DatabaseReplicator::open(replica_path));
598 // Ignore the returned revision number, since we are
599 // live so the changeset must be safe to apply to a
600 // live DB.
601 replicator->apply_changeset_from_conn(*conn, 0.0, true);
603 last_live_changeset_time = RealTime::now();
605 if (info != NULL) {
606 ++(info->changeset_count);
607 info->changed = true;
609 // Now the replicator is closed, open the live db again.
610 live_db = WritableDatabase(replica_path, Xapian::DB_OPEN);
611 RETURN(true);
615 AutoPtr<DatabaseReplicator> replicator(
616 DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
618 offline_revision = replicator->
619 apply_changeset_from_conn(*conn, 0.0, false);
621 if (info != NULL) {
622 ++(info->changeset_count);
625 if (possibly_make_offline_live()) {
626 if (info != NULL)
627 info->changed = true;
629 RETURN(true);
630 case REPL_REPLY_FAIL: {
631 string buf;
632 if (conn->get_message(buf, 0.0) < 0)
633 throw_connection_closed_unexpectedly();
634 throw NetworkError("Unable to fully synchronise: " + buf);
636 case -1:
637 throw_connection_closed_unexpectedly();
638 default:
639 throw NetworkError("Unknown replication protocol message ("
640 + str(type) + ")");