Add ICU message format support
[chromium-blink-merge.git] / sync / internal_api / sync_manager_impl.cc
blob15d2d858524bb0a7fdbae735cd924fc262a28c1e
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
7 #include <string>
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
50 using base::TimeDelta;
51 using sync_pb::GetUpdatesCallerInfo;
53 class GURL;
55 namespace syncer {
57 using sessions::SyncSessionContext;
58 using syncable::ImmutableWriteTransactionInfo;
59 using syncable::SPECIFICS;
60 using syncable::UNIQUE_POSITION;
62 namespace {
64 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65 ConfigureReason reason) {
66 switch (reason) {
67 case CONFIGURE_REASON_RECONFIGURATION:
68 return GetUpdatesCallerInfo::RECONFIGURATION;
69 case CONFIGURE_REASON_MIGRATION:
70 return GetUpdatesCallerInfo::MIGRATION;
71 case CONFIGURE_REASON_NEW_CLIENT:
72 return GetUpdatesCallerInfo::NEW_CLIENT;
73 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74 case CONFIGURE_REASON_CRYPTO:
75 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76 case CONFIGURE_REASON_PROGRAMMATIC:
77 return GetUpdatesCallerInfo::PROGRAMMATIC;
78 default:
79 NOTREACHED();
81 return GetUpdatesCallerInfo::UNKNOWN;
84 } // namespace
86 SyncManagerImpl::SyncManagerImpl(const std::string& name)
87 : name_(name),
88 change_delegate_(NULL),
89 initialized_(false),
90 observing_network_connectivity_changes_(false),
91 weak_ptr_factory_(this) {
92 // Pre-fill |notification_info_map_|.
93 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
94 notification_info_map_.insert(
95 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
99 SyncManagerImpl::~SyncManagerImpl() {
100 DCHECK(thread_checker_.CalledOnValidThread());
101 CHECK(!initialized_);
104 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
105 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
108 base::DictionaryValue* value = new base::DictionaryValue();
109 value->SetInteger("totalCount", total_count);
110 value->SetString("payload", payload);
111 return value;
114 bool SyncManagerImpl::VisiblePositionsDiffer(
115 const syncable::EntryKernelMutation& mutation) const {
116 const syncable::EntryKernel& a = mutation.original;
117 const syncable::EntryKernel& b = mutation.mutated;
118 if (!b.ShouldMaintainPosition())
119 return false;
120 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
121 return true;
122 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
123 return true;
124 return false;
127 bool SyncManagerImpl::VisiblePropertiesDiffer(
128 const syncable::EntryKernelMutation& mutation,
129 Cryptographer* cryptographer) const {
130 const syncable::EntryKernel& a = mutation.original;
131 const syncable::EntryKernel& b = mutation.mutated;
132 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
133 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
134 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
135 GetModelTypeFromSpecifics(b_specifics));
136 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
137 // Suppress updates to items that aren't tracked by any browser model.
138 if (model_type < FIRST_REAL_MODEL_TYPE ||
139 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
140 return false;
142 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
143 return true;
144 if (!AreSpecificsEqual(cryptographer,
145 a.ref(syncable::SPECIFICS),
146 b.ref(syncable::SPECIFICS))) {
147 return true;
149 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
150 b.ref(syncable::ATTACHMENT_METADATA))) {
151 return true;
153 // We only care if the name has changed if neither specifics is encrypted
154 // (encrypted nodes blow away the NON_UNIQUE_NAME).
155 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
156 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
157 return true;
158 if (VisiblePositionsDiffer(mutation))
159 return true;
160 return false;
163 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
164 return directory()->InitialSyncEndedTypes();
167 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
168 ModelTypeSet types) {
169 ModelTypeSet result;
170 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
171 sync_pb::DataTypeProgressMarker marker;
172 directory()->GetDownloadProgress(i.Get(), &marker);
174 if (marker.token().empty())
175 result.Put(i.Get());
177 return result;
180 void SyncManagerImpl::ConfigureSyncer(
181 ConfigureReason reason,
182 ModelTypeSet to_download,
183 ModelTypeSet to_purge,
184 ModelTypeSet to_journal,
185 ModelTypeSet to_unapply,
186 const ModelSafeRoutingInfo& new_routing_info,
187 const base::Closure& ready_task,
188 const base::Closure& retry_task) {
189 DCHECK(thread_checker_.CalledOnValidThread());
190 DCHECK(!ready_task.is_null());
191 DCHECK(initialized_);
193 DVLOG(1) << "Configuring -"
194 << "\n\t" << "current types: "
195 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
196 << "\n\t" << "types to download: "
197 << ModelTypeSetToString(to_download)
198 << "\n\t" << "types to purge: "
199 << ModelTypeSetToString(to_purge)
200 << "\n\t" << "types to journal: "
201 << ModelTypeSetToString(to_journal)
202 << "\n\t" << "types to unapply: "
203 << ModelTypeSetToString(to_unapply);
204 if (!PurgeDisabledTypes(to_purge,
205 to_journal,
206 to_unapply)) {
207 // We failed to cleanup the types. Invoke the ready task without actually
208 // configuring any types. The caller should detect this as a configuration
209 // failure and act appropriately.
210 ready_task.Run();
211 return;
214 ConfigurationParams params(GetSourceFromReason(reason),
215 to_download,
216 new_routing_info,
217 ready_task,
218 retry_task);
220 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
221 scheduler_->ScheduleConfiguration(params);
224 void SyncManagerImpl::Init(InitArgs* args) {
225 CHECK(!initialized_);
226 DCHECK(thread_checker_.CalledOnValidThread());
227 DCHECK(args->post_factory.get());
228 DCHECK(!args->credentials.email.empty());
229 DCHECK(!args->credentials.sync_token.empty());
230 DCHECK(!args->credentials.scope_set.empty());
231 DCHECK(args->cancelation_signal);
232 DVLOG(1) << "SyncManager starting Init...";
234 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
236 change_delegate_ = args->change_delegate;
238 AddObserver(&js_sync_manager_observer_);
239 SetJsEventHandler(args->event_handler);
241 AddObserver(&debug_info_event_listener_);
243 database_path_ = args->database_location.Append(
244 syncable::Directory::kSyncDatabaseFilename);
245 unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
246 report_unrecoverable_error_function_ =
247 args->report_unrecoverable_error_function;
249 allstatus_.SetHasKeystoreKey(
250 !args->restored_keystore_key_for_bootstrapping.empty());
251 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
252 &share_, args->encryptor, args->restored_key_for_bootstrapping,
253 args->restored_keystore_key_for_bootstrapping, args->clear_data_option));
254 sync_encryption_handler_->AddObserver(this);
255 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
256 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
258 base::FilePath absolute_db_path = database_path_;
259 DCHECK(absolute_db_path.IsAbsolute());
261 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
262 args->internal_components_factory->BuildDirectoryBackingStore(
263 InternalComponentsFactory::STORAGE_ON_DISK,
264 args->credentials.email, absolute_db_path).Pass();
266 DCHECK(backing_store.get());
267 share_.directory.reset(
268 new syncable::Directory(
269 backing_store.release(),
270 unrecoverable_error_handler_.get(),
271 report_unrecoverable_error_function_,
272 sync_encryption_handler_.get(),
273 sync_encryption_handler_->GetCryptographerUnsafe()));
274 share_.sync_credentials = args->credentials;
276 // UserShare is accessible to a lot of code that doesn't need access to the
277 // sync token so clear sync_token from the UserShare.
278 share_.sync_credentials.sync_token = "";
280 const std::string& username = args->credentials.email;
281 DVLOG(1) << "Username: " << username;
282 if (!OpenDirectory(username)) {
283 NotifyInitializationFailure();
284 LOG(ERROR) << "Sync manager initialization failed!";
285 return;
288 // Now that we have opened the Directory we can restore any previously saved
289 // nigori specifics.
290 if (args->saved_nigori_state) {
291 sync_encryption_handler_->RestoreNigori(*args->saved_nigori_state);
292 args->saved_nigori_state.reset();
295 connection_manager_.reset(new SyncAPIServerConnectionManager(
296 args->service_url.host() + args->service_url.path(),
297 args->service_url.EffectiveIntPort(),
298 args->service_url.SchemeIsCryptographic(), args->post_factory.release(),
299 args->cancelation_signal));
300 connection_manager_->set_client_id(directory()->cache_guid());
301 connection_manager_->AddListener(this);
303 std::string sync_id = directory()->cache_guid();
305 DVLOG(1) << "Setting sync client ID: " << sync_id;
306 allstatus_.SetSyncId(sync_id);
307 DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
308 allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
310 model_type_registry_.reset(
311 new ModelTypeRegistry(args->workers, directory(), this));
312 sync_encryption_handler_->AddObserver(model_type_registry_.get());
314 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier
315 // if the pointer is misused in debug mode.
316 base::WeakPtr<syncer_v2::SyncContext> weak_core =
317 model_type_registry_->AsWeakPtr();
318 weak_core.get();
320 sync_context_proxy_.reset(new syncer_v2::SyncContextProxyImpl(
321 base::ThreadTaskRunnerHandle::Get(), weak_core));
323 // Build a SyncSessionContext and store the worker in it.
324 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
325 std::vector<SyncEngineEventListener*> listeners;
326 listeners.push_back(&allstatus_);
327 listeners.push_back(this);
328 session_context_ =
329 args->internal_components_factory->BuildContext(
330 connection_manager_.get(),
331 directory(),
332 args->extensions_activity,
333 listeners,
334 &debug_info_event_listener_,
335 model_type_registry_.get(),
336 args->invalidator_client_id)
337 .Pass();
338 session_context_->set_account_name(args->credentials.email);
339 scheduler_ = args->internal_components_factory->BuildScheduler(
340 name_, session_context_.get(), args->cancelation_signal).Pass();
342 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
344 initialized_ = true;
346 net::NetworkChangeNotifier::AddIPAddressObserver(this);
347 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
348 observing_network_connectivity_changes_ = true;
350 UpdateCredentials(args->credentials);
352 NotifyInitializationSuccess();
355 void SyncManagerImpl::NotifyInitializationSuccess() {
356 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
357 OnInitializationComplete(
358 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
359 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
360 true, InitialSyncEndedTypes()));
363 void SyncManagerImpl::NotifyInitializationFailure() {
364 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
365 OnInitializationComplete(
366 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
367 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
368 false, ModelTypeSet()));
371 void SyncManagerImpl::OnPassphraseRequired(
372 PassphraseRequiredReason reason,
373 const sync_pb::EncryptedData& pending_keys) {
374 // Does nothing.
377 void SyncManagerImpl::OnPassphraseAccepted() {
378 // Does nothing.
381 void SyncManagerImpl::OnBootstrapTokenUpdated(
382 const std::string& bootstrap_token,
383 BootstrapTokenType type) {
384 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
385 allstatus_.SetHasKeystoreKey(true);
388 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
389 bool encrypt_everything) {
390 allstatus_.SetEncryptedTypes(encrypted_types);
393 void SyncManagerImpl::OnEncryptionComplete() {
394 // Does nothing.
397 void SyncManagerImpl::OnCryptographerStateChanged(
398 Cryptographer* cryptographer) {
399 allstatus_.SetCryptographerReady(cryptographer->is_ready());
400 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
401 allstatus_.SetKeystoreMigrationTime(
402 sync_encryption_handler_->migration_time());
405 void SyncManagerImpl::OnPassphraseTypeChanged(
406 PassphraseType type,
407 base::Time explicit_passphrase_time) {
408 allstatus_.SetPassphraseType(type);
409 allstatus_.SetKeystoreMigrationTime(
410 sync_encryption_handler_->migration_time());
413 void SyncManagerImpl::OnLocalSetPassphraseEncryption(
414 const SyncEncryptionHandler::NigoriState& nigori_state) {
417 void SyncManagerImpl::StartSyncingNormally(
418 const ModelSafeRoutingInfo& routing_info,
419 base::Time last_poll_time) {
420 // Start the sync scheduler.
421 // TODO(sync): We always want the newest set of routes when we switch back
422 // to normal mode. Figure out how to enforce set_routing_info is always
423 // appropriately set and that it's only modified when switching to normal
424 // mode.
425 DCHECK(thread_checker_.CalledOnValidThread());
426 session_context_->SetRoutingInfo(routing_info);
427 scheduler_->Start(SyncScheduler::NORMAL_MODE,
428 last_poll_time);
431 syncable::Directory* SyncManagerImpl::directory() {
432 return share_.directory.get();
435 const SyncScheduler* SyncManagerImpl::scheduler() const {
436 return scheduler_.get();
439 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
440 return connection_manager_->HasInvalidAuthToken();
443 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
444 DCHECK(!initialized_) << "Should only happen once";
446 // Set before Open().
447 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
448 WeakHandle<syncable::TransactionObserver> transaction_observer(
449 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
451 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
452 open_result = directory()->Open(username, this, transaction_observer);
453 if (open_result != syncable::OPENED) {
454 LOG(ERROR) << "Could not open share for:" << username;
455 return false;
458 // Unapplied datatypes (those that do not have initial sync ended set) get
459 // re-downloaded during any configuration. But, it's possible for a datatype
460 // to have a progress marker but not have initial sync ended yet, making
461 // it a candidate for migration. This is a problem, as the DataTypeManager
462 // does not support a migration while it's already in the middle of a
463 // configuration. As a result, any partially synced datatype can stall the
464 // DTM, waiting for the configuration to complete, which it never will due
465 // to the migration error. In addition, a partially synced nigori will
466 // trigger the migration logic before the backend is initialized, resulting
467 // in crashes. We therefore detect and purge any partially synced types as
468 // part of initialization.
469 if (!PurgePartiallySyncedTypes())
470 return false;
472 return true;
475 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
476 ModelTypeSet partially_synced_types = ModelTypeSet::All();
477 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
478 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
479 ModelTypeSet::All()));
481 DVLOG(1) << "Purging partially synced types "
482 << ModelTypeSetToString(partially_synced_types);
483 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
484 partially_synced_types.Size());
485 if (partially_synced_types.Empty())
486 return true;
487 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
488 ModelTypeSet(),
489 ModelTypeSet());
492 bool SyncManagerImpl::PurgeDisabledTypes(
493 ModelTypeSet to_purge,
494 ModelTypeSet to_journal,
495 ModelTypeSet to_unapply) {
496 if (to_purge.Empty())
497 return true;
498 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
499 DCHECK(to_purge.HasAll(to_journal));
500 DCHECK(to_purge.HasAll(to_unapply));
501 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
504 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
505 DCHECK(thread_checker_.CalledOnValidThread());
506 DCHECK(initialized_);
507 DCHECK(!credentials.email.empty());
508 DCHECK(!credentials.sync_token.empty());
509 DCHECK(!credentials.scope_set.empty());
511 observing_network_connectivity_changes_ = true;
512 if (!connection_manager_->SetAuthToken(credentials.sync_token))
513 return; // Auth token is known to be invalid, so exit early.
515 scheduler_->OnCredentialsUpdated();
517 // TODO(zea): pass the credential age to the debug info event listener.
520 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
521 DCHECK(thread_checker_.CalledOnValidThread());
522 observers_.AddObserver(observer);
525 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
526 DCHECK(thread_checker_.CalledOnValidThread());
527 observers_.RemoveObserver(observer);
530 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
531 DCHECK(thread_checker_.CalledOnValidThread());
533 // Prevent any in-flight method calls from running. Also
534 // invalidates |weak_handle_this_| and |change_observer_|.
535 weak_ptr_factory_.InvalidateWeakPtrs();
536 js_mutation_event_observer_.InvalidateWeakPtrs();
538 scheduler_.reset();
539 session_context_.reset();
541 if (model_type_registry_)
542 sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
544 model_type_registry_.reset();
546 if (sync_encryption_handler_) {
547 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
548 sync_encryption_handler_->RemoveObserver(this);
551 SetJsEventHandler(WeakHandle<JsEventHandler>());
552 RemoveObserver(&js_sync_manager_observer_);
554 RemoveObserver(&debug_info_event_listener_);
556 // |connection_manager_| may end up being NULL here in tests (in synchronous
557 // initialization mode).
559 // TODO(akalin): Fix this behavior.
560 if (connection_manager_)
561 connection_manager_->RemoveListener(this);
562 connection_manager_.reset();
564 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
565 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
566 observing_network_connectivity_changes_ = false;
568 if (initialized_ && directory()) {
569 directory()->SaveChanges();
572 share_.directory.reset();
574 change_delegate_ = NULL;
576 initialized_ = false;
578 // We reset these here, since only now we know they will not be
579 // accessed from other threads (since we shut down everything).
580 change_observer_.Reset();
581 weak_handle_this_.Reset();
584 void SyncManagerImpl::OnIPAddressChanged() {
585 if (!observing_network_connectivity_changes_) {
586 DVLOG(1) << "IP address change dropped.";
587 return;
589 DVLOG(1) << "IP address change detected.";
590 OnNetworkConnectivityChangedImpl();
593 void SyncManagerImpl::OnConnectionTypeChanged(
594 net::NetworkChangeNotifier::ConnectionType) {
595 if (!observing_network_connectivity_changes_) {
596 DVLOG(1) << "Connection type change dropped.";
597 return;
599 DVLOG(1) << "Connection type change detected.";
600 OnNetworkConnectivityChangedImpl();
603 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
604 DCHECK(thread_checker_.CalledOnValidThread());
605 scheduler_->OnConnectionStatusChange();
608 void SyncManagerImpl::OnServerConnectionEvent(
609 const ServerConnectionEvent& event) {
610 DCHECK(thread_checker_.CalledOnValidThread());
611 if (event.connection_code ==
612 HttpResponse::SERVER_CONNECTION_OK) {
613 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
614 OnConnectionStatusChange(CONNECTION_OK));
617 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
618 observing_network_connectivity_changes_ = false;
619 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
620 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
623 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
624 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
625 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
629 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
630 ModelTypeSet models_with_changes) {
631 // This notification happens immediately after the transaction mutex is
632 // released. This allows work to be performed without blocking other threads
633 // from acquiring a transaction.
634 if (!change_delegate_)
635 return;
637 // Call commit.
638 for (ModelTypeSet::Iterator it = models_with_changes.First();
639 it.Good(); it.Inc()) {
640 change_delegate_->OnChangesComplete(it.Get());
641 change_observer_.Call(
642 FROM_HERE,
643 &SyncManager::ChangeObserver::OnChangesComplete,
644 it.Get());
648 ModelTypeSet
649 SyncManagerImpl::HandleTransactionEndingChangeEvent(
650 const ImmutableWriteTransactionInfo& write_transaction_info,
651 syncable::BaseTransaction* trans) {
652 // This notification happens immediately before a syncable WriteTransaction
653 // falls out of scope. It happens while the channel mutex is still held,
654 // and while the transaction mutex is held, so it cannot be re-entrant.
655 if (!change_delegate_ || change_records_.empty())
656 return ModelTypeSet();
658 // This will continue the WriteTransaction using a read only wrapper.
659 // This is the last chance for read to occur in the WriteTransaction
660 // that's closing. This special ReadTransaction will not close the
661 // underlying transaction.
662 ReadTransaction read_trans(GetUserShare(), trans);
664 ModelTypeSet models_with_changes;
665 for (ChangeRecordMap::const_iterator it = change_records_.begin();
666 it != change_records_.end(); ++it) {
667 DCHECK(!it->second.Get().empty());
668 ModelType type = ModelTypeFromInt(it->first);
669 change_delegate_->
670 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
671 &read_trans, it->second);
672 change_observer_.Call(FROM_HERE,
673 &SyncManager::ChangeObserver::OnChangesApplied,
674 type, write_transaction_info.Get().id, it->second);
675 models_with_changes.Put(type);
677 change_records_.clear();
678 return models_with_changes;
681 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
682 const ImmutableWriteTransactionInfo& write_transaction_info,
683 syncable::BaseTransaction* trans,
684 std::vector<int64>* entries_changed) {
685 // We have been notified about a user action changing a sync model.
686 LOG_IF(WARNING, !change_records_.empty()) <<
687 "CALCULATE_CHANGES called with unapplied old changes.";
689 // The mutated model type, or UNSPECIFIED if nothing was mutated.
690 ModelTypeSet mutated_model_types;
692 const syncable::ImmutableEntryKernelMutationMap& mutations =
693 write_transaction_info.Get().mutations;
694 for (syncable::EntryKernelMutationMap::const_iterator it =
695 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
696 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
697 continue;
700 ModelType model_type =
701 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
702 if (model_type < FIRST_REAL_MODEL_TYPE) {
703 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
704 continue;
707 // Found real mutation.
708 if (model_type != UNSPECIFIED) {
709 mutated_model_types.Put(model_type);
710 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
714 // Nudge if necessary.
715 if (!mutated_model_types.Empty()) {
716 if (weak_handle_this_.IsInitialized()) {
717 weak_handle_this_.Call(FROM_HERE,
718 &SyncManagerImpl::RequestNudgeForDataTypes,
719 FROM_HERE,
720 mutated_model_types);
721 } else {
722 NOTREACHED();
727 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
728 ModelType type, ChangeReorderBuffer* buffer,
729 Cryptographer* cryptographer, const syncable::EntryKernel& original,
730 bool existed_before, bool exists_now) {
731 // If this is a deletion and the datatype was encrypted, we need to decrypt it
732 // and attach it to the buffer.
733 if (!exists_now && existed_before) {
734 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
735 if (type == PASSWORDS) {
736 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
737 scoped_ptr<sync_pb::PasswordSpecificsData> data(
738 DecryptPasswordSpecifics(original_specifics, cryptographer));
739 if (!data) {
740 NOTREACHED();
741 return;
743 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
744 } else if (original_specifics.has_encrypted()) {
745 // All other datatypes can just create a new unencrypted specifics and
746 // attach it.
747 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
748 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
749 NOTREACHED();
750 return;
753 buffer->SetSpecificsForId(id, original_specifics);
757 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
758 const ImmutableWriteTransactionInfo& write_transaction_info,
759 syncable::BaseTransaction* trans,
760 std::vector<int64>* entries_changed) {
761 // We only expect one notification per sync step, so change_buffers_ should
762 // contain no pending entries.
763 LOG_IF(WARNING, !change_records_.empty()) <<
764 "CALCULATE_CHANGES called with unapplied old changes.";
766 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
768 Cryptographer* crypto = directory()->GetCryptographer(trans);
769 const syncable::ImmutableEntryKernelMutationMap& mutations =
770 write_transaction_info.Get().mutations;
771 for (syncable::EntryKernelMutationMap::const_iterator it =
772 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
773 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
774 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
776 // Omit items that aren't associated with a model.
777 ModelType type =
778 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
779 if (type < FIRST_REAL_MODEL_TYPE)
780 continue;
782 int64 handle = it->first;
783 if (exists_now && !existed_before)
784 change_buffers[type].PushAddedItem(handle);
785 else if (!exists_now && existed_before)
786 change_buffers[type].PushDeletedItem(handle);
787 else if (exists_now && existed_before &&
788 VisiblePropertiesDiffer(it->second, crypto)) {
789 change_buffers[type].PushUpdatedItem(handle);
792 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
793 it->second.original, existed_before, exists_now);
796 ReadTransaction read_trans(GetUserShare(), trans);
797 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
798 if (!change_buffers[i].IsEmpty()) {
799 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
800 &(change_records_[i]))) {
801 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
802 entries_changed->push_back((change_records_[i].Get())[j].id);
804 if (change_records_[i].Get().empty())
805 change_records_.erase(i);
810 void SyncManagerImpl::RequestNudgeForDataTypes(
811 const tracked_objects::Location& nudge_location,
812 ModelTypeSet types) {
813 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
815 scheduler_->ScheduleLocalNudge(types, nudge_location);
818 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
819 DCHECK(thread_checker_.CalledOnValidThread());
820 scheduler_->ScheduleInitialSyncNudge(type);
823 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
824 DCHECK(thread_checker_.CalledOnValidThread());
825 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
828 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
829 DCHECK(thread_checker_.CalledOnValidThread());
830 RefreshTypes(ModelTypeSet(type));
833 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
834 DCHECK(thread_checker_.CalledOnValidThread());
835 // Only send an event if this is due to a cycle ending and this cycle
836 // concludes a canonical "sync" process; that is, based on what is known
837 // locally we are "all happy" and up-to-date. There may be new changes on
838 // the server, but we'll get them on a subsequent sync.
840 // Notifications are sent at the end of every sync cycle, regardless of
841 // whether we should sync again.
842 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
843 if (!initialized_) {
844 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
845 << "initialized";
846 return;
849 DVLOG(1) << "Sending OnSyncCycleCompleted";
850 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
851 OnSyncCycleCompleted(event.snapshot));
855 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
856 FOR_EACH_OBSERVER(
857 SyncManager::Observer, observers_,
858 OnActionableError(error));
861 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
863 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
865 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
866 FOR_EACH_OBSERVER(
867 SyncManager::Observer, observers_,
868 OnMigrationRequested(types));
871 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
872 protocol_event_buffer_.RecordProtocolEvent(event);
873 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
874 OnProtocolEvent(event));
877 void SyncManagerImpl::SetJsEventHandler(
878 const WeakHandle<JsEventHandler>& event_handler) {
879 js_sync_manager_observer_.SetJsEventHandler(event_handler);
880 js_mutation_event_observer_.SetJsEventHandler(event_handler);
881 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
884 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
885 syncer::ModelType type) {
886 DirectoryTypeDebugInfoEmitterMap* emitter_map =
887 model_type_registry_->directory_type_debug_info_emitter_map();
888 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
890 if (it == emitter_map->end()) {
891 // This can happen in some cases. The UI thread makes requests of us
892 // when it doesn't really know which types are enabled or disabled.
893 DLOG(WARNING) << "Asked to return debug info for invalid type "
894 << ModelTypeToString(type);
895 return scoped_ptr<base::ListValue>(new base::ListValue());
898 return it->second->GetAllNodes();
901 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
902 DCHECK(thread_checker_.CalledOnValidThread());
904 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
905 allstatus_.SetNotificationsEnabled(invalidator_enabled);
906 scheduler_->SetNotificationsEnabled(invalidator_enabled);
909 void SyncManagerImpl::OnIncomingInvalidation(
910 syncer::ModelType type,
911 scoped_ptr<InvalidationInterface> invalidation) {
912 DCHECK(thread_checker_.CalledOnValidThread());
914 allstatus_.IncrementNotificationsReceived();
915 scheduler_->ScheduleInvalidationNudge(
916 type,
917 invalidation.Pass(),
918 FROM_HERE);
921 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
922 DCHECK(thread_checker_.CalledOnValidThread());
923 if (types.Empty()) {
924 LOG(WARNING) << "Sync received refresh request with no types specified.";
925 } else {
926 scheduler_->ScheduleLocalRefreshRequest(
927 types, FROM_HERE);
931 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
932 return allstatus_.status();
935 void SyncManagerImpl::SaveChanges() {
936 directory()->SaveChanges();
939 UserShare* SyncManagerImpl::GetUserShare() {
940 DCHECK(initialized_);
941 return &share_;
944 syncer_v2::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
945 DCHECK(initialized_);
946 return sync_context_proxy_.get();
949 const std::string SyncManagerImpl::cache_guid() {
950 DCHECK(initialized_);
951 return directory()->cache_guid();
954 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
955 ReadTransaction trans(FROM_HERE, GetUserShare());
956 ReadNode nigori_node(&trans);
957 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
958 DVLOG(1) << "Couldn't find Nigori node.";
959 return false;
961 bool found_experiment = false;
963 ReadNode favicon_sync_node(&trans);
964 if (favicon_sync_node.InitByClientTagLookup(
965 syncer::EXPERIMENTS,
966 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
967 experiments->favicon_sync_limit =
968 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
969 favicon_sync_limit();
970 found_experiment = true;
973 ReadNode pre_commit_update_avoidance_node(&trans);
974 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
975 syncer::EXPERIMENTS,
976 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
977 session_context_->set_server_enabled_pre_commit_update_avoidance(
978 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
979 pre_commit_update_avoidance().enabled());
980 // We don't bother setting found_experiment. The frontend doesn't need to
981 // know about this.
984 ReadNode gcm_invalidations_node(&trans);
985 if (gcm_invalidations_node.InitByClientTagLookup(
986 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
987 BaseNode::INIT_OK) {
988 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
989 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
990 if (gcm_invalidations.has_enabled()) {
991 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
992 found_experiment = true;
996 ReadNode wallet_sync_node(&trans);
997 if (wallet_sync_node.InitByClientTagLookup(
998 syncer::EXPERIMENTS, syncer::kWalletSyncTag) == BaseNode::INIT_OK) {
999 const sync_pb::WalletSyncFlags& wallet_sync =
1000 wallet_sync_node.GetExperimentsSpecifics().wallet_sync();
1001 if (wallet_sync.has_enabled()) {
1002 experiments->wallet_sync_enabled = wallet_sync.enabled();
1003 found_experiment = true;
1007 return found_experiment;
1010 bool SyncManagerImpl::HasUnsyncedItems() {
1011 ReadTransaction trans(FROM_HERE, GetUserShare());
1012 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1015 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1016 return sync_encryption_handler_.get();
1019 ScopedVector<syncer::ProtocolEvent>
1020 SyncManagerImpl::GetBufferedProtocolEvents() {
1021 return protocol_event_buffer_.GetBufferedProtocolEvents();
1024 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1025 syncer::TypeDebugInfoObserver* observer) {
1026 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1029 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1030 syncer::TypeDebugInfoObserver* observer) {
1031 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1034 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1035 syncer::TypeDebugInfoObserver* observer) {
1036 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1039 void SyncManagerImpl::RequestEmitDebugInfo() {
1040 model_type_registry_->RequestEmitDebugInfo();
1043 void SyncManagerImpl::ClearServerData(const ClearServerDataCallback& callback) {
1044 DCHECK(thread_checker_.CalledOnValidThread());
1045 scheduler_->Start(SyncScheduler::CLEAR_SERVER_DATA_MODE, base::Time());
1046 ClearParams params(callback);
1047 scheduler_->ScheduleClearServerData(params);
1050 } // namespace syncer