1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
13 #include "sync/api/attachments/attachment.h"
14 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
19 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
20 // GetOrDownloadState tracks completion of these calls and posts callback for
21 // consumer once all attachments are either retrieved or reported unavailable.
22 class AttachmentServiceImpl::GetOrDownloadState
23 : public base::RefCounted
<GetOrDownloadState
>,
24 public base::NonThreadSafe
{
26 // GetOrDownloadState gets parameter from values passed to
27 // AttachmentService::GetOrDownloadAttachments.
28 // |attachment_ids| is a list of attachmens to retrieve.
29 // |callback| will be posted on current thread when all attachments retrieved
30 // or confirmed unavailable.
31 GetOrDownloadState(const AttachmentIdList
& attachment_ids
,
32 const GetOrDownloadCallback
& callback
);
34 // Attachment was just retrieved. Add it to retrieved attachments.
35 void AddAttachment(const Attachment
& attachment
);
37 // Both reading from local store and downloading attachment failed.
38 // Add it to unavailable set.
39 void AddUnavailableAttachmentId(const AttachmentId
& attachment_id
);
42 friend class base::RefCounted
<GetOrDownloadState
>;
43 virtual ~GetOrDownloadState();
45 // If all attachment requests completed then post callback to consumer with
47 void PostResultIfAllRequestsCompleted();
49 GetOrDownloadCallback callback_
;
51 // Requests for these attachments are still in progress.
52 AttachmentIdSet in_progress_attachments_
;
54 AttachmentIdSet unavailable_attachments_
;
55 scoped_ptr
<AttachmentMap
> retrieved_attachments_
;
57 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState
);
60 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
61 const AttachmentIdList
& attachment_ids
,
62 const GetOrDownloadCallback
& callback
)
63 : callback_(callback
), retrieved_attachments_(new AttachmentMap()) {
65 attachment_ids
.begin(),
67 std::inserter(in_progress_attachments_
, in_progress_attachments_
.end()));
68 PostResultIfAllRequestsCompleted();
71 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
72 DCHECK(CalledOnValidThread());
75 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
76 const Attachment
& attachment
) {
77 DCHECK(CalledOnValidThread());
78 DCHECK(retrieved_attachments_
->find(attachment
.GetId()) ==
79 retrieved_attachments_
->end());
80 retrieved_attachments_
->insert(
81 std::make_pair(attachment
.GetId(), attachment
));
82 DCHECK(in_progress_attachments_
.find(attachment
.GetId()) !=
83 in_progress_attachments_
.end());
84 in_progress_attachments_
.erase(attachment
.GetId());
85 PostResultIfAllRequestsCompleted();
88 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
89 const AttachmentId
& attachment_id
) {
90 DCHECK(CalledOnValidThread());
91 DCHECK(unavailable_attachments_
.find(attachment_id
) ==
92 unavailable_attachments_
.end());
93 unavailable_attachments_
.insert(attachment_id
);
94 DCHECK(in_progress_attachments_
.find(attachment_id
) !=
95 in_progress_attachments_
.end());
96 in_progress_attachments_
.erase(attachment_id
);
97 PostResultIfAllRequestsCompleted();
101 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
102 if (in_progress_attachments_
.empty()) {
103 // All requests completed. Let's notify consumer.
104 GetOrDownloadResult result
=
105 unavailable_attachments_
.empty() ? GET_SUCCESS
: GET_UNSPECIFIED_ERROR
;
106 base::MessageLoop::current()->PostTask(
108 base::Bind(callback_
, result
, base::Passed(&retrieved_attachments_
)));
112 AttachmentServiceImpl::AttachmentServiceImpl(
113 scoped_refptr
<AttachmentStore
> attachment_store
,
114 scoped_ptr
<AttachmentUploader
> attachment_uploader
,
115 scoped_ptr
<AttachmentDownloader
> attachment_downloader
,
117 const base::TimeDelta
& initial_backoff_delay
,
118 const base::TimeDelta
& max_backoff_delay
)
119 : attachment_store_(attachment_store
),
120 attachment_uploader_(attachment_uploader
.Pass()),
121 attachment_downloader_(attachment_downloader
.Pass()),
123 weak_ptr_factory_(this) {
124 DCHECK(CalledOnValidThread());
125 DCHECK(attachment_store_
.get());
127 // TODO(maniscalco): Observe network connectivity change events. When the
128 // network becomes disconnected, consider suspending queue dispatch. When
129 // connectivity is restored, consider clearing any dispatch backoff (bug
131 upload_task_queue_
.reset(new TaskQueue
<AttachmentId
>(
132 base::Bind(&AttachmentServiceImpl::BeginUpload
,
133 weak_ptr_factory_
.GetWeakPtr()),
134 initial_backoff_delay
,
137 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
140 AttachmentServiceImpl::~AttachmentServiceImpl() {
141 DCHECK(CalledOnValidThread());
142 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
146 scoped_ptr
<syncer::AttachmentService
> AttachmentServiceImpl::CreateForTest() {
147 scoped_refptr
<syncer::AttachmentStore
> attachment_store
=
148 AttachmentStore::CreateInMemoryStore();
149 scoped_ptr
<AttachmentUploader
> attachment_uploader(
150 new FakeAttachmentUploader
);
151 scoped_ptr
<AttachmentDownloader
> attachment_downloader(
152 new FakeAttachmentDownloader());
153 scoped_ptr
<syncer::AttachmentService
> attachment_service(
154 new syncer::AttachmentServiceImpl(attachment_store
,
155 attachment_uploader
.Pass(),
156 attachment_downloader
.Pass(),
160 return attachment_service
.Pass();
163 AttachmentStore
* AttachmentServiceImpl::GetStore() {
164 return attachment_store_
.get();
167 void AttachmentServiceImpl::GetOrDownloadAttachments(
168 const AttachmentIdList
& attachment_ids
,
169 const GetOrDownloadCallback
& callback
) {
170 DCHECK(CalledOnValidThread());
171 scoped_refptr
<GetOrDownloadState
> state(
172 new GetOrDownloadState(attachment_ids
, callback
));
173 attachment_store_
->Read(attachment_ids
,
174 base::Bind(&AttachmentServiceImpl::ReadDone
,
175 weak_ptr_factory_
.GetWeakPtr(),
179 void AttachmentServiceImpl::DropAttachments(
180 const AttachmentIdList
& attachment_ids
,
181 const DropCallback
& callback
) {
182 DCHECK(CalledOnValidThread());
183 attachment_store_
->Drop(attachment_ids
,
184 base::Bind(&AttachmentServiceImpl::DropDone
,
185 weak_ptr_factory_
.GetWeakPtr(),
189 void AttachmentServiceImpl::ReadDone(
190 const scoped_refptr
<GetOrDownloadState
>& state
,
191 const AttachmentStore::Result
& result
,
192 scoped_ptr
<AttachmentMap
> attachments
,
193 scoped_ptr
<AttachmentIdList
> unavailable_attachment_ids
) {
194 // Add read attachments to result.
195 for (AttachmentMap::const_iterator iter
= attachments
->begin();
196 iter
!= attachments
->end();
198 state
->AddAttachment(iter
->second
);
201 AttachmentIdList::const_iterator iter
= unavailable_attachment_ids
->begin();
202 AttachmentIdList::const_iterator end
= unavailable_attachment_ids
->end();
203 if (result
!= AttachmentStore::STORE_INITIALIZATION_FAILED
&&
204 attachment_downloader_
.get()) {
205 // Try to download locally unavailable attachments.
206 for (; iter
!= end
; ++iter
) {
207 attachment_downloader_
->DownloadAttachment(
209 base::Bind(&AttachmentServiceImpl::DownloadDone
,
210 weak_ptr_factory_
.GetWeakPtr(),
215 // No downloader so all locally unavailable attachments are unavailable.
216 for (; iter
!= end
; ++iter
) {
217 state
->AddUnavailableAttachmentId(*iter
);
222 void AttachmentServiceImpl::WriteDone(
223 const scoped_refptr
<GetOrDownloadState
>& state
,
224 const Attachment
& attachment
,
225 const AttachmentStore::Result
& result
) {
227 case AttachmentStore::SUCCESS
:
228 state
->AddAttachment(attachment
);
230 case AttachmentStore::UNSPECIFIED_ERROR
:
231 case AttachmentStore::STORE_INITIALIZATION_FAILED
:
232 state
->AddUnavailableAttachmentId(attachment
.GetId());
237 void AttachmentServiceImpl::DropDone(const DropCallback
& callback
,
238 const AttachmentStore::Result
& result
) {
239 AttachmentService::DropResult drop_result
=
240 AttachmentService::DROP_UNSPECIFIED_ERROR
;
241 if (result
== AttachmentStore::SUCCESS
) {
242 drop_result
= AttachmentService::DROP_SUCCESS
;
244 // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
245 base::MessageLoop::current()->PostTask(FROM_HERE
,
246 base::Bind(callback
, drop_result
));
249 void AttachmentServiceImpl::UploadDone(
250 const AttachmentUploader::UploadResult
& result
,
251 const AttachmentId
& attachment_id
) {
252 DCHECK(CalledOnValidThread());
254 case AttachmentUploader::UPLOAD_SUCCESS
:
255 upload_task_queue_
->MarkAsSucceeded(attachment_id
);
257 delegate_
->OnAttachmentUploaded(attachment_id
);
260 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR
:
261 upload_task_queue_
->MarkAsFailed(attachment_id
);
262 upload_task_queue_
->AddToQueue(attachment_id
);
264 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR
:
265 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
266 upload_task_queue_
->MarkAsFailed(attachment_id
);
271 void AttachmentServiceImpl::DownloadDone(
272 const scoped_refptr
<GetOrDownloadState
>& state
,
273 const AttachmentId
& attachment_id
,
274 const AttachmentDownloader::DownloadResult
& result
,
275 scoped_ptr
<Attachment
> attachment
) {
277 case AttachmentDownloader::DOWNLOAD_SUCCESS
: {
278 AttachmentList attachment_list
;
279 attachment_list
.push_back(*attachment
.get());
280 attachment_store_
->Write(
282 base::Bind(&AttachmentServiceImpl::WriteDone
,
283 weak_ptr_factory_
.GetWeakPtr(), state
, *attachment
.get()));
286 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR
:
287 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR
:
288 state
->AddUnavailableAttachmentId(attachment_id
);
293 void AttachmentServiceImpl::BeginUpload(const AttachmentId
& attachment_id
) {
294 DCHECK(CalledOnValidThread());
295 AttachmentIdList attachment_ids
;
296 attachment_ids
.push_back(attachment_id
);
297 attachment_store_
->Read(attachment_ids
,
298 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload
,
299 weak_ptr_factory_
.GetWeakPtr()));
302 void AttachmentServiceImpl::UploadAttachments(
303 const AttachmentIdSet
& attachment_ids
) {
304 DCHECK(CalledOnValidThread());
305 if (!attachment_uploader_
.get()) {
308 AttachmentIdSet::const_iterator iter
= attachment_ids
.begin();
309 AttachmentIdSet::const_iterator end
= attachment_ids
.end();
310 for (; iter
!= end
; ++iter
) {
311 upload_task_queue_
->AddToQueue(*iter
);
315 void AttachmentServiceImpl::OnNetworkChanged(
316 net::NetworkChangeNotifier::ConnectionType type
) {
317 if (type
!= net::NetworkChangeNotifier::CONNECTION_NONE
) {
318 upload_task_queue_
->ResetBackoff();
322 void AttachmentServiceImpl::ReadDoneNowUpload(
323 const AttachmentStore::Result
& result
,
324 scoped_ptr
<AttachmentMap
> attachments
,
325 scoped_ptr
<AttachmentIdList
> unavailable_attachment_ids
) {
326 DCHECK(CalledOnValidThread());
327 if (!unavailable_attachment_ids
->empty()) {
328 // TODO(maniscalco): We failed to read some attachments. What should we do
330 AttachmentIdList::const_iterator iter
= unavailable_attachment_ids
->begin();
331 AttachmentIdList::const_iterator end
= unavailable_attachment_ids
->end();
332 for (; iter
!= end
; ++iter
) {
333 upload_task_queue_
->Cancel(*iter
);
337 AttachmentMap::const_iterator iter
= attachments
->begin();
338 AttachmentMap::const_iterator end
= attachments
->end();
339 for (; iter
!= end
; ++iter
) {
340 attachment_uploader_
->UploadAttachment(
342 base::Bind(&AttachmentServiceImpl::UploadDone
,
343 weak_ptr_factory_
.GetWeakPtr()));
347 void AttachmentServiceImpl::SetTimerForTest(scoped_ptr
<base::Timer
> timer
) {
348 upload_task_queue_
->SetTimerForTest(timer
.Pass());
351 } // namespace syncer