Mechanical rename of tracing includes for /ppapi
[chromium-blink-merge.git] / google_apis / gcm / engine / mcs_client.cc
blobf952839a59e0567efb0f3bc1d38588e5540d5f81
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
7 #include <set>
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/metrics/histogram.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/time/clock.h"
15 #include "base/time/time.h"
16 #include "base/timer/timer.h"
17 #include "google_apis/gcm/base/mcs_util.h"
18 #include "google_apis/gcm/base/socket_stream.h"
19 #include "google_apis/gcm/engine/connection_factory.h"
20 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
22 using namespace google::protobuf::io;
24 namespace gcm {
26 namespace {
28 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
// Category carried by data messages that MCS addresses to the GCM client
// itself.
const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";

// Value placed in the "from" field of messages that originate in the GCM
// client.
const char kGCMFromField[] = "gcm@android.com";

// MCS status message types.
// TODO(zea): handle these at the GCMClient layer.
const char kIdleNotification[] = "IdleNotification";
// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
// const char kPowerNotification[] = "PowerNotification";
// const char kDataActiveNotification[] = "DataActiveNotification";

// Number of unacknowledged messages tolerated before a stream ack is sent.
// The same threshold is used for incoming and outgoing messages.
// TODO(zea): make this server configurable.
const int kUnackedMessageBeforeStreamAck = 10;

// Global cap on the number of messages waiting in the send queue.
const size_t kMaxSendQueueSize = 10 * 1024;

// Largest message, in bytes, that may be sent to the server.
const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
54 // Helper for converting a proto persistent id list to a vector of strings.
55 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
56 std::vector<std::string>* id_list) {
57 mcs_proto::SelectiveAck selective_ack;
58 if (!selective_ack.ParseFromString(bytes))
59 return false;
60 std::vector<std::string> new_list;
61 for (int i = 0; i < selective_ack.id_size(); ++i) {
62 DCHECK(!selective_ack.id(i).empty());
63 new_list.push_back(selective_ack.id(i));
65 id_list->swap(new_list);
66 return true;
69 } // namespace
71 class CollapseKey {
72 public:
73 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
74 ~CollapseKey();
76 // Comparison operator for use in maps.
77 bool operator<(const CollapseKey& right) const;
79 // Whether the message had a valid collapse key.
80 bool IsValid() const;
82 std::string token() const { return token_; }
83 std::string app_id() const { return app_id_; }
84 int64 device_user_id() const { return device_user_id_; }
86 private:
87 const std::string token_;
88 const std::string app_id_;
89 const int64 device_user_id_;
92 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
93 : token_(message.token()),
94 app_id_(message.category()),
95 device_user_id_(message.device_user_id()) {}
97 CollapseKey::~CollapseKey() {}
99 bool CollapseKey::IsValid() const {
100 // Device user id is optional, but the application id and token are not.
101 return !token_.empty() && !app_id_.empty();
104 bool CollapseKey::operator<(const CollapseKey& right) const {
105 if (device_user_id_ != right.device_user_id())
106 return device_user_id_ < right.device_user_id();
107 if (app_id_ != right.app_id())
108 return app_id_ < right.app_id();
109 return token_ < right.token();
112 struct ReliablePacketInfo {
113 ReliablePacketInfo();
114 ~ReliablePacketInfo();
116 // The stream id with which the message was sent.
117 uint32 stream_id;
119 // If reliable delivery was requested, the persistent id of the message.
120 std::string persistent_id;
122 // The type of message itself (for easier lookup).
123 uint8 tag;
125 // The protobuf of the message itself.
126 MCSProto protobuf;
129 ReliablePacketInfo::ReliablePacketInfo()
130 : stream_id(0), tag(0) {
132 ReliablePacketInfo::~ReliablePacketInfo() {}
134 int MCSClient::GetSendQueueSize() const {
135 return to_send_.size();
138 int MCSClient::GetResendQueueSize() const {
139 return to_resend_.size();
142 std::string MCSClient::GetStateString() const {
143 switch(state_) {
144 case UNINITIALIZED:
145 return "UNINITIALIZED";
146 case LOADED:
147 return "LOADED";
148 case CONNECTING:
149 return "CONNECTING";
150 case CONNECTED:
151 return "CONNECTED";
152 default:
153 NOTREACHED();
154 return std::string();
158 MCSClient::MCSClient(const std::string& version_string,
159 base::Clock* clock,
160 ConnectionFactory* connection_factory,
161 GCMStore* gcm_store,
162 GCMStatsRecorder* recorder)
163 : version_string_(version_string),
164 clock_(clock),
165 state_(UNINITIALIZED),
166 android_id_(0),
167 security_token_(0),
168 connection_factory_(connection_factory),
169 connection_handler_(NULL),
170 last_device_to_server_stream_id_received_(0),
171 last_server_to_device_stream_id_received_(0),
172 stream_id_out_(0),
173 stream_id_in_(0),
174 gcm_store_(gcm_store),
175 recorder_(recorder),
176 weak_ptr_factory_(this) {
179 MCSClient::~MCSClient() {
182 void MCSClient::Initialize(
183 const ErrorCallback& error_callback,
184 const OnMessageReceivedCallback& message_received_callback,
185 const OnMessageSentCallback& message_sent_callback,
186 scoped_ptr<GCMStore::LoadResult> load_result) {
187 DCHECK_EQ(state_, UNINITIALIZED);
189 state_ = LOADED;
190 mcs_error_callback_ = error_callback;
191 message_received_callback_ = message_received_callback;
192 message_sent_callback_ = message_sent_callback;
194 connection_factory_->Initialize(
195 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
196 weak_ptr_factory_.GetWeakPtr()),
197 base::Bind(&MCSClient::HandlePacketFromWire,
198 weak_ptr_factory_.GetWeakPtr()),
199 base::Bind(&MCSClient::MaybeSendMessage,
200 weak_ptr_factory_.GetWeakPtr()));
202 stream_id_out_ = 1; // Login request is hardcoded to id 1.
204 android_id_ = load_result->device_android_id;
205 security_token_ = load_result->device_security_token;
207 if (android_id_ == 0) {
208 DVLOG(1) << "No device credentials found, assuming new client.";
209 // No need to try and load RMQ data in that case.
210 return;
213 // |android_id_| is non-zero, so should |security_token_|.
214 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
215 << " is non-zero.";
217 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
218 << " incoming acks pending and "
219 << load_result->outgoing_messages.size()
220 << " outgoing messages pending.";
222 restored_unackeds_server_ids_ = load_result->incoming_messages;
224 // First go through and order the outgoing messages by recency.
225 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
226 std::vector<PersistentId> expired_ttl_ids;
227 for (GCMStore::OutgoingMessageMap::iterator iter =
228 load_result->outgoing_messages.begin();
229 iter != load_result->outgoing_messages.end(); ++iter) {
230 uint64 timestamp = 0;
231 if (!base::StringToUint64(iter->first, &timestamp)) {
232 LOG(ERROR) << "Invalid restored message.";
233 // TODO(fgorski): Error: data unreadable
234 mcs_error_callback_.Run();
235 return;
238 // Check if the TTL has expired for this message.
239 if (HasTTLExpired(*iter->second, clock_)) {
240 expired_ttl_ids.push_back(iter->first);
241 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
242 continue;
245 ordered_messages[timestamp] = iter->second.release();
248 if (!expired_ttl_ids.empty()) {
249 gcm_store_->RemoveOutgoingMessages(
250 expired_ttl_ids,
251 base::Bind(&MCSClient::OnGCMUpdateFinished,
252 weak_ptr_factory_.GetWeakPtr()));
255 // Now go through and add the outgoing messages to the send queue in their
256 // appropriate order (oldest at front, most recent at back).
257 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
258 iter = ordered_messages.begin();
259 iter != ordered_messages.end(); ++iter) {
260 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
261 packet_info->protobuf.reset(iter->second);
262 packet_info->tag = GetMCSProtoTag(*iter->second);
263 packet_info->persistent_id = base::Uint64ToString(iter->first);
264 to_send_.push_back(make_linked_ptr(packet_info));
266 if (packet_info->tag == kDataMessageStanzaTag) {
267 mcs_proto::DataMessageStanza* data_message =
268 reinterpret_cast<mcs_proto::DataMessageStanza*>(
269 packet_info->protobuf.get());
270 CollapseKey collapse_key(*data_message);
271 if (collapse_key.IsValid())
272 collapse_key_map_[collapse_key] = packet_info;
277 void MCSClient::Login(uint64 android_id, uint64 security_token) {
278 DCHECK_EQ(state_, LOADED);
279 DCHECK(android_id_ == 0 || android_id_ == android_id);
280 DCHECK(security_token_ == 0 || security_token_ == security_token);
282 if (android_id != android_id_ && security_token != security_token_) {
283 DCHECK(android_id);
284 DCHECK(security_token);
285 android_id_ = android_id;
286 security_token_ = security_token;
289 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
291 state_ = CONNECTING;
292 connection_factory_->Connect();
293 connection_handler_ = connection_factory_->GetConnectionHandler();
296 void MCSClient::SendMessage(const MCSMessage& message) {
297 int ttl = GetTTL(message.GetProtobuf());
298 DCHECK_GE(ttl, 0);
299 if (to_send_.size() > kMaxSendQueueSize) {
300 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
301 return;
303 if (message.size() > kMaxMessageBytes) {
304 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
305 return;
308 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
309 packet_info->tag = message.tag();
310 packet_info->protobuf = message.CloneProtobuf();
312 if (ttl > 0) {
313 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
315 // First check if this message should replace a pending message with the
316 // same collapse key.
317 mcs_proto::DataMessageStanza* data_message =
318 reinterpret_cast<mcs_proto::DataMessageStanza*>(
319 packet_info->protobuf.get());
320 CollapseKey collapse_key(*data_message);
321 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
322 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
323 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
324 << original_packet->persistent_id;
325 original_packet->protobuf = packet_info->protobuf.Pass();
326 SetPersistentId(original_packet->persistent_id,
327 original_packet->protobuf.get());
328 gcm_store_->OverwriteOutgoingMessage(
329 original_packet->persistent_id,
330 message,
331 base::Bind(&MCSClient::OnGCMUpdateFinished,
332 weak_ptr_factory_.GetWeakPtr()));
334 // The message is already queued, return.
335 return;
336 } else {
337 PersistentId persistent_id = GetNextPersistentId();
338 DVLOG(1) << "Setting persistent id to " << persistent_id;
339 packet_info->persistent_id = persistent_id;
340 SetPersistentId(persistent_id, packet_info->protobuf.get());
341 if (!gcm_store_->AddOutgoingMessage(
342 persistent_id,
343 MCSMessage(message.tag(), *(packet_info->protobuf)),
344 base::Bind(&MCSClient::OnGCMUpdateFinished,
345 weak_ptr_factory_.GetWeakPtr()))) {
346 NotifyMessageSendStatus(message.GetProtobuf(),
347 APP_QUEUE_SIZE_LIMIT_REACHED);
348 return;
352 if (collapse_key.IsValid())
353 collapse_key_map_[collapse_key] = packet_info.get();
354 } else if (!connection_factory_->IsEndpointReachable()) {
355 DVLOG(1) << "No active connection, dropping message.";
356 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
357 return;
360 to_send_.push_back(make_linked_ptr(packet_info.release()));
362 // Notify that the messages has been succsfully queued for sending.
363 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
364 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
366 MaybeSendMessage();
369 void MCSClient::UpdateHeartbeatTimer(scoped_ptr<base::Timer> timer) {
370 heartbeat_manager_.UpdateHeartbeatTimer(timer.Pass());
373 void MCSClient::ResetStateAndBuildLoginRequest(
374 mcs_proto::LoginRequest* request) {
375 DCHECK(android_id_);
376 DCHECK(security_token_);
377 stream_id_in_ = 0;
378 stream_id_out_ = 1;
379 last_device_to_server_stream_id_received_ = 0;
380 last_server_to_device_stream_id_received_ = 0;
382 heartbeat_manager_.Stop();
384 // Add any pending acknowledgments to the list of ids.
385 for (StreamIdToPersistentIdMap::const_iterator iter =
386 unacked_server_ids_.begin();
387 iter != unacked_server_ids_.end(); ++iter) {
388 restored_unackeds_server_ids_.push_back(iter->second);
390 unacked_server_ids_.clear();
392 // Any acknowledged server ids which have not been confirmed by the server
393 // are treated like unacknowledged ids.
394 for (std::map<StreamId, PersistentIdList>::const_iterator iter =
395 acked_server_ids_.begin();
396 iter != acked_server_ids_.end(); ++iter) {
397 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
398 iter->second.begin(),
399 iter->second.end());
401 acked_server_ids_.clear();
403 // Then build the request, consuming all pending acknowledgments.
404 request->Swap(BuildLoginRequest(android_id_,
405 security_token_,
406 version_string_).get());
407 for (PersistentIdList::const_iterator iter =
408 restored_unackeds_server_ids_.begin();
409 iter != restored_unackeds_server_ids_.end(); ++iter) {
410 request->add_received_persistent_id(*iter);
412 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
413 restored_unackeds_server_ids_.clear();
415 // Push all unacknowledged messages to front of send queue. No need to save
416 // to RMQ, as all messages that reach this point should already have been
417 // saved as necessary.
418 while (!to_resend_.empty()) {
419 to_send_.push_front(to_resend_.back());
420 to_resend_.pop_back();
423 // Drop all TTL == 0 or expired TTL messages from the queue.
424 std::deque<MCSPacketInternal> new_to_send;
425 std::vector<PersistentId> expired_ttl_ids;
426 while (!to_send_.empty()) {
427 MCSPacketInternal packet = PopMessageForSend();
428 if (GetTTL(*packet->protobuf) > 0 &&
429 !HasTTLExpired(*packet->protobuf, clock_)) {
430 new_to_send.push_back(packet);
431 } else {
432 // If the TTL was 0 there is no persistent id, so no need to remove the
433 // message from the persistent store.
434 if (!packet->persistent_id.empty())
435 expired_ttl_ids.push_back(packet->persistent_id);
436 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
440 if (!expired_ttl_ids.empty()) {
441 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
442 << " messages expired.";
443 gcm_store_->RemoveOutgoingMessages(
444 expired_ttl_ids,
445 base::Bind(&MCSClient::OnGCMUpdateFinished,
446 weak_ptr_factory_.GetWeakPtr()));
449 to_send_.swap(new_to_send);
451 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
452 << " incoming acks pending, and " << to_send_.size()
453 << " pending outgoing messages.";
455 state_ = CONNECTING;
458 void MCSClient::SendHeartbeat() {
459 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
462 void MCSClient::OnGCMUpdateFinished(bool success) {
463 LOG_IF(ERROR, !success) << "GCM Update failed!";
464 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
465 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
468 void MCSClient::MaybeSendMessage() {
469 if (to_send_.empty())
470 return;
472 // If the connection has been reset, do nothing. On reconnection
473 // MaybeSendMessage will be automatically invoked again.
474 // TODO(zea): consider doing TTL expiration at connection reset time, rather
475 // than reconnect time.
476 if (!connection_factory_->IsEndpointReachable())
477 return;
479 MCSPacketInternal packet = PopMessageForSend();
480 if (HasTTLExpired(*packet->protobuf, clock_)) {
481 DCHECK(!packet->persistent_id.empty());
482 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
483 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
484 gcm_store_->RemoveOutgoingMessage(
485 packet->persistent_id,
486 base::Bind(&MCSClient::OnGCMUpdateFinished,
487 weak_ptr_factory_.GetWeakPtr()));
488 base::MessageLoop::current()->PostTask(
489 FROM_HERE,
490 base::Bind(&MCSClient::MaybeSendMessage,
491 weak_ptr_factory_.GetWeakPtr()));
492 return;
494 DVLOG(1) << "Pending output message found, sending.";
495 if (!packet->persistent_id.empty())
496 to_resend_.push_back(packet);
497 SendPacketToWire(packet.get());
500 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
501 packet_info->stream_id = ++stream_id_out_;
502 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
504 // Set the queued time as necessary.
505 if (packet_info->tag == kDataMessageStanzaTag) {
506 mcs_proto::DataMessageStanza* data_message =
507 reinterpret_cast<mcs_proto::DataMessageStanza*>(
508 packet_info->protobuf.get());
509 uint64 sent = data_message->sent();
510 DCHECK_GT(sent, 0U);
511 int queued = (clock_->Now().ToInternalValue() /
512 base::Time::kMicrosecondsPerSecond) - sent;
513 DVLOG(1) << "Message was queued for " << queued << " seconds.";
514 data_message->set_queued(queued);
515 recorder_->RecordDataSentToWire(
516 data_message->category(),
517 data_message->to(),
518 data_message->id(),
519 queued);
522 // Set the proper last received stream id to acknowledge received server
523 // packets.
524 DVLOG(1) << "Setting last stream id received to "
525 << stream_id_in_;
526 SetLastStreamIdReceived(stream_id_in_,
527 packet_info->protobuf.get());
528 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
529 last_server_to_device_stream_id_received_ = stream_id_in_;
530 // Mark all acknowledged server messages as such. Note: they're not dropped,
531 // as it may be that they'll need to be re-acked if this message doesn't
532 // make it.
533 PersistentIdList persistent_id_list;
534 for (StreamIdToPersistentIdMap::const_iterator iter =
535 unacked_server_ids_.begin();
536 iter != unacked_server_ids_.end(); ++iter) {
537 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
538 persistent_id_list.push_back(iter->second);
540 unacked_server_ids_.clear();
541 acked_server_ids_[stream_id_out_] = persistent_id_list;
544 connection_handler_->SendMessage(*packet_info->protobuf);
547 void MCSClient::HandleMCSDataMesssage(
548 scoped_ptr<google::protobuf::MessageLite> protobuf) {
549 mcs_proto::DataMessageStanza* data_message =
550 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
551 // TODO(zea): implement a proper status manager rather than hardcoding these
552 // values.
553 scoped_ptr<mcs_proto::DataMessageStanza> response(
554 new mcs_proto::DataMessageStanza());
555 response->set_from(kGCMFromField);
556 response->set_sent(clock_->Now().ToInternalValue() /
557 base::Time::kMicrosecondsPerSecond);
558 response->set_ttl(0);
559 bool send = false;
560 for (int i = 0; i < data_message->app_data_size(); ++i) {
561 const mcs_proto::AppData& app_data = data_message->app_data(i);
562 if (app_data.key() == kIdleNotification) {
563 // Tell the MCS server the client is not idle.
564 send = true;
565 mcs_proto::AppData data;
566 data.set_key(kIdleNotification);
567 data.set_value("false");
568 response->add_app_data()->CopyFrom(data);
569 response->set_category(kMCSCategory);
573 if (send) {
574 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass()));
578 void MCSClient::HandlePacketFromWire(
579 scoped_ptr<google::protobuf::MessageLite> protobuf) {
580 if (!protobuf.get())
581 return;
582 uint8 tag = GetMCSProtoTag(*protobuf);
583 PersistentId persistent_id = GetPersistentId(*protobuf);
584 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
586 if (last_stream_id_received != 0) {
587 last_device_to_server_stream_id_received_ = last_stream_id_received;
589 // Process device to server messages that have now been acknowledged by the
590 // server. Because messages are stored in order, just pop off all that have
591 // a stream id lower than server's last received stream id.
592 HandleStreamAck(last_stream_id_received);
594 // Process server_to_device_messages that the server now knows were
595 // acknowledged. Again, they're in order, so just keep going until the
596 // stream id is reached.
597 StreamIdList acked_stream_ids_to_remove;
598 for (std::map<StreamId, PersistentIdList>::iterator iter =
599 acked_server_ids_.begin();
600 iter != acked_server_ids_.end() &&
601 iter->first <= last_stream_id_received; ++iter) {
602 acked_stream_ids_to_remove.push_back(iter->first);
604 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
605 iter != acked_stream_ids_to_remove.end(); ++iter) {
606 acked_server_ids_.erase(*iter);
610 ++stream_id_in_;
611 if (!persistent_id.empty()) {
612 unacked_server_ids_[stream_id_in_] = persistent_id;
613 gcm_store_->AddIncomingMessage(persistent_id,
614 base::Bind(&MCSClient::OnGCMUpdateFinished,
615 weak_ptr_factory_.GetWeakPtr()));
618 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
619 << " with persistent id "
620 << (persistent_id.empty() ? "NULL" : persistent_id)
621 << ", stream id " << stream_id_in_ << " and last stream id received "
622 << last_stream_id_received;
624 if (unacked_server_ids_.size() > 0 &&
625 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
626 SendMessage(MCSMessage(kIqStanzaTag, BuildStreamAck()));
629 // The connection is alive, treat this message as a heartbeat ack.
630 heartbeat_manager_.OnHeartbeatAcked();
632 switch (tag) {
633 case kLoginResponseTag: {
634 DCHECK_EQ(CONNECTING, state_);
635 mcs_proto::LoginResponse* login_response =
636 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
637 DVLOG(1) << "Received login response:";
638 DVLOG(1) << " Id: " << login_response->id();
639 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
640 if (login_response->has_error() && login_response->error().code() != 0) {
641 state_ = UNINITIALIZED;
642 DVLOG(1) << " Error code: " << login_response->error().code();
643 DVLOG(1) << " Error message: " << login_response->error().message();
644 LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
645 connection_factory_->SignalConnectionReset(
646 ConnectionFactory::LOGIN_FAILURE);
647 mcs_error_callback_.Run();
648 return;
651 if (login_response->has_heartbeat_config()) {
652 heartbeat_manager_.UpdateHeartbeatConfig(
653 login_response->heartbeat_config());
656 state_ = CONNECTED;
657 stream_id_in_ = 1; // To account for the login response.
658 DCHECK_EQ(1U, stream_id_out_);
660 // Pass the login response on up.
661 base::MessageLoop::current()->PostTask(
662 FROM_HERE,
663 base::Bind(message_received_callback_,
664 MCSMessage(tag, protobuf.Pass())));
666 // If there are pending messages, attempt to send one.
667 if (!to_send_.empty()) {
668 base::MessageLoop::current()->PostTask(
669 FROM_HERE,
670 base::Bind(&MCSClient::MaybeSendMessage,
671 weak_ptr_factory_.GetWeakPtr()));
674 heartbeat_manager_.Start(
675 base::Bind(&MCSClient::SendHeartbeat,
676 weak_ptr_factory_.GetWeakPtr()),
677 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
678 weak_ptr_factory_.GetWeakPtr()));
679 return;
681 case kHeartbeatPingTag:
682 DCHECK_GE(stream_id_in_, 1U);
683 DVLOG(1) << "Received heartbeat ping, sending ack.";
684 SendMessage(
685 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
686 return;
687 case kHeartbeatAckTag:
688 DCHECK_GE(stream_id_in_, 1U);
689 DVLOG(1) << "Received heartbeat ack.";
690 // Do nothing else, all messages act as heartbeat acks.
691 return;
692 case kCloseTag:
693 LOG(ERROR) << "Received close command, resetting connection.";
694 state_ = LOADED;
695 connection_factory_->SignalConnectionReset(
696 ConnectionFactory::CLOSE_COMMAND);
697 return;
698 case kIqStanzaTag: {
699 DCHECK_GE(stream_id_in_, 1U);
700 mcs_proto::IqStanza* iq_stanza =
701 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
702 const mcs_proto::Extension& iq_extension = iq_stanza->extension();
703 switch (iq_extension.id()) {
704 case kSelectiveAck: {
705 PersistentIdList acked_ids;
706 if (BuildPersistentIdListFromProto(iq_extension.data(),
707 &acked_ids)) {
708 HandleSelectiveAck(acked_ids);
710 return;
712 case kStreamAck:
713 // Do nothing. The last received stream id is always processed if it's
714 // present.
715 return;
716 default:
717 LOG(WARNING) << "Received invalid iq stanza extension "
718 << iq_extension.id();
719 return;
722 case kDataMessageStanzaTag: {
723 DCHECK_GE(stream_id_in_, 1U);
724 mcs_proto::DataMessageStanza* data_message =
725 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
726 if (data_message->category() == kMCSCategory) {
727 HandleMCSDataMesssage(protobuf.Pass());
728 return;
731 DCHECK(protobuf.get());
732 base::MessageLoop::current()->PostTask(
733 FROM_HERE,
734 base::Bind(message_received_callback_,
735 MCSMessage(tag, protobuf.Pass())));
736 return;
738 default:
739 LOG(ERROR) << "Received unexpected message of type "
740 << static_cast<int>(tag);
741 return;
745 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
746 PersistentIdList acked_outgoing_persistent_ids;
747 StreamIdList acked_outgoing_stream_ids;
748 while (!to_resend_.empty() &&
749 to_resend_.front()->stream_id <= last_stream_id_received) {
750 const MCSPacketInternal& outgoing_packet = to_resend_.front();
751 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
752 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
753 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
754 to_resend_.pop_front();
757 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
758 << " outgoing messages, " << to_resend_.size()
759 << " remaining unacked";
760 gcm_store_->RemoveOutgoingMessages(
761 acked_outgoing_persistent_ids,
762 base::Bind(&MCSClient::OnGCMUpdateFinished,
763 weak_ptr_factory_.GetWeakPtr()));
765 HandleServerConfirmedReceipt(last_stream_id_received);
768 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
769 std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
771 StreamId last_stream_id_received = 0;
773 // First check the to_resend_ queue. Acknowledgments are always contiguous,
774 // so if there's a pending message that hasn't been acked, all newer messages
775 // must also be unacked.
776 while(!to_resend_.empty() && !remaining_ids.empty()) {
777 const MCSPacketInternal& outgoing_packet = to_resend_.front();
778 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
779 break; // Newer message must be unacked too.
780 remaining_ids.erase(outgoing_packet->persistent_id);
781 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
783 // No need to re-acknowledge any server messages this message already
784 // acknowledged.
785 StreamId device_stream_id = outgoing_packet->stream_id;
786 if (device_stream_id > last_stream_id_received)
787 last_stream_id_received = device_stream_id;
788 to_resend_.pop_front();
791 // If the acknowledged ids aren't all there, they might be in the to_send_
792 // queue (typically when a SelectiveAck confirms messages as part of a login
793 // response).
794 while (!to_send_.empty() && !remaining_ids.empty()) {
795 const MCSPacketInternal& outgoing_packet = to_send_.front();
796 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
797 break; // Newer messages must be unacked too.
798 remaining_ids.erase(outgoing_packet->persistent_id);
799 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
801 // No need to re-acknowledge any server messages this message already
802 // acknowledged.
803 StreamId device_stream_id = outgoing_packet->stream_id;
804 if (device_stream_id > last_stream_id_received)
805 last_stream_id_received = device_stream_id;
806 PopMessageForSend();
809 // Only handle the largest stream id value. All other stream ids are
810 // implicitly handled.
811 if (last_stream_id_received > 0)
812 HandleServerConfirmedReceipt(last_stream_id_received);
814 // At this point, all remaining acked ids are redundant.
815 PersistentIdList acked_ids;
816 if (remaining_ids.size() > 0) {
817 for (size_t i = 0; i < id_list.size(); ++i) {
818 if (remaining_ids.count(id_list[i]) > 0)
819 continue;
820 acked_ids.push_back(id_list[i]);
822 } else {
823 acked_ids = id_list;
826 DVLOG(1) << "Server acked " << acked_ids.size()
827 << " messages, " << to_resend_.size() << " remaining unacked.";
828 gcm_store_->RemoveOutgoingMessages(
829 acked_ids,
830 base::Bind(&MCSClient::OnGCMUpdateFinished,
831 weak_ptr_factory_.GetWeakPtr()));
833 // Resend any remaining outgoing messages, as they were not received by the
834 // server.
835 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
836 while (!to_resend_.empty()) {
837 to_send_.push_front(to_resend_.back());
838 to_resend_.pop_back();
840 base::MessageLoop::current()->PostTask(
841 FROM_HERE,
842 base::Bind(&MCSClient::MaybeSendMessage,
843 weak_ptr_factory_.GetWeakPtr()));
846 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
847 PersistentIdList acked_incoming_ids;
848 for (std::map<StreamId, PersistentIdList>::iterator iter =
849 acked_server_ids_.begin();
850 iter != acked_server_ids_.end() &&
851 iter->first <= device_stream_id;) {
852 acked_incoming_ids.insert(acked_incoming_ids.end(),
853 iter->second.begin(),
854 iter->second.end());
855 acked_server_ids_.erase(iter++);
858 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
859 << " acknowledged server messages.";
860 gcm_store_->RemoveIncomingMessages(
861 acked_incoming_ids,
862 base::Bind(&MCSClient::OnGCMUpdateFinished,
863 weak_ptr_factory_.GetWeakPtr()));
866 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
867 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
870 void MCSClient::OnConnectionResetByHeartbeat() {
871 connection_factory_->SignalConnectionReset(
872 ConnectionFactory::HEARTBEAT_FAILURE);
875 void MCSClient::NotifyMessageSendStatus(
876 const google::protobuf::MessageLite& protobuf,
877 MessageSendStatus status) {
878 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
879 return;
881 const mcs_proto::DataMessageStanza* data_message_stanza =
882 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
883 recorder_->RecordNotifySendStatus(
884 data_message_stanza->category(),
885 data_message_stanza->to(),
886 data_message_stanza->id(),
887 status,
888 protobuf.ByteSize(),
889 data_message_stanza->ttl());
890 message_sent_callback_.Run(
891 data_message_stanza->device_user_id(),
892 data_message_stanza->category(),
893 data_message_stanza->id(),
894 status);
897 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
898 MCSPacketInternal packet = to_send_.front();
899 to_send_.pop_front();
901 if (packet->tag == kDataMessageStanzaTag) {
902 mcs_proto::DataMessageStanza* data_message =
903 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
904 CollapseKey collapse_key(*data_message);
905 if (collapse_key.IsValid())
906 collapse_key_map_.erase(collapse_key);
909 return packet;
912 } // namespace gcm