[SyncFS] Build indexes from FileTracker entries on disk.
[chromium-blink-merge.git] / sync / internal_api / sync_manager_impl.cc
blob256a5c9fdd4fc729720227c65405da8038895155
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
7 #include <string>
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
53 namespace syncer {
55 using sessions::SyncSessionContext;
56 using syncable::ImmutableWriteTransactionInfo;
57 using syncable::SPECIFICS;
58 using syncable::UNIQUE_POSITION;
60 namespace {
62 // Delays for syncer nudges.
63 static const int kDefaultNudgeDelayMilliseconds = 200;
64 static const int kPreferencesNudgeDelayMilliseconds = 2000;
65 static const int kSyncRefreshDelayMsec = 500;
66 static const int kSyncSchedulerDelayMsec = 250;
68 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
69 ConfigureReason reason) {
70 switch (reason) {
71 case CONFIGURE_REASON_RECONFIGURATION:
72 return GetUpdatesCallerInfo::RECONFIGURATION;
73 case CONFIGURE_REASON_MIGRATION:
74 return GetUpdatesCallerInfo::MIGRATION;
75 case CONFIGURE_REASON_NEW_CLIENT:
76 return GetUpdatesCallerInfo::NEW_CLIENT;
77 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
78 case CONFIGURE_REASON_CRYPTO:
79 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
80 default:
81 NOTREACHED();
83 return GetUpdatesCallerInfo::UNKNOWN;
86 } // namespace
88 // A class to calculate nudge delays for types.
89 class NudgeStrategy {
90 public:
91 static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type,
92 SyncManagerImpl* core) {
93 NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type);
94 return GetNudgeDelayTimeDeltaFromType(delay_type,
95 model_type,
96 core);
99 private:
100 // Possible types of nudge delay for datatypes.
101 // Note: These are just hints. If a sync happens then all dirty entries
102 // would be committed as part of the sync.
103 enum NudgeDelayStrategy {
104 // Sync right away.
105 IMMEDIATE,
107 // Sync this change while syncing another change.
108 ACCOMPANY_ONLY,
110 // The datatype does not use one of the predefined wait times but defines
111 // its own wait time logic for nudge.
112 CUSTOM,
115 static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) {
116 switch (type) {
117 case AUTOFILL:
118 return ACCOMPANY_ONLY;
119 case PREFERENCES:
120 case SESSIONS:
121 case FAVICON_IMAGES:
122 case FAVICON_TRACKING:
123 return CUSTOM;
124 default:
125 return IMMEDIATE;
129 static TimeDelta GetNudgeDelayTimeDeltaFromType(
130 const NudgeDelayStrategy& delay_type, const ModelType& model_type,
131 const SyncManagerImpl* core) {
132 CHECK(core);
133 TimeDelta delay = TimeDelta::FromMilliseconds(
134 kDefaultNudgeDelayMilliseconds);
135 switch (delay_type) {
136 case IMMEDIATE:
137 delay = TimeDelta::FromMilliseconds(
138 kDefaultNudgeDelayMilliseconds);
139 break;
140 case ACCOMPANY_ONLY:
141 delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds);
142 break;
143 case CUSTOM:
144 switch (model_type) {
145 case PREFERENCES:
146 delay = TimeDelta::FromMilliseconds(
147 kPreferencesNudgeDelayMilliseconds);
148 break;
149 case SESSIONS:
150 case FAVICON_IMAGES:
151 case FAVICON_TRACKING:
152 delay = core->scheduler()->GetSessionsCommitDelay();
153 break;
154 default:
155 NOTREACHED();
157 break;
158 default:
159 NOTREACHED();
161 return delay;
165 SyncManagerImpl::SyncManagerImpl(const std::string& name)
166 : name_(name),
167 change_delegate_(NULL),
168 initialized_(false),
169 observing_network_connectivity_changes_(false),
170 report_unrecoverable_error_function_(NULL),
171 weak_ptr_factory_(this) {
172 // Pre-fill |notification_info_map_|.
173 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
174 notification_info_map_.insert(
175 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
179 SyncManagerImpl::~SyncManagerImpl() {
180 DCHECK(thread_checker_.CalledOnValidThread());
181 CHECK(!initialized_);
184 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
185 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
187 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
188 base::DictionaryValue* value = new base::DictionaryValue();
189 value->SetInteger("totalCount", total_count);
190 value->SetString("payload", payload);
191 return value;
194 bool SyncManagerImpl::VisiblePositionsDiffer(
195 const syncable::EntryKernelMutation& mutation) const {
196 const syncable::EntryKernel& a = mutation.original;
197 const syncable::EntryKernel& b = mutation.mutated;
198 if (!b.ShouldMaintainPosition())
199 return false;
200 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
201 return true;
202 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
203 return true;
204 return false;
207 bool SyncManagerImpl::VisiblePropertiesDiffer(
208 const syncable::EntryKernelMutation& mutation,
209 Cryptographer* cryptographer) const {
210 const syncable::EntryKernel& a = mutation.original;
211 const syncable::EntryKernel& b = mutation.mutated;
212 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
213 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
214 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
215 GetModelTypeFromSpecifics(b_specifics));
216 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
217 // Suppress updates to items that aren't tracked by any browser model.
218 if (model_type < FIRST_REAL_MODEL_TYPE ||
219 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
220 return false;
222 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
223 return true;
224 if (!AreSpecificsEqual(cryptographer,
225 a.ref(syncable::SPECIFICS),
226 b.ref(syncable::SPECIFICS))) {
227 return true;
229 // We only care if the name has changed if neither specifics is encrypted
230 // (encrypted nodes blow away the NON_UNIQUE_NAME).
231 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
232 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
233 return true;
234 if (VisiblePositionsDiffer(mutation))
235 return true;
236 return false;
239 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
240 return directory()->InitialSyncEndedTypes();
243 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
244 ModelTypeSet types) {
245 ModelTypeSet result;
246 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
247 sync_pb::DataTypeProgressMarker marker;
248 directory()->GetDownloadProgress(i.Get(), &marker);
250 if (marker.token().empty())
251 result.Put(i.Get());
253 return result;
256 void SyncManagerImpl::ConfigureSyncer(
257 ConfigureReason reason,
258 ModelTypeSet to_download,
259 ModelTypeSet to_purge,
260 ModelTypeSet to_journal,
261 ModelTypeSet to_unapply,
262 const ModelSafeRoutingInfo& new_routing_info,
263 const base::Closure& ready_task,
264 const base::Closure& retry_task) {
265 DCHECK(thread_checker_.CalledOnValidThread());
266 DCHECK(!ready_task.is_null());
267 DCHECK(!retry_task.is_null());
269 DVLOG(1) << "Configuring -"
270 << "\n\t" << "current types: "
271 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
272 << "\n\t" << "types to download: "
273 << ModelTypeSetToString(to_download)
274 << "\n\t" << "types to purge: "
275 << ModelTypeSetToString(to_purge)
276 << "\n\t" << "types to journal: "
277 << ModelTypeSetToString(to_journal)
278 << "\n\t" << "types to unapply: "
279 << ModelTypeSetToString(to_unapply);
280 if (!PurgeDisabledTypes(to_purge,
281 to_journal,
282 to_unapply)) {
283 // We failed to cleanup the types. Invoke the ready task without actually
284 // configuring any types. The caller should detect this as a configuration
285 // failure and act appropriately.
286 ready_task.Run();
287 return;
290 ConfigurationParams params(GetSourceFromReason(reason),
291 to_download,
292 new_routing_info,
293 ready_task,
294 retry_task);
296 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
297 scheduler_->ScheduleConfiguration(params);
300 void SyncManagerImpl::Init(
301 const base::FilePath& database_location,
302 const WeakHandle<JsEventHandler>& event_handler,
303 const std::string& sync_server_and_path,
304 int port,
305 bool use_ssl,
306 scoped_ptr<HttpPostProviderFactory> post_factory,
307 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
308 ExtensionsActivity* extensions_activity,
309 SyncManager::ChangeDelegate* change_delegate,
310 const SyncCredentials& credentials,
311 const std::string& invalidator_client_id,
312 const std::string& restored_key_for_bootstrapping,
313 const std::string& restored_keystore_key_for_bootstrapping,
314 InternalComponentsFactory* internal_components_factory,
315 Encryptor* encryptor,
316 scoped_ptr<UnrecoverableErrorHandler> unrecoverable_error_handler,
317 ReportUnrecoverableErrorFunction report_unrecoverable_error_function,
318 CancelationSignal* cancelation_signal) {
319 CHECK(!initialized_);
320 DCHECK(thread_checker_.CalledOnValidThread());
321 DCHECK(post_factory.get());
322 DCHECK(!credentials.email.empty());
323 DCHECK(!credentials.sync_token.empty());
324 DCHECK(!credentials.scope_set.empty());
325 DCHECK(cancelation_signal);
326 DVLOG(1) << "SyncManager starting Init...";
328 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
330 change_delegate_ = change_delegate;
332 AddObserver(&js_sync_manager_observer_);
333 SetJsEventHandler(event_handler);
335 AddObserver(&debug_info_event_listener_);
337 database_path_ = database_location.Append(
338 syncable::Directory::kSyncDatabaseFilename);
339 unrecoverable_error_handler_ = unrecoverable_error_handler.Pass();
340 report_unrecoverable_error_function_ = report_unrecoverable_error_function;
342 allstatus_.SetHasKeystoreKey(
343 !restored_keystore_key_for_bootstrapping.empty());
344 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
345 &share_,
346 encryptor,
347 restored_key_for_bootstrapping,
348 restored_keystore_key_for_bootstrapping));
349 sync_encryption_handler_->AddObserver(this);
350 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
351 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
353 base::FilePath absolute_db_path = database_path_;
354 DCHECK(absolute_db_path.IsAbsolute());
356 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
357 internal_components_factory->BuildDirectoryBackingStore(
358 credentials.email, absolute_db_path).Pass();
360 DCHECK(backing_store.get());
361 share_.directory.reset(
362 new syncable::Directory(
363 backing_store.release(),
364 unrecoverable_error_handler_.get(),
365 report_unrecoverable_error_function_,
366 sync_encryption_handler_.get(),
367 sync_encryption_handler_->GetCryptographerUnsafe()));
368 share_.sync_credentials = credentials;
370 // UserShare is accessible to a lot of code that doesn't need access to the
371 // sync token so clear sync_token from the UserShare.
372 share_.sync_credentials.sync_token = "";
374 const std::string& username = credentials.email;
375 DVLOG(1) << "Username: " << username;
376 if (!OpenDirectory(username)) {
377 NotifyInitializationFailure();
378 LOG(ERROR) << "Sync manager initialization failed!";
379 return;
382 connection_manager_.reset(new SyncAPIServerConnectionManager(
383 sync_server_and_path, port, use_ssl,
384 post_factory.release(), cancelation_signal));
385 connection_manager_->set_client_id(directory()->cache_guid());
386 connection_manager_->AddListener(this);
388 std::string sync_id = directory()->cache_guid();
390 DVLOG(1) << "Setting sync client ID: " << sync_id;
391 allstatus_.SetSyncId(sync_id);
392 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id;
393 allstatus_.SetInvalidatorClientId(invalidator_client_id);
395 model_type_registry_.reset(new ModelTypeRegistry(workers, directory(), this));
397 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier
398 // if the pointer is misused in debug mode.
399 base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
400 weak_core.get();
402 sync_context_proxy_.reset(
403 new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
405 // Build a SyncSessionContext and store the worker in it.
406 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
407 std::vector<SyncEngineEventListener*> listeners;
408 listeners.push_back(&allstatus_);
409 listeners.push_back(this);
410 session_context_ = internal_components_factory->BuildContext(
411 connection_manager_.get(),
412 directory(),
413 extensions_activity,
414 listeners,
415 &debug_info_event_listener_,
416 model_type_registry_.get(),
417 invalidator_client_id).Pass();
418 session_context_->set_account_name(credentials.email);
419 scheduler_ = internal_components_factory->BuildScheduler(
420 name_, session_context_.get(), cancelation_signal).Pass();
422 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
424 initialized_ = true;
426 net::NetworkChangeNotifier::AddIPAddressObserver(this);
427 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
428 observing_network_connectivity_changes_ = true;
430 UpdateCredentials(credentials);
432 NotifyInitializationSuccess();
435 void SyncManagerImpl::NotifyInitializationSuccess() {
436 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
437 OnInitializationComplete(
438 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
439 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
440 true, InitialSyncEndedTypes()));
443 void SyncManagerImpl::NotifyInitializationFailure() {
444 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
445 OnInitializationComplete(
446 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
447 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
448 false, ModelTypeSet()));
451 void SyncManagerImpl::OnPassphraseRequired(
452 PassphraseRequiredReason reason,
453 const sync_pb::EncryptedData& pending_keys) {
454 // Does nothing.
457 void SyncManagerImpl::OnPassphraseAccepted() {
458 // Does nothing.
461 void SyncManagerImpl::OnBootstrapTokenUpdated(
462 const std::string& bootstrap_token,
463 BootstrapTokenType type) {
464 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
465 allstatus_.SetHasKeystoreKey(true);
468 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
469 bool encrypt_everything) {
470 allstatus_.SetEncryptedTypes(encrypted_types);
473 void SyncManagerImpl::OnEncryptionComplete() {
474 // Does nothing.
477 void SyncManagerImpl::OnCryptographerStateChanged(
478 Cryptographer* cryptographer) {
479 allstatus_.SetCryptographerReady(cryptographer->is_ready());
480 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
481 allstatus_.SetKeystoreMigrationTime(
482 sync_encryption_handler_->migration_time());
485 void SyncManagerImpl::OnPassphraseTypeChanged(
486 PassphraseType type,
487 base::Time explicit_passphrase_time) {
488 allstatus_.SetPassphraseType(type);
489 allstatus_.SetKeystoreMigrationTime(
490 sync_encryption_handler_->migration_time());
493 void SyncManagerImpl::StartSyncingNormally(
494 const ModelSafeRoutingInfo& routing_info) {
495 // Start the sync scheduler.
496 // TODO(sync): We always want the newest set of routes when we switch back
497 // to normal mode. Figure out how to enforce set_routing_info is always
498 // appropriately set and that it's only modified when switching to normal
499 // mode.
500 DCHECK(thread_checker_.CalledOnValidThread());
501 session_context_->SetRoutingInfo(routing_info);
502 scheduler_->Start(SyncScheduler::NORMAL_MODE);
505 syncable::Directory* SyncManagerImpl::directory() {
506 return share_.directory.get();
509 const SyncScheduler* SyncManagerImpl::scheduler() const {
510 return scheduler_.get();
513 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
514 return connection_manager_->HasInvalidAuthToken();
517 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
518 DCHECK(!initialized_) << "Should only happen once";
520 // Set before Open().
521 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
522 WeakHandle<syncable::TransactionObserver> transaction_observer(
523 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
525 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
526 open_result = directory()->Open(username, this, transaction_observer);
527 if (open_result != syncable::OPENED) {
528 LOG(ERROR) << "Could not open share for:" << username;
529 return false;
532 // Unapplied datatypes (those that do not have initial sync ended set) get
533 // re-downloaded during any configuration. But, it's possible for a datatype
534 // to have a progress marker but not have initial sync ended yet, making
535 // it a candidate for migration. This is a problem, as the DataTypeManager
536 // does not support a migration while it's already in the middle of a
537 // configuration. As a result, any partially synced datatype can stall the
538 // DTM, waiting for the configuration to complete, which it never will due
539 // to the migration error. In addition, a partially synced nigori will
540 // trigger the migration logic before the backend is initialized, resulting
541 // in crashes. We therefore detect and purge any partially synced types as
542 // part of initialization.
543 if (!PurgePartiallySyncedTypes())
544 return false;
546 return true;
549 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
550 ModelTypeSet partially_synced_types = ModelTypeSet::All();
551 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
552 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
553 ModelTypeSet::All()));
555 DVLOG(1) << "Purging partially synced types "
556 << ModelTypeSetToString(partially_synced_types);
557 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
558 partially_synced_types.Size());
559 if (partially_synced_types.Empty())
560 return true;
561 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
562 ModelTypeSet(),
563 ModelTypeSet());
566 bool SyncManagerImpl::PurgeDisabledTypes(
567 ModelTypeSet to_purge,
568 ModelTypeSet to_journal,
569 ModelTypeSet to_unapply) {
570 if (to_purge.Empty())
571 return true;
572 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
573 DCHECK(to_purge.HasAll(to_journal));
574 DCHECK(to_purge.HasAll(to_unapply));
575 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
578 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
579 DCHECK(thread_checker_.CalledOnValidThread());
580 DCHECK(initialized_);
581 DCHECK(!credentials.email.empty());
582 DCHECK(!credentials.sync_token.empty());
583 DCHECK(!credentials.scope_set.empty());
585 observing_network_connectivity_changes_ = true;
586 if (!connection_manager_->SetAuthToken(credentials.sync_token))
587 return; // Auth token is known to be invalid, so exit early.
589 scheduler_->OnCredentialsUpdated();
591 // TODO(zea): pass the credential age to the debug info event listener.
594 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
595 DCHECK(thread_checker_.CalledOnValidThread());
596 observers_.AddObserver(observer);
599 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
600 DCHECK(thread_checker_.CalledOnValidThread());
601 observers_.RemoveObserver(observer);
604 void SyncManagerImpl::ShutdownOnSyncThread() {
605 DCHECK(thread_checker_.CalledOnValidThread());
607 // Prevent any in-flight method calls from running. Also
608 // invalidates |weak_handle_this_| and |change_observer_|.
609 weak_ptr_factory_.InvalidateWeakPtrs();
610 js_mutation_event_observer_.InvalidateWeakPtrs();
612 scheduler_.reset();
613 session_context_.reset();
614 model_type_registry_.reset();
616 if (sync_encryption_handler_) {
617 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
618 sync_encryption_handler_->RemoveObserver(this);
621 SetJsEventHandler(WeakHandle<JsEventHandler>());
622 RemoveObserver(&js_sync_manager_observer_);
624 RemoveObserver(&debug_info_event_listener_);
626 // |connection_manager_| may end up being NULL here in tests (in synchronous
627 // initialization mode).
629 // TODO(akalin): Fix this behavior.
630 if (connection_manager_)
631 connection_manager_->RemoveListener(this);
632 connection_manager_.reset();
634 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
635 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
636 observing_network_connectivity_changes_ = false;
638 if (initialized_ && directory()) {
639 directory()->SaveChanges();
642 share_.directory.reset();
644 change_delegate_ = NULL;
646 initialized_ = false;
648 // We reset these here, since only now we know they will not be
649 // accessed from other threads (since we shut down everything).
650 change_observer_.Reset();
651 weak_handle_this_.Reset();
654 void SyncManagerImpl::OnIPAddressChanged() {
655 if (!observing_network_connectivity_changes_) {
656 DVLOG(1) << "IP address change dropped.";
657 return;
659 DVLOG(1) << "IP address change detected.";
660 OnNetworkConnectivityChangedImpl();
663 void SyncManagerImpl::OnConnectionTypeChanged(
664 net::NetworkChangeNotifier::ConnectionType) {
665 if (!observing_network_connectivity_changes_) {
666 DVLOG(1) << "Connection type change dropped.";
667 return;
669 DVLOG(1) << "Connection type change detected.";
670 OnNetworkConnectivityChangedImpl();
673 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
674 DCHECK(thread_checker_.CalledOnValidThread());
675 scheduler_->OnConnectionStatusChange();
678 void SyncManagerImpl::OnServerConnectionEvent(
679 const ServerConnectionEvent& event) {
680 DCHECK(thread_checker_.CalledOnValidThread());
681 if (event.connection_code ==
682 HttpResponse::SERVER_CONNECTION_OK) {
683 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
684 OnConnectionStatusChange(CONNECTION_OK));
687 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
688 observing_network_connectivity_changes_ = false;
689 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
690 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
693 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
694 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
695 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
699 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
700 ModelTypeSet models_with_changes) {
701 // This notification happens immediately after the transaction mutex is
702 // released. This allows work to be performed without blocking other threads
703 // from acquiring a transaction.
704 if (!change_delegate_)
705 return;
707 // Call commit.
708 for (ModelTypeSet::Iterator it = models_with_changes.First();
709 it.Good(); it.Inc()) {
710 change_delegate_->OnChangesComplete(it.Get());
711 change_observer_.Call(
712 FROM_HERE,
713 &SyncManager::ChangeObserver::OnChangesComplete,
714 it.Get());
718 ModelTypeSet
719 SyncManagerImpl::HandleTransactionEndingChangeEvent(
720 const ImmutableWriteTransactionInfo& write_transaction_info,
721 syncable::BaseTransaction* trans) {
722 // This notification happens immediately before a syncable WriteTransaction
723 // falls out of scope. It happens while the channel mutex is still held,
724 // and while the transaction mutex is held, so it cannot be re-entrant.
725 if (!change_delegate_ || change_records_.empty())
726 return ModelTypeSet();
728 // This will continue the WriteTransaction using a read only wrapper.
729 // This is the last chance for read to occur in the WriteTransaction
730 // that's closing. This special ReadTransaction will not close the
731 // underlying transaction.
732 ReadTransaction read_trans(GetUserShare(), trans);
734 ModelTypeSet models_with_changes;
735 for (ChangeRecordMap::const_iterator it = change_records_.begin();
736 it != change_records_.end(); ++it) {
737 DCHECK(!it->second.Get().empty());
738 ModelType type = ModelTypeFromInt(it->first);
739 change_delegate_->
740 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
741 &read_trans, it->second);
742 change_observer_.Call(FROM_HERE,
743 &SyncManager::ChangeObserver::OnChangesApplied,
744 type, write_transaction_info.Get().id, it->second);
745 models_with_changes.Put(type);
747 change_records_.clear();
748 return models_with_changes;
751 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
752 const ImmutableWriteTransactionInfo& write_transaction_info,
753 syncable::BaseTransaction* trans,
754 std::vector<int64>* entries_changed) {
755 // We have been notified about a user action changing a sync model.
756 LOG_IF(WARNING, !change_records_.empty()) <<
757 "CALCULATE_CHANGES called with unapplied old changes.";
759 // The mutated model type, or UNSPECIFIED if nothing was mutated.
760 ModelTypeSet mutated_model_types;
762 const syncable::ImmutableEntryKernelMutationMap& mutations =
763 write_transaction_info.Get().mutations;
764 for (syncable::EntryKernelMutationMap::const_iterator it =
765 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
766 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
767 continue;
770 ModelType model_type =
771 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
772 if (model_type < FIRST_REAL_MODEL_TYPE) {
773 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
774 continue;
777 // Found real mutation.
778 if (model_type != UNSPECIFIED) {
779 mutated_model_types.Put(model_type);
780 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
784 // Nudge if necessary.
785 if (!mutated_model_types.Empty()) {
786 if (weak_handle_this_.IsInitialized()) {
787 weak_handle_this_.Call(FROM_HERE,
788 &SyncManagerImpl::RequestNudgeForDataTypes,
789 FROM_HERE,
790 mutated_model_types);
791 } else {
792 NOTREACHED();
797 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
798 ModelType type, ChangeReorderBuffer* buffer,
799 Cryptographer* cryptographer, const syncable::EntryKernel& original,
800 bool existed_before, bool exists_now) {
801 // If this is a deletion and the datatype was encrypted, we need to decrypt it
802 // and attach it to the buffer.
803 if (!exists_now && existed_before) {
804 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
805 if (type == PASSWORDS) {
806 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
807 scoped_ptr<sync_pb::PasswordSpecificsData> data(
808 DecryptPasswordSpecifics(original_specifics, cryptographer));
809 if (!data) {
810 NOTREACHED();
811 return;
813 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
814 } else if (original_specifics.has_encrypted()) {
815 // All other datatypes can just create a new unencrypted specifics and
816 // attach it.
817 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
818 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
819 NOTREACHED();
820 return;
823 buffer->SetSpecificsForId(id, original_specifics);
827 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
828 const ImmutableWriteTransactionInfo& write_transaction_info,
829 syncable::BaseTransaction* trans,
830 std::vector<int64>* entries_changed) {
831 // We only expect one notification per sync step, so change_buffers_ should
832 // contain no pending entries.
833 LOG_IF(WARNING, !change_records_.empty()) <<
834 "CALCULATE_CHANGES called with unapplied old changes.";
836 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
838 Cryptographer* crypto = directory()->GetCryptographer(trans);
839 const syncable::ImmutableEntryKernelMutationMap& mutations =
840 write_transaction_info.Get().mutations;
841 for (syncable::EntryKernelMutationMap::const_iterator it =
842 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
843 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
844 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
846 // Omit items that aren't associated with a model.
847 ModelType type =
848 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
849 if (type < FIRST_REAL_MODEL_TYPE)
850 continue;
852 int64 handle = it->first;
853 if (exists_now && !existed_before)
854 change_buffers[type].PushAddedItem(handle);
855 else if (!exists_now && existed_before)
856 change_buffers[type].PushDeletedItem(handle);
857 else if (exists_now && existed_before &&
858 VisiblePropertiesDiffer(it->second, crypto)) {
859 change_buffers[type].PushUpdatedItem(handle);
862 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
863 it->second.original, existed_before, exists_now);
866 ReadTransaction read_trans(GetUserShare(), trans);
867 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
868 if (!change_buffers[i].IsEmpty()) {
869 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
870 &(change_records_[i]))) {
871 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
872 entries_changed->push_back((change_records_[i].Get())[j].id);
874 if (change_records_[i].Get().empty())
875 change_records_.erase(i);
880 TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta(
881 const ModelType& model_type) {
882 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this);
885 void SyncManagerImpl::RequestNudgeForDataTypes(
886 const tracked_objects::Location& nudge_location,
887 ModelTypeSet types) {
888 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
890 // TODO(lipalani) : Calculate the nudge delay based on all types.
891 base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta(
892 types.First().Get(),
893 this);
894 scheduler_->ScheduleLocalNudge(nudge_delay,
895 types,
896 nudge_location);
899 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
900 // TODO(rlarocque): Initial downloads should have a separate nudge type.
901 DCHECK(thread_checker_.CalledOnValidThread());
902 RefreshTypes(ModelTypeSet(type));
905 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
906 DCHECK(thread_checker_.CalledOnValidThread());
907 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
910 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
911 DCHECK(thread_checker_.CalledOnValidThread());
912 RefreshTypes(ModelTypeSet(type));
915 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
916 DCHECK(thread_checker_.CalledOnValidThread());
917 // Only send an event if this is due to a cycle ending and this cycle
918 // concludes a canonical "sync" process; that is, based on what is known
919 // locally we are "all happy" and up-to-date. There may be new changes on
920 // the server, but we'll get them on a subsequent sync.
922 // Notifications are sent at the end of every sync cycle, regardless of
923 // whether we should sync again.
924 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
925 if (!initialized_) {
926 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
927 << "initialized";
928 return;
931 DVLOG(1) << "Sending OnSyncCycleCompleted";
932 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
933 OnSyncCycleCompleted(event.snapshot));
937 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
938 FOR_EACH_OBSERVER(
939 SyncManager::Observer, observers_,
940 OnActionableError(error));
943 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
945 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
947 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
948 FOR_EACH_OBSERVER(
949 SyncManager::Observer, observers_,
950 OnMigrationRequested(types));
953 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
954 protocol_event_buffer_.RecordProtocolEvent(event);
955 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
956 OnProtocolEvent(event));
959 void SyncManagerImpl::SetJsEventHandler(
960 const WeakHandle<JsEventHandler>& event_handler) {
961 js_sync_manager_observer_.SetJsEventHandler(event_handler);
962 js_mutation_event_observer_.SetJsEventHandler(event_handler);
963 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
966 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
967 syncer::ModelType type) {
968 DirectoryTypeDebugInfoEmitterMap* emitter_map =
969 model_type_registry_->directory_type_debug_info_emitter_map();
970 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
972 if (it == emitter_map->end()) {
973 // This can happen in some cases. The UI thread makes requests of us
974 // when it doesn't really know which types are enabled or disabled.
975 DLOG(WARNING) << "Asked to return debug info for invalid type "
976 << ModelTypeToString(type);
977 return scoped_ptr<base::ListValue>();
980 return it->second->GetAllNodes();
983 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
984 DCHECK(thread_checker_.CalledOnValidThread());
986 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
987 allstatus_.SetNotificationsEnabled(invalidator_enabled);
988 scheduler_->SetNotificationsEnabled(invalidator_enabled);
991 void SyncManagerImpl::OnIncomingInvalidation(
992 syncer::ModelType type,
993 scoped_ptr<InvalidationInterface> invalidation) {
994 DCHECK(thread_checker_.CalledOnValidThread());
996 scheduler_->ScheduleInvalidationNudge(
997 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec),
998 type,
999 invalidation.Pass(),
1000 FROM_HERE);
1003 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
1004 DCHECK(thread_checker_.CalledOnValidThread());
1005 if (types.Empty()) {
1006 LOG(WARNING) << "Sync received refresh request with no types specified.";
1007 } else {
1008 scheduler_->ScheduleLocalRefreshRequest(
1009 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec),
1010 types, FROM_HERE);
1014 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
1015 return allstatus_.status();
1018 void SyncManagerImpl::SaveChanges() {
1019 directory()->SaveChanges();
1022 UserShare* SyncManagerImpl::GetUserShare() {
1023 DCHECK(initialized_);
1024 return &share_;
1027 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
1028 DCHECK(initialized_);
1029 return sync_context_proxy_.get();
1032 const std::string SyncManagerImpl::cache_guid() {
1033 DCHECK(initialized_);
1034 return directory()->cache_guid();
1037 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
1038 ReadTransaction trans(FROM_HERE, GetUserShare());
1039 ReadNode nigori_node(&trans);
1040 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
1041 DVLOG(1) << "Couldn't find Nigori node.";
1042 return false;
1044 bool found_experiment = false;
1046 ReadNode favicon_sync_node(&trans);
1047 if (favicon_sync_node.InitByClientTagLookup(
1048 syncer::EXPERIMENTS,
1049 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
1050 experiments->favicon_sync_limit =
1051 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
1052 favicon_sync_limit();
1053 found_experiment = true;
1056 ReadNode pre_commit_update_avoidance_node(&trans);
1057 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
1058 syncer::EXPERIMENTS,
1059 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
1060 session_context_->set_server_enabled_pre_commit_update_avoidance(
1061 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
1062 pre_commit_update_avoidance().enabled());
1063 // We don't bother setting found_experiment. The frontend doesn't need to
1064 // know about this.
1067 ReadNode gcm_channel_node(&trans);
1068 if (gcm_channel_node.InitByClientTagLookup(
1069 syncer::EXPERIMENTS,
1070 syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
1071 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
1072 experiments->gcm_channel_state =
1073 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
1074 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
1075 found_experiment = true;
1078 ReadNode enhanced_bookmarks_node(&trans);
1079 if (enhanced_bookmarks_node.InitByClientTagLookup(
1080 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
1081 BaseNode::INIT_OK &&
1082 enhanced_bookmarks_node.GetExperimentsSpecifics()
1083 .has_enhanced_bookmarks()) {
1084 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
1085 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
1086 if (enhanced_bookmarks.has_enabled())
1087 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
1088 if (enhanced_bookmarks.has_extension_id()) {
1089 experiments->enhanced_bookmarks_ext_id =
1090 enhanced_bookmarks.extension_id();
1092 found_experiment = true;
1095 ReadNode gcm_invalidations_node(&trans);
1096 if (gcm_invalidations_node.InitByClientTagLookup(
1097 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1098 BaseNode::INIT_OK) {
1099 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1100 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1101 if (gcm_invalidations.has_enabled()) {
1102 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1103 found_experiment = true;
1107 return found_experiment;
1110 bool SyncManagerImpl::HasUnsyncedItems() {
1111 ReadTransaction trans(FROM_HERE, GetUserShare());
1112 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1115 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1116 return sync_encryption_handler_.get();
1119 ScopedVector<syncer::ProtocolEvent>
1120 SyncManagerImpl::GetBufferedProtocolEvents() {
1121 return protocol_event_buffer_.GetBufferedProtocolEvents();
1124 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1125 syncer::TypeDebugInfoObserver* observer) {
1126 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1129 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1130 syncer::TypeDebugInfoObserver* observer) {
1131 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1134 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1135 syncer::TypeDebugInfoObserver* observer) {
1136 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1139 void SyncManagerImpl::RequestEmitDebugInfo() {
1140 model_type_registry_->RequestEmitDebugInfo();
1143 // static.
1144 int SyncManagerImpl::GetDefaultNudgeDelay() {
1145 return kDefaultNudgeDelayMilliseconds;
1148 // static.
1149 int SyncManagerImpl::GetPreferencesNudgeDelay() {
1150 return kPreferencesNudgeDelayMilliseconds;
1153 } // namespace syncer