Correct a parameter order swap in "diffusion.historyquery" for Mercurial
[phabricator.git] / src / applications / repository / daemon / PhabricatorRepositoryPullLocalDaemon.php
blob48eabf1545efd8590c30cbd55828c63eabd3d9c8
1 <?php
3 /**
4 * Run pull commands on local working copies to keep them up to date. This
5 * daemon handles all repository types.
7 * By default, the daemon pulls **every** repository. If you want it to be
8 * responsible for only some repositories, you can launch it with a list of
9 * repositories:
11 * ./phd launch repositorypulllocal -- X Q Z
13 * You can also launch a daemon which is responsible for all //but// one or
14 * more repositories:
16 * ./phd launch repositorypulllocal -- --not A --not B
18 * If you have a very large number of repositories and some aren't being pulled
19 * as frequently as you'd like, you can either change the pull frequency of
20 * the less-important repositories to a larger number (so the daemon will skip
21 * them more often) or launch one daemon for all the less-important repositories
22 * and one for the more important repositories (or one for each more important
23 * repository).
25 * @task pull Pulling Repositories
27 final class PhabricatorRepositoryPullLocalDaemon
28 extends PhabricatorDaemon {
30 private $statusMessageCursor = 0;
32 /* -( Pulling Repositories )----------------------------------------------- */
/**
 * Daemon main loop.
 *
 * Parses command-line arguments, then repeatedly:
 *
 *   - reloads the set of repositories this daemon may pull;
 *   - marks repositories with pending update messages (and, when running on
 *     a cluster device, repositories whose working copy is out of sync) as
 *     due immediately;
 *   - queues every repository whose retry time has passed;
 *   - starts update subprocesses for queued repositories, up to a fixed
 *     parallelism limit;
 *   - resolves finished updates to compute each repository's next retry time;
 *   - sleeps when there is nothing to do.
 *
 * @task pull
 */
protected function run() {
  $argv = $this->getArgv();
  array_unshift($argv, __CLASS__);
  $args = new PhutilArgumentParser($argv);
  $args->parse(
    array(
      array(
        'name' => 'no-discovery',
        'help' => pht('Pull only, without discovering commits.'),
      ),
      array(
        'name' => 'not',
        'param' => 'repository',
        'repeat' => true,
        'help' => pht('Do not pull __repository__.'),
      ),
      array(
        'name' => 'repositories',
        'wildcard' => true,
        'help' => pht('Pull specific __repositories__ instead of all.'),
      ),
    ));

  $no_discovery = $args->getArg('no-discovery');
  $include = $args->getArg('repositories');
  $exclude = $args->getArg('not');

  // Map of repository ID => epoch after which the repository should be
  // pulled again. Each repository has an individual pull frequency; after we
  // pull it, wait that long to pull it again. When we start up, try to pull
  // everything serially.
  $retry_after = array();

  $min_sleep = 15;
  $max_sleep = phutil_units('5 minutes in seconds');
  $max_futures = 4;

  // Map of repository ID => future key of the update currently running for
  // that repository. Entries persist across passes of the outer loop.
  $futures = array();

  // Map of repository ID => epoch at which the repository became due.
  $queue = array();

  $future_pool = new FuturePool();

  $future_pool->getIteratorTemplate()
    ->setUpdateInterval($min_sleep);

  $sync_wait = phutil_units('2 minutes in seconds');
  $last_sync = array();

  while (!$this->shouldExit()) {
    PhabricatorCaches::destroyRequestCache();
    $device = AlmanacKeys::getLiveDevice();

    $pullable = $this->loadPullableRepositories($include, $exclude, $device);

    // If any repositories have the NEEDS_UPDATE flag set, pull them
    // as soon as possible.
    $need_update_messages = $this->loadRepositoryUpdateMessages(true);
    foreach ($need_update_messages as $message) {
      $repo = idx($pullable, $message->getRepositoryID());
      if (!$repo) {
        continue;
      }

      $this->log(
        pht(
          'Got an update message for repository "%s"!',
          $repo->getMonogram()));

      // Use the same clock as every other retry timestamp in this loop.
      $retry_after[$message->getRepositoryID()] = PhabricatorTime::getNow();
    }

    if ($device) {
      $unsynchronized = $this->loadUnsynchronizedRepositories($device);
      $now = PhabricatorTime::getNow();
      foreach ($unsynchronized as $repository) {
        $id = $repository->getID();

        $this->log(
          pht(
            'Cluster repository ("%s") is out of sync on this node ("%s").',
            $repository->getDisplayName(),
            $device->getName()));

        // Don't let out-of-sync conditions trigger updates too frequently,
        // since we don't want to get trapped in a death spiral if sync is
        // failing.
        $sync_at = idx($last_sync, $id, 0);
        $wait_duration = ($now - $sync_at);
        if ($wait_duration < $sync_wait) {
          $this->log(
            pht(
              'Skipping forced out-of-sync update because the last update '.
              'was too recent (%s seconds ago).',
              $wait_duration));
          continue;
        }

        $last_sync[$id] = $now;
        $retry_after[$id] = $now;
      }
    }

    // If any repositories were deleted, remove them from the retry timer map
    // so we don't end up with a retry timer that never gets updated and
    // causes us to sleep for the minimum amount of time.
    $retry_after = array_select_keys(
      $retry_after,
      array_keys($pullable));

    // Figure out which repositories we need to queue for an update.
    foreach ($pullable as $id => $repository) {
      $now = PhabricatorTime::getNow();
      $display_name = $repository->getDisplayName();

      if (isset($futures[$id])) {
        $this->log(
          pht(
            'Repository "%s" is currently updating.',
            $display_name));
        continue;
      }

      if (isset($queue[$id])) {
        $this->log(
          pht(
            'Repository "%s" is already queued.',
            $display_name));
        continue;
      }

      $after = idx($retry_after, $id);
      if (!$after) {
        $smart_wait = $repository->loadUpdateInterval($min_sleep);
        $last_update = $this->loadLastUpdate($repository);

        $after = $last_update + $smart_wait;
        $retry_after[$id] = $after;

        $this->log(
          pht(
            'Scheduling repository "%s" with an update window of %s '.
            'second(s). Last update was %s second(s) ago.',
            $display_name,
            new PhutilNumber($smart_wait),
            new PhutilNumber($now - $last_update)));
      }

      // Compare against the same "$now" used in the log messages below so
      // the decision and the reported durations always agree.
      if ($after > $now) {
        $this->log(
          pht(
            'Repository "%s" is not due for an update for %s second(s).',
            $display_name,
            new PhutilNumber($after - $now)));
        continue;
      }

      $this->log(
        pht(
          'Scheduling repository "%s" for an update (%s seconds overdue).',
          $display_name,
          new PhutilNumber($now - $after)));

      $queue[$id] = $after;
    }

    // Process repositories in the order they became candidates for updates.
    asort($queue);

    // Dequeue repositories until we hit maximum parallelism.
    while ($queue && (count($futures) < $max_futures)) {
      foreach ($queue as $id => $time) {
        $repository = idx($pullable, $id);
        if (!$repository) {
          $this->log(
            pht('Repository %s is no longer pullable; skipping.', $id));
          unset($queue[$id]);
          continue;
        }

        $display_name = $repository->getDisplayName();
        $this->log(
          pht(
            'Starting update for repository "%s".',
            $display_name));

        unset($queue[$id]);

        $future = $this->buildUpdateFuture(
          $repository,
          $no_discovery);

        $futures[$id] = $future->getFutureKey();

        $future_pool->addFuture($future);
        break;
      }
    }

    if ($queue) {
      $this->log(
        pht(
          'Not enough process slots to schedule the other %s '.
          'repository(s) for updates yet.',
          phutil_count($queue)));
    }

    if ($future_pool->hasFutures()) {
      while ($future_pool->hasFutures()) {
        $future = $future_pool->resolve();

        $this->stillWorking();

        if ($future === null) {
          $this->log(pht('Waiting for updates to complete...'));

          if ($this->loadRepositoryUpdateMessages()) {
            $this->log(pht('Interrupted by pending updates!'));
            break;
          }

          continue;
        }

        // Find (and release) the slot this future was occupying.
        $repository_id = array_search($future->getFutureKey(), $futures, true);

        // "$pullable" is reloaded on every pass through the outer loop, but an
        // update may outlive the pass that started it. If the repository has
        // since stopped being pullable, there is nothing left to schedule for
        // it, and indexing "$pullable" directly would pass null onward.
        $repository = null;
        if ($repository_id !== false) {
          unset($futures[$repository_id]);
          $repository = idx($pullable, $repository_id);
        }

        if ($repository) {
          $retry_after[$repository_id] = $this->resolveUpdateFuture(
            $repository,
            $future,
            $min_sleep);
        } else {
          $this->log(
            pht(
              'Discarding result of an update for a repository which is no '.
              'longer pullable.'));
        }

        // We have a free slot now, so go try to fill it.
        break;
      }

      // Jump back into prioritization if we had any futures to deal with.
      continue;
    }

    $should_hibernate = $this->waitForUpdates($max_sleep, $retry_after);
    if ($should_hibernate) {
      break;
    }
  }
}
/**
 * Build a subprocess future which runs `bin/repository update` for a single
 * repository.
 *
 * @param PhabricatorRepository Repository to update.
 * @param bool When true, pass `--no-discovery` so the update only pulls.
 * @return ExecFuture Configured (timeout and environment) update future.
 * @task pull
 */
private function buildUpdateFuture(
  PhabricatorRepository $repository,
  $no_discovery) {

  $library_root = phutil_get_library_root('phabricator');
  $bin = dirname($library_root).'/bin/repository';

  $flags = $no_discovery ? array('--no-discovery') : array();

  $future = new ExecFuture(
    '%s update %Ls -- %s',
    $bin,
    $flags,
    $repository->getMonogram());

  // The underlying VCS commands occasionally hang forever (seen with GitHub
  // and with other hosts). Kill an update which runs far longer than any
  // healthy one should. Limits are generous because "reasonable" depends on
  // repository size and network speed; a repository which needs longer can
  // always be pulled by hand with `bin/repository update X`, which has no
  // timeout. Importing repositories get a much larger budget.
  $timeout = $repository->isImporting()
    ? phutil_units('4 hours in seconds')
    : phutil_units('15 minutes in seconds');
  $future->setTimeout($timeout);

  // The inherited TERM is "unknown", which makes PHP print a startup
  // warning to stderr; a known terminal type keeps stderr clean.
  $future->updateEnv('TERM', 'dumb');

  return $future;
}
/**
 * Load "needs update" status messages newer than the internal cursor, i.e.
 * repositories which have asked to be updated immediately.
 *
 * When `$consume` is true, the cursor is advanced past every returned
 * message so later calls do not return them again.
 *
 * @param bool $consume Pass `true` to consume these messages, so the process
 *   will not see them again.
 * @return list<wild> Pending update messages.
 * @task pull
 */
private function loadRepositoryUpdateMessages($consume = false) {
  $messages = id(new PhabricatorRepositoryStatusMessage())->loadAllWhere(
    'statusType = %s AND id > %d',
    PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE,
    $this->statusMessageCursor);

  if (!$consume) {
    return $messages;
  }

  // Each message may trigger an immediate update only once. If messages
  // were reloaded every time, a repository whose update keeps failing (and
  // therefore never clears its message) would be retried in a tight loop.
  foreach ($messages as $message) {
    $this->statusMessageCursor = max(
      $this->statusMessageCursor,
      $message->getID());
  }

  return $messages;
}
/**
 * Return the epoch of the newest INIT or FETCH status message recorded for a
 * repository, used as the time of its last update.
 *
 * @param PhabricatorRepository Repository to examine.
 * @return int Epoch of the last recorded update.
 * @task pull
 */
private function loadLastUpdate(PhabricatorRepository $repository) {
  $table = new PhabricatorRepositoryStatusMessage();
  $update_types = array(
    PhabricatorRepositoryStatusMessage::TYPE_INIT,
    PhabricatorRepositoryStatusMessage::TYPE_FETCH,
  );

  $row = queryfx_one(
    $table->establishConnection('r'),
    'SELECT MAX(epoch) last_update FROM %T
      WHERE repositoryID = %d
        AND statusType IN (%Ls)',
    $table->getTableName(),
    $repository->getID(),
    $update_types);

  // NOTE(review): an aggregate query normally yields one row even when
  // nothing matches (with a NULL maximum, which the cast below turns into 0),
  // so this fallback may be unreachable in practice. Confirm against
  // queryfx_one() before relying on it.
  if (!$row) {
    return PhabricatorTime::getNow();
  }

  return (int)$row['last_update'];
}
/**
 * Load the repositories this daemon should pull.
 *
 * Starts from every repository (or only those named in `$include`), removes
 * those named in `$exclude` and any which are not tracked, then passes the
 * rest through DiffusionLocalRepositoryFilter for `$device`, logging each
 * rejection reason the filter reports.
 *
 * @param list<string> Identifiers to restrict pulling to; empty means all.
 * @param list<string> Identifiers to never pull.
 * @param AlmanacDevice|null Live device, if any, handed to the filter.
 * @return map<int, PhabricatorRepository> Pullable repositories keyed by ID,
 *   in shuffled order.
 * @throws Exception If an included or excluded identifier matches nothing.
 * @task pull
 */
private function loadPullableRepositories(
  array $include,
  array $exclude,
  AlmanacDevice $device = null) {

  $viewer = $this->getViewer();

  $query = id(new PhabricatorRepositoryQuery())
    ->setViewer($viewer);
  if ($include) {
    $query->withIdentifiers($include);
  }

  $repositories = mpull($query->execute(), null, 'getPHID');

  if ($include) {
    $found = $query->getIdentifierMap();
    foreach ($include as $identifier) {
      if (empty($found[$identifier])) {
        throw new Exception(
          pht(
            'No repository "%s" exists!',
            $identifier));
      }
    }
  }

  if ($exclude) {
    $exclude_query = id(new PhabricatorRepositoryQuery())
      ->setViewer($viewer)
      ->withIdentifiers($exclude);

    $skipped = $exclude_query->execute();

    $found = $exclude_query->getIdentifierMap();
    foreach ($exclude as $identifier) {
      if (empty($found[$identifier])) {
        throw new Exception(
          pht(
            'No repository "%s" exists!',
            $identifier));
      }
    }

    $repositories = array_diff_key(
      $repositories,
      mpull($skipped, null, 'getPHID'));
  }

  // Untracked repositories are never pulled.
  $repositories = array_filter(
    $repositories,
    function (PhabricatorRepository $repository) {
      return $repository->isTracked();
    });

  $filter = id(new DiffusionLocalRepositoryFilter())
    ->setViewer($viewer)
    ->setDevice($device)
    ->setRepositories($repositories);

  $repositories = $filter->execute();

  foreach ($filter->getRejectionReasons() as $reason) {
    $this->log($reason);
  }

  // Randomize the order, which mostly matters at startup (the main loop
  // applies soft priorities afterward). shuffle() drops keys, so re-key by
  // repository ID.
  shuffle($repositories);

  return mpull($repositories, null, 'getID');
}
/**
 * Collect the result of a finished update and compute when the repository
 * should next be updated.
 *
 * A failed update is reported through phlog() as a proxy exception. A
 * successful update which wrote to stderr has that output reported through
 * phlog(). Either way, the next update is scheduled one update interval
 * from now.
 *
 * @param PhabricatorRepository Repository which was updated.
 * @param ExecFuture Finished update subprocess for the repository.
 * @param int Minimum interval, passed to loadUpdateInterval().
 * @return int Epoch after which the repository should be updated again.
 * @task pull
 */
private function resolveUpdateFuture(
  PhabricatorRepository $repository,
  ExecFuture $future,
  $min_sleep) {

  $display_name = $repository->getDisplayName();
  $this->log(pht('Resolving update for "%s".', $display_name));

  $succeeded = true;
  try {
    list(, $stderr) = $future->resolvex();
  } catch (Exception $ex) {
    $succeeded = false;
    phlog(
      new PhutilProxyException(
        pht(
          'Error while updating the "%s" repository.',
          $display_name),
        $ex));
  }

  if ($succeeded && strlen($stderr)) {
    phlog(
      pht(
        'Unexpected output while updating repository "%s": %s',
        $display_name,
        $stderr));
  }

  $interval = $repository->loadUpdateInterval($min_sleep);

  if ($succeeded) {
    $this->log(
      pht(
        'Based on activity in repository "%s", considering a wait of %s '.
        'seconds before update.',
        $display_name,
        new PhutilNumber($interval)));
  }

  return PhabricatorTime::getNow() + $interval;
}
/**
 * Sleep until the earliest repository retry time, but for no longer than
 * `$max_sleep` seconds.
 *
 * Sleeps one second at a time. Wakes early on a graceful shutdown, or when
 * update messages are pending. Returns early if the daemon decides to
 * hibernate.
 *
 * @param int $max_sleep Maximum number of seconds to sleep. (This was
 *   previously named "$min_sleep", but the caller passes the daemon's
 *   maximum sleep and the value is used as an upper bound.)
 * @param map<int, int> $retry_after Map of repository IDs to the epochs
 *   after which they should next be updated.
 * @return bool True if the daemon should hibernate.
 * @task pull
 */
private function waitForUpdates($max_sleep, array $retry_after) {
  $this->log(
    pht('No repositories need updates right now, sleeping...'));

  $sleep_until = time() + $max_sleep;
  if ($retry_after) {
    $sleep_until = min($sleep_until, min($retry_after));
  }

  while (true) {
    // Compute the remaining time once per iteration, so the loop condition,
    // the hibernation check and the log message agree even if the clock
    // ticks in between.
    $sleep_duration = ($sleep_until - time());
    if ($sleep_duration <= 0) {
      break;
    }

    if ($this->shouldHibernate($sleep_duration)) {
      return true;
    }

    $this->log(
      pht(
        'Sleeping for %s more second(s)...',
        new PhutilNumber($sleep_duration)));

    $this->sleep(1);

    if ($this->shouldExit()) {
      $this->log(pht('Awakened from sleep by graceful shutdown!'));
      return false;
    }

    if ($this->loadRepositoryUpdateMessages()) {
      $this->log(pht('Awakened from sleep by pending updates!'));
      break;
    }
  }

  return false;
}
/**
 * Find repositories whose working copy on `$device` is behind the newest
 * working copy version recorded for that repository on any device.
 *
 * A repository counts as behind when this device has no version row for it,
 * or when its version is lower than the maximum recorded version.
 *
 * @param AlmanacDevice Device whose working copies should be checked.
 * @return array Repositories which are out of sync on the device (empty when
 *   none are).
 */
private function loadUnsynchronizedRepositories(AlmanacDevice $device) {
  $viewer = $this->getViewer();
  $table = new PhabricatorRepositoryWorkingCopyVersion();
  $conn = $table->establishConnection('r');

  $local_rows = queryfx_all(
    $conn,
    'SELECT repositoryPHID, repositoryVersion FROM %R WHERE devicePHID = %s',
    $table,
    $device->getPHID());
  $local_versions = ipull($local_rows, 'repositoryVersion', 'repositoryPHID');

  $newest_rows = queryfx_all(
    $conn,
    'SELECT repositoryPHID, MAX(repositoryVersion) maxVersion FROM %R
      GROUP BY repositoryPHID',
    $table);
  $newest_versions = ipull($newest_rows, 'maxVersion', 'repositoryPHID');

  $stale_phids = array();
  foreach ($newest_versions as $repository_phid => $newest_version) {
    $local_version = idx($local_versions, $repository_phid);

    $is_missing = ($local_version === null);
    if ($is_missing || ($local_version < $newest_version)) {
      $stale_phids[] = $repository_phid;
    }
  }

  if (!$stale_phids) {
    return array();
  }

  return id(new PhabricatorRepositoryQuery())
    ->setViewer($viewer)
    ->withPHIDs($stale_phids)
    ->execute();
}