Cast: Stop logging kVideoFrameSentToEncoder and rename a couple events.
[chromium-blink-merge.git] / sync / internal_api / sync_manager_impl.cc
blob 60bd3daff456d03c2fda8244e8b30cf3abc29b3b
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
7 #include <string>
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/values.h"
19 #include "sync/engine/sync_scheduler.h"
20 #include "sync/engine/syncer_types.h"
21 #include "sync/internal_api/change_reorder_buffer.h"
22 #include "sync/internal_api/public/base/cancelation_signal.h"
23 #include "sync/internal_api/public/base/model_type.h"
24 #include "sync/internal_api/public/base_node.h"
25 #include "sync/internal_api/public/configure_reason.h"
26 #include "sync/internal_api/public/engine/polling_constants.h"
27 #include "sync/internal_api/public/http_post_provider_factory.h"
28 #include "sync/internal_api/public/internal_components_factory.h"
29 #include "sync/internal_api/public/read_node.h"
30 #include "sync/internal_api/public/read_transaction.h"
31 #include "sync/internal_api/public/sync_core_proxy.h"
32 #include "sync/internal_api/public/user_share.h"
33 #include "sync/internal_api/public/util/experiments.h"
34 #include "sync/internal_api/public/write_node.h"
35 #include "sync/internal_api/public/write_transaction.h"
36 #include "sync/internal_api/sync_core.h"
37 #include "sync/internal_api/sync_core_proxy_impl.h"
38 #include "sync/internal_api/syncapi_internal.h"
39 #include "sync/internal_api/syncapi_server_connection_manager.h"
40 #include "sync/notifier/invalidation_util.h"
41 #include "sync/notifier/invalidator.h"
42 #include "sync/notifier/object_id_invalidation_map.h"
43 #include "sync/protocol/proto_value_conversions.h"
44 #include "sync/protocol/sync.pb.h"
45 #include "sync/sessions/directory_type_debug_info_emitter.h"
46 #include "sync/syncable/directory.h"
47 #include "sync/syncable/entry.h"
48 #include "sync/syncable/in_memory_directory_backing_store.h"
49 #include "sync/syncable/on_disk_directory_backing_store.h"
51 using base::TimeDelta;
52 using sync_pb::GetUpdatesCallerInfo;
54 namespace syncer {
56 using sessions::SyncSessionContext;
57 using syncable::ImmutableWriteTransactionInfo;
58 using syncable::SPECIFICS;
59 using syncable::UNIQUE_POSITION;
61 namespace {
// Delays for syncer nudges, in milliseconds.  These live in an unnamed
// namespace, so they already have internal linkage without |static|.
const int kDefaultNudgeDelayMilliseconds = 200;
const int kPreferencesNudgeDelayMilliseconds = 2000;
const int kSyncRefreshDelayMsec = 500;
const int kSyncSchedulerDelayMsec = 250;
69 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
70 ConfigureReason reason) {
71 switch (reason) {
72 case CONFIGURE_REASON_RECONFIGURATION:
73 return GetUpdatesCallerInfo::RECONFIGURATION;
74 case CONFIGURE_REASON_MIGRATION:
75 return GetUpdatesCallerInfo::MIGRATION;
76 case CONFIGURE_REASON_NEW_CLIENT:
77 return GetUpdatesCallerInfo::NEW_CLIENT;
78 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
79 case CONFIGURE_REASON_CRYPTO:
80 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
81 default:
82 NOTREACHED();
84 return GetUpdatesCallerInfo::UNKNOWN;
87 } // namespace
89 // A class to calculate nudge delays for types.
90 class NudgeStrategy {
91 public:
92 static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type,
93 SyncManagerImpl* core) {
94 NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type);
95 return GetNudgeDelayTimeDeltaFromType(delay_type,
96 model_type,
97 core);
100 private:
101 // Possible types of nudge delay for datatypes.
102 // Note: These are just hints. If a sync happens then all dirty entries
103 // would be committed as part of the sync.
104 enum NudgeDelayStrategy {
105 // Sync right away.
106 IMMEDIATE,
108 // Sync this change while syncing another change.
109 ACCOMPANY_ONLY,
111 // The datatype does not use one of the predefined wait times but defines
112 // its own wait time logic for nudge.
113 CUSTOM,
116 static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) {
117 switch (type) {
118 case AUTOFILL:
119 return ACCOMPANY_ONLY;
120 case PREFERENCES:
121 case SESSIONS:
122 case FAVICON_IMAGES:
123 case FAVICON_TRACKING:
124 return CUSTOM;
125 default:
126 return IMMEDIATE;
130 static TimeDelta GetNudgeDelayTimeDeltaFromType(
131 const NudgeDelayStrategy& delay_type, const ModelType& model_type,
132 const SyncManagerImpl* core) {
133 CHECK(core);
134 TimeDelta delay = TimeDelta::FromMilliseconds(
135 kDefaultNudgeDelayMilliseconds);
136 switch (delay_type) {
137 case IMMEDIATE:
138 delay = TimeDelta::FromMilliseconds(
139 kDefaultNudgeDelayMilliseconds);
140 break;
141 case ACCOMPANY_ONLY:
142 delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds);
143 break;
144 case CUSTOM:
145 switch (model_type) {
146 case PREFERENCES:
147 delay = TimeDelta::FromMilliseconds(
148 kPreferencesNudgeDelayMilliseconds);
149 break;
150 case SESSIONS:
151 case FAVICON_IMAGES:
152 case FAVICON_TRACKING:
153 delay = core->scheduler()->GetSessionsCommitDelay();
154 break;
155 default:
156 NOTREACHED();
158 break;
159 default:
160 NOTREACHED();
162 return delay;
166 SyncManagerImpl::SyncManagerImpl(const std::string& name)
167 : name_(name),
168 change_delegate_(NULL),
169 initialized_(false),
170 observing_network_connectivity_changes_(false),
171 invalidator_state_(DEFAULT_INVALIDATION_ERROR),
172 report_unrecoverable_error_function_(NULL),
173 weak_ptr_factory_(this) {
174 // Pre-fill |notification_info_map_|.
175 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
176 notification_info_map_.insert(
177 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
181 SyncManagerImpl::~SyncManagerImpl() {
182 DCHECK(thread_checker_.CalledOnValidThread());
183 CHECK(!initialized_);
186 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
187 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
189 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
190 base::DictionaryValue* value = new base::DictionaryValue();
191 value->SetInteger("totalCount", total_count);
192 value->SetString("payload", payload);
193 return value;
196 bool SyncManagerImpl::VisiblePositionsDiffer(
197 const syncable::EntryKernelMutation& mutation) const {
198 const syncable::EntryKernel& a = mutation.original;
199 const syncable::EntryKernel& b = mutation.mutated;
200 if (!b.ShouldMaintainPosition())
201 return false;
202 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
203 return true;
204 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
205 return true;
206 return false;
209 bool SyncManagerImpl::VisiblePropertiesDiffer(
210 const syncable::EntryKernelMutation& mutation,
211 Cryptographer* cryptographer) const {
212 const syncable::EntryKernel& a = mutation.original;
213 const syncable::EntryKernel& b = mutation.mutated;
214 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
215 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
216 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
217 GetModelTypeFromSpecifics(b_specifics));
218 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
219 // Suppress updates to items that aren't tracked by any browser model.
220 if (model_type < FIRST_REAL_MODEL_TYPE ||
221 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
222 return false;
224 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
225 return true;
226 if (!AreSpecificsEqual(cryptographer,
227 a.ref(syncable::SPECIFICS),
228 b.ref(syncable::SPECIFICS))) {
229 return true;
231 // We only care if the name has changed if neither specifics is encrypted
232 // (encrypted nodes blow away the NON_UNIQUE_NAME).
233 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
234 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
235 return true;
236 if (VisiblePositionsDiffer(mutation))
237 return true;
238 return false;
241 void SyncManagerImpl::ThrowUnrecoverableError() {
242 DCHECK(thread_checker_.CalledOnValidThread());
243 ReadTransaction trans(FROM_HERE, GetUserShare());
244 trans.GetWrappedTrans()->OnUnrecoverableError(
245 FROM_HERE, "Simulating unrecoverable error for testing purposes.");
248 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
249 return directory()->InitialSyncEndedTypes();
252 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
253 ModelTypeSet types) {
254 ModelTypeSet result;
255 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
256 sync_pb::DataTypeProgressMarker marker;
257 directory()->GetDownloadProgress(i.Get(), &marker);
259 if (marker.token().empty())
260 result.Put(i.Get());
262 return result;
265 void SyncManagerImpl::ConfigureSyncer(
266 ConfigureReason reason,
267 ModelTypeSet to_download,
268 ModelTypeSet to_purge,
269 ModelTypeSet to_journal,
270 ModelTypeSet to_unapply,
271 const ModelSafeRoutingInfo& new_routing_info,
272 const base::Closure& ready_task,
273 const base::Closure& retry_task) {
274 DCHECK(thread_checker_.CalledOnValidThread());
275 DCHECK(!ready_task.is_null());
276 DCHECK(!retry_task.is_null());
278 DVLOG(1) << "Configuring -"
279 << "\n\t" << "current types: "
280 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
281 << "\n\t" << "types to download: "
282 << ModelTypeSetToString(to_download)
283 << "\n\t" << "types to purge: "
284 << ModelTypeSetToString(to_purge)
285 << "\n\t" << "types to journal: "
286 << ModelTypeSetToString(to_journal)
287 << "\n\t" << "types to unapply: "
288 << ModelTypeSetToString(to_unapply);
289 if (!PurgeDisabledTypes(to_purge,
290 to_journal,
291 to_unapply)) {
292 // We failed to cleanup the types. Invoke the ready task without actually
293 // configuring any types. The caller should detect this as a configuration
294 // failure and act appropriately.
295 ready_task.Run();
296 return;
299 ConfigurationParams params(GetSourceFromReason(reason),
300 to_download,
301 new_routing_info,
302 ready_task,
303 retry_task);
305 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
306 scheduler_->ScheduleConfiguration(params);
309 void SyncManagerImpl::Init(
310 const base::FilePath& database_location,
311 const WeakHandle<JsEventHandler>& event_handler,
312 const std::string& sync_server_and_path,
313 int port,
314 bool use_ssl,
315 scoped_ptr<HttpPostProviderFactory> post_factory,
316 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
317 ExtensionsActivity* extensions_activity,
318 SyncManager::ChangeDelegate* change_delegate,
319 const SyncCredentials& credentials,
320 const std::string& invalidator_client_id,
321 const std::string& restored_key_for_bootstrapping,
322 const std::string& restored_keystore_key_for_bootstrapping,
323 InternalComponentsFactory* internal_components_factory,
324 Encryptor* encryptor,
325 scoped_ptr<UnrecoverableErrorHandler> unrecoverable_error_handler,
326 ReportUnrecoverableErrorFunction report_unrecoverable_error_function,
327 CancelationSignal* cancelation_signal) {
328 CHECK(!initialized_);
329 DCHECK(thread_checker_.CalledOnValidThread());
330 DCHECK(post_factory.get());
331 DCHECK(!credentials.email.empty());
332 DCHECK(!credentials.sync_token.empty());
333 DCHECK(cancelation_signal);
334 DVLOG(1) << "SyncManager starting Init...";
336 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
338 change_delegate_ = change_delegate;
340 AddObserver(&js_sync_manager_observer_);
341 SetJsEventHandler(event_handler);
343 AddObserver(&debug_info_event_listener_);
345 database_path_ = database_location.Append(
346 syncable::Directory::kSyncDatabaseFilename);
347 unrecoverable_error_handler_ = unrecoverable_error_handler.Pass();
348 report_unrecoverable_error_function_ = report_unrecoverable_error_function;
350 allstatus_.SetHasKeystoreKey(
351 !restored_keystore_key_for_bootstrapping.empty());
352 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
353 &share_,
354 encryptor,
355 restored_key_for_bootstrapping,
356 restored_keystore_key_for_bootstrapping));
357 sync_encryption_handler_->AddObserver(this);
358 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
359 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
361 base::FilePath absolute_db_path = database_path_;
362 DCHECK(absolute_db_path.IsAbsolute());
364 scoped_ptr<syncable::DirectoryBackingStore> backing_store =
365 internal_components_factory->BuildDirectoryBackingStore(
366 credentials.email, absolute_db_path).Pass();
368 DCHECK(backing_store.get());
369 const std::string& username = credentials.email;
370 share_.directory.reset(
371 new syncable::Directory(
372 backing_store.release(),
373 unrecoverable_error_handler_.get(),
374 report_unrecoverable_error_function_,
375 sync_encryption_handler_.get(),
376 sync_encryption_handler_->GetCryptographerUnsafe()));
378 DVLOG(1) << "Username: " << username;
379 if (!OpenDirectory(username)) {
380 NotifyInitializationFailure();
381 LOG(ERROR) << "Sync manager initialization failed!";
382 return;
385 connection_manager_.reset(new SyncAPIServerConnectionManager(
386 sync_server_and_path, port, use_ssl,
387 post_factory.release(), cancelation_signal));
388 connection_manager_->set_client_id(directory()->cache_guid());
389 connection_manager_->AddListener(this);
391 std::string sync_id = directory()->cache_guid();
393 DVLOG(1) << "Setting sync client ID: " << sync_id;
394 allstatus_.SetSyncId(sync_id);
395 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id;
396 allstatus_.SetInvalidatorClientId(invalidator_client_id);
398 model_type_registry_.reset(new ModelTypeRegistry(workers, directory()));
400 sync_core_.reset(new SyncCore(model_type_registry_.get()));
402 // Bind the SyncCore WeakPtr to this thread. This helps us crash earlier if
403 // the pointer is misused in debug mode.
404 base::WeakPtr<SyncCore> weak_core = sync_core_->AsWeakPtr();
405 weak_core.get();
407 sync_core_proxy_.reset(
408 new SyncCoreProxyImpl(base::MessageLoopProxy::current(), weak_core));
410 // Build a SyncSessionContext and store the worker in it.
411 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
412 std::vector<SyncEngineEventListener*> listeners;
413 listeners.push_back(&allstatus_);
414 listeners.push_back(this);
415 session_context_ = internal_components_factory->BuildContext(
416 connection_manager_.get(),
417 directory(),
418 extensions_activity,
419 listeners,
420 &debug_info_event_listener_,
421 model_type_registry_.get(),
422 invalidator_client_id).Pass();
423 session_context_->set_account_name(credentials.email);
424 scheduler_ = internal_components_factory->BuildScheduler(
425 name_, session_context_.get(), cancelation_signal).Pass();
427 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
429 initialized_ = true;
431 net::NetworkChangeNotifier::AddIPAddressObserver(this);
432 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
433 observing_network_connectivity_changes_ = true;
435 UpdateCredentials(credentials);
437 NotifyInitializationSuccess();
440 void SyncManagerImpl::NotifyInitializationSuccess() {
441 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
442 OnInitializationComplete(
443 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
444 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
445 true, InitialSyncEndedTypes()));
448 void SyncManagerImpl::NotifyInitializationFailure() {
449 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
450 OnInitializationComplete(
451 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
452 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
453 false, ModelTypeSet()));
456 void SyncManagerImpl::OnPassphraseRequired(
457 PassphraseRequiredReason reason,
458 const sync_pb::EncryptedData& pending_keys) {
459 // Does nothing.
462 void SyncManagerImpl::OnPassphraseAccepted() {
463 // Does nothing.
466 void SyncManagerImpl::OnBootstrapTokenUpdated(
467 const std::string& bootstrap_token,
468 BootstrapTokenType type) {
469 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
470 allstatus_.SetHasKeystoreKey(true);
473 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
474 bool encrypt_everything) {
475 allstatus_.SetEncryptedTypes(encrypted_types);
478 void SyncManagerImpl::OnEncryptionComplete() {
479 // Does nothing.
482 void SyncManagerImpl::OnCryptographerStateChanged(
483 Cryptographer* cryptographer) {
484 allstatus_.SetCryptographerReady(cryptographer->is_ready());
485 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
486 allstatus_.SetKeystoreMigrationTime(
487 sync_encryption_handler_->migration_time());
490 void SyncManagerImpl::OnPassphraseTypeChanged(
491 PassphraseType type,
492 base::Time explicit_passphrase_time) {
493 allstatus_.SetPassphraseType(type);
494 allstatus_.SetKeystoreMigrationTime(
495 sync_encryption_handler_->migration_time());
498 void SyncManagerImpl::StartSyncingNormally(
499 const ModelSafeRoutingInfo& routing_info) {
500 // Start the sync scheduler.
501 // TODO(sync): We always want the newest set of routes when we switch back
502 // to normal mode. Figure out how to enforce set_routing_info is always
503 // appropriately set and that it's only modified when switching to normal
504 // mode.
505 DCHECK(thread_checker_.CalledOnValidThread());
506 session_context_->SetRoutingInfo(routing_info);
507 scheduler_->Start(SyncScheduler::NORMAL_MODE);
510 syncable::Directory* SyncManagerImpl::directory() {
511 return share_.directory.get();
514 const SyncScheduler* SyncManagerImpl::scheduler() const {
515 return scheduler_.get();
518 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
519 return connection_manager_->HasInvalidAuthToken();
522 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
523 DCHECK(!initialized_) << "Should only happen once";
525 // Set before Open().
526 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
527 WeakHandle<syncable::TransactionObserver> transaction_observer(
528 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
530 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
531 open_result = directory()->Open(username, this, transaction_observer);
532 if (open_result != syncable::OPENED) {
533 LOG(ERROR) << "Could not open share for:" << username;
534 return false;
537 // Unapplied datatypes (those that do not have initial sync ended set) get
538 // re-downloaded during any configuration. But, it's possible for a datatype
539 // to have a progress marker but not have initial sync ended yet, making
540 // it a candidate for migration. This is a problem, as the DataTypeManager
541 // does not support a migration while it's already in the middle of a
542 // configuration. As a result, any partially synced datatype can stall the
543 // DTM, waiting for the configuration to complete, which it never will due
544 // to the migration error. In addition, a partially synced nigori will
545 // trigger the migration logic before the backend is initialized, resulting
546 // in crashes. We therefore detect and purge any partially synced types as
547 // part of initialization.
548 if (!PurgePartiallySyncedTypes())
549 return false;
551 return true;
554 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
555 ModelTypeSet partially_synced_types = ModelTypeSet::All();
556 partially_synced_types.RemoveAll(InitialSyncEndedTypes());
557 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
558 ModelTypeSet::All()));
560 DVLOG(1) << "Purging partially synced types "
561 << ModelTypeSetToString(partially_synced_types);
562 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
563 partially_synced_types.Size());
564 if (partially_synced_types.Empty())
565 return true;
566 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
567 ModelTypeSet(),
568 ModelTypeSet());
571 bool SyncManagerImpl::PurgeDisabledTypes(
572 ModelTypeSet to_purge,
573 ModelTypeSet to_journal,
574 ModelTypeSet to_unapply) {
575 if (to_purge.Empty())
576 return true;
577 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
578 DCHECK(to_purge.HasAll(to_journal));
579 DCHECK(to_purge.HasAll(to_unapply));
580 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
583 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
584 DCHECK(thread_checker_.CalledOnValidThread());
585 DCHECK(initialized_);
586 DCHECK(!credentials.email.empty());
587 DCHECK(!credentials.sync_token.empty());
589 observing_network_connectivity_changes_ = true;
590 if (!connection_manager_->SetAuthToken(credentials.sync_token))
591 return; // Auth token is known to be invalid, so exit early.
593 scheduler_->OnCredentialsUpdated();
595 // TODO(zea): pass the credential age to the debug info event listener.
598 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
599 DCHECK(thread_checker_.CalledOnValidThread());
600 observers_.AddObserver(observer);
603 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
604 DCHECK(thread_checker_.CalledOnValidThread());
605 observers_.RemoveObserver(observer);
608 void SyncManagerImpl::ShutdownOnSyncThread() {
609 DCHECK(thread_checker_.CalledOnValidThread());
611 // Prevent any in-flight method calls from running. Also
612 // invalidates |weak_handle_this_| and |change_observer_|.
613 weak_ptr_factory_.InvalidateWeakPtrs();
614 js_mutation_event_observer_.InvalidateWeakPtrs();
616 scheduler_.reset();
617 session_context_.reset();
618 model_type_registry_.reset();
620 if (sync_encryption_handler_) {
621 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
622 sync_encryption_handler_->RemoveObserver(this);
625 SetJsEventHandler(WeakHandle<JsEventHandler>());
626 RemoveObserver(&js_sync_manager_observer_);
628 RemoveObserver(&debug_info_event_listener_);
630 // |connection_manager_| may end up being NULL here in tests (in synchronous
631 // initialization mode).
633 // TODO(akalin): Fix this behavior.
634 if (connection_manager_)
635 connection_manager_->RemoveListener(this);
636 connection_manager_.reset();
638 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
639 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
640 observing_network_connectivity_changes_ = false;
642 if (initialized_ && directory()) {
643 directory()->SaveChanges();
646 share_.directory.reset();
648 change_delegate_ = NULL;
650 initialized_ = false;
652 // We reset these here, since only now we know they will not be
653 // accessed from other threads (since we shut down everything).
654 change_observer_.Reset();
655 weak_handle_this_.Reset();
658 void SyncManagerImpl::OnIPAddressChanged() {
659 if (!observing_network_connectivity_changes_) {
660 DVLOG(1) << "IP address change dropped.";
661 return;
663 DVLOG(1) << "IP address change detected.";
664 OnNetworkConnectivityChangedImpl();
667 void SyncManagerImpl::OnConnectionTypeChanged(
668 net::NetworkChangeNotifier::ConnectionType) {
669 if (!observing_network_connectivity_changes_) {
670 DVLOG(1) << "Connection type change dropped.";
671 return;
673 DVLOG(1) << "Connection type change detected.";
674 OnNetworkConnectivityChangedImpl();
677 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
678 DCHECK(thread_checker_.CalledOnValidThread());
679 scheduler_->OnConnectionStatusChange();
682 void SyncManagerImpl::OnServerConnectionEvent(
683 const ServerConnectionEvent& event) {
684 DCHECK(thread_checker_.CalledOnValidThread());
685 if (event.connection_code ==
686 HttpResponse::SERVER_CONNECTION_OK) {
687 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
688 OnConnectionStatusChange(CONNECTION_OK));
691 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
692 observing_network_connectivity_changes_ = false;
693 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
694 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
697 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
698 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
699 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
703 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
704 ModelTypeSet models_with_changes) {
705 // This notification happens immediately after the transaction mutex is
706 // released. This allows work to be performed without blocking other threads
707 // from acquiring a transaction.
708 if (!change_delegate_)
709 return;
711 // Call commit.
712 for (ModelTypeSet::Iterator it = models_with_changes.First();
713 it.Good(); it.Inc()) {
714 change_delegate_->OnChangesComplete(it.Get());
715 change_observer_.Call(
716 FROM_HERE,
717 &SyncManager::ChangeObserver::OnChangesComplete,
718 it.Get());
722 ModelTypeSet
723 SyncManagerImpl::HandleTransactionEndingChangeEvent(
724 const ImmutableWriteTransactionInfo& write_transaction_info,
725 syncable::BaseTransaction* trans) {
726 // This notification happens immediately before a syncable WriteTransaction
727 // falls out of scope. It happens while the channel mutex is still held,
728 // and while the transaction mutex is held, so it cannot be re-entrant.
729 if (!change_delegate_ || change_records_.empty())
730 return ModelTypeSet();
732 // This will continue the WriteTransaction using a read only wrapper.
733 // This is the last chance for read to occur in the WriteTransaction
734 // that's closing. This special ReadTransaction will not close the
735 // underlying transaction.
736 ReadTransaction read_trans(GetUserShare(), trans);
738 ModelTypeSet models_with_changes;
739 for (ChangeRecordMap::const_iterator it = change_records_.begin();
740 it != change_records_.end(); ++it) {
741 DCHECK(!it->second.Get().empty());
742 ModelType type = ModelTypeFromInt(it->first);
743 change_delegate_->
744 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
745 &read_trans, it->second);
746 change_observer_.Call(FROM_HERE,
747 &SyncManager::ChangeObserver::OnChangesApplied,
748 type, write_transaction_info.Get().id, it->second);
749 models_with_changes.Put(type);
751 change_records_.clear();
752 return models_with_changes;
755 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
756 const ImmutableWriteTransactionInfo& write_transaction_info,
757 syncable::BaseTransaction* trans,
758 std::vector<int64>* entries_changed) {
759 // We have been notified about a user action changing a sync model.
760 LOG_IF(WARNING, !change_records_.empty()) <<
761 "CALCULATE_CHANGES called with unapplied old changes.";
763 // The mutated model type, or UNSPECIFIED if nothing was mutated.
764 ModelTypeSet mutated_model_types;
766 const syncable::ImmutableEntryKernelMutationMap& mutations =
767 write_transaction_info.Get().mutations;
768 for (syncable::EntryKernelMutationMap::const_iterator it =
769 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
770 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
771 continue;
774 ModelType model_type =
775 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
776 if (model_type < FIRST_REAL_MODEL_TYPE) {
777 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
778 continue;
781 // Found real mutation.
782 if (model_type != UNSPECIFIED) {
783 mutated_model_types.Put(model_type);
784 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
788 // Nudge if necessary.
789 if (!mutated_model_types.Empty()) {
790 if (weak_handle_this_.IsInitialized()) {
791 weak_handle_this_.Call(FROM_HERE,
792 &SyncManagerImpl::RequestNudgeForDataTypes,
793 FROM_HERE,
794 mutated_model_types);
795 } else {
796 NOTREACHED();
801 void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
802 ModelType type, ChangeReorderBuffer* buffer,
803 Cryptographer* cryptographer, const syncable::EntryKernel& original,
804 bool existed_before, bool exists_now) {
805 // If this is a deletion and the datatype was encrypted, we need to decrypt it
806 // and attach it to the buffer.
807 if (!exists_now && existed_before) {
808 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
809 if (type == PASSWORDS) {
810 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
811 scoped_ptr<sync_pb::PasswordSpecificsData> data(
812 DecryptPasswordSpecifics(original_specifics, cryptographer));
813 if (!data) {
814 NOTREACHED();
815 return;
817 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
818 } else if (original_specifics.has_encrypted()) {
819 // All other datatypes can just create a new unencrypted specifics and
820 // attach it.
821 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
822 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
823 NOTREACHED();
824 return;
827 buffer->SetSpecificsForId(id, original_specifics);
831 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
832 const ImmutableWriteTransactionInfo& write_transaction_info,
833 syncable::BaseTransaction* trans,
834 std::vector<int64>* entries_changed) {
835 // We only expect one notification per sync step, so change_buffers_ should
836 // contain no pending entries.
837 LOG_IF(WARNING, !change_records_.empty()) <<
838 "CALCULATE_CHANGES called with unapplied old changes.";
840 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
842 Cryptographer* crypto = directory()->GetCryptographer(trans);
843 const syncable::ImmutableEntryKernelMutationMap& mutations =
844 write_transaction_info.Get().mutations;
845 for (syncable::EntryKernelMutationMap::const_iterator it =
846 mutations.Get().begin(); it != mutations.Get().end(); ++it) {
847 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
848 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
850 // Omit items that aren't associated with a model.
851 ModelType type =
852 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
853 if (type < FIRST_REAL_MODEL_TYPE)
854 continue;
856 int64 handle = it->first;
857 if (exists_now && !existed_before)
858 change_buffers[type].PushAddedItem(handle);
859 else if (!exists_now && existed_before)
860 change_buffers[type].PushDeletedItem(handle);
861 else if (exists_now && existed_before &&
862 VisiblePropertiesDiffer(it->second, crypto)) {
863 change_buffers[type].PushUpdatedItem(handle);
866 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
867 it->second.original, existed_before, exists_now);
870 ReadTransaction read_trans(GetUserShare(), trans);
871 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
872 if (!change_buffers[i].IsEmpty()) {
873 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
874 &(change_records_[i]))) {
875 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
876 entries_changed->push_back((change_records_[i].Get())[j].id);
878 if (change_records_[i].Get().empty())
879 change_records_.erase(i);
884 TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta(
885 const ModelType& model_type) {
886 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this);
889 void SyncManagerImpl::RequestNudgeForDataTypes(
890 const tracked_objects::Location& nudge_location,
891 ModelTypeSet types) {
892 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
894 // TODO(lipalani) : Calculate the nudge delay based on all types.
895 base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta(
896 types.First().Get(),
897 this);
898 scheduler_->ScheduleLocalNudge(nudge_delay,
899 types,
900 nudge_location);
903 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
904 DCHECK(thread_checker_.CalledOnValidThread());
905 // Only send an event if this is due to a cycle ending and this cycle
906 // concludes a canonical "sync" process; that is, based on what is known
907 // locally we are "all happy" and up-to-date. There may be new changes on
908 // the server, but we'll get them on a subsequent sync.
910 // Notifications are sent at the end of every sync cycle, regardless of
911 // whether we should sync again.
912 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
913 if (!initialized_) {
914 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
915 << "initialized";
916 return;
919 DVLOG(1) << "Sending OnSyncCycleCompleted";
920 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
921 OnSyncCycleCompleted(event.snapshot));
925 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
926 FOR_EACH_OBSERVER(
927 SyncManager::Observer, observers_,
928 OnActionableError(error));
931 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
933 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
935 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
936 FOR_EACH_OBSERVER(
937 SyncManager::Observer, observers_,
938 OnMigrationRequested(types));
941 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
942 protocol_event_buffer_.RecordProtocolEvent(event);
943 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
944 OnProtocolEvent(event));
947 void SyncManagerImpl::SetJsEventHandler(
948 const WeakHandle<JsEventHandler>& event_handler) {
949 js_sync_manager_observer_.SetJsEventHandler(event_handler);
950 js_mutation_event_observer_.SetJsEventHandler(event_handler);
951 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
954 scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
955 syncer::ModelType type) {
956 DirectoryTypeDebugInfoEmitterMap* emitter_map =
957 model_type_registry_->directory_type_debug_info_emitter_map();
958 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
960 if (it == emitter_map->end()) {
961 // This can happen in some cases. The UI thread makes requests of us
962 // when it doesn't really know which types are enabled or disabled.
963 DLOG(WARNING) << "Asked to return debug info for invalid type "
964 << ModelTypeToString(type);
965 return scoped_ptr<base::ListValue>();
968 return it->second->GetAllNodes();
971 void SyncManagerImpl::OnInvalidatorStateChange(InvalidatorState state) {
972 DCHECK(thread_checker_.CalledOnValidThread());
974 const std::string& state_str = InvalidatorStateToString(state);
975 invalidator_state_ = state;
976 DVLOG(1) << "Invalidator state changed to: " << state_str;
977 const bool notifications_enabled =
978 (invalidator_state_ == INVALIDATIONS_ENABLED);
979 allstatus_.SetNotificationsEnabled(notifications_enabled);
980 scheduler_->SetNotificationsEnabled(notifications_enabled);
983 void SyncManagerImpl::OnIncomingInvalidation(
984 const ObjectIdInvalidationMap& invalidation_map) {
985 DCHECK(thread_checker_.CalledOnValidThread());
987 // We should never receive IDs from non-sync objects.
988 ObjectIdSet ids = invalidation_map.GetObjectIds();
989 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
990 ModelType type;
991 if (!ObjectIdToRealModelType(*it, &type)) {
992 DLOG(WARNING) << "Notification has invalid id: " << ObjectIdToString(*it);
996 if (invalidation_map.Empty()) {
997 LOG(WARNING) << "Sync received invalidation without any type information.";
998 } else {
999 scheduler_->ScheduleInvalidationNudge(
1000 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec),
1001 invalidation_map, FROM_HERE);
1002 debug_info_event_listener_.OnIncomingNotification(invalidation_map);
1006 std::string SyncManagerImpl::GetOwnerName() const { return "SyncManagerImpl"; }
1008 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
1009 DCHECK(thread_checker_.CalledOnValidThread());
1010 if (types.Empty()) {
1011 LOG(WARNING) << "Sync received refresh request with no types specified.";
1012 } else {
1013 scheduler_->ScheduleLocalRefreshRequest(
1014 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec),
1015 types, FROM_HERE);
1019 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
1020 return allstatus_.status();
1023 void SyncManagerImpl::SaveChanges() {
1024 directory()->SaveChanges();
1027 UserShare* SyncManagerImpl::GetUserShare() {
1028 DCHECK(initialized_);
1029 return &share_;
1032 syncer::SyncCoreProxy* SyncManagerImpl::GetSyncCoreProxy() {
1033 DCHECK(initialized_);
1034 return sync_core_proxy_.get();
1037 const std::string SyncManagerImpl::cache_guid() {
1038 DCHECK(initialized_);
1039 return directory()->cache_guid();
1042 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
1043 ReadTransaction trans(FROM_HERE, GetUserShare());
1044 ReadNode nigori_node(&trans);
1045 if (nigori_node.InitByTagLookup(kNigoriTag) != BaseNode::INIT_OK) {
1046 DVLOG(1) << "Couldn't find Nigori node.";
1047 return false;
1049 bool found_experiment = false;
1051 ReadNode favicon_sync_node(&trans);
1052 if (favicon_sync_node.InitByClientTagLookup(
1053 syncer::EXPERIMENTS,
1054 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
1055 experiments->favicon_sync_limit =
1056 favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
1057 favicon_sync_limit();
1058 found_experiment = true;
1061 ReadNode pre_commit_update_avoidance_node(&trans);
1062 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
1063 syncer::EXPERIMENTS,
1064 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
1065 session_context_->set_server_enabled_pre_commit_update_avoidance(
1066 pre_commit_update_avoidance_node.GetExperimentsSpecifics().
1067 pre_commit_update_avoidance().enabled());
1068 // We don't bother setting found_experiment. The frontend doesn't need to
1069 // know about this.
1072 ReadNode gcm_channel_node(&trans);
1073 if (gcm_channel_node.InitByClientTagLookup(
1074 syncer::EXPERIMENTS,
1075 syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
1076 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
1077 experiments->gcm_channel_state =
1078 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
1079 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
1080 found_experiment = true;
1083 ReadNode enhanced_bookmarks_node(&trans);
1084 if (enhanced_bookmarks_node.InitByClientTagLookup(
1085 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
1086 BaseNode::INIT_OK &&
1087 enhanced_bookmarks_node.GetExperimentsSpecifics()
1088 .has_enhanced_bookmarks()) {
1089 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
1090 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
1091 if (enhanced_bookmarks.has_enabled())
1092 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
1093 if (enhanced_bookmarks.has_extension_id()) {
1094 experiments->enhanced_bookmarks_ext_id =
1095 enhanced_bookmarks.extension_id();
1097 found_experiment = true;
1100 ReadNode gcm_invalidations_node(&trans);
1101 if (gcm_invalidations_node.InitByClientTagLookup(
1102 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1103 BaseNode::INIT_OK) {
1104 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1105 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1106 if (gcm_invalidations.has_enabled()) {
1107 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1108 found_experiment = true;
1112 return found_experiment;
1115 bool SyncManagerImpl::HasUnsyncedItems() {
1116 ReadTransaction trans(FROM_HERE, GetUserShare());
1117 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1120 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1121 return sync_encryption_handler_.get();
1124 ScopedVector<syncer::ProtocolEvent>
1125 SyncManagerImpl::GetBufferedProtocolEvents() {
1126 return protocol_event_buffer_.GetBufferedProtocolEvents();
1129 // static.
1130 int SyncManagerImpl::GetDefaultNudgeDelay() {
1131 return kDefaultNudgeDelayMilliseconds;
1134 // static.
1135 int SyncManagerImpl::GetPreferencesNudgeDelay() {
1136 return kPreferencesNudgeDelayMilliseconds;
1139 } // namespace syncer