1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/metrics/histogram.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/time/clock.h"
16 #include "base/time/time.h"
17 #include "base/timer/timer.h"
18 #include "google_apis/gcm/base/mcs_util.h"
19 #include "google_apis/gcm/base/socket_stream.h"
20 #include "google_apis/gcm/engine/connection_factory.h"
21 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
23 using namespace google::protobuf::io
;
29 typedef scoped_ptr
<google::protobuf::MessageLite
> MCSProto
;
31 // The category of messages intended for the GCM client itself from MCS.
32 const char kMCSCategory
[] = "com.google.android.gsf.gtalkservice";
34 // The from field for messages originating in the GCM client.
35 const char kGCMFromField
[] = "gcm@android.com";
37 // MCS status message types.
38 // TODO(zea): handle these at the GCMClient layer.
39 const char kIdleNotification
[] = "IdleNotification";
40 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
41 // const char kPowerNotification[] = "PowerNotification";
42 // const char kDataActiveNotification[] = "DataActiveNotification";
44 // Settings for MCS Login packet.
45 const char kHeartbeatIntervalSettingName
[] = "hbping";
46 const int kNoCustomHeartbeat
= 0;
48 // The number of unacked messages to allow before sending a stream ack.
49 // Applies to both incoming and outgoing messages.
50 // TODO(zea): make this server configurable.
51 const int kUnackedMessageBeforeStreamAck
= 10;
53 // The global maximum number of pending messages to have in the send queue.
54 const size_t kMaxSendQueueSize
= 10 * 1024;
56 // The maximum message size that can be sent to the server.
57 const int kMaxMessageBytes
= 4 * 1024; // 4KB, like the server.
59 // Helper for converting a proto persistent id list to a vector of strings.
60 bool BuildPersistentIdListFromProto(const google::protobuf::string
& bytes
,
61 std::vector
<std::string
>* id_list
) {
62 mcs_proto::SelectiveAck selective_ack
;
63 if (!selective_ack
.ParseFromString(bytes
))
65 std::vector
<std::string
> new_list
;
66 for (int i
= 0; i
< selective_ack
.id_size(); ++i
) {
67 DCHECK(!selective_ack
.id(i
).empty());
68 new_list
.push_back(selective_ack
.id(i
));
70 id_list
->swap(new_list
);
78 explicit CollapseKey(const mcs_proto::DataMessageStanza
& message
);
81 // Comparison operator for use in maps.
82 bool operator<(const CollapseKey
& right
) const;
84 // Whether the message had a valid collapse key.
87 std::string
token() const { return token_
; }
88 std::string
app_id() const { return app_id_
; }
89 int64
device_user_id() const { return device_user_id_
; }
92 const std::string token_
;
93 const std::string app_id_
;
94 const int64 device_user_id_
;
97 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza
& message
)
98 : token_(message
.token()),
99 app_id_(message
.category()),
100 device_user_id_(message
.device_user_id()) {}
102 CollapseKey::~CollapseKey() {}
104 bool CollapseKey::IsValid() const {
105 // Device user id is optional, but the application id and token are not.
106 return !token_
.empty() && !app_id_
.empty();
109 bool CollapseKey::operator<(const CollapseKey
& right
) const {
110 if (device_user_id_
!= right
.device_user_id())
111 return device_user_id_
< right
.device_user_id();
112 if (app_id_
!= right
.app_id())
113 return app_id_
< right
.app_id();
114 return token_
< right
.token();
117 struct ReliablePacketInfo
{
118 ReliablePacketInfo();
119 ~ReliablePacketInfo();
121 // The stream id with which the message was sent.
124 // If reliable delivery was requested, the persistent id of the message.
125 std::string persistent_id
;
127 // The type of message itself (for easier lookup).
130 // The protobuf of the message itself.
134 ReliablePacketInfo::ReliablePacketInfo()
135 : stream_id(0), tag(0) {
137 ReliablePacketInfo::~ReliablePacketInfo() {}
139 int MCSClient::GetSendQueueSize() const {
140 return to_send_
.size();
143 int MCSClient::GetResendQueueSize() const {
144 return to_resend_
.size();
147 std::string
MCSClient::GetStateString() const {
150 return "UNINITIALIZED";
159 return std::string();
163 MCSClient::MCSClient(const std::string
& version_string
,
165 ConnectionFactory
* connection_factory
,
167 GCMStatsRecorder
* recorder
)
168 : version_string_(version_string
),
170 state_(UNINITIALIZED
),
173 connection_factory_(connection_factory
),
174 connection_handler_(NULL
),
175 last_device_to_server_stream_id_received_(0),
176 last_server_to_device_stream_id_received_(0),
179 gcm_store_(gcm_store
),
181 weak_ptr_factory_(this) {
184 MCSClient::~MCSClient() {
187 void MCSClient::Initialize(
188 const ErrorCallback
& error_callback
,
189 const OnMessageReceivedCallback
& message_received_callback
,
190 const OnMessageSentCallback
& message_sent_callback
,
191 scoped_ptr
<GCMStore::LoadResult
> load_result
) {
192 DCHECK_EQ(state_
, UNINITIALIZED
);
195 mcs_error_callback_
= error_callback
;
196 message_received_callback_
= message_received_callback
;
197 message_sent_callback_
= message_sent_callback
;
199 connection_factory_
->Initialize(
200 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest
,
201 weak_ptr_factory_
.GetWeakPtr()),
202 base::Bind(&MCSClient::HandlePacketFromWire
,
203 weak_ptr_factory_
.GetWeakPtr()),
204 base::Bind(&MCSClient::MaybeSendMessage
,
205 weak_ptr_factory_
.GetWeakPtr()));
207 stream_id_out_
= 1; // Login request is hardcoded to id 1.
209 android_id_
= load_result
->device_android_id
;
210 security_token_
= load_result
->device_security_token
;
212 if (android_id_
== 0) {
213 DVLOG(1) << "No device credentials found, assuming new client.";
214 // No need to try and load RMQ data in that case.
218 // |android_id_| is non-zero, so should |security_token_|.
219 DCHECK_NE(0u, security_token_
) << "Security token invalid, while android id"
222 DVLOG(1) << "RMQ Load finished with " << load_result
->incoming_messages
.size()
223 << " incoming acks pending and "
224 << load_result
->outgoing_messages
.size()
225 << " outgoing messages pending.";
227 restored_unackeds_server_ids_
= load_result
->incoming_messages
;
229 // First go through and order the outgoing messages by recency.
230 std::map
<uint64
, google::protobuf::MessageLite
*> ordered_messages
;
231 std::vector
<PersistentId
> expired_ttl_ids
;
232 for (GCMStore::OutgoingMessageMap::iterator iter
=
233 load_result
->outgoing_messages
.begin();
234 iter
!= load_result
->outgoing_messages
.end(); ++iter
) {
235 uint64 timestamp
= 0;
236 if (!base::StringToUint64(iter
->first
, ×tamp
)) {
237 LOG(ERROR
) << "Invalid restored message.";
238 // TODO(fgorski): Error: data unreadable
239 mcs_error_callback_
.Run();
243 // Check if the TTL has expired for this message.
244 if (HasTTLExpired(*iter
->second
, clock_
)) {
245 expired_ttl_ids
.push_back(iter
->first
);
246 NotifyMessageSendStatus(*iter
->second
, TTL_EXCEEDED
);
250 ordered_messages
[timestamp
] = iter
->second
.release();
253 if (!expired_ttl_ids
.empty()) {
254 gcm_store_
->RemoveOutgoingMessages(
256 base::Bind(&MCSClient::OnGCMUpdateFinished
,
257 weak_ptr_factory_
.GetWeakPtr()));
260 // Now go through and add the outgoing messages to the send queue in their
261 // appropriate order (oldest at front, most recent at back).
262 for (std::map
<uint64
, google::protobuf::MessageLite
*>::iterator
263 iter
= ordered_messages
.begin();
264 iter
!= ordered_messages
.end(); ++iter
) {
265 ReliablePacketInfo
* packet_info
= new ReliablePacketInfo();
266 packet_info
->protobuf
.reset(iter
->second
);
267 packet_info
->tag
= GetMCSProtoTag(*iter
->second
);
268 packet_info
->persistent_id
= base::Uint64ToString(iter
->first
);
269 to_send_
.push_back(make_linked_ptr(packet_info
));
271 if (packet_info
->tag
== kDataMessageStanzaTag
) {
272 mcs_proto::DataMessageStanza
* data_message
=
273 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
274 packet_info
->protobuf
.get());
275 CollapseKey
collapse_key(*data_message
);
276 if (collapse_key
.IsValid())
277 collapse_key_map_
[collapse_key
] = packet_info
;
281 // Establish if there is any custom client interval persisted from the last
282 // run and set it on the heartbeat manager.
283 custom_heartbeat_intervals_
.swap(load_result
->heartbeat_intervals
);
284 int min_interval_ms
= GetMinHeartbeatIntervalMs();
285 heartbeat_manager_
.SetClientHeartbeatIntervalMs(min_interval_ms
);
288 void MCSClient::Login(uint64 android_id
, uint64 security_token
) {
289 DCHECK_EQ(state_
, LOADED
);
290 DCHECK(android_id_
== 0 || android_id_
== android_id
);
291 DCHECK(security_token_
== 0 || security_token_
== security_token
);
293 if (android_id
!= android_id_
&& security_token
!= security_token_
) {
295 DCHECK(security_token
);
296 android_id_
= android_id
;
297 security_token_
= security_token
;
300 DCHECK(android_id_
!= 0 || restored_unackeds_server_ids_
.empty());
303 connection_factory_
->Connect();
304 connection_handler_
= connection_factory_
->GetConnectionHandler();
307 void MCSClient::SendMessage(const MCSMessage
& message
) {
308 int ttl
= GetTTL(message
.GetProtobuf());
310 if (to_send_
.size() > kMaxSendQueueSize
) {
311 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED
);
314 if (message
.size() > kMaxMessageBytes
) {
315 NotifyMessageSendStatus(message
.GetProtobuf(), MESSAGE_TOO_LARGE
);
319 scoped_ptr
<ReliablePacketInfo
> packet_info(new ReliablePacketInfo());
320 packet_info
->tag
= message
.tag();
321 packet_info
->protobuf
= message
.CloneProtobuf();
324 DCHECK_EQ(message
.tag(), kDataMessageStanzaTag
);
326 // First check if this message should replace a pending message with the
327 // same collapse key.
328 mcs_proto::DataMessageStanza
* data_message
=
329 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
330 packet_info
->protobuf
.get());
331 CollapseKey
collapse_key(*data_message
);
332 if (collapse_key
.IsValid() && collapse_key_map_
.count(collapse_key
) > 0) {
333 ReliablePacketInfo
* original_packet
= collapse_key_map_
[collapse_key
];
334 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
335 << original_packet
->persistent_id
;
336 original_packet
->protobuf
= packet_info
->protobuf
.Pass();
337 SetPersistentId(original_packet
->persistent_id
,
338 original_packet
->protobuf
.get());
339 gcm_store_
->OverwriteOutgoingMessage(
340 original_packet
->persistent_id
,
342 base::Bind(&MCSClient::OnGCMUpdateFinished
,
343 weak_ptr_factory_
.GetWeakPtr()));
345 // The message is already queued, return.
348 PersistentId persistent_id
= GetNextPersistentId();
349 DVLOG(1) << "Setting persistent id to " << persistent_id
;
350 packet_info
->persistent_id
= persistent_id
;
351 SetPersistentId(persistent_id
, packet_info
->protobuf
.get());
352 if (!gcm_store_
->AddOutgoingMessage(
354 MCSMessage(message
.tag(), *(packet_info
->protobuf
)),
355 base::Bind(&MCSClient::OnGCMUpdateFinished
,
356 weak_ptr_factory_
.GetWeakPtr()))) {
357 NotifyMessageSendStatus(message
.GetProtobuf(),
358 APP_QUEUE_SIZE_LIMIT_REACHED
);
363 if (collapse_key
.IsValid())
364 collapse_key_map_
[collapse_key
] = packet_info
.get();
365 } else if (!connection_factory_
->IsEndpointReachable()) {
366 DVLOG(1) << "No active connection, dropping message.";
367 NotifyMessageSendStatus(message
.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL
);
371 to_send_
.push_back(make_linked_ptr(packet_info
.release()));
373 // Notify that the messages has been succsfully queued for sending.
374 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
375 NotifyMessageSendStatus(message
.GetProtobuf(), QUEUED
);
380 void MCSClient::UpdateHeartbeatTimer(scoped_ptr
<base::Timer
> timer
) {
381 heartbeat_manager_
.UpdateHeartbeatTimer(timer
.Pass());
384 void MCSClient::AddHeartbeatInterval(const std::string
& scope
,
386 if (!heartbeat_manager_
.IsValidClientHeartbeatInterval(interval_ms
))
389 custom_heartbeat_intervals_
[scope
] = interval_ms
;
390 gcm_store_
->AddHeartbeatInterval(scope
, interval_ms
,
391 base::Bind(&MCSClient::OnGCMUpdateFinished
,
392 weak_ptr_factory_
.GetWeakPtr()));
394 int min_interval_ms
= GetMinHeartbeatIntervalMs();
395 heartbeat_manager_
.SetClientHeartbeatIntervalMs(min_interval_ms
);
398 void MCSClient::RemoveHeartbeatInterval(const std::string
& scope
) {
399 custom_heartbeat_intervals_
.erase(scope
);
400 gcm_store_
->RemoveHeartbeatInterval(
401 scope
, base::Bind(&MCSClient::OnGCMUpdateFinished
,
402 weak_ptr_factory_
.GetWeakPtr()));
404 int min_interval
= GetMinHeartbeatIntervalMs();
405 heartbeat_manager_
.SetClientHeartbeatIntervalMs(min_interval
);
408 int MCSClient::GetMinHeartbeatIntervalMs() {
409 if (custom_heartbeat_intervals_
.empty())
410 return kNoCustomHeartbeat
;
412 int min_interval
= custom_heartbeat_intervals_
.begin()->second
;
413 for (std::map
<std::string
, int>::const_iterator it
=
414 custom_heartbeat_intervals_
.begin();
415 it
!= custom_heartbeat_intervals_
.end();
417 if (it
->second
< min_interval
)
418 min_interval
= it
->second
;
423 void MCSClient::ResetStateAndBuildLoginRequest(
424 mcs_proto::LoginRequest
* request
) {
426 DCHECK(security_token_
);
429 last_device_to_server_stream_id_received_
= 0;
430 last_server_to_device_stream_id_received_
= 0;
432 heartbeat_manager_
.Stop();
434 // Add any pending acknowledgments to the list of ids.
435 for (StreamIdToPersistentIdMap::const_iterator iter
=
436 unacked_server_ids_
.begin();
437 iter
!= unacked_server_ids_
.end(); ++iter
) {
438 restored_unackeds_server_ids_
.push_back(iter
->second
);
440 unacked_server_ids_
.clear();
442 // Any acknowledged server ids which have not been confirmed by the server
443 // are treated like unacknowledged ids.
444 for (std::map
<StreamId
, PersistentIdList
>::const_iterator iter
=
445 acked_server_ids_
.begin();
446 iter
!= acked_server_ids_
.end(); ++iter
) {
447 restored_unackeds_server_ids_
.insert(restored_unackeds_server_ids_
.end(),
448 iter
->second
.begin(),
451 acked_server_ids_
.clear();
453 // Then build the request, consuming all pending acknowledgments.
454 request
->Swap(BuildLoginRequest(android_id_
,
456 version_string_
).get());
458 // Set custom heartbeat interval if specified.
459 if (heartbeat_manager_
.HasClientHeartbeatInterval()) {
460 // Ensure that the custom heartbeat interval is communicated to the server.
461 mcs_proto::Setting
* setting
= request
->add_setting();
462 setting
->set_name(kHeartbeatIntervalSettingName
);
463 setting
->set_value(base::IntToString(
464 heartbeat_manager_
.GetClientHeartbeatIntervalMs()));
467 for (PersistentIdList::const_iterator iter
=
468 restored_unackeds_server_ids_
.begin();
469 iter
!= restored_unackeds_server_ids_
.end(); ++iter
) {
470 request
->add_received_persistent_id(*iter
);
472 acked_server_ids_
[stream_id_out_
] = restored_unackeds_server_ids_
;
473 restored_unackeds_server_ids_
.clear();
475 // Push all unacknowledged messages to front of send queue. No need to save
476 // to RMQ, as all messages that reach this point should already have been
477 // saved as necessary.
478 while (!to_resend_
.empty()) {
479 to_send_
.push_front(to_resend_
.back());
480 to_resend_
.pop_back();
483 // Drop all TTL == 0 or expired TTL messages from the queue.
484 std::deque
<MCSPacketInternal
> new_to_send
;
485 std::vector
<PersistentId
> expired_ttl_ids
;
486 while (!to_send_
.empty()) {
487 MCSPacketInternal packet
= PopMessageForSend();
488 if (GetTTL(*packet
->protobuf
) > 0 &&
489 !HasTTLExpired(*packet
->protobuf
, clock_
)) {
490 new_to_send
.push_back(packet
);
492 // If the TTL was 0 there is no persistent id, so no need to remove the
493 // message from the persistent store.
494 if (!packet
->persistent_id
.empty())
495 expired_ttl_ids
.push_back(packet
->persistent_id
);
496 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
500 if (!expired_ttl_ids
.empty()) {
501 DVLOG(1) << "Connection reset, " << expired_ttl_ids
.size()
502 << " messages expired.";
503 gcm_store_
->RemoveOutgoingMessages(
505 base::Bind(&MCSClient::OnGCMUpdateFinished
,
506 weak_ptr_factory_
.GetWeakPtr()));
509 to_send_
.swap(new_to_send
);
511 DVLOG(1) << "Resetting state, with " << request
->received_persistent_id_size()
512 << " incoming acks pending, and " << to_send_
.size()
513 << " pending outgoing messages.";
518 void MCSClient::SendHeartbeat() {
519 SendMessage(MCSMessage(kHeartbeatPingTag
, mcs_proto::HeartbeatPing()));
522 void MCSClient::OnGCMUpdateFinished(bool success
) {
523 LOG_IF(ERROR
, !success
) << "GCM Update failed!";
524 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success
);
525 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
528 void MCSClient::MaybeSendMessage() {
529 if (to_send_
.empty())
532 // If the connection has been reset, do nothing. On reconnection
533 // MaybeSendMessage will be automatically invoked again.
534 // TODO(zea): consider doing TTL expiration at connection reset time, rather
535 // than reconnect time.
536 if (!connection_factory_
->IsEndpointReachable())
539 MCSPacketInternal packet
= PopMessageForSend();
540 if (HasTTLExpired(*packet
->protobuf
, clock_
)) {
541 DCHECK(!packet
->persistent_id
.empty());
542 DVLOG(1) << "Dropping expired message " << packet
->persistent_id
<< ".";
543 NotifyMessageSendStatus(*packet
->protobuf
, TTL_EXCEEDED
);
544 gcm_store_
->RemoveOutgoingMessage(
545 packet
->persistent_id
,
546 base::Bind(&MCSClient::OnGCMUpdateFinished
,
547 weak_ptr_factory_
.GetWeakPtr()));
548 base::ThreadTaskRunnerHandle::Get()->PostTask(
550 base::Bind(&MCSClient::MaybeSendMessage
,
551 weak_ptr_factory_
.GetWeakPtr()));
554 DVLOG(1) << "Pending output message found, sending.";
555 if (!packet
->persistent_id
.empty())
556 to_resend_
.push_back(packet
);
557 SendPacketToWire(packet
.get());
560 void MCSClient::SendPacketToWire(ReliablePacketInfo
* packet_info
) {
561 packet_info
->stream_id
= ++stream_id_out_
;
562 DVLOG(1) << "Sending packet of type " << packet_info
->protobuf
->GetTypeName();
564 // Set the queued time as necessary.
565 if (packet_info
->tag
== kDataMessageStanzaTag
) {
566 mcs_proto::DataMessageStanza
* data_message
=
567 reinterpret_cast<mcs_proto::DataMessageStanza
*>(
568 packet_info
->protobuf
.get());
569 uint64 sent
= data_message
->sent();
571 int queued
= (clock_
->Now().ToInternalValue() /
572 base::Time::kMicrosecondsPerSecond
) - sent
;
573 DVLOG(1) << "Message was queued for " << queued
<< " seconds.";
574 data_message
->set_queued(queued
);
575 recorder_
->RecordDataSentToWire(
576 data_message
->category(),
582 // Set the proper last received stream id to acknowledge received server
584 DVLOG(1) << "Setting last stream id received to "
586 SetLastStreamIdReceived(stream_id_in_
,
587 packet_info
->protobuf
.get());
588 if (stream_id_in_
!= last_server_to_device_stream_id_received_
) {
589 last_server_to_device_stream_id_received_
= stream_id_in_
;
590 // Mark all acknowledged server messages as such. Note: they're not dropped,
591 // as it may be that they'll need to be re-acked if this message doesn't
593 PersistentIdList persistent_id_list
;
594 for (StreamIdToPersistentIdMap::const_iterator iter
=
595 unacked_server_ids_
.begin();
596 iter
!= unacked_server_ids_
.end(); ++iter
) {
597 DCHECK_LE(iter
->first
, last_server_to_device_stream_id_received_
);
598 persistent_id_list
.push_back(iter
->second
);
600 unacked_server_ids_
.clear();
601 acked_server_ids_
[stream_id_out_
] = persistent_id_list
;
604 connection_handler_
->SendMessage(*packet_info
->protobuf
);
607 void MCSClient::HandleMCSDataMesssage(
608 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
609 mcs_proto::DataMessageStanza
* data_message
=
610 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
611 // TODO(zea): implement a proper status manager rather than hardcoding these
613 scoped_ptr
<mcs_proto::DataMessageStanza
> response(
614 new mcs_proto::DataMessageStanza());
615 response
->set_from(kGCMFromField
);
616 response
->set_sent(clock_
->Now().ToInternalValue() /
617 base::Time::kMicrosecondsPerSecond
);
618 response
->set_ttl(0);
620 for (int i
= 0; i
< data_message
->app_data_size(); ++i
) {
621 const mcs_proto::AppData
& app_data
= data_message
->app_data(i
);
622 if (app_data
.key() == kIdleNotification
) {
623 // Tell the MCS server the client is not idle.
625 mcs_proto::AppData data
;
626 data
.set_key(kIdleNotification
);
627 data
.set_value("false");
628 response
->add_app_data()->CopyFrom(data
);
629 response
->set_category(kMCSCategory
);
634 SendMessage(MCSMessage(kDataMessageStanzaTag
, response
.Pass()));
638 void MCSClient::HandlePacketFromWire(
639 scoped_ptr
<google::protobuf::MessageLite
> protobuf
) {
642 uint8 tag
= GetMCSProtoTag(*protobuf
);
643 PersistentId persistent_id
= GetPersistentId(*protobuf
);
644 StreamId last_stream_id_received
= GetLastStreamIdReceived(*protobuf
);
646 if (last_stream_id_received
!= 0) {
647 last_device_to_server_stream_id_received_
= last_stream_id_received
;
649 // Process device to server messages that have now been acknowledged by the
650 // server. Because messages are stored in order, just pop off all that have
651 // a stream id lower than server's last received stream id.
652 HandleStreamAck(last_stream_id_received
);
654 // Process server_to_device_messages that the server now knows were
655 // acknowledged. Again, they're in order, so just keep going until the
656 // stream id is reached.
657 StreamIdList acked_stream_ids_to_remove
;
658 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
659 acked_server_ids_
.begin();
660 iter
!= acked_server_ids_
.end() &&
661 iter
->first
<= last_stream_id_received
; ++iter
) {
662 acked_stream_ids_to_remove
.push_back(iter
->first
);
664 for (StreamIdList::iterator iter
= acked_stream_ids_to_remove
.begin();
665 iter
!= acked_stream_ids_to_remove
.end(); ++iter
) {
666 acked_server_ids_
.erase(*iter
);
671 if (!persistent_id
.empty()) {
672 unacked_server_ids_
[stream_id_in_
] = persistent_id
;
673 gcm_store_
->AddIncomingMessage(persistent_id
,
674 base::Bind(&MCSClient::OnGCMUpdateFinished
,
675 weak_ptr_factory_
.GetWeakPtr()));
678 DVLOG(1) << "Received message of type " << protobuf
->GetTypeName()
679 << " with persistent id "
680 << (persistent_id
.empty() ? "NULL" : persistent_id
)
681 << ", stream id " << stream_id_in_
<< " and last stream id received "
682 << last_stream_id_received
;
684 if (unacked_server_ids_
.size() > 0 &&
685 unacked_server_ids_
.size() % kUnackedMessageBeforeStreamAck
== 0) {
686 SendMessage(MCSMessage(kIqStanzaTag
, BuildStreamAck()));
689 // The connection is alive, treat this message as a heartbeat ack.
690 heartbeat_manager_
.OnHeartbeatAcked();
693 case kLoginResponseTag
: {
694 DCHECK_EQ(CONNECTING
, state_
);
695 mcs_proto::LoginResponse
* login_response
=
696 reinterpret_cast<mcs_proto::LoginResponse
*>(protobuf
.get());
697 DVLOG(1) << "Received login response:";
698 DVLOG(1) << " Id: " << login_response
->id();
699 DVLOG(1) << " Timestamp: " << login_response
->server_timestamp();
700 if (login_response
->has_error() && login_response
->error().code() != 0) {
701 state_
= UNINITIALIZED
;
702 DVLOG(1) << " Error code: " << login_response
->error().code();
703 DVLOG(1) << " Error message: " << login_response
->error().message();
704 LOG(ERROR
) << "Failed to log in to GCM, resetting connection.";
705 connection_factory_
->SignalConnectionReset(
706 ConnectionFactory::LOGIN_FAILURE
);
707 mcs_error_callback_
.Run();
711 if (login_response
->has_heartbeat_config()) {
712 heartbeat_manager_
.UpdateHeartbeatConfig(
713 login_response
->heartbeat_config());
717 stream_id_in_
= 1; // To account for the login response.
718 DCHECK_EQ(1U, stream_id_out_
);
720 // Pass the login response on up.
721 base::ThreadTaskRunnerHandle::Get()->PostTask(
723 base::Bind(message_received_callback_
,
724 MCSMessage(tag
, protobuf
.Pass())));
726 // If there are pending messages, attempt to send one.
727 if (!to_send_
.empty()) {
728 base::ThreadTaskRunnerHandle::Get()->PostTask(
730 base::Bind(&MCSClient::MaybeSendMessage
,
731 weak_ptr_factory_
.GetWeakPtr()));
734 heartbeat_manager_
.Start(
735 base::Bind(&MCSClient::SendHeartbeat
,
736 weak_ptr_factory_
.GetWeakPtr()),
737 base::Bind(&MCSClient::OnConnectionResetByHeartbeat
,
738 weak_ptr_factory_
.GetWeakPtr()));
741 case kHeartbeatPingTag
:
742 DCHECK_GE(stream_id_in_
, 1U);
743 DVLOG(1) << "Received heartbeat ping, sending ack.";
745 MCSMessage(kHeartbeatAckTag
, mcs_proto::HeartbeatAck()));
747 case kHeartbeatAckTag
:
748 DCHECK_GE(stream_id_in_
, 1U);
749 DVLOG(1) << "Received heartbeat ack.";
750 // Do nothing else, all messages act as heartbeat acks.
753 LOG(ERROR
) << "Received close command, resetting connection.";
755 connection_factory_
->SignalConnectionReset(
756 ConnectionFactory::CLOSE_COMMAND
);
759 DCHECK_GE(stream_id_in_
, 1U);
760 mcs_proto::IqStanza
* iq_stanza
=
761 reinterpret_cast<mcs_proto::IqStanza
*>(protobuf
.get());
762 const mcs_proto::Extension
& iq_extension
= iq_stanza
->extension();
763 switch (iq_extension
.id()) {
764 case kSelectiveAck
: {
765 PersistentIdList acked_ids
;
766 if (BuildPersistentIdListFromProto(iq_extension
.data(),
768 HandleSelectiveAck(acked_ids
);
773 // Do nothing. The last received stream id is always processed if it's
777 LOG(WARNING
) << "Received invalid iq stanza extension "
778 << iq_extension
.id();
782 case kDataMessageStanzaTag
: {
783 DCHECK_GE(stream_id_in_
, 1U);
784 mcs_proto::DataMessageStanza
* data_message
=
785 reinterpret_cast<mcs_proto::DataMessageStanza
*>(protobuf
.get());
786 if (data_message
->category() == kMCSCategory
) {
787 HandleMCSDataMesssage(protobuf
.Pass());
791 DCHECK(protobuf
.get());
792 base::ThreadTaskRunnerHandle::Get()->PostTask(
794 base::Bind(message_received_callback_
,
795 MCSMessage(tag
, protobuf
.Pass())));
799 LOG(ERROR
) << "Received unexpected message of type "
800 << static_cast<int>(tag
);
805 void MCSClient::HandleStreamAck(StreamId last_stream_id_received
) {
806 PersistentIdList acked_outgoing_persistent_ids
;
807 StreamIdList acked_outgoing_stream_ids
;
808 while (!to_resend_
.empty() &&
809 to_resend_
.front()->stream_id
<= last_stream_id_received
) {
810 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
811 acked_outgoing_persistent_ids
.push_back(outgoing_packet
->persistent_id
);
812 acked_outgoing_stream_ids
.push_back(outgoing_packet
->stream_id
);
813 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
814 to_resend_
.pop_front();
817 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids
.size()
818 << " outgoing messages, " << to_resend_
.size()
819 << " remaining unacked";
820 gcm_store_
->RemoveOutgoingMessages(
821 acked_outgoing_persistent_ids
,
822 base::Bind(&MCSClient::OnGCMUpdateFinished
,
823 weak_ptr_factory_
.GetWeakPtr()));
825 HandleServerConfirmedReceipt(last_stream_id_received
);
828 void MCSClient::HandleSelectiveAck(const PersistentIdList
& id_list
) {
829 std::set
<PersistentId
> remaining_ids(id_list
.begin(), id_list
.end());
831 StreamId last_stream_id_received
= 0;
833 // First check the to_resend_ queue. Acknowledgments are always contiguous,
834 // so if there's a pending message that hasn't been acked, all newer messages
835 // must also be unacked.
836 while(!to_resend_
.empty() && !remaining_ids
.empty()) {
837 const MCSPacketInternal
& outgoing_packet
= to_resend_
.front();
838 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
839 break; // Newer message must be unacked too.
840 remaining_ids
.erase(outgoing_packet
->persistent_id
);
841 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
843 // No need to re-acknowledge any server messages this message already
845 StreamId device_stream_id
= outgoing_packet
->stream_id
;
846 if (device_stream_id
> last_stream_id_received
)
847 last_stream_id_received
= device_stream_id
;
848 to_resend_
.pop_front();
851 // If the acknowledged ids aren't all there, they might be in the to_send_
852 // queue (typically when a SelectiveAck confirms messages as part of a login
854 while (!to_send_
.empty() && !remaining_ids
.empty()) {
855 const MCSPacketInternal
& outgoing_packet
= to_send_
.front();
856 if (remaining_ids
.count(outgoing_packet
->persistent_id
) == 0)
857 break; // Newer messages must be unacked too.
858 remaining_ids
.erase(outgoing_packet
->persistent_id
);
859 NotifyMessageSendStatus(*outgoing_packet
->protobuf
, SENT
);
861 // No need to re-acknowledge any server messages this message already
863 StreamId device_stream_id
= outgoing_packet
->stream_id
;
864 if (device_stream_id
> last_stream_id_received
)
865 last_stream_id_received
= device_stream_id
;
869 // Only handle the largest stream id value. All other stream ids are
870 // implicitly handled.
871 if (last_stream_id_received
> 0)
872 HandleServerConfirmedReceipt(last_stream_id_received
);
874 // At this point, all remaining acked ids are redundant.
875 PersistentIdList acked_ids
;
876 if (remaining_ids
.size() > 0) {
877 for (size_t i
= 0; i
< id_list
.size(); ++i
) {
878 if (remaining_ids
.count(id_list
[i
]) > 0)
880 acked_ids
.push_back(id_list
[i
]);
886 DVLOG(1) << "Server acked " << acked_ids
.size()
887 << " messages, " << to_resend_
.size() << " remaining unacked.";
888 gcm_store_
->RemoveOutgoingMessages(
890 base::Bind(&MCSClient::OnGCMUpdateFinished
,
891 weak_ptr_factory_
.GetWeakPtr()));
893 // Resend any remaining outgoing messages, as they were not received by the
895 DVLOG(1) << "Resending " << to_resend_
.size() << " messages.";
896 while (!to_resend_
.empty()) {
897 to_send_
.push_front(to_resend_
.back());
898 to_resend_
.pop_back();
900 base::ThreadTaskRunnerHandle::Get()->PostTask(
902 base::Bind(&MCSClient::MaybeSendMessage
,
903 weak_ptr_factory_
.GetWeakPtr()));
906 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id
) {
907 PersistentIdList acked_incoming_ids
;
908 for (std::map
<StreamId
, PersistentIdList
>::iterator iter
=
909 acked_server_ids_
.begin();
910 iter
!= acked_server_ids_
.end() &&
911 iter
->first
<= device_stream_id
;) {
912 acked_incoming_ids
.insert(acked_incoming_ids
.end(),
913 iter
->second
.begin(),
915 acked_server_ids_
.erase(iter
++);
918 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids
.size()
919 << " acknowledged server messages.";
920 gcm_store_
->RemoveIncomingMessages(
922 base::Bind(&MCSClient::OnGCMUpdateFinished
,
923 weak_ptr_factory_
.GetWeakPtr()));
926 MCSClient::PersistentId
MCSClient::GetNextPersistentId() {
927 return base::Int64ToString(base::TimeTicks::Now().ToInternalValue());
930 void MCSClient::OnConnectionResetByHeartbeat(
931 ConnectionFactory::ConnectionResetReason reason
) {
932 connection_factory_
->SignalConnectionReset(reason
);
935 void MCSClient::NotifyMessageSendStatus(
936 const google::protobuf::MessageLite
& protobuf
,
937 MessageSendStatus status
) {
938 if (GetMCSProtoTag(protobuf
) != kDataMessageStanzaTag
)
941 const mcs_proto::DataMessageStanza
* data_message_stanza
=
942 reinterpret_cast<const mcs_proto::DataMessageStanza
*>(&protobuf
);
943 recorder_
->RecordNotifySendStatus(
944 data_message_stanza
->category(),
945 data_message_stanza
->to(),
946 data_message_stanza
->id(),
949 data_message_stanza
->ttl());
950 message_sent_callback_
.Run(
951 data_message_stanza
->device_user_id(),
952 data_message_stanza
->category(),
953 data_message_stanza
->id(),
957 MCSClient::MCSPacketInternal
MCSClient::PopMessageForSend() {
958 MCSPacketInternal packet
= to_send_
.front();
959 to_send_
.pop_front();
961 if (packet
->tag
== kDataMessageStanzaTag
) {
962 mcs_proto::DataMessageStanza
* data_message
=
963 reinterpret_cast<mcs_proto::DataMessageStanza
*>(packet
->protobuf
.get());
964 CollapseKey
collapse_key(*data_message
);
965 if (collapse_key
.IsValid())
966 collapse_key_map_
.erase(collapse_key
);