1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
9 #include "base/basictypes.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/metrics/histogram.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/time/clock.h"
14 #include "base/time/time.h"
15 #include "google_apis/gcm/base/mcs_util.h"
16 #include "google_apis/gcm/base/socket_stream.h"
17 #include "google_apis/gcm/engine/connection_factory.h"
18 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
20 using namespace google::protobuf::io
;
26 typedef scoped_ptr
<google::protobuf::MessageLite
> MCSProto
;
28 // The category of messages intended for the GCM client itself from MCS.
29 const char kMCSCategory
[] = "com.google.android.gsf.gtalkservice";
31 // The from field for messages originating in the GCM client.
32 const char kGCMFromField
[] = "gcm@android.com";
34 // MCS status message types.
35 // TODO(zea): handle these at the GCMClient layer.
36 const char kIdleNotification
[] = "IdleNotification";
37 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
38 // const char kPowerNotification[] = "PowerNotification";
39 // const char kDataActiveNotification[] = "DataActiveNotification";
41 // The number of unacked messages to allow before sending a stream ack.
42 // Applies to both incoming and outgoing messages.
43 // TODO(zea): make this server configurable.
44 const int kUnackedMessageBeforeStreamAck
= 10;
46 // The global maximum number of pending messages to have in the send queue.
47 const size_t kMaxSendQueueSize
= 10 * 1024;
49 // The maximum message size that can be sent to the server.
50 const int kMaxMessageBytes
= 4 * 1024; // 4KB, like the server.
52 // Helper for converting a proto persistent id list to a vector of strings.
53 bool BuildPersistentIdListFromProto(const google::protobuf::string
& bytes
,
54 std::vector
<std::string
>* id_list
) {
55 mcs_proto::SelectiveAck selective_ack
;
56 if (!selective_ack
.ParseFromString(bytes
))
58 std::vector
<std::string
> new_list
;
59 for (int i
= 0; i
< selective_ack
.id_size(); ++i
) {
60 DCHECK(!selective_ack
.id(i
).empty());
61 new_list
.push_back(selective_ack
.id(i
));
63 id_list
->swap(new_list
);
71 explicit CollapseKey(const mcs_proto::DataMessageStanza
& message
);
74 // Comparison operator for use in maps.
75 bool operator<(const CollapseKey
& right
) const;
77 // Whether the message had a valid collapse key.
80 std::string
token() const { return token_
; }
81 std::string
app_id() const { return app_id_
; }
82 int64
device_user_id() const { return device_user_id_
; }
85 const std::string token_
;
86 const std::string app_id_
;
87 const int64 device_user_id_
;
90 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza
& message
)
91 : token_(message
.token()),
92 app_id_(message
.category()),
93 device_user_id_(message
.device_user_id()) {}
95 CollapseKey::~CollapseKey() {}
97 bool CollapseKey::IsValid() const {
98 // Device user id is optional, but the application id and token are not.
99 return !token_
.empty() && !app_id_
.empty();
102 bool CollapseKey::operator<(const CollapseKey
& right
) const {
103 if (device_user_id_
!= right
.device_user_id())
104 return device_user_id_
< right
.device_user_id();
105 if (app_id_
!= right
.app_id())
106 return app_id_
< right
.app_id();
107 return token_
< right
.token();
110 struct ReliablePacketInfo
{
111 ReliablePacketInfo();
112 ~ReliablePacketInfo();
114 // The stream id with which the message was sent.
117 // If reliable delivery was requested, the persistent id of the message.
118 std::string persistent_id
;
120 // The type of message itself (for easier lookup).
123 // The protobuf of the message itself.
127 ReliablePacketInfo::ReliablePacketInfo()
128 : stream_id(0), tag(0) {
130 ReliablePacketInfo::~ReliablePacketInfo() {}
132 int MCSClient::GetSendQueueSize() const {
133 return to_send_
.size();
136 int MCSClient::GetResendQueueSize() const {
137 return to_resend_
.size();
140 std::string
MCSClient::GetStateString() const {
143 return "UNINITIALIZED";
152 return std::string();
156 MCSClient::MCSClient(const std::string
& version_string
,
158 ConnectionFactory
* connection_factory
,
160 GCMStatsRecorder
* recorder
)
161 : version_string_(version_string
),
163 state_(UNINITIALIZED
),
166 connection_factory_(connection_factory
),
167 connection_handler_(NULL
),
168 last_device_to_server_stream_id_received_(0),
169 last_server_to_device_stream_id_received_(0),
172 gcm_store_(gcm_store
),
174 weak_ptr_factory_(this) {
177 MCSClient::~MCSClient() {
180 void MCSClient::Initialize(
181 const ErrorCallback
& error_callback
,
182 const OnMessageReceivedCallback
& message_received_callback
,
183 const OnMessageSentCallback
& message_sent_callback
,
184 scoped_ptr
<GCMStore::LoadResult
> load_result
) {
185 DCHECK_EQ(state_
, UNINITIALIZED
);
188 mcs_error_callback_
= error_callback
;
189 message_received_callback_
= message_received_callback
;
190 message_sent_callback_
= message_sent_callback
;
192 connection_factory_
->Initialize(
193 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest
,
194 weak_ptr_factory_
.GetWeakPtr()),
195 base::Bind(&MCSClient::HandlePacketFromWire
,
196 weak_ptr_factory_
.GetWeakPtr()),
197 base::Bind(&MCSClient::MaybeSendMessage
,
198 weak_ptr_factory_
.GetWeakPtr()));
199 connection_handler_
= connection_factory_
->GetConnectionHandler();
201 stream_id_out_
= 1; // Login request is hardcoded to id 1.
203 android_id_
= load_result
->device_android_id
;
204 security_token_
= load_result
->device_security_token
;
206 if (android_id_
== 0) {
207 DVLOG(1) << "No device credentials found, assuming new client.";
208 // No need to try and load RMQ data in that case.
212 // |android_id_| is non-zero, so should |security_token_|.
213 DCHECK_NE(0u, security_token_
) << "Security token invalid, while android id"
216 DVLOG(1) << "RMQ Load finished with " << load_result
->incoming_messages
.size()
217 << " incoming acks pending and "
218 << load_result
->outgoing_messages
.size()
219 << " outgoing messages pending.";
221 restored_unackeds_server_ids_
= load_result
->incoming_messages
;
223 // First go through and order the outgoing messages by recency.
224 std::map
<uint64
, google::protobuf::MessageLite
*> ordered_messages
;
225 std::vector
<PersistentId
> expired_ttl_ids
;
226 for (GCMStore::OutgoingMessageMap::iterator iter
=
227 load_result
->outgoing_messages
.begin();
228 iter
!= load_result
->outgoing_messages
.end(); ++iter
) {
229 uint64 timestamp
= 0;
230 if (!base::StringToUint64(iter
->first
, ×tamp
)) {
231 LOG(ERROR
) << "Invalid restored message.";
232 // TODO(fgorski): Error: data unreadable
233 mcs_error_callback_
.Run();
237 // Check if the TTL has expired for this message.
238 if (HasTTLExpired(*iter
->second
, clock_
)) {
239 expired_ttl_ids
.push_back(iter
->first
);
240 NotifyMessageSendStatus(*iter
->second
, TTL_EXCEEDED
);
244 ordered_messages
[timestamp
] = iter
->second
.release();
247 if (!expired_ttl_ids
.empty()) {
248 gcm_store_
->RemoveOutgoingMessages(
250 base::Bind(&MCSClient::OnGCMUpdateFinished
,
251 weak_ptr_factory_
.GetWeakPtr()));
254 // Now go through and add the outgoing messages to the send queue in their
255 // appropriate order (oldest at front, most recent at back).
256 for (std::map
<uint64
, google::protobuf::MessageLite
*>::iterator
257 iter
= ordered_messages
.begin();
258 iter
!= ordered_messages
.end(); ++iter
) {
259 ReliablePacketInfo
* packet_info
= new ReliablePacketInfo();
260 packet_info
->protobuf
.reset(iter
->second
);
261 packet_info
->tag
= GetMCSProtoTag(*iter
->second
);
262 packet_info
->persistent_id
= base::Uint64ToString(iter
->first
);
263 to_send_
.push_back(make_linked_ptr(packet_info
));
265 if (packet_info
->tag
== kDataMessageStanzaTag
) {
266 mcs_proto::DataMessageStanza
* data_message
=
267 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
268 packet_info
->protobuf
.get());
269 CollapseKey
collapse_key(*data_message
);
270 if (collapse_key
.IsValid())
271 collapse_key_map_
[collapse_key
] = packet_info
;
276 void MCSClient::Login(uint64 android_id
, uint64 security_token
) {
277 DCHECK_EQ(state_
, LOADED
);
278 DCHECK(android_id_
== 0 || android_id_
== android_id
);
279 DCHECK(security_token_
== 0 || security_token_
== security_token
);
281 if (android_id
!= android_id_
&& security_token
!= security_token_
) {
283 DCHECK(security_token
);
284 android_id_
= android_id
;
285 security_token_
= security_token
;
288 DCHECK(android_id_
!= 0 || restored_unackeds_server_ids_
.empty());
291 connection_factory_
->Connect();
294 void MCSClient::SendMessage(const MCSMessage
& message
) {
295 int ttl
= GetTTL(message
.GetProtobuf());
297 if (to_send_
.size() > kMaxSendQueueSize
) {
298 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED
);
301 if (message
.size() > kMaxMessageBytes
) {
302 NotifyMessageSendStatus(message
.GetProtobuf(), MESSAGE_TOO_LARGE
);
306 scoped_ptr
<ReliablePacketInfo
> packet_info(new ReliablePacketInfo());
307 packet_info
->tag
= message
.tag();
308 packet_info
->protobuf
= message
.CloneProtobuf();
311 DCHECK_EQ(message
.tag(), kDataMessageStanzaTag
);
313 // First check if this message should replace a pending message with the
314 // same collapse key.
315 mcs_proto::DataMessageStanza
* data_message
=
316 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
317 packet_info
->protobuf
.get());
318 CollapseKey
collapse_key(*data_message
);
319 if (collapse_key
.IsValid() && collapse_key_map_
.count(collapse_key
) > 0) {
320 ReliablePacketInfo
* original_packet
= collapse_key_map_
[collapse_key
];
321 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
322 << original_packet
->persistent_id
;
323 original_packet
->protobuf
= packet_info
->protobuf
.Pass();
324 SetPersistentId(original_packet
->persistent_id
,
325 original_packet
->protobuf
.get());
326 gcm_store_
->OverwriteOutgoingMessage(
327 original_packet
->persistent_id
,
329 base::Bind(&MCSClient::OnGCMUpdateFinished
,
330 weak_ptr_factory_
.GetWeakPtr()));
332 // The message is already queued, return.
335 PersistentId persistent_id
= GetNextPersistentId();
336 DVLOG(1) << "Setting persistent id to " << persistent_id
;
337 packet_info
->persistent_id
= persistent_id
;
338 SetPersistentId(persistent_id
, packet_info
->protobuf
.get());
339 if (!gcm_store_
->AddOutgoingMessage(
341 MCSMessage(message
.tag(), *(packet_info
->protobuf
)),
342 base::Bind(&MCSClient::OnGCMUpdateFinished
,
343 weak_ptr_factory_
.GetWeakPtr()))) {
344 NotifyMessageSendStatus(message
.GetProtobuf(),
345 APP_QUEUE_SIZE_LIMIT_REACHED
);
350 if (collapse_key
.IsValid())
351 collapse_key_map_
[collapse_key
] = packet_info
.get();
352 } else if (!connection_factory_
->IsEndpointReachable()) {
353 DVLOG(1) << "No active connection, dropping message.";
354 NotifyMessageSendStatus(message
.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL
);
358 to_send_
.push_back(make_linked_ptr(packet_info
.release()));
360 // Notify that the messages has been succsfully queued for sending.
361 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
362 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUED
);
367 void MCSClient::ResetStateAndBuildLoginRequest(
368 mcs_proto::LoginRequest
* request
) {
370 DCHECK(security_token_
);
373 last_device_to_server_stream_id_received_
= 0;
374 last_server_to_device_stream_id_received_
= 0;
376 heartbeat_manager_
.Stop();
378 // Add any pending acknowledgments to the list of ids.
379 for (StreamIdToPersistentIdMap::const_iterator iter
=
380 unacked_server_ids_
.begin();
381 iter
!= unacked_server_ids_
.end(); ++iter
) {
382 restored_unackeds_server_ids_
.push_back(iter
->second
);
384 unacked_server_ids_
.clear();
386 // Any acknowledged server ids which have not been confirmed by the server
387 // are treated like unacknowledged ids.
388 for (std::map
<StreamId
, PersistentIdList
>::const_iterator iter
=
389 acked_server_ids_
.begin();
390 iter
!= acked_server_ids_
.end(); ++iter
) {
391 restored_unackeds_server_ids_
.insert(restored_unackeds_server_ids_
.end(),
392 iter
->second
.begin(),
395 acked_server_ids_
.clear();
397 // Then build the request, consuming all pending acknowledgments.
398 request
->Swap(BuildLoginRequest(android_id_
,
400 version_string_
).get());
401 for (PersistentIdList::const_iterator iter
=
402 restored_unackeds_server_ids_
.begin();
403 iter
!= restored_unackeds_server_ids_
.end(); ++iter
) {
404 request
->add_received_persistent_id(*iter
);
406 acked_server_ids_
[stream_id_out_
] = restored_unackeds_server_ids_
;
407 restored_unackeds_server_ids_
.clear();
409 // Push all unacknowledged messages to front of send queue. No need to save
410 // to RMQ, as all messages that reach this point should already have been
411 // saved as necessary.
412 while (!to_resend_
.empty()) {
413 to_send_
.push_front(to_resend_
.back());
414 to_resend_
.pop_back();
417 // Drop all TTL == 0 or expired TTL messages from the queue.
418 std::deque
<MCSPacketInternal
> new_to_send
;
419 std::vector
<PersistentId
> expired_ttl_ids
;
420 while (!to_send_
.empty()) {
421 MCSPacketInternal packet
= PopMessageForSend();
422 if (GetTTL(*packet
->protobuf
) > 0 &&
423 !HasTTLExpired(*packet
->protobuf
, clock_
)) {
424 new_to_send
.push_back(packet
);
426 // If the TTL was 0 there is no persistent id, so no need to remove the
427 // message from the persistent store.
428 if (!packet
->persistent_id
.empty())
429 expired_ttl_ids
.push_back(packet
->persistent_id
);
430 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
434 if (!expired_ttl_ids
.empty()) {
435 DVLOG(1) << "Connection reset, " << expired_ttl_ids
.size()
436 << " messages expired.";
437 gcm_store_
->RemoveOutgoingMessages(
439 base::Bind(&MCSClient::OnGCMUpdateFinished
,
440 weak_ptr_factory_
.GetWeakPtr()));
443 to_send_
.swap(new_to_send
);
445 DVLOG(1) << "Resetting state, with " << request
->received_persistent_id_size()
446 << " incoming acks pending, and " << to_send_
.size()
447 << " pending outgoing messages.";
452 void MCSClient::SendHeartbeat() {
453 SendMessage(MCSMessage(kHeartbeatPingTag
, mcs_proto::HeartbeatPing()));
456 void MCSClient::OnGCMUpdateFinished(bool success
) {
457 LOG_IF(ERROR
, !success
) << "GCM Update failed!";
458 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success
);
459 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
462 void MCSClient::MaybeSendMessage() {
463 if (to_send_
.empty())
466 // If the connection has been reset, do nothing. On reconnection
467 // MaybeSendMessage will be automatically invoked again.
468 // TODO(zea): consider doing TTL expiration at connection reset time, rather
469 // than reconnect time.
470 if (!connection_factory_
->IsEndpointReachable())
473 MCSPacketInternal packet
= PopMessageForSend();
474 if (HasTTLExpired(*packet
->protobuf
, clock_
)) {
475 DCHECK(!packet
->persistent_id
.empty());
476 DVLOG(1) << "Dropping expired message " << packet
->persistent_id
<< ".";
477 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
478 gcm_store_
->RemoveOutgoingMessage(
479 packet
->persistent_id
,
480 base::Bind(&MCSClient::OnGCMUpdateFinished
,
481 weak_ptr_factory_
.GetWeakPtr()));
482 base::MessageLoop::current()->PostTask(
484 base::Bind(&MCSClient::MaybeSendMessage
,
485 weak_ptr_factory_
.GetWeakPtr()));
488 DVLOG(1) << "Pending output message found, sending.";
489 if (!packet
->persistent_id
.empty())
490 to_resend_
.push_back(packet
);
491 SendPacketToWire(packet
.get());
494 void MCSClient::SendPacketToWire(ReliablePacketInfo
* packet_info
) {
495 packet_info
->stream_id
= ++stream_id_out_
;
496 DVLOG(1) << "Sending packet of type " << packet_info
->protobuf
->GetTypeName();
498 // Set the queued time as necessary.
499 if (packet_info
->tag
== kDataMessageStanzaTag
) {
500 mcs_proto::DataMessageStanza
* data_message
=
501 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
502 packet_info
->protobuf
.get());
503 uint64 sent
= data_message
->sent();
505 int queued
= (clock_
->Now().ToInternalValue() /
506 base::Time::kMicrosecondsPerSecond
) - sent
;
507 DVLOG(1) << "Message was queued for " << queued
<< " seconds.";
508 data_message
->set_queued(queued
);
509 recorder_
->RecordDataSentToWire(
510 data_message
->category(),
516 // Set the proper last received stream id to acknowledge received server
518 DVLOG(1) << "Setting last stream id received to "
520 SetLastStreamIdReceived(stream_id_in_
,
521 packet_info
->protobuf
.get());
522 if (stream_id_in_
!= last_server_to_device_stream_id_received_
) {
523 last_server_to_device_stream_id_received_
= stream_id_in_
;
524 // Mark all acknowledged server messages as such. Note: they're not dropped,
525 // as it may be that they'll need to be re-acked if this message doesn't
527 PersistentIdList persistent_id_list
;
528 for (StreamIdToPersistentIdMap::const_iterator iter
=
529 unacked_server_ids_
.begin();
530 iter
!= unacked_server_ids_
.end(); ++iter
) {
531 DCHECK_LE(iter
->first
, last_server_to_device_stream_id_received_
);
532 persistent_id_list
.push_back(iter
->second
);
534 unacked_server_ids_
.clear();
535 acked_server_ids_
[stream_id_out_
] = persistent_id_list
;
538 connection_handler_
->SendMessage(*packet_info
->protobuf
);
541 void MCSClient::HandleMCSDataMesssage(
542 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
543 mcs_proto::DataMessageStanza
* data_message
=
544 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
545 // TODO(zea): implement a proper status manager rather than hardcoding these
547 scoped_ptr
<mcs_proto::DataMessageStanza
> response(
548 new mcs_proto::DataMessageStanza());
549 response
->set_from(kGCMFromField
);
550 response
->set_sent(clock_
->Now().ToInternalValue() /
551 base::Time::kMicrosecondsPerSecond
);
552 response
->set_ttl(0);
554 for (int i
= 0; i
< data_message
->app_data_size(); ++i
) {
555 const mcs_proto::AppData
& app_data
= data_message
->app_data(i
);
556 if (app_data
.key() == kIdleNotification
) {
557 // Tell the MCS server the client is not idle.
559 mcs_proto::AppData data
;
560 data
.set_key(kIdleNotification
);
561 data
.set_value("false");
562 response
->add_app_data()->CopyFrom(data
);
563 response
->set_category(kMCSCategory
);
569 MCSMessage(kDataMessageStanzaTag
,
570 response
.PassAs
<const google::protobuf::MessageLite
>()));
574 void MCSClient::HandlePacketFromWire(
575 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
578 uint8 tag
= GetMCSProtoTag(*protobuf
);
579 PersistentId persistent_id
= GetPersistentId(*protobuf
);
580 StreamId last_stream_id_received
= GetLastStreamIdReceived(*protobuf
);
582 if (last_stream_id_received
!= 0) {
583 last_device_to_server_stream_id_received_
= last_stream_id_received
;
585 // Process device to server messages that have now been acknowledged by the
586 // server. Because messages are stored in order, just pop off all that have
587 // a stream id lower than server's last received stream id.
588 HandleStreamAck(last_stream_id_received
);
590 // Process server_to_device_messages that the server now knows were
591 // acknowledged. Again, they're in order, so just keep going until the
592 // stream id is reached.
593 StreamIdList acked_stream_ids_to_remove
;
594 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
595 acked_server_ids_
.begin();
596 iter
!= acked_server_ids_
.end() &&
597 iter
->first
<= last_stream_id_received
; ++iter
) {
598 acked_stream_ids_to_remove
.push_back(iter
->first
);
600 for (StreamIdList::iterator iter
= acked_stream_ids_to_remove
.begin();
601 iter
!= acked_stream_ids_to_remove
.end(); ++iter
) {
602 acked_server_ids_
.erase(*iter
);
607 if (!persistent_id
.empty()) {
608 unacked_server_ids_
[stream_id_in_
] = persistent_id
;
609 gcm_store_
->AddIncomingMessage(persistent_id
,
610 base::Bind(&MCSClient::OnGCMUpdateFinished
,
611 weak_ptr_factory_
.GetWeakPtr()));
614 DVLOG(1) << "Received message of type " << protobuf
->GetTypeName()
615 << " with persistent id "
616 << (persistent_id
.empty() ? "NULL" : persistent_id
)
617 << ", stream id " << stream_id_in_
<< " and last stream id received "
618 << last_stream_id_received
;
620 if (unacked_server_ids_
.size() > 0 &&
621 unacked_server_ids_
.size() % kUnackedMessageBeforeStreamAck
== 0) {
622 SendMessage(MCSMessage(kIqStanzaTag
,
624 PassAs
<const google::protobuf::MessageLite
>()));
627 // The connection is alive, treat this message as a heartbeat ack.
628 heartbeat_manager_
.OnHeartbeatAcked();
631 case kLoginResponseTag
: {
632 DCHECK_EQ(CONNECTING
, state_
);
633 mcs_proto::LoginResponse
* login_response
=
634 reinterpret_cast<mcs_proto::LoginResponse
*>(protobuf
.get());
635 DVLOG(1) << "Received login response:";
636 DVLOG(1) << " Id: " << login_response
->id();
637 DVLOG(1) << " Timestamp: " << login_response
->server_timestamp();
638 if (login_response
->has_error() && login_response
->error().code() != 0) {
639 state_
= UNINITIALIZED
;
640 DVLOG(1) << " Error code: " << login_response
->error().code();
641 DVLOG(1) << " Error message: " << login_response
->error().message();
642 LOG(ERROR
) << "Failed to log in to GCM, resetting connection.";
643 connection_factory_
->SignalConnectionReset(
644 ConnectionFactory::LOGIN_FAILURE
);
645 mcs_error_callback_
.Run();
649 if (login_response
->has_heartbeat_config()) {
650 heartbeat_manager_
.UpdateHeartbeatConfig(
651 login_response
->heartbeat_config());
655 stream_id_in_
= 1; // To account for the login response.
656 DCHECK_EQ(1U, stream_id_out_
);
658 // Pass the login response on up.
659 base::MessageLoop::current()->PostTask(
661 base::Bind(message_received_callback_
,
664 const google::protobuf::MessageLite
>())));
666 // If there are pending messages, attempt to send one.
667 if (!to_send_
.empty()) {
668 base::MessageLoop::current()->PostTask(
670 base::Bind(&MCSClient::MaybeSendMessage
,
671 weak_ptr_factory_
.GetWeakPtr()));
674 heartbeat_manager_
.Start(
675 base::Bind(&MCSClient::SendHeartbeat
,
676 weak_ptr_factory_
.GetWeakPtr()),
677 base::Bind(&MCSClient::OnConnectionResetByHeartbeat
,
678 weak_ptr_factory_
.GetWeakPtr()));
681 case kHeartbeatPingTag
:
682 DCHECK_GE(stream_id_in_
, 1U);
683 DVLOG(1) << "Received heartbeat ping, sending ack.";
685 MCSMessage(kHeartbeatAckTag
, mcs_proto::HeartbeatAck()));
687 case kHeartbeatAckTag
:
688 DCHECK_GE(stream_id_in_
, 1U);
689 DVLOG(1) << "Received heartbeat ack.";
690 // Do nothing else, all messages act as heartbeat acks.
693 LOG(ERROR
) << "Received close command, resetting connection.";
695 connection_factory_
->SignalConnectionReset(
696 ConnectionFactory::CLOSE_COMMAND
);
699 DCHECK_GE(stream_id_in_
, 1U);
700 mcs_proto::IqStanza
* iq_stanza
=
701 reinterpret_cast<mcs_proto::IqStanza
*>(protobuf
.get());
702 const mcs_proto::Extension
& iq_extension
= iq_stanza
->extension();
703 switch (iq_extension
.id()) {
704 case kSelectiveAck
: {
705 PersistentIdList acked_ids
;
706 if (BuildPersistentIdListFromProto(iq_extension
.data(),
708 HandleSelectiveAck(acked_ids
);
713 // Do nothing. The last received stream id is always processed if it's
717 LOG(WARNING
) << "Received invalid iq stanza extension "
718 << iq_extension
.id();
722 case kDataMessageStanzaTag
: {
723 DCHECK_GE(stream_id_in_
, 1U);
724 mcs_proto::DataMessageStanza
* data_message
=
725 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
726 if (data_message
->category() == kMCSCategory
) {
727 HandleMCSDataMesssage(protobuf
.Pass());
731 DCHECK(protobuf
.get());
732 base::MessageLoop::current()->PostTask(
734 base::Bind(message_received_callback_
,
737 const google::protobuf::MessageLite
>())));
741 LOG(ERROR
) << "Received unexpected message of type "
742 << static_cast<int>(tag
);
747 void MCSClient::HandleStreamAck(StreamId last_stream_id_received
) {
748 PersistentIdList acked_outgoing_persistent_ids
;
749 StreamIdList acked_outgoing_stream_ids
;
750 while (!to_resend_
.empty() &&
751 to_resend_
.front()->stream_id
<= last_stream_id_received
) {
752 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
753 acked_outgoing_persistent_ids
.push_back(outgoing_packet
->persistent_id
);
754 acked_outgoing_stream_ids
.push_back(outgoing_packet
->stream_id
);
755 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
756 to_resend_
.pop_front();
759 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids
.size()
760 << " outgoing messages, " << to_resend_
.size()
761 << " remaining unacked";
762 gcm_store_
->RemoveOutgoingMessages(
763 acked_outgoing_persistent_ids
,
764 base::Bind(&MCSClient::OnGCMUpdateFinished
,
765 weak_ptr_factory_
.GetWeakPtr()));
767 HandleServerConfirmedReceipt(last_stream_id_received
);
770 void MCSClient::HandleSelectiveAck(const PersistentIdList
& id_list
) {
771 std::set
<PersistentId
> remaining_ids(id_list
.begin(), id_list
.end());
773 StreamId last_stream_id_received
= -1;
775 // First check the to_resend_ queue. Acknowledgments are always contiguous,
776 // so if there's a pending message that hasn't been acked, all newer messages
777 // must also be unacked.
778 while(!to_resend_
.empty() && !remaining_ids
.empty()) {
779 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
780 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
781 break; // Newer message must be unacked too.
782 remaining_ids
.erase(outgoing_packet
->persistent_id
);
783 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
785 // No need to re-acknowledge any server messages this message already
787 StreamId device_stream_id
= outgoing_packet
->stream_id
;
788 if (device_stream_id
> last_stream_id_received
)
789 last_stream_id_received
= device_stream_id
;
790 to_resend_
.pop_front();
793 // If the acknowledged ids aren't all there, they might be in the to_send_
794 // queue (typically when a SelectiveAck confirms messages as part of a login
796 while (!to_send_
.empty() && !remaining_ids
.empty()) {
797 const MCSPacketInternal
& outgoing_packet
= to_send_
.front();
798 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
799 break; // Newer messages must be unacked too.
800 remaining_ids
.erase(outgoing_packet
->persistent_id
);
801 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
803 // No need to re-acknowledge any server messages this message already
805 StreamId device_stream_id
= outgoing_packet
->stream_id
;
806 if (device_stream_id
> last_stream_id_received
)
807 last_stream_id_received
= device_stream_id
;
811 // Only handle the largest stream id value. All other stream ids are
812 // implicitly handled.
813 if (last_stream_id_received
> 0)
814 HandleServerConfirmedReceipt(last_stream_id_received
);
816 // At this point, all remaining acked ids are redundant.
817 PersistentIdList acked_ids
;
818 if (remaining_ids
.size() > 0) {
819 for (size_t i
= 0; i
< id_list
.size(); ++i
) {
820 if (remaining_ids
.count(id_list
[i
]) > 0)
822 acked_ids
.push_back(id_list
[i
]);
828 DVLOG(1) << "Server acked " << acked_ids
.size()
829 << " messages, " << to_resend_
.size() << " remaining unacked.";
830 gcm_store_
->RemoveOutgoingMessages(
832 base::Bind(&MCSClient::OnGCMUpdateFinished
,
833 weak_ptr_factory_
.GetWeakPtr()));
835 // Resend any remaining outgoing messages, as they were not received by the
837 DVLOG(1) << "Resending " << to_resend_
.size() << " messages.";
838 while (!to_resend_
.empty()) {
839 to_send_
.push_front(to_resend_
.back());
840 to_resend_
.pop_back();
844 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id
) {
845 PersistentIdList acked_incoming_ids
;
846 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
847 acked_server_ids_
.begin();
848 iter
!= acked_server_ids_
.end() &&
849 iter
->first
<= device_stream_id
;) {
850 acked_incoming_ids
.insert(acked_incoming_ids
.end(),
851 iter
->second
.begin(),
853 acked_server_ids_
.erase(iter
++);
856 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids
.size()
857 << " acknowledged server messages.";
858 gcm_store_
->RemoveIncomingMessages(
860 base::Bind(&MCSClient::OnGCMUpdateFinished
,
861 weak_ptr_factory_
.GetWeakPtr()));
864 MCSClient::PersistentId
MCSClient::GetNextPersistentId() {
865 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
868 void MCSClient::OnConnectionResetByHeartbeat() {
869 connection_factory_
->SignalConnectionReset(
870 ConnectionFactory::HEARTBEAT_FAILURE
);
873 void MCSClient::NotifyMessageSendStatus(
874 const google::protobuf::MessageLite
& protobuf
,
875 MessageSendStatus status
) {
876 if (GetMCSProtoTag(protobuf
) != kDataMessageStanzaTag
)
879 const mcs_proto::DataMessageStanza
* data_message_stanza
=
880 reinterpret_cast<const mcs_proto::DataMessageStanza
*>(&protobuf
);
881 recorder_
->RecordNotifySendStatus(
882 data_message_stanza
->category(),
883 data_message_stanza
->to(),
884 data_message_stanza
->id(),
887 data_message_stanza
->ttl());
888 message_sent_callback_
.Run(
889 data_message_stanza
->device_user_id(),
890 data_message_stanza
->category(),
891 data_message_stanza
->id(),
895 MCSClient::MCSPacketInternal
MCSClient::PopMessageForSend() {
896 MCSPacketInternal packet
= to_send_
.front();
897 to_send_
.pop_front();
899 if (packet
->tag
== kDataMessageStanzaTag
) {
900 mcs_proto::DataMessageStanza
* data_message
=
901 reinterpret_cast<mcs_proto::DataMessageStanza
*>(packet
->protobuf
.get());
902 CollapseKey
collapse_key(*data_message
);
903 if (collapse_key
.IsValid())
904 collapse_key_map_
.erase(collapse_key
);