// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
5 #include "sync/internal_api/sync_manager_impl.h"
9 #include "base/base64.h"
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/compiler_specific.h"
13 #include "base/json/json_writer.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/metrics/histogram.h"
16 #include "base/observer_list.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/values.h"
20 #include "sync/engine/sync_scheduler.h"
21 #include "sync/engine/syncer_types.h"
22 #include "sync/internal_api/change_reorder_buffer.h"
23 #include "sync/internal_api/public/base/cancelation_signal.h"
24 #include "sync/internal_api/public/base/invalidation_interface.h"
25 #include "sync/internal_api/public/base/model_type.h"
26 #include "sync/internal_api/public/base_node.h"
27 #include "sync/internal_api/public/configure_reason.h"
28 #include "sync/internal_api/public/engine/polling_constants.h"
29 #include "sync/internal_api/public/http_post_provider_factory.h"
30 #include "sync/internal_api/public/internal_components_factory.h"
31 #include "sync/internal_api/public/read_node.h"
32 #include "sync/internal_api/public/read_transaction.h"
33 #include "sync/internal_api/public/sync_context.h"
34 #include "sync/internal_api/public/sync_context_proxy.h"
35 #include "sync/internal_api/public/user_share.h"
36 #include "sync/internal_api/public/util/experiments.h"
37 #include "sync/internal_api/public/write_node.h"
38 #include "sync/internal_api/public/write_transaction.h"
39 #include "sync/internal_api/sync_context_proxy_impl.h"
40 #include "sync/internal_api/syncapi_internal.h"
41 #include "sync/internal_api/syncapi_server_connection_manager.h"
42 #include "sync/protocol/proto_value_conversions.h"
43 #include "sync/protocol/sync.pb.h"
44 #include "sync/sessions/directory_type_debug_info_emitter.h"
45 #include "sync/syncable/directory.h"
46 #include "sync/syncable/entry.h"
47 #include "sync/syncable/in_memory_directory_backing_store.h"
48 #include "sync/syncable/on_disk_directory_backing_store.h"
50 using base::TimeDelta
;
51 using sync_pb::GetUpdatesCallerInfo
;
57 using sessions::SyncSessionContext
;
58 using syncable::ImmutableWriteTransactionInfo
;
59 using syncable::SPECIFICS
;
60 using syncable::UNIQUE_POSITION
;
64 GetUpdatesCallerInfo::GetUpdatesSource
GetSourceFromReason(
65 ConfigureReason reason
) {
67 case CONFIGURE_REASON_RECONFIGURATION
:
68 return GetUpdatesCallerInfo::RECONFIGURATION
;
69 case CONFIGURE_REASON_MIGRATION
:
70 return GetUpdatesCallerInfo::MIGRATION
;
71 case CONFIGURE_REASON_NEW_CLIENT
:
72 return GetUpdatesCallerInfo::NEW_CLIENT
;
73 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE
:
74 case CONFIGURE_REASON_CRYPTO
:
75 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE
;
76 case CONFIGURE_REASON_PROGRAMMATIC
:
77 return GetUpdatesCallerInfo::PROGRAMMATIC
;
81 return GetUpdatesCallerInfo::UNKNOWN
;
86 SyncManagerImpl::SyncManagerImpl(const std::string
& name
)
88 change_delegate_(NULL
),
90 observing_network_connectivity_changes_(false),
91 weak_ptr_factory_(this) {
92 // Pre-fill |notification_info_map_|.
93 for (int i
= FIRST_REAL_MODEL_TYPE
; i
< MODEL_TYPE_COUNT
; ++i
) {
94 notification_info_map_
.insert(
95 std::make_pair(ModelTypeFromInt(i
), NotificationInfo()));
99 SyncManagerImpl::~SyncManagerImpl() {
100 DCHECK(thread_checker_
.CalledOnValidThread());
101 CHECK(!initialized_
);
104 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
105 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107 base::DictionaryValue
* SyncManagerImpl::NotificationInfo::ToValue() const {
108 base::DictionaryValue
* value
= new base::DictionaryValue();
109 value
->SetInteger("totalCount", total_count
);
110 value
->SetString("payload", payload
);
114 bool SyncManagerImpl::VisiblePositionsDiffer(
115 const syncable::EntryKernelMutation
& mutation
) const {
116 const syncable::EntryKernel
& a
= mutation
.original
;
117 const syncable::EntryKernel
& b
= mutation
.mutated
;
118 if (!b
.ShouldMaintainPosition())
120 if (!a
.ref(UNIQUE_POSITION
).Equals(b
.ref(UNIQUE_POSITION
)))
122 if (a
.ref(syncable::PARENT_ID
) != b
.ref(syncable::PARENT_ID
))
127 bool SyncManagerImpl::VisiblePropertiesDiffer(
128 const syncable::EntryKernelMutation
& mutation
,
129 Cryptographer
* cryptographer
) const {
130 const syncable::EntryKernel
& a
= mutation
.original
;
131 const syncable::EntryKernel
& b
= mutation
.mutated
;
132 const sync_pb::EntitySpecifics
& a_specifics
= a
.ref(SPECIFICS
);
133 const sync_pb::EntitySpecifics
& b_specifics
= b
.ref(SPECIFICS
);
134 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics
),
135 GetModelTypeFromSpecifics(b_specifics
));
136 ModelType model_type
= GetModelTypeFromSpecifics(b_specifics
);
137 // Suppress updates to items that aren't tracked by any browser model.
138 if (model_type
< FIRST_REAL_MODEL_TYPE
||
139 !a
.ref(syncable::UNIQUE_SERVER_TAG
).empty()) {
142 if (a
.ref(syncable::IS_DIR
) != b
.ref(syncable::IS_DIR
))
144 if (!AreSpecificsEqual(cryptographer
,
145 a
.ref(syncable::SPECIFICS
),
146 b
.ref(syncable::SPECIFICS
))) {
149 if (!AreAttachmentMetadataEqual(a
.ref(syncable::ATTACHMENT_METADATA
),
150 b
.ref(syncable::ATTACHMENT_METADATA
))) {
153 // We only care if the name has changed if neither specifics is encrypted
154 // (encrypted nodes blow away the NON_UNIQUE_NAME).
155 if (!a_specifics
.has_encrypted() && !b_specifics
.has_encrypted() &&
156 a
.ref(syncable::NON_UNIQUE_NAME
) != b
.ref(syncable::NON_UNIQUE_NAME
))
158 if (VisiblePositionsDiffer(mutation
))
163 ModelTypeSet
SyncManagerImpl::InitialSyncEndedTypes() {
164 return directory()->InitialSyncEndedTypes();
167 ModelTypeSet
SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
168 ModelTypeSet types
) {
170 for (ModelTypeSet::Iterator i
= types
.First(); i
.Good(); i
.Inc()) {
171 sync_pb::DataTypeProgressMarker marker
;
172 directory()->GetDownloadProgress(i
.Get(), &marker
);
174 if (marker
.token().empty())
180 void SyncManagerImpl::ConfigureSyncer(
181 ConfigureReason reason
,
182 ModelTypeSet to_download
,
183 ModelTypeSet to_purge
,
184 ModelTypeSet to_journal
,
185 ModelTypeSet to_unapply
,
186 const ModelSafeRoutingInfo
& new_routing_info
,
187 const base::Closure
& ready_task
,
188 const base::Closure
& retry_task
) {
189 DCHECK(thread_checker_
.CalledOnValidThread());
190 DCHECK(!ready_task
.is_null());
191 DCHECK(initialized_
);
193 DVLOG(1) << "Configuring -"
194 << "\n\t" << "current types: "
195 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info
))
196 << "\n\t" << "types to download: "
197 << ModelTypeSetToString(to_download
)
198 << "\n\t" << "types to purge: "
199 << ModelTypeSetToString(to_purge
)
200 << "\n\t" << "types to journal: "
201 << ModelTypeSetToString(to_journal
)
202 << "\n\t" << "types to unapply: "
203 << ModelTypeSetToString(to_unapply
);
204 if (!PurgeDisabledTypes(to_purge
,
207 // We failed to cleanup the types. Invoke the ready task without actually
208 // configuring any types. The caller should detect this as a configuration
209 // failure and act appropriately.
214 ConfigurationParams
params(GetSourceFromReason(reason
),
220 scheduler_
->Start(SyncScheduler::CONFIGURATION_MODE
, base::Time());
221 scheduler_
->ScheduleConfiguration(params
);
224 void SyncManagerImpl::Init(InitArgs
* args
) {
225 CHECK(!initialized_
);
226 DCHECK(thread_checker_
.CalledOnValidThread());
227 DCHECK(args
->post_factory
.get());
228 DCHECK(!args
->credentials
.email
.empty());
229 DCHECK(!args
->credentials
.sync_token
.empty());
230 DCHECK(!args
->credentials
.scope_set
.empty());
231 DCHECK(args
->cancelation_signal
);
232 DVLOG(1) << "SyncManager starting Init...";
234 weak_handle_this_
= MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr());
236 change_delegate_
= args
->change_delegate
;
238 AddObserver(&js_sync_manager_observer_
);
239 SetJsEventHandler(args
->event_handler
);
241 AddObserver(&debug_info_event_listener_
);
243 database_path_
= args
->database_location
.Append(
244 syncable::Directory::kSyncDatabaseFilename
);
245 report_unrecoverable_error_function_
=
246 args
->report_unrecoverable_error_function
;
248 allstatus_
.SetHasKeystoreKey(
249 !args
->restored_keystore_key_for_bootstrapping
.empty());
250 sync_encryption_handler_
.reset(new SyncEncryptionHandlerImpl(
251 &share_
, args
->encryptor
, args
->restored_key_for_bootstrapping
,
252 args
->restored_keystore_key_for_bootstrapping
, args
->clear_data_option
));
253 sync_encryption_handler_
->AddObserver(this);
254 sync_encryption_handler_
->AddObserver(&debug_info_event_listener_
);
255 sync_encryption_handler_
->AddObserver(&js_sync_encryption_handler_observer_
);
257 base::FilePath absolute_db_path
= database_path_
;
258 DCHECK(absolute_db_path
.IsAbsolute());
260 scoped_ptr
<syncable::DirectoryBackingStore
> backing_store
=
261 args
->internal_components_factory
->BuildDirectoryBackingStore(
262 InternalComponentsFactory::STORAGE_ON_DISK
,
263 args
->credentials
.email
, absolute_db_path
).Pass();
265 DCHECK(backing_store
.get());
266 share_
.directory
.reset(
267 new syncable::Directory(
268 backing_store
.release(),
269 args
->unrecoverable_error_handler
,
270 report_unrecoverable_error_function_
,
271 sync_encryption_handler_
.get(),
272 sync_encryption_handler_
->GetCryptographerUnsafe()));
273 share_
.sync_credentials
= args
->credentials
;
275 // UserShare is accessible to a lot of code that doesn't need access to the
276 // sync token so clear sync_token from the UserShare.
277 share_
.sync_credentials
.sync_token
= "";
279 const std::string
& username
= args
->credentials
.email
;
280 DVLOG(1) << "Username: " << username
;
281 if (!OpenDirectory(username
)) {
282 NotifyInitializationFailure();
283 LOG(ERROR
) << "Sync manager initialization failed!";
287 // Now that we have opened the Directory we can restore any previously saved
289 if (args
->saved_nigori_state
) {
290 sync_encryption_handler_
->RestoreNigori(*args
->saved_nigori_state
);
291 args
->saved_nigori_state
.reset();
294 connection_manager_
.reset(new SyncAPIServerConnectionManager(
295 args
->service_url
.host() + args
->service_url
.path(),
296 args
->service_url
.EffectiveIntPort(),
297 args
->service_url
.SchemeIsCryptographic(), args
->post_factory
.release(),
298 args
->cancelation_signal
));
299 connection_manager_
->set_client_id(directory()->cache_guid());
300 connection_manager_
->AddListener(this);
302 std::string sync_id
= directory()->cache_guid();
304 DVLOG(1) << "Setting sync client ID: " << sync_id
;
305 allstatus_
.SetSyncId(sync_id
);
306 DVLOG(1) << "Setting invalidator client ID: " << args
->invalidator_client_id
;
307 allstatus_
.SetInvalidatorClientId(args
->invalidator_client_id
);
309 model_type_registry_
.reset(
310 new ModelTypeRegistry(args
->workers
, directory(), this));
311 sync_encryption_handler_
->AddObserver(model_type_registry_
.get());
313 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier
314 // if the pointer is misused in debug mode.
315 base::WeakPtr
<syncer_v2::SyncContext
> weak_core
=
316 model_type_registry_
->AsWeakPtr();
319 sync_context_proxy_
.reset(new syncer_v2::SyncContextProxyImpl(
320 base::ThreadTaskRunnerHandle::Get(), weak_core
));
322 // Build a SyncSessionContext and store the worker in it.
323 DVLOG(1) << "Sync is bringing up SyncSessionContext.";
324 std::vector
<SyncEngineEventListener
*> listeners
;
325 listeners
.push_back(&allstatus_
);
326 listeners
.push_back(this);
328 args
->internal_components_factory
->BuildContext(
329 connection_manager_
.get(),
331 args
->extensions_activity
,
333 &debug_info_event_listener_
,
334 model_type_registry_
.get(),
335 args
->invalidator_client_id
)
337 session_context_
->set_account_name(args
->credentials
.email
);
338 scheduler_
= args
->internal_components_factory
->BuildScheduler(
339 name_
, session_context_
.get(), args
->cancelation_signal
).Pass();
341 scheduler_
->Start(SyncScheduler::CONFIGURATION_MODE
, base::Time());
345 net::NetworkChangeNotifier::AddIPAddressObserver(this);
346 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
347 observing_network_connectivity_changes_
= true;
349 UpdateCredentials(args
->credentials
);
351 NotifyInitializationSuccess();
354 void SyncManagerImpl::NotifyInitializationSuccess() {
355 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
356 OnInitializationComplete(
357 MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr()),
358 MakeWeakHandle(debug_info_event_listener_
.GetWeakPtr()),
359 true, InitialSyncEndedTypes()));
362 void SyncManagerImpl::NotifyInitializationFailure() {
363 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
364 OnInitializationComplete(
365 MakeWeakHandle(weak_ptr_factory_
.GetWeakPtr()),
366 MakeWeakHandle(debug_info_event_listener_
.GetWeakPtr()),
367 false, ModelTypeSet()));
370 void SyncManagerImpl::OnPassphraseRequired(
371 PassphraseRequiredReason reason
,
372 const sync_pb::EncryptedData
& pending_keys
) {
376 void SyncManagerImpl::OnPassphraseAccepted() {
380 void SyncManagerImpl::OnBootstrapTokenUpdated(
381 const std::string
& bootstrap_token
,
382 BootstrapTokenType type
) {
383 if (type
== KEYSTORE_BOOTSTRAP_TOKEN
)
384 allstatus_
.SetHasKeystoreKey(true);
387 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types
,
388 bool encrypt_everything
) {
389 allstatus_
.SetEncryptedTypes(encrypted_types
);
392 void SyncManagerImpl::OnEncryptionComplete() {
396 void SyncManagerImpl::OnCryptographerStateChanged(
397 Cryptographer
* cryptographer
) {
398 allstatus_
.SetCryptographerReady(cryptographer
->is_ready());
399 allstatus_
.SetCryptoHasPendingKeys(cryptographer
->has_pending_keys());
400 allstatus_
.SetKeystoreMigrationTime(
401 sync_encryption_handler_
->migration_time());
404 void SyncManagerImpl::OnPassphraseTypeChanged(
406 base::Time explicit_passphrase_time
) {
407 allstatus_
.SetPassphraseType(type
);
408 allstatus_
.SetKeystoreMigrationTime(
409 sync_encryption_handler_
->migration_time());
412 void SyncManagerImpl::OnLocalSetPassphraseEncryption(
413 const SyncEncryptionHandler::NigoriState
& nigori_state
) {
416 void SyncManagerImpl::StartSyncingNormally(
417 const ModelSafeRoutingInfo
& routing_info
,
418 base::Time last_poll_time
) {
419 // Start the sync scheduler.
420 // TODO(sync): We always want the newest set of routes when we switch back
421 // to normal mode. Figure out how to enforce set_routing_info is always
422 // appropriately set and that it's only modified when switching to normal
424 DCHECK(thread_checker_
.CalledOnValidThread());
425 session_context_
->SetRoutingInfo(routing_info
);
426 scheduler_
->Start(SyncScheduler::NORMAL_MODE
,
430 syncable::Directory
* SyncManagerImpl::directory() {
431 return share_
.directory
.get();
434 const SyncScheduler
* SyncManagerImpl::scheduler() const {
435 return scheduler_
.get();
438 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
439 return connection_manager_
->HasInvalidAuthToken();
442 bool SyncManagerImpl::OpenDirectory(const std::string
& username
) {
443 DCHECK(!initialized_
) << "Should only happen once";
445 // Set before Open().
446 change_observer_
= MakeWeakHandle(js_mutation_event_observer_
.AsWeakPtr());
447 WeakHandle
<syncable::TransactionObserver
> transaction_observer(
448 MakeWeakHandle(js_mutation_event_observer_
.AsWeakPtr()));
450 syncable::DirOpenResult open_result
= syncable::NOT_INITIALIZED
;
451 open_result
= directory()->Open(username
, this, transaction_observer
);
452 if (open_result
!= syncable::OPENED
) {
453 LOG(ERROR
) << "Could not open share for:" << username
;
457 // Unapplied datatypes (those that do not have initial sync ended set) get
458 // re-downloaded during any configuration. But, it's possible for a datatype
459 // to have a progress marker but not have initial sync ended yet, making
460 // it a candidate for migration. This is a problem, as the DataTypeManager
461 // does not support a migration while it's already in the middle of a
462 // configuration. As a result, any partially synced datatype can stall the
463 // DTM, waiting for the configuration to complete, which it never will due
464 // to the migration error. In addition, a partially synced nigori will
465 // trigger the migration logic before the backend is initialized, resulting
466 // in crashes. We therefore detect and purge any partially synced types as
467 // part of initialization.
468 if (!PurgePartiallySyncedTypes())
474 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
475 ModelTypeSet partially_synced_types
= ModelTypeSet::All();
476 partially_synced_types
.RemoveAll(InitialSyncEndedTypes());
477 partially_synced_types
.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
478 ModelTypeSet::All()));
480 DVLOG(1) << "Purging partially synced types "
481 << ModelTypeSetToString(partially_synced_types
);
482 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
483 partially_synced_types
.Size());
484 if (partially_synced_types
.Empty())
486 return directory()->PurgeEntriesWithTypeIn(partially_synced_types
,
491 bool SyncManagerImpl::PurgeDisabledTypes(
492 ModelTypeSet to_purge
,
493 ModelTypeSet to_journal
,
494 ModelTypeSet to_unapply
) {
495 if (to_purge
.Empty())
497 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge
);
498 DCHECK(to_purge
.HasAll(to_journal
));
499 DCHECK(to_purge
.HasAll(to_unapply
));
500 return directory()->PurgeEntriesWithTypeIn(to_purge
, to_journal
, to_unapply
);
503 void SyncManagerImpl::UpdateCredentials(const SyncCredentials
& credentials
) {
504 DCHECK(thread_checker_
.CalledOnValidThread());
505 DCHECK(initialized_
);
506 DCHECK(!credentials
.email
.empty());
507 DCHECK(!credentials
.sync_token
.empty());
508 DCHECK(!credentials
.scope_set
.empty());
510 observing_network_connectivity_changes_
= true;
511 if (!connection_manager_
->SetAuthToken(credentials
.sync_token
))
512 return; // Auth token is known to be invalid, so exit early.
514 scheduler_
->OnCredentialsUpdated();
516 // TODO(zea): pass the credential age to the debug info event listener.
519 void SyncManagerImpl::AddObserver(SyncManager::Observer
* observer
) {
520 DCHECK(thread_checker_
.CalledOnValidThread());
521 observers_
.AddObserver(observer
);
524 void SyncManagerImpl::RemoveObserver(SyncManager::Observer
* observer
) {
525 DCHECK(thread_checker_
.CalledOnValidThread());
526 observers_
.RemoveObserver(observer
);
529 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason
) {
530 DCHECK(thread_checker_
.CalledOnValidThread());
532 // Prevent any in-flight method calls from running. Also
533 // invalidates |weak_handle_this_| and |change_observer_|.
534 weak_ptr_factory_
.InvalidateWeakPtrs();
535 js_mutation_event_observer_
.InvalidateWeakPtrs();
538 session_context_
.reset();
540 if (model_type_registry_
)
541 sync_encryption_handler_
->RemoveObserver(model_type_registry_
.get());
543 model_type_registry_
.reset();
545 if (sync_encryption_handler_
) {
546 sync_encryption_handler_
->RemoveObserver(&debug_info_event_listener_
);
547 sync_encryption_handler_
->RemoveObserver(this);
550 SetJsEventHandler(WeakHandle
<JsEventHandler
>());
551 RemoveObserver(&js_sync_manager_observer_
);
553 RemoveObserver(&debug_info_event_listener_
);
555 // |connection_manager_| may end up being NULL here in tests (in synchronous
556 // initialization mode).
558 // TODO(akalin): Fix this behavior.
559 if (connection_manager_
)
560 connection_manager_
->RemoveListener(this);
561 connection_manager_
.reset();
563 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
564 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
565 observing_network_connectivity_changes_
= false;
567 if (initialized_
&& directory()) {
568 directory()->SaveChanges();
571 share_
.directory
.reset();
573 change_delegate_
= NULL
;
575 initialized_
= false;
577 // We reset these here, since only now we know they will not be
578 // accessed from other threads (since we shut down everything).
579 change_observer_
.Reset();
580 weak_handle_this_
.Reset();
583 void SyncManagerImpl::OnIPAddressChanged() {
584 if (!observing_network_connectivity_changes_
) {
585 DVLOG(1) << "IP address change dropped.";
588 DVLOG(1) << "IP address change detected.";
589 OnNetworkConnectivityChangedImpl();
592 void SyncManagerImpl::OnConnectionTypeChanged(
593 net::NetworkChangeNotifier::ConnectionType
) {
594 if (!observing_network_connectivity_changes_
) {
595 DVLOG(1) << "Connection type change dropped.";
598 DVLOG(1) << "Connection type change detected.";
599 OnNetworkConnectivityChangedImpl();
602 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
603 DCHECK(thread_checker_
.CalledOnValidThread());
604 scheduler_
->OnConnectionStatusChange();
607 void SyncManagerImpl::OnServerConnectionEvent(
608 const ServerConnectionEvent
& event
) {
609 DCHECK(thread_checker_
.CalledOnValidThread());
610 if (event
.connection_code
==
611 HttpResponse::SERVER_CONNECTION_OK
) {
612 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
613 OnConnectionStatusChange(CONNECTION_OK
));
616 if (event
.connection_code
== HttpResponse::SYNC_AUTH_ERROR
) {
617 observing_network_connectivity_changes_
= false;
618 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
619 OnConnectionStatusChange(CONNECTION_AUTH_ERROR
));
622 if (event
.connection_code
== HttpResponse::SYNC_SERVER_ERROR
) {
623 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
624 OnConnectionStatusChange(CONNECTION_SERVER_ERROR
));
628 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
629 ModelTypeSet models_with_changes
) {
630 // This notification happens immediately after the transaction mutex is
631 // released. This allows work to be performed without blocking other threads
632 // from acquiring a transaction.
633 if (!change_delegate_
)
637 for (ModelTypeSet::Iterator it
= models_with_changes
.First();
638 it
.Good(); it
.Inc()) {
639 change_delegate_
->OnChangesComplete(it
.Get());
640 change_observer_
.Call(
642 &SyncManager::ChangeObserver::OnChangesComplete
,
648 SyncManagerImpl::HandleTransactionEndingChangeEvent(
649 const ImmutableWriteTransactionInfo
& write_transaction_info
,
650 syncable::BaseTransaction
* trans
) {
651 // This notification happens immediately before a syncable WriteTransaction
652 // falls out of scope. It happens while the channel mutex is still held,
653 // and while the transaction mutex is held, so it cannot be re-entrant.
654 if (!change_delegate_
|| change_records_
.empty())
655 return ModelTypeSet();
657 // This will continue the WriteTransaction using a read only wrapper.
658 // This is the last chance for read to occur in the WriteTransaction
659 // that's closing. This special ReadTransaction will not close the
660 // underlying transaction.
661 ReadTransaction
read_trans(GetUserShare(), trans
);
663 ModelTypeSet models_with_changes
;
664 for (ChangeRecordMap::const_iterator it
= change_records_
.begin();
665 it
!= change_records_
.end(); ++it
) {
666 DCHECK(!it
->second
.Get().empty());
667 ModelType type
= ModelTypeFromInt(it
->first
);
669 OnChangesApplied(type
, trans
->directory()->GetTransactionVersion(type
),
670 &read_trans
, it
->second
);
671 change_observer_
.Call(FROM_HERE
,
672 &SyncManager::ChangeObserver::OnChangesApplied
,
673 type
, write_transaction_info
.Get().id
, it
->second
);
674 models_with_changes
.Put(type
);
676 change_records_
.clear();
677 return models_with_changes
;
680 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
681 const ImmutableWriteTransactionInfo
& write_transaction_info
,
682 syncable::BaseTransaction
* trans
,
683 std::vector
<int64
>* entries_changed
) {
684 // We have been notified about a user action changing a sync model.
685 LOG_IF(WARNING
, !change_records_
.empty()) <<
686 "CALCULATE_CHANGES called with unapplied old changes.";
688 // The mutated model type, or UNSPECIFIED if nothing was mutated.
689 ModelTypeSet mutated_model_types
;
691 const syncable::ImmutableEntryKernelMutationMap
& mutations
=
692 write_transaction_info
.Get().mutations
;
693 for (syncable::EntryKernelMutationMap::const_iterator it
=
694 mutations
.Get().begin(); it
!= mutations
.Get().end(); ++it
) {
695 if (!it
->second
.mutated
.ref(syncable::IS_UNSYNCED
)) {
699 ModelType model_type
=
700 GetModelTypeFromSpecifics(it
->second
.mutated
.ref(SPECIFICS
));
701 if (model_type
< FIRST_REAL_MODEL_TYPE
) {
702 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
706 // Found real mutation.
707 if (model_type
!= UNSPECIFIED
) {
708 mutated_model_types
.Put(model_type
);
709 entries_changed
->push_back(it
->second
.mutated
.ref(syncable::META_HANDLE
));
713 // Nudge if necessary.
714 if (!mutated_model_types
.Empty()) {
715 if (weak_handle_this_
.IsInitialized()) {
716 weak_handle_this_
.Call(FROM_HERE
,
717 &SyncManagerImpl::RequestNudgeForDataTypes
,
719 mutated_model_types
);
726 void SyncManagerImpl::SetExtraChangeRecordData(int64 id
,
727 ModelType type
, ChangeReorderBuffer
* buffer
,
728 Cryptographer
* cryptographer
, const syncable::EntryKernel
& original
,
729 bool existed_before
, bool exists_now
) {
730 // If this is a deletion and the datatype was encrypted, we need to decrypt it
731 // and attach it to the buffer.
732 if (!exists_now
&& existed_before
) {
733 sync_pb::EntitySpecifics
original_specifics(original
.ref(SPECIFICS
));
734 if (type
== PASSWORDS
) {
735 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
736 scoped_ptr
<sync_pb::PasswordSpecificsData
> data(
737 DecryptPasswordSpecifics(original_specifics
, cryptographer
));
742 buffer
->SetExtraDataForId(id
, new ExtraPasswordChangeRecordData(*data
));
743 } else if (original_specifics
.has_encrypted()) {
744 // All other datatypes can just create a new unencrypted specifics and
746 const sync_pb::EncryptedData
& encrypted
= original_specifics
.encrypted();
747 if (!cryptographer
->Decrypt(encrypted
, &original_specifics
)) {
752 buffer
->SetSpecificsForId(id
, original_specifics
);
756 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
757 const ImmutableWriteTransactionInfo
& write_transaction_info
,
758 syncable::BaseTransaction
* trans
,
759 std::vector
<int64
>* entries_changed
) {
760 // We only expect one notification per sync step, so change_buffers_ should
761 // contain no pending entries.
762 LOG_IF(WARNING
, !change_records_
.empty()) <<
763 "CALCULATE_CHANGES called with unapplied old changes.";
765 ChangeReorderBuffer change_buffers
[MODEL_TYPE_COUNT
];
767 Cryptographer
* crypto
= directory()->GetCryptographer(trans
);
768 const syncable::ImmutableEntryKernelMutationMap
& mutations
=
769 write_transaction_info
.Get().mutations
;
770 for (syncable::EntryKernelMutationMap::const_iterator it
=
771 mutations
.Get().begin(); it
!= mutations
.Get().end(); ++it
) {
772 bool existed_before
= !it
->second
.original
.ref(syncable::IS_DEL
);
773 bool exists_now
= !it
->second
.mutated
.ref(syncable::IS_DEL
);
775 // Omit items that aren't associated with a model.
777 GetModelTypeFromSpecifics(it
->second
.mutated
.ref(SPECIFICS
));
778 if (type
< FIRST_REAL_MODEL_TYPE
)
781 int64 handle
= it
->first
;
782 if (exists_now
&& !existed_before
)
783 change_buffers
[type
].PushAddedItem(handle
);
784 else if (!exists_now
&& existed_before
)
785 change_buffers
[type
].PushDeletedItem(handle
);
786 else if (exists_now
&& existed_before
&&
787 VisiblePropertiesDiffer(it
->second
, crypto
)) {
788 change_buffers
[type
].PushUpdatedItem(handle
);
791 SetExtraChangeRecordData(handle
, type
, &change_buffers
[type
], crypto
,
792 it
->second
.original
, existed_before
, exists_now
);
795 ReadTransaction
read_trans(GetUserShare(), trans
);
796 for (int i
= FIRST_REAL_MODEL_TYPE
; i
< MODEL_TYPE_COUNT
; ++i
) {
797 if (!change_buffers
[i
].IsEmpty()) {
798 if (change_buffers
[i
].GetAllChangesInTreeOrder(&read_trans
,
799 &(change_records_
[i
]))) {
800 for (size_t j
= 0; j
< change_records_
[i
].Get().size(); ++j
)
801 entries_changed
->push_back((change_records_
[i
].Get())[j
].id
);
803 if (change_records_
[i
].Get().empty())
804 change_records_
.erase(i
);
809 void SyncManagerImpl::RequestNudgeForDataTypes(
810 const tracked_objects::Location
& nudge_location
,
811 ModelTypeSet types
) {
812 debug_info_event_listener_
.OnNudgeFromDatatype(types
.First().Get());
814 scheduler_
->ScheduleLocalNudge(types
, nudge_location
);
817 void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type
) {
818 DCHECK(thread_checker_
.CalledOnValidThread());
819 scheduler_
->ScheduleInitialSyncNudge(type
);
822 void SyncManagerImpl::NudgeForCommit(syncer::ModelType type
) {
823 DCHECK(thread_checker_
.CalledOnValidThread());
824 RequestNudgeForDataTypes(FROM_HERE
, ModelTypeSet(type
));
827 void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type
) {
828 DCHECK(thread_checker_
.CalledOnValidThread());
829 RefreshTypes(ModelTypeSet(type
));
832 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent
& event
) {
833 DCHECK(thread_checker_
.CalledOnValidThread());
834 // Only send an event if this is due to a cycle ending and this cycle
835 // concludes a canonical "sync" process; that is, based on what is known
836 // locally we are "all happy" and up-to-date. There may be new changes on
837 // the server, but we'll get them on a subsequent sync.
839 // Notifications are sent at the end of every sync cycle, regardless of
840 // whether we should sync again.
841 if (event
.what_happened
== SyncCycleEvent::SYNC_CYCLE_ENDED
) {
843 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
848 DVLOG(1) << "Sending OnSyncCycleCompleted";
849 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
850 OnSyncCycleCompleted(event
.snapshot
));
854 void SyncManagerImpl::OnActionableError(const SyncProtocolError
& error
) {
856 SyncManager::Observer
, observers_
,
857 OnActionableError(error
));
860 void SyncManagerImpl::OnRetryTimeChanged(base::Time
) {}
862 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet
) {}
864 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types
) {
866 SyncManager::Observer
, observers_
,
867 OnMigrationRequested(types
));
870 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent
& event
) {
871 protocol_event_buffer_
.RecordProtocolEvent(event
);
872 FOR_EACH_OBSERVER(SyncManager::Observer
, observers_
,
873 OnProtocolEvent(event
));
876 void SyncManagerImpl::SetJsEventHandler(
877 const WeakHandle
<JsEventHandler
>& event_handler
) {
878 js_sync_manager_observer_
.SetJsEventHandler(event_handler
);
879 js_mutation_event_observer_
.SetJsEventHandler(event_handler
);
880 js_sync_encryption_handler_observer_
.SetJsEventHandler(event_handler
);
883 scoped_ptr
<base::ListValue
> SyncManagerImpl::GetAllNodesForType(
884 syncer::ModelType type
) {
885 DirectoryTypeDebugInfoEmitterMap
* emitter_map
=
886 model_type_registry_
->directory_type_debug_info_emitter_map();
887 DirectoryTypeDebugInfoEmitterMap::iterator it
= emitter_map
->find(type
);
889 if (it
== emitter_map
->end()) {
890 // This can happen in some cases. The UI thread makes requests of us
891 // when it doesn't really know which types are enabled or disabled.
892 DLOG(WARNING
) << "Asked to return debug info for invalid type "
893 << ModelTypeToString(type
);
894 return scoped_ptr
<base::ListValue
>(new base::ListValue());
897 return it
->second
->GetAllNodes();
900 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled
) {
901 DCHECK(thread_checker_
.CalledOnValidThread());
903 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled
;
904 allstatus_
.SetNotificationsEnabled(invalidator_enabled
);
905 scheduler_
->SetNotificationsEnabled(invalidator_enabled
);
// Handles an invalidation delivered for |type|: DCHECKs the calling thread,
// bumps the notifications-received counter on allstatus_, and then asks
// scheduler_ to schedule an invalidation nudge.
// NOTE(review): the arguments passed to ScheduleInvalidationNudge are not
// visible in this excerpt; presumably |type| and |invalidation| are forwarded
// -- confirm against the full file.
908 void SyncManagerImpl::OnIncomingInvalidation(
909 syncer::ModelType type
,
910 scoped_ptr
<InvalidationInterface
> invalidation
) {
911 DCHECK(thread_checker_
.CalledOnValidThread());
913 allstatus_
.IncrementNotificationsReceived();
914 scheduler_
->ScheduleInvalidationNudge(
// Handles a local refresh request for |types|: DCHECKs the calling thread,
// can log a warning about a request with no types, and asks scheduler_ to
// schedule a local refresh request.
// NOTE(review): the condition guarding the warning and the arguments to
// ScheduleLocalRefreshRequest are not visible in this excerpt; presumably the
// warning fires only when |types| is empty -- confirm against the full file.
920 void SyncManagerImpl::RefreshTypes(ModelTypeSet types
) {
921 DCHECK(thread_checker_
.CalledOnValidThread());
923 LOG(WARNING
) << "Sync received refresh request with no types specified.";
925 scheduler_
->ScheduleLocalRefreshRequest(
930 SyncStatus
SyncManagerImpl::GetDetailedStatus() const {
931 return allstatus_
.status();
934 void SyncManagerImpl::SaveChanges() {
935 directory()->SaveChanges();
// Accessor returning a UserShare pointer; DCHECKs that initialized_ is set
// before handing it out.
// NOTE(review): the return statement is not visible in this excerpt -- confirm
// which member is returned against the full file.
938 UserShare
* SyncManagerImpl::GetUserShare() {
939 DCHECK(initialized_
);
943 syncer_v2::SyncContextProxy
* SyncManagerImpl::GetSyncContextProxy() {
944 DCHECK(initialized_
);
945 return sync_context_proxy_
.get();
948 const std::string
SyncManagerImpl::cache_guid() {
949 DCHECK(initialized_
);
950 return directory()->cache_guid();
// Reads experiment nodes through a ReadTransaction on GetUserShare() and
// copies the values it finds into |experiments|. Returns found_experiment,
// which the favicon-sync, GCM-invalidations and wallet-sync sections below
// set to true.
// NOTE(review): several lines of this function (closing braces, some lookup
// arguments, and whatever follows the Nigori check) are not visible in this
// excerpt; the comments here describe only the visible statements.
953 bool SyncManagerImpl::ReceivedExperiment(Experiments
* experiments
) {
954 ReadTransaction
trans(FROM_HERE
, GetUserShare());
955 ReadNode
nigori_node(&trans
);
// Logged when the Nigori type root cannot be initialized.
// NOTE(review): presumably the function returns false on this path; that
// statement is not visible here -- confirm.
956 if (nigori_node
.InitTypeRoot(NIGORI
) != BaseNode::INIT_OK
) {
957 DVLOG(1) << "Couldn't find Nigori node.";
960 bool found_experiment
= false;
// Favicon sync: when the node looked up with kFaviconSyncTag initializes OK,
// copy its favicon_sync_limit into |experiments|.
962 ReadNode
favicon_sync_node(&trans
);
963 if (favicon_sync_node
.InitByClientTagLookup(
965 syncer::kFaviconSyncTag
) == BaseNode::INIT_OK
) {
966 experiments
->favicon_sync_limit
=
967 favicon_sync_node
.GetExperimentsSpecifics().favicon_sync().
968 favicon_sync_limit();
969 found_experiment
= true;
// Pre-commit update avoidance: the enabled flag is pushed to session_context_
// rather than stored in |experiments|.
972 ReadNode
pre_commit_update_avoidance_node(&trans
);
973 if (pre_commit_update_avoidance_node
.InitByClientTagLookup(
975 syncer::kPreCommitUpdateAvoidanceTag
) == BaseNode::INIT_OK
) {
976 session_context_
->set_server_enabled_pre_commit_update_avoidance(
977 pre_commit_update_avoidance_node
.GetExperimentsSpecifics().
978 pre_commit_update_avoidance().enabled());
979 // We don't bother setting found_experiment. The frontend doesn't need to
// GCM invalidations: the value is copied only when the flags message reports
// has_enabled().
983 ReadNode
gcm_invalidations_node(&trans
);
984 if (gcm_invalidations_node
.InitByClientTagLookup(
985 syncer::EXPERIMENTS
, syncer::kGCMInvalidationsTag
) ==
987 const sync_pb::GcmInvalidationsFlags
& gcm_invalidations
=
988 gcm_invalidations_node
.GetExperimentsSpecifics().gcm_invalidations();
989 if (gcm_invalidations
.has_enabled()) {
990 experiments
->gcm_invalidations_enabled
= gcm_invalidations
.enabled();
991 found_experiment
= true;
// Wallet sync: same pattern as above -- copied only when has_enabled() is
// true.
995 ReadNode
wallet_sync_node(&trans
);
996 if (wallet_sync_node
.InitByClientTagLookup(
997 syncer::EXPERIMENTS
, syncer::kWalletSyncTag
) == BaseNode::INIT_OK
) {
998 const sync_pb::WalletSyncFlags
& wallet_sync
=
999 wallet_sync_node
.GetExperimentsSpecifics().wallet_sync();
1000 if (wallet_sync
.has_enabled()) {
1001 experiments
->wallet_sync_enabled
= wallet_sync
.enabled();
1002 found_experiment
= true;
1006 return found_experiment
;
1009 bool SyncManagerImpl::HasUnsyncedItems() {
1010 ReadTransaction
trans(FROM_HERE
, GetUserShare());
1011 return (trans
.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1014 SyncEncryptionHandler
* SyncManagerImpl::GetEncryptionHandler() {
1015 return sync_encryption_handler_
.get();
1018 ScopedVector
<syncer::ProtocolEvent
>
1019 SyncManagerImpl::GetBufferedProtocolEvents() {
1020 return protocol_event_buffer_
.GetBufferedProtocolEvents();
1023 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1024 syncer::TypeDebugInfoObserver
* observer
) {
1025 model_type_registry_
->RegisterDirectoryTypeDebugInfoObserver(observer
);
1028 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1029 syncer::TypeDebugInfoObserver
* observer
) {
1030 model_type_registry_
->UnregisterDirectoryTypeDebugInfoObserver(observer
);
1033 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1034 syncer::TypeDebugInfoObserver
* observer
) {
1035 return model_type_registry_
->HasDirectoryTypeDebugInfoObserver(observer
);
1038 void SyncManagerImpl::RequestEmitDebugInfo() {
1039 model_type_registry_
->RequestEmitDebugInfo();
1042 void SyncManagerImpl::ClearServerData(const ClearServerDataCallback
& callback
) {
1043 DCHECK(thread_checker_
.CalledOnValidThread());
1044 scheduler_
->Start(SyncScheduler::CLEAR_SERVER_DATA_MODE
, base::Time());
1045 ClearParams
params(callback
);
1046 scheduler_
->ScheduleClearServerData(params
);
1049 } // namespace syncer