1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/commit_queue.h"
11 #include "sync/engine/directory_commit_contributor.h"
12 #include "sync/engine/directory_update_handler.h"
13 #include "sync/engine/model_type_processor.h"
14 #include "sync/engine/model_type_processor_impl.h"
15 #include "sync/engine/model_type_worker.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
24 class ModelTypeProcessorProxy
: public syncer_v2::ModelTypeProcessor
{
26 ModelTypeProcessorProxy(
27 const base::WeakPtr
<syncer_v2::ModelTypeProcessor
>& processor
,
28 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
);
29 ~ModelTypeProcessorProxy() override
;
31 void OnConnect(scoped_ptr
<syncer_v2::CommitQueue
> worker
) override
;
32 void OnCommitCompleted(
33 const syncer_v2::DataTypeState
& type_state
,
34 const syncer_v2::CommitResponseDataList
& response_list
) override
;
35 void OnUpdateReceived(
36 const syncer_v2::DataTypeState
& type_state
,
37 const syncer_v2::UpdateResponseDataList
& response_list
,
38 const syncer_v2::UpdateResponseDataList
& pending_updates
) override
;
41 base::WeakPtr
<syncer_v2::ModelTypeProcessor
> processor_
;
42 scoped_refptr
<base::SequencedTaskRunner
> processor_task_runner_
;
45 ModelTypeProcessorProxy::ModelTypeProcessorProxy(
46 const base::WeakPtr
<syncer_v2::ModelTypeProcessor
>& processor
,
47 const scoped_refptr
<base::SequencedTaskRunner
>& processor_task_runner
)
48 : processor_(processor
), processor_task_runner_(processor_task_runner
) {}
50 ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {
53 void ModelTypeProcessorProxy::OnConnect(
54 scoped_ptr
<syncer_v2::CommitQueue
> worker
) {
55 processor_task_runner_
->PostTask(
56 FROM_HERE
, base::Bind(&syncer_v2::ModelTypeProcessor::OnConnect
,
57 processor_
, base::Passed(worker
.Pass())));
60 void ModelTypeProcessorProxy::OnCommitCompleted(
61 const syncer_v2::DataTypeState
& type_state
,
62 const syncer_v2::CommitResponseDataList
& response_list
) {
63 processor_task_runner_
->PostTask(
65 base::Bind(&syncer_v2::ModelTypeProcessor::OnCommitCompleted
,
66 processor_
, type_state
, response_list
));
69 void ModelTypeProcessorProxy::OnUpdateReceived(
70 const syncer_v2::DataTypeState
& type_state
,
71 const syncer_v2::UpdateResponseDataList
& response_list
,
72 const syncer_v2::UpdateResponseDataList
& pending_updates
) {
73 processor_task_runner_
->PostTask(
75 base::Bind(&syncer_v2::ModelTypeProcessor::OnUpdateReceived
,
76 processor_
, type_state
, response_list
, pending_updates
));
79 class CommitQueueProxy
: public syncer_v2::CommitQueue
{
81 CommitQueueProxy(const base::WeakPtr
<syncer_v2::ModelTypeWorker
>& worker
,
82 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
);
83 ~CommitQueueProxy() override
;
85 void EnqueueForCommit(const syncer_v2::CommitRequestDataList
& list
) override
;
88 base::WeakPtr
<syncer_v2::ModelTypeWorker
> worker_
;
89 scoped_refptr
<base::SequencedTaskRunner
> sync_thread_
;
92 CommitQueueProxy::CommitQueueProxy(
93 const base::WeakPtr
<syncer_v2::ModelTypeWorker
>& worker
,
94 const scoped_refptr
<base::SequencedTaskRunner
>& sync_thread
)
95 : worker_(worker
), sync_thread_(sync_thread
) {}
97 CommitQueueProxy::~CommitQueueProxy() {}
99 void CommitQueueProxy::EnqueueForCommit(
100 const syncer_v2::CommitRequestDataList
& list
) {
101 sync_thread_
->PostTask(
103 base::Bind(&syncer_v2::ModelTypeWorker::EnqueueForCommit
, worker_
, list
));
108 ModelTypeRegistry::ModelTypeRegistry(
109 const std::vector
<scoped_refptr
<ModelSafeWorker
> >& workers
,
110 syncable::Directory
* directory
,
111 NudgeHandler
* nudge_handler
)
112 : directory_(directory
),
113 nudge_handler_(nudge_handler
),
114 weak_ptr_factory_(this) {
115 for (size_t i
= 0u; i
< workers
.size(); ++i
) {
117 std::make_pair(workers
[i
]->GetModelSafeGroup(), workers
[i
]));
121 ModelTypeRegistry::~ModelTypeRegistry() {
124 void ModelTypeRegistry::SetEnabledDirectoryTypes(
125 const ModelSafeRoutingInfo
& routing_info
) {
126 // Remove all existing directory processors and delete them. The
127 // DebugInfoEmitters are not deleted here, since we want to preserve their
129 for (ModelTypeSet::Iterator it
= enabled_directory_types_
.First();
130 it
.Good(); it
.Inc()) {
131 size_t result1
= update_handler_map_
.erase(it
.Get());
132 size_t result2
= commit_contributor_map_
.erase(it
.Get());
133 DCHECK_EQ(1U, result1
);
134 DCHECK_EQ(1U, result2
);
137 // Clear the old instances of directory update handlers and commit
138 // contributors, deleting their contents in the processs.
139 directory_update_handlers_
.clear();
140 directory_commit_contributors_
.clear();
142 // Create new ones and add them to the appropriate containers.
143 for (ModelSafeRoutingInfo::const_iterator routing_iter
= routing_info
.begin();
144 routing_iter
!= routing_info
.end(); ++routing_iter
) {
145 ModelType type
= routing_iter
->first
;
146 ModelSafeGroup group
= routing_iter
->second
;
147 std::map
<ModelSafeGroup
, scoped_refptr
<ModelSafeWorker
> >::iterator
148 worker_it
= workers_map_
.find(group
);
149 DCHECK(worker_it
!= workers_map_
.end());
150 scoped_refptr
<ModelSafeWorker
> worker
= worker_it
->second
;
152 // DebugInfoEmitters are never deleted. Use existing one if we have it.
153 DirectoryTypeDebugInfoEmitter
* emitter
= NULL
;
154 DirectoryTypeDebugInfoEmitterMap::iterator it
=
155 directory_type_debug_info_emitter_map_
.find(type
);
156 if (it
!= directory_type_debug_info_emitter_map_
.end()) {
157 emitter
= it
->second
;
159 emitter
= new DirectoryTypeDebugInfoEmitter(directory_
, type
,
160 &type_debug_info_observers_
);
161 directory_type_debug_info_emitter_map_
.insert(
162 std::make_pair(type
, emitter
));
163 directory_type_debug_info_emitters_
.push_back(emitter
);
166 DirectoryCommitContributor
* committer
=
167 new DirectoryCommitContributor(directory_
, type
, emitter
);
168 DirectoryUpdateHandler
* updater
=
169 new DirectoryUpdateHandler(directory_
, type
, worker
, emitter
);
171 // These containers take ownership of their contents.
172 directory_commit_contributors_
.push_back(committer
);
173 directory_update_handlers_
.push_back(updater
);
176 update_handler_map_
.insert(std::make_pair(type
, updater
)).second
;
177 DCHECK(inserted1
) << "Attempt to override existing type handler in map";
180 commit_contributor_map_
.insert(std::make_pair(type
, committer
)).second
;
181 DCHECK(inserted2
) << "Attempt to override existing type handler in map";
184 enabled_directory_types_
= GetRoutingInfoTypes(routing_info
);
185 DCHECK(Intersection(GetEnabledDirectoryTypes(),
186 GetEnabledNonBlockingTypes()).Empty());
189 void ModelTypeRegistry::ConnectSyncTypeToWorker(
191 const syncer_v2::DataTypeState
& data_type_state
,
192 const syncer_v2::UpdateResponseDataList
& saved_pending_updates
,
193 const scoped_refptr
<base::SequencedTaskRunner
>& type_task_runner
,
194 const base::WeakPtr
<syncer_v2::ModelTypeProcessor
>& type_processor
) {
195 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type
);
197 // Initialize Worker -> Processor communication channel.
198 scoped_ptr
<syncer_v2::ModelTypeProcessor
> processor_proxy(
199 new ModelTypeProcessorProxy(type_processor
, type_task_runner
));
200 scoped_ptr
<Cryptographer
> cryptographer_copy
;
201 if (encrypted_types_
.Has(type
))
202 cryptographer_copy
.reset(new Cryptographer(*cryptographer_
));
204 scoped_ptr
<syncer_v2::ModelTypeWorker
> worker(new syncer_v2::ModelTypeWorker(
205 type
, data_type_state
, saved_pending_updates
, cryptographer_copy
.Pass(),
206 nudge_handler_
, processor_proxy
.Pass()));
208 // Initialize Processor -> Worker communication channel.
209 scoped_ptr
<syncer_v2::CommitQueue
> commit_queue_proxy(new CommitQueueProxy(
210 worker
->AsWeakPtr(), scoped_refptr
<base::SequencedTaskRunner
>(
211 base::ThreadTaskRunnerHandle::Get())));
212 type_task_runner
->PostTask(
213 FROM_HERE
, base::Bind(&syncer_v2::ModelTypeProcessor::OnConnect
,
214 type_processor
, base::Passed(&commit_queue_proxy
)));
216 DCHECK(update_handler_map_
.find(type
) == update_handler_map_
.end());
217 DCHECK(commit_contributor_map_
.find(type
) == commit_contributor_map_
.end());
219 update_handler_map_
.insert(std::make_pair(type
, worker
.get()));
220 commit_contributor_map_
.insert(std::make_pair(type
, worker
.get()));
222 // The container takes ownership.
223 model_type_workers_
.push_back(worker
.Pass());
225 DCHECK(Intersection(GetEnabledDirectoryTypes(),
226 GetEnabledNonBlockingTypes()).Empty());
229 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type
) {
230 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type
);
231 DCHECK(update_handler_map_
.find(type
) != update_handler_map_
.end());
232 DCHECK(commit_contributor_map_
.find(type
) != commit_contributor_map_
.end());
234 size_t updaters_erased
= update_handler_map_
.erase(type
);
235 size_t committers_erased
= commit_contributor_map_
.erase(type
);
237 DCHECK_EQ(1U, updaters_erased
);
238 DCHECK_EQ(1U, committers_erased
);
240 // Remove from the ScopedVector, deleting the worker in the process.
241 for (ScopedVector
<syncer_v2::ModelTypeWorker
>::iterator it
=
242 model_type_workers_
.begin();
243 it
!= model_type_workers_
.end(); ++it
) {
244 if ((*it
)->GetModelType() == type
) {
245 model_type_workers_
.erase(it
);
251 ModelTypeSet
ModelTypeRegistry::GetEnabledTypes() const {
252 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
255 UpdateHandlerMap
* ModelTypeRegistry::update_handler_map() {
256 return &update_handler_map_
;
259 CommitContributorMap
* ModelTypeRegistry::commit_contributor_map() {
260 return &commit_contributor_map_
;
263 DirectoryTypeDebugInfoEmitterMap
*
264 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
265 return &directory_type_debug_info_emitter_map_
;
268 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
269 syncer::TypeDebugInfoObserver
* observer
) {
270 if (!type_debug_info_observers_
.HasObserver(observer
))
271 type_debug_info_observers_
.AddObserver(observer
);
274 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
275 syncer::TypeDebugInfoObserver
* observer
) {
276 type_debug_info_observers_
.RemoveObserver(observer
);
279 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
280 const syncer::TypeDebugInfoObserver
* observer
) const {
281 return type_debug_info_observers_
.HasObserver(observer
);
284 void ModelTypeRegistry::RequestEmitDebugInfo() {
285 for (DirectoryTypeDebugInfoEmitterMap::iterator it
=
286 directory_type_debug_info_emitter_map_
.begin();
287 it
!= directory_type_debug_info_emitter_map_
.end(); ++it
) {
288 it
->second
->EmitCommitCountersUpdate();
289 it
->second
->EmitUpdateCountersUpdate();
290 it
->second
->EmitStatusCountersUpdate();
294 base::WeakPtr
<syncer_v2::SyncContext
> ModelTypeRegistry::AsWeakPtr() {
295 return weak_ptr_factory_
.GetWeakPtr();
298 void ModelTypeRegistry::OnPassphraseRequired(
299 PassphraseRequiredReason reason
,
300 const sync_pb::EncryptedData
& pending_keys
) {
303 void ModelTypeRegistry::OnPassphraseAccepted() {
306 void ModelTypeRegistry::OnBootstrapTokenUpdated(
307 const std::string
& bootstrap_token
,
308 BootstrapTokenType type
) {
311 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types
,
312 bool encrypt_everything
) {
313 encrypted_types_
= encrypted_types
;
314 OnEncryptionStateChanged();
317 void ModelTypeRegistry::OnEncryptionComplete() {
320 void ModelTypeRegistry::OnCryptographerStateChanged(
321 Cryptographer
* cryptographer
) {
322 cryptographer_
.reset(new Cryptographer(*cryptographer
));
323 OnEncryptionStateChanged();
326 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type
,
327 base::Time passphrase_time
) {
330 void ModelTypeRegistry::OnLocalSetPassphraseEncryption(
331 const SyncEncryptionHandler::NigoriState
& nigori_state
) {
334 ModelTypeSet
ModelTypeRegistry::GetEnabledDirectoryTypes() const {
335 return enabled_directory_types_
;
338 void ModelTypeRegistry::OnEncryptionStateChanged() {
339 for (ScopedVector
<syncer_v2::ModelTypeWorker
>::iterator it
=
340 model_type_workers_
.begin();
341 it
!= model_type_workers_
.end(); ++it
) {
342 if (encrypted_types_
.Has((*it
)->GetModelType())) {
343 (*it
)->UpdateCryptographer(
344 make_scoped_ptr(new Cryptographer(*cryptographer_
)));
349 ModelTypeSet
ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
350 ModelTypeSet enabled_off_thread_types
;
351 for (ScopedVector
<syncer_v2::ModelTypeWorker
>::const_iterator it
=
352 model_type_workers_
.begin();
353 it
!= model_type_workers_
.end(); ++it
) {
354 enabled_off_thread_types
.Put((*it
)->GetModelType());
356 return enabled_off_thread_types
;
359 } // namespace syncer