Complete SyncMessageFilter initialization after SyncChannel initialization
[chromium-blink-merge.git] / components / sync_driver / shared_change_processor.cc
blob88828bed838af05dc23ce6e03f7941986cc7407b
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/sync_driver/shared_change_processor.h"
7 #include "base/thread_task_runner_handle.h"
8 #include "components/sync_driver/generic_change_processor.h"
9 #include "components/sync_driver/generic_change_processor_factory.h"
10 #include "components/sync_driver/sync_api_component_factory.h"
11 #include "sync/api/sync_change.h"
13 using base::AutoLock;
// Forward declaration only; the full definition of AttachmentService is not
// needed by the code in this file, which merely passes it along.
namespace syncer {
class AttachmentService;
}  // namespace syncer
19 namespace sync_driver {
21 SharedChangeProcessor::SharedChangeProcessor()
22 : disconnected_(false),
23 type_(syncer::UNSPECIFIED),
24 frontend_task_runner_(base::ThreadTaskRunnerHandle::Get()),
25 generic_change_processor_(NULL),
26 error_handler_(NULL) {
29 SharedChangeProcessor::~SharedChangeProcessor() {
30 // We can either be deleted when the DTC is destroyed (on UI
31 // thread), or when the syncer::SyncableService stops syncing (datatype
32 // thread). |generic_change_processor_|, if non-NULL, must be
33 // deleted on |backend_loop_|.
34 if (backend_task_runner_.get()) {
35 if (backend_task_runner_->BelongsToCurrentThread()) {
36 delete generic_change_processor_;
37 } else {
38 DCHECK(frontend_task_runner_->BelongsToCurrentThread());
39 if (!backend_task_runner_->DeleteSoon(FROM_HERE,
40 generic_change_processor_)) {
41 NOTREACHED();
44 } else {
45 DCHECK(!generic_change_processor_);
49 base::WeakPtr<syncer::SyncableService> SharedChangeProcessor::Connect(
50 SyncApiComponentFactory* sync_factory,
51 GenericChangeProcessorFactory* processor_factory,
52 syncer::UserShare* user_share,
53 DataTypeErrorHandler* error_handler,
54 syncer::ModelType type,
55 const base::WeakPtr<syncer::SyncMergeResult>& merge_result) {
56 DCHECK(sync_factory);
57 DCHECK(error_handler);
58 DCHECK_NE(type, syncer::UNSPECIFIED);
59 backend_task_runner_ = base::ThreadTaskRunnerHandle::Get();
60 AutoLock lock(monitor_lock_);
61 if (disconnected_)
62 return base::WeakPtr<syncer::SyncableService>();
63 type_ = type;
64 error_handler_ = error_handler;
65 base::WeakPtr<syncer::SyncableService> local_service =
66 sync_factory->GetSyncableServiceForType(type);
67 if (!local_service.get()) {
68 LOG(WARNING) << "SyncableService destroyed before DTC was stopped.";
69 disconnected_ = true;
70 return base::WeakPtr<syncer::SyncableService>();
73 generic_change_processor_ =
74 processor_factory->CreateGenericChangeProcessor(type,
75 user_share,
76 error_handler,
77 local_service,
78 merge_result,
79 sync_factory).release();
80 // If available, propagate attachment service to the syncable service.
81 scoped_ptr<syncer::AttachmentService> attachment_service =
82 generic_change_processor_->GetAttachmentService();
83 if (attachment_service) {
84 local_service->SetAttachmentService(attachment_service.Pass());
86 return local_service;
89 bool SharedChangeProcessor::Disconnect() {
90 // May be called from any thread.
91 DVLOG(1) << "Disconnecting change processor.";
92 AutoLock lock(monitor_lock_);
93 bool was_connected = !disconnected_;
94 disconnected_ = true;
95 error_handler_ = NULL;
96 return was_connected;
99 ChangeProcessor* SharedChangeProcessor::generic_change_processor() {
100 return generic_change_processor_;
103 int SharedChangeProcessor::GetSyncCount() {
104 DCHECK(backend_task_runner_.get());
105 DCHECK(backend_task_runner_->BelongsToCurrentThread());
106 AutoLock lock(monitor_lock_);
107 if (disconnected_) {
108 LOG(ERROR) << "Change processor disconnected.";
109 return 0;
111 return generic_change_processor_->GetSyncCount();
114 syncer::SyncError SharedChangeProcessor::ProcessSyncChanges(
115 const tracked_objects::Location& from_here,
116 const syncer::SyncChangeList& list_of_changes) {
117 DCHECK(backend_task_runner_.get());
118 DCHECK(backend_task_runner_->BelongsToCurrentThread());
119 AutoLock lock(monitor_lock_);
120 if (disconnected_) {
121 // The DTC that disconnects us must ensure it posts a StopSyncing task.
122 // If we reach this, it means it just hasn't executed yet.
123 syncer::SyncError error(FROM_HERE,
124 syncer::SyncError::DATATYPE_ERROR,
125 "Change processor disconnected.",
126 type_);
127 return error;
129 return generic_change_processor_->ProcessSyncChanges(
130 from_here, list_of_changes);
133 syncer::SyncDataList SharedChangeProcessor::GetAllSyncData(
134 syncer::ModelType type) const {
135 syncer::SyncDataList data;
136 GetAllSyncDataReturnError(type, &data); // Handles the disconnect case.
137 return data;
140 syncer::SyncError SharedChangeProcessor::GetAllSyncDataReturnError(
141 syncer::ModelType type,
142 syncer::SyncDataList* data) const {
143 DCHECK(backend_task_runner_.get());
144 DCHECK(backend_task_runner_->BelongsToCurrentThread());
145 AutoLock lock(monitor_lock_);
146 if (disconnected_) {
147 syncer::SyncError error(FROM_HERE,
148 syncer::SyncError::DATATYPE_ERROR,
149 "Change processor disconnected.",
150 type_);
151 return error;
153 return generic_change_processor_->GetAllSyncDataReturnError(data);
156 syncer::SyncError SharedChangeProcessor::UpdateDataTypeContext(
157 syncer::ModelType type,
158 syncer::SyncChangeProcessor::ContextRefreshStatus refresh_status,
159 const std::string& context) {
160 DCHECK(backend_task_runner_.get());
161 DCHECK(backend_task_runner_->BelongsToCurrentThread());
162 AutoLock lock(monitor_lock_);
163 if (disconnected_) {
164 syncer::SyncError error(FROM_HERE,
165 syncer::SyncError::DATATYPE_ERROR,
166 "Change processor disconnected.",
167 type_);
168 return error;
170 return generic_change_processor_->UpdateDataTypeContext(
171 type, refresh_status, context);
174 bool SharedChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) {
175 DCHECK(backend_task_runner_.get());
176 DCHECK(backend_task_runner_->BelongsToCurrentThread());
177 AutoLock lock(monitor_lock_);
178 if (disconnected_) {
179 LOG(ERROR) << "Change processor disconnected.";
180 return false;
182 return generic_change_processor_->SyncModelHasUserCreatedNodes(has_nodes);
185 bool SharedChangeProcessor::CryptoReadyIfNecessary() {
186 DCHECK(backend_task_runner_.get());
187 DCHECK(backend_task_runner_->BelongsToCurrentThread());
188 AutoLock lock(monitor_lock_);
189 if (disconnected_) {
190 LOG(ERROR) << "Change processor disconnected.";
191 return true; // Otherwise we get into infinite spin waiting.
193 return generic_change_processor_->CryptoReadyIfNecessary();
196 bool SharedChangeProcessor::GetDataTypeContext(std::string* context) const {
197 DCHECK(backend_task_runner_.get());
198 DCHECK(backend_task_runner_->BelongsToCurrentThread());
199 AutoLock lock(monitor_lock_);
200 if (disconnected_) {
201 LOG(ERROR) << "Change processor disconnected.";
202 return false;
204 return generic_change_processor_->GetDataTypeContext(context);
207 syncer::SyncError SharedChangeProcessor::CreateAndUploadError(
208 const tracked_objects::Location& location,
209 const std::string& message) {
210 AutoLock lock(monitor_lock_);
211 if (!disconnected_) {
212 return error_handler_->CreateAndUploadError(location, message, type_);
213 } else {
214 return syncer::SyncError(location,
215 syncer::SyncError::DATATYPE_ERROR,
216 message,
217 type_);
221 } // namespace sync_driver