1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/observer_list.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/non_blocking_type_processor.h"
13 #include "sync/engine/non_blocking_type_processor_core.h"
14 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL
) {}
20 ModelTypeRegistry::ModelTypeRegistry(
21 const std::vector
<scoped_refptr
<ModelSafeWorker
> >& workers
,
22 syncable::Directory
* directory
)
23 : directory_(directory
) {
24 for (size_t i
= 0u; i
< workers
.size(); ++i
) {
26 std::make_pair(workers
[i
]->GetModelSafeGroup(), workers
[i
]));
30 ModelTypeRegistry::~ModelTypeRegistry() {}
32 void ModelTypeRegistry::SetEnabledDirectoryTypes(
33 const ModelSafeRoutingInfo
& routing_info
) {
34 // Remove all existing directory processors and delete them. The
35 // DebugInfoEmitters are not deleted here, since we want to preserve their
37 for (ModelTypeSet::Iterator it
= enabled_directory_types_
.First();
38 it
.Good(); it
.Inc()) {
39 size_t result1
= update_handler_map_
.erase(it
.Get());
40 size_t result2
= commit_contributor_map_
.erase(it
.Get());
41 DCHECK_EQ(1U, result1
);
42 DCHECK_EQ(1U, result2
);
45 // Clear the old instances of directory update handlers and commit
46 // contributors, deleting their contents in the processs.
47 directory_update_handlers_
.clear();
48 directory_commit_contributors_
.clear();
50 // Create new ones and add them to the appropriate containers.
51 for (ModelSafeRoutingInfo::const_iterator routing_iter
= routing_info
.begin();
52 routing_iter
!= routing_info
.end(); ++routing_iter
) {
53 ModelType type
= routing_iter
->first
;
54 ModelSafeGroup group
= routing_iter
->second
;
55 std::map
<ModelSafeGroup
, scoped_refptr
<ModelSafeWorker
> >::iterator
56 worker_it
= workers_map_
.find(group
);
57 DCHECK(worker_it
!= workers_map_
.end());
58 scoped_refptr
<ModelSafeWorker
> worker
= worker_it
->second
;
60 // DebugInfoEmitters are never deleted. Use existing one if we have it.
61 DirectoryTypeDebugInfoEmitter
* emitter
= NULL
;
62 DirectoryTypeDebugInfoEmitterMap::iterator it
=
63 directory_type_debug_info_emitter_map_
.find(type
);
64 if (it
!= directory_type_debug_info_emitter_map_
.end()) {
67 emitter
= new DirectoryTypeDebugInfoEmitter(directory_
, type
,
68 &type_debug_info_observers_
);
69 directory_type_debug_info_emitter_map_
.insert(
70 std::make_pair(type
, emitter
));
71 directory_type_debug_info_emitters_
.push_back(emitter
);
74 DirectoryCommitContributor
* committer
=
75 new DirectoryCommitContributor(directory_
, type
, emitter
);
76 DirectoryUpdateHandler
* updater
=
77 new DirectoryUpdateHandler(directory_
, type
, worker
, emitter
);
79 // These containers take ownership of their contents.
80 directory_commit_contributors_
.push_back(committer
);
81 directory_update_handlers_
.push_back(updater
);
84 update_handler_map_
.insert(std::make_pair(type
, updater
)).second
;
85 DCHECK(inserted1
) << "Attempt to override existing type handler in map";
88 commit_contributor_map_
.insert(std::make_pair(type
, committer
)).second
;
89 DCHECK(inserted2
) << "Attempt to override existing type handler in map";
92 enabled_directory_types_
= GetRoutingInfoTypes(routing_info
);
93 DCHECK(Intersection(GetEnabledDirectoryTypes(),
94 GetEnabledNonBlockingTypes()).Empty());
97 void ModelTypeRegistry::InitializeNonBlockingType(
99 scoped_refptr
<base::SequencedTaskRunner
> type_task_runner
,
100 base::WeakPtr
<NonBlockingTypeProcessor
> processor
) {
101 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type
);
103 // Initialize CoreProcessor -> Processor communication channel.
104 scoped_ptr
<NonBlockingTypeProcessorCore
> core(
105 new NonBlockingTypeProcessorCore(type
, type_task_runner
, processor
));
107 // Initialize Processor -> CoreProcessor communication channel.
108 type_task_runner
->PostTask(
110 base::Bind(&NonBlockingTypeProcessor::OnConnect
,
113 scoped_refptr
<base::SequencedTaskRunner
>(
114 base::MessageLoopProxy::current())));
116 DCHECK(update_handler_map_
.find(type
) == update_handler_map_
.end());
117 DCHECK(commit_contributor_map_
.find(type
) == commit_contributor_map_
.end());
119 update_handler_map_
.insert(std::make_pair(type
, core
.get()));
120 commit_contributor_map_
.insert(std::make_pair(type
, core
.get()));
122 // The container takes ownership.
123 non_blocking_type_processor_cores_
.push_back(core
.release());
125 DCHECK(Intersection(GetEnabledDirectoryTypes(),
126 GetEnabledNonBlockingTypes()).Empty());
129 void ModelTypeRegistry::RemoveNonBlockingType(ModelType type
) {
130 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type
);
131 DCHECK(update_handler_map_
.find(type
) != update_handler_map_
.end());
132 DCHECK(commit_contributor_map_
.find(type
) != commit_contributor_map_
.end());
134 size_t updaters_erased
= update_handler_map_
.erase(type
);
135 size_t committers_erased
= commit_contributor_map_
.erase(type
);
137 DCHECK_EQ(1U, updaters_erased
);
138 DCHECK_EQ(1U, committers_erased
);
140 // Remove from the ScopedVector, deleting the core in the process.
141 for (ScopedVector
<NonBlockingTypeProcessorCore
>::iterator it
=
142 non_blocking_type_processor_cores_
.begin();
143 it
!= non_blocking_type_processor_cores_
.end(); ++it
) {
144 if ((*it
)->GetModelType() == type
) {
145 non_blocking_type_processor_cores_
.erase(it
);
151 ModelTypeSet
ModelTypeRegistry::GetEnabledTypes() const {
152 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
155 UpdateHandlerMap
* ModelTypeRegistry::update_handler_map() {
156 return &update_handler_map_
;
159 CommitContributorMap
* ModelTypeRegistry::commit_contributor_map() {
160 return &commit_contributor_map_
;
163 DirectoryTypeDebugInfoEmitterMap
*
164 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
165 return &directory_type_debug_info_emitter_map_
;
168 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
169 syncer::TypeDebugInfoObserver
* observer
) {
170 if (!type_debug_info_observers_
.HasObserver(observer
))
171 type_debug_info_observers_
.AddObserver(observer
);
174 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
175 syncer::TypeDebugInfoObserver
* observer
) {
176 type_debug_info_observers_
.RemoveObserver(observer
);
179 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
180 syncer::TypeDebugInfoObserver
* observer
) {
181 return type_debug_info_observers_
.HasObserver(observer
);
184 void ModelTypeRegistry::RequestEmitDebugInfo() {
185 for (DirectoryTypeDebugInfoEmitterMap::iterator it
=
186 directory_type_debug_info_emitter_map_
.begin();
187 it
!= directory_type_debug_info_emitter_map_
.end(); ++it
) {
188 it
->second
->EmitCommitCountersUpdate();
189 it
->second
->EmitUpdateCountersUpdate();
190 it
->second
->EmitStatusCountersUpdate();
194 ModelTypeSet
ModelTypeRegistry::GetEnabledDirectoryTypes() const {
195 return enabled_directory_types_
;
198 ModelTypeSet
ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
199 ModelTypeSet enabled_off_thread_types
;
200 for (ScopedVector
<NonBlockingTypeProcessorCore
>::const_iterator it
=
201 non_blocking_type_processor_cores_
.begin();
202 it
!= non_blocking_type_processor_cores_
.end(); ++it
) {
203 enabled_off_thread_types
.Put((*it
)->GetModelType());
205 return enabled_off_thread_types
;
208 } // namespace syncer