4 * Run pull commands on local working copies to keep them up to date. This
5 * daemon handles all repository types.
7 * By default, the daemon pulls **every** repository. If you want it to be
8 * responsible for only some repositories, you can launch it with a list of
11 * ./phd launch repositorypulllocal -- X Q Z
13 * You can also launch a daemon which is responsible for all //but// one or
16 * ./phd launch repositorypulllocal -- --not A --not B
18 * If you have a very large number of repositories and some aren't being pulled
19 * as frequently as you'd like, you can either change the pull frequency of
20 * the less-important repositories to a larger number (so the daemon will skip
21 * them more often) or launch one daemon for all the less-important repositories
22 * and one for the more important repositories (or one for each more important
25 * @task pull Pulling Repositories
27 final class PhabricatorRepositoryPullLocalDaemon
28 extends PhabricatorDaemon
{
30 private $statusMessageCursor = 0;
32 /* -( Pulling Repositories )----------------------------------------------- */
// Daemon main loop: parse command-line arguments, then repeatedly pick the
// pullable repositories that are due, run `bin/repository update` for them
// through a FuturePool (bounded by $max_futures), and reap finished updates.
// When nothing is due, sleep via waitForUpdates().
38 protected function run() {
// Prepend the class name so the parser sees it as the program name
// (presumably acting as argv[0] for PhutilArgumentParser).
39 $argv = $this->getArgv();
40 array_unshift($argv, __CLASS__
);
41 $args = new PhutilArgumentParser($argv);
45 'name' => 'no-discovery',
46 'help' => pht('Pull only, without discovering commits.'),
50 'param' => 'repository',
52 'help' => pht('Do not pull __repository__.'),
55 'name' => 'repositories',
57 'help' => pht('Pull specific __repositories__ instead of all.'),
61 $no_discovery = $args->getArg('no-discovery');
62 $include = $args->getArg('repositories');
63 $exclude = $args->getArg('not');
65 // Each repository has an individual pull frequency; after we pull it,
66 // wait that long to pull it again. When we start up, try to pull everything
// $retry_after maps repository ID => epoch second at which that repository
// next becomes due for an update.
68 $retry_after = array();
// Upper bound on how long the idle wait may last; passed to waitForUpdates().
71 $max_sleep = phutil_units('5 minutes in seconds');
76 $future_pool = new FuturePool();
78 $future_pool->getIteratorTemplate()
79 ->setUpdateInterval($min_sleep);
// Minimum spacing between forced updates caused by this node's working copy
// being out of sync with the cluster (see the $wait_duration check below).
81 $sync_wait = phutil_units('2 minutes in seconds');
84 while (!$this->shouldExit()) {
// Reset the request cache on every pass (presumably so this long-running
// process does not keep serving stale cached data).
85 PhabricatorCaches
::destroyRequestCache();
86 $device = AlmanacKeys
::getLiveDevice();
88 $pullable = $this->loadPullableRepositories($include, $exclude, $device);
90 // If any repositories have the NEEDS_UPDATE flag set, pull them
91 // as soon as possible.
92 $need_update_messages = $this->loadRepositoryUpdateMessages(true);
93 foreach ($need_update_messages as $message) {
94 $repo = idx($pullable, $message->getRepositoryID());
101 'Got an update message for repository "%s"!',
102 $repo->getMonogram()));
104 $retry_after[$message->getRepositoryID()] = time();
// Repositories whose working copy version on this device is missing or
// behind the newest known version are forced to update now, rate-limited
// per repository by $sync_wait.
108 $unsynchronized = $this->loadUnsynchronizedRepositories($device);
109 $now = PhabricatorTime
::getNow();
110 foreach ($unsynchronized as $repository) {
111 $id = $repository->getID();
115 'Cluster repository ("%s") is out of sync on this node ("%s").',
116 $repository->getDisplayName(),
117 $device->getName()));
119 // Don't let out-of-sync conditions trigger updates too frequently,
120 // since we don't want to get trapped in a death spiral if sync is
122 $sync_at = idx($last_sync, $id, 0);
123 $wait_duration = ($now - $sync_at);
// $wait_duration is the number of seconds since the last forced
// out-of-sync update for this repository (0 if none was recorded).
124 if ($wait_duration < $sync_wait) {
127 'Skipping forced out-of-sync update because the last update '.
128 'was too recent (%s seconds ago).',
133 $last_sync[$id] = $now;
134 $retry_after[$id] = $now;
138 // If any repositories were deleted, remove them from the retry timer map
139 // so we don't end up with a retry timer that never gets updated and
140 // causes us to sleep for the minimum amount of time.
141 $retry_after = array_select_keys(
143 array_keys($pullable));
145 // Figure out which repositories we need to queue for an update.
146 foreach ($pullable as $id => $repository) {
147 $now = PhabricatorTime
::getNow();
148 $display_name = $repository->getDisplayName();
150 if (isset($futures[$id])) {
153 'Repository "%s" is currently updating.',
158 if (isset($queue[$id])) {
161 'Repository "%s" is already queued.',
166 $after = idx($retry_after, $id);
168 $smart_wait = $repository->loadUpdateInterval($min_sleep);
169 $last_update = $this->loadLastUpdate($repository);
171 $after = $last_update +
$smart_wait;
172 $retry_after[$id] = $after;
176 'Scheduling repository "%s" with an update window of %s '.
177 'second(s). Last update was %s second(s) ago.',
179 new PhutilNumber($smart_wait),
180 new PhutilNumber($now - $last_update)));
// NOTE(review): this due check reads time(), while the log messages around
// it compute seconds from $now (captured at the top of this iteration).
// The two can differ by a second. Consider comparing against $now.
183 if ($after > time()) {
186 'Repository "%s" is not due for an update for %s second(s).',
188 new PhutilNumber($after - $now)));
194 'Scheduling repository "%s" for an update (%s seconds overdue).',
196 new PhutilNumber($now - $after)));
198 $queue[$id] = $after;
201 // Process repositories in the order they became candidates for updates.
204 // Dequeue repositories until we hit maximum parallelism.
205 while ($queue && (count($futures) < $max_futures)) {
206 foreach ($queue as $id => $time) {
207 $repository = idx($pullable, $id);
210 pht('Repository %s is no longer pullable; skipping.', $id));
215 $display_name = $repository->getDisplayName();
218 'Starting update for repository "%s".',
223 $future = $this->buildUpdateFuture(
// Record repository ID => future key so a resolved future can be matched
// back to its repository when the pool returns it.
227 $futures[$id] = $future->getFutureKey();
229 $future_pool->addFuture($future);
237 'Not enough process slots to schedule the other %s '.
238 'repository(s) for updates yet.',
239 phutil_count($queue)));
242 if ($future_pool->hasFutures()) {
243 while ($future_pool->hasFutures()) {
244 $future = $future_pool->resolve();
246 $this->stillWorking();
// A null result presumably means the pool's update interval elapsed without
// any future completing. Use that idle moment to check for newly posted
// update messages (not consumed here).
248 if ($future === null) {
249 $this->log(pht('Waiting for updates to complete...'));
251 if ($this->loadRepositoryUpdateMessages()) {
252 $this->log(pht('Interrupted by pending updates!'));
259 $future_key = $future->getFutureKey();
260 $repository_id = null;
261 foreach ($futures as $id => $key) {
262 if ($key === $future_key) {
263 $repository_id = $id;
264 unset($futures[$id]);
// resolveUpdateFuture() returns the epoch at which this repository is next
// due, which becomes its new retry time.
269 $retry_after[$repository_id] = $this->resolveUpdateFuture(
270 $pullable[$repository_id],
274 // We have a free slot now, so go try to fill it.
278 // Jump back into prioritization if we had any futures to deal with.
// Nothing is running: sleep until the earliest retry time, capped by
// $max_sleep, or until woken by shutdown or pending update messages.
282 $should_hibernate = $this->waitForUpdates($max_sleep, $retry_after);
283 if ($should_hibernate) {
// Build an ExecFuture that runs
// `bin/repository update [--no-discovery] -- <monogram>` for one repository.
// The future is given a timeout (longer while the repository is importing)
// and a TERM override.
294 private function buildUpdateFuture(
295 PhabricatorRepository
$repository,
// Absolute path to bin/repository, next to the phabricator library root.
298 $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';
302 $flags[] = '--no-discovery';
305 $monogram = $repository->getMonogram();
// %Ls formats the flag list. The monogram comes after `--`, so it is
// treated as a positional argument rather than a flag.
306 $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $monogram);
308 // Sometimes, the underlying VCS commands will hang indefinitely. We've
309 // observed this occasionally with GitHub, and other users have observed
310 // it with other VCS servers.
312 // To limit the damage this can cause, kill the update out after a
313 // reasonable amount of time, under the assumption that it has hung.
315 // Since it's hard to know what a "reasonable" amount of time is given that
316 // users may be downloading a repository full of pirated movies over a
317 // potato, these limits are fairly generous. Repositories exceeding these
318 // limits can be manually pulled with `bin/repository update X`, which can
319 // just run for as long as it wants.
// Repositories still being imported get 4 hours; all others get 15 minutes.
321 if ($repository->isImporting()) {
322 $timeout = phutil_units('4 hours in seconds');
324 $timeout = phutil_units('15 minutes in seconds');
327 $future->setTimeout($timeout);
329 // The default TERM inherited by this process is "unknown", which causes PHP
330 // to produce a warning upon startup. Override it to squash this output to
332 $future->updateEnv('TERM', 'dumb');
339 * Check for repositories that should be updated immediately.
341 * With the `$consume` flag, an internal cursor will also be incremented so
342 * that these messages are not returned by subsequent calls.
344 * @param bool Pass `true` to consume these messages, so the process will
345 * not see them again.
346 * @return list<wild> Pending update messages.
350 private function loadRepositoryUpdateMessages($consume = false) {
// Loads NEEDS_UPDATE status messages whose ID is greater than
// $this->statusMessageCursor, so messages already consumed are skipped.
351 $type_need_update = PhabricatorRepositoryStatusMessage
::TYPE_NEEDS_UPDATE
;
352 $messages = id(new PhabricatorRepositoryStatusMessage())->loadAllWhere(
353 'statusType = %s AND id > %d',
355 $this->statusMessageCursor
);
357 // Keep track of messages we've seen so that we don't load them again.
358 // If we reload messages, we can get stuck a loop if we have a failing
359 // repository: we update immediately in response to the message, but do
360 // not clear the message because the update does not succeed. We then
361 // immediately retry. Instead, messages are only permitted to trigger
362 // an immediate update once.
// When consuming, advance the cursor to the highest message ID seen.
365 foreach ($messages as $message) {
366 $this->statusMessageCursor
= max(
367 $this->statusMessageCursor
,
// Return the epoch of this repository's most recent INIT or FETCH status
// message, as an int. Otherwise (presumably when no such message exists)
// returns the current time.
379 private function loadLastUpdate(PhabricatorRepository
$repository) {
380 $table = new PhabricatorRepositoryStatusMessage();
381 $conn = $table->establishConnection('r');
// MAX(epoch) across INIT/FETCH messages for this repository ID.
383 $epoch = queryfx_one(
385 'SELECT MAX(epoch) last_update FROM %T
386 WHERE repositoryID = %d
387 AND statusType IN (%Ls)',
388 $table->getTableName(),
389 $repository->getID(),
391 PhabricatorRepositoryStatusMessage
::TYPE_INIT
,
392 PhabricatorRepositoryStatusMessage
::TYPE_FETCH
,
396 return (int)$epoch['last_update'];
// Fallback: treat the repository as updated "now".
399 return PhabricatorTime
::getNow();
// Load the repositories this daemon should pull, keyed by repository ID.
// Steps, in order:
//   1. Load the explicitly included repositories (all, per the header
//      docblock, when none are named).
//   2. Remove excluded repositories.
//   3. Drop repositories that are not tracked.
//   4. Apply DiffusionLocalRepositoryFilter for $device.
//   5. Shuffle the result.
// Unknown include/exclude identifiers are reported as
// 'No repository "%s" exists!'.
405 private function loadPullableRepositories(
408 AlmanacDevice
$device = null) {
410 $query = id(new PhabricatorRepositoryQuery())
411 ->setViewer($this->getViewer());
414 $query->withIdentifiers($include);
417 $repositories = $query->execute();
// Key by PHID so excluded repositories can be removed by PHID below.
418 $repositories = mpull($repositories, null, 'getPHID');
421 $map = $query->getIdentifierMap();
422 foreach ($include as $identifier) {
423 if (empty($map[$identifier])) {
426 'No repository "%s" exists!',
// Resolve the --not identifiers with a separate query.
433 $xquery = id(new PhabricatorRepositoryQuery())
434 ->setViewer($this->getViewer())
435 ->withIdentifiers($exclude);
437 $excluded_repos = $xquery->execute();
438 $xmap = $xquery->getIdentifierMap();
440 foreach ($exclude as $identifier) {
441 if (empty($xmap[$identifier])) {
444 'No repository "%s" exists!',
449 foreach ($excluded_repos as $excluded_repo) {
450 unset($repositories[$excluded_repo->getPHID()]);
// Untracked repositories are never pulled.
454 foreach ($repositories as $key => $repository) {
455 if (!$repository->isTracked()) {
456 unset($repositories[$key]);
460 $viewer = $this->getViewer();
// Keep only repositories this device should hold a local working copy of.
// Each rejection reason is handled in the loop below.
462 $filter = id(new DiffusionLocalRepositoryFilter())
465 ->setRepositories($repositories);
467 $repositories = $filter->execute();
469 foreach ($filter->getRejectionReasons() as $reason) {
473 // Shuffle the repositories, then re-key the array since shuffle()
474 // discards keys. This is mostly for startup, we'll use soft priorities
476 shuffle($repositories);
477 $repositories = mpull($repositories, null, 'getID');
479 return $repositories;
// Resolve a finished `bin/repository update` future and report any failure
// or unexpected stderr output. Returns the epoch at which this repository
// should next be updated: now + its update interval.
486 private function resolveUpdateFuture(
487 PhabricatorRepository
$repository,
491 $display_name = $repository->getDisplayName();
493 $this->log(pht('Resolving update for "%s".', $display_name));
// resolvex() throws if the command failed.
496 list($stdout, $stderr) = $future->resolvex();
497 } catch (Exception
$ex) {
498 $proxy = new PhutilProxyException(
500 'Error while updating the "%s" repository.',
// On failure the repository is rescheduled at its normal interval, not
// immediately, so a broken repository does not monopolize the daemon.
505 $smart_wait = $repository->loadUpdateInterval($min_sleep);
506 return PhabricatorTime
::getNow() +
$smart_wait;
// A successful update is not expected to write to stderr; report it if so.
509 if (strlen($stderr)) {
511 'Unexpected output while updating repository "%s": %s',
// The interval comes from the repository (loadUpdateInterval), with
// $min_sleep passed in.
517 $smart_wait = $repository->loadUpdateInterval($min_sleep);
521 'Based on activity in repository "%s", considering a wait of %s '.
522 'seconds before update.',
524 new PhutilNumber($smart_wait)));
526 return PhabricatorTime
::getNow() +
$smart_wait;
532 * Sleep for a short period of time, waiting for update messages from the
537 private function waitForUpdates($min_sleep, array $retry_after) {
539 pht('No repositories need updates right now, sleeping...'));
// Sleep until the earliest retry time in $retry_after, but no longer than
// $min_sleep seconds from now.
// NOTE(review): run() passes its $max_sleep as this argument, so the value
// acts as a maximum sleep despite the parameter name.
541 $sleep_until = time() +
$min_sleep;
// NOTE(review): min() on an empty array raises an error in PHP 8. Confirm
// that $retry_after is always non-empty here, or that a guard exists.
543 $sleep_until = min($sleep_until, min($retry_after));
546 while (($sleep_until - time()) > 0) {
547 $sleep_duration = ($sleep_until - time());
// If the daemon decides to hibernate for the remaining duration, presumably
// this returns a truthy value so run() can act on $should_hibernate.
549 if ($this->shouldHibernate($sleep_duration)) {
555 'Sleeping for %s more second(s)...',
556 new PhutilNumber($sleep_duration)));
// Wake early on graceful shutdown.
560 if ($this->shouldExit()) {
561 $this->log(pht('Awakened from sleep by graceful shutdown!'));
// Wake early when update messages are pending. They are not consumed here,
// so run() will still see them on its next pass.
565 if ($this->loadRepositoryUpdateMessages()) {
566 $this->log(pht('Awakened from sleep by pending updates!'));
574 private function loadUnsynchronizedRepositories(AlmanacDevice
$device) {
575 $viewer = $this->getViewer();
576 $table = new PhabricatorRepositoryWorkingCopyVersion();
577 $conn = $table->establishConnection('r');
579 $our_versions = queryfx_all(
581 'SELECT repositoryPHID, repositoryVersion FROM %R WHERE devicePHID = %s',
584 $our_versions = ipull($our_versions, 'repositoryVersion', 'repositoryPHID');
586 $max_versions = queryfx_all(
588 'SELECT repositoryPHID, MAX(repositoryVersion) maxVersion FROM %R
589 GROUP BY repositoryPHID',
591 $max_versions = ipull($max_versions, 'maxVersion', 'repositoryPHID');
593 $unsynchronized_phids = array();
594 foreach ($max_versions as $repository_phid => $max_version) {
595 $our_version = idx($our_versions, $repository_phid);
596 if (($our_version === null) ||
($our_version < $max_version)) {
597 $unsynchronized_phids[] = $repository_phid;
601 if (!$unsynchronized_phids) {
605 return id(new PhabricatorRepositoryQuery())
607 ->withPHIDs($unsynchronized_phids)