Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / google_apis / gcm / engine / mcs_client.cc
blobf20d93777367010d428279df0620eae78476ccb4
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
7 #include <set>
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/metrics/histogram.h"
13 #include "base/strings/string_number_conversions.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/time/clock.h"
16 #include "base/time/time.h"
17 #include "base/timer/timer.h"
18 #include "google_apis/gcm/base/mcs_util.h"
19 #include "google_apis/gcm/base/socket_stream.h"
20 #include "google_apis/gcm/engine/connection_factory.h"
21 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
23 using namespace google::protobuf::io;
25 namespace gcm {
27 namespace {
29 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
31 // The category of messages intended for the GCM client itself from MCS.
32 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
34 // The from field for messages originating in the GCM client.
35 const char kGCMFromField[] = "gcm@android.com";
37 // MCS status message types.
38 // TODO(zea): handle these at the GCMClient layer.
39 const char kIdleNotification[] = "IdleNotification";
40 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
41 // const char kPowerNotification[] = "PowerNotification";
42 // const char kDataActiveNotification[] = "DataActiveNotification";
44 // Settings for MCS Login packet.
45 const char kHeartbeatIntervalSettingName[] = "hbping";
46 const int kNoCustomHeartbeat = 0;
48 // The number of unacked messages to allow before sending a stream ack.
49 // Applies to both incoming and outgoing messages.
50 // TODO(zea): make this server configurable.
51 const int kUnackedMessageBeforeStreamAck = 10;
53 // The global maximum number of pending messages to have in the send queue.
54 const size_t kMaxSendQueueSize = 10 * 1024;
56 // The maximum message size that can be sent to the server.
57 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
59 // Helper for converting a proto persistent id list to a vector of strings.
60 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
61 std::vector<std::string>* id_list) {
62 mcs_proto::SelectiveAck selective_ack;
63 if (!selective_ack.ParseFromString(bytes))
64 return false;
65 std::vector<std::string> new_list;
66 for (int i = 0; i < selective_ack.id_size(); ++i) {
67 DCHECK(!selective_ack.id(i).empty());
68 new_list.push_back(selective_ack.id(i));
70 id_list->swap(new_list);
71 return true;
74 } // namespace
76 class CollapseKey {
77 public:
78 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
79 ~CollapseKey();
81 // Comparison operator for use in maps.
82 bool operator<(const CollapseKey& right) const;
84 // Whether the message had a valid collapse key.
85 bool IsValid() const;
87 std::string token() const { return token_; }
88 std::string app_id() const { return app_id_; }
89 int64 device_user_id() const { return device_user_id_; }
91 private:
92 const std::string token_;
93 const std::string app_id_;
94 const int64 device_user_id_;
97 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
98 : token_(message.token()),
99 app_id_(message.category()),
100 device_user_id_(message.device_user_id()) {}
102 CollapseKey::~CollapseKey() {}
104 bool CollapseKey::IsValid() const {
105 // Device user id is optional, but the application id and token are not.
106 return !token_.empty() && !app_id_.empty();
109 bool CollapseKey::operator<(const CollapseKey& right) const {
110 if (device_user_id_ != right.device_user_id())
111 return device_user_id_ < right.device_user_id();
112 if (app_id_ != right.app_id())
113 return app_id_ < right.app_id();
114 return token_ < right.token();
117 struct ReliablePacketInfo {
118 ReliablePacketInfo();
119 ~ReliablePacketInfo();
121 // The stream id with which the message was sent.
122 uint32 stream_id;
124 // If reliable delivery was requested, the persistent id of the message.
125 std::string persistent_id;
127 // The type of message itself (for easier lookup).
128 uint8 tag;
130 // The protobuf of the message itself.
131 MCSProto protobuf;
134 ReliablePacketInfo::ReliablePacketInfo()
135 : stream_id(0), tag(0) {
137 ReliablePacketInfo::~ReliablePacketInfo() {}
139 int MCSClient::GetSendQueueSize() const {
140 return to_send_.size();
143 int MCSClient::GetResendQueueSize() const {
144 return to_resend_.size();
147 std::string MCSClient::GetStateString() const {
148 switch(state_) {
149 case UNINITIALIZED:
150 return "UNINITIALIZED";
151 case LOADED:
152 return "LOADED";
153 case CONNECTING:
154 return "CONNECTING";
155 case CONNECTED:
156 return "CONNECTED";
157 default:
158 NOTREACHED();
159 return std::string();
163 MCSClient::MCSClient(const std::string& version_string,
164 base::Clock* clock,
165 ConnectionFactory* connection_factory,
166 GCMStore* gcm_store,
167 GCMStatsRecorder* recorder)
168 : version_string_(version_string),
169 clock_(clock),
170 state_(UNINITIALIZED),
171 android_id_(0),
172 security_token_(0),
173 connection_factory_(connection_factory),
174 connection_handler_(NULL),
175 last_device_to_server_stream_id_received_(0),
176 last_server_to_device_stream_id_received_(0),
177 stream_id_out_(0),
178 stream_id_in_(0),
179 gcm_store_(gcm_store),
180 recorder_(recorder),
181 weak_ptr_factory_(this) {
184 MCSClient::~MCSClient() {
187 void MCSClient::Initialize(
188 const ErrorCallback& error_callback,
189 const OnMessageReceivedCallback& message_received_callback,
190 const OnMessageSentCallback& message_sent_callback,
191 scoped_ptr<GCMStore::LoadResult> load_result) {
192 DCHECK_EQ(state_, UNINITIALIZED);
194 state_ = LOADED;
195 mcs_error_callback_ = error_callback;
196 message_received_callback_ = message_received_callback;
197 message_sent_callback_ = message_sent_callback;
199 connection_factory_->Initialize(
200 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
201 weak_ptr_factory_.GetWeakPtr()),
202 base::Bind(&MCSClient::HandlePacketFromWire,
203 weak_ptr_factory_.GetWeakPtr()),
204 base::Bind(&MCSClient::MaybeSendMessage,
205 weak_ptr_factory_.GetWeakPtr()));
207 stream_id_out_ = 1; // Login request is hardcoded to id 1.
209 android_id_ = load_result->device_android_id;
210 security_token_ = load_result->device_security_token;
212 if (android_id_ == 0) {
213 DVLOG(1) << "No device credentials found, assuming new client.";
214 // No need to try and load RMQ data in that case.
215 return;
218 // |android_id_| is non-zero, so should |security_token_|.
219 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
220 << " is non-zero.";
222 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
223 << " incoming acks pending and "
224 << load_result->outgoing_messages.size()
225 << " outgoing messages pending.";
227 restored_unackeds_server_ids_ = load_result->incoming_messages;
229 // First go through and order the outgoing messages by recency.
230 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
231 std::vector<PersistentId> expired_ttl_ids;
232 for (GCMStore::OutgoingMessageMap::iterator iter =
233 load_result->outgoing_messages.begin();
234 iter != load_result->outgoing_messages.end(); ++iter) {
235 uint64 timestamp = 0;
236 if (!base::StringToUint64(iter->first, &timestamp)) {
237 LOG(ERROR) << "Invalid restored message.";
238 // TODO(fgorski): Error: data unreadable
239 mcs_error_callback_.Run();
240 return;
243 // Check if the TTL has expired for this message.
244 if (HasTTLExpired(*iter->second, clock_)) {
245 expired_ttl_ids.push_back(iter->first);
246 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
247 continue;
250 ordered_messages[timestamp] = iter->second.release();
253 if (!expired_ttl_ids.empty()) {
254 gcm_store_->RemoveOutgoingMessages(
255 expired_ttl_ids,
256 base::Bind(&MCSClient::OnGCMUpdateFinished,
257 weak_ptr_factory_.GetWeakPtr()));
260 // Now go through and add the outgoing messages to the send queue in their
261 // appropriate order (oldest at front, most recent at back).
262 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
263 iter = ordered_messages.begin();
264 iter != ordered_messages.end(); ++iter) {
265 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
266 packet_info->protobuf.reset(iter->second);
267 packet_info->tag = GetMCSProtoTag(*iter->second);
268 packet_info->persistent_id = base::Uint64ToString(iter->first);
269 to_send_.push_back(make_linked_ptr(packet_info));
271 if (packet_info->tag == kDataMessageStanzaTag) {
272 mcs_proto::DataMessageStanza* data_message =
273 reinterpret_cast<mcs_proto::DataMessageStanza*>(
274 packet_info->protobuf.get());
275 CollapseKey collapse_key(*data_message);
276 if (collapse_key.IsValid())
277 collapse_key_map_[collapse_key] = packet_info;
281 // Establish if there is any custom client interval persisted from the last
282 // run and set it on the heartbeat manager.
283 custom_heartbeat_intervals_.swap(load_result->heartbeat_intervals);
284 int min_interval_ms = GetMinHeartbeatIntervalMs();
285 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms);
288 void MCSClient::Login(uint64 android_id, uint64 security_token) {
289 DCHECK_EQ(state_, LOADED);
290 DCHECK(android_id_ == 0 || android_id_ == android_id);
291 DCHECK(security_token_ == 0 || security_token_ == security_token);
293 if (android_id != android_id_ && security_token != security_token_) {
294 DCHECK(android_id);
295 DCHECK(security_token);
296 android_id_ = android_id;
297 security_token_ = security_token;
300 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
302 state_ = CONNECTING;
303 connection_factory_->Connect();
304 connection_handler_ = connection_factory_->GetConnectionHandler();
307 void MCSClient::SendMessage(const MCSMessage& message) {
308 int ttl = GetTTL(message.GetProtobuf());
309 DCHECK_GE(ttl, 0);
310 if (to_send_.size() > kMaxSendQueueSize) {
311 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
312 return;
314 if (message.size() > kMaxMessageBytes) {
315 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
316 return;
319 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
320 packet_info->tag = message.tag();
321 packet_info->protobuf = message.CloneProtobuf();
323 if (ttl > 0) {
324 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
326 // First check if this message should replace a pending message with the
327 // same collapse key.
328 mcs_proto::DataMessageStanza* data_message =
329 reinterpret_cast<mcs_proto::DataMessageStanza*>(
330 packet_info->protobuf.get());
331 CollapseKey collapse_key(*data_message);
332 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
333 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
334 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
335 << original_packet->persistent_id;
336 original_packet->protobuf = packet_info->protobuf.Pass();
337 SetPersistentId(original_packet->persistent_id,
338 original_packet->protobuf.get());
339 gcm_store_->OverwriteOutgoingMessage(
340 original_packet->persistent_id,
341 message,
342 base::Bind(&MCSClient::OnGCMUpdateFinished,
343 weak_ptr_factory_.GetWeakPtr()));
345 // The message is already queued, return.
346 return;
347 } else {
348 PersistentId persistent_id = GetNextPersistentId();
349 DVLOG(1) << "Setting persistent id to " << persistent_id;
350 packet_info->persistent_id = persistent_id;
351 SetPersistentId(persistent_id, packet_info->protobuf.get());
352 if (!gcm_store_->AddOutgoingMessage(
353 persistent_id,
354 MCSMessage(message.tag(), *(packet_info->protobuf)),
355 base::Bind(&MCSClient::OnGCMUpdateFinished,
356 weak_ptr_factory_.GetWeakPtr()))) {
357 NotifyMessageSendStatus(message.GetProtobuf(),
358 APP_QUEUE_SIZE_LIMIT_REACHED);
359 return;
363 if (collapse_key.IsValid())
364 collapse_key_map_[collapse_key] = packet_info.get();
365 } else if (!connection_factory_->IsEndpointReachable()) {
366 DVLOG(1) << "No active connection, dropping message.";
367 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
368 return;
371 to_send_.push_back(make_linked_ptr(packet_info.release()));
373 // Notify that the messages has been succsfully queued for sending.
374 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
375 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
377 MaybeSendMessage();
380 void MCSClient::UpdateHeartbeatTimer(scoped_ptr<base::Timer> timer) {
381 heartbeat_manager_.UpdateHeartbeatTimer(timer.Pass());
384 void MCSClient::AddHeartbeatInterval(const std::string& scope,
385 int interval_ms) {
386 if (!heartbeat_manager_.IsValidClientHeartbeatInterval(interval_ms))
387 return;
389 custom_heartbeat_intervals_[scope] = interval_ms;
390 gcm_store_->AddHeartbeatInterval(scope, interval_ms,
391 base::Bind(&MCSClient::OnGCMUpdateFinished,
392 weak_ptr_factory_.GetWeakPtr()));
394 int min_interval_ms = GetMinHeartbeatIntervalMs();
395 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms);
398 void MCSClient::RemoveHeartbeatInterval(const std::string& scope) {
399 custom_heartbeat_intervals_.erase(scope);
400 gcm_store_->RemoveHeartbeatInterval(
401 scope, base::Bind(&MCSClient::OnGCMUpdateFinished,
402 weak_ptr_factory_.GetWeakPtr()));
404 int min_interval = GetMinHeartbeatIntervalMs();
405 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval);
408 int MCSClient::GetMinHeartbeatIntervalMs() {
409 if (custom_heartbeat_intervals_.empty())
410 return kNoCustomHeartbeat;
412 int min_interval = custom_heartbeat_intervals_.begin()->second;
413 for (std::map<std::string, int>::const_iterator it =
414 custom_heartbeat_intervals_.begin();
415 it != custom_heartbeat_intervals_.end();
416 ++it) {
417 if (it->second < min_interval)
418 min_interval = it->second;
420 return min_interval;
423 void MCSClient::ResetStateAndBuildLoginRequest(
424 mcs_proto::LoginRequest* request) {
425 DCHECK(android_id_);
426 DCHECK(security_token_);
427 stream_id_in_ = 0;
428 stream_id_out_ = 1;
429 last_device_to_server_stream_id_received_ = 0;
430 last_server_to_device_stream_id_received_ = 0;
432 heartbeat_manager_.Stop();
434 // Add any pending acknowledgments to the list of ids.
435 for (StreamIdToPersistentIdMap::const_iterator iter =
436 unacked_server_ids_.begin();
437 iter != unacked_server_ids_.end(); ++iter) {
438 restored_unackeds_server_ids_.push_back(iter->second);
440 unacked_server_ids_.clear();
442 // Any acknowledged server ids which have not been confirmed by the server
443 // are treated like unacknowledged ids.
444 for (std::map<StreamId, PersistentIdList>::const_iterator iter =
445 acked_server_ids_.begin();
446 iter != acked_server_ids_.end(); ++iter) {
447 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
448 iter->second.begin(),
449 iter->second.end());
451 acked_server_ids_.clear();
453 // Then build the request, consuming all pending acknowledgments.
454 request->Swap(BuildLoginRequest(android_id_,
455 security_token_,
456 version_string_).get());
458 // Set custom heartbeat interval if specified.
459 if (heartbeat_manager_.HasClientHeartbeatInterval()) {
460 // Ensure that the custom heartbeat interval is communicated to the server.
461 mcs_proto::Setting* setting = request->add_setting();
462 setting->set_name(kHeartbeatIntervalSettingName);
463 setting->set_value(base::IntToString(
464 heartbeat_manager_.GetClientHeartbeatIntervalMs()));
467 for (PersistentIdList::const_iterator iter =
468 restored_unackeds_server_ids_.begin();
469 iter != restored_unackeds_server_ids_.end(); ++iter) {
470 request->add_received_persistent_id(*iter);
472 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
473 restored_unackeds_server_ids_.clear();
475 // Push all unacknowledged messages to front of send queue. No need to save
476 // to RMQ, as all messages that reach this point should already have been
477 // saved as necessary.
478 while (!to_resend_.empty()) {
479 to_send_.push_front(to_resend_.back());
480 to_resend_.pop_back();
483 // Drop all TTL == 0 or expired TTL messages from the queue.
484 std::deque<MCSPacketInternal> new_to_send;
485 std::vector<PersistentId> expired_ttl_ids;
486 while (!to_send_.empty()) {
487 MCSPacketInternal packet = PopMessageForSend();
488 if (GetTTL(*packet->protobuf) > 0 &&
489 !HasTTLExpired(*packet->protobuf, clock_)) {
490 new_to_send.push_back(packet);
491 } else {
492 // If the TTL was 0 there is no persistent id, so no need to remove the
493 // message from the persistent store.
494 if (!packet->persistent_id.empty())
495 expired_ttl_ids.push_back(packet->persistent_id);
496 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
500 if (!expired_ttl_ids.empty()) {
501 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
502 << " messages expired.";
503 gcm_store_->RemoveOutgoingMessages(
504 expired_ttl_ids,
505 base::Bind(&MCSClient::OnGCMUpdateFinished,
506 weak_ptr_factory_.GetWeakPtr()));
509 to_send_.swap(new_to_send);
511 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
512 << " incoming acks pending, and " << to_send_.size()
513 << " pending outgoing messages.";
515 state_ = CONNECTING;
518 void MCSClient::SendHeartbeat() {
519 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
522 void MCSClient::OnGCMUpdateFinished(bool success) {
523 LOG_IF(ERROR, !success) << "GCM Update failed!";
524 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
525 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
528 void MCSClient::MaybeSendMessage() {
529 if (to_send_.empty())
530 return;
532 // If the connection has been reset, do nothing. On reconnection
533 // MaybeSendMessage will be automatically invoked again.
534 // TODO(zea): consider doing TTL expiration at connection reset time, rather
535 // than reconnect time.
536 if (!connection_factory_->IsEndpointReachable())
537 return;
539 MCSPacketInternal packet = PopMessageForSend();
540 if (HasTTLExpired(*packet->protobuf, clock_)) {
541 DCHECK(!packet->persistent_id.empty());
542 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
543 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
544 gcm_store_->RemoveOutgoingMessage(
545 packet->persistent_id,
546 base::Bind(&MCSClient::OnGCMUpdateFinished,
547 weak_ptr_factory_.GetWeakPtr()));
548 base::ThreadTaskRunnerHandle::Get()->PostTask(
549 FROM_HERE,
550 base::Bind(&MCSClient::MaybeSendMessage,
551 weak_ptr_factory_.GetWeakPtr()));
552 return;
554 DVLOG(1) << "Pending output message found, sending.";
555 if (!packet->persistent_id.empty())
556 to_resend_.push_back(packet);
557 SendPacketToWire(packet.get());
560 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
561 packet_info->stream_id = ++stream_id_out_;
562 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
564 // Set the queued time as necessary.
565 if (packet_info->tag == kDataMessageStanzaTag) {
566 mcs_proto::DataMessageStanza* data_message =
567 reinterpret_cast<mcs_proto::DataMessageStanza*>(
568 packet_info->protobuf.get());
569 uint64 sent = data_message->sent();
570 DCHECK_GT(sent, 0U);
571 int queued = (clock_->Now().ToInternalValue() /
572 base::Time::kMicrosecondsPerSecond) - sent;
573 DVLOG(1) << "Message was queued for " << queued << " seconds.";
574 data_message->set_queued(queued);
575 recorder_->RecordDataSentToWire(
576 data_message->category(),
577 data_message->to(),
578 data_message->id(),
579 queued);
582 // Set the proper last received stream id to acknowledge received server
583 // packets.
584 DVLOG(1) << "Setting last stream id received to "
585 << stream_id_in_;
586 SetLastStreamIdReceived(stream_id_in_,
587 packet_info->protobuf.get());
588 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
589 last_server_to_device_stream_id_received_ = stream_id_in_;
590 // Mark all acknowledged server messages as such. Note: they're not dropped,
591 // as it may be that they'll need to be re-acked if this message doesn't
592 // make it.
593 PersistentIdList persistent_id_list;
594 for (StreamIdToPersistentIdMap::const_iterator iter =
595 unacked_server_ids_.begin();
596 iter != unacked_server_ids_.end(); ++iter) {
597 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
598 persistent_id_list.push_back(iter->second);
600 unacked_server_ids_.clear();
601 acked_server_ids_[stream_id_out_] = persistent_id_list;
604 connection_handler_->SendMessage(*packet_info->protobuf);
607 void MCSClient::HandleMCSDataMesssage(
608 scoped_ptr<google::protobuf::MessageLite> protobuf) {
609 mcs_proto::DataMessageStanza* data_message =
610 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
611 // TODO(zea): implement a proper status manager rather than hardcoding these
612 // values.
613 scoped_ptr<mcs_proto::DataMessageStanza> response(
614 new mcs_proto::DataMessageStanza());
615 response->set_from(kGCMFromField);
616 response->set_sent(clock_->Now().ToInternalValue() /
617 base::Time::kMicrosecondsPerSecond);
618 response->set_ttl(0);
619 bool send = false;
620 for (int i = 0; i < data_message->app_data_size(); ++i) {
621 const mcs_proto::AppData& app_data = data_message->app_data(i);
622 if (app_data.key() == kIdleNotification) {
623 // Tell the MCS server the client is not idle.
624 send = true;
625 mcs_proto::AppData data;
626 data.set_key(kIdleNotification);
627 data.set_value("false");
628 response->add_app_data()->CopyFrom(data);
629 response->set_category(kMCSCategory);
633 if (send) {
634 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass()));
638 void MCSClient::HandlePacketFromWire(
639 scoped_ptr<google::protobuf::MessageLite> protobuf) {
640 if (!protobuf.get())
641 return;
642 uint8 tag = GetMCSProtoTag(*protobuf);
643 PersistentId persistent_id = GetPersistentId(*protobuf);
644 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
646 if (last_stream_id_received != 0) {
647 last_device_to_server_stream_id_received_ = last_stream_id_received;
649 // Process device to server messages that have now been acknowledged by the
650 // server. Because messages are stored in order, just pop off all that have
651 // a stream id lower than server's last received stream id.
652 HandleStreamAck(last_stream_id_received);
654 // Process server_to_device_messages that the server now knows were
655 // acknowledged. Again, they're in order, so just keep going until the
656 // stream id is reached.
657 StreamIdList acked_stream_ids_to_remove;
658 for (std::map<StreamId, PersistentIdList>::iterator iter =
659 acked_server_ids_.begin();
660 iter != acked_server_ids_.end() &&
661 iter->first <= last_stream_id_received; ++iter) {
662 acked_stream_ids_to_remove.push_back(iter->first);
664 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
665 iter != acked_stream_ids_to_remove.end(); ++iter) {
666 acked_server_ids_.erase(*iter);
670 ++stream_id_in_;
671 if (!persistent_id.empty()) {
672 unacked_server_ids_[stream_id_in_] = persistent_id;
673 gcm_store_->AddIncomingMessage(persistent_id,
674 base::Bind(&MCSClient::OnGCMUpdateFinished,
675 weak_ptr_factory_.GetWeakPtr()));
678 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
679 << " with persistent id "
680 << (persistent_id.empty() ? "NULL" : persistent_id)
681 << ", stream id " << stream_id_in_ << " and last stream id received "
682 << last_stream_id_received;
684 if (unacked_server_ids_.size() > 0 &&
685 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
686 SendMessage(MCSMessage(kIqStanzaTag, BuildStreamAck()));
689 // The connection is alive, treat this message as a heartbeat ack.
690 heartbeat_manager_.OnHeartbeatAcked();
692 switch (tag) {
693 case kLoginResponseTag: {
694 DCHECK_EQ(CONNECTING, state_);
695 mcs_proto::LoginResponse* login_response =
696 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
697 DVLOG(1) << "Received login response:";
698 DVLOG(1) << " Id: " << login_response->id();
699 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
700 if (login_response->has_error() && login_response->error().code() != 0) {
701 state_ = UNINITIALIZED;
702 DVLOG(1) << " Error code: " << login_response->error().code();
703 DVLOG(1) << " Error message: " << login_response->error().message();
704 LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
705 connection_factory_->SignalConnectionReset(
706 ConnectionFactory::LOGIN_FAILURE);
707 mcs_error_callback_.Run();
708 return;
711 if (login_response->has_heartbeat_config()) {
712 heartbeat_manager_.UpdateHeartbeatConfig(
713 login_response->heartbeat_config());
716 state_ = CONNECTED;
717 stream_id_in_ = 1; // To account for the login response.
718 DCHECK_EQ(1U, stream_id_out_);
720 // Pass the login response on up.
721 base::ThreadTaskRunnerHandle::Get()->PostTask(
722 FROM_HERE,
723 base::Bind(message_received_callback_,
724 MCSMessage(tag, protobuf.Pass())));
726 // If there are pending messages, attempt to send one.
727 if (!to_send_.empty()) {
728 base::ThreadTaskRunnerHandle::Get()->PostTask(
729 FROM_HERE,
730 base::Bind(&MCSClient::MaybeSendMessage,
731 weak_ptr_factory_.GetWeakPtr()));
734 heartbeat_manager_.Start(
735 base::Bind(&MCSClient::SendHeartbeat,
736 weak_ptr_factory_.GetWeakPtr()),
737 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
738 weak_ptr_factory_.GetWeakPtr()));
739 return;
741 case kHeartbeatPingTag:
742 DCHECK_GE(stream_id_in_, 1U);
743 DVLOG(1) << "Received heartbeat ping, sending ack.";
744 SendMessage(
745 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
746 return;
747 case kHeartbeatAckTag:
748 DCHECK_GE(stream_id_in_, 1U);
749 DVLOG(1) << "Received heartbeat ack.";
750 // Do nothing else, all messages act as heartbeat acks.
751 return;
752 case kCloseTag:
753 LOG(ERROR) << "Received close command, resetting connection.";
754 state_ = LOADED;
755 connection_factory_->SignalConnectionReset(
756 ConnectionFactory::CLOSE_COMMAND);
757 return;
758 case kIqStanzaTag: {
759 DCHECK_GE(stream_id_in_, 1U);
760 mcs_proto::IqStanza* iq_stanza =
761 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
762 const mcs_proto::Extension& iq_extension = iq_stanza->extension();
763 switch (iq_extension.id()) {
764 case kSelectiveAck: {
765 PersistentIdList acked_ids;
766 if (BuildPersistentIdListFromProto(iq_extension.data(),
767 &acked_ids)) {
768 HandleSelectiveAck(acked_ids);
770 return;
772 case kStreamAck:
773 // Do nothing. The last received stream id is always processed if it's
774 // present.
775 return;
776 default:
777 LOG(WARNING) << "Received invalid iq stanza extension "
778 << iq_extension.id();
779 return;
782 case kDataMessageStanzaTag: {
783 DCHECK_GE(stream_id_in_, 1U);
784 mcs_proto::DataMessageStanza* data_message =
785 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
786 if (data_message->category() == kMCSCategory) {
787 HandleMCSDataMesssage(protobuf.Pass());
788 return;
791 DCHECK(protobuf.get());
792 base::ThreadTaskRunnerHandle::Get()->PostTask(
793 FROM_HERE,
794 base::Bind(message_received_callback_,
795 MCSMessage(tag, protobuf.Pass())));
796 return;
798 default:
799 LOG(ERROR) << "Received unexpected message of type "
800 << static_cast<int>(tag);
801 return;
805 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
806 PersistentIdList acked_outgoing_persistent_ids;
807 StreamIdList acked_outgoing_stream_ids;
808 while (!to_resend_.empty() &&
809 to_resend_.front()->stream_id <= last_stream_id_received) {
810 const MCSPacketInternal& outgoing_packet = to_resend_.front();
811 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
812 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
813 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
814 to_resend_.pop_front();
817 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
818 << " outgoing messages, " << to_resend_.size()
819 << " remaining unacked";
820 gcm_store_->RemoveOutgoingMessages(
821 acked_outgoing_persistent_ids,
822 base::Bind(&MCSClient::OnGCMUpdateFinished,
823 weak_ptr_factory_.GetWeakPtr()));
825 HandleServerConfirmedReceipt(last_stream_id_received);
828 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
829 std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
831 StreamId last_stream_id_received = 0;
833 // First check the to_resend_ queue. Acknowledgments are always contiguous,
834 // so if there's a pending message that hasn't been acked, all newer messages
835 // must also be unacked.
836 while(!to_resend_.empty() && !remaining_ids.empty()) {
837 const MCSPacketInternal& outgoing_packet = to_resend_.front();
838 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
839 break; // Newer message must be unacked too.
840 remaining_ids.erase(outgoing_packet->persistent_id);
841 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
843 // No need to re-acknowledge any server messages this message already
844 // acknowledged.
845 StreamId device_stream_id = outgoing_packet->stream_id;
846 if (device_stream_id > last_stream_id_received)
847 last_stream_id_received = device_stream_id;
848 to_resend_.pop_front();
851 // If the acknowledged ids aren't all there, they might be in the to_send_
852 // queue (typically when a SelectiveAck confirms messages as part of a login
853 // response).
854 while (!to_send_.empty() && !remaining_ids.empty()) {
855 const MCSPacketInternal& outgoing_packet = to_send_.front();
856 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
857 break; // Newer messages must be unacked too.
858 remaining_ids.erase(outgoing_packet->persistent_id);
859 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
861 // No need to re-acknowledge any server messages this message already
862 // acknowledged.
863 StreamId device_stream_id = outgoing_packet->stream_id;
864 if (device_stream_id > last_stream_id_received)
865 last_stream_id_received = device_stream_id;
866 PopMessageForSend();
869 // Only handle the largest stream id value. All other stream ids are
870 // implicitly handled.
871 if (last_stream_id_received > 0)
872 HandleServerConfirmedReceipt(last_stream_id_received);
874 // At this point, all remaining acked ids are redundant.
875 PersistentIdList acked_ids;
876 if (remaining_ids.size() > 0) {
877 for (size_t i = 0; i < id_list.size(); ++i) {
878 if (remaining_ids.count(id_list[i]) > 0)
879 continue;
880 acked_ids.push_back(id_list[i]);
882 } else {
883 acked_ids = id_list;
886 DVLOG(1) << "Server acked " << acked_ids.size()
887 << " messages, " << to_resend_.size() << " remaining unacked.";
888 gcm_store_->RemoveOutgoingMessages(
889 acked_ids,
890 base::Bind(&MCSClient::OnGCMUpdateFinished,
891 weak_ptr_factory_.GetWeakPtr()));
893 // Resend any remaining outgoing messages, as they were not received by the
894 // server.
895 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
896 while (!to_resend_.empty()) {
897 to_send_.push_front(to_resend_.back());
898 to_resend_.pop_back();
900 base::ThreadTaskRunnerHandle::Get()->PostTask(
901 FROM_HERE,
902 base::Bind(&MCSClient::MaybeSendMessage,
903 weak_ptr_factory_.GetWeakPtr()));
906 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
907 PersistentIdList acked_incoming_ids;
908 for (std::map<StreamId, PersistentIdList>::iterator iter =
909 acked_server_ids_.begin();
910 iter != acked_server_ids_.end() &&
911 iter->first <= device_stream_id;) {
912 acked_incoming_ids.insert(acked_incoming_ids.end(),
913 iter->second.begin(),
914 iter->second.end());
915 acked_server_ids_.erase(iter++);
918 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
919 << " acknowledged server messages.";
920 gcm_store_->RemoveIncomingMessages(
921 acked_incoming_ids,
922 base::Bind(&MCSClient::OnGCMUpdateFinished,
923 weak_ptr_factory_.GetWeakPtr()));
926 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
927 return base::Int64ToString(base::TimeTicks::Now().ToInternalValue());
930 void MCSClient::OnConnectionResetByHeartbeat(
931 ConnectionFactory::ConnectionResetReason reason) {
932 connection_factory_->SignalConnectionReset(reason);
935 void MCSClient::NotifyMessageSendStatus(
936 const google::protobuf::MessageLite& protobuf,
937 MessageSendStatus status) {
938 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
939 return;
941 const mcs_proto::DataMessageStanza* data_message_stanza =
942 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
943 recorder_->RecordNotifySendStatus(
944 data_message_stanza->category(),
945 data_message_stanza->to(),
946 data_message_stanza->id(),
947 status,
948 protobuf.ByteSize(),
949 data_message_stanza->ttl());
950 message_sent_callback_.Run(
951 data_message_stanza->device_user_id(),
952 data_message_stanza->category(),
953 data_message_stanza->id(),
954 status);
957 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
958 MCSPacketInternal packet = to_send_.front();
959 to_send_.pop_front();
961 if (packet->tag == kDataMessageStanzaTag) {
962 mcs_proto::DataMessageStanza* data_message =
963 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
964 CollapseKey collapse_key(*data_message);
965 if (collapse_key.IsValid())
966 collapse_key_map_.erase(collapse_key);
969 return packet;
972 } // namespace gcm