4 * Manages repository synchronization for cluster repositories.
6 * @task config Configuring Synchronization
7 * @task sync Cluster Synchronization
8 * @task internal Internals
10 final class DiffusionRepositoryClusterEngine
extends Phobject
{
14 private $actingAsPHID;
17 private $clusterWriteLock;
18 private $clusterWriteVersion;
19 private $clusterWriteOwner;
22 /* -( Configuring Synchronization )---------------------------------------- */
25 public function setRepository(PhabricatorRepository
$repository) {
26 $this->repository
= $repository;
30 public function getRepository() {
31 return $this->repository
;
34 public function setViewer(PhabricatorUser
$viewer) {
35 $this->viewer
= $viewer;
39 public function getViewer() {
43 public function setLog(DiffusionRepositoryClusterEngineLogInterface
$log) {
48 public function setActingAsPHID($acting_as_phid) {
49 $this->actingAsPHID
= $acting_as_phid;
53 public function getActingAsPHID() {
54 return $this->actingAsPHID
;
57 private function getEffectiveActingAsPHID() {
58 if ($this->actingAsPHID
) {
59 return $this->actingAsPHID
;
62 return $this->getViewer()->getPHID();
66 /* -( Cluster Synchronization )-------------------------------------------- */
70 * Synchronize repository version information after creating a repository.
72 * This initializes working copy versions for all currently bound devices to
73 * 0, so that we don't get stuck making an ambiguous choice about which
74 * devices are leaders when we later synchronize before a read.
78 public function synchronizeWorkingCopyAfterCreation() {
79 if (!$this->shouldEnableSynchronization(false)) {
83 $repository = $this->getRepository();
84 $repository_phid = $repository->getPHID();
86 $service = $repository->loadAlmanacService();
88 throw new Exception(pht('Failed to load repository cluster service.'));
91 $bindings = $service->getActiveBindings();
92 foreach ($bindings as $binding) {
93 PhabricatorRepositoryWorkingCopyVersion
::updateVersion(
95 $binding->getDevicePHID(),
106 public function synchronizeWorkingCopyAfterHostingChange() {
107 if (!$this->shouldEnableSynchronization(false)) {
111 $repository = $this->getRepository();
112 $repository_phid = $repository->getPHID();
114 $versions = PhabricatorRepositoryWorkingCopyVersion
::loadVersions(
116 $versions = mpull($versions, null, 'getDevicePHID');
118 // After converting a hosted repository to observed, or vice versa, we
119 // need to reset version numbers because the clocks for observed and hosted
120 // repositories run on different units.
122 // We identify all the cluster leaders and reset their version to 0.
123 // We identify all the cluster followers and demote them.
125 // This allows the cluster to start over again at version 0 but keep the
129 $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
130 foreach ($versions as $version) {
131 $device_phid = $version->getDevicePHID();
133 if ($version->getRepositoryVersion() == $max_version) {
134 PhabricatorRepositoryWorkingCopyVersion
::updateVersion(
139 PhabricatorRepositoryWorkingCopyVersion
::demoteDevice(
153 public function synchronizeWorkingCopyBeforeRead() {
154 if (!$this->shouldEnableSynchronization(true)) {
158 $repository = $this->getRepository();
159 $repository_phid = $repository->getPHID();
161 $device = AlmanacKeys
::getLiveDevice();
162 $device_phid = $device->getPHID();
164 $read_lock = PhabricatorRepositoryWorkingCopyVersion
::getReadLock(
168 $lock_wait = phutil_units('2 minutes in seconds');
172 'Acquiring read lock for repository "%s" on device "%s"...',
173 $repository->getDisplayName(),
174 $device->getName()));
177 $start = PhabricatorTime
::getNow();
178 $read_lock->lock($lock_wait);
179 $waited = (PhabricatorTime
::getNow() - $start);
184 'Acquired read lock after %s second(s).',
185 new PhutilNumber($waited)));
189 'Acquired read lock immediately.'));
191 } catch (PhutilLockException
$ex) {
192 throw new PhutilProxyException(
194 'Failed to acquire read lock after waiting %s second(s). You '.
195 'may be able to retry later. (%s)',
196 new PhutilNumber($lock_wait),
201 $versions = PhabricatorRepositoryWorkingCopyVersion
::loadVersions(
203 $versions = mpull($versions, null, 'getDevicePHID');
205 $this_version = idx($versions, $device_phid);
207 $this_version = (int)$this_version->getRepositoryVersion();
209 $this_version = null;
213 // This is the normal case, where we have some version information and
214 // can identify which nodes are leaders. If the current node is not a
215 // leader, we want to fetch from a leader and then update our version.
217 $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
218 if (($this_version === null) ||
($max_version > $this_version)) {
219 if ($repository->isHosted()) {
220 $fetchable = array();
221 foreach ($versions as $version) {
222 if ($version->getRepositoryVersion() == $max_version) {
223 $fetchable[] = $version->getDevicePHID();
228 $this->synchronizeWorkingCopyFromDevices(
233 $this->synchronizeWorkingCopyFromRemote();
236 PhabricatorRepositoryWorkingCopyVersion
::updateVersion(
243 'Device "%s" is already a cluster leader and does not need '.
244 'to be synchronized.',
245 $device->getName()));
248 $result_version = $max_version;
250 // If no version records exist yet, we need to be careful, because we
251 // can not tell which nodes are leaders.
253 // There might be several nodes with arbitrary existing data, and we have
254 // no way to tell which one has the "right" data. If we pick wrong, we
255 // might erase some or all of the data in the repository.
257 // Since this is dangerous, we refuse to guess unless there is only one
258 // device. If we're the only device in the group, we obviously must be
261 $service = $repository->loadAlmanacService();
263 throw new Exception(pht('Failed to load repository cluster service.'));
266 $bindings = $service->getActiveBindings();
267 $device_map = array();
268 foreach ($bindings as $binding) {
269 $device_map[$binding->getDevicePHID()] = true;
272 if (count($device_map) > 1) {
275 'Repository "%s" exists on more than one device, but no device '.
276 'has any repository version information. There is no way for the '.
277 'software to determine which copy of the existing data is '.
278 'authoritative. Promote a device or see "Ambiguous Leaders" in '.
279 'the documentation.',
280 $repository->getDisplayName()));
283 if (empty($device_map[$device->getPHID()])) {
286 'Repository "%s" is being synchronized on device "%s", but '.
287 'this device is not bound to the corresponding cluster '.
289 $repository->getDisplayName(),
291 $service->getName()));
294 // The current device is the only device in service, so it must be a
295 // leader. We can safely have any future nodes which come online read
297 PhabricatorRepositoryWorkingCopyVersion
::updateVersion(
305 $read_lock->unlock();
307 return $result_version;
314 public function synchronizeWorkingCopyBeforeWrite() {
315 if (!$this->shouldEnableSynchronization(true)) {
319 $repository = $this->getRepository();
320 $viewer = $this->getViewer();
322 $repository_phid = $repository->getPHID();
324 $device = AlmanacKeys
::getLiveDevice();
325 $device_phid = $device->getPHID();
327 $table = new PhabricatorRepositoryWorkingCopyVersion();
328 $locked_connection = $table->establishConnection('w');
330 $write_lock = PhabricatorRepositoryWorkingCopyVersion
::getWriteLock(
333 $write_lock->setExternalConnection($locked_connection);
337 'Acquiring write lock for repository "%s"...',
338 $repository->getDisplayName()));
340 // See T13590. On the HTTP pathway, it's possible for us to hit the script
341 // time limit while holding the durable write lock if a user makes a big
342 // push. Remove the time limit before we acquire the durable lock.
345 $lock_wait = phutil_units('2 minutes in seconds');
347 $write_wait_start = microtime(true);
349 $start = PhabricatorTime
::getNow();
354 $write_lock->lock((int)floor($step_wait));
355 $write_wait_end = microtime(true);
357 } catch (PhutilLockException
$ex) {
358 $waited = (PhabricatorTime
::getNow() - $start);
359 if ($waited > $lock_wait) {
362 $this->logActiveWriter($viewer, $repository);
365 // Wait a little longer before the next message we print.
366 $step_wait = $step_wait +
0.5;
367 $step_wait = min($step_wait, 3);
370 $waited = (PhabricatorTime
::getNow() - $start);
374 'Acquired write lock after %s second(s).',
375 new PhutilNumber($waited)));
379 'Acquired write lock immediately.'));
381 } catch (PhutilLockException
$ex) {
382 throw new PhutilProxyException(
384 'Failed to acquire write lock after waiting %s second(s). You '.
385 'may be able to retry later. (%s)',
386 new PhutilNumber($lock_wait),
391 $versions = PhabricatorRepositoryWorkingCopyVersion
::loadVersions(
393 foreach ($versions as $version) {
394 if (!$version->getIsWriting()) {
400 'An previous write to this repository was interrupted; refusing '.
401 'new writes. This issue requires operator intervention to resolve, '.
402 'see "Write Interruptions" in the "Cluster: Repositories" in the '.
403 'documentation for instructions.'));
406 $read_wait_start = microtime(true);
408 $max_version = $this->synchronizeWorkingCopyBeforeRead();
409 } catch (Exception
$ex) {
410 $write_lock->unlock();
413 $read_wait_end = microtime(true);
416 $hash = Filesystem
::readRandomCharacters(12);
417 $this->clusterWriteOwner
= "{$pid}.{$hash}";
419 PhabricatorRepositoryWorkingCopyVersion
::willWrite(
424 'userPHID' => $this->getEffectiveActingAsPHID(),
425 'epoch' => PhabricatorTime
::getNow(),
426 'devicePHID' => $device_phid,
428 $this->clusterWriteOwner
);
430 $this->clusterWriteVersion
= $max_version;
431 $this->clusterWriteLock
= $write_lock;
433 $write_wait = ($write_wait_end - $write_wait_start);
434 $read_wait = ($read_wait_end - $read_wait_start);
436 $log = $this->logger
;
438 $log->writeClusterEngineLogProperty('writeWait', $write_wait);
439 $log->writeClusterEngineLogProperty('readWait', $read_wait);
444 public function synchronizeWorkingCopyAfterDiscovery($new_version) {
445 if (!$this->shouldEnableSynchronization(true)) {
449 $repository = $this->getRepository();
450 $repository_phid = $repository->getPHID();
451 if ($repository->isHosted()) {
455 $device = AlmanacKeys
::getLiveDevice();
456 $device_phid = $device->getPHID();
458 // NOTE: We are not holding a lock here because this method is only called
459 // from PhabricatorRepositoryDiscoveryEngine, which already holds a device
460 // lock. Even if we do race here and record an older version, the
461 // consequences are mild: we only do extra work to correct it later.
463 $versions = PhabricatorRepositoryWorkingCopyVersion
::loadVersions(
465 $versions = mpull($versions, null, 'getDevicePHID');
467 $this_version = idx($versions, $device_phid);
469 $this_version = (int)$this_version->getRepositoryVersion();
471 $this_version = null;
474 if (($this_version === null) ||
($new_version > $this_version)) {
475 PhabricatorRepositoryWorkingCopyVersion
::updateVersion(
486 public function synchronizeWorkingCopyAfterWrite() {
487 if (!$this->shouldEnableSynchronization(true)) {
491 if (!$this->clusterWriteLock
) {
494 'Trying to synchronize after write, but not holding a write '.
498 $repository = $this->getRepository();
499 $repository_phid = $repository->getPHID();
501 $device = AlmanacKeys
::getLiveDevice();
502 $device_phid = $device->getPHID();
504 // It is possible that we've lost the global lock while receiving the push.
505 // For example, the master database may have been restarted between the
506 // time we acquired the global lock and now, when the push has finished.
508 // We wrote a durable lock while we were holding the global lock,
509 // essentially upgrading our lock. We can still safely release this upgraded
510 // lock even if we're no longer holding the global lock.
512 // If we fail to release the lock, the repository will be frozen until
513 // an operator can figure out what happened, so we try pretty hard to
514 // reconnect to the database and release the lock.
516 $now = PhabricatorTime
::getNow();
517 $duration = phutil_units('5 minutes in seconds');
518 $try_until = $now +
$duration;
520 $did_release = false;
521 $already_failed = false;
522 while (PhabricatorTime
::getNow() <= $try_until) {
524 // NOTE: This means we're still bumping the version when pushes fail. We
525 // could select only un-rejected events instead to bump a little less
528 $new_log = id(new PhabricatorRepositoryPushEventQuery())
529 ->setViewer(PhabricatorUser
::getOmnipotentUser())
530 ->withRepositoryPHIDs(array($repository_phid))
534 $old_version = $this->clusterWriteVersion
;
536 $new_version = $new_log->getID();
538 $new_version = $old_version;
541 PhabricatorRepositoryWorkingCopyVersion
::didWrite(
544 $this->clusterWriteVersion
,
546 $this->clusterWriteOwner
);
549 } catch (AphrontConnectionQueryException
$ex) {
550 $connection_exception = $ex;
551 } catch (AphrontConnectionLostQueryException
$ex) {
552 $connection_exception = $ex;
555 if (!$already_failed) {
556 $already_failed = true;
558 pht('CRITICAL. Failed to release cluster write lock!'));
562 'The connection to the master database was lost while receiving '.
567 'This process will spend %s more second(s) attempting to '.
568 'recover, then give up.',
569 new PhutilNumber($duration)));
576 if ($already_failed) {
578 pht('RECOVERED. Link to master database was restored.'));
580 $this->logLine(pht('Released cluster write lock.'));
584 'Failed to reconnect to master database and release held write '.
585 'lock ("%s") on device "%s" for repository "%s" after trying '.
586 'for %s seconds(s). This repository will be frozen.',
587 $this->clusterWriteOwner
,
589 $this->getDisplayName(),
590 new PhutilNumber($duration)));
593 // We can continue even if we've lost this lock, everything is still
596 $this->clusterWriteLock
->unlock();
597 } catch (Exception
$ex) {
601 $this->clusterWriteLock
= null;
602 $this->clusterWriteOwner
= null;
606 /* -( Internals )---------------------------------------------------------- */
612 private function shouldEnableSynchronization($require_device) {
613 $repository = $this->getRepository();
615 $service_phid = $repository->getAlmanacServicePHID();
616 if (!$service_phid) {
620 if (!$repository->supportsSynchronization()) {
624 if ($require_device) {
625 $device = AlmanacKeys
::getLiveDevice();
638 private function synchronizeWorkingCopyFromRemote() {
639 $repository = $this->getRepository();
640 $device = AlmanacKeys
::getLiveDevice();
642 $local_path = $repository->getLocalPath();
643 $fetch_uri = $repository->getRemoteURIEnvelope();
645 if ($repository->isGit()) {
646 $this->requireWorkingCopy();
649 'fetch --prune -- %P %s',
654 throw new Exception(pht('Remote sync only supported for git!'));
657 $future = DiffusionCommandEngine
::newCommandEngine($repository)
659 ->setSudoAsDaemon(true)
660 ->setCredentialPHID($repository->getCredentialPHID())
661 ->setURI($repository->getRemoteURIObject())
664 $future->setCWD($local_path);
668 } catch (Exception
$ex) {
671 'Synchronization of "%s" from remote failed: %s',
682 private function synchronizeWorkingCopyFromDevices(
687 $repository = $this->getRepository();
689 $service = $repository->loadAlmanacService();
691 throw new Exception(pht('Failed to load repository cluster service.'));
694 $device_map = array_fuse($device_phids);
695 $bindings = $service->getActiveBindings();
697 $fetchable = array();
698 foreach ($bindings as $binding) {
699 // We can't fetch from nodes which don't have the newest version.
700 $device_phid = $binding->getDevicePHID();
701 if (empty($device_map[$device_phid])) {
705 // TODO: For now, only fetch over SSH. We could support fetching over
707 if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
711 $fetchable[] = $binding;
717 'Leader lost: no up-to-date nodes in repository cluster are '.
721 // If we can synchronize from multiple sources, choose one at random.
725 foreach ($fetchable as $binding) {
727 $this->synchronizeWorkingCopyFromBinding(
733 } catch (Exception
$ex) {
747 private function synchronizeWorkingCopyFromBinding(
748 AlmanacBinding
$binding,
752 $repository = $this->getRepository();
753 $device = AlmanacKeys
::getLiveDevice();
757 'Synchronizing this device ("%s") from cluster leader ("%s").',
759 $binding->getDevice()->getName()));
761 $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
762 $local_path = $repository->getLocalPath();
764 if ($repository->isGit()) {
765 $this->requireWorkingCopy();
768 'fetch --prune -- %s %s',
773 throw new Exception(pht('Binding sync only supported for git!'));
776 $future = DiffusionCommandEngine
::newCommandEngine($repository)
778 ->setConnectAsDevice(true)
779 ->setSudoAsDaemon(true)
783 $future->setCWD($local_path);
785 $log = PhabricatorRepositorySyncEvent
::initializeNewEvent()
786 ->setRepositoryPHID($repository->getPHID())
787 ->setEpoch(PhabricatorTime
::getNow())
788 ->setDevicePHID($device->getPHID())
789 ->setFromDevicePHID($binding->getDevice()->getPHID())
790 ->setDeviceVersion($local_version)
791 ->setFromDeviceVersion($remote_version);
793 $sync_start = microtime(true);
797 } catch (Exception
$ex) {
798 $log->setSyncWait(phutil_microseconds_since($sync_start));
800 if ($ex instanceof CommandException
) {
801 if ($future->getWasKilledByTimeout()) {
802 $result_type = PhabricatorRepositorySyncEvent
::RESULT_TIMEOUT
;
804 $result_type = PhabricatorRepositorySyncEvent
::RESULT_ERROR
;
808 ->setResultCode($ex->getError())
809 ->setResultType($result_type)
810 ->setProperty('stdout', $ex->getStdout())
811 ->setProperty('stderr', $ex->getStderr());
815 ->setResultType(PhabricatorRepositorySyncEvent
::RESULT_EXCEPTION
)
816 ->setProperty('message', $ex->getMessage());
823 'Synchronization of "%s" from leader "%s" failed: %s',
825 $binding->getDevice()->getName(),
832 ->setSyncWait(phutil_microseconds_since($sync_start))
834 ->setResultType(PhabricatorRepositorySyncEvent
::RESULT_SYNC
)
842 private function logLine($message) {
843 return $this->logText("# {$message}\n");
850 private function logText($message) {
851 $log = $this->logger
;
853 $log->writeClusterEngineLogMessage($message);
858 private function requireWorkingCopy() {
859 $repository = $this->getRepository();
860 $local_path = $repository->getLocalPath();
862 if (!Filesystem
::pathExists($local_path)) {
863 $device = AlmanacKeys
::getLiveDevice();
867 'Repository "%s" does not have a working copy on this device '.
868 'yet, so it can not be synchronized. Wait for the daemons to '.
869 'construct one or run `bin/repository update %s` on this host '.
870 '("%s") to build it explicitly.',
871 $repository->getDisplayName(),
872 $repository->getMonogram(),
873 $device->getName()));
877 private function logActiveWriter(
878 PhabricatorUser
$viewer,
879 PhabricatorRepository
$repository) {
881 $writer = PhabricatorRepositoryWorkingCopyVersion
::loadWriter(
882 $repository->getPHID());
884 $this->logLine(pht('Waiting on another user to finish writing...'));
888 $user_phid = $writer->getWriteProperty('userPHID');
889 $device_phid = $writer->getWriteProperty('devicePHID');
890 $epoch = $writer->getWriteProperty('epoch');
892 $phids = array($user_phid, $device_phid);
893 $handles = $viewer->loadHandles($phids);
895 $duration = (PhabricatorTime
::getNow() - $epoch) +
1;
899 'Waiting for %s to finish writing (on device "%s" for %ss)...',
900 $handles[$user_phid]->getName(),
901 $handles[$device_phid]->getName(),
902 new PhutilNumber($duration)));
905 public function newMaintenanceEvent() {
906 $viewer = $this->getViewer();
907 $repository = $this->getRepository();
908 $now = PhabricatorTime
::getNow();
910 $event = PhabricatorRepositoryPushEvent
::initializeNewEvent($viewer)
911 ->setRepositoryPHID($repository->getPHID())
913 ->setPusherPHID($this->getEffectiveActingAsPHID())
914 ->setRejectCode(PhabricatorRepositoryPushLog
::REJECT_ACCEPT
);
919 public function newMaintenanceLog() {
920 $viewer = $this->getViewer();
921 $repository = $this->getRepository();
922 $now = PhabricatorTime
::getNow();
924 $device = AlmanacKeys
::getLiveDevice();
926 $device_phid = $device->getPHID();
931 return PhabricatorRepositoryPushLog
::initializeNewLog($viewer)
932 ->setDevicePHID($device_phid)
933 ->setRepositoryPHID($repository->getPHID())
934 ->attachRepository($repository)
936 ->setPusherPHID($this->getEffectiveActingAsPHID())
937 ->setChangeFlags(PhabricatorRepositoryPushLog
::CHANGEFLAG_MAINTENANCE
)
938 ->setRefType(PhabricatorRepositoryPushLog
::REFTYPE_MAINTENANCE
)