1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
24 class ModelTypeSyncProxyWrapper
: public ModelTypeSyncProxy
{
26 ModelTypeSyncProxyWrapper(
27 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy
,
28 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
);
29 ~ModelTypeSyncProxyWrapper() override
;
31 void OnCommitCompleted(
32 const syncer_v2::DataTypeState
& type_state
,
33 const syncer_v2::CommitResponseDataList
& response_list
) override
;
34 void OnUpdateReceived(
35 const syncer_v2::DataTypeState
& type_state
,
36 const syncer_v2::UpdateResponseDataList
& response_list
,
37 const syncer_v2::UpdateResponseDataList
& pending_updates
) override
;
40 base::WeakPtr
<ModelTypeSyncProxyImpl
> processor_
;
41 scoped_refptr
<base::SequencedTaskRunner
> processor_task_runner_
;
44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
45 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy
,
46 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
)
47 : processor_(proxy
), processor_task_runner_(processor_task_runner
) {
50 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
53 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
54 const syncer_v2::DataTypeState
& type_state
,
55 const syncer_v2::CommitResponseDataList
& response_list
) {
56 processor_task_runner_
->PostTask(
58 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted
,
64 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
65 const syncer_v2::DataTypeState
& type_state
,
66 const syncer_v2::UpdateResponseDataList
& response_list
,
67 const syncer_v2::UpdateResponseDataList
& pending_updates
) {
68 processor_task_runner_
->PostTask(
70 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived
,
77 class ModelTypeSyncWorkerWrapper
: public ModelTypeSyncWorker
{
79 ModelTypeSyncWorkerWrapper(
80 const base::WeakPtr
<ModelTypeSyncWorkerImpl
>& worker
,
81 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
);
82 ~ModelTypeSyncWorkerWrapper() override
;
84 void EnqueueForCommit(const syncer_v2::CommitRequestDataList
& list
) override
;
87 base::WeakPtr
<ModelTypeSyncWorkerImpl
> worker_
;
88 scoped_refptr
<base::SequencedTaskRunner
> sync_thread_
;
91 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
92 const base::WeakPtr
<ModelTypeSyncWorkerImpl
>& worker
,
93 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
)
94 : worker_(worker
), sync_thread_(sync_thread
) {
97 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
100 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
101 const syncer_v2::CommitRequestDataList
& list
) {
102 sync_thread_
->PostTask(
104 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit
, worker_
, list
));
109 ModelTypeRegistry::ModelTypeRegistry(
110 const std::vector
<scoped_refptr
<ModelSafeWorker
> >& workers
,
111 syncable::Directory
* directory
,
112 NudgeHandler
* nudge_handler
)
113 : directory_(directory
),
114 nudge_handler_(nudge_handler
),
115 weak_ptr_factory_(this) {
116 for (size_t i
= 0u; i
< workers
.size(); ++i
) {
118 std::make_pair(workers
[i
]->GetModelSafeGroup(), workers
[i
]));
122 ModelTypeRegistry::~ModelTypeRegistry() {
125 void ModelTypeRegistry::SetEnabledDirectoryTypes(
126 const ModelSafeRoutingInfo
& routing_info
) {
127 // Remove all existing directory processors and delete them. The
128 // DebugInfoEmitters are not deleted here, since we want to preserve their
130 for (ModelTypeSet::Iterator it
= enabled_directory_types_
.First();
131 it
.Good(); it
.Inc()) {
132 size_t result1
= update_handler_map_
.erase(it
.Get());
133 size_t result2
= commit_contributor_map_
.erase(it
.Get());
134 DCHECK_EQ(1U, result1
);
135 DCHECK_EQ(1U, result2
);
138 // Clear the old instances of directory update handlers and commit
139 // contributors, deleting their contents in the processs.
140 directory_update_handlers_
.clear();
141 directory_commit_contributors_
.clear();
143 // Create new ones and add them to the appropriate containers.
144 for (ModelSafeRoutingInfo::const_iterator routing_iter
= routing_info
.begin();
145 routing_iter
!= routing_info
.end(); ++routing_iter
) {
146 ModelType type
= routing_iter
->first
;
147 ModelSafeGroup group
= routing_iter
->second
;
148 std::map
<ModelSafeGroup
, scoped_refptr
<ModelSafeWorker
> >::iterator
149 worker_it
= workers_map_
.find(group
);
150 DCHECK(worker_it
!= workers_map_
.end());
151 scoped_refptr
<ModelSafeWorker
> worker
= worker_it
->second
;
153 // DebugInfoEmitters are never deleted. Use existing one if we have it.
154 DirectoryTypeDebugInfoEmitter
* emitter
= NULL
;
155 DirectoryTypeDebugInfoEmitterMap::iterator it
=
156 directory_type_debug_info_emitter_map_
.find(type
);
157 if (it
!= directory_type_debug_info_emitter_map_
.end()) {
158 emitter
= it
->second
;
160 emitter
= new DirectoryTypeDebugInfoEmitter(directory_
, type
,
161 &type_debug_info_observers_
);
162 directory_type_debug_info_emitter_map_
.insert(
163 std::make_pair(type
, emitter
));
164 directory_type_debug_info_emitters_
.push_back(emitter
);
167 DirectoryCommitContributor
* committer
=
168 new DirectoryCommitContributor(directory_
, type
, emitter
);
169 DirectoryUpdateHandler
* updater
=
170 new DirectoryUpdateHandler(directory_
, type
, worker
, emitter
);
172 // These containers take ownership of their contents.
173 directory_commit_contributors_
.push_back(committer
);
174 directory_update_handlers_
.push_back(updater
);
177 update_handler_map_
.insert(std::make_pair(type
, updater
)).second
;
178 DCHECK(inserted1
) << "Attempt to override existing type handler in map";
181 commit_contributor_map_
.insert(std::make_pair(type
, committer
)).second
;
182 DCHECK(inserted2
) << "Attempt to override existing type handler in map";
185 enabled_directory_types_
= GetRoutingInfoTypes(routing_info
);
186 DCHECK(Intersection(GetEnabledDirectoryTypes(),
187 GetEnabledNonBlockingTypes()).Empty());
190 void ModelTypeRegistry::ConnectSyncTypeToWorker(
192 const syncer_v2::DataTypeState
& data_type_state
,
193 const syncer_v2::UpdateResponseDataList
& saved_pending_updates
,
194 const scoped_refptr
<base::SequencedTaskRunner
>& type_task_runner
,
195 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy_impl
) {
196 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type
);
198 // Initialize Worker -> Proxy communication channel.
199 scoped_ptr
<ModelTypeSyncProxy
> proxy(
200 new ModelTypeSyncProxyWrapper(proxy_impl
, type_task_runner
));
201 scoped_ptr
<Cryptographer
> cryptographer_copy
;
202 if (encrypted_types_
.Has(type
))
203 cryptographer_copy
.reset(new Cryptographer(*cryptographer_
));
205 scoped_ptr
<ModelTypeSyncWorkerImpl
> worker(
206 new ModelTypeSyncWorkerImpl(type
,
208 saved_pending_updates
,
209 cryptographer_copy
.Pass(),
213 // Initialize Proxy -> Worker communication channel.
214 scoped_ptr
<ModelTypeSyncWorker
> wrapped_worker(
215 new ModelTypeSyncWorkerWrapper(worker
->AsWeakPtr(),
216 scoped_refptr
<base::SequencedTaskRunner
>(
217 base::ThreadTaskRunnerHandle::Get())));
218 type_task_runner
->PostTask(FROM_HERE
,
219 base::Bind(&ModelTypeSyncProxyImpl::OnConnect
,
221 base::Passed(&wrapped_worker
)));
223 DCHECK(update_handler_map_
.find(type
) == update_handler_map_
.end());
224 DCHECK(commit_contributor_map_
.find(type
) == commit_contributor_map_
.end());
226 update_handler_map_
.insert(std::make_pair(type
, worker
.get()));
227 commit_contributor_map_
.insert(std::make_pair(type
, worker
.get()));
229 // The container takes ownership.
230 model_type_sync_workers_
.push_back(worker
.Pass());
232 DCHECK(Intersection(GetEnabledDirectoryTypes(),
233 GetEnabledNonBlockingTypes()).Empty());
236 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type
) {
237 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type
);
238 DCHECK(update_handler_map_
.find(type
) != update_handler_map_
.end());
239 DCHECK(commit_contributor_map_
.find(type
) != commit_contributor_map_
.end());
241 size_t updaters_erased
= update_handler_map_
.erase(type
);
242 size_t committers_erased
= commit_contributor_map_
.erase(type
);
244 DCHECK_EQ(1U, updaters_erased
);
245 DCHECK_EQ(1U, committers_erased
);
247 // Remove from the ScopedVector, deleting the worker in the process.
248 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::iterator it
=
249 model_type_sync_workers_
.begin();
250 it
!= model_type_sync_workers_
.end();
252 if ((*it
)->GetModelType() == type
) {
253 model_type_sync_workers_
.erase(it
);
259 ModelTypeSet
ModelTypeRegistry::GetEnabledTypes() const {
260 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
263 UpdateHandlerMap
* ModelTypeRegistry::update_handler_map() {
264 return &update_handler_map_
;
267 CommitContributorMap
* ModelTypeRegistry::commit_contributor_map() {
268 return &commit_contributor_map_
;
271 DirectoryTypeDebugInfoEmitterMap
*
272 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
273 return &directory_type_debug_info_emitter_map_
;
276 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
277 syncer::TypeDebugInfoObserver
* observer
) {
278 if (!type_debug_info_observers_
.HasObserver(observer
))
279 type_debug_info_observers_
.AddObserver(observer
);
282 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
283 syncer::TypeDebugInfoObserver
* observer
) {
284 type_debug_info_observers_
.RemoveObserver(observer
);
287 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
288 const syncer::TypeDebugInfoObserver
* observer
) const {
289 return type_debug_info_observers_
.HasObserver(observer
);
292 void ModelTypeRegistry::RequestEmitDebugInfo() {
293 for (DirectoryTypeDebugInfoEmitterMap::iterator it
=
294 directory_type_debug_info_emitter_map_
.begin();
295 it
!= directory_type_debug_info_emitter_map_
.end(); ++it
) {
296 it
->second
->EmitCommitCountersUpdate();
297 it
->second
->EmitUpdateCountersUpdate();
298 it
->second
->EmitStatusCountersUpdate();
302 base::WeakPtr
<syncer_v2::SyncContext
> ModelTypeRegistry::AsWeakPtr() {
303 return weak_ptr_factory_
.GetWeakPtr();
306 void ModelTypeRegistry::OnPassphraseRequired(
307 PassphraseRequiredReason reason
,
308 const sync_pb::EncryptedData
& pending_keys
) {
311 void ModelTypeRegistry::OnPassphraseAccepted() {
314 void ModelTypeRegistry::OnBootstrapTokenUpdated(
315 const std::string
& bootstrap_token
,
316 BootstrapTokenType type
) {
319 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types
,
320 bool encrypt_everything
) {
321 encrypted_types_
= encrypted_types
;
322 OnEncryptionStateChanged();
325 void ModelTypeRegistry::OnEncryptionComplete() {
328 void ModelTypeRegistry::OnCryptographerStateChanged(
329 Cryptographer
* cryptographer
) {
330 cryptographer_
.reset(new Cryptographer(*cryptographer
));
331 OnEncryptionStateChanged();
334 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type
,
335 base::Time passphrase_time
) {
338 void ModelTypeRegistry::OnLocalSetPassphraseEncryption(
339 const SyncEncryptionHandler::NigoriState
& nigori_state
) {
342 ModelTypeSet
ModelTypeRegistry::GetEnabledDirectoryTypes() const {
343 return enabled_directory_types_
;
346 void ModelTypeRegistry::OnEncryptionStateChanged() {
347 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::iterator it
=
348 model_type_sync_workers_
.begin();
349 it
!= model_type_sync_workers_
.end();
351 if (encrypted_types_
.Has((*it
)->GetModelType())) {
352 (*it
)->UpdateCryptographer(
353 make_scoped_ptr(new Cryptographer(*cryptographer_
)));
358 ModelTypeSet
ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
359 ModelTypeSet enabled_off_thread_types
;
360 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::const_iterator it
=
361 model_type_sync_workers_
.begin();
362 it
!= model_type_sync_workers_
.end();
364 enabled_off_thread_types
.Put((*it
)->GetModelType());
366 return enabled_off_thread_types
;
369 } // namespace syncer