Drive: Add BatchableRequest subclass.
[chromium-blink-merge.git] / sync / sessions / model_type_registry.cc
blob2366f205660c565079c4f66ac287aee0a6811f6a
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
20 namespace syncer {
22 namespace {
24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
25 public:
26 ModelTypeSyncProxyWrapper(
27 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29 ~ModelTypeSyncProxyWrapper() override;
31 void OnCommitCompleted(const DataTypeState& type_state,
32 const CommitResponseDataList& response_list) override;
33 void OnUpdateReceived(const DataTypeState& type_state,
34 const UpdateResponseDataList& response_list,
35 const UpdateResponseDataList& pending_updates) override;
37 private:
38 base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
39 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
43 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
44 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
45 : processor_(proxy), processor_task_runner_(processor_task_runner) {
48 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
51 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
52 const DataTypeState& type_state,
53 const CommitResponseDataList& response_list) {
54 processor_task_runner_->PostTask(
55 FROM_HERE,
56 base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
57 processor_,
58 type_state,
59 response_list));
62 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
63 const DataTypeState& type_state,
64 const UpdateResponseDataList& response_list,
65 const UpdateResponseDataList& pending_updates) {
66 processor_task_runner_->PostTask(
67 FROM_HERE,
68 base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
69 processor_,
70 type_state,
71 response_list,
72 pending_updates));
75 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
76 public:
77 ModelTypeSyncWorkerWrapper(
78 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
79 const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
80 ~ModelTypeSyncWorkerWrapper() override;
82 void EnqueueForCommit(const CommitRequestDataList& list) override;
84 private:
85 base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
86 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
89 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
90 const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
91 const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
92 : worker_(worker), sync_thread_(sync_thread) {
95 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
98 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
99 const CommitRequestDataList& list) {
100 sync_thread_->PostTask(
101 FROM_HERE,
102 base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
105 } // namespace
107 ModelTypeRegistry::ModelTypeRegistry(
108 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
109 syncable::Directory* directory,
110 NudgeHandler* nudge_handler)
111 : directory_(directory),
112 nudge_handler_(nudge_handler),
113 weak_ptr_factory_(this) {
114 for (size_t i = 0u; i < workers.size(); ++i) {
115 workers_map_.insert(
116 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
120 ModelTypeRegistry::~ModelTypeRegistry() {
123 void ModelTypeRegistry::SetEnabledDirectoryTypes(
124 const ModelSafeRoutingInfo& routing_info) {
125 // Remove all existing directory processors and delete them. The
126 // DebugInfoEmitters are not deleted here, since we want to preserve their
127 // counters.
128 for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
129 it.Good(); it.Inc()) {
130 size_t result1 = update_handler_map_.erase(it.Get());
131 size_t result2 = commit_contributor_map_.erase(it.Get());
132 DCHECK_EQ(1U, result1);
133 DCHECK_EQ(1U, result2);
136 // Clear the old instances of directory update handlers and commit
137 // contributors, deleting their contents in the processs.
138 directory_update_handlers_.clear();
139 directory_commit_contributors_.clear();
141 // Create new ones and add them to the appropriate containers.
142 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
143 routing_iter != routing_info.end(); ++routing_iter) {
144 ModelType type = routing_iter->first;
145 ModelSafeGroup group = routing_iter->second;
146 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
147 worker_it = workers_map_.find(group);
148 DCHECK(worker_it != workers_map_.end());
149 scoped_refptr<ModelSafeWorker> worker = worker_it->second;
151 // DebugInfoEmitters are never deleted. Use existing one if we have it.
152 DirectoryTypeDebugInfoEmitter* emitter = NULL;
153 DirectoryTypeDebugInfoEmitterMap::iterator it =
154 directory_type_debug_info_emitter_map_.find(type);
155 if (it != directory_type_debug_info_emitter_map_.end()) {
156 emitter = it->second;
157 } else {
158 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
159 &type_debug_info_observers_);
160 directory_type_debug_info_emitter_map_.insert(
161 std::make_pair(type, emitter));
162 directory_type_debug_info_emitters_.push_back(emitter);
165 DirectoryCommitContributor* committer =
166 new DirectoryCommitContributor(directory_, type, emitter);
167 DirectoryUpdateHandler* updater =
168 new DirectoryUpdateHandler(directory_, type, worker, emitter);
170 // These containers take ownership of their contents.
171 directory_commit_contributors_.push_back(committer);
172 directory_update_handlers_.push_back(updater);
174 bool inserted1 =
175 update_handler_map_.insert(std::make_pair(type, updater)).second;
176 DCHECK(inserted1) << "Attempt to override existing type handler in map";
178 bool inserted2 =
179 commit_contributor_map_.insert(std::make_pair(type, committer)).second;
180 DCHECK(inserted2) << "Attempt to override existing type handler in map";
183 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
184 DCHECK(Intersection(GetEnabledDirectoryTypes(),
185 GetEnabledNonBlockingTypes()).Empty());
188 void ModelTypeRegistry::ConnectSyncTypeToWorker(
189 ModelType type,
190 const DataTypeState& data_type_state,
191 const UpdateResponseDataList& saved_pending_updates,
192 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
193 const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
194 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
196 // Initialize Worker -> Proxy communication channel.
197 scoped_ptr<ModelTypeSyncProxy> proxy(
198 new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
199 scoped_ptr<Cryptographer> cryptographer_copy;
200 if (encrypted_types_.Has(type))
201 cryptographer_copy.reset(new Cryptographer(*cryptographer_));
203 scoped_ptr<ModelTypeSyncWorkerImpl> worker(
204 new ModelTypeSyncWorkerImpl(type,
205 data_type_state,
206 saved_pending_updates,
207 cryptographer_copy.Pass(),
208 nudge_handler_,
209 proxy.Pass()));
211 // Initialize Proxy -> Worker communication channel.
212 scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
213 new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
214 scoped_refptr<base::SequencedTaskRunner>(
215 base::ThreadTaskRunnerHandle::Get())));
216 type_task_runner->PostTask(FROM_HERE,
217 base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
218 proxy_impl,
219 base::Passed(&wrapped_worker)));
221 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
222 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
224 update_handler_map_.insert(std::make_pair(type, worker.get()));
225 commit_contributor_map_.insert(std::make_pair(type, worker.get()));
227 // The container takes ownership.
228 model_type_sync_workers_.push_back(worker.release());
230 DCHECK(Intersection(GetEnabledDirectoryTypes(),
231 GetEnabledNonBlockingTypes()).Empty());
234 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
235 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
236 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
237 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
239 size_t updaters_erased = update_handler_map_.erase(type);
240 size_t committers_erased = commit_contributor_map_.erase(type);
242 DCHECK_EQ(1U, updaters_erased);
243 DCHECK_EQ(1U, committers_erased);
245 // Remove from the ScopedVector, deleting the worker in the process.
246 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
247 model_type_sync_workers_.begin();
248 it != model_type_sync_workers_.end();
249 ++it) {
250 if ((*it)->GetModelType() == type) {
251 model_type_sync_workers_.erase(it);
252 break;
257 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
258 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
261 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
262 return &update_handler_map_;
265 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
266 return &commit_contributor_map_;
269 DirectoryTypeDebugInfoEmitterMap*
270 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
271 return &directory_type_debug_info_emitter_map_;
274 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
275 syncer::TypeDebugInfoObserver* observer) {
276 if (!type_debug_info_observers_.HasObserver(observer))
277 type_debug_info_observers_.AddObserver(observer);
280 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
281 syncer::TypeDebugInfoObserver* observer) {
282 type_debug_info_observers_.RemoveObserver(observer);
285 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
286 const syncer::TypeDebugInfoObserver* observer) const {
287 return type_debug_info_observers_.HasObserver(observer);
290 void ModelTypeRegistry::RequestEmitDebugInfo() {
291 for (DirectoryTypeDebugInfoEmitterMap::iterator it =
292 directory_type_debug_info_emitter_map_.begin();
293 it != directory_type_debug_info_emitter_map_.end(); ++it) {
294 it->second->EmitCommitCountersUpdate();
295 it->second->EmitUpdateCountersUpdate();
296 it->second->EmitStatusCountersUpdate();
300 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
301 return weak_ptr_factory_.GetWeakPtr();
304 void ModelTypeRegistry::OnPassphraseRequired(
305 PassphraseRequiredReason reason,
306 const sync_pb::EncryptedData& pending_keys) {
309 void ModelTypeRegistry::OnPassphraseAccepted() {
312 void ModelTypeRegistry::OnBootstrapTokenUpdated(
313 const std::string& bootstrap_token,
314 BootstrapTokenType type) {
317 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
318 bool encrypt_everything) {
319 encrypted_types_ = encrypted_types;
320 OnEncryptionStateChanged();
323 void ModelTypeRegistry::OnEncryptionComplete() {
326 void ModelTypeRegistry::OnCryptographerStateChanged(
327 Cryptographer* cryptographer) {
328 cryptographer_.reset(new Cryptographer(*cryptographer));
329 OnEncryptionStateChanged();
332 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
333 base::Time passphrase_time) {
336 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
337 return enabled_directory_types_;
340 void ModelTypeRegistry::OnEncryptionStateChanged() {
341 for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
342 model_type_sync_workers_.begin();
343 it != model_type_sync_workers_.end();
344 ++it) {
345 if (encrypted_types_.Has((*it)->GetModelType())) {
346 (*it)->UpdateCryptographer(
347 make_scoped_ptr(new Cryptographer(*cryptographer_)));
352 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
353 ModelTypeSet enabled_off_thread_types;
354 for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
355 model_type_sync_workers_.begin();
356 it != model_type_sync_workers_.end();
357 ++it) {
358 enabled_off_thread_types.Put((*it)->GetModelType());
360 return enabled_off_thread_types;
363 } // namespace syncer