cygprofile: increase timeouts to allow showing web contents
[chromium-blink-merge.git] / sync / sessions / model_type_registry.cc
blobf0375f7b2092858dd20f0b0449c155b43f6b345c
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/sessions/model_type_registry.h"
7 #include "base/bind.h"
8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/commit_queue.h"
11 #include "sync/engine/directory_commit_contributor.h"
12 #include "sync/engine/directory_update_handler.h"
13 #include "sync/engine/model_type_processor.h"
14 #include "sync/engine/model_type_processor_impl.h"
15 #include "sync/engine/model_type_worker.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h"
20 namespace syncer {
22 namespace {
24 class ModelTypeProcessorProxy : public syncer_v2::ModelTypeProcessor {
25 public:
26 ModelTypeProcessorProxy(
27 const base::WeakPtr<syncer_v2::ModelTypeProcessor>& processor,
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29 ~ModelTypeProcessorProxy() override;
31 void OnConnect(scoped_ptr<syncer_v2::CommitQueue> worker) override;
32 void OnCommitCompleted(
33 const syncer_v2::DataTypeState& type_state,
34 const syncer_v2::CommitResponseDataList& response_list) override;
35 void OnUpdateReceived(
36 const syncer_v2::DataTypeState& type_state,
37 const syncer_v2::UpdateResponseDataList& response_list,
38 const syncer_v2::UpdateResponseDataList& pending_updates) override;
40 private:
41 base::WeakPtr<syncer_v2::ModelTypeProcessor> processor_;
42 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
45 ModelTypeProcessorProxy::ModelTypeProcessorProxy(
46 const base::WeakPtr<syncer_v2::ModelTypeProcessor>& processor,
47 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
48 : processor_(processor), processor_task_runner_(processor_task_runner) {}
50 ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {
53 void ModelTypeProcessorProxy::OnConnect(
54 scoped_ptr<syncer_v2::CommitQueue> worker) {
55 processor_task_runner_->PostTask(
56 FROM_HERE, base::Bind(&syncer_v2::ModelTypeProcessor::OnConnect,
57 processor_, base::Passed(worker.Pass())));
60 void ModelTypeProcessorProxy::OnCommitCompleted(
61 const syncer_v2::DataTypeState& type_state,
62 const syncer_v2::CommitResponseDataList& response_list) {
63 processor_task_runner_->PostTask(
64 FROM_HERE,
65 base::Bind(&syncer_v2::ModelTypeProcessor::OnCommitCompleted,
66 processor_, type_state, response_list));
69 void ModelTypeProcessorProxy::OnUpdateReceived(
70 const syncer_v2::DataTypeState& type_state,
71 const syncer_v2::UpdateResponseDataList& response_list,
72 const syncer_v2::UpdateResponseDataList& pending_updates) {
73 processor_task_runner_->PostTask(
74 FROM_HERE,
75 base::Bind(&syncer_v2::ModelTypeProcessor::OnUpdateReceived,
76 processor_, type_state, response_list, pending_updates));
79 class CommitQueueProxy : public syncer_v2::CommitQueue {
80 public:
81 CommitQueueProxy(const base::WeakPtr<syncer_v2::ModelTypeWorker>& worker,
82 const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
83 ~CommitQueueProxy() override;
85 void EnqueueForCommit(const syncer_v2::CommitRequestDataList& list) override;
87 private:
88 base::WeakPtr<syncer_v2::ModelTypeWorker> worker_;
89 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
92 CommitQueueProxy::CommitQueueProxy(
93 const base::WeakPtr<syncer_v2::ModelTypeWorker>& worker,
94 const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
95 : worker_(worker), sync_thread_(sync_thread) {}
97 CommitQueueProxy::~CommitQueueProxy() {}
99 void CommitQueueProxy::EnqueueForCommit(
100 const syncer_v2::CommitRequestDataList& list) {
101 sync_thread_->PostTask(
102 FROM_HERE,
103 base::Bind(&syncer_v2::ModelTypeWorker::EnqueueForCommit, worker_, list));
106 } // namespace
108 ModelTypeRegistry::ModelTypeRegistry(
109 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
110 syncable::Directory* directory,
111 NudgeHandler* nudge_handler)
112 : directory_(directory),
113 nudge_handler_(nudge_handler),
114 weak_ptr_factory_(this) {
115 for (size_t i = 0u; i < workers.size(); ++i) {
116 workers_map_.insert(
117 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
121 ModelTypeRegistry::~ModelTypeRegistry() {
124 void ModelTypeRegistry::SetEnabledDirectoryTypes(
125 const ModelSafeRoutingInfo& routing_info) {
126 // Remove all existing directory processors and delete them. The
127 // DebugInfoEmitters are not deleted here, since we want to preserve their
128 // counters.
129 for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
130 it.Good(); it.Inc()) {
131 size_t result1 = update_handler_map_.erase(it.Get());
132 size_t result2 = commit_contributor_map_.erase(it.Get());
133 DCHECK_EQ(1U, result1);
134 DCHECK_EQ(1U, result2);
137 // Clear the old instances of directory update handlers and commit
138 // contributors, deleting their contents in the processs.
139 directory_update_handlers_.clear();
140 directory_commit_contributors_.clear();
142 // Create new ones and add them to the appropriate containers.
143 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
144 routing_iter != routing_info.end(); ++routing_iter) {
145 ModelType type = routing_iter->first;
146 ModelSafeGroup group = routing_iter->second;
147 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
148 worker_it = workers_map_.find(group);
149 DCHECK(worker_it != workers_map_.end());
150 scoped_refptr<ModelSafeWorker> worker = worker_it->second;
152 // DebugInfoEmitters are never deleted. Use existing one if we have it.
153 DirectoryTypeDebugInfoEmitter* emitter = NULL;
154 DirectoryTypeDebugInfoEmitterMap::iterator it =
155 directory_type_debug_info_emitter_map_.find(type);
156 if (it != directory_type_debug_info_emitter_map_.end()) {
157 emitter = it->second;
158 } else {
159 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
160 &type_debug_info_observers_);
161 directory_type_debug_info_emitter_map_.insert(
162 std::make_pair(type, emitter));
163 directory_type_debug_info_emitters_.push_back(emitter);
166 DirectoryCommitContributor* committer =
167 new DirectoryCommitContributor(directory_, type, emitter);
168 DirectoryUpdateHandler* updater =
169 new DirectoryUpdateHandler(directory_, type, worker, emitter);
171 // These containers take ownership of their contents.
172 directory_commit_contributors_.push_back(committer);
173 directory_update_handlers_.push_back(updater);
175 bool inserted1 =
176 update_handler_map_.insert(std::make_pair(type, updater)).second;
177 DCHECK(inserted1) << "Attempt to override existing type handler in map";
179 bool inserted2 =
180 commit_contributor_map_.insert(std::make_pair(type, committer)).second;
181 DCHECK(inserted2) << "Attempt to override existing type handler in map";
184 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
185 DCHECK(Intersection(GetEnabledDirectoryTypes(),
186 GetEnabledNonBlockingTypes()).Empty());
189 void ModelTypeRegistry::ConnectSyncTypeToWorker(
190 ModelType type,
191 const syncer_v2::DataTypeState& data_type_state,
192 const syncer_v2::UpdateResponseDataList& saved_pending_updates,
193 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
194 const base::WeakPtr<syncer_v2::ModelTypeProcessor>& type_processor) {
195 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
197 // Initialize Worker -> Processor communication channel.
198 scoped_ptr<syncer_v2::ModelTypeProcessor> processor_proxy(
199 new ModelTypeProcessorProxy(type_processor, type_task_runner));
200 scoped_ptr<Cryptographer> cryptographer_copy;
201 if (encrypted_types_.Has(type))
202 cryptographer_copy.reset(new Cryptographer(*cryptographer_));
204 scoped_ptr<syncer_v2::ModelTypeWorker> worker(new syncer_v2::ModelTypeWorker(
205 type, data_type_state, saved_pending_updates, cryptographer_copy.Pass(),
206 nudge_handler_, processor_proxy.Pass()));
208 // Initialize Processor -> Worker communication channel.
209 scoped_ptr<syncer_v2::CommitQueue> commit_queue_proxy(new CommitQueueProxy(
210 worker->AsWeakPtr(), scoped_refptr<base::SequencedTaskRunner>(
211 base::ThreadTaskRunnerHandle::Get())));
212 type_task_runner->PostTask(
213 FROM_HERE, base::Bind(&syncer_v2::ModelTypeProcessor::OnConnect,
214 type_processor, base::Passed(&commit_queue_proxy)));
216 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
217 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
219 update_handler_map_.insert(std::make_pair(type, worker.get()));
220 commit_contributor_map_.insert(std::make_pair(type, worker.get()));
222 // The container takes ownership.
223 model_type_workers_.push_back(worker.Pass());
225 DCHECK(Intersection(GetEnabledDirectoryTypes(),
226 GetEnabledNonBlockingTypes()).Empty());
229 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
230 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
231 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
232 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
234 size_t updaters_erased = update_handler_map_.erase(type);
235 size_t committers_erased = commit_contributor_map_.erase(type);
237 DCHECK_EQ(1U, updaters_erased);
238 DCHECK_EQ(1U, committers_erased);
240 // Remove from the ScopedVector, deleting the worker in the process.
241 for (ScopedVector<syncer_v2::ModelTypeWorker>::iterator it =
242 model_type_workers_.begin();
243 it != model_type_workers_.end(); ++it) {
244 if ((*it)->GetModelType() == type) {
245 model_type_workers_.erase(it);
246 break;
251 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
252 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
255 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
256 return &update_handler_map_;
259 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
260 return &commit_contributor_map_;
263 DirectoryTypeDebugInfoEmitterMap*
264 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
265 return &directory_type_debug_info_emitter_map_;
268 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
269 syncer::TypeDebugInfoObserver* observer) {
270 if (!type_debug_info_observers_.HasObserver(observer))
271 type_debug_info_observers_.AddObserver(observer);
274 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
275 syncer::TypeDebugInfoObserver* observer) {
276 type_debug_info_observers_.RemoveObserver(observer);
279 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
280 const syncer::TypeDebugInfoObserver* observer) const {
281 return type_debug_info_observers_.HasObserver(observer);
284 void ModelTypeRegistry::RequestEmitDebugInfo() {
285 for (DirectoryTypeDebugInfoEmitterMap::iterator it =
286 directory_type_debug_info_emitter_map_.begin();
287 it != directory_type_debug_info_emitter_map_.end(); ++it) {
288 it->second->EmitCommitCountersUpdate();
289 it->second->EmitUpdateCountersUpdate();
290 it->second->EmitStatusCountersUpdate();
294 base::WeakPtr<syncer_v2::SyncContext> ModelTypeRegistry::AsWeakPtr() {
295 return weak_ptr_factory_.GetWeakPtr();
298 void ModelTypeRegistry::OnPassphraseRequired(
299 PassphraseRequiredReason reason,
300 const sync_pb::EncryptedData& pending_keys) {
303 void ModelTypeRegistry::OnPassphraseAccepted() {
306 void ModelTypeRegistry::OnBootstrapTokenUpdated(
307 const std::string& bootstrap_token,
308 BootstrapTokenType type) {
311 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
312 bool encrypt_everything) {
313 encrypted_types_ = encrypted_types;
314 OnEncryptionStateChanged();
317 void ModelTypeRegistry::OnEncryptionComplete() {
320 void ModelTypeRegistry::OnCryptographerStateChanged(
321 Cryptographer* cryptographer) {
322 cryptographer_.reset(new Cryptographer(*cryptographer));
323 OnEncryptionStateChanged();
326 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
327 base::Time passphrase_time) {
330 void ModelTypeRegistry::OnLocalSetPassphraseEncryption(
331 const SyncEncryptionHandler::NigoriState& nigori_state) {
334 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
335 return enabled_directory_types_;
338 void ModelTypeRegistry::OnEncryptionStateChanged() {
339 for (ScopedVector<syncer_v2::ModelTypeWorker>::iterator it =
340 model_type_workers_.begin();
341 it != model_type_workers_.end(); ++it) {
342 if (encrypted_types_.Has((*it)->GetModelType())) {
343 (*it)->UpdateCryptographer(
344 make_scoped_ptr(new Cryptographer(*cryptographer_)));
349 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
350 ModelTypeSet enabled_off_thread_types;
351 for (ScopedVector<syncer_v2::ModelTypeWorker>::const_iterator it =
352 model_type_workers_.begin();
353 it != model_type_workers_.end(); ++it) {
354 enabled_off_thread_types.Put((*it)->GetModelType());
356 return enabled_off_thread_types;
359 } // namespace syncer