1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
24 class ModelTypeSyncProxyWrapper
: public ModelTypeSyncProxy
{
26 ModelTypeSyncProxyWrapper(
27 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy
,
28 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
);
29 ~ModelTypeSyncProxyWrapper() override
;
31 void OnCommitCompleted(const DataTypeState
& type_state
,
32 const CommitResponseDataList
& response_list
) override
;
33 void OnUpdateReceived(const DataTypeState
& type_state
,
34 const UpdateResponseDataList
& response_list
,
35 const UpdateResponseDataList
& pending_updates
) override
;
38 base::WeakPtr
<ModelTypeSyncProxyImpl
> processor_
;
39 scoped_refptr
<base::SequencedTaskRunner
> processor_task_runner_
;
42 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
43 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy
,
44 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
)
45 : processor_(proxy
), processor_task_runner_(processor_task_runner
) {
48 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
52 const DataTypeState
& type_state
,
53 const CommitResponseDataList
& response_list
) {
54 processor_task_runner_
->PostTask(
56 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted
,
62 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
63 const DataTypeState
& type_state
,
64 const UpdateResponseDataList
& response_list
,
65 const UpdateResponseDataList
& pending_updates
) {
66 processor_task_runner_
->PostTask(
68 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived
,
75 class ModelTypeSyncWorkerWrapper
: public ModelTypeSyncWorker
{
77 ModelTypeSyncWorkerWrapper(
78 const base::WeakPtr
<ModelTypeSyncWorkerImpl
>& worker
,
79 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
);
80 ~ModelTypeSyncWorkerWrapper() override
;
82 void EnqueueForCommit(const CommitRequestDataList
& list
) override
;
85 base::WeakPtr
<ModelTypeSyncWorkerImpl
> worker_
;
86 scoped_refptr
<base::SequencedTaskRunner
> sync_thread_
;
89 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
90 const base::WeakPtr
<ModelTypeSyncWorkerImpl
>& worker
,
91 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
)
92 : worker_(worker
), sync_thread_(sync_thread
) {
95 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
98 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
99 const CommitRequestDataList
& list
) {
100 sync_thread_
->PostTask(
102 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit
, worker_
, list
));
107 ModelTypeRegistry::ModelTypeRegistry(
108 const std::vector
<scoped_refptr
<ModelSafeWorker
> >& workers
,
109 syncable::Directory
* directory
,
110 NudgeHandler
* nudge_handler
)
111 : directory_(directory
),
112 nudge_handler_(nudge_handler
),
113 weak_ptr_factory_(this) {
114 for (size_t i
= 0u; i
< workers
.size(); ++i
) {
116 std::make_pair(workers
[i
]->GetModelSafeGroup(), workers
[i
]));
120 ModelTypeRegistry::~ModelTypeRegistry() {
123 void ModelTypeRegistry::SetEnabledDirectoryTypes(
124 const ModelSafeRoutingInfo
& routing_info
) {
125 // Remove all existing directory processors and delete them. The
126 // DebugInfoEmitters are not deleted here, since we want to preserve their
128 for (ModelTypeSet::Iterator it
= enabled_directory_types_
.First();
129 it
.Good(); it
.Inc()) {
130 size_t result1
= update_handler_map_
.erase(it
.Get());
131 size_t result2
= commit_contributor_map_
.erase(it
.Get());
132 DCHECK_EQ(1U, result1
);
133 DCHECK_EQ(1U, result2
);
136 // Clear the old instances of directory update handlers and commit
137 // contributors, deleting their contents in the processs.
138 directory_update_handlers_
.clear();
139 directory_commit_contributors_
.clear();
141 // Create new ones and add them to the appropriate containers.
142 for (ModelSafeRoutingInfo::const_iterator routing_iter
= routing_info
.begin();
143 routing_iter
!= routing_info
.end(); ++routing_iter
) {
144 ModelType type
= routing_iter
->first
;
145 ModelSafeGroup group
= routing_iter
->second
;
146 std::map
<ModelSafeGroup
, scoped_refptr
<ModelSafeWorker
> >::iterator
147 worker_it
= workers_map_
.find(group
);
148 DCHECK(worker_it
!= workers_map_
.end());
149 scoped_refptr
<ModelSafeWorker
> worker
= worker_it
->second
;
151 // DebugInfoEmitters are never deleted. Use existing one if we have it.
152 DirectoryTypeDebugInfoEmitter
* emitter
= NULL
;
153 DirectoryTypeDebugInfoEmitterMap::iterator it
=
154 directory_type_debug_info_emitter_map_
.find(type
);
155 if (it
!= directory_type_debug_info_emitter_map_
.end()) {
156 emitter
= it
->second
;
158 emitter
= new DirectoryTypeDebugInfoEmitter(directory_
, type
,
159 &type_debug_info_observers_
);
160 directory_type_debug_info_emitter_map_
.insert(
161 std::make_pair(type
, emitter
));
162 directory_type_debug_info_emitters_
.push_back(emitter
);
165 DirectoryCommitContributor
* committer
=
166 new DirectoryCommitContributor(directory_
, type
, emitter
);
167 DirectoryUpdateHandler
* updater
=
168 new DirectoryUpdateHandler(directory_
, type
, worker
, emitter
);
170 // These containers take ownership of their contents.
171 directory_commit_contributors_
.push_back(committer
);
172 directory_update_handlers_
.push_back(updater
);
175 update_handler_map_
.insert(std::make_pair(type
, updater
)).second
;
176 DCHECK(inserted1
) << "Attempt to override existing type handler in map";
179 commit_contributor_map_
.insert(std::make_pair(type
, committer
)).second
;
180 DCHECK(inserted2
) << "Attempt to override existing type handler in map";
183 enabled_directory_types_
= GetRoutingInfoTypes(routing_info
);
184 DCHECK(Intersection(GetEnabledDirectoryTypes(),
185 GetEnabledNonBlockingTypes()).Empty());
188 void ModelTypeRegistry::ConnectSyncTypeToWorker(
190 const DataTypeState
& data_type_state
,
191 const UpdateResponseDataList
& saved_pending_updates
,
192 const scoped_refptr
<base::SequencedTaskRunner
>& type_task_runner
,
193 const base::WeakPtr
<ModelTypeSyncProxyImpl
>& proxy_impl
) {
194 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type
);
196 // Initialize Worker -> Proxy communication channel.
197 scoped_ptr
<ModelTypeSyncProxy
> proxy(
198 new ModelTypeSyncProxyWrapper(proxy_impl
, type_task_runner
));
199 scoped_ptr
<Cryptographer
> cryptographer_copy
;
200 if (encrypted_types_
.Has(type
))
201 cryptographer_copy
.reset(new Cryptographer(*cryptographer_
));
203 scoped_ptr
<ModelTypeSyncWorkerImpl
> worker(
204 new ModelTypeSyncWorkerImpl(type
,
206 saved_pending_updates
,
207 cryptographer_copy
.Pass(),
211 // Initialize Proxy -> Worker communication channel.
212 scoped_ptr
<ModelTypeSyncWorker
> wrapped_worker(
213 new ModelTypeSyncWorkerWrapper(worker
->AsWeakPtr(),
214 scoped_refptr
<base::SequencedTaskRunner
>(
215 base::ThreadTaskRunnerHandle::Get())));
216 type_task_runner
->PostTask(FROM_HERE
,
217 base::Bind(&ModelTypeSyncProxyImpl::OnConnect
,
219 base::Passed(&wrapped_worker
)));
221 DCHECK(update_handler_map_
.find(type
) == update_handler_map_
.end());
222 DCHECK(commit_contributor_map_
.find(type
) == commit_contributor_map_
.end());
224 update_handler_map_
.insert(std::make_pair(type
, worker
.get()));
225 commit_contributor_map_
.insert(std::make_pair(type
, worker
.get()));
227 // The container takes ownership.
228 model_type_sync_workers_
.push_back(worker
.Pass());
230 DCHECK(Intersection(GetEnabledDirectoryTypes(),
231 GetEnabledNonBlockingTypes()).Empty());
234 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type
) {
235 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type
);
236 DCHECK(update_handler_map_
.find(type
) != update_handler_map_
.end());
237 DCHECK(commit_contributor_map_
.find(type
) != commit_contributor_map_
.end());
239 size_t updaters_erased
= update_handler_map_
.erase(type
);
240 size_t committers_erased
= commit_contributor_map_
.erase(type
);
242 DCHECK_EQ(1U, updaters_erased
);
243 DCHECK_EQ(1U, committers_erased
);
245 // Remove from the ScopedVector, deleting the worker in the process.
246 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::iterator it
=
247 model_type_sync_workers_
.begin();
248 it
!= model_type_sync_workers_
.end();
250 if ((*it
)->GetModelType() == type
) {
251 model_type_sync_workers_
.erase(it
);
257 ModelTypeSet
ModelTypeRegistry::GetEnabledTypes() const {
258 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
261 UpdateHandlerMap
* ModelTypeRegistry::update_handler_map() {
262 return &update_handler_map_
;
265 CommitContributorMap
* ModelTypeRegistry::commit_contributor_map() {
266 return &commit_contributor_map_
;
269 DirectoryTypeDebugInfoEmitterMap
*
270 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
271 return &directory_type_debug_info_emitter_map_
;
274 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
275 syncer::TypeDebugInfoObserver
* observer
) {
276 if (!type_debug_info_observers_
.HasObserver(observer
))
277 type_debug_info_observers_
.AddObserver(observer
);
280 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
281 syncer::TypeDebugInfoObserver
* observer
) {
282 type_debug_info_observers_
.RemoveObserver(observer
);
285 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
286 const syncer::TypeDebugInfoObserver
* observer
) const {
287 return type_debug_info_observers_
.HasObserver(observer
);
290 void ModelTypeRegistry::RequestEmitDebugInfo() {
291 for (DirectoryTypeDebugInfoEmitterMap::iterator it
=
292 directory_type_debug_info_emitter_map_
.begin();
293 it
!= directory_type_debug_info_emitter_map_
.end(); ++it
) {
294 it
->second
->EmitCommitCountersUpdate();
295 it
->second
->EmitUpdateCountersUpdate();
296 it
->second
->EmitStatusCountersUpdate();
300 base::WeakPtr
<SyncContext
> ModelTypeRegistry::AsWeakPtr() {
301 return weak_ptr_factory_
.GetWeakPtr();
304 void ModelTypeRegistry::OnPassphraseRequired(
305 PassphraseRequiredReason reason
,
306 const sync_pb::EncryptedData
& pending_keys
) {
309 void ModelTypeRegistry::OnPassphraseAccepted() {
312 void ModelTypeRegistry::OnBootstrapTokenUpdated(
313 const std::string
& bootstrap_token
,
314 BootstrapTokenType type
) {
317 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types
,
318 bool encrypt_everything
) {
319 encrypted_types_
= encrypted_types
;
320 OnEncryptionStateChanged();
323 void ModelTypeRegistry::OnEncryptionComplete() {
326 void ModelTypeRegistry::OnCryptographerStateChanged(
327 Cryptographer
* cryptographer
) {
328 cryptographer_
.reset(new Cryptographer(*cryptographer
));
329 OnEncryptionStateChanged();
332 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type
,
333 base::Time passphrase_time
) {
336 ModelTypeSet
ModelTypeRegistry::GetEnabledDirectoryTypes() const {
337 return enabled_directory_types_
;
340 void ModelTypeRegistry::OnEncryptionStateChanged() {
341 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::iterator it
=
342 model_type_sync_workers_
.begin();
343 it
!= model_type_sync_workers_
.end();
345 if (encrypted_types_
.Has((*it
)->GetModelType())) {
346 (*it
)->UpdateCryptographer(
347 make_scoped_ptr(new Cryptographer(*cryptographer_
)));
352 ModelTypeSet
ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
353 ModelTypeSet enabled_off_thread_types
;
354 for (ScopedVector
<ModelTypeSyncWorkerImpl
>::const_iterator it
=
355 model_type_sync_workers_
.begin();
356 it
!= model_type_sync_workers_
.end();
358 enabled_off_thread_types
.Put((*it
)->GetModelType());
360 return enabled_off_thread_types
;
363 } // namespace syncer