Fix closure compile error.
[chromium-blink-merge.git] / sync / internal_api / sync_manager_impl.cc
blob86033ee9f9e93ccd9d6ad9cb7f9c8444112d21d0
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
7 #include <string>
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
53 class GURL;
55 namespace syncer {
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
62 namespace {
64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65 ConfigureReason reason) {
66 switch (reason) {
67 case CONFIGURE_REASON_RECONFIGURATION:
68 return GetUpdatesCallerInfo::RECONFIGURATION;
69 case CONFIGURE_REASON_MIGRATION:
70 return GetUpdatesCallerInfo::MIGRATION;
71 case CONFIGURE_REASON_NEW_CLIENT:
72 return GetUpdatesCallerInfo::NEW_CLIENT;
73 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74 case CONFIGURE_REASON_CRYPTO:
75 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76 case CONFIGURE_REASON_PROGRAMMATIC:
77 return GetUpdatesCallerInfo::PROGRAMMATIC;
78 default:
79 NOTREACHED();
81 return GetUpdatesCallerInfo::UNKNOWN;
84 } // namespace
86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
87 : name_(name),
88 change_delegate_(NULL),
89 initialized_(false),
90 observing_network_connectivity_changes_(false),
91 weak_ptr_factory_(this) {
92 // Pre-fill |notification_info_map_|.
93 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
94 notification_info_map_.insert(
95 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
99 SyncManagerImpl::~SyncManagerImpl() {
100 DCHECK(thread_checker_.CalledOnValidThread());
101 CHECK(!initialized_);
104 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
105 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
108 base::DictionaryValue* value = new base::DictionaryValue();
109 value->SetInteger("totalCount", total_count);
110 value->SetString("payload", payload);
111 return value;
114 bool SyncManagerImpl::VisiblePositionsDiffer(
115 const syncable::EntryKernelMutation& mutation) const {
116 const syncable::EntryKernel& a = mutation.original;
117 const syncable::EntryKernel& b = mutation.mutated;
118 if (!b.ShouldMaintainPosition())
119 return false;
120 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
121 return true;
122 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
123 return true;
124 return false;
127 bool SyncManagerImpl::VisiblePropertiesDiffer(
128 const syncable::EntryKernelMutation& mutation,
129 Cryptographer* cryptographer) const {
130 const syncable::EntryKernel& a = mutation.original;
131 const syncable::EntryKernel& b = mutation.mutated;
132 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
133 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
134 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
135 GetModelTypeFromSpecifics(b_specifics));
136 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
137 // Suppress updates to items that aren't tracked by any browser model.
138 if (model_type < FIRST_REAL_MODEL_TYPE ||
139 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
140 return false;
142 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
143 return true;
144 if (!AreSpecificsEqual(cryptographer,
145 a.ref(syncable::SPECIFICS),
146 b.ref(syncable::SPECIFICS))) {
147 return true;
149 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
150 b.ref(syncable::ATTACHMENT_METADATA))) {
151 return true;
153 // We only care if the name has changed if neither specifics is encrypted
154 // (encrypted nodes blow away the NON_UNIQUE_NAME).
155 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
156 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
157 return true;
158 if (VisiblePositionsDiffer(mutation))
159 return true;
160 return false;
163 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
164 return directory()->InitialSyncEndedTypes();
167 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
168 ModelTypeSet types) {
169 ModelTypeSet result;
170 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
171 sync_pb::DataTypeProgressMarker marker;
172 directory()->GetDownloadProgress(i.Get(), &marker);
174 if (marker.token().empty())
175 result.Put(i.Get());
177 return result;
180 void SyncManagerImpl::ConfigureSyncer(
181 ConfigureReason reason,
182 ModelTypeSet to_download,
183 ModelTypeSet to_purge,
184 ModelTypeSet to_journal,
185 ModelTypeSet to_unapply,
186 const ModelSafeRoutingInfo& new_routing_info,
187 const base::Closure& ready_task,
188 const base::Closure& retry_task) {
189 DCHECK(thread_checker_.CalledOnValidThread());
190 DCHECK(!ready_task.is_null());
191 DCHECK(initialized_);
193 DVLOG(1) << "Configuring -"
194 << "\n\t" << "current types: "
195 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
196 << "\n\t" << "types to download: "
197 << ModelTypeSetToString(to_download)
198 << "\n\t" << "types to purge: "
199 << ModelTypeSetToString(to_purge)
200 << "\n\t" << "types to journal: "
201 << ModelTypeSetToString(to_journal)
202 << "\n\t" << "types to unapply: "
203 << ModelTypeSetToString(to_unapply);
204 if (!PurgeDisabledTypes(to_purge,
205 to_journal,
206 to_unapply)) {
207 // We failed to cleanup the types. Invoke the ready task without actually
208 // configuring any types. The caller should detect this as a configuration
209 // failure and act appropriately.
210 ready_task.Run();
211 return;
214 ConfigurationParams params(GetSourceFromReason(reason),
215 to_download,
216 new_routing_info,
217 ready_task,
218 retry_task);
220 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
221 scheduler_->ScheduleConfiguration(params);
224 void SyncManagerImpl::Init(InitArgs* args) {
225 CHECK(!initialized_);
226 DCHECK(thread_checker_.CalledOnValidThread());
227 DCHECK(args->post_factory.get());
228 DCHECK(!args->credentials.email.empty());
229 DCHECK(!args->credentials.sync_token.empty());
230 DCHECK(!args->credentials.scope_set.empty());
231 DCHECK(args->cancelation_signal);
232 DVLOG(1) << "SyncManager starting Init...";
234 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
236 change_delegate_ = args->change_delegate;
238 AddObserver(&js_sync_manager_observer_);
239 SetJsEventHandler(args->event_handler);
241 AddObserver(&debug_info_event_listener_);
243 database_path_ = args->database_location.Append(
244 syncable::Directory::kSyncDatabaseFilename);
245 report_unrecoverable_error_function_ =
246 args->report_unrecoverable_error_function;
248 allstatus_.SetHasKeystoreKey(
249 !args->restored_keystore_key_for_bootstrapping.empty());
250 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
251 &share_, args->encryptor, args->restored_key_for_bootstrapping,
252 args->restored_keystore_key_for_bootstrapping, args->clear_data_option));
253 sync_encryption_handler_->AddObserver(this);
254 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
255 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
257 base::FilePath absolute_db_path = database_path_;
258 DCHECK(absolute_db_path.IsAbsolute());
260 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
261 args->internal_components_factory->BuildDirectoryBackingStore(
262 InternalComponentsFactory::STORAGE_ON_DISK,
263 args->credentials.email, absolute_db_path).Pass();
265 DCHECK(backing_store.get());
266 share_.directory.reset(
267 new syncable::Directory(
268 backing_store.release(),
269 args->unrecoverable_error_handler,
270 report_unrecoverable_error_function_,
271 sync_encryption_handler_.get(),
272 sync_encryption_handler_->GetCryptographerUnsafe()));
273 share_.sync_credentials = args->credentials;
275 // UserShare is accessible to a lot of code that doesn't need access to the
276 // sync token so clear sync_token from the UserShare.
277 share_.sync_credentials.sync_token = "";
279 const std::string& username = args->credentials.email;
280 DVLOG(1) << "Username: " << username;
281 if (!OpenDirectory(username)) {
282 NotifyInitializationFailure();
283 LOG(ERROR) << "Sync manager initialization failed!";
284 return;
287 // Now that we have opened the Directory we can restore any previously saved
288 // nigori specifics.
289 if (args->saved_nigori_state) {
290 sync_encryption_handler_->RestoreNigori(*args->saved_nigori_state);
291 args->saved_nigori_state.reset();
294 connection_manager_.reset(new SyncAPIServerConnectionManager(
295 args->service_url.host() + args->service_url.path(),
296 args->service_url.EffectiveIntPort(),
297 args->service_url.SchemeIsCryptographic(), args->post_factory.release(),
298 args->cancelation_signal));
299 connection_manager_->set_client_id(directory()->cache_guid());
300 connection_manager_->AddListener(this);
302 std::string sync_id = directory()->cache_guid();
304 DVLOG(1) << "Setting sync client ID: " << sync_id;
305 allstatus_.SetSyncId(sync_id);
306 DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
307 allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
309 model_type_registry_.reset(
310 new ModelTypeRegistry(args->workers, directory(), this));
311 sync_encryption_handler_->AddObserver(model_type_registry_.get());
313 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier
314 // if the pointer is misused in debug mode.
315 base::WeakPtr<syncer_v2::SyncContext> weak_core =
316 model_type_registry_->AsWeakPtr();
317 weak_core.get();
319 sync_context_proxy_.reset(new syncer_v2::SyncContextProxyImpl(
320 base::ThreadTaskRunnerHandle::Get(), weak_core));
322 // Build a SyncSessionContext and store the worker in it.
323 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
324 std::vector<SyncEngineEventListener*> listeners;
325 listeners.push_back(&allstatus_);
326 listeners.push_back(this);
327 session_context_ =
328 args->internal_components_factory->BuildContext(
329 connection_manager_.get(),
330 directory(),
331 args->extensions_activity,
332 listeners,
333 &debug_info_event_listener_,
334 model_type_registry_.get(),
335 args->invalidator_client_id)
336 .Pass();
337 session_context_->set_account_name(args->credentials.email);
338 scheduler_ = args->internal_components_factory->BuildScheduler(
339 name_, session_context_.get(), args->cancelation_signal).Pass();
341 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
343 initialized_ = true;
345 net::NetworkChangeNotifier::AddIPAddressObserver(this);
346 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
347 observing_network_connectivity_changes_ = true;
349 UpdateCredentials(args->credentials);
351 NotifyInitializationSuccess();
354 void SyncManagerImpl::NotifyInitializationSuccess() {
355 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
356 OnInitializationComplete(
357 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
358 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
359 true, InitialSyncEndedTypes()));
362 void SyncManagerImpl::NotifyInitializationFailure() {
363 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
364 OnInitializationComplete(
365 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
366 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
367 false, ModelTypeSet()));
370 void SyncManagerImpl::OnPassphraseRequired(
371 PassphraseRequiredReason reason,
372 const sync_pb::EncryptedData& pending_keys) {
373 // Does nothing.
376 void SyncManagerImpl::OnPassphraseAccepted() {
377 // Does nothing.
380 void SyncManagerImpl::OnBootstrapTokenUpdated(
381 const std::string& bootstrap_token,
382 BootstrapTokenType type) {
383 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
384 allstatus_.SetHasKeystoreKey(true);
387 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
388 bool encrypt_everything) {
389 allstatus_.SetEncryptedTypes(encrypted_types);
392 void SyncManagerImpl::OnEncryptionComplete() {
393 // Does nothing.
396 void SyncManagerImpl::OnCryptographerStateChanged(
397 Cryptographer* cryptographer) {
398 allstatus_.SetCryptographerReady(cryptographer->is_ready());
399 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
400 allstatus_.SetKeystoreMigrationTime(
401 sync_encryption_handler_->migration_time());
404 void SyncManagerImpl::OnPassphraseTypeChanged(
405 PassphraseType type,
406 base::Time explicit_passphrase_time) {
407 allstatus_.SetPassphraseType(type);
408 allstatus_.SetKeystoreMigrationTime(
409 sync_encryption_handler_->migration_time());
412 void SyncManagerImpl::OnLocalSetPassphraseEncryption(
413 const SyncEncryptionHandler::NigoriState& nigori_state) {
416 void SyncManagerImpl::StartSyncingNormally(
417 const ModelSafeRoutingInfo& routing_info,
418 base::Time last_poll_time) {
419 // Start the sync scheduler.
420 // TODO(sync): We always want the newest set of routes when we switch back
421 // to normal mode. Figure out how to enforce set_routing_info is always
422 // appropriately set and that it's only modified when switching to normal
423 // mode.
424 DCHECK(thread_checker_.CalledOnValidThread());
425 session_context_->SetRoutingInfo(routing_info);
426 scheduler_->Start(SyncScheduler::NORMAL_MODE,
427 last_poll_time);
430 syncable::Directory* SyncManagerImpl::directory() {
431 return share_.directory.get();
434 const SyncScheduler* SyncManagerImpl::scheduler() const {
435 return scheduler_.get();
438 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
439 return connection_manager_->HasInvalidAuthToken();
442 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
443 DCHECK(!initialized_) << "Should only happen once";
445 // Set before Open().
446 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
447 WeakHandle<syncable::TransactionObserver> transaction_observer(
448 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
450 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
451 open_result = directory()->Open(username, this, transaction_observer);
452 if (open_result != syncable::OPENED) {
453 LOG(ERROR) << "Could not open share for:" << username;
454 return false;
457 // Unapplied datatypes (those that do not have initial sync ended set) get
458 // re-downloaded during any configuration. But, it's possible for a datatype
459 // to have a progress marker but not have initial sync ended yet, making
460 // it a candidate for migration. This is a problem, as the DataTypeManager
461 // does not support a migration while it's already in the middle of a
462 // configuration. As a result, any partially synced datatype can stall the
463 // DTM, waiting for the configuration to complete, which it never will due
464 // to the migration error. In addition, a partially synced nigori will
465 // trigger the migration logic before the backend is initialized, resulting
466 // in crashes. We therefore detect and purge any partially synced types as
467 // part of initialization.
468 if (!PurgePartiallySyncedTypes())
469 return false;
471 return true;
474 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
475 ModelTypeSet partially_synced_types = ModelTypeSet::All();
476 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
477 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
478 ModelTypeSet::All()));
480 DVLOG(1) << "Purging partially synced types "
481 << ModelTypeSetToString(partially_synced_types);
482 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
483 partially_synced_types.Size());
484 if (partially_synced_types.Empty())
485 return true;
486 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
487 ModelTypeSet(),
488 ModelTypeSet());
491 bool SyncManagerImpl::PurgeDisabledTypes(
492 ModelTypeSet to_purge,
493 ModelTypeSet to_journal,
494 ModelTypeSet to_unapply) {
495 if (to_purge.Empty())
496 return true;
497 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
498 DCHECK(to_purge.HasAll(to_journal));
499 DCHECK(to_purge.HasAll(to_unapply));
500 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
503 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
504 DCHECK(thread_checker_.CalledOnValidThread());
505 DCHECK(initialized_);
506 DCHECK(!credentials.email.empty());
507 DCHECK(!credentials.sync_token.empty());
508 DCHECK(!credentials.scope_set.empty());
510 observing_network_connectivity_changes_ = true;
511 if (!connection_manager_->SetAuthToken(credentials.sync_token))
512 return; // Auth token is known to be invalid, so exit early.
514 scheduler_->OnCredentialsUpdated();
516 // TODO(zea): pass the credential age to the debug info event listener.
519 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
520 DCHECK(thread_checker_.CalledOnValidThread());
521 observers_.AddObserver(observer);
524 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
525 DCHECK(thread_checker_.CalledOnValidThread());
526 observers_.RemoveObserver(observer);
529 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
530 DCHECK(thread_checker_.CalledOnValidThread());
532 // Prevent any in-flight method calls from running. Also
533 // invalidates |weak_handle_this_| and |change_observer_|.
534 weak_ptr_factory_.InvalidateWeakPtrs();
535 js_mutation_event_observer_.InvalidateWeakPtrs();
537 scheduler_.reset();
538 session_context_.reset();
540 if (model_type_registry_)
541 sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
543 model_type_registry_.reset();
545 if (sync_encryption_handler_) {
546 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
547 sync_encryption_handler_->RemoveObserver(this);
550 SetJsEventHandler(WeakHandle<JsEventHandler>());
551 RemoveObserver(&js_sync_manager_observer_);
553 RemoveObserver(&debug_info_event_listener_);
555 // |connection_manager_| may end up being NULL here in tests (in synchronous
556 // initialization mode).
558 // TODO(akalin): Fix this behavior.
559 if (connection_manager_)
560 connection_manager_->RemoveListener(this);
561 connection_manager_.reset();
563 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
564 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
565 observing_network_connectivity_changes_ = false;
567 if (initialized_ && directory()) {
568 directory()->SaveChanges();
571 share_.directory.reset();
573 change_delegate_ = NULL;
575 initialized_ = false;
577 // We reset these here, since only now we know they will not be
578 // accessed from other threads (since we shut down everything).
579 change_observer_.Reset();
580 weak_handle_this_.Reset();
583 void SyncManagerImpl::OnIPAddressChanged() {
584 if (!observing_network_connectivity_changes_) {
585 DVLOG(1) << "IP address change dropped.";
586 return;
588 DVLOG(1) << "IP address change detected.";
589 OnNetworkConnectivityChangedImpl();
592 void SyncManagerImpl::OnConnectionTypeChanged(
593 net::NetworkChangeNotifier::ConnectionType) {
594 if (!observing_network_connectivity_changes_) {
595 DVLOG(1) << "Connection type change dropped.";
596 return;
598 DVLOG(1) << "Connection type change detected.";
599 OnNetworkConnectivityChangedImpl();
602 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
603 DCHECK(thread_checker_.CalledOnValidThread());
604 scheduler_->OnConnectionStatusChange();
607 void SyncManagerImpl::OnServerConnectionEvent(
608 const ServerConnectionEvent& event) {
609 DCHECK(thread_checker_.CalledOnValidThread());
610 if (event.connection_code ==
611 HttpResponse::SERVER_CONNECTION_OK) {
612 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
613 OnConnectionStatusChange(CONNECTION_OK));
616 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
617 observing_network_connectivity_changes_ = false;
618 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
619 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
622 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
623 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
624 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
628 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
629 ModelTypeSet models_with_changes) {
630 // This notification happens immediately after the transaction mutex is
631 // released. This allows work to be performed without blocking other threads
632 // from acquiring a transaction.
633 if (!change_delegate_)
634 return;
636 // Call commit.
637 for (ModelTypeSet::Iterator it = models_with_changes.First();
638 it.Good(); it.Inc()) {
639 change_delegate_->OnChangesComplete(it.Get());
640 change_observer_.Call(
641 FROM_HERE,
642 &SyncManager::ChangeObserver::OnChangesComplete,
643 it.Get());
647 ModelTypeSet
648 SyncManagerImpl::HandleTransactionEndingChangeEvent(
649 const ImmutableWriteTransactionInfo& write_transaction_info,
650 syncable::BaseTransaction* trans) {
651 // This notification happens immediately before a syncable WriteTransaction
652 // falls out of scope. It happens while the channel mutex is still held,
653 // and while the transaction mutex is held, so it cannot be re-entrant.
654 if (!change_delegate_ || change_records_.empty())
655 return ModelTypeSet();
657 // This will continue the WriteTransaction using a read only wrapper.
658 // This is the last chance for read to occur in the WriteTransaction
659 // that's closing. This special ReadTransaction will not close the
660 // underlying transaction.
661 ReadTransaction read_trans(GetUserShare(), trans);
663 ModelTypeSet models_with_changes;
664 for (ChangeRecordMap::const_iterator it = change_records_.begin();
665 it != change_records_.end(); ++it) {
666 DCHECK(!it->second.Get().empty());
667 ModelType type = ModelTypeFromInt(it->first);
668 change_delegate_->
669 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
670 &read_trans, it->second);
671 change_observer_.Call(FROM_HERE,
672 &SyncManager::ChangeObserver::OnChangesApplied,
673 type, write_transaction_info.Get().id, it->second);
674 models_with_changes.Put(type);
676 change_records_.clear();
677 return models_with_changes;
680 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
681 const ImmutableWriteTransactionInfo& write_transaction_info,
682 syncable::BaseTransaction* trans,
683 std::vector<int64>* entries_changed) {
684 // We have been notified about a user action changing a sync model.
685 LOG_IF(WARNING, !change_records_.empty()) <<
686 "CALCULATE_CHANGES called with unapplied old changes.";
688 // The mutated model type, or UNSPECIFIED if nothing was mutated.
689 ModelTypeSet mutated_model_types;
691 const syncable::ImmutableEntryKernelMutationMap& mutations =
692 write_transaction_info.Get().mutations;
693 for (syncable::EntryKernelMutationMap::const_iterator it =
694 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
695 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
696 continue;
699 ModelType model_type =
700 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
701 if (model_type < FIRST_REAL_MODEL_TYPE) {
702 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
703 continue;
706 // Found real mutation.
707 if (model_type != UNSPECIFIED) {
708 mutated_model_types.Put(model_type);
709 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
713 // Nudge if necessary.
714 if (!mutated_model_types.Empty()) {
715 if (weak_handle_this_.IsInitialized()) {
716 weak_handle_this_.Call(FROM_HERE,
717 &SyncManagerImpl::RequestNudgeForDataTypes,
718 FROM_HERE,
719 mutated_model_types);
720 } else {
721 NOTREACHED();
726 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
727 ModelType type, ChangeReorderBuffer* buffer,
728 Cryptographer* cryptographer, const syncable::EntryKernel& original,
729 bool existed_before, bool exists_now) {
730 // If this is a deletion and the datatype was encrypted, we need to decrypt it
731 // and attach it to the buffer.
732 if (!exists_now && existed_before) {
733 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
734 if (type == PASSWORDS) {
735 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
736 scoped_ptr<sync_pb::PasswordSpecificsData> data(
737 DecryptPasswordSpecifics(original_specifics, cryptographer));
738 if (!data) {
739 NOTREACHED();
740 return;
742 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
743 } else if (original_specifics.has_encrypted()) {
744 // All other datatypes can just create a new unencrypted specifics and
745 // attach it.
746 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
747 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
748 NOTREACHED();
749 return;
752 buffer->SetSpecificsForId(id, original_specifics);
756 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
757 const ImmutableWriteTransactionInfo& write_transaction_info,
758 syncable::BaseTransaction* trans,
759 std::vector<int64>* entries_changed) {
760 // We only expect one notification per sync step, so change_buffers_ should
761 // contain no pending entries.
762 LOG_IF(WARNING, !change_records_.empty()) <<
763 "CALCULATE_CHANGES called with unapplied old changes.";
765 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
767 Cryptographer* crypto = directory()->GetCryptographer(trans);
768 const syncable::ImmutableEntryKernelMutationMap& mutations =
769 write_transaction_info.Get().mutations;
770 for (syncable::EntryKernelMutationMap::const_iterator it =
771 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
772 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
773 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
775 // Omit items that aren't associated with a model.
776 ModelType type =
777 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
778 if (type < FIRST_REAL_MODEL_TYPE)
779 continue;
781 int64 handle = it->first;
782 if (exists_now && !existed_before)
783 change_buffers[type].PushAddedItem(handle);
784 else if (!exists_now && existed_before)
785 change_buffers[type].PushDeletedItem(handle);
786 else if (exists_now && existed_before &&
787 VisiblePropertiesDiffer(it->second, crypto)) {
788 change_buffers[type].PushUpdatedItem(handle);
791 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
792 it->second.original, existed_before, exists_now);
795 ReadTransaction read_trans(GetUserShare(), trans);
796 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
797 if (!change_buffers[i].IsEmpty()) {
798 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
799 &(change_records_[i]))) {
800 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
801 entries_changed->push_back((change_records_[i].Get())[j].id);
803 if (change_records_[i].Get().empty())
804 change_records_.erase(i);
809 void SyncManagerImpl::RequestNudgeForDataTypes(
810 const tracked_objects::Location& nudge_location,
811 ModelTypeSet types) {
812 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
814 scheduler_->ScheduleLocalNudge(types, nudge_location);
817 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
818 DCHECK(thread_checker_.CalledOnValidThread());
819 scheduler_->ScheduleInitialSyncNudge(type);
822 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
823 DCHECK(thread_checker_.CalledOnValidThread());
824 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
827 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
828 DCHECK(thread_checker_.CalledOnValidThread());
829 RefreshTypes(ModelTypeSet(type));
832 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
833 DCHECK(thread_checker_.CalledOnValidThread());
834 // Only send an event if this is due to a cycle ending and this cycle
835 // concludes a canonical "sync" process; that is, based on what is known
836 // locally we are "all happy" and up-to-date. There may be new changes on
837 // the server, but we'll get them on a subsequent sync.
839 // Notifications are sent at the end of every sync cycle, regardless of
840 // whether we should sync again.
841 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
842 if (!initialized_) {
843 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
844 << "initialized";
845 return;
848 DVLOG(1) << "Sending OnSyncCycleCompleted";
849 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
850 OnSyncCycleCompleted(event.snapshot));
854 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
855 FOR_EACH_OBSERVER(
856 SyncManager::Observer, observers_,
857 OnActionableError(error));
860 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
862 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
864 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
865 FOR_EACH_OBSERVER(
866 SyncManager::Observer, observers_,
867 OnMigrationRequested(types));
870 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
871 protocol_event_buffer_.RecordProtocolEvent(event);
872 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
873 OnProtocolEvent(event));
876 void SyncManagerImpl::SetJsEventHandler(
877 const WeakHandle<JsEventHandler>& event_handler) {
878 js_sync_manager_observer_.SetJsEventHandler(event_handler);
879 js_mutation_event_observer_.SetJsEventHandler(event_handler);
880 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
883 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
884 syncer::ModelType type) {
885 DirectoryTypeDebugInfoEmitterMap* emitter_map =
886 model_type_registry_->directory_type_debug_info_emitter_map();
887 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
889 if (it == emitter_map->end()) {
890 // This can happen in some cases. The UI thread makes requests of us
891 // when it doesn't really know which types are enabled or disabled.
892 DLOG(WARNING) << "Asked to return debug info for invalid type "
893 << ModelTypeToString(type);
894 return scoped_ptr<base::ListValue>(new base::ListValue());
897 return it->second->GetAllNodes();
900 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
901 DCHECK(thread_checker_.CalledOnValidThread());
903 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
904 allstatus_.SetNotificationsEnabled(invalidator_enabled);
905 scheduler_->SetNotificationsEnabled(invalidator_enabled);
908 void SyncManagerImpl::OnIncomingInvalidation(
909 syncer::ModelType type,
910 scoped_ptr<InvalidationInterface> invalidation) {
911 DCHECK(thread_checker_.CalledOnValidThread());
913 allstatus_.IncrementNotificationsReceived();
914 scheduler_->ScheduleInvalidationNudge(
915 type,
916 invalidation.Pass(),
917 FROM_HERE);
920 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
921 DCHECK(thread_checker_.CalledOnValidThread());
922 if (types.Empty()) {
923 LOG(WARNING) << "Sync received refresh request with no types specified.";
924 } else {
925 scheduler_->ScheduleLocalRefreshRequest(
926 types, FROM_HERE);
930 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
931 return allstatus_.status();
934 void SyncManagerImpl::SaveChanges() {
935 directory()->SaveChanges();
938 UserShare* SyncManagerImpl::GetUserShare() {
939 DCHECK(initialized_);
940 return &share_;
943 syncer_v2::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
944 DCHECK(initialized_);
945 return sync_context_proxy_.get();
948 const std::string SyncManagerImpl::cache_guid() {
949 DCHECK(initialized_);
950 return directory()->cache_guid();
953 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
954 ReadTransaction trans(FROM_HERE, GetUserShare());
955 ReadNode nigori_node(&trans);
956 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
957 DVLOG(1) << "Couldn't find Nigori node.";
958 return false;
960 bool found_experiment = false;
962 ReadNode favicon_sync_node(&trans);
963 if (favicon_sync_node.InitByClientTagLookup(
964 syncer::EXPERIMENTS,
965 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
966 experiments->favicon_sync_limit =
967 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
968 favicon_sync_limit();
969 found_experiment = true;
972 ReadNode pre_commit_update_avoidance_node(&trans);
973 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
974 syncer::EXPERIMENTS,
975 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
976 session_context_->set_server_enabled_pre_commit_update_avoidance(
977 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
978 pre_commit_update_avoidance().enabled());
979 // We don't bother setting found_experiment. The frontend doesn't need to
980 // know about this.
983 ReadNode gcm_invalidations_node(&trans);
984 if (gcm_invalidations_node.InitByClientTagLookup(
985 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
986 BaseNode::INIT_OK) {
987 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
988 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
989 if (gcm_invalidations.has_enabled()) {
990 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
991 found_experiment = true;
995 ReadNode wallet_sync_node(&trans);
996 if (wallet_sync_node.InitByClientTagLookup(
997 syncer::EXPERIMENTS, syncer::kWalletSyncTag) == BaseNode::INIT_OK) {
998 const sync_pb::WalletSyncFlags& wallet_sync =
999 wallet_sync_node.GetExperimentsSpecifics().wallet_sync();
1000 if (wallet_sync.has_enabled()) {
1001 experiments->wallet_sync_enabled = wallet_sync.enabled();
1002 found_experiment = true;
1006 return found_experiment;
1009 bool SyncManagerImpl::HasUnsyncedItems() {
1010 ReadTransaction trans(FROM_HERE, GetUserShare());
1011 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1014 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1015 return sync_encryption_handler_.get();
1018 ScopedVector<syncer::ProtocolEvent>
1019 SyncManagerImpl::GetBufferedProtocolEvents() {
1020 return protocol_event_buffer_.GetBufferedProtocolEvents();
1023 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1024 syncer::TypeDebugInfoObserver* observer) {
1025 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1028 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1029 syncer::TypeDebugInfoObserver* observer) {
1030 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1033 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1034 syncer::TypeDebugInfoObserver* observer) {
1035 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1038 void SyncManagerImpl::RequestEmitDebugInfo() {
1039 model_type_registry_->RequestEmitDebugInfo();
1042 void SyncManagerImpl::ClearServerData(const ClearServerDataCallback& callback) {
1043 DCHECK(thread_checker_.CalledOnValidThread());
1044 scheduler_->Start(SyncScheduler::CLEAR_SERVER_DATA_MODE, base::Time());
1045 ClearParams params(callback);
1046 scheduler_->ScheduleClearServerData(params);
1049 } // namespace syncer