[SyncFS] Build indexes from FileTracker entries on disk.
[chromium-blink-merge.git] / sync / sessions / model_type_registry.cc
blob69a54883d1f1f9c71ae8084e83e1dd7b3a6a5219
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/engine/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
19 namespace syncer {
21 namespace {
23 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
24 public:
25 ModelTypeSyncProxyWrapper(
26 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
27 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
28 virtual ~ModelTypeSyncProxyWrapper();
30 virtual void OnCommitCompleted(
31 const DataTypeState& type_state,
32 const CommitResponseDataList& response_list) OVERRIDE;
33 virtual void OnUpdateReceived(
34 const DataTypeState& type_state,
35 const UpdateResponseDataList& response_list) OVERRIDE;
37 private:
38 base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
39 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
43 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
44 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
45 : processor_(proxy), processor_task_runner_(processor_task_runner) {
48 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
52 const DataTypeState& type_state,
53 const CommitResponseDataList& response_list) {
54 processor_task_runner_->PostTask(
55 FROM_HERE,
56 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
57 processor_,
58 type_state,
59 response_list));
62 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
63 const DataTypeState& type_state,
64 const UpdateResponseDataList& response_list) {
65 processor_task_runner_->PostTask(
66 FROM_HERE,
67 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
68 processor_,
69 type_state,
70 response_list));
73 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
74 public:
75 ModelTypeSyncWorkerWrapper(
76 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
77 const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
78 virtual ~ModelTypeSyncWorkerWrapper();
80 virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
82 private:
83 base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
84 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
87 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
88 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
89 const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
90 : worker_(worker), sync_thread_(sync_thread) {
93 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
96 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
97 const CommitRequestDataList& list) {
98 sync_thread_->PostTask(
99 FROM_HERE,
100 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
103 } // namespace
105 ModelTypeRegistry::ModelTypeRegistry(
106 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
107 syncable::Directory* directory,
108 NudgeHandler* nudge_handler)
109 : directory_(directory),
110 nudge_handler_(nudge_handler),
111 weak_ptr_factory_(this) {
112 for (size_t i = 0u; i < workers.size(); ++i) {
113 workers_map_.insert(
114 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
118 ModelTypeRegistry::~ModelTypeRegistry() {}
120 void ModelTypeRegistry::SetEnabledDirectoryTypes(
121 const ModelSafeRoutingInfo& routing_info) {
122 // Remove all existing directory processors and delete them. The
123 // DebugInfoEmitters are not deleted here, since we want to preserve their
124 // counters.
125 for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
126 it.Good(); it.Inc()) {
127 size_t result1 = update_handler_map_.erase(it.Get());
128 size_t result2 = commit_contributor_map_.erase(it.Get());
129 DCHECK_EQ(1U, result1);
130 DCHECK_EQ(1U, result2);
133 // Clear the old instances of directory update handlers and commit
134 // contributors, deleting their contents in the processs.
135 directory_update_handlers_.clear();
136 directory_commit_contributors_.clear();
138 // Create new ones and add them to the appropriate containers.
139 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
140 routing_iter != routing_info.end(); ++routing_iter) {
141 ModelType type = routing_iter->first;
142 ModelSafeGroup group = routing_iter->second;
143 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
144 worker_it = workers_map_.find(group);
145 DCHECK(worker_it != workers_map_.end());
146 scoped_refptr<ModelSafeWorker> worker = worker_it->second;
148 // DebugInfoEmitters are never deleted. Use existing one if we have it.
149 DirectoryTypeDebugInfoEmitter* emitter = NULL;
150 DirectoryTypeDebugInfoEmitterMap::iterator it =
151 directory_type_debug_info_emitter_map_.find(type);
152 if (it != directory_type_debug_info_emitter_map_.end()) {
153 emitter = it->second;
154 } else {
155 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
156 &type_debug_info_observers_);
157 directory_type_debug_info_emitter_map_.insert(
158 std::make_pair(type, emitter));
159 directory_type_debug_info_emitters_.push_back(emitter);
162 DirectoryCommitContributor* committer =
163 new DirectoryCommitContributor(directory_, type, emitter);
164 DirectoryUpdateHandler* updater =
165 new DirectoryUpdateHandler(directory_, type, worker, emitter);
167 // These containers take ownership of their contents.
168 directory_commit_contributors_.push_back(committer);
169 directory_update_handlers_.push_back(updater);
171 bool inserted1 =
172 update_handler_map_.insert(std::make_pair(type, updater)).second;
173 DCHECK(inserted1) << "Attempt to override existing type handler in map";
175 bool inserted2 =
176 commit_contributor_map_.insert(std::make_pair(type, committer)).second;
177 DCHECK(inserted2) << "Attempt to override existing type handler in map";
180 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
181 DCHECK(Intersection(GetEnabledDirectoryTypes(),
182 GetEnabledNonBlockingTypes()).Empty());
185 void ModelTypeRegistry::ConnectSyncTypeToWorker(
186 ModelType type,
187 const DataTypeState& data_type_state,
188 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
189 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
190 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
192 // Initialize Worker -> Proxy communication channel.
193 scoped_ptr<ModelTypeSyncProxy> proxy(
194 new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
195 scoped_ptr<ModelTypeSyncWorkerImpl> worker(new ModelTypeSyncWorkerImpl(
196 type, data_type_state, nudge_handler_, proxy.Pass()));
198 // Initialize Proxy -> Worker communication channel.
199 scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
200 new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
201 scoped_refptr<base::SequencedTaskRunner>(
202 base::ThreadTaskRunnerHandle::Get())));
203 type_task_runner->PostTask(FROM_HERE,
204 base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
205 proxy_impl,
206 base::Passed(&wrapped_worker)));
208 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
209 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
211 update_handler_map_.insert(std::make_pair(type, worker.get()));
212 commit_contributor_map_.insert(std::make_pair(type, worker.get()));
214 // The container takes ownership.
215 model_type_sync_workers_.push_back(worker.release());
217 DCHECK(Intersection(GetEnabledDirectoryTypes(),
218 GetEnabledNonBlockingTypes()).Empty());
221 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
222 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
223 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
224 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
226 size_t updaters_erased = update_handler_map_.erase(type);
227 size_t committers_erased = commit_contributor_map_.erase(type);
229 DCHECK_EQ(1U, updaters_erased);
230 DCHECK_EQ(1U, committers_erased);
232 // Remove from the ScopedVector, deleting the worker in the process.
233 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
234 model_type_sync_workers_.begin();
235 it != model_type_sync_workers_.end();
236 ++it) {
237 if ((*it)->GetModelType() == type) {
238 model_type_sync_workers_.erase(it);
239 break;
244 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
245 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
248 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
249 return &update_handler_map_;
252 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
253 return &commit_contributor_map_;
256 DirectoryTypeDebugInfoEmitterMap*
257 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
258 return &directory_type_debug_info_emitter_map_;
261 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
262 syncer::TypeDebugInfoObserver* observer) {
263 if (!type_debug_info_observers_.HasObserver(observer))
264 type_debug_info_observers_.AddObserver(observer);
267 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
268 syncer::TypeDebugInfoObserver* observer) {
269 type_debug_info_observers_.RemoveObserver(observer);
272 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
273 syncer::TypeDebugInfoObserver* observer) {
274 return type_debug_info_observers_.HasObserver(observer);
277 void ModelTypeRegistry::RequestEmitDebugInfo() {
278 for (DirectoryTypeDebugInfoEmitterMap::iterator it =
279 directory_type_debug_info_emitter_map_.begin();
280 it != directory_type_debug_info_emitter_map_.end(); ++it) {
281 it->second->EmitCommitCountersUpdate();
282 it->second->EmitUpdateCountersUpdate();
283 it->second->EmitStatusCountersUpdate();
287 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
288 return weak_ptr_factory_.GetWeakPtr();
291 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
292 return enabled_directory_types_;
295 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
296 ModelTypeSet enabled_off_thread_types;
297 for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
298 model_type_sync_workers_.begin();
299 it != model_type_sync_workers_.end();
300 ++it) {
301 enabled_off_thread_types.Put((*it)->GetModelType());
303 return enabled_off_thread_types;
306 } // namespace syncer