Update ASan/Android runtime and setup script to LLVM r200682.
[chromium-blink-merge.git] / google_apis / gcm / engine / mcs_client.cc
blob127f9bf6541566e9a9c020a6337abfe721ba4d2c
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/engine/mcs_client.h"
7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h"
11 #include "base/time/clock.h"
12 #include "base/time/time.h"
13 #include "google_apis/gcm/base/mcs_util.h"
14 #include "google_apis/gcm/base/socket_stream.h"
15 #include "google_apis/gcm/engine/connection_factory.h"
17 using namespace google::protobuf::io;
19 namespace gcm {
21 namespace {
// Owning pointer to a generic protobuf message; this is the payload type
// stored in ReliablePacketInfo::protobuf.
23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
25 // The category of messages intended for the GCM client itself from MCS.
// Incoming data messages with this category are routed to
// HandleMCSDataMesssage rather than to the message-received callback, and
// the response built there carries the same category.
26 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
28 // The from field for messages originating in the GCM client.
29 const char kGCMFromField[] = "gcm@android.com";
31 // MCS status message types.
32 // TODO(zea): handle these at the GCMClient layer.
// Only kIdleNotification is acted on in this file (it is matched against
// app-data keys in HandleMCSDataMesssage); the others are kept commented out.
33 const char kIdleNotification[] = "IdleNotification";
34 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
35 // const char kPowerNotification[] = "PowerNotification";
36 // const char kDataActiveNotification[] = "DataActiveNotification";
38 // The number of unacked messages to allow before sending a stream ack.
39 // Applies to both incoming and outgoing messages.
// NOTE(review): in this file the constant is only compared against the number
// of unacknowledged incoming (server) ids in HandlePacketFromWire - confirm
// whether the "outgoing" part of the sentence above still holds.
40 // TODO(zea): make this server configurable.
41 const int kUnackedMessageBeforeStreamAck = 10;
43 // The global maximum number of pending messages to have in the send queue.
// SendMessage rejects a message with QUEUE_SIZE_LIMIT_REACHED once the send
// queue holds more than this many entries.
44 const size_t kMaxSendQueueSize = 10 * 1024;
46 // The maximum message size that can be sent to the server.
// SendMessage rejects larger messages with MESSAGE_TOO_LARGE.
47 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
49 // Helper for converting a proto persistent id list to a vector of strings.
50 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
51 std::vector<std::string>* id_list) {
52 mcs_proto::SelectiveAck selective_ack;
53 if (!selective_ack.ParseFromString(bytes))
54 return false;
55 std::vector<std::string> new_list;
56 for (int i = 0; i < selective_ack.id_size(); ++i) {
57 DCHECK(!selective_ack.id(i).empty());
58 new_list.push_back(selective_ack.id(i));
60 id_list->swap(new_list);
61 return true;
64 } // namespace
// Bookkeeping record for one outgoing MCS message. Instances are held in the
// client's send and resend queues.
66 struct ReliablePacketInfo {
67 ReliablePacketInfo();
68 ~ReliablePacketInfo();
70 // The stream id with which the message was sent.
// Initialized to 0 and assigned in MCSClient::SendPacketToWire.
71 uint32 stream_id;
73 // If reliable delivery was requested, the persistent id of the message.
// Left empty by MCSClient::SendMessage when the message TTL is 0.
74 std::string persistent_id;
76 // The type of message itself (for easier lookup).
77 uint8 tag;
79 // The protobuf of the message itself.
// Owned by this record.
80 MCSProto protobuf;
83 ReliablePacketInfo::ReliablePacketInfo()
84 : stream_id(0), tag(0) {
86 ReliablePacketInfo::~ReliablePacketInfo() {}
88 MCSClient::MCSClient(base::Clock* clock,
89 ConnectionFactory* connection_factory,
90 GCMStore* gcm_store)
91 : clock_(clock),
92 state_(UNINITIALIZED),
93 android_id_(0),
94 security_token_(0),
95 connection_factory_(connection_factory),
96 connection_handler_(NULL),
97 last_device_to_server_stream_id_received_(0),
98 last_server_to_device_stream_id_received_(0),
99 stream_id_out_(0),
100 stream_id_in_(0),
101 gcm_store_(gcm_store),
102 weak_ptr_factory_(this) {
105 MCSClient::~MCSClient() {
108 void MCSClient::Initialize(
109 const ErrorCallback& error_callback,
110 const OnMessageReceivedCallback& message_received_callback,
111 const OnMessageSentCallback& message_sent_callback,
112 scoped_ptr<GCMStore::LoadResult> load_result) {
113 DCHECK_EQ(state_, UNINITIALIZED);
115 state_ = LOADED;
116 mcs_error_callback_ = error_callback;
117 message_received_callback_ = message_received_callback;
118 message_sent_callback_ = message_sent_callback;
120 connection_factory_->Initialize(
121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
122 weak_ptr_factory_.GetWeakPtr()),
123 base::Bind(&MCSClient::HandlePacketFromWire,
124 weak_ptr_factory_.GetWeakPtr()),
125 base::Bind(&MCSClient::MaybeSendMessage,
126 weak_ptr_factory_.GetWeakPtr()));
127 connection_handler_ = connection_factory_->GetConnectionHandler();
129 stream_id_out_ = 1; // Login request is hardcoded to id 1.
131 android_id_ = load_result->device_android_id;
132 security_token_ = load_result->device_security_token;
134 if (android_id_ == 0) {
135 DVLOG(1) << "No device credentials found, assuming new client.";
136 // No need to try and load RMQ data in that case.
137 return;
140 // |android_id_| is non-zero, so should |security_token_|.
141 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
142 << " is non-zero.";
144 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
145 << " incoming acks pending and "
146 << load_result->outgoing_messages.size()
147 << " outgoing messages pending.";
149 restored_unackeds_server_ids_ = load_result->incoming_messages;
151 // First go through and order the outgoing messages by recency.
152 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
153 std::vector<PersistentId> expired_ttl_ids;
154 for (GCMStore::OutgoingMessageMap::iterator iter =
155 load_result->outgoing_messages.begin();
156 iter != load_result->outgoing_messages.end(); ++iter) {
157 uint64 timestamp = 0;
158 if (!base::StringToUint64(iter->first, &timestamp)) {
159 LOG(ERROR) << "Invalid restored message.";
160 // TODO(fgorski): Error: data unreadable
161 mcs_error_callback_.Run();
162 return;
165 // Check if the TTL has expired for this message.
166 if (HasTTLExpired(*iter->second, clock_)) {
167 expired_ttl_ids.push_back(iter->first);
168 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
169 continue;
172 ordered_messages[timestamp] = iter->second.release();
175 if (!expired_ttl_ids.empty()) {
176 gcm_store_->RemoveOutgoingMessages(
177 expired_ttl_ids,
178 base::Bind(&MCSClient::OnGCMUpdateFinished,
179 weak_ptr_factory_.GetWeakPtr()));
182 // Now go through and add the outgoing messages to the send queue in their
183 // appropriate order (oldest at front, most recent at back).
184 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
185 iter = ordered_messages.begin();
186 iter != ordered_messages.end(); ++iter) {
187 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
188 packet_info->protobuf.reset(iter->second);
189 packet_info->tag = GetMCSProtoTag(*iter->second);
190 packet_info->persistent_id = base::Uint64ToString(iter->first);
191 to_send_.push_back(make_linked_ptr(packet_info));
195 void MCSClient::Login(uint64 android_id,
196 uint64 security_token,
197 const std::vector<int64>& user_serial_numbers) {
198 DCHECK_EQ(state_, LOADED);
199 DCHECK(android_id_ == 0 || android_id_ == android_id);
200 DCHECK(security_token_ == 0 || security_token_ == security_token);
202 if (android_id != android_id_ && security_token != security_token_) {
203 DCHECK(android_id);
204 DCHECK(security_token);
205 android_id_ = android_id;
206 security_token_ = security_token;
208 user_serial_numbers_ = user_serial_numbers;
210 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
212 state_ = CONNECTING;
213 connection_factory_->Connect();
216 void MCSClient::SendMessage(const MCSMessage& message) {
217 int ttl = GetTTL(message.GetProtobuf());
218 DCHECK_GE(ttl, 0);
219 if (to_send_.size() > kMaxSendQueueSize) {
220 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
221 return;
223 if (message.size() > kMaxMessageBytes) {
224 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
225 return;
228 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
229 packet_info->tag = message.tag();
230 packet_info->protobuf = message.CloneProtobuf();
232 if (ttl > 0) {
233 PersistentId persistent_id = GetNextPersistentId();
234 DVLOG(1) << "Setting persistent id to " << persistent_id;
235 packet_info->persistent_id = persistent_id;
236 SetPersistentId(persistent_id,
237 packet_info->protobuf.get());
238 if (!gcm_store_->AddOutgoingMessage(
239 persistent_id,
240 MCSMessage(message.tag(),
241 *(packet_info->protobuf)),
242 base::Bind(&MCSClient::OnGCMUpdateFinished,
243 weak_ptr_factory_.GetWeakPtr()))) {
244 NotifyMessageSendStatus(message.GetProtobuf(),
245 APP_QUEUE_SIZE_LIMIT_REACHED);
246 return;
248 } else if (!connection_factory_->IsEndpointReachable()) {
249 DVLOG(1) << "No active connection, dropping message.";
250 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
251 return;
254 to_send_.push_back(make_linked_ptr(packet_info.release()));
256 // Notify that the messages has been succsfully queued for sending.
257 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
258 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
260 MaybeSendMessage();
263 void MCSClient::Destroy() {
264 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished,
265 weak_ptr_factory_.GetWeakPtr()));
268 void MCSClient::ResetStateAndBuildLoginRequest(
269 mcs_proto::LoginRequest* request) {
270 DCHECK(android_id_);
271 DCHECK(security_token_);
272 stream_id_in_ = 0;
273 stream_id_out_ = 1;
274 last_device_to_server_stream_id_received_ = 0;
275 last_server_to_device_stream_id_received_ = 0;
277 heartbeat_manager_.Stop();
279 // Add any pending acknowledgments to the list of ids.
280 for (StreamIdToPersistentIdMap::const_iterator iter =
281 unacked_server_ids_.begin();
282 iter != unacked_server_ids_.end(); ++iter) {
283 restored_unackeds_server_ids_.push_back(iter->second);
285 unacked_server_ids_.clear();
287 // Any acknowledged server ids which have not been confirmed by the server
288 // are treated like unacknowledged ids.
289 for (std::map<StreamId, PersistentIdList>::const_iterator iter =
290 acked_server_ids_.begin();
291 iter != acked_server_ids_.end(); ++iter) {
292 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
293 iter->second.begin(),
294 iter->second.end());
296 acked_server_ids_.clear();
298 // Then build the request, consuming all pending acknowledgments.
299 request->Swap(BuildLoginRequest(
300 android_id_, security_token_, user_serial_numbers_).get());
301 for (PersistentIdList::const_iterator iter =
302 restored_unackeds_server_ids_.begin();
303 iter != restored_unackeds_server_ids_.end(); ++iter) {
304 request->add_received_persistent_id(*iter);
306 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
307 restored_unackeds_server_ids_.clear();
309 // Push all unacknowledged messages to front of send queue. No need to save
310 // to RMQ, as all messages that reach this point should already have been
311 // saved as necessary.
312 while (!to_resend_.empty()) {
313 to_send_.push_front(to_resend_.back());
314 to_resend_.pop_back();
317 // Drop all TTL == 0 or expired TTL messages from the queue.
318 std::deque<MCSPacketInternal> new_to_send;
319 std::vector<PersistentId> expired_ttl_ids;
320 while (!to_send_.empty()) {
321 MCSPacketInternal packet = to_send_.front();
322 to_send_.pop_front();
323 if (GetTTL(*packet->protobuf) > 0 &&
324 !HasTTLExpired(*packet->protobuf, clock_)) {
325 new_to_send.push_back(packet);
326 } else {
327 // If the TTL was 0 there is no persistent id, so no need to remove the
328 // message from the persistent store.
329 if (!packet->persistent_id.empty())
330 expired_ttl_ids.push_back(packet->persistent_id);
331 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
335 if (!expired_ttl_ids.empty()) {
336 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
337 << " messages expired.";
338 gcm_store_->RemoveOutgoingMessages(
339 expired_ttl_ids,
340 base::Bind(&MCSClient::OnGCMUpdateFinished,
341 weak_ptr_factory_.GetWeakPtr()));
344 to_send_.swap(new_to_send);
346 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
347 << " incoming acks pending, and " << to_send_.size()
348 << " pending outgoing messages.";
350 state_ = CONNECTING;
353 void MCSClient::SendHeartbeat() {
354 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
357 void MCSClient::OnGCMUpdateFinished(bool success) {
358 LOG_IF(ERROR, !success) << "GCM Update failed!";
359 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
360 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
363 void MCSClient::MaybeSendMessage() {
364 if (to_send_.empty())
365 return;
367 // If the connection has been reset, do nothing. On reconnection
368 // MaybeSendMessage will be automatically invoked again.
369 // TODO(zea): consider doing TTL expiration at connection reset time, rather
370 // than reconnect time.
371 if (!connection_factory_->IsEndpointReachable())
372 return;
374 MCSPacketInternal packet = to_send_.front();
375 to_send_.pop_front();
376 if (HasTTLExpired(*packet->protobuf, clock_)) {
377 DCHECK(!packet->persistent_id.empty());
378 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
379 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
380 gcm_store_->RemoveOutgoingMessage(
381 packet->persistent_id,
382 base::Bind(&MCSClient::OnGCMUpdateFinished,
383 weak_ptr_factory_.GetWeakPtr()));
384 base::MessageLoop::current()->PostTask(
385 FROM_HERE,
386 base::Bind(&MCSClient::MaybeSendMessage,
387 weak_ptr_factory_.GetWeakPtr()));
388 return;
390 DVLOG(1) << "Pending output message found, sending.";
391 if (!packet->persistent_id.empty())
392 to_resend_.push_back(packet);
393 SendPacketToWire(packet.get());
396 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
397 packet_info->stream_id = ++stream_id_out_;
398 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
400 // Set the queued time as necessary.
401 if (packet_info->tag == kDataMessageStanzaTag) {
402 mcs_proto::DataMessageStanza* data_message =
403 reinterpret_cast<mcs_proto::DataMessageStanza*>(
404 packet_info->protobuf.get());
405 uint64 sent = data_message->sent();
406 DCHECK_GT(sent, 0U);
407 int queued = (clock_->Now().ToInternalValue() /
408 base::Time::kMicrosecondsPerSecond) - sent;
409 DVLOG(1) << "Message was queued for " << queued << " seconds.";
410 data_message->set_queued(queued);
413 // Set the proper last received stream id to acknowledge received server
414 // packets.
415 DVLOG(1) << "Setting last stream id received to "
416 << stream_id_in_;
417 SetLastStreamIdReceived(stream_id_in_,
418 packet_info->protobuf.get());
419 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
420 last_server_to_device_stream_id_received_ = stream_id_in_;
421 // Mark all acknowledged server messages as such. Note: they're not dropped,
422 // as it may be that they'll need to be re-acked if this message doesn't
423 // make it.
424 PersistentIdList persistent_id_list;
425 for (StreamIdToPersistentIdMap::const_iterator iter =
426 unacked_server_ids_.begin();
427 iter != unacked_server_ids_.end(); ++iter) {
428 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
429 persistent_id_list.push_back(iter->second);
431 unacked_server_ids_.clear();
432 acked_server_ids_[stream_id_out_] = persistent_id_list;
435 connection_handler_->SendMessage(*packet_info->protobuf);
438 void MCSClient::HandleMCSDataMesssage(
439 scoped_ptr<google::protobuf::MessageLite> protobuf) {
440 mcs_proto::DataMessageStanza* data_message =
441 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
442 // TODO(zea): implement a proper status manager rather than hardcoding these
443 // values.
444 scoped_ptr<mcs_proto::DataMessageStanza> response(
445 new mcs_proto::DataMessageStanza());
446 response->set_from(kGCMFromField);
447 response->set_sent(clock_->Now().ToInternalValue() /
448 base::Time::kMicrosecondsPerSecond);
449 response->set_ttl(0);
450 bool send = false;
451 for (int i = 0; i < data_message->app_data_size(); ++i) {
452 const mcs_proto::AppData& app_data = data_message->app_data(i);
453 if (app_data.key() == kIdleNotification) {
454 // Tell the MCS server the client is not idle.
455 send = true;
456 mcs_proto::AppData data;
457 data.set_key(kIdleNotification);
458 data.set_value("false");
459 response->add_app_data()->CopyFrom(data);
460 response->set_category(kMCSCategory);
464 if (send) {
465 SendMessage(
466 MCSMessage(kDataMessageStanzaTag,
467 response.PassAs<const google::protobuf::MessageLite>()));
471 void MCSClient::HandlePacketFromWire(
472 scoped_ptr<google::protobuf::MessageLite> protobuf) {
473 if (!protobuf.get())
474 return;
475 uint8 tag = GetMCSProtoTag(*protobuf);
476 PersistentId persistent_id = GetPersistentId(*protobuf);
477 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
479 if (last_stream_id_received != 0) {
480 last_device_to_server_stream_id_received_ = last_stream_id_received;
482 // Process device to server messages that have now been acknowledged by the
483 // server. Because messages are stored in order, just pop off all that have
484 // a stream id lower than server's last received stream id.
485 HandleStreamAck(last_stream_id_received);
487 // Process server_to_device_messages that the server now knows were
488 // acknowledged. Again, they're in order, so just keep going until the
489 // stream id is reached.
490 StreamIdList acked_stream_ids_to_remove;
491 for (std::map<StreamId, PersistentIdList>::iterator iter =
492 acked_server_ids_.begin();
493 iter != acked_server_ids_.end() &&
494 iter->first <= last_stream_id_received; ++iter) {
495 acked_stream_ids_to_remove.push_back(iter->first);
497 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
498 iter != acked_stream_ids_to_remove.end(); ++iter) {
499 acked_server_ids_.erase(*iter);
503 ++stream_id_in_;
504 if (!persistent_id.empty()) {
505 unacked_server_ids_[stream_id_in_] = persistent_id;
506 gcm_store_->AddIncomingMessage(persistent_id,
507 base::Bind(&MCSClient::OnGCMUpdateFinished,
508 weak_ptr_factory_.GetWeakPtr()));
511 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
512 << " with persistent id "
513 << (persistent_id.empty() ? "NULL" : persistent_id)
514 << ", stream id " << stream_id_in_ << " and last stream id received "
515 << last_stream_id_received;
517 if (unacked_server_ids_.size() > 0 &&
518 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
519 SendMessage(MCSMessage(kIqStanzaTag,
520 BuildStreamAck().
521 PassAs<const google::protobuf::MessageLite>()));
524 // The connection is alive, treat this message as a heartbeat ack.
525 heartbeat_manager_.OnHeartbeatAcked();
527 switch (tag) {
528 case kLoginResponseTag: {
529 DCHECK_EQ(CONNECTING, state_);
530 mcs_proto::LoginResponse* login_response =
531 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
532 DVLOG(1) << "Received login response:";
533 DVLOG(1) << " Id: " << login_response->id();
534 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
535 if (login_response->has_error() && login_response->error().code() != 0) {
536 state_ = UNINITIALIZED;
537 DVLOG(1) << " Error code: " << login_response->error().code();
538 DVLOG(1) << " Error message: " << login_response->error().message();
539 mcs_error_callback_.Run();
540 return;
543 if (login_response->has_heartbeat_config()) {
544 heartbeat_manager_.UpdateHeartbeatConfig(
545 login_response->heartbeat_config());
548 state_ = CONNECTED;
549 stream_id_in_ = 1; // To account for the login response.
550 DCHECK_EQ(1U, stream_id_out_);
552 // Pass the login response on up.
553 base::MessageLoop::current()->PostTask(
554 FROM_HERE,
555 base::Bind(message_received_callback_,
556 MCSMessage(tag,
557 protobuf.PassAs<
558 const google::protobuf::MessageLite>())));
560 // If there are pending messages, attempt to send one.
561 if (!to_send_.empty()) {
562 base::MessageLoop::current()->PostTask(
563 FROM_HERE,
564 base::Bind(&MCSClient::MaybeSendMessage,
565 weak_ptr_factory_.GetWeakPtr()));
568 heartbeat_manager_.Start(
569 base::Bind(&MCSClient::SendHeartbeat,
570 weak_ptr_factory_.GetWeakPtr()),
571 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
572 weak_ptr_factory_.GetWeakPtr()));
573 return;
575 case kHeartbeatPingTag:
576 DCHECK_GE(stream_id_in_, 1U);
577 DVLOG(1) << "Received heartbeat ping, sending ack.";
578 SendMessage(
579 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
580 return;
581 case kHeartbeatAckTag:
582 DCHECK_GE(stream_id_in_, 1U);
583 DVLOG(1) << "Received heartbeat ack.";
584 // Do nothing else, all messages act as heartbeat acks.
585 return;
586 case kCloseTag:
587 LOG(ERROR) << "Received close command, resetting connection.";
588 state_ = LOADED;
589 connection_factory_->SignalConnectionReset();
590 return;
591 case kIqStanzaTag: {
592 DCHECK_GE(stream_id_in_, 1U);
593 mcs_proto::IqStanza* iq_stanza =
594 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
595 const mcs_proto::Extension& iq_extension = iq_stanza->extension();
596 switch (iq_extension.id()) {
597 case kSelectiveAck: {
598 PersistentIdList acked_ids;
599 if (BuildPersistentIdListFromProto(iq_extension.data(),
600 &acked_ids)) {
601 HandleSelectiveAck(acked_ids);
603 return;
605 case kStreamAck:
606 // Do nothing. The last received stream id is always processed if it's
607 // present.
608 return;
609 default:
610 LOG(WARNING) << "Received invalid iq stanza extension "
611 << iq_extension.id();
612 return;
615 case kDataMessageStanzaTag: {
616 DCHECK_GE(stream_id_in_, 1U);
617 mcs_proto::DataMessageStanza* data_message =
618 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
619 if (data_message->category() == kMCSCategory) {
620 HandleMCSDataMesssage(protobuf.Pass());
621 return;
624 DCHECK(protobuf.get());
625 base::MessageLoop::current()->PostTask(
626 FROM_HERE,
627 base::Bind(message_received_callback_,
628 MCSMessage(tag,
629 protobuf.PassAs<
630 const google::protobuf::MessageLite>())));
631 return;
633 default:
634 LOG(ERROR) << "Received unexpected message of type "
635 << static_cast<int>(tag);
636 return;
640 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
641 PersistentIdList acked_outgoing_persistent_ids;
642 StreamIdList acked_outgoing_stream_ids;
643 while (!to_resend_.empty() &&
644 to_resend_.front()->stream_id <= last_stream_id_received) {
645 const MCSPacketInternal& outgoing_packet = to_resend_.front();
646 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
647 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
648 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
649 to_resend_.pop_front();
652 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
653 << " outgoing messages, " << to_resend_.size()
654 << " remaining unacked";
655 gcm_store_->RemoveOutgoingMessages(
656 acked_outgoing_persistent_ids,
657 base::Bind(&MCSClient::OnGCMUpdateFinished,
658 weak_ptr_factory_.GetWeakPtr()));
660 HandleServerConfirmedReceipt(last_stream_id_received);
663 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
664 // First check the to_resend_ queue. Acknowledgments should always happen
665 // in the order they were sent, so if messages are present they should match
666 // the acknowledge list.
667 PersistentIdList::const_iterator iter = id_list.begin();
668 for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
669 const MCSPacketInternal& outgoing_packet = to_resend_.front();
670 DCHECK_EQ(outgoing_packet->persistent_id, *iter);
671 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
673 // No need to re-acknowledge any server messages this message already
674 // acknowledged.
675 StreamId device_stream_id = outgoing_packet->stream_id;
676 HandleServerConfirmedReceipt(device_stream_id);
678 to_resend_.pop_front();
681 // If the acknowledged ids aren't all there, they might be in the to_send_
682 // queue (typically when a StreamAck confirms messages as part of a login
683 // response).
684 for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
685 const MCSPacketInternal& outgoing_packet = to_send_.front();
686 DCHECK_EQ(outgoing_packet->persistent_id, *iter);
687 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
689 // No need to re-acknowledge any server messages this message already
690 // acknowledged.
691 StreamId device_stream_id = outgoing_packet->stream_id;
692 HandleServerConfirmedReceipt(device_stream_id);
694 to_send_.pop_front();
697 DCHECK(iter == id_list.end());
699 DVLOG(1) << "Server acked " << id_list.size()
700 << " messages, " << to_resend_.size() << " remaining unacked.";
701 gcm_store_->RemoveOutgoingMessages(
702 id_list,
703 base::Bind(&MCSClient::OnGCMUpdateFinished,
704 weak_ptr_factory_.GetWeakPtr()));
706 // Resend any remaining outgoing messages, as they were not received by the
707 // server.
708 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
709 while (!to_resend_.empty()) {
710 to_send_.push_front(to_resend_.back());
711 to_resend_.pop_back();
715 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
716 PersistentIdList acked_incoming_ids;
717 for (std::map<StreamId, PersistentIdList>::iterator iter =
718 acked_server_ids_.begin();
719 iter != acked_server_ids_.end() &&
720 iter->first <= device_stream_id;) {
721 acked_incoming_ids.insert(acked_incoming_ids.end(),
722 iter->second.begin(),
723 iter->second.end());
724 acked_server_ids_.erase(iter++);
727 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
728 << " acknowledged server messages.";
729 gcm_store_->RemoveIncomingMessages(
730 acked_incoming_ids,
731 base::Bind(&MCSClient::OnGCMUpdateFinished,
732 weak_ptr_factory_.GetWeakPtr()));
735 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
736 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
739 void MCSClient::OnConnectionResetByHeartbeat() {
740 connection_factory_->SignalConnectionReset();
743 void MCSClient::NotifyMessageSendStatus(
744 const google::protobuf::MessageLite& protobuf,
745 MessageSendStatus status) {
746 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
747 return;
749 const mcs_proto::DataMessageStanza* data_message_stanza =
750 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
751 message_sent_callback_.Run(
752 data_message_stanza->device_user_id(),
753 data_message_stanza->category(),
754 data_message_stanza->id(),
755 status);
758 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
759 gcm_store_ = gcm_store;
762 } // namespace gcm