1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/values.h"
19 #include "sync/engine/sync_scheduler.h"
20 #include "sync/engine/syncer_types.h"
21 #include "sync/internal_api/change_reorder_buffer.h"
22 #include "sync/internal_api/public/base/cancelation_signal.h"
23 #include "sync/internal_api/public/base/model_type.h"
24 #include "sync/internal_api/public/base_node.h"
25 #include "sync/internal_api/public/configure_reason.h"
26 #include "sync/internal_api/public/engine/polling_constants.h"
27 #include "sync/internal_api/public/http_post_provider_factory.h"
28 #include "sync/internal_api/public/internal_components_factory.h"
29 #include "sync/internal_api/public/read_node.h"
30 #include "sync/internal_api/public/read_transaction.h"
31 #include "sync/internal_api/public/user_share.h"
32 #include "sync/internal_api/public/util/experiments.h"
33 #include "sync/internal_api/public/write_node.h"
34 #include "sync/internal_api/public/write_transaction.h"
35 #include "sync/internal_api/syncapi_internal.h"
36 #include "sync/internal_api/syncapi_server_connection_manager.h"
37 #include "sync/js/js_arg_list.h"
38 #include "sync/js/js_event_details.h"
39 #include "sync/js/js_event_handler.h"
40 #include "sync/js/js_reply_handler.h"
41 #include "sync/notifier/invalidation_util.h"
42 #include "sync/notifier/invalidator.h"
43 #include "sync/notifier/object_id_invalidation_map.h"
44 #include "sync/protocol/proto_value_conversions.h"
45 #include "sync/protocol/sync.pb.h"
46 #include "sync/syncable/directory.h"
47 #include "sync/syncable/entry.h"
48 #include "sync/syncable/in_memory_directory_backing_store.h"
49 #include "sync/syncable/on_disk_directory_backing_store.h"
51 using base::TimeDelta
;
52 using sync_pb::GetUpdatesCallerInfo
;
56 using sessions::SyncSessionContext
;
57 using syncable::ImmutableWriteTransactionInfo
;
58 using syncable::SPECIFICS
;
59 using syncable::UNIQUE_POSITION
;
// Delays used when nudging the syncer. The name suffixes indicate that each
// value is a millisecond count.
static const int kDefaultNudgeDelayMilliseconds = 200;
static const int kPreferencesNudgeDelayMilliseconds = 2000;
static const int kSyncRefreshDelayMsec = 500;
static const int kSyncSchedulerDelayMsec = 250;

// Limits given to the traffic recorder: the maximum number of messages to
// record and the maximum size of a recorded message.
static const unsigned int kMaxMessagesToRecord = 10;
static const unsigned int kMaxMessageSizeToRecord = 5 * 1024;
73 GetUpdatesCallerInfo::GetUpdatesSource
GetSourceFromReason(
74 ConfigureReason reason
) {
76 case CONFIGURE_REASON_RECONFIGURATION
:
77 return GetUpdatesCallerInfo::RECONFIGURATION
;
78 case CONFIGURE_REASON_MIGRATION
:
79 return GetUpdatesCallerInfo::MIGRATION
;
80 case CONFIGURE_REASON_NEW_CLIENT
:
81 return GetUpdatesCallerInfo::NEW_CLIENT
;
82 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE
:
83 case CONFIGURE_REASON_CRYPTO
:
84 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE
;
88 return GetUpdatesCallerInfo::UNKNOWN
;
93 // A class to calculate nudge delays for types.
96 static TimeDelta
GetNudgeDelayTimeDelta(const ModelType
& model_type
,
97 SyncManagerImpl
* core
) {
98 NudgeDelayStrategy delay_type
= GetNudgeDelayStrategy(model_type
);
99 return GetNudgeDelayTimeDeltaFromType(delay_type
,
105 // Possible types of nudge delay for datatypes.
106 // Note: These are just hints. If a sync happens then all dirty entries
107 // would be committed as part of the sync.
108 enum NudgeDelayStrategy
{
112 // Sync this change while syncing another change.
115 // The datatype does not use one of the predefined wait times but defines
116 // its own wait time logic for nudge.
120 static NudgeDelayStrategy
GetNudgeDelayStrategy(const ModelType
& type
) {
123 return ACCOMPANY_ONLY
;
127 case FAVICON_TRACKING
:
134 static TimeDelta
GetNudgeDelayTimeDeltaFromType(
135 const NudgeDelayStrategy
& delay_type
, const ModelType
& model_type
,
136 const SyncManagerImpl
* core
) {
138 TimeDelta delay
= TimeDelta::FromMilliseconds(
139 kDefaultNudgeDelayMilliseconds
);
140 switch (delay_type
) {
142 delay
= TimeDelta::FromMilliseconds(
143 kDefaultNudgeDelayMilliseconds
);
146 delay
= TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds
);
149 switch (model_type
) {
151 delay
= TimeDelta::FromMilliseconds(
152 kPreferencesNudgeDelayMilliseconds
);
156 case FAVICON_TRACKING
:
157 delay
= core
->scheduler()->GetSessionsCommitDelay();
170 SyncManagerImpl::SyncManagerImpl(const std::string
& name
)
172 change_delegate_(NULL
),
174 observing_network_connectivity_changes_(false),
175 invalidator_state_(DEFAULT_INVALIDATION_ERROR
),
176 traffic_recorder_(kMaxMessagesToRecord
, kMaxMessageSizeToRecord
),
178 report_unrecoverable_error_function_(NULL
),
179 weak_ptr_factory_(this) {
180 // Pre-fill |notification_info_map_|.
181 for (int i
= FIRST_REAL_MODEL_TYPE
; i
< MODEL_TYPE_COUNT
; ++i
) {
182 notification_info_map_
.insert(
183 std::make_pair(ModelTypeFromInt(i
), NotificationInfo()));
186 // Bind message handlers.
187 BindJsMessageHandler(
188 "getNotificationState",
189 &SyncManagerImpl::GetNotificationState
);
190 BindJsMessageHandler(
191 "getNotificationInfo",
192 &SyncManagerImpl::GetNotificationInfo
);
193 BindJsMessageHandler(
195 &SyncManagerImpl::GetAllNodes
);
196 BindJsMessageHandler(
197 "getClientServerTraffic",
198 &SyncManagerImpl::GetClientServerTraffic
);
201 SyncManagerImpl::~SyncManagerImpl() {
202 DCHECK(thread_checker_
.CalledOnValidThread());
203 CHECK(!initialized_
);
206 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
207 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
209 base::DictionaryValue
* SyncManagerImpl::NotificationInfo::ToValue() const {
210 base::DictionaryValue
* value
= new base::DictionaryValue();
211 value
->SetInteger("totalCount", total_count
);
212 value
->SetString("payload", payload
);
216 bool SyncManagerImpl::VisiblePositionsDiffer(
217 const syncable::EntryKernelMutation
& mutation
) const {
218 const syncable::EntryKernel
& a
= mutation
.original
;
219 const syncable::EntryKernel
& b
= mutation
.mutated
;
220 if (!b
.ShouldMaintainPosition())
222 if (!a
.ref(UNIQUE_POSITION
).Equals(b
.ref(UNIQUE_POSITION
)))
224 if (a
.ref(syncable::PARENT_ID
) != b
.ref(syncable::PARENT_ID
))
229 bool SyncManagerImpl::VisiblePropertiesDiffer(
230 const syncable::EntryKernelMutation
& mutation
,
231 Cryptographer
* cryptographer
) const {
232 const syncable::EntryKernel
& a
= mutation
.original
;
233 const syncable::EntryKernel
& b
= mutation
.mutated
;
234 const sync_pb::EntitySpecifics
& a_specifics
= a
.ref(SPECIFICS
);
235 const sync_pb::EntitySpecifics
& b_specifics
= b
.ref(SPECIFICS
);
236 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics
),
237 GetModelTypeFromSpecifics(b_specifics
));
238 ModelType model_type
= GetModelTypeFromSpecifics(b_specifics
);
239 // Suppress updates to items that aren't tracked by any browser model.
240 if (model_type
< FIRST_REAL_MODEL_TYPE
||
241 !a
.ref(syncable::UNIQUE_SERVER_TAG
).empty()) {
244 if (a
.ref(syncable::IS_DIR
) != b
.ref(syncable::IS_DIR
))
246 if (!AreSpecificsEqual(cryptographer
,
247 a
.ref(syncable::SPECIFICS
),
248 b
.ref(syncable::SPECIFICS
))) {
251 // We only care if the name has changed if neither specifics is encrypted
252 // (encrypted nodes blow away the NON_UNIQUE_NAME).
253 if (!a_specifics
.has_encrypted() && !b_specifics
.has_encrypted() &&
254 a
.ref(syncable::NON_UNIQUE_NAME
) != b
.ref(syncable::NON_UNIQUE_NAME
))
256 if (VisiblePositionsDiffer(mutation
))
261 void SyncManagerImpl::ThrowUnrecoverableError() {
262 DCHECK(thread_checker_
.CalledOnValidThread());
263 ReadTransaction
trans(FROM_HERE
, GetUserShare());
264 trans
.GetWrappedTrans()->OnUnrecoverableError(
265 FROM_HERE
, "Simulating unrecoverable error for testing purposes.");
268 ModelTypeSet
SyncManagerImpl::InitialSyncEndedTypes() {
269 return directory()->InitialSyncEndedTypes();
272 ModelTypeSet
SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
273 ModelTypeSet types
) {
275 for (ModelTypeSet::Iterator i
= types
.First(); i
.Good(); i
.Inc()) {
276 sync_pb::DataTypeProgressMarker marker
;
277 directory()->GetDownloadProgress(i
.Get(), &marker
);
279 if (marker
.token().empty())
285 void SyncManagerImpl::ConfigureSyncer(
286 ConfigureReason reason
,
287 ModelTypeSet to_download
,
288 ModelTypeSet to_purge
,
289 ModelTypeSet to_journal
,
290 ModelTypeSet to_unapply
,
291 const ModelSafeRoutingInfo
& new_routing_info
,
292 const base::Closure
& ready_task
,
293 const base::Closure
& retry_task
) {
294 DCHECK(thread_checker_
.CalledOnValidThread());
295 DCHECK(!ready_task
.is_null());
296 DCHECK(!retry_task
.is_null());
298 DVLOG(1) << "Configuring -"
299 << "\n\t" << "current types: "
300 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info
))
301 << "\n\t" << "types to download: "
302 << ModelTypeSetToString(to_download
)
303 << "\n\t" << "types to purge: "
304 << ModelTypeSetToString(to_purge
)
305 << "\n\t" << "types to journal: "
306 << ModelTypeSetToString(to_journal
)
307 << "\n\t" << "types to unapply: "
308 << ModelTypeSetToString(to_unapply
);
309 if (!PurgeDisabledTypes(to_purge
,
312 // We failed to cleanup the types. Invoke the ready task without actually
313 // configuring any types. The caller should detect this as a configuration
314 // failure and act appropriately.
319 ConfigurationParams
params(GetSourceFromReason(reason
),
325 scheduler_
->Start(SyncScheduler::CONFIGURATION_MODE
);
326 scheduler_
->ScheduleConfiguration(params
);
329 void SyncManagerImpl::Init(
330 const base::FilePath
& database_location
,
331 const WeakHandle
<JsEventHandler
>& event_handler
,
332 const std::string
& sync_server_and_path
,
335 scoped_ptr
<HttpPostProviderFactory
> post_factory
,
336 const std::vector
<scoped_refptr
<ModelSafeWorker
> >& workers
,
337 ExtensionsActivity
* extensions_activity
,
338 SyncManager::ChangeDelegate
* change_delegate
,
339 const SyncCredentials
& credentials
,
340 const std::string
& invalidator_client_id
,
341 const std::string
& restored_key_for_bootstrapping
,
342 const std::string
& restored_keystore_key_for_bootstrapping
,
343 InternalComponentsFactory
* internal_components_factory
,
344 Encryptor
* encryptor
,
345 scoped_ptr
<UnrecoverableErrorHandler
> unrecoverable_error_handler
,
346 ReportUnrecoverableErrorFunction report_unrecoverable_error_function
,
347 CancelationSignal
* cancelation_signal
) {
348 CHECK(!initialized_
);
349 DCHECK(thread_checker_
.CalledOnValidThread());
350 DCHECK(post_factory
.get());
351 DCHECK(!credentials
.email
.empty());
352 DCHECK(!credentials
.sync_token
.empty());
353 DCHECK(cancelation_signal
);
354 DVLOG(1) << "SyncManager starting Init...";
356 weak_handle_this_
= MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr());
358 change_delegate_
= change_delegate
;
360 AddObserver(&js_sync_manager_observer_
);
361 SetJsEventHandler(event_handler
);
363 AddObserver(&debug_info_event_listener_
);
365 database_path_
= database_location
.Append(
366 syncable::Directory::kSyncDatabaseFilename
);
367 encryptor_
= encryptor
;
368 unrecoverable_error_handler_
= unrecoverable_error_handler
.Pass();
369 report_unrecoverable_error_function_
= report_unrecoverable_error_function
;
371 allstatus_
.SetHasKeystoreKey(
372 !restored_keystore_key_for_bootstrapping
.empty());
373 sync_encryption_handler_
.reset(new SyncEncryptionHandlerImpl(
376 restored_key_for_bootstrapping
,
377 restored_keystore_key_for_bootstrapping
));
378 sync_encryption_handler_
->AddObserver(this);
379 sync_encryption_handler_
->AddObserver(&debug_info_event_listener_
);
380 sync_encryption_handler_
->AddObserver(&js_sync_encryption_handler_observer_
);
382 base::FilePath absolute_db_path
= database_path_
;
383 DCHECK(absolute_db_path
.IsAbsolute());
385 scoped_ptr
<syncable::DirectoryBackingStore
> backing_store
=
386 internal_components_factory
->BuildDirectoryBackingStore(
387 credentials
.email
, absolute_db_path
).Pass();
389 DCHECK(backing_store
.get());
390 const std::string
& username
= credentials
.email
;
391 share_
.directory
.reset(
392 new syncable::Directory(
393 backing_store
.release(),
394 unrecoverable_error_handler_
.get(),
395 report_unrecoverable_error_function_
,
396 sync_encryption_handler_
.get(),
397 sync_encryption_handler_
->GetCryptographerUnsafe()));
399 DVLOG(1) << "Username: " << username
;
400 if (!OpenDirectory(username
)) {
401 NotifyInitializationFailure();
402 LOG(ERROR
) << "Sync manager initialization failed!";
406 connection_manager_
.reset(new SyncAPIServerConnectionManager(
407 sync_server_and_path
, port
, use_ssl
,
408 post_factory
.release(), cancelation_signal
));
409 connection_manager_
->set_client_id(directory()->cache_guid());
410 connection_manager_
->AddListener(this);
412 std::string sync_id
= directory()->cache_guid();
414 DVLOG(1) << "Setting sync client ID: " << sync_id
;
415 allstatus_
.SetSyncId(sync_id
);
416 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id
;
417 allstatus_
.SetInvalidatorClientId(invalidator_client_id
);
419 model_type_registry_
.reset(new ModelTypeRegistry(workers
, directory()));
421 // Build a SyncSessionContext and store the worker in it.
422 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
423 std::vector
<SyncEngineEventListener
*> listeners
;
424 listeners
.push_back(&allstatus_
);
425 listeners
.push_back(this);
426 session_context_
= internal_components_factory
->BuildContext(
427 connection_manager_
.get(),
431 &debug_info_event_listener_
,
433 model_type_registry_
.get(),
434 invalidator_client_id
).Pass();
435 session_context_
->set_account_name(credentials
.email
);
436 scheduler_
= internal_components_factory
->BuildScheduler(
437 name_
, session_context_
.get(), cancelation_signal
).Pass();
439 scheduler_
->Start(SyncScheduler::CONFIGURATION_MODE
);
443 net::NetworkChangeNotifier::AddIPAddressObserver(this);
444 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
445 observing_network_connectivity_changes_
= true;
447 UpdateCredentials(credentials
);
449 NotifyInitializationSuccess();
452 void SyncManagerImpl::NotifyInitializationSuccess() {
453 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
454 OnInitializationComplete(
455 MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr()),
456 MakeWeakHandle(debug_info_event_listener_
.GetWeakPtr()),
457 true, InitialSyncEndedTypes()));
460 void SyncManagerImpl::NotifyInitializationFailure() {
461 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
462 OnInitializationComplete(
463 MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr()),
464 MakeWeakHandle(debug_info_event_listener_
.GetWeakPtr()),
465 false, ModelTypeSet()));
468 void SyncManagerImpl::OnPassphraseRequired(
469 PassphraseRequiredReason reason
,
470 const sync_pb::EncryptedData
& pending_keys
) {
474 void SyncManagerImpl::OnPassphraseAccepted() {
478 void SyncManagerImpl::OnBootstrapTokenUpdated(
479 const std::string
& bootstrap_token
,
480 BootstrapTokenType type
) {
481 if (type
== KEYSTORE_BOOTSTRAP_TOKEN
)
482 allstatus_
.SetHasKeystoreKey(true);
485 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types
,
486 bool encrypt_everything
) {
487 allstatus_
.SetEncryptedTypes(encrypted_types
);
490 void SyncManagerImpl::OnEncryptionComplete() {
494 void SyncManagerImpl::OnCryptographerStateChanged(
495 Cryptographer
* cryptographer
) {
496 allstatus_
.SetCryptographerReady(cryptographer
->is_ready());
497 allstatus_
.SetCryptoHasPendingKeys(cryptographer
->has_pending_keys());
498 allstatus_
.SetKeystoreMigrationTime(
499 sync_encryption_handler_
->migration_time());
502 void SyncManagerImpl::OnPassphraseTypeChanged(
504 base::Time explicit_passphrase_time
) {
505 allstatus_
.SetPassphraseType(type
);
506 allstatus_
.SetKeystoreMigrationTime(
507 sync_encryption_handler_
->migration_time());
510 void SyncManagerImpl::StartSyncingNormally(
511 const ModelSafeRoutingInfo
& routing_info
) {
512 // Start the sync scheduler.
513 // TODO(sync): We always want the newest set of routes when we switch back
514 // to normal mode. Figure out how to enforce set_routing_info is always
515 // appropriately set and that it's only modified when switching to normal
517 DCHECK(thread_checker_
.CalledOnValidThread());
518 session_context_
->SetRoutingInfo(routing_info
);
519 scheduler_
->Start(SyncScheduler::NORMAL_MODE
);
522 syncable::Directory
* SyncManagerImpl::directory() {
523 return share_
.directory
.get();
526 const SyncScheduler
* SyncManagerImpl::scheduler() const {
527 return scheduler_
.get();
530 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
531 return connection_manager_
->HasInvalidAuthToken();
534 bool SyncManagerImpl::OpenDirectory(const std::string
& username
) {
535 DCHECK(!initialized_
) << "Should only happen once";
537 // Set before Open().
538 change_observer_
= MakeWeakHandle(js_mutation_event_observer_
.AsWeakPtr());
539 WeakHandle
<syncable::TransactionObserver
> transaction_observer(
540 MakeWeakHandle(js_mutation_event_observer_
.AsWeakPtr()));
542 syncable::DirOpenResult open_result
= syncable::NOT_INITIALIZED
;
543 open_result
= directory()->Open(username
, this, transaction_observer
);
544 if (open_result
!= syncable::OPENED
) {
545 LOG(ERROR
) << "Could not open share for:" << username
;
549 // Unapplied datatypes (those that do not have initial sync ended set) get
550 // re-downloaded during any configuration. But, it's possible for a datatype
551 // to have a progress marker but not have initial sync ended yet, making
552 // it a candidate for migration. This is a problem, as the DataTypeManager
553 // does not support a migration while it's already in the middle of a
554 // configuration. As a result, any partially synced datatype can stall the
555 // DTM, waiting for the configuration to complete, which it never will due
556 // to the migration error. In addition, a partially synced nigori will
557 // trigger the migration logic before the backend is initialized, resulting
558 // in crashes. We therefore detect and purge any partially synced types as
559 // part of initialization.
560 if (!PurgePartiallySyncedTypes())
566 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
567 ModelTypeSet partially_synced_types
= ModelTypeSet::All();
568 partially_synced_types
.RemoveAll(InitialSyncEndedTypes());
569 partially_synced_types
.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
570 ModelTypeSet::All()));
572 DVLOG(1) << "Purging partially synced types "
573 << ModelTypeSetToString(partially_synced_types
);
574 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
575 partially_synced_types
.Size());
576 if (partially_synced_types
.Empty())
578 return directory()->PurgeEntriesWithTypeIn(partially_synced_types
,
583 bool SyncManagerImpl::PurgeDisabledTypes(
584 ModelTypeSet to_purge
,
585 ModelTypeSet to_journal
,
586 ModelTypeSet to_unapply
) {
587 if (to_purge
.Empty())
589 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge
);
590 DCHECK(to_purge
.HasAll(to_journal
));
591 DCHECK(to_purge
.HasAll(to_unapply
));
592 return directory()->PurgeEntriesWithTypeIn(to_purge
, to_journal
, to_unapply
);
595 void SyncManagerImpl::UpdateCredentials(const SyncCredentials
& credentials
) {
596 DCHECK(thread_checker_
.CalledOnValidThread());
597 DCHECK(initialized_
);
598 DCHECK(!credentials
.email
.empty());
599 DCHECK(!credentials
.sync_token
.empty());
601 observing_network_connectivity_changes_
= true;
602 if (!connection_manager_
->SetAuthToken(credentials
.sync_token
))
603 return; // Auth token is known to be invalid, so exit early.
605 scheduler_
->OnCredentialsUpdated();
607 // TODO(zea): pass the credential age to the debug info event listener.
610 void SyncManagerImpl::AddObserver(SyncManager::Observer
* observer
) {
611 DCHECK(thread_checker_
.CalledOnValidThread());
612 observers_
.AddObserver(observer
);
615 void SyncManagerImpl::RemoveObserver(SyncManager::Observer
* observer
) {
616 DCHECK(thread_checker_
.CalledOnValidThread());
617 observers_
.RemoveObserver(observer
);
620 void SyncManagerImpl::ShutdownOnSyncThread() {
621 DCHECK(thread_checker_
.CalledOnValidThread());
623 // Prevent any in-flight method calls from running. Also
624 // invalidates |weak_handle_this_| and |change_observer_|.
625 weak_ptr_factory_
.InvalidateWeakPtrs();
626 js_mutation_event_observer_
.InvalidateWeakPtrs();
629 session_context_
.reset();
630 model_type_registry_
.reset();
632 if (sync_encryption_handler_
) {
633 sync_encryption_handler_
->RemoveObserver(&debug_info_event_listener_
);
634 sync_encryption_handler_
->RemoveObserver(this);
637 SetJsEventHandler(WeakHandle
<JsEventHandler
>());
638 RemoveObserver(&js_sync_manager_observer_
);
640 RemoveObserver(&debug_info_event_listener_
);
642 // |connection_manager_| may end up being NULL here in tests (in synchronous
643 // initialization mode).
645 // TODO(akalin): Fix this behavior.
646 if (connection_manager_
)
647 connection_manager_
->RemoveListener(this);
648 connection_manager_
.reset();
650 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
651 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
652 observing_network_connectivity_changes_
= false;
654 if (initialized_
&& directory()) {
655 directory()->SaveChanges();
658 share_
.directory
.reset();
660 change_delegate_
= NULL
;
662 initialized_
= false;
664 // We reset these here, since only now we know they will not be
665 // accessed from other threads (since we shut down everything).
666 change_observer_
.Reset();
667 weak_handle_this_
.Reset();
670 void SyncManagerImpl::OnIPAddressChanged() {
671 if (!observing_network_connectivity_changes_
) {
672 DVLOG(1) << "IP address change dropped.";
675 DVLOG(1) << "IP address change detected.";
676 OnNetworkConnectivityChangedImpl();
679 void SyncManagerImpl::OnConnectionTypeChanged(
680 net::NetworkChangeNotifier::ConnectionType
) {
681 if (!observing_network_connectivity_changes_
) {
682 DVLOG(1) << "Connection type change dropped.";
685 DVLOG(1) << "Connection type change detected.";
686 OnNetworkConnectivityChangedImpl();
689 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
690 DCHECK(thread_checker_
.CalledOnValidThread());
691 scheduler_
->OnConnectionStatusChange();
694 void SyncManagerImpl::OnServerConnectionEvent(
695 const ServerConnectionEvent
& event
) {
696 DCHECK(thread_checker_
.CalledOnValidThread());
697 if (event
.connection_code
==
698 HttpResponse::SERVER_CONNECTION_OK
) {
699 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
700 OnConnectionStatusChange(CONNECTION_OK
));
703 if (event
.connection_code
== HttpResponse::SYNC_AUTH_ERROR
) {
704 observing_network_connectivity_changes_
= false;
705 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
706 OnConnectionStatusChange(CONNECTION_AUTH_ERROR
));
709 if (event
.connection_code
== HttpResponse::SYNC_SERVER_ERROR
) {
710 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
711 OnConnectionStatusChange(CONNECTION_SERVER_ERROR
));
715 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
716 ModelTypeSet models_with_changes
) {
717 // This notification happens immediately after the transaction mutex is
718 // released. This allows work to be performed without blocking other threads
719 // from acquiring a transaction.
720 if (!change_delegate_
)
724 for (ModelTypeSet::Iterator it
= models_with_changes
.First();
725 it
.Good(); it
.Inc()) {
726 change_delegate_
->OnChangesComplete(it
.Get());
727 change_observer_
.Call(
729 &SyncManager::ChangeObserver::OnChangesComplete
,
735 SyncManagerImpl::HandleTransactionEndingChangeEvent(
736 const ImmutableWriteTransactionInfo
& write_transaction_info
,
737 syncable::BaseTransaction
* trans
) {
738 // This notification happens immediately before a syncable WriteTransaction
739 // falls out of scope. It happens while the channel mutex is still held,
740 // and while the transaction mutex is held, so it cannot be re-entrant.
741 if (!change_delegate_
|| change_records_
.empty())
742 return ModelTypeSet();
744 // This will continue the WriteTransaction using a read only wrapper.
745 // This is the last chance for read to occur in the WriteTransaction
746 // that's closing. This special ReadTransaction will not close the
747 // underlying transaction.
748 ReadTransaction
read_trans(GetUserShare(), trans
);
750 ModelTypeSet models_with_changes
;
751 for (ChangeRecordMap::const_iterator it
= change_records_
.begin();
752 it
!= change_records_
.end(); ++it
) {
753 DCHECK(!it
->second
.Get().empty());
754 ModelType type
= ModelTypeFromInt(it
->first
);
756 OnChangesApplied(type
, trans
->directory()->GetTransactionVersion(type
),
757 &read_trans
, it
->second
);
758 change_observer_
.Call(FROM_HERE
,
759 &SyncManager::ChangeObserver::OnChangesApplied
,
760 type
, write_transaction_info
.Get().id
, it
->second
);
761 models_with_changes
.Put(type
);
763 change_records_
.clear();
764 return models_with_changes
;
767 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
768 const ImmutableWriteTransactionInfo
& write_transaction_info
,
769 syncable::BaseTransaction
* trans
,
770 std::vector
<int64
>* entries_changed
) {
771 // We have been notified about a user action changing a sync model.
772 LOG_IF(WARNING
, !change_records_
.empty()) <<
773 "CALCULATE_CHANGES called with unapplied old changes.";
775 // The mutated model type, or UNSPECIFIED if nothing was mutated.
776 ModelTypeSet mutated_model_types
;
778 const syncable::ImmutableEntryKernelMutationMap
& mutations
=
779 write_transaction_info
.Get().mutations
;
780 for (syncable::EntryKernelMutationMap::const_iterator it
=
781 mutations
.Get().begin(); it
!= mutations
.Get().end(); ++it
) {
782 if (!it
->second
.mutated
.ref(syncable::IS_UNSYNCED
)) {
786 ModelType model_type
=
787 GetModelTypeFromSpecifics(it
->second
.mutated
.ref(SPECIFICS
));
788 if (model_type
< FIRST_REAL_MODEL_TYPE
) {
789 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
793 // Found real mutation.
794 if (model_type
!= UNSPECIFIED
) {
795 mutated_model_types
.Put(model_type
);
796 entries_changed
->push_back(it
->second
.mutated
.ref(syncable::META_HANDLE
));
800 // Nudge if necessary.
801 if (!mutated_model_types
.Empty()) {
802 if (weak_handle_this_
.IsInitialized()) {
803 weak_handle_this_
.Call(FROM_HERE
,
804 &SyncManagerImpl::RequestNudgeForDataTypes
,
806 mutated_model_types
);
813 void SyncManagerImpl::SetExtraChangeRecordData(int64 id
,
814 ModelType type
, ChangeReorderBuffer
* buffer
,
815 Cryptographer
* cryptographer
, const syncable::EntryKernel
& original
,
816 bool existed_before
, bool exists_now
) {
817 // If this is a deletion and the datatype was encrypted, we need to decrypt it
818 // and attach it to the buffer.
819 if (!exists_now
&& existed_before
) {
820 sync_pb::EntitySpecifics
original_specifics(original
.ref(SPECIFICS
));
821 if (type
== PASSWORDS
) {
822 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
823 scoped_ptr
<sync_pb::PasswordSpecificsData
> data(
824 DecryptPasswordSpecifics(original_specifics
, cryptographer
));
829 buffer
->SetExtraDataForId(id
, new ExtraPasswordChangeRecordData(*data
));
830 } else if (original_specifics
.has_encrypted()) {
831 // All other datatypes can just create a new unencrypted specifics and
833 const sync_pb::EncryptedData
& encrypted
= original_specifics
.encrypted();
834 if (!cryptographer
->Decrypt(encrypted
, &original_specifics
)) {
839 buffer
->SetSpecificsForId(id
, original_specifics
);
843 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
844 const ImmutableWriteTransactionInfo
& write_transaction_info
,
845 syncable::BaseTransaction
* trans
,
846 std::vector
<int64
>* entries_changed
) {
847 // We only expect one notification per sync step, so change_buffers_ should
848 // contain no pending entries.
849 LOG_IF(WARNING
, !change_records_
.empty()) <<
850 "CALCULATE_CHANGES called with unapplied old changes.";
852 ChangeReorderBuffer change_buffers
[MODEL_TYPE_COUNT
];
854 Cryptographer
* crypto
= directory()->GetCryptographer(trans
);
855 const syncable::ImmutableEntryKernelMutationMap
& mutations
=
856 write_transaction_info
.Get().mutations
;
857 for (syncable::EntryKernelMutationMap::const_iterator it
=
858 mutations
.Get().begin(); it
!= mutations
.Get().end(); ++it
) {
859 bool existed_before
= !it
->second
.original
.ref(syncable::IS_DEL
);
860 bool exists_now
= !it
->second
.mutated
.ref(syncable::IS_DEL
);
862 // Omit items that aren't associated with a model.
864 GetModelTypeFromSpecifics(it
->second
.mutated
.ref(SPECIFICS
));
865 if (type
< FIRST_REAL_MODEL_TYPE
)
868 int64 handle
= it
->first
;
869 if (exists_now
&& !existed_before
)
870 change_buffers
[type
].PushAddedItem(handle
);
871 else if (!exists_now
&& existed_before
)
872 change_buffers
[type
].PushDeletedItem(handle
);
873 else if (exists_now
&& existed_before
&&
874 VisiblePropertiesDiffer(it
->second
, crypto
)) {
875 change_buffers
[type
].PushUpdatedItem(handle
);
878 SetExtraChangeRecordData(handle
, type
, &change_buffers
[type
], crypto
,
879 it
->second
.original
, existed_before
, exists_now
);
882 ReadTransaction
read_trans(GetUserShare(), trans
);
883 for (int i
= FIRST_REAL_MODEL_TYPE
; i
< MODEL_TYPE_COUNT
; ++i
) {
884 if (!change_buffers
[i
].IsEmpty()) {
885 if (change_buffers
[i
].GetAllChangesInTreeOrder(&read_trans
,
886 &(change_records_
[i
]))) {
887 for (size_t j
= 0; j
< change_records_
[i
].Get().size(); ++j
)
888 entries_changed
->push_back((change_records_
[i
].Get())[j
].id
);
890 if (change_records_
[i
].Get().empty())
891 change_records_
.erase(i
);
896 TimeDelta
SyncManagerImpl::GetNudgeDelayTimeDelta(
897 const ModelType
& model_type
) {
898 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type
, this);
901 void SyncManagerImpl::RequestNudgeForDataTypes(
902 const tracked_objects::Location
& nudge_location
,
903 ModelTypeSet types
) {
904 debug_info_event_listener_
.OnNudgeFromDatatype(types
.First().Get());
906 // TODO(lipalani) : Calculate the nudge delay based on all types.
907 base::TimeDelta nudge_delay
= NudgeStrategy::GetNudgeDelayTimeDelta(
910 allstatus_
.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL
);
911 scheduler_
->ScheduleLocalNudge(nudge_delay
,
916 void SyncManagerImpl::OnSyncEngineEvent(const SyncEngineEvent
& event
) {
917 DCHECK(thread_checker_
.CalledOnValidThread());
918 // Only send an event if this is due to a cycle ending and this cycle
919 // concludes a canonical "sync" process; that is, based on what is known
920 // locally we are "all happy" and up-to-date. There may be new changes on
921 // the server, but we'll get them on a subsequent sync.
923 // Notifications are sent at the end of every sync cycle, regardless of
924 // whether we should sync again.
925 if (event
.what_happened
== SyncEngineEvent::SYNC_CYCLE_ENDED
) {
927 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
932 DVLOG(1) << "Sending OnSyncCycleCompleted";
933 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
934 OnSyncCycleCompleted(event
.snapshot
));
937 if (event
.what_happened
== SyncEngineEvent::STOP_SYNCING_PERMANENTLY
) {
938 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
939 OnStopSyncingPermanently());
943 if (event
.what_happened
== SyncEngineEvent::ACTIONABLE_ERROR
) {
945 SyncManager::Observer
, observers_
,
947 event
.snapshot
.model_neutral_state().sync_protocol_error
));
952 void SyncManagerImpl::SetJsEventHandler(
953 const WeakHandle
<JsEventHandler
>& event_handler
) {
954 js_event_handler_
= event_handler
;
955 js_sync_manager_observer_
.SetJsEventHandler(js_event_handler_
);
956 js_mutation_event_observer_
.SetJsEventHandler(js_event_handler_
);
957 js_sync_encryption_handler_observer_
.SetJsEventHandler(js_event_handler_
);
960 void SyncManagerImpl::ProcessJsMessage(
961 const std::string
& name
, const JsArgList
& args
,
962 const WeakHandle
<JsReplyHandler
>& reply_handler
) {
968 if (!reply_handler
.IsInitialized()) {
969 DVLOG(1) << "Uninitialized reply handler; dropping unknown message "
970 << name
<< " with args " << args
.ToString();
974 JsMessageHandler js_message_handler
= js_message_handlers_
[name
];
975 if (js_message_handler
.is_null()) {
976 DVLOG(1) << "Dropping unknown message " << name
977 << " with args " << args
.ToString();
981 reply_handler
.Call(FROM_HERE
,
982 &JsReplyHandler::HandleJsReply
,
983 name
, js_message_handler
.Run(args
));
986 void SyncManagerImpl::BindJsMessageHandler(
987 const std::string
& name
,
988 UnboundJsMessageHandler unbound_message_handler
) {
989 js_message_handlers_
[name
] =
990 base::Bind(unbound_message_handler
, base::Unretained(this));
993 base::DictionaryValue
* SyncManagerImpl::NotificationInfoToValue(
994 const NotificationInfoMap
& notification_info
) {
995 base::DictionaryValue
* value
= new base::DictionaryValue();
997 for (NotificationInfoMap::const_iterator it
= notification_info
.begin();
998 it
!= notification_info
.end(); ++it
) {
999 const std::string model_type_str
= ModelTypeToString(it
->first
);
1000 value
->Set(model_type_str
, it
->second
.ToValue());
1006 std::string
SyncManagerImpl::NotificationInfoToString(
1007 const NotificationInfoMap
& notification_info
) {
1008 scoped_ptr
<base::DictionaryValue
> value(
1009 NotificationInfoToValue(notification_info
));
1011 base::JSONWriter::Write(value
.get(), &str
);
1015 JsArgList
SyncManagerImpl::GetNotificationState(
1016 const JsArgList
& args
) {
1017 const std::string
& notification_state
=
1018 InvalidatorStateToString(invalidator_state_
);
1019 DVLOG(1) << "GetNotificationState: " << notification_state
;
1020 base::ListValue return_args
;
1021 return_args
.Append(new base::StringValue(notification_state
));
1022 return JsArgList(&return_args
);
1025 JsArgList
SyncManagerImpl::GetNotificationInfo(
1026 const JsArgList
& args
) {
1027 DVLOG(1) << "GetNotificationInfo: "
1028 << NotificationInfoToString(notification_info_map_
);
1029 base::ListValue return_args
;
1030 return_args
.Append(NotificationInfoToValue(notification_info_map_
));
1031 return JsArgList(&return_args
);
1034 JsArgList
SyncManagerImpl::GetClientServerTraffic(
1035 const JsArgList
& args
) {
1036 base::ListValue return_args
;
1037 base::ListValue
* value
= traffic_recorder_
.ToValue();
1039 return_args
.Append(value
);
1040 return JsArgList(&return_args
);
1043 JsArgList
SyncManagerImpl::GetAllNodes(const JsArgList
& args
) {
1044 ReadTransaction
trans(FROM_HERE
, GetUserShare());
1045 base::ListValue return_args
;
1046 scoped_ptr
<base::ListValue
> nodes(
1047 trans
.GetDirectory()->GetAllNodeDetails(trans
.GetWrappedTrans()));
1048 return_args
.Append(nodes
.release());
1049 return JsArgList(&return_args
);
1052 void SyncManagerImpl::UpdateNotificationInfo(
1053 const ObjectIdInvalidationMap
& invalidation_map
) {
1054 ObjectIdSet ids
= invalidation_map
.GetObjectIds();
1055 for (ObjectIdSet::const_iterator it
= ids
.begin(); it
!= ids
.end(); ++it
) {
1056 ModelType type
= UNSPECIFIED
;
1057 if (!ObjectIdToRealModelType(*it
, &type
)) {
1060 const SingleObjectInvalidationSet
& type_invalidations
=
1061 invalidation_map
.ForObject(*it
);
1062 for (SingleObjectInvalidationSet::const_iterator inv_it
=
1063 type_invalidations
.begin(); inv_it
!= type_invalidations
.end();
1065 NotificationInfo
* info
= ¬ification_info_map_
[type
];
1066 info
->total_count
++;
1067 std::string payload
=
1068 inv_it
->is_unknown_version() ? "UNKNOWN" : inv_it
->payload();
1069 info
->payload
= payload
;
1074 void SyncManagerImpl::OnInvalidatorStateChange(InvalidatorState state
) {
1075 DCHECK(thread_checker_
.CalledOnValidThread());
1077 const std::string
& state_str
= InvalidatorStateToString(state
);
1078 invalidator_state_
= state
;
1079 DVLOG(1) << "Invalidator state changed to: " << state_str
;
1080 const bool notifications_enabled
=
1081 (invalidator_state_
== INVALIDATIONS_ENABLED
);
1082 allstatus_
.SetNotificationsEnabled(notifications_enabled
);
1083 scheduler_
->SetNotificationsEnabled(notifications_enabled
);
1085 if (js_event_handler_
.IsInitialized()) {
1086 base::DictionaryValue details
;
1087 details
.SetString("state", state_str
);
1088 js_event_handler_
.Call(FROM_HERE
,
1089 &JsEventHandler::HandleJsEvent
,
1090 "onNotificationStateChange",
1091 JsEventDetails(&details
));
1095 void SyncManagerImpl::OnIncomingInvalidation(
1096 const ObjectIdInvalidationMap
& invalidation_map
) {
1097 DCHECK(thread_checker_
.CalledOnValidThread());
1099 // We should never receive IDs from non-sync objects.
1100 ObjectIdSet ids
= invalidation_map
.GetObjectIds();
1101 for (ObjectIdSet::const_iterator it
= ids
.begin(); it
!= ids
.end(); ++it
) {
1103 if (!ObjectIdToRealModelType(*it
, &type
)) {
1104 DLOG(WARNING
) << "Notification has invalid id: " << ObjectIdToString(*it
);
1108 if (invalidation_map
.Empty()) {
1109 LOG(WARNING
) << "Sync received invalidation without any type information.";
1111 allstatus_
.IncrementNudgeCounter(NUDGE_SOURCE_NOTIFICATION
);
1112 scheduler_
->ScheduleInvalidationNudge(
1113 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec
),
1114 invalidation_map
, FROM_HERE
);
1115 allstatus_
.IncrementNotificationsReceived();
1116 UpdateNotificationInfo(invalidation_map
);
1117 debug_info_event_listener_
.OnIncomingNotification(invalidation_map
);
1120 if (js_event_handler_
.IsInitialized()) {
1121 base::DictionaryValue details
;
1122 base::ListValue
* changed_types
= new base::ListValue();
1123 details
.Set("changedTypes", changed_types
);
1125 ObjectIdSet id_set
= invalidation_map
.GetObjectIds();
1126 ModelTypeSet nudged_types
= ObjectIdSetToModelTypeSet(id_set
);
1127 DCHECK(!nudged_types
.Empty());
1128 for (ModelTypeSet::Iterator it
= nudged_types
.First();
1129 it
.Good(); it
.Inc()) {
1130 const std::string model_type_str
= ModelTypeToString(it
.Get());
1131 changed_types
->Append(new base::StringValue(model_type_str
));
1133 details
.SetString("source", "REMOTE_INVALIDATION");
1134 js_event_handler_
.Call(FROM_HERE
,
1135 &JsEventHandler::HandleJsEvent
,
1136 "onIncomingNotification",
1137 JsEventDetails(&details
));
1141 void SyncManagerImpl::RefreshTypes(ModelTypeSet types
) {
1142 DCHECK(thread_checker_
.CalledOnValidThread());
1143 if (types
.Empty()) {
1144 LOG(WARNING
) << "Sync received refresh request with no types specified.";
1146 allstatus_
.IncrementNudgeCounter(NUDGE_SOURCE_LOCAL_REFRESH
);
1147 scheduler_
->ScheduleLocalRefreshRequest(
1148 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec
),
1152 if (js_event_handler_
.IsInitialized()) {
1153 base::DictionaryValue details
;
1154 base::ListValue
* changed_types
= new base::ListValue();
1155 details
.Set("changedTypes", changed_types
);
1156 for (ModelTypeSet::Iterator it
= types
.First(); it
.Good(); it
.Inc()) {
1157 const std::string
& model_type_str
=
1158 ModelTypeToString(it
.Get());
1159 changed_types
->Append(new base::StringValue(model_type_str
));
1161 details
.SetString("source", "LOCAL_INVALIDATION");
1162 js_event_handler_
.Call(FROM_HERE
,
1163 &JsEventHandler::HandleJsEvent
,
1164 "onIncomingNotification",
1165 JsEventDetails(&details
));
1169 SyncStatus
SyncManagerImpl::GetDetailedStatus() const {
1170 return allstatus_
.status();
1173 void SyncManagerImpl::SaveChanges() {
1174 directory()->SaveChanges();
1177 UserShare
* SyncManagerImpl::GetUserShare() {
1178 DCHECK(initialized_
);
1182 const std::string
SyncManagerImpl::cache_guid() {
1183 DCHECK(initialized_
);
1184 return directory()->cache_guid();
1187 bool SyncManagerImpl::ReceivedExperiment(Experiments
* experiments
) {
1188 ReadTransaction
trans(FROM_HERE
, GetUserShare());
1189 ReadNode
nigori_node(&trans
);
1190 if (nigori_node
.InitByTagLookup(kNigoriTag
) != BaseNode::INIT_OK
) {
1191 DVLOG(1) << "Couldn't find Nigori node.";
1194 bool found_experiment
= false;
1196 ReadNode
autofill_culling_node(&trans
);
1197 if (autofill_culling_node
.InitByClientTagLookup(
1198 syncer::EXPERIMENTS
,
1199 syncer::kAutofillCullingTag
) == BaseNode::INIT_OK
&&
1200 autofill_culling_node
.GetExperimentsSpecifics().
1201 autofill_culling().enabled()) {
1202 experiments
->autofill_culling
= true;
1203 found_experiment
= true;
1206 ReadNode
favicon_sync_node(&trans
);
1207 if (favicon_sync_node
.InitByClientTagLookup(
1208 syncer::EXPERIMENTS
,
1209 syncer::kFaviconSyncTag
) == BaseNode::INIT_OK
) {
1210 experiments
->favicon_sync_limit
=
1211 favicon_sync_node
.GetExperimentsSpecifics().favicon_sync().
1212 favicon_sync_limit();
1213 found_experiment
= true;
1216 ReadNode
pre_commit_update_avoidance_node(&trans
);
1217 if (pre_commit_update_avoidance_node
.InitByClientTagLookup(
1218 syncer::EXPERIMENTS
,
1219 syncer::kPreCommitUpdateAvoidanceTag
) == BaseNode::INIT_OK
) {
1220 session_context_
->set_server_enabled_pre_commit_update_avoidance(
1221 pre_commit_update_avoidance_node
.GetExperimentsSpecifics().
1222 pre_commit_update_avoidance().enabled());
1223 // We don't bother setting found_experiment. The frontend doesn't need to
1227 return found_experiment
;
1230 bool SyncManagerImpl::HasUnsyncedItems() {
1231 ReadTransaction
trans(FROM_HERE
, GetUserShare());
1232 return (trans
.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1235 SyncEncryptionHandler
* SyncManagerImpl::GetEncryptionHandler() {
1236 return sync_encryption_handler_
.get();
1240 int SyncManagerImpl::GetDefaultNudgeDelay() {
1241 return kDefaultNudgeDelayMilliseconds
;
1245 int SyncManagerImpl::GetPreferencesNudgeDelay() {
1246 return kPreferencesNudgeDelayMilliseconds
;
1249 } // namespace syncer