1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/metrics/histogram.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/time/clock.h"
15 #include "base/time/time.h"
16 #include "base/timer/timer.h"
17 #include "google_apis/gcm/base/mcs_util.h"
18 #include "google_apis/gcm/base/socket_stream.h"
19 #include "google_apis/gcm/engine/connection_factory.h"
20 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
22 using namespace google::protobuf::io
;
28 typedef scoped_ptr
<google::protobuf::MessageLite
> MCSProto
;
30 // The category of messages intended for the GCM client itself from MCS.
31 const char kMCSCategory
[] = "com.google.android.gsf.gtalkservice";
33 // The from field for messages originating in the GCM client.
34 const char kGCMFromField
[] = "gcm@android.com";
36 // MCS status message types.
37 // TODO(zea): handle these at the GCMClient layer.
38 const char kIdleNotification
[] = "IdleNotification";
39 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
40 // const char kPowerNotification[] = "PowerNotification";
41 // const char kDataActiveNotification[] = "DataActiveNotification";
43 // The number of unacked messages to allow before sending a stream ack.
44 // Applies to both incoming and outgoing messages.
45 // TODO(zea): make this server configurable.
46 const int kUnackedMessageBeforeStreamAck
= 10;
48 // The global maximum number of pending messages to have in the send queue.
49 const size_t kMaxSendQueueSize
= 10 * 1024;
51 // The maximum message size that can be sent to the server.
52 const int kMaxMessageBytes
= 4 * 1024; // 4KB, like the server.
54 // Helper for converting a proto persistent id list to a vector of strings.
55 bool BuildPersistentIdListFromProto(const google::protobuf::string
& bytes
,
56 std::vector
<std::string
>* id_list
) {
57 mcs_proto::SelectiveAck selective_ack
;
58 if (!selective_ack
.ParseFromString(bytes
))
60 std::vector
<std::string
> new_list
;
61 for (int i
= 0; i
< selective_ack
.id_size(); ++i
) {
62 DCHECK(!selective_ack
.id(i
).empty());
63 new_list
.push_back(selective_ack
.id(i
));
65 id_list
->swap(new_list
);
73 explicit CollapseKey(const mcs_proto::DataMessageStanza
& message
);
76 // Comparison operator for use in maps.
77 bool operator<(const CollapseKey
& right
) const;
79 // Whether the message had a valid collapse key.
82 std::string
token() const { return token_
; }
83 std::string
app_id() const { return app_id_
; }
84 int64
device_user_id() const { return device_user_id_
; }
87 const std::string token_
;
88 const std::string app_id_
;
89 const int64 device_user_id_
;
92 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza
& message
)
93 : token_(message
.token()),
94 app_id_(message
.category()),
95 device_user_id_(message
.device_user_id()) {}
97 CollapseKey::~CollapseKey() {}
99 bool CollapseKey::IsValid() const {
100 // Device user id is optional, but the application id and token are not.
101 return !token_
.empty() && !app_id_
.empty();
104 bool CollapseKey::operator<(const CollapseKey
& right
) const {
105 if (device_user_id_
!= right
.device_user_id())
106 return device_user_id_
< right
.device_user_id();
107 if (app_id_
!= right
.app_id())
108 return app_id_
< right
.app_id();
109 return token_
< right
.token();
112 struct ReliablePacketInfo
{
113 ReliablePacketInfo();
114 ~ReliablePacketInfo();
116 // The stream id with which the message was sent.
119 // If reliable delivery was requested, the persistent id of the message.
120 std::string persistent_id
;
122 // The type of message itself (for easier lookup).
125 // The protobuf of the message itself.
129 ReliablePacketInfo::ReliablePacketInfo()
130 : stream_id(0), tag(0) {
132 ReliablePacketInfo::~ReliablePacketInfo() {}
134 int MCSClient::GetSendQueueSize() const {
135 return to_send_
.size();
138 int MCSClient::GetResendQueueSize() const {
139 return to_resend_
.size();
142 std::string
MCSClient::GetStateString() const {
145 return "UNINITIALIZED";
154 return std::string();
158 MCSClient::MCSClient(const std::string
& version_string
,
160 ConnectionFactory
* connection_factory
,
162 GCMStatsRecorder
* recorder
)
163 : version_string_(version_string
),
165 state_(UNINITIALIZED
),
168 connection_factory_(connection_factory
),
169 connection_handler_(NULL
),
170 last_device_to_server_stream_id_received_(0),
171 last_server_to_device_stream_id_received_(0),
174 gcm_store_(gcm_store
),
176 weak_ptr_factory_(this) {
179 MCSClient::~MCSClient() {
182 void MCSClient::Initialize(
183 const ErrorCallback
& error_callback
,
184 const OnMessageReceivedCallback
& message_received_callback
,
185 const OnMessageSentCallback
& message_sent_callback
,
186 scoped_ptr
<GCMStore::LoadResult
> load_result
) {
187 DCHECK_EQ(state_
, UNINITIALIZED
);
190 mcs_error_callback_
= error_callback
;
191 message_received_callback_
= message_received_callback
;
192 message_sent_callback_
= message_sent_callback
;
194 connection_factory_
->Initialize(
195 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest
,
196 weak_ptr_factory_
.GetWeakPtr()),
197 base::Bind(&MCSClient::HandlePacketFromWire
,
198 weak_ptr_factory_
.GetWeakPtr()),
199 base::Bind(&MCSClient::MaybeSendMessage
,
200 weak_ptr_factory_
.GetWeakPtr()));
202 stream_id_out_
= 1; // Login request is hardcoded to id 1.
204 android_id_
= load_result
->device_android_id
;
205 security_token_
= load_result
->device_security_token
;
207 if (android_id_
== 0) {
208 DVLOG(1) << "No device credentials found, assuming new client.";
209 // No need to try and load RMQ data in that case.
213 // |android_id_| is non-zero, so should |security_token_|.
214 DCHECK_NE(0u, security_token_
) << "Security token invalid, while android id"
217 DVLOG(1) << "RMQ Load finished with " << load_result
->incoming_messages
.size()
218 << " incoming acks pending and "
219 << load_result
->outgoing_messages
.size()
220 << " outgoing messages pending.";
222 restored_unackeds_server_ids_
= load_result
->incoming_messages
;
224 // First go through and order the outgoing messages by recency.
225 std::map
<uint64
, google::protobuf::MessageLite
*> ordered_messages
;
226 std::vector
<PersistentId
> expired_ttl_ids
;
227 for (GCMStore::OutgoingMessageMap::iterator iter
=
228 load_result
->outgoing_messages
.begin();
229 iter
!= load_result
->outgoing_messages
.end(); ++iter
) {
230 uint64 timestamp
= 0;
231 if (!base::StringToUint64(iter
->first
, ×tamp
)) {
232 LOG(ERROR
) << "Invalid restored message.";
233 // TODO(fgorski): Error: data unreadable
234 mcs_error_callback_
.Run();
238 // Check if the TTL has expired for this message.
239 if (HasTTLExpired(*iter
->second
, clock_
)) {
240 expired_ttl_ids
.push_back(iter
->first
);
241 NotifyMessageSendStatus(*iter
->second
, TTL_EXCEEDED
);
245 ordered_messages
[timestamp
] = iter
->second
.release();
248 if (!expired_ttl_ids
.empty()) {
249 gcm_store_
->RemoveOutgoingMessages(
251 base::Bind(&MCSClient::OnGCMUpdateFinished
,
252 weak_ptr_factory_
.GetWeakPtr()));
255 // Now go through and add the outgoing messages to the send queue in their
256 // appropriate order (oldest at front, most recent at back).
257 for (std::map
<uint64
, google::protobuf::MessageLite
*>::iterator
258 iter
= ordered_messages
.begin();
259 iter
!= ordered_messages
.end(); ++iter
) {
260 ReliablePacketInfo
* packet_info
= new ReliablePacketInfo();
261 packet_info
->protobuf
.reset(iter
->second
);
262 packet_info
->tag
= GetMCSProtoTag(*iter
->second
);
263 packet_info
->persistent_id
= base::Uint64ToString(iter
->first
);
264 to_send_
.push_back(make_linked_ptr(packet_info
));
266 if (packet_info
->tag
== kDataMessageStanzaTag
) {
267 mcs_proto::DataMessageStanza
* data_message
=
268 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
269 packet_info
->protobuf
.get());
270 CollapseKey
collapse_key(*data_message
);
271 if (collapse_key
.IsValid())
272 collapse_key_map_
[collapse_key
] = packet_info
;
277 void MCSClient::Login(uint64 android_id
, uint64 security_token
) {
278 DCHECK_EQ(state_
, LOADED
);
279 DCHECK(android_id_
== 0 || android_id_
== android_id
);
280 DCHECK(security_token_
== 0 || security_token_
== security_token
);
282 if (android_id
!= android_id_
&& security_token
!= security_token_
) {
284 DCHECK(security_token
);
285 android_id_
= android_id
;
286 security_token_
= security_token
;
289 DCHECK(android_id_
!= 0 || restored_unackeds_server_ids_
.empty());
292 connection_factory_
->Connect();
293 connection_handler_
= connection_factory_
->GetConnectionHandler();
296 void MCSClient::SendMessage(const MCSMessage
& message
) {
297 int ttl
= GetTTL(message
.GetProtobuf());
299 if (to_send_
.size() > kMaxSendQueueSize
) {
300 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED
);
303 if (message
.size() > kMaxMessageBytes
) {
304 NotifyMessageSendStatus(message
.GetProtobuf(), MESSAGE_TOO_LARGE
);
308 scoped_ptr
<ReliablePacketInfo
> packet_info(new ReliablePacketInfo());
309 packet_info
->tag
= message
.tag();
310 packet_info
->protobuf
= message
.CloneProtobuf();
313 DCHECK_EQ(message
.tag(), kDataMessageStanzaTag
);
315 // First check if this message should replace a pending message with the
316 // same collapse key.
317 mcs_proto::DataMessageStanza
* data_message
=
318 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
319 packet_info
->protobuf
.get());
320 CollapseKey
collapse_key(*data_message
);
321 if (collapse_key
.IsValid() && collapse_key_map_
.count(collapse_key
) > 0) {
322 ReliablePacketInfo
* original_packet
= collapse_key_map_
[collapse_key
];
323 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
324 << original_packet
->persistent_id
;
325 original_packet
->protobuf
= packet_info
->protobuf
.Pass();
326 SetPersistentId(original_packet
->persistent_id
,
327 original_packet
->protobuf
.get());
328 gcm_store_
->OverwriteOutgoingMessage(
329 original_packet
->persistent_id
,
331 base::Bind(&MCSClient::OnGCMUpdateFinished
,
332 weak_ptr_factory_
.GetWeakPtr()));
334 // The message is already queued, return.
337 PersistentId persistent_id
= GetNextPersistentId();
338 DVLOG(1) << "Setting persistent id to " << persistent_id
;
339 packet_info
->persistent_id
= persistent_id
;
340 SetPersistentId(persistent_id
, packet_info
->protobuf
.get());
341 if (!gcm_store_
->AddOutgoingMessage(
343 MCSMessage(message
.tag(), *(packet_info
->protobuf
)),
344 base::Bind(&MCSClient::OnGCMUpdateFinished
,
345 weak_ptr_factory_
.GetWeakPtr()))) {
346 NotifyMessageSendStatus(message
.GetProtobuf(),
347 APP_QUEUE_SIZE_LIMIT_REACHED
);
352 if (collapse_key
.IsValid())
353 collapse_key_map_
[collapse_key
] = packet_info
.get();
354 } else if (!connection_factory_
->IsEndpointReachable()) {
355 DVLOG(1) << "No active connection, dropping message.";
356 NotifyMessageSendStatus(message
.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL
);
360 to_send_
.push_back(make_linked_ptr(packet_info
.release()));
362 // Notify that the messages has been succsfully queued for sending.
363 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
364 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUED
);
369 void MCSClient::UpdateHeartbeatTimer(scoped_ptr
<base::Timer
> timer
) {
370 heartbeat_manager_
.UpdateHeartbeatTimer(timer
.Pass());
373 void MCSClient::ResetStateAndBuildLoginRequest(
374 mcs_proto::LoginRequest
* request
) {
376 DCHECK(security_token_
);
379 last_device_to_server_stream_id_received_
= 0;
380 last_server_to_device_stream_id_received_
= 0;
382 heartbeat_manager_
.Stop();
384 // Add any pending acknowledgments to the list of ids.
385 for (StreamIdToPersistentIdMap::const_iterator iter
=
386 unacked_server_ids_
.begin();
387 iter
!= unacked_server_ids_
.end(); ++iter
) {
388 restored_unackeds_server_ids_
.push_back(iter
->second
);
390 unacked_server_ids_
.clear();
392 // Any acknowledged server ids which have not been confirmed by the server
393 // are treated like unacknowledged ids.
394 for (std::map
<StreamId
, PersistentIdList
>::const_iterator iter
=
395 acked_server_ids_
.begin();
396 iter
!= acked_server_ids_
.end(); ++iter
) {
397 restored_unackeds_server_ids_
.insert(restored_unackeds_server_ids_
.end(),
398 iter
->second
.begin(),
401 acked_server_ids_
.clear();
403 // Then build the request, consuming all pending acknowledgments.
404 request
->Swap(BuildLoginRequest(android_id_
,
406 version_string_
).get());
407 for (PersistentIdList::const_iterator iter
=
408 restored_unackeds_server_ids_
.begin();
409 iter
!= restored_unackeds_server_ids_
.end(); ++iter
) {
410 request
->add_received_persistent_id(*iter
);
412 acked_server_ids_
[stream_id_out_
] = restored_unackeds_server_ids_
;
413 restored_unackeds_server_ids_
.clear();
415 // Push all unacknowledged messages to front of send queue. No need to save
416 // to RMQ, as all messages that reach this point should already have been
417 // saved as necessary.
418 while (!to_resend_
.empty()) {
419 to_send_
.push_front(to_resend_
.back());
420 to_resend_
.pop_back();
423 // Drop all TTL == 0 or expired TTL messages from the queue.
424 std::deque
<MCSPacketInternal
> new_to_send
;
425 std::vector
<PersistentId
> expired_ttl_ids
;
426 while (!to_send_
.empty()) {
427 MCSPacketInternal packet
= PopMessageForSend();
428 if (GetTTL(*packet
->protobuf
) > 0 &&
429 !HasTTLExpired(*packet
->protobuf
, clock_
)) {
430 new_to_send
.push_back(packet
);
432 // If the TTL was 0 there is no persistent id, so no need to remove the
433 // message from the persistent store.
434 if (!packet
->persistent_id
.empty())
435 expired_ttl_ids
.push_back(packet
->persistent_id
);
436 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
440 if (!expired_ttl_ids
.empty()) {
441 DVLOG(1) << "Connection reset, " << expired_ttl_ids
.size()
442 << " messages expired.";
443 gcm_store_
->RemoveOutgoingMessages(
445 base::Bind(&MCSClient::OnGCMUpdateFinished
,
446 weak_ptr_factory_
.GetWeakPtr()));
449 to_send_
.swap(new_to_send
);
451 DVLOG(1) << "Resetting state, with " << request
->received_persistent_id_size()
452 << " incoming acks pending, and " << to_send_
.size()
453 << " pending outgoing messages.";
458 void MCSClient::SendHeartbeat() {
459 SendMessage(MCSMessage(kHeartbeatPingTag
, mcs_proto::HeartbeatPing()));
462 void MCSClient::OnGCMUpdateFinished(bool success
) {
463 LOG_IF(ERROR
, !success
) << "GCM Update failed!";
464 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success
);
465 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
468 void MCSClient::MaybeSendMessage() {
469 if (to_send_
.empty())
472 // If the connection has been reset, do nothing. On reconnection
473 // MaybeSendMessage will be automatically invoked again.
474 // TODO(zea): consider doing TTL expiration at connection reset time, rather
475 // than reconnect time.
476 if (!connection_factory_
->IsEndpointReachable())
479 MCSPacketInternal packet
= PopMessageForSend();
480 if (HasTTLExpired(*packet
->protobuf
, clock_
)) {
481 DCHECK(!packet
->persistent_id
.empty());
482 DVLOG(1) << "Dropping expired message " << packet
->persistent_id
<< ".";
483 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
484 gcm_store_
->RemoveOutgoingMessage(
485 packet
->persistent_id
,
486 base::Bind(&MCSClient::OnGCMUpdateFinished
,
487 weak_ptr_factory_
.GetWeakPtr()));
488 base::MessageLoop::current()->PostTask(
490 base::Bind(&MCSClient::MaybeSendMessage
,
491 weak_ptr_factory_
.GetWeakPtr()));
494 DVLOG(1) << "Pending output message found, sending.";
495 if (!packet
->persistent_id
.empty())
496 to_resend_
.push_back(packet
);
497 SendPacketToWire(packet
.get());
500 void MCSClient::SendPacketToWire(ReliablePacketInfo
* packet_info
) {
501 packet_info
->stream_id
= ++stream_id_out_
;
502 DVLOG(1) << "Sending packet of type " << packet_info
->protobuf
->GetTypeName();
504 // Set the queued time as necessary.
505 if (packet_info
->tag
== kDataMessageStanzaTag
) {
506 mcs_proto::DataMessageStanza
* data_message
=
507 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
508 packet_info
->protobuf
.get());
509 uint64 sent
= data_message
->sent();
511 int queued
= (clock_
->Now().ToInternalValue() /
512 base::Time::kMicrosecondsPerSecond
) - sent
;
513 DVLOG(1) << "Message was queued for " << queued
<< " seconds.";
514 data_message
->set_queued(queued
);
515 recorder_
->RecordDataSentToWire(
516 data_message
->category(),
522 // Set the proper last received stream id to acknowledge received server
524 DVLOG(1) << "Setting last stream id received to "
526 SetLastStreamIdReceived(stream_id_in_
,
527 packet_info
->protobuf
.get());
528 if (stream_id_in_
!= last_server_to_device_stream_id_received_
) {
529 last_server_to_device_stream_id_received_
= stream_id_in_
;
530 // Mark all acknowledged server messages as such. Note: they're not dropped,
531 // as it may be that they'll need to be re-acked if this message doesn't
533 PersistentIdList persistent_id_list
;
534 for (StreamIdToPersistentIdMap::const_iterator iter
=
535 unacked_server_ids_
.begin();
536 iter
!= unacked_server_ids_
.end(); ++iter
) {
537 DCHECK_LE(iter
->first
, last_server_to_device_stream_id_received_
);
538 persistent_id_list
.push_back(iter
->second
);
540 unacked_server_ids_
.clear();
541 acked_server_ids_
[stream_id_out_
] = persistent_id_list
;
544 connection_handler_
->SendMessage(*packet_info
->protobuf
);
547 void MCSClient::HandleMCSDataMesssage(
548 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
549 mcs_proto::DataMessageStanza
* data_message
=
550 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
551 // TODO(zea): implement a proper status manager rather than hardcoding these
553 scoped_ptr
<mcs_proto::DataMessageStanza
> response(
554 new mcs_proto::DataMessageStanza());
555 response
->set_from(kGCMFromField
);
556 response
->set_sent(clock_
->Now().ToInternalValue() /
557 base::Time::kMicrosecondsPerSecond
);
558 response
->set_ttl(0);
560 for (int i
= 0; i
< data_message
->app_data_size(); ++i
) {
561 const mcs_proto::AppData
& app_data
= data_message
->app_data(i
);
562 if (app_data
.key() == kIdleNotification
) {
563 // Tell the MCS server the client is not idle.
565 mcs_proto::AppData data
;
566 data
.set_key(kIdleNotification
);
567 data
.set_value("false");
568 response
->add_app_data()->CopyFrom(data
);
569 response
->set_category(kMCSCategory
);
574 SendMessage(MCSMessage(kDataMessageStanzaTag
, response
.Pass()));
578 void MCSClient::HandlePacketFromWire(
579 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
582 uint8 tag
= GetMCSProtoTag(*protobuf
);
583 PersistentId persistent_id
= GetPersistentId(*protobuf
);
584 StreamId last_stream_id_received
= GetLastStreamIdReceived(*protobuf
);
586 if (last_stream_id_received
!= 0) {
587 last_device_to_server_stream_id_received_
= last_stream_id_received
;
589 // Process device to server messages that have now been acknowledged by the
590 // server. Because messages are stored in order, just pop off all that have
591 // a stream id lower than server's last received stream id.
592 HandleStreamAck(last_stream_id_received
);
594 // Process server_to_device_messages that the server now knows were
595 // acknowledged. Again, they're in order, so just keep going until the
596 // stream id is reached.
597 StreamIdList acked_stream_ids_to_remove
;
598 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
599 acked_server_ids_
.begin();
600 iter
!= acked_server_ids_
.end() &&
601 iter
->first
<= last_stream_id_received
; ++iter
) {
602 acked_stream_ids_to_remove
.push_back(iter
->first
);
604 for (StreamIdList::iterator iter
= acked_stream_ids_to_remove
.begin();
605 iter
!= acked_stream_ids_to_remove
.end(); ++iter
) {
606 acked_server_ids_
.erase(*iter
);
611 if (!persistent_id
.empty()) {
612 unacked_server_ids_
[stream_id_in_
] = persistent_id
;
613 gcm_store_
->AddIncomingMessage(persistent_id
,
614 base::Bind(&MCSClient::OnGCMUpdateFinished
,
615 weak_ptr_factory_
.GetWeakPtr()));
618 DVLOG(1) << "Received message of type " << protobuf
->GetTypeName()
619 << " with persistent id "
620 << (persistent_id
.empty() ? "NULL" : persistent_id
)
621 << ", stream id " << stream_id_in_
<< " and last stream id received "
622 << last_stream_id_received
;
624 if (unacked_server_ids_
.size() > 0 &&
625 unacked_server_ids_
.size() % kUnackedMessageBeforeStreamAck
== 0) {
626 SendMessage(MCSMessage(kIqStanzaTag
, BuildStreamAck()));
629 // The connection is alive, treat this message as a heartbeat ack.
630 heartbeat_manager_
.OnHeartbeatAcked();
633 case kLoginResponseTag
: {
634 DCHECK_EQ(CONNECTING
, state_
);
635 mcs_proto::LoginResponse
* login_response
=
636 reinterpret_cast<mcs_proto::LoginResponse
*>(protobuf
.get());
637 DVLOG(1) << "Received login response:";
638 DVLOG(1) << " Id: " << login_response
->id();
639 DVLOG(1) << " Timestamp: " << login_response
->server_timestamp();
640 if (login_response
->has_error() && login_response
->error().code() != 0) {
641 state_
= UNINITIALIZED
;
642 DVLOG(1) << " Error code: " << login_response
->error().code();
643 DVLOG(1) << " Error message: " << login_response
->error().message();
644 LOG(ERROR
) << "Failed to log in to GCM, resetting connection.";
645 connection_factory_
->SignalConnectionReset(
646 ConnectionFactory::LOGIN_FAILURE
);
647 mcs_error_callback_
.Run();
651 if (login_response
->has_heartbeat_config()) {
652 heartbeat_manager_
.UpdateHeartbeatConfig(
653 login_response
->heartbeat_config());
657 stream_id_in_
= 1; // To account for the login response.
658 DCHECK_EQ(1U, stream_id_out_
);
660 // Pass the login response on up.
661 base::MessageLoop::current()->PostTask(
663 base::Bind(message_received_callback_
,
664 MCSMessage(tag
, protobuf
.Pass())));
666 // If there are pending messages, attempt to send one.
667 if (!to_send_
.empty()) {
668 base::MessageLoop::current()->PostTask(
670 base::Bind(&MCSClient::MaybeSendMessage
,
671 weak_ptr_factory_
.GetWeakPtr()));
674 heartbeat_manager_
.Start(
675 base::Bind(&MCSClient::SendHeartbeat
,
676 weak_ptr_factory_
.GetWeakPtr()),
677 base::Bind(&MCSClient::OnConnectionResetByHeartbeat
,
678 weak_ptr_factory_
.GetWeakPtr()));
681 case kHeartbeatPingTag
:
682 DCHECK_GE(stream_id_in_
, 1U);
683 DVLOG(1) << "Received heartbeat ping, sending ack.";
685 MCSMessage(kHeartbeatAckTag
, mcs_proto::HeartbeatAck()));
687 case kHeartbeatAckTag
:
688 DCHECK_GE(stream_id_in_
, 1U);
689 DVLOG(1) << "Received heartbeat ack.";
690 // Do nothing else, all messages act as heartbeat acks.
693 LOG(ERROR
) << "Received close command, resetting connection.";
695 connection_factory_
->SignalConnectionReset(
696 ConnectionFactory::CLOSE_COMMAND
);
699 DCHECK_GE(stream_id_in_
, 1U);
700 mcs_proto::IqStanza
* iq_stanza
=
701 reinterpret_cast<mcs_proto::IqStanza
*>(protobuf
.get());
702 const mcs_proto::Extension
& iq_extension
= iq_stanza
->extension();
703 switch (iq_extension
.id()) {
704 case kSelectiveAck
: {
705 PersistentIdList acked_ids
;
706 if (BuildPersistentIdListFromProto(iq_extension
.data(),
708 HandleSelectiveAck(acked_ids
);
713 // Do nothing. The last received stream id is always processed if it's
717 LOG(WARNING
) << "Received invalid iq stanza extension "
718 << iq_extension
.id();
722 case kDataMessageStanzaTag
: {
723 DCHECK_GE(stream_id_in_
, 1U);
724 mcs_proto::DataMessageStanza
* data_message
=
725 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
726 if (data_message
->category() == kMCSCategory
) {
727 HandleMCSDataMesssage(protobuf
.Pass());
731 DCHECK(protobuf
.get());
732 base::MessageLoop::current()->PostTask(
734 base::Bind(message_received_callback_
,
735 MCSMessage(tag
, protobuf
.Pass())));
739 LOG(ERROR
) << "Received unexpected message of type "
740 << static_cast<int>(tag
);
745 void MCSClient::HandleStreamAck(StreamId last_stream_id_received
) {
746 PersistentIdList acked_outgoing_persistent_ids
;
747 StreamIdList acked_outgoing_stream_ids
;
748 while (!to_resend_
.empty() &&
749 to_resend_
.front()->stream_id
<= last_stream_id_received
) {
750 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
751 acked_outgoing_persistent_ids
.push_back(outgoing_packet
->persistent_id
);
752 acked_outgoing_stream_ids
.push_back(outgoing_packet
->stream_id
);
753 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
754 to_resend_
.pop_front();
757 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids
.size()
758 << " outgoing messages, " << to_resend_
.size()
759 << " remaining unacked";
760 gcm_store_
->RemoveOutgoingMessages(
761 acked_outgoing_persistent_ids
,
762 base::Bind(&MCSClient::OnGCMUpdateFinished
,
763 weak_ptr_factory_
.GetWeakPtr()));
765 HandleServerConfirmedReceipt(last_stream_id_received
);
768 void MCSClient::HandleSelectiveAck(const PersistentIdList
& id_list
) {
769 std::set
<PersistentId
> remaining_ids(id_list
.begin(), id_list
.end());
771 StreamId last_stream_id_received
= 0;
773 // First check the to_resend_ queue. Acknowledgments are always contiguous,
774 // so if there's a pending message that hasn't been acked, all newer messages
775 // must also be unacked.
776 while(!to_resend_
.empty() && !remaining_ids
.empty()) {
777 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
778 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
779 break; // Newer message must be unacked too.
780 remaining_ids
.erase(outgoing_packet
->persistent_id
);
781 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
783 // No need to re-acknowledge any server messages this message already
785 StreamId device_stream_id
= outgoing_packet
->stream_id
;
786 if (device_stream_id
> last_stream_id_received
)
787 last_stream_id_received
= device_stream_id
;
788 to_resend_
.pop_front();
791 // If the acknowledged ids aren't all there, they might be in the to_send_
792 // queue (typically when a SelectiveAck confirms messages as part of a login
794 while (!to_send_
.empty() && !remaining_ids
.empty()) {
795 const MCSPacketInternal
& outgoing_packet
= to_send_
.front();
796 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
797 break; // Newer messages must be unacked too.
798 remaining_ids
.erase(outgoing_packet
->persistent_id
);
799 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
801 // No need to re-acknowledge any server messages this message already
803 StreamId device_stream_id
= outgoing_packet
->stream_id
;
804 if (device_stream_id
> last_stream_id_received
)
805 last_stream_id_received
= device_stream_id
;
809 // Only handle the largest stream id value. All other stream ids are
810 // implicitly handled.
811 if (last_stream_id_received
> 0)
812 HandleServerConfirmedReceipt(last_stream_id_received
);
814 // At this point, all remaining acked ids are redundant.
815 PersistentIdList acked_ids
;
816 if (remaining_ids
.size() > 0) {
817 for (size_t i
= 0; i
< id_list
.size(); ++i
) {
818 if (remaining_ids
.count(id_list
[i
]) > 0)
820 acked_ids
.push_back(id_list
[i
]);
826 DVLOG(1) << "Server acked " << acked_ids
.size()
827 << " messages, " << to_resend_
.size() << " remaining unacked.";
828 gcm_store_
->RemoveOutgoingMessages(
830 base::Bind(&MCSClient::OnGCMUpdateFinished
,
831 weak_ptr_factory_
.GetWeakPtr()));
833 // Resend any remaining outgoing messages, as they were not received by the
835 DVLOG(1) << "Resending " << to_resend_
.size() << " messages.";
836 while (!to_resend_
.empty()) {
837 to_send_
.push_front(to_resend_
.back());
838 to_resend_
.pop_back();
840 base::MessageLoop::current()->PostTask(
842 base::Bind(&MCSClient::MaybeSendMessage
,
843 weak_ptr_factory_
.GetWeakPtr()));
846 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id
) {
847 PersistentIdList acked_incoming_ids
;
848 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
849 acked_server_ids_
.begin();
850 iter
!= acked_server_ids_
.end() &&
851 iter
->first
<= device_stream_id
;) {
852 acked_incoming_ids
.insert(acked_incoming_ids
.end(),
853 iter
->second
.begin(),
855 acked_server_ids_
.erase(iter
++);
858 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids
.size()
859 << " acknowledged server messages.";
860 gcm_store_
->RemoveIncomingMessages(
862 base::Bind(&MCSClient::OnGCMUpdateFinished
,
863 weak_ptr_factory_
.GetWeakPtr()));
866 MCSClient::PersistentId
MCSClient::GetNextPersistentId() {
867 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
870 void MCSClient::OnConnectionResetByHeartbeat() {
871 connection_factory_
->SignalConnectionReset(
872 ConnectionFactory::HEARTBEAT_FAILURE
);
875 void MCSClient::NotifyMessageSendStatus(
876 const google::protobuf::MessageLite
& protobuf
,
877 MessageSendStatus status
) {
878 if (GetMCSProtoTag(protobuf
) != kDataMessageStanzaTag
)
881 const mcs_proto::DataMessageStanza
* data_message_stanza
=
882 reinterpret_cast<const mcs_proto::DataMessageStanza
*>(&protobuf
);
883 recorder_
->RecordNotifySendStatus(
884 data_message_stanza
->category(),
885 data_message_stanza
->to(),
886 data_message_stanza
->id(),
889 data_message_stanza
->ttl());
890 message_sent_callback_
.Run(
891 data_message_stanza
->device_user_id(),
892 data_message_stanza
->category(),
893 data_message_stanza
->id(),
897 MCSClient::MCSPacketInternal
MCSClient::PopMessageForSend() {
898 MCSPacketInternal packet
= to_send_
.front();
899 to_send_
.pop_front();
901 if (packet
->tag
== kDataMessageStanzaTag
) {
902 mcs_proto::DataMessageStanza
* data_message
=
903 reinterpret_cast<mcs_proto::DataMessageStanza
*>(packet
->protobuf
.get());
904 CollapseKey
collapse_key(*data_message
);
905 if (collapse_key
.IsValid())
906 collapse_key_map_
.erase(collapse_key
);