[Sync] Clean up ProfileSyncService observer events
[chromium-blink-merge.git] / sync / internal_api / sync_manager_impl.cc
blob8ff4843763e91ece862be273e31e94fbcb28e1b7
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
7 #include <string>
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
53 class GURL;
55 namespace syncer {
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
62 namespace {
64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65 ConfigureReason reason) {
66 switch (reason) {
67 case CONFIGURE_REASON_RECONFIGURATION:
68 return GetUpdatesCallerInfo::RECONFIGURATION;
69 case CONFIGURE_REASON_MIGRATION:
70 return GetUpdatesCallerInfo::MIGRATION;
71 case CONFIGURE_REASON_NEW_CLIENT:
72 return GetUpdatesCallerInfo::NEW_CLIENT;
73 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74 case CONFIGURE_REASON_CRYPTO:
75 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76 case CONFIGURE_REASON_PROGRAMMATIC:
77 return GetUpdatesCallerInfo::PROGRAMMATIC;
78 default:
79 NOTREACHED();
81 return GetUpdatesCallerInfo::UNKNOWN;
84 } // namespace
86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
87 : name_(name),
88 change_delegate_(NULL),
89 initialized_(false),
90 observing_network_connectivity_changes_(false),
91 report_unrecoverable_error_function_(NULL),
92 weak_ptr_factory_(this) {
93 // Pre-fill |notification_info_map_|.
94 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
95 notification_info_map_.insert(
96 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
100 SyncManagerImpl::~SyncManagerImpl() {
101 DCHECK(thread_checker_.CalledOnValidThread());
102 CHECK(!initialized_);
105 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
106 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
108 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
109 base::DictionaryValue* value = new base::DictionaryValue();
110 value->SetInteger("totalCount", total_count);
111 value->SetString("payload", payload);
112 return value;
115 bool SyncManagerImpl::VisiblePositionsDiffer(
116 const syncable::EntryKernelMutation& mutation) const {
117 const syncable::EntryKernel& a = mutation.original;
118 const syncable::EntryKernel& b = mutation.mutated;
119 if (!b.ShouldMaintainPosition())
120 return false;
121 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
122 return true;
123 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
124 return true;
125 return false;
128 bool SyncManagerImpl::VisiblePropertiesDiffer(
129 const syncable::EntryKernelMutation& mutation,
130 Cryptographer* cryptographer) const {
131 const syncable::EntryKernel& a = mutation.original;
132 const syncable::EntryKernel& b = mutation.mutated;
133 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
134 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
135 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
136 GetModelTypeFromSpecifics(b_specifics));
137 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
138 // Suppress updates to items that aren't tracked by any browser model.
139 if (model_type < FIRST_REAL_MODEL_TYPE ||
140 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
141 return false;
143 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
144 return true;
145 if (!AreSpecificsEqual(cryptographer,
146 a.ref(syncable::SPECIFICS),
147 b.ref(syncable::SPECIFICS))) {
148 return true;
150 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
151 b.ref(syncable::ATTACHMENT_METADATA))) {
152 return true;
154 // We only care if the name has changed if neither specifics is encrypted
155 // (encrypted nodes blow away the NON_UNIQUE_NAME).
156 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
157 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
158 return true;
159 if (VisiblePositionsDiffer(mutation))
160 return true;
161 return false;
164 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
165 return directory()->InitialSyncEndedTypes();
168 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
169 ModelTypeSet types) {
170 ModelTypeSet result;
171 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
172 sync_pb::DataTypeProgressMarker marker;
173 directory()->GetDownloadProgress(i.Get(), &marker);
175 if (marker.token().empty())
176 result.Put(i.Get());
178 return result;
181 void SyncManagerImpl::ConfigureSyncer(
182 ConfigureReason reason,
183 ModelTypeSet to_download,
184 ModelTypeSet to_purge,
185 ModelTypeSet to_journal,
186 ModelTypeSet to_unapply,
187 const ModelSafeRoutingInfo& new_routing_info,
188 const base::Closure& ready_task,
189 const base::Closure& retry_task) {
190 DCHECK(thread_checker_.CalledOnValidThread());
191 DCHECK(!ready_task.is_null());
192 DCHECK(initialized_);
194 DVLOG(1) << "Configuring -"
195 << "\n\t" << "current types: "
196 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
197 << "\n\t" << "types to download: "
198 << ModelTypeSetToString(to_download)
199 << "\n\t" << "types to purge: "
200 << ModelTypeSetToString(to_purge)
201 << "\n\t" << "types to journal: "
202 << ModelTypeSetToString(to_journal)
203 << "\n\t" << "types to unapply: "
204 << ModelTypeSetToString(to_unapply);
205 if (!PurgeDisabledTypes(to_purge,
206 to_journal,
207 to_unapply)) {
208 // We failed to cleanup the types. Invoke the ready task without actually
209 // configuring any types. The caller should detect this as a configuration
210 // failure and act appropriately.
211 ready_task.Run();
212 return;
215 ConfigurationParams params(GetSourceFromReason(reason),
216 to_download,
217 new_routing_info,
218 ready_task,
219 retry_task);
221 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
222 scheduler_->ScheduleConfiguration(params);
225 void SyncManagerImpl::Init(InitArgs* args) {
226 CHECK(!initialized_);
227 DCHECK(thread_checker_.CalledOnValidThread());
228 DCHECK(args->post_factory.get());
229 DCHECK(!args->credentials.email.empty());
230 DCHECK(!args->credentials.sync_token.empty());
231 DCHECK(!args->credentials.scope_set.empty());
232 DCHECK(args->cancelation_signal);
233 DVLOG(1) << "SyncManager starting Init...";
235 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
237 change_delegate_ = args->change_delegate;
239 AddObserver(&js_sync_manager_observer_);
240 SetJsEventHandler(args->event_handler);
242 AddObserver(&debug_info_event_listener_);
244 database_path_ = args->database_location.Append(
245 syncable::Directory::kSyncDatabaseFilename);
246 unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
247 report_unrecoverable_error_function_ =
248 args->report_unrecoverable_error_function;
250 allstatus_.SetHasKeystoreKey(
251 !args->restored_keystore_key_for_bootstrapping.empty());
252 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
253 &share_,
254 args->encryptor,
255 args->restored_key_for_bootstrapping,
256 args->restored_keystore_key_for_bootstrapping));
257 sync_encryption_handler_->AddObserver(this);
258 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
259 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
261 base::FilePath absolute_db_path = database_path_;
262 DCHECK(absolute_db_path.IsAbsolute());
264 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
265 args->internal_components_factory->BuildDirectoryBackingStore(
266 InternalComponentsFactory::STORAGE_ON_DISK,
267 args->credentials.email, absolute_db_path).Pass();
269 DCHECK(backing_store.get());
270 share_.directory.reset(
271 new syncable::Directory(
272 backing_store.release(),
273 unrecoverable_error_handler_.get(),
274 report_unrecoverable_error_function_,
275 sync_encryption_handler_.get(),
276 sync_encryption_handler_->GetCryptographerUnsafe()));
277 share_.sync_credentials = args->credentials;
279 // UserShare is accessible to a lot of code that doesn't need access to the
280 // sync token so clear sync_token from the UserShare.
281 share_.sync_credentials.sync_token = "";
283 const std::string& username = args->credentials.email;
284 DVLOG(1) << "Username: " << username;
285 if (!OpenDirectory(username)) {
286 NotifyInitializationFailure();
287 LOG(ERROR) << "Sync manager initialization failed!";
288 return;
291 connection_manager_.reset(new SyncAPIServerConnectionManager(
292 args->service_url.host() + args->service_url.path(),
293 args->service_url.EffectiveIntPort(),
294 args->service_url.SchemeIsSecure(),
295 args->post_factory.release(),
296 args->cancelation_signal));
297 connection_manager_->set_client_id(directory()->cache_guid());
298 connection_manager_->AddListener(this);
300 std::string sync_id = directory()->cache_guid();
302 DVLOG(1) << "Setting sync client ID: " << sync_id;
303 allstatus_.SetSyncId(sync_id);
304 DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
305 allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
307 model_type_registry_.reset(
308 new ModelTypeRegistry(args->workers, directory(), this));
309 sync_encryption_handler_->AddObserver(model_type_registry_.get());
311 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier
312 // if the pointer is misused in debug mode.
313 base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
314 weak_core.get();
316 sync_context_proxy_.reset(
317 new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
319 // Build a SyncSessionContext and store the worker in it.
320 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
321 std::vector<SyncEngineEventListener*> listeners;
322 listeners.push_back(&allstatus_);
323 listeners.push_back(this);
324 session_context_ =
325 args->internal_components_factory->BuildContext(
326 connection_manager_.get(),
327 directory(),
328 args->extensions_activity,
329 listeners,
330 &debug_info_event_listener_,
331 model_type_registry_.get(),
332 args->invalidator_client_id)
333 .Pass();
334 session_context_->set_account_name(args->credentials.email);
335 scheduler_ = args->internal_components_factory->BuildScheduler(
336 name_, session_context_.get(), args->cancelation_signal).Pass();
338 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
340 initialized_ = true;
342 net::NetworkChangeNotifier::AddIPAddressObserver(this);
343 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
344 observing_network_connectivity_changes_ = true;
346 UpdateCredentials(args->credentials);
348 NotifyInitializationSuccess();
351 void SyncManagerImpl::NotifyInitializationSuccess() {
352 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
353 OnInitializationComplete(
354 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
355 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
356 true, InitialSyncEndedTypes()));
359 void SyncManagerImpl::NotifyInitializationFailure() {
360 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
361 OnInitializationComplete(
362 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
363 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
364 false, ModelTypeSet()));
367 void SyncManagerImpl::OnPassphraseRequired(
368 PassphraseRequiredReason reason,
369 const sync_pb::EncryptedData& pending_keys) {
370 // Does nothing.
373 void SyncManagerImpl::OnPassphraseAccepted() {
374 // Does nothing.
377 void SyncManagerImpl::OnBootstrapTokenUpdated(
378 const std::string& bootstrap_token,
379 BootstrapTokenType type) {
380 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
381 allstatus_.SetHasKeystoreKey(true);
384 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
385 bool encrypt_everything) {
386 allstatus_.SetEncryptedTypes(encrypted_types);
389 void SyncManagerImpl::OnEncryptionComplete() {
390 // Does nothing.
393 void SyncManagerImpl::OnCryptographerStateChanged(
394 Cryptographer* cryptographer) {
395 allstatus_.SetCryptographerReady(cryptographer->is_ready());
396 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
397 allstatus_.SetKeystoreMigrationTime(
398 sync_encryption_handler_->migration_time());
401 void SyncManagerImpl::OnPassphraseTypeChanged(
402 PassphraseType type,
403 base::Time explicit_passphrase_time) {
404 allstatus_.SetPassphraseType(type);
405 allstatus_.SetKeystoreMigrationTime(
406 sync_encryption_handler_->migration_time());
409 void SyncManagerImpl::StartSyncingNormally(
410 const ModelSafeRoutingInfo& routing_info) {
411 // Start the sync scheduler.
412 // TODO(sync): We always want the newest set of routes when we switch back
413 // to normal mode. Figure out how to enforce set_routing_info is always
414 // appropriately set and that it's only modified when switching to normal
415 // mode.
416 DCHECK(thread_checker_.CalledOnValidThread());
417 session_context_->SetRoutingInfo(routing_info);
418 scheduler_->Start(SyncScheduler::NORMAL_MODE);
421 syncable::Directory* SyncManagerImpl::directory() {
422 return share_.directory.get();
425 const SyncScheduler* SyncManagerImpl::scheduler() const {
426 return scheduler_.get();
429 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
430 return connection_manager_->HasInvalidAuthToken();
433 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
434 DCHECK(!initialized_) << "Should only happen once";
436 // Set before Open().
437 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
438 WeakHandle<syncable::TransactionObserver> transaction_observer(
439 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
441 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
442 open_result = directory()->Open(username, this, transaction_observer);
443 if (open_result != syncable::OPENED) {
444 LOG(ERROR) << "Could not open share for:" << username;
445 return false;
448 // Unapplied datatypes (those that do not have initial sync ended set) get
449 // re-downloaded during any configuration. But, it's possible for a datatype
450 // to have a progress marker but not have initial sync ended yet, making
451 // it a candidate for migration. This is a problem, as the DataTypeManager
452 // does not support a migration while it's already in the middle of a
453 // configuration. As a result, any partially synced datatype can stall the
454 // DTM, waiting for the configuration to complete, which it never will due
455 // to the migration error. In addition, a partially synced nigori will
456 // trigger the migration logic before the backend is initialized, resulting
457 // in crashes. We therefore detect and purge any partially synced types as
458 // part of initialization.
459 if (!PurgePartiallySyncedTypes())
460 return false;
462 return true;
465 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
466 ModelTypeSet partially_synced_types = ModelTypeSet::All();
467 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
468 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
469 ModelTypeSet::All()));
471 DVLOG(1) << "Purging partially synced types "
472 << ModelTypeSetToString(partially_synced_types);
473 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
474 partially_synced_types.Size());
475 if (partially_synced_types.Empty())
476 return true;
477 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
478 ModelTypeSet(),
479 ModelTypeSet());
482 bool SyncManagerImpl::PurgeDisabledTypes(
483 ModelTypeSet to_purge,
484 ModelTypeSet to_journal,
485 ModelTypeSet to_unapply) {
486 if (to_purge.Empty())
487 return true;
488 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
489 DCHECK(to_purge.HasAll(to_journal));
490 DCHECK(to_purge.HasAll(to_unapply));
491 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
494 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
495 DCHECK(thread_checker_.CalledOnValidThread());
496 DCHECK(initialized_);
497 DCHECK(!credentials.email.empty());
498 DCHECK(!credentials.sync_token.empty());
499 DCHECK(!credentials.scope_set.empty());
501 observing_network_connectivity_changes_ = true;
502 if (!connection_manager_->SetAuthToken(credentials.sync_token))
503 return; // Auth token is known to be invalid, so exit early.
505 scheduler_->OnCredentialsUpdated();
507 // TODO(zea): pass the credential age to the debug info event listener.
510 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
511 DCHECK(thread_checker_.CalledOnValidThread());
512 observers_.AddObserver(observer);
515 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
516 DCHECK(thread_checker_.CalledOnValidThread());
517 observers_.RemoveObserver(observer);
520 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
521 DCHECK(thread_checker_.CalledOnValidThread());
523 // Prevent any in-flight method calls from running. Also
524 // invalidates |weak_handle_this_| and |change_observer_|.
525 weak_ptr_factory_.InvalidateWeakPtrs();
526 js_mutation_event_observer_.InvalidateWeakPtrs();
528 scheduler_.reset();
529 session_context_.reset();
531 if (model_type_registry_)
532 sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
534 model_type_registry_.reset();
536 if (sync_encryption_handler_) {
537 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
538 sync_encryption_handler_->RemoveObserver(this);
541 SetJsEventHandler(WeakHandle<JsEventHandler>());
542 RemoveObserver(&js_sync_manager_observer_);
544 RemoveObserver(&debug_info_event_listener_);
546 // |connection_manager_| may end up being NULL here in tests (in synchronous
547 // initialization mode).
549 // TODO(akalin): Fix this behavior.
550 if (connection_manager_)
551 connection_manager_->RemoveListener(this);
552 connection_manager_.reset();
554 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
555 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
556 observing_network_connectivity_changes_ = false;
558 if (initialized_ && directory()) {
559 directory()->SaveChanges();
562 share_.directory.reset();
564 change_delegate_ = NULL;
566 initialized_ = false;
568 // We reset these here, since only now we know they will not be
569 // accessed from other threads (since we shut down everything).
570 change_observer_.Reset();
571 weak_handle_this_.Reset();
574 void SyncManagerImpl::OnIPAddressChanged() {
575 if (!observing_network_connectivity_changes_) {
576 DVLOG(1) << "IP address change dropped.";
577 return;
579 DVLOG(1) << "IP address change detected.";
580 OnNetworkConnectivityChangedImpl();
583 void SyncManagerImpl::OnConnectionTypeChanged(
584 net::NetworkChangeNotifier::ConnectionType) {
585 if (!observing_network_connectivity_changes_) {
586 DVLOG(1) << "Connection type change dropped.";
587 return;
589 DVLOG(1) << "Connection type change detected.";
590 OnNetworkConnectivityChangedImpl();
593 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
594 DCHECK(thread_checker_.CalledOnValidThread());
595 scheduler_->OnConnectionStatusChange();
598 void SyncManagerImpl::OnServerConnectionEvent(
599 const ServerConnectionEvent& event) {
600 DCHECK(thread_checker_.CalledOnValidThread());
601 if (event.connection_code ==
602 HttpResponse::SERVER_CONNECTION_OK) {
603 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
604 OnConnectionStatusChange(CONNECTION_OK));
607 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
608 observing_network_connectivity_changes_ = false;
609 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
610 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
613 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
614 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
615 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
619 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
620 ModelTypeSet models_with_changes) {
621 // This notification happens immediately after the transaction mutex is
622 // released. This allows work to be performed without blocking other threads
623 // from acquiring a transaction.
624 if (!change_delegate_)
625 return;
627 // Call commit.
628 for (ModelTypeSet::Iterator it = models_with_changes.First();
629 it.Good(); it.Inc()) {
630 change_delegate_->OnChangesComplete(it.Get());
631 change_observer_.Call(
632 FROM_HERE,
633 &SyncManager::ChangeObserver::OnChangesComplete,
634 it.Get());
638 ModelTypeSet
639 SyncManagerImpl::HandleTransactionEndingChangeEvent(
640 const ImmutableWriteTransactionInfo& write_transaction_info,
641 syncable::BaseTransaction* trans) {
642 // This notification happens immediately before a syncable WriteTransaction
643 // falls out of scope. It happens while the channel mutex is still held,
644 // and while the transaction mutex is held, so it cannot be re-entrant.
645 if (!change_delegate_ || change_records_.empty())
646 return ModelTypeSet();
648 // This will continue the WriteTransaction using a read only wrapper.
649 // This is the last chance for read to occur in the WriteTransaction
650 // that's closing. This special ReadTransaction will not close the
651 // underlying transaction.
652 ReadTransaction read_trans(GetUserShare(), trans);
654 ModelTypeSet models_with_changes;
655 for (ChangeRecordMap::const_iterator it = change_records_.begin();
656 it != change_records_.end(); ++it) {
657 DCHECK(!it->second.Get().empty());
658 ModelType type = ModelTypeFromInt(it->first);
659 change_delegate_->
660 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
661 &read_trans, it->second);
662 change_observer_.Call(FROM_HERE,
663 &SyncManager::ChangeObserver::OnChangesApplied,
664 type, write_transaction_info.Get().id, it->second);
665 models_with_changes.Put(type);
667 change_records_.clear();
668 return models_with_changes;
671 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
672 const ImmutableWriteTransactionInfo& write_transaction_info,
673 syncable::BaseTransaction* trans,
674 std::vector<int64>* entries_changed) {
675 // We have been notified about a user action changing a sync model.
676 LOG_IF(WARNING, !change_records_.empty()) <<
677 "CALCULATE_CHANGES called with unapplied old changes.";
679 // The mutated model type, or UNSPECIFIED if nothing was mutated.
680 ModelTypeSet mutated_model_types;
682 const syncable::ImmutableEntryKernelMutationMap& mutations =
683 write_transaction_info.Get().mutations;
684 for (syncable::EntryKernelMutationMap::const_iterator it =
685 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
686 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
687 continue;
690 ModelType model_type =
691 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
692 if (model_type < FIRST_REAL_MODEL_TYPE) {
693 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
694 continue;
697 // Found real mutation.
698 if (model_type != UNSPECIFIED) {
699 mutated_model_types.Put(model_type);
700 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
704 // Nudge if necessary.
705 if (!mutated_model_types.Empty()) {
706 if (weak_handle_this_.IsInitialized()) {
707 weak_handle_this_.Call(FROM_HERE,
708 &SyncManagerImpl::RequestNudgeForDataTypes,
709 FROM_HERE,
710 mutated_model_types);
711 } else {
712 NOTREACHED();
717 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
718 ModelType type, ChangeReorderBuffer* buffer,
719 Cryptographer* cryptographer, const syncable::EntryKernel& original,
720 bool existed_before, bool exists_now) {
721 // If this is a deletion and the datatype was encrypted, we need to decrypt it
722 // and attach it to the buffer.
723 if (!exists_now && existed_before) {
724 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
725 if (type == PASSWORDS) {
726 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
727 scoped_ptr<sync_pb::PasswordSpecificsData> data(
728 DecryptPasswordSpecifics(original_specifics, cryptographer));
729 if (!data) {
730 NOTREACHED();
731 return;
733 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
734 } else if (original_specifics.has_encrypted()) {
735 // All other datatypes can just create a new unencrypted specifics and
736 // attach it.
737 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
738 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
739 NOTREACHED();
740 return;
743 buffer->SetSpecificsForId(id, original_specifics);
747 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
748 const ImmutableWriteTransactionInfo& write_transaction_info,
749 syncable::BaseTransaction* trans,
750 std::vector<int64>* entries_changed) {
751 // We only expect one notification per sync step, so change_buffers_ should
752 // contain no pending entries.
753 LOG_IF(WARNING, !change_records_.empty()) <<
754 "CALCULATE_CHANGES called with unapplied old changes.";
756 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
758 Cryptographer* crypto = directory()->GetCryptographer(trans);
759 const syncable::ImmutableEntryKernelMutationMap& mutations =
760 write_transaction_info.Get().mutations;
761 for (syncable::EntryKernelMutationMap::const_iterator it =
762 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
763 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
764 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
766 // Omit items that aren't associated with a model.
767 ModelType type =
768 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
769 if (type < FIRST_REAL_MODEL_TYPE)
770 continue;
772 int64 handle = it->first;
773 if (exists_now && !existed_before)
774 change_buffers[type].PushAddedItem(handle);
775 else if (!exists_now && existed_before)
776 change_buffers[type].PushDeletedItem(handle);
777 else if (exists_now && existed_before &&
778 VisiblePropertiesDiffer(it->second, crypto)) {
779 change_buffers[type].PushUpdatedItem(handle);
782 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
783 it->second.original, existed_before, exists_now);
786 ReadTransaction read_trans(GetUserShare(), trans);
787 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
788 if (!change_buffers[i].IsEmpty()) {
789 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
790 &(change_records_[i]))) {
791 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
792 entries_changed->push_back((change_records_[i].Get())[j].id);
794 if (change_records_[i].Get().empty())
795 change_records_.erase(i);
800 void SyncManagerImpl::RequestNudgeForDataTypes(
801 const tracked_objects::Location& nudge_location,
802 ModelTypeSet types) {
803 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
805 scheduler_->ScheduleLocalNudge(types, nudge_location);
808 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
809 DCHECK(thread_checker_.CalledOnValidThread());
810 scheduler_->ScheduleInitialSyncNudge(type);
813 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
814 DCHECK(thread_checker_.CalledOnValidThread());
815 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
818 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
819 DCHECK(thread_checker_.CalledOnValidThread());
820 RefreshTypes(ModelTypeSet(type));
823 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
824 DCHECK(thread_checker_.CalledOnValidThread());
825 // Only send an event if this is due to a cycle ending and this cycle
826 // concludes a canonical "sync" process; that is, based on what is known
827 // locally we are "all happy" and up-to-date. There may be new changes on
828 // the server, but we'll get them on a subsequent sync.
830 // Notifications are sent at the end of every sync cycle, regardless of
831 // whether we should sync again.
832 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
833 if (!initialized_) {
834 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
835 << "initialized";
836 return;
839 DVLOG(1) << "Sending OnSyncCycleCompleted";
840 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
841 OnSyncCycleCompleted(event.snapshot));
845 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
846 FOR_EACH_OBSERVER(
847 SyncManager::Observer, observers_,
848 OnActionableError(error));
851 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
853 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
855 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
856 FOR_EACH_OBSERVER(
857 SyncManager::Observer, observers_,
858 OnMigrationRequested(types));
861 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
862 protocol_event_buffer_.RecordProtocolEvent(event);
863 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
864 OnProtocolEvent(event));
867 void SyncManagerImpl::SetJsEventHandler(
868 const WeakHandle<JsEventHandler>& event_handler) {
869 js_sync_manager_observer_.SetJsEventHandler(event_handler);
870 js_mutation_event_observer_.SetJsEventHandler(event_handler);
871 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
874 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
875 syncer::ModelType type) {
876 DirectoryTypeDebugInfoEmitterMap* emitter_map =
877 model_type_registry_->directory_type_debug_info_emitter_map();
878 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
880 if (it == emitter_map->end()) {
881 // This can happen in some cases. The UI thread makes requests of us
882 // when it doesn't really know which types are enabled or disabled.
883 DLOG(WARNING) << "Asked to return debug info for invalid type "
884 << ModelTypeToString(type);
885 return scoped_ptr<base::ListValue>(new base::ListValue());
888 return it->second->GetAllNodes();
891 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
892 DCHECK(thread_checker_.CalledOnValidThread());
894 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
895 allstatus_.SetNotificationsEnabled(invalidator_enabled);
896 scheduler_->SetNotificationsEnabled(invalidator_enabled);
899 void SyncManagerImpl::OnIncomingInvalidation(
900 syncer::ModelType type,
901 scoped_ptr<InvalidationInterface> invalidation) {
902 DCHECK(thread_checker_.CalledOnValidThread());
904 scheduler_->ScheduleInvalidationNudge(
905 type,
906 invalidation.Pass(),
907 FROM_HERE);
910 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
911 DCHECK(thread_checker_.CalledOnValidThread());
912 if (types.Empty()) {
913 LOG(WARNING) << "Sync received refresh request with no types specified.";
914 } else {
915 scheduler_->ScheduleLocalRefreshRequest(
916 types, FROM_HERE);
920 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
921 return allstatus_.status();
924 void SyncManagerImpl::SaveChanges() {
925 directory()->SaveChanges();
928 UserShare* SyncManagerImpl::GetUserShare() {
929 DCHECK(initialized_);
930 return &share_;
933 syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
934 DCHECK(initialized_);
935 return sync_context_proxy_.get();
938 const std::string SyncManagerImpl::cache_guid() {
939 DCHECK(initialized_);
940 return directory()->cache_guid();
943 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
944 ReadTransaction trans(FROM_HERE, GetUserShare());
945 ReadNode nigori_node(&trans);
946 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
947 DVLOG(1) << "Couldn't find Nigori node.";
948 return false;
950 bool found_experiment = false;
952 ReadNode favicon_sync_node(&trans);
953 if (favicon_sync_node.InitByClientTagLookup(
954 syncer::EXPERIMENTS,
955 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
956 experiments->favicon_sync_limit =
957 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
958 favicon_sync_limit();
959 found_experiment = true;
962 ReadNode pre_commit_update_avoidance_node(&trans);
963 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
964 syncer::EXPERIMENTS,
965 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
966 session_context_->set_server_enabled_pre_commit_update_avoidance(
967 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
968 pre_commit_update_avoidance().enabled());
969 // We don't bother setting found_experiment. The frontend doesn't need to
970 // know about this.
973 ReadNode gcm_channel_node(&trans);
974 if (gcm_channel_node.InitByClientTagLookup(
975 syncer::EXPERIMENTS,
976 syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
977 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
978 experiments->gcm_channel_state =
979 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
980 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
981 found_experiment = true;
984 ReadNode enhanced_bookmarks_node(&trans);
985 if (enhanced_bookmarks_node.InitByClientTagLookup(
986 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
987 BaseNode::INIT_OK &&
988 enhanced_bookmarks_node.GetExperimentsSpecifics()
989 .has_enhanced_bookmarks()) {
990 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
991 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
992 if (enhanced_bookmarks.has_enabled())
993 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
994 if (enhanced_bookmarks.has_extension_id()) {
995 experiments->enhanced_bookmarks_ext_id =
996 enhanced_bookmarks.extension_id();
998 found_experiment = true;
1001 ReadNode gcm_invalidations_node(&trans);
1002 if (gcm_invalidations_node.InitByClientTagLookup(
1003 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1004 BaseNode::INIT_OK) {
1005 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1006 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1007 if (gcm_invalidations.has_enabled()) {
1008 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1009 found_experiment = true;
1013 return found_experiment;
1016 bool SyncManagerImpl::HasUnsyncedItems() {
1017 ReadTransaction trans(FROM_HERE, GetUserShare());
1018 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1021 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1022 return sync_encryption_handler_.get();
1025 ScopedVector<syncer::ProtocolEvent>
1026 SyncManagerImpl::GetBufferedProtocolEvents() {
1027 return protocol_event_buffer_.GetBufferedProtocolEvents();
1030 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1031 syncer::TypeDebugInfoObserver* observer) {
1032 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1035 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1036 syncer::TypeDebugInfoObserver* observer) {
1037 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1040 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1041 syncer::TypeDebugInfoObserver* observer) {
1042 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1045 void SyncManagerImpl::RequestEmitDebugInfo() {
1046 model_type_registry_->RequestEmitDebugInfo();
1049 } // namespace syncer