1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/time/clock.h"
12 #include "base/time/time.h"
13 #include "google_apis/gcm/base/mcs_util.h"
14 #include "google_apis/gcm/base/socket_stream.h"
15 #include "google_apis/gcm/engine/connection_factory.h"
17 using namespace google::protobuf::io
;
23 typedef scoped_ptr
<google::protobuf::MessageLite
> MCSProto
;
// Category carried by data messages that MCS addresses to the GCM client
// itself.
const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";

// Value of the "from" field for messages that originate in the GCM client.
const char kGCMFromField[] = "gcm@android.com";

// MCS status message types.
// TODO(zea): handle these at the GCMClient layer.
const char kIdleNotification[] = "IdleNotification";
// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
// const char kPowerNotification[] = "PowerNotification";
// const char kDataActiveNotification[] = "DataActiveNotification";

// Number of unacknowledged messages tolerated before a stream ack is sent.
// Applies to both incoming and outgoing messages.
// TODO(zea): make this server configurable.
const int kUnackedMessageBeforeStreamAck = 10;

// Global cap on the number of messages waiting in the send queue.
const size_t kMaxSendQueueSize = 10 * 1024;

// Largest message, in bytes, that may be sent to the server.
const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
49 // Helper for converting a proto persistent id list to a vector of strings.
50 bool BuildPersistentIdListFromProto(const google::protobuf::string
& bytes
,
51 std::vector
<std::string
>* id_list
) {
52 mcs_proto::SelectiveAck selective_ack
;
53 if (!selective_ack
.ParseFromString(bytes
))
55 std::vector
<std::string
> new_list
;
56 for (int i
= 0; i
< selective_ack
.id_size(); ++i
) {
57 DCHECK(!selective_ack
.id(i
).empty());
58 new_list
.push_back(selective_ack
.id(i
));
60 id_list
->swap(new_list
);
66 struct ReliablePacketInfo
{
68 ~ReliablePacketInfo();
70 // The stream id with which the message was sent.
73 // If reliable delivery was requested, the persistent id of the message.
74 std::string persistent_id
;
76 // The type of message itself (for easier lookup).
79 // The protobuf of the message itself.
83 ReliablePacketInfo::ReliablePacketInfo()
84 : stream_id(0), tag(0) {
86 ReliablePacketInfo::~ReliablePacketInfo() {}
88 MCSClient::MCSClient(base::Clock
* clock
,
89 ConnectionFactory
* connection_factory
,
92 state_(UNINITIALIZED
),
95 connection_factory_(connection_factory
),
96 connection_handler_(NULL
),
97 last_device_to_server_stream_id_received_(0),
98 last_server_to_device_stream_id_received_(0),
101 gcm_store_(gcm_store
),
102 weak_ptr_factory_(this) {
105 MCSClient::~MCSClient() {
108 void MCSClient::Initialize(
109 const ErrorCallback
& error_callback
,
110 const OnMessageReceivedCallback
& message_received_callback
,
111 const OnMessageSentCallback
& message_sent_callback
,
112 scoped_ptr
<GCMStore::LoadResult
> load_result
) {
113 DCHECK_EQ(state_
, UNINITIALIZED
);
116 mcs_error_callback_
= error_callback
;
117 message_received_callback_
= message_received_callback
;
118 message_sent_callback_
= message_sent_callback
;
120 connection_factory_
->Initialize(
121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest
,
122 weak_ptr_factory_
.GetWeakPtr()),
123 base::Bind(&MCSClient::HandlePacketFromWire
,
124 weak_ptr_factory_
.GetWeakPtr()),
125 base::Bind(&MCSClient::MaybeSendMessage
,
126 weak_ptr_factory_
.GetWeakPtr()));
127 connection_handler_
= connection_factory_
->GetConnectionHandler();
129 stream_id_out_
= 1; // Login request is hardcoded to id 1.
131 android_id_
= load_result
->device_android_id
;
132 security_token_
= load_result
->device_security_token
;
134 if (android_id_
== 0) {
135 DVLOG(1) << "No device credentials found, assuming new client.";
136 // No need to try and load RMQ data in that case.
140 // |android_id_| is non-zero, so should |security_token_|.
141 DCHECK_NE(0u, security_token_
) << "Security token invalid, while android id"
144 DVLOG(1) << "RMQ Load finished with " << load_result
->incoming_messages
.size()
145 << " incoming acks pending and "
146 << load_result
->outgoing_messages
.size()
147 << " outgoing messages pending.";
149 restored_unackeds_server_ids_
= load_result
->incoming_messages
;
151 // First go through and order the outgoing messages by recency.
152 std::map
<uint64
, google::protobuf::MessageLite
*> ordered_messages
;
153 std::vector
<PersistentId
> expired_ttl_ids
;
154 for (GCMStore::OutgoingMessageMap::iterator iter
=
155 load_result
->outgoing_messages
.begin();
156 iter
!= load_result
->outgoing_messages
.end(); ++iter
) {
157 uint64 timestamp
= 0;
158 if (!base::StringToUint64(iter
->first
, ×tamp
)) {
159 LOG(ERROR
) << "Invalid restored message.";
160 // TODO(fgorski): Error: data unreadable
161 mcs_error_callback_
.Run();
165 // Check if the TTL has expired for this message.
166 if (HasTTLExpired(*iter
->second
, clock_
)) {
167 expired_ttl_ids
.push_back(iter
->first
);
168 NotifyMessageSendStatus(*iter
->second
, TTL_EXCEEDED
);
172 ordered_messages
[timestamp
] = iter
->second
.release();
175 if (!expired_ttl_ids
.empty()) {
176 gcm_store_
->RemoveOutgoingMessages(
178 base::Bind(&MCSClient::OnGCMUpdateFinished
,
179 weak_ptr_factory_
.GetWeakPtr()));
182 // Now go through and add the outgoing messages to the send queue in their
183 // appropriate order (oldest at front, most recent at back).
184 for (std::map
<uint64
, google::protobuf::MessageLite
*>::iterator
185 iter
= ordered_messages
.begin();
186 iter
!= ordered_messages
.end(); ++iter
) {
187 ReliablePacketInfo
* packet_info
= new ReliablePacketInfo();
188 packet_info
->protobuf
.reset(iter
->second
);
189 packet_info
->tag
= GetMCSProtoTag(*iter
->second
);
190 packet_info
->persistent_id
= base::Uint64ToString(iter
->first
);
191 to_send_
.push_back(make_linked_ptr(packet_info
));
195 void MCSClient::Login(uint64 android_id
,
196 uint64 security_token
,
197 const std::vector
<int64
>& user_serial_numbers
) {
198 DCHECK_EQ(state_
, LOADED
);
199 DCHECK(android_id_
== 0 || android_id_
== android_id
);
200 DCHECK(security_token_
== 0 || security_token_
== security_token
);
202 if (android_id
!= android_id_
&& security_token
!= security_token_
) {
204 DCHECK(security_token
);
205 android_id_
= android_id
;
206 security_token_
= security_token
;
208 user_serial_numbers_
= user_serial_numbers
;
210 DCHECK(android_id_
!= 0 || restored_unackeds_server_ids_
.empty());
213 connection_factory_
->Connect();
216 void MCSClient::SendMessage(const MCSMessage
& message
) {
217 int ttl
= GetTTL(message
.GetProtobuf());
219 if (to_send_
.size() > kMaxSendQueueSize
) {
220 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED
);
223 if (message
.size() > kMaxMessageBytes
) {
224 NotifyMessageSendStatus(message
.GetProtobuf(), MESSAGE_TOO_LARGE
);
228 scoped_ptr
<ReliablePacketInfo
> packet_info(new ReliablePacketInfo());
229 packet_info
->tag
= message
.tag();
230 packet_info
->protobuf
= message
.CloneProtobuf();
233 PersistentId persistent_id
= GetNextPersistentId();
234 DVLOG(1) << "Setting persistent id to " << persistent_id
;
235 packet_info
->persistent_id
= persistent_id
;
236 SetPersistentId(persistent_id
,
237 packet_info
->protobuf
.get());
238 if (!gcm_store_
->AddOutgoingMessage(
240 MCSMessage(message
.tag(),
241 *(packet_info
->protobuf
)),
242 base::Bind(&MCSClient::OnGCMUpdateFinished
,
243 weak_ptr_factory_
.GetWeakPtr()))) {
244 NotifyMessageSendStatus(message
.GetProtobuf(),
245 APP_QUEUE_SIZE_LIMIT_REACHED
);
248 } else if (!connection_factory_
->IsEndpointReachable()) {
249 DVLOG(1) << "No active connection, dropping message.";
250 NotifyMessageSendStatus(message
.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL
);
254 to_send_
.push_back(make_linked_ptr(packet_info
.release()));
256 // Notify that the messages has been succsfully queued for sending.
257 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
258 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUED
);
263 void MCSClient::Destroy() {
264 gcm_store_
->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished
,
265 weak_ptr_factory_
.GetWeakPtr()));
268 void MCSClient::ResetStateAndBuildLoginRequest(
269 mcs_proto::LoginRequest
* request
) {
271 DCHECK(security_token_
);
274 last_device_to_server_stream_id_received_
= 0;
275 last_server_to_device_stream_id_received_
= 0;
277 heartbeat_manager_
.Stop();
279 // Add any pending acknowledgments to the list of ids.
280 for (StreamIdToPersistentIdMap::const_iterator iter
=
281 unacked_server_ids_
.begin();
282 iter
!= unacked_server_ids_
.end(); ++iter
) {
283 restored_unackeds_server_ids_
.push_back(iter
->second
);
285 unacked_server_ids_
.clear();
287 // Any acknowledged server ids which have not been confirmed by the server
288 // are treated like unacknowledged ids.
289 for (std::map
<StreamId
, PersistentIdList
>::const_iterator iter
=
290 acked_server_ids_
.begin();
291 iter
!= acked_server_ids_
.end(); ++iter
) {
292 restored_unackeds_server_ids_
.insert(restored_unackeds_server_ids_
.end(),
293 iter
->second
.begin(),
296 acked_server_ids_
.clear();
298 // Then build the request, consuming all pending acknowledgments.
299 request
->Swap(BuildLoginRequest(
300 android_id_
, security_token_
, user_serial_numbers_
).get());
301 for (PersistentIdList::const_iterator iter
=
302 restored_unackeds_server_ids_
.begin();
303 iter
!= restored_unackeds_server_ids_
.end(); ++iter
) {
304 request
->add_received_persistent_id(*iter
);
306 acked_server_ids_
[stream_id_out_
] = restored_unackeds_server_ids_
;
307 restored_unackeds_server_ids_
.clear();
309 // Push all unacknowledged messages to front of send queue. No need to save
310 // to RMQ, as all messages that reach this point should already have been
311 // saved as necessary.
312 while (!to_resend_
.empty()) {
313 to_send_
.push_front(to_resend_
.back());
314 to_resend_
.pop_back();
317 // Drop all TTL == 0 or expired TTL messages from the queue.
318 std::deque
<MCSPacketInternal
> new_to_send
;
319 std::vector
<PersistentId
> expired_ttl_ids
;
320 while (!to_send_
.empty()) {
321 MCSPacketInternal packet
= to_send_
.front();
322 to_send_
.pop_front();
323 if (GetTTL(*packet
->protobuf
) > 0 &&
324 !HasTTLExpired(*packet
->protobuf
, clock_
)) {
325 new_to_send
.push_back(packet
);
327 // If the TTL was 0 there is no persistent id, so no need to remove the
328 // message from the persistent store.
329 if (!packet
->persistent_id
.empty())
330 expired_ttl_ids
.push_back(packet
->persistent_id
);
331 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
335 if (!expired_ttl_ids
.empty()) {
336 DVLOG(1) << "Connection reset, " << expired_ttl_ids
.size()
337 << " messages expired.";
338 gcm_store_
->RemoveOutgoingMessages(
340 base::Bind(&MCSClient::OnGCMUpdateFinished
,
341 weak_ptr_factory_
.GetWeakPtr()));
344 to_send_
.swap(new_to_send
);
346 DVLOG(1) << "Resetting state, with " << request
->received_persistent_id_size()
347 << " incoming acks pending, and " << to_send_
.size()
348 << " pending outgoing messages.";
353 void MCSClient::SendHeartbeat() {
354 SendMessage(MCSMessage(kHeartbeatPingTag
, mcs_proto::HeartbeatPing()));
357 void MCSClient::OnGCMUpdateFinished(bool success
) {
358 LOG_IF(ERROR
, !success
) << "GCM Update failed!";
359 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success
);
360 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
363 void MCSClient::MaybeSendMessage() {
364 if (to_send_
.empty())
367 // If the connection has been reset, do nothing. On reconnection
368 // MaybeSendMessage will be automatically invoked again.
369 // TODO(zea): consider doing TTL expiration at connection reset time, rather
370 // than reconnect time.
371 if (!connection_factory_
->IsEndpointReachable())
374 MCSPacketInternal packet
= to_send_
.front();
375 to_send_
.pop_front();
376 if (HasTTLExpired(*packet
->protobuf
, clock_
)) {
377 DCHECK(!packet
->persistent_id
.empty());
378 DVLOG(1) << "Dropping expired message " << packet
->persistent_id
<< ".";
379 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
380 gcm_store_
->RemoveOutgoingMessage(
381 packet
->persistent_id
,
382 base::Bind(&MCSClient::OnGCMUpdateFinished
,
383 weak_ptr_factory_
.GetWeakPtr()));
384 base::MessageLoop::current()->PostTask(
386 base::Bind(&MCSClient::MaybeSendMessage
,
387 weak_ptr_factory_
.GetWeakPtr()));
390 DVLOG(1) << "Pending output message found, sending.";
391 if (!packet
->persistent_id
.empty())
392 to_resend_
.push_back(packet
);
393 SendPacketToWire(packet
.get());
396 void MCSClient::SendPacketToWire(ReliablePacketInfo
* packet_info
) {
397 packet_info
->stream_id
= ++stream_id_out_
;
398 DVLOG(1) << "Sending packet of type " << packet_info
->protobuf
->GetTypeName();
400 // Set the queued time as necessary.
401 if (packet_info
->tag
== kDataMessageStanzaTag
) {
402 mcs_proto::DataMessageStanza
* data_message
=
403 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
404 packet_info
->protobuf
.get());
405 uint64 sent
= data_message
->sent();
407 int queued
= (clock_
->Now().ToInternalValue() /
408 base::Time::kMicrosecondsPerSecond
) - sent
;
409 DVLOG(1) << "Message was queued for " << queued
<< " seconds.";
410 data_message
->set_queued(queued
);
413 // Set the proper last received stream id to acknowledge received server
415 DVLOG(1) << "Setting last stream id received to "
417 SetLastStreamIdReceived(stream_id_in_
,
418 packet_info
->protobuf
.get());
419 if (stream_id_in_
!= last_server_to_device_stream_id_received_
) {
420 last_server_to_device_stream_id_received_
= stream_id_in_
;
421 // Mark all acknowledged server messages as such. Note: they're not dropped,
422 // as it may be that they'll need to be re-acked if this message doesn't
424 PersistentIdList persistent_id_list
;
425 for (StreamIdToPersistentIdMap::const_iterator iter
=
426 unacked_server_ids_
.begin();
427 iter
!= unacked_server_ids_
.end(); ++iter
) {
428 DCHECK_LE(iter
->first
, last_server_to_device_stream_id_received_
);
429 persistent_id_list
.push_back(iter
->second
);
431 unacked_server_ids_
.clear();
432 acked_server_ids_
[stream_id_out_
] = persistent_id_list
;
435 connection_handler_
->SendMessage(*packet_info
->protobuf
);
438 void MCSClient::HandleMCSDataMesssage(
439 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
440 mcs_proto::DataMessageStanza
* data_message
=
441 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
442 // TODO(zea): implement a proper status manager rather than hardcoding these
444 scoped_ptr
<mcs_proto::DataMessageStanza
> response(
445 new mcs_proto::DataMessageStanza());
446 response
->set_from(kGCMFromField
);
447 response
->set_sent(clock_
->Now().ToInternalValue() /
448 base::Time::kMicrosecondsPerSecond
);
449 response
->set_ttl(0);
451 for (int i
= 0; i
< data_message
->app_data_size(); ++i
) {
452 const mcs_proto::AppData
& app_data
= data_message
->app_data(i
);
453 if (app_data
.key() == kIdleNotification
) {
454 // Tell the MCS server the client is not idle.
456 mcs_proto::AppData data
;
457 data
.set_key(kIdleNotification
);
458 data
.set_value("false");
459 response
->add_app_data()->CopyFrom(data
);
460 response
->set_category(kMCSCategory
);
466 MCSMessage(kDataMessageStanzaTag
,
467 response
.PassAs
<const google::protobuf::MessageLite
>()));
471 void MCSClient::HandlePacketFromWire(
472 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
475 uint8 tag
= GetMCSProtoTag(*protobuf
);
476 PersistentId persistent_id
= GetPersistentId(*protobuf
);
477 StreamId last_stream_id_received
= GetLastStreamIdReceived(*protobuf
);
479 if (last_stream_id_received
!= 0) {
480 last_device_to_server_stream_id_received_
= last_stream_id_received
;
482 // Process device to server messages that have now been acknowledged by the
483 // server. Because messages are stored in order, just pop off all that have
484 // a stream id lower than server's last received stream id.
485 HandleStreamAck(last_stream_id_received
);
487 // Process server_to_device_messages that the server now knows were
488 // acknowledged. Again, they're in order, so just keep going until the
489 // stream id is reached.
490 StreamIdList acked_stream_ids_to_remove
;
491 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
492 acked_server_ids_
.begin();
493 iter
!= acked_server_ids_
.end() &&
494 iter
->first
<= last_stream_id_received
; ++iter
) {
495 acked_stream_ids_to_remove
.push_back(iter
->first
);
497 for (StreamIdList::iterator iter
= acked_stream_ids_to_remove
.begin();
498 iter
!= acked_stream_ids_to_remove
.end(); ++iter
) {
499 acked_server_ids_
.erase(*iter
);
504 if (!persistent_id
.empty()) {
505 unacked_server_ids_
[stream_id_in_
] = persistent_id
;
506 gcm_store_
->AddIncomingMessage(persistent_id
,
507 base::Bind(&MCSClient::OnGCMUpdateFinished
,
508 weak_ptr_factory_
.GetWeakPtr()));
511 DVLOG(1) << "Received message of type " << protobuf
->GetTypeName()
512 << " with persistent id "
513 << (persistent_id
.empty() ? "NULL" : persistent_id
)
514 << ", stream id " << stream_id_in_
<< " and last stream id received "
515 << last_stream_id_received
;
517 if (unacked_server_ids_
.size() > 0 &&
518 unacked_server_ids_
.size() % kUnackedMessageBeforeStreamAck
== 0) {
519 SendMessage(MCSMessage(kIqStanzaTag
,
521 PassAs
<const google::protobuf::MessageLite
>()));
524 // The connection is alive, treat this message as a heartbeat ack.
525 heartbeat_manager_
.OnHeartbeatAcked();
528 case kLoginResponseTag
: {
529 DCHECK_EQ(CONNECTING
, state_
);
530 mcs_proto::LoginResponse
* login_response
=
531 reinterpret_cast<mcs_proto::LoginResponse
*>(protobuf
.get());
532 DVLOG(1) << "Received login response:";
533 DVLOG(1) << " Id: " << login_response
->id();
534 DVLOG(1) << " Timestamp: " << login_response
->server_timestamp();
535 if (login_response
->has_error() && login_response
->error().code() != 0) {
536 state_
= UNINITIALIZED
;
537 DVLOG(1) << " Error code: " << login_response
->error().code();
538 DVLOG(1) << " Error message: " << login_response
->error().message();
539 mcs_error_callback_
.Run();
543 if (login_response
->has_heartbeat_config()) {
544 heartbeat_manager_
.UpdateHeartbeatConfig(
545 login_response
->heartbeat_config());
549 stream_id_in_
= 1; // To account for the login response.
550 DCHECK_EQ(1U, stream_id_out_
);
552 // Pass the login response on up.
553 base::MessageLoop::current()->PostTask(
555 base::Bind(message_received_callback_
,
558 const google::protobuf::MessageLite
>())));
560 // If there are pending messages, attempt to send one.
561 if (!to_send_
.empty()) {
562 base::MessageLoop::current()->PostTask(
564 base::Bind(&MCSClient::MaybeSendMessage
,
565 weak_ptr_factory_
.GetWeakPtr()));
568 heartbeat_manager_
.Start(
569 base::Bind(&MCSClient::SendHeartbeat
,
570 weak_ptr_factory_
.GetWeakPtr()),
571 base::Bind(&MCSClient::OnConnectionResetByHeartbeat
,
572 weak_ptr_factory_
.GetWeakPtr()));
575 case kHeartbeatPingTag
:
576 DCHECK_GE(stream_id_in_
, 1U);
577 DVLOG(1) << "Received heartbeat ping, sending ack.";
579 MCSMessage(kHeartbeatAckTag
, mcs_proto::HeartbeatAck()));
581 case kHeartbeatAckTag
:
582 DCHECK_GE(stream_id_in_
, 1U);
583 DVLOG(1) << "Received heartbeat ack.";
584 // Do nothing else, all messages act as heartbeat acks.
587 LOG(ERROR
) << "Received close command, resetting connection.";
589 connection_factory_
->SignalConnectionReset();
592 DCHECK_GE(stream_id_in_
, 1U);
593 mcs_proto::IqStanza
* iq_stanza
=
594 reinterpret_cast<mcs_proto::IqStanza
*>(protobuf
.get());
595 const mcs_proto::Extension
& iq_extension
= iq_stanza
->extension();
596 switch (iq_extension
.id()) {
597 case kSelectiveAck
: {
598 PersistentIdList acked_ids
;
599 if (BuildPersistentIdListFromProto(iq_extension
.data(),
601 HandleSelectiveAck(acked_ids
);
606 // Do nothing. The last received stream id is always processed if it's
610 LOG(WARNING
) << "Received invalid iq stanza extension "
611 << iq_extension
.id();
615 case kDataMessageStanzaTag
: {
616 DCHECK_GE(stream_id_in_
, 1U);
617 mcs_proto::DataMessageStanza
* data_message
=
618 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
619 if (data_message
->category() == kMCSCategory
) {
620 HandleMCSDataMesssage(protobuf
.Pass());
624 DCHECK(protobuf
.get());
625 base::MessageLoop::current()->PostTask(
627 base::Bind(message_received_callback_
,
630 const google::protobuf::MessageLite
>())));
634 LOG(ERROR
) << "Received unexpected message of type "
635 << static_cast<int>(tag
);
640 void MCSClient::HandleStreamAck(StreamId last_stream_id_received
) {
641 PersistentIdList acked_outgoing_persistent_ids
;
642 StreamIdList acked_outgoing_stream_ids
;
643 while (!to_resend_
.empty() &&
644 to_resend_
.front()->stream_id
<= last_stream_id_received
) {
645 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
646 acked_outgoing_persistent_ids
.push_back(outgoing_packet
->persistent_id
);
647 acked_outgoing_stream_ids
.push_back(outgoing_packet
->stream_id
);
648 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
649 to_resend_
.pop_front();
652 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids
.size()
653 << " outgoing messages, " << to_resend_
.size()
654 << " remaining unacked";
655 gcm_store_
->RemoveOutgoingMessages(
656 acked_outgoing_persistent_ids
,
657 base::Bind(&MCSClient::OnGCMUpdateFinished
,
658 weak_ptr_factory_
.GetWeakPtr()));
660 HandleServerConfirmedReceipt(last_stream_id_received
);
663 void MCSClient::HandleSelectiveAck(const PersistentIdList
& id_list
) {
664 // First check the to_resend_ queue. Acknowledgments should always happen
665 // in the order they were sent, so if messages are present they should match
666 // the acknowledge list.
667 PersistentIdList::const_iterator iter
= id_list
.begin();
668 for (; iter
!= id_list
.end() && !to_resend_
.empty(); ++iter
) {
669 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
670 DCHECK_EQ(outgoing_packet
->persistent_id
, *iter
);
671 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
673 // No need to re-acknowledge any server messages this message already
675 StreamId device_stream_id
= outgoing_packet
->stream_id
;
676 HandleServerConfirmedReceipt(device_stream_id
);
678 to_resend_
.pop_front();
681 // If the acknowledged ids aren't all there, they might be in the to_send_
682 // queue (typically when a StreamAck confirms messages as part of a login
684 for (; iter
!= id_list
.end() && !to_send_
.empty(); ++iter
) {
685 const MCSPacketInternal
& outgoing_packet
= to_send_
.front();
686 DCHECK_EQ(outgoing_packet
->persistent_id
, *iter
);
687 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
689 // No need to re-acknowledge any server messages this message already
691 StreamId device_stream_id
= outgoing_packet
->stream_id
;
692 HandleServerConfirmedReceipt(device_stream_id
);
694 to_send_
.pop_front();
697 DCHECK(iter
== id_list
.end());
699 DVLOG(1) << "Server acked " << id_list
.size()
700 << " messages, " << to_resend_
.size() << " remaining unacked.";
701 gcm_store_
->RemoveOutgoingMessages(
703 base::Bind(&MCSClient::OnGCMUpdateFinished
,
704 weak_ptr_factory_
.GetWeakPtr()));
706 // Resend any remaining outgoing messages, as they were not received by the
708 DVLOG(1) << "Resending " << to_resend_
.size() << " messages.";
709 while (!to_resend_
.empty()) {
710 to_send_
.push_front(to_resend_
.back());
711 to_resend_
.pop_back();
715 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id
) {
716 PersistentIdList acked_incoming_ids
;
717 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
718 acked_server_ids_
.begin();
719 iter
!= acked_server_ids_
.end() &&
720 iter
->first
<= device_stream_id
;) {
721 acked_incoming_ids
.insert(acked_incoming_ids
.end(),
722 iter
->second
.begin(),
724 acked_server_ids_
.erase(iter
++);
727 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids
.size()
728 << " acknowledged server messages.";
729 gcm_store_
->RemoveIncomingMessages(
731 base::Bind(&MCSClient::OnGCMUpdateFinished
,
732 weak_ptr_factory_
.GetWeakPtr()));
735 MCSClient::PersistentId
MCSClient::GetNextPersistentId() {
736 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
739 void MCSClient::OnConnectionResetByHeartbeat() {
740 connection_factory_
->SignalConnectionReset();
743 void MCSClient::NotifyMessageSendStatus(
744 const google::protobuf::MessageLite
& protobuf
,
745 MessageSendStatus status
) {
746 if (GetMCSProtoTag(protobuf
) != kDataMessageStanzaTag
)
749 const mcs_proto::DataMessageStanza
* data_message_stanza
=
750 reinterpret_cast<const mcs_proto::DataMessageStanza
*>(&protobuf
);
751 message_sent_callback_
.Run(
752 data_message_stanza
->device_user_id(),
753 data_message_stanza
->category(),
754 data_message_stanza
->id(),
758 void MCSClient::SetGCMStoreForTesting(GCMStore
* gcm_store
) {
759 gcm_store_
= gcm_store
;