mark PurpleImageClass as private
[pidgin-git.git] / libpurple / protocols / facebook / mqtt.c
blobf6332c1ef598af52e08d135644e4a86a6791433f
1 /* purple
3 * Purple is the legal property of its developers, whose names are too numerous
4 * to list here. Please refer to the COPYRIGHT file distributed with this
5 * source distribution.
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
22 #include "internal.h"
24 #include <glib/gprintf.h>
25 #include <stdarg.h>
26 #include <string.h>
28 #include "account.h"
29 #include "eventloop.h"
30 #include "glibcompat.h"
31 #include "purple-gio.h"
32 #include "queuedoutputstream.h"
34 #include "mqtt.h"
35 #include "util.h"
37 typedef struct
39 PurpleConnection *gc;
40 GIOStream *conn;
41 GBufferedInputStream *input;
42 PurpleQueuedOutputStream *output;
43 GCancellable *cancellable;
44 gboolean connected;
45 guint16 mid;
47 GByteArray *rbuf;
48 gsize remz;
50 gint tev;
51 } FbMqttPrivate;
53 /**
54 * FbMqtt:
56 * Represents an MQTT connection.
58 struct _FbMqtt
60 GObject parent;
61 FbMqttPrivate *priv;
64 G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
66 typedef struct
68 FbMqttMessageType type;
69 FbMqttMessageFlags flags;
71 GByteArray *bytes;
72 guint offset;
73 guint pos;
75 gboolean local;
76 } FbMqttMessagePrivate;
78 /**
79 * FbMqttMessage:
81 * Represents a reader/writer for an MQTT message.
83 struct _FbMqttMessage
85 GObject parent;
86 FbMqttMessagePrivate *priv;
89 G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
91 static void fb_mqtt_read_packet(FbMqtt *mqtt);
93 static void
94 fb_mqtt_dispose(GObject *obj)
96 FbMqtt *mqtt = FB_MQTT(obj);
97 FbMqttPrivate *priv = mqtt->priv;
99 fb_mqtt_close(mqtt);
100 g_byte_array_free(priv->rbuf, TRUE);
103 static void
104 fb_mqtt_class_init(FbMqttClass *klass)
106 GObjectClass *gklass = G_OBJECT_CLASS(klass);
108 gklass->dispose = fb_mqtt_dispose;
110 * FbMqtt::connect:
111 * @mqtt: The #FbMqtt.
113 * Emitted upon the successful completion of the connection
114 * process. This is emitted as a result of #fb_mqtt_connect().
116 g_signal_new("connect",
117 G_TYPE_FROM_CLASS(klass),
118 G_SIGNAL_ACTION,
120 NULL, NULL, NULL,
121 G_TYPE_NONE,
125 * FbMqtt::error:
126 * @mqtt: The #FbMqtt.
127 * @error: The #GError.
129 * Emitted whenever an error is hit within the #FbMqtt. This
130 * should close the #FbMqtt with #fb_mqtt_close().
132 g_signal_new("error",
133 G_TYPE_FROM_CLASS(klass),
134 G_SIGNAL_ACTION,
136 NULL, NULL, NULL,
137 G_TYPE_NONE,
138 1, G_TYPE_ERROR);
141 * FbMqtt::open:
142 * @mqtt: The #FbMqtt.
144 * Emitted upon the successful opening of the remote socket.
145 * This is emitted as a result of #fb_mqtt_open(). This should
146 * call #fb_mqtt_connect().
148 g_signal_new("open",
149 G_TYPE_FROM_CLASS(klass),
150 G_SIGNAL_ACTION,
152 NULL, NULL, NULL,
153 G_TYPE_NONE,
157 * FbMqtt::publish:
158 * @mqtt: The #FbMqtt.
159 * @topic: The topic.
160 * @pload: The payload.
162 * Emitted upon an incoming message from the steam.
164 g_signal_new("publish",
165 G_TYPE_FROM_CLASS(klass),
166 G_SIGNAL_ACTION,
168 NULL, NULL, NULL,
169 G_TYPE_NONE,
170 2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
173 static void
174 fb_mqtt_init(FbMqtt *mqtt)
176 FbMqttPrivate *priv = fb_mqtt_get_instance_private(mqtt);
178 mqtt->priv = priv;
180 priv->rbuf = g_byte_array_new();
183 static void
184 fb_mqtt_message_dispose(GObject *obj)
186 FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv;
188 if ((priv->bytes != NULL) && priv->local) {
189 g_byte_array_free(priv->bytes, TRUE);
193 static void
194 fb_mqtt_message_class_init(FbMqttMessageClass *klass)
196 GObjectClass *gklass = G_OBJECT_CLASS(klass);
198 gklass->dispose = fb_mqtt_message_dispose;
201 static void
202 fb_mqtt_message_init(FbMqttMessage *msg)
204 FbMqttMessagePrivate *priv = fb_mqtt_message_get_instance_private(msg);
206 msg->priv = priv;
209 GQuark
210 fb_mqtt_error_quark(void)
212 static GQuark q = 0;
214 if (G_UNLIKELY(q == 0)) {
215 q = g_quark_from_static_string("fb-mqtt-error-quark");
218 return q;
221 FbMqtt *
222 fb_mqtt_new(PurpleConnection *gc)
224 FbMqtt *mqtt;
225 FbMqttPrivate *priv;
227 g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
229 mqtt = g_object_new(FB_TYPE_MQTT, NULL);
230 priv = mqtt->priv;
231 priv->gc = gc;
233 return mqtt;
236 void
237 fb_mqtt_close(FbMqtt *mqtt)
239 FbMqttPrivate *priv;
241 g_return_if_fail(FB_IS_MQTT(mqtt));
242 priv = mqtt->priv;
244 if (priv->tev > 0) {
245 g_source_remove(priv->tev);
246 priv->tev = 0;
249 if (priv->cancellable != NULL) {
250 g_cancellable_cancel(priv->cancellable);
251 g_clear_object(&priv->cancellable);
254 if (priv->conn != NULL) {
255 purple_gio_graceful_close(priv->conn,
256 G_INPUT_STREAM(priv->input),
257 G_OUTPUT_STREAM(priv->output));
258 g_clear_object(&priv->input);
259 g_clear_object(&priv->output);
260 g_clear_object(&priv->conn);
263 priv->connected = FALSE;
264 g_byte_array_set_size(priv->rbuf, 0);
267 static void
268 fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
270 if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
271 /* Return as cancelled means the connection is closing */
272 g_error_free(err);
273 return;
276 /* Now we can check for programming errors */
277 g_return_if_fail(FB_IS_MQTT(mqtt));
279 if (prefix != NULL) {
280 g_prefix_error(&err, "%s: ", prefix);
283 g_signal_emit_by_name(mqtt, "error", err);
284 g_error_free(err);
287 void
288 fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
290 GError *err;
291 va_list ap;
293 g_return_if_fail(FB_IS_MQTT(mqtt));
295 va_start(ap, format);
296 err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
297 va_end(ap);
299 g_signal_emit_by_name(mqtt, "error", err);
300 g_error_free(err);
303 static gboolean
304 fb_mqtt_cb_timeout(gpointer data)
306 FbMqtt *mqtt = data;
307 FbMqttPrivate *priv = mqtt->priv;
309 priv->tev = 0;
310 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, _("Connection timed out"));
311 return FALSE;
314 static void
315 fb_mqtt_timeout_clear(FbMqtt *mqtt)
317 FbMqttPrivate *priv = mqtt->priv;
319 if (priv->tev > 0) {
320 g_source_remove(priv->tev);
321 priv->tev = 0;
325 static void
326 fb_mqtt_timeout(FbMqtt *mqtt)
328 FbMqttPrivate *priv = mqtt->priv;
330 fb_mqtt_timeout_clear(mqtt);
331 priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_CONN,
332 fb_mqtt_cb_timeout, mqtt);
335 static gboolean
336 fb_mqtt_cb_ping(gpointer data)
338 FbMqtt *mqtt = data;
339 FbMqttMessage *msg;
340 FbMqttPrivate *priv = mqtt->priv;
342 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
343 fb_mqtt_write(mqtt, msg);
344 g_object_unref(msg);
346 priv->tev = 0;
347 fb_mqtt_timeout(mqtt);
348 return FALSE;
351 static void
352 fb_mqtt_ping(FbMqtt *mqtt)
354 FbMqttPrivate *priv = mqtt->priv;
356 fb_mqtt_timeout_clear(mqtt);
357 priv->tev = g_timeout_add(FB_MQTT_TIMEOUT_PING,
358 fb_mqtt_cb_ping, mqtt);
361 static void
362 fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
364 GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
365 FbMqtt *mqtt = data;
366 gssize ret;
367 GError *err = NULL;
369 ret = g_buffered_input_stream_fill_finish(input, res, &err);
371 if (ret < 1) {
372 if (ret == 0) {
373 err = g_error_new_literal(G_IO_ERROR,
374 G_IO_ERROR_CONNECTION_CLOSED,
375 _("Connection closed"));
378 fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
379 return;
382 fb_mqtt_read_packet(mqtt);
385 static void
386 fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
388 FbMqtt *mqtt = data;
389 FbMqttPrivate *priv;
390 gssize ret;
391 FbMqttMessage *msg;
392 GError *err = NULL;
394 ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
396 if (ret < 1) {
397 if (ret == 0) {
398 err = g_error_new_literal(G_IO_ERROR,
399 G_IO_ERROR_CONNECTION_CLOSED,
400 _("Connection closed"));
403 fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
404 return;
407 priv = mqtt->priv;
408 priv->remz -= ret;
410 if (priv->remz > 0) {
411 g_input_stream_read_async(G_INPUT_STREAM(source),
412 priv->rbuf->data +
413 priv->rbuf->len - priv->remz, priv->remz,
414 G_PRIORITY_DEFAULT, priv->cancellable,
415 fb_mqtt_cb_read_packet, mqtt);
416 return;
419 msg = fb_mqtt_message_new_bytes(priv->rbuf);
421 if (G_UNLIKELY(msg == NULL)) {
422 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
423 _("Failed to parse message"));
424 return;
427 fb_mqtt_read(mqtt, msg);
428 g_object_unref(msg);
430 /* Read another packet if connection wasn't reset in fb_mqtt_read() */
431 if (fb_mqtt_connected(mqtt, FALSE)) {
432 fb_mqtt_read_packet(mqtt);
436 static void
437 fb_mqtt_read_packet(FbMqtt *mqtt)
439 FbMqttPrivate *priv = mqtt->priv;
440 const guint8 *buf;
441 gsize count = 0;
442 gsize pos;
443 guint mult = 1;
444 guint8 byte;
445 gsize size = 0;
447 buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
449 /* Start at 1 to skip the first byte */
450 pos = 1;
452 do {
453 if (pos >= count) {
454 /* Not enough data yet, try again later */
455 g_buffered_input_stream_fill_async(priv->input, -1,
456 G_PRIORITY_DEFAULT, priv->cancellable,
457 fb_mqtt_cb_fill, mqtt);
458 return;
461 byte = *(buf + pos++);
463 size += (byte & 127) * mult;
464 mult *= 128;
465 } while ((byte & 128) != 0);
467 /* Add header to size */
468 size += pos;
470 g_byte_array_set_size(priv->rbuf, size);
471 priv->remz = size;
473 /* TODO: Use g_input_stream_read_all_async() when available. */
474 /* TODO: Alternately, it would be nice to let the
475 * FbMqttMessage directly use the GBufferedInputStream
476 * buffer instead of copying it, provided it's consumed
477 * before the next read.
479 g_input_stream_read_async(G_INPUT_STREAM(priv->input),
480 priv->rbuf->data, priv->rbuf->len,
481 G_PRIORITY_DEFAULT, priv->cancellable,
482 fb_mqtt_cb_read_packet, mqtt);
485 void
486 fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
488 FbMqttMessage *nsg;
489 FbMqttPrivate *priv;
490 FbMqttMessagePrivate *mriv;
491 GByteArray *wytes;
492 gchar *str;
493 guint8 chr;
494 guint16 mid;
496 g_return_if_fail(FB_IS_MQTT(mqtt));
497 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
498 priv = mqtt->priv;
499 mriv = msg->priv;
501 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
502 "Reading %d (flags: 0x%0X)",
503 mriv->type, mriv->flags);
505 switch (mriv->type) {
506 case FB_MQTT_MESSAGE_TYPE_CONNACK:
507 if (!fb_mqtt_message_read_byte(msg, NULL) ||
508 !fb_mqtt_message_read_byte(msg, &chr))
510 break;
513 if (chr != FB_MQTT_ERROR_SUCCESS) {
514 fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
515 chr);
516 return;
519 priv->connected = TRUE;
520 fb_mqtt_ping(mqtt);
521 g_signal_emit_by_name(mqtt, "connect");
522 return;
524 case FB_MQTT_MESSAGE_TYPE_PUBLISH:
525 if (!fb_mqtt_message_read_str(msg, &str)) {
526 break;
529 if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
530 (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
532 if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
533 chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
534 } else {
535 chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
538 if (!fb_mqtt_message_read_mid(msg, &mid)) {
539 g_free(str);
540 break;
543 nsg = fb_mqtt_message_new(chr, 0);
544 fb_mqtt_message_write_u16(nsg, mid);
545 fb_mqtt_write(mqtt, nsg);
546 g_object_unref(nsg);
549 wytes = g_byte_array_new();
550 fb_mqtt_message_read_r(msg, wytes);
551 g_signal_emit_by_name(mqtt, "publish", str, wytes);
552 g_byte_array_free(wytes, TRUE);
553 g_free(str);
554 return;
556 case FB_MQTT_MESSAGE_TYPE_PUBREL:
557 if (!fb_mqtt_message_read_mid(msg, &mid)) {
558 break;
561 nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
562 fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
563 fb_mqtt_write(mqtt, nsg);
564 g_object_unref(nsg);
565 return;
567 case FB_MQTT_MESSAGE_TYPE_PINGRESP:
568 fb_mqtt_ping(mqtt);
569 return;
571 case FB_MQTT_MESSAGE_TYPE_PUBACK:
572 case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
573 case FB_MQTT_MESSAGE_TYPE_SUBACK:
574 case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
575 return;
577 default:
578 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
579 _("Unknown packet (%u)"), mriv->type);
580 return;
583 /* Since no case returned, there was a parse error. */
584 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
585 _("Failed to parse message"));
588 static void
589 fb_mqtt_cb_push_bytes(GObject *source, GAsyncResult *res, gpointer data)
591 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source);
592 FbMqtt *mqtt = data;
593 GError *err = NULL;
595 if (!purple_queued_output_stream_push_bytes_finish(stream,
596 res, &err)) {
597 purple_queued_output_stream_clear_queue(stream);
599 fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
600 return;
604 void
605 fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
607 const GByteArray *bytes;
608 FbMqttMessagePrivate *mriv;
609 FbMqttPrivate *priv;
610 GBytes *gbytes;
612 g_return_if_fail(FB_IS_MQTT(mqtt));
613 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
614 priv = mqtt->priv;
615 mriv = msg->priv;
617 bytes = fb_mqtt_message_bytes(msg);
619 if (G_UNLIKELY(bytes == NULL)) {
620 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
621 _("Failed to format data"));
622 return;
625 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
626 "Writing %d (flags: 0x%0X)",
627 mriv->type, mriv->flags);
629 /* TODO: Would be nice to refactor this to not require copying bytes */
630 gbytes = g_bytes_new(bytes->data, bytes->len);
631 purple_queued_output_stream_push_bytes_async(priv->output, gbytes,
632 G_PRIORITY_DEFAULT, priv->cancellable,
633 fb_mqtt_cb_push_bytes, mqtt);
634 g_bytes_unref(gbytes);
637 static void
638 fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
640 FbMqtt *mqtt = data;
641 FbMqttPrivate *priv;
642 GSocketConnection *conn;
643 GError *err = NULL;
645 conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
646 res, &err);
648 if (conn == NULL) {
649 fb_mqtt_take_error(mqtt, err, NULL);
650 return;
653 fb_mqtt_timeout_clear(mqtt);
655 priv = mqtt->priv;
656 priv->conn = G_IO_STREAM(conn);
657 priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
658 g_io_stream_get_input_stream(priv->conn)));
659 priv->output = purple_queued_output_stream_new(
660 g_io_stream_get_output_stream(priv->conn));
662 fb_mqtt_read_packet(mqtt);
664 g_signal_emit_by_name(mqtt, "open");
667 void
668 fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
670 FbMqttPrivate *priv;
671 PurpleAccount *acc;
672 GSocketClient *client;
673 GError *err = NULL;
675 g_return_if_fail(FB_IS_MQTT(mqtt));
676 priv = mqtt->priv;
678 acc = purple_connection_get_account(priv->gc);
679 fb_mqtt_close(mqtt);
681 client = purple_gio_socket_client_new(acc, &err);
683 if (client == NULL) {
684 fb_mqtt_take_error(mqtt, err, NULL);
685 return;
688 priv->cancellable = g_cancellable_new();
690 g_socket_client_set_tls(client, TRUE);
691 g_socket_client_connect_to_host_async(client, host, port,
692 priv->cancellable, fb_mqtt_cb_open, mqtt);
693 g_object_unref(client);
695 fb_mqtt_timeout(mqtt);
698 void
699 fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
701 FbMqttMessage *msg;
703 g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
704 g_return_if_fail(pload != NULL);
706 /* Facebook always sends a CONNACK, use QoS1 */
707 flags |= FB_MQTT_CONNECT_FLAG_QOS1;
709 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
710 fb_mqtt_message_write_str(msg, FB_MQTT_NAME); /* Protocol name */
711 fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
712 fb_mqtt_message_write_byte(msg, flags); /* Flags */
713 fb_mqtt_message_write_u16(msg, FB_MQTT_KA); /* Keep alive */
715 fb_mqtt_message_write(msg, pload->data, pload->len);
716 fb_mqtt_write(mqtt, msg);
718 fb_mqtt_timeout(mqtt);
719 g_object_unref(msg);
722 gboolean
723 fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
725 FbMqttPrivate *priv;
726 gboolean connected;
728 g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
729 priv = mqtt->priv;
730 connected = (priv->conn != NULL) && priv->connected;
732 if (!connected && error) {
733 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
734 _("Not connected"));
737 return connected;
740 void
741 fb_mqtt_disconnect(FbMqtt *mqtt)
743 FbMqttMessage *msg;
745 if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
746 return;
749 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
750 fb_mqtt_write(mqtt, msg);
751 g_object_unref(msg);
752 fb_mqtt_close(mqtt);
755 void
756 fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
758 FbMqttMessage *msg;
759 FbMqttPrivate *priv;
761 g_return_if_fail(FB_IS_MQTT(mqtt));
762 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
763 priv = mqtt->priv;
765 /* Message identifier not required, but for consistency use QoS1 */
766 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
767 FB_MQTT_MESSAGE_FLAG_QOS1);
769 fb_mqtt_message_write_str(msg, topic); /* Message topic */
770 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
772 if (pload != NULL) {
773 fb_mqtt_message_write(msg, pload->data, pload->len);
776 fb_mqtt_write(mqtt, msg);
777 g_object_unref(msg);
780 void
781 fb_mqtt_subscribe(FbMqtt *mqtt, const gchar *topic1, guint16 qos1, ...)
783 const gchar *topic;
784 FbMqttMessage *msg;
785 FbMqttPrivate *priv;
786 guint16 qos;
787 va_list ap;
789 g_return_if_fail(FB_IS_MQTT(mqtt));
790 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
791 priv = mqtt->priv;
793 /* Facebook requires a message identifier, use QoS1 */
794 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
795 FB_MQTT_MESSAGE_FLAG_QOS1);
797 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
798 fb_mqtt_message_write_str(msg, topic1); /* First topics */
799 fb_mqtt_message_write_byte(msg, qos1); /* First QoS value */
801 va_start(ap, qos1);
803 while ((topic = va_arg(ap, const gchar*)) != NULL) {
804 qos = va_arg(ap, guint);
805 fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
806 fb_mqtt_message_write_byte(msg, qos); /* Remaining QoS values */
809 va_end(ap);
811 fb_mqtt_write(mqtt, msg);
812 g_object_unref(msg);
815 void
816 fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
818 const gchar *topic;
819 FbMqttMessage *msg;
820 FbMqttPrivate *priv;
821 va_list ap;
823 g_return_if_fail(FB_IS_MQTT(mqtt));
824 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
825 priv = mqtt->priv;
827 /* Facebook requires a message identifier, use QoS1 */
828 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
829 FB_MQTT_MESSAGE_FLAG_QOS1);
831 fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
832 fb_mqtt_message_write_str(msg, topic1); /* First topic */
834 va_start(ap, topic1);
836 while ((topic = va_arg(ap, const gchar*)) != NULL) {
837 fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
840 va_end(ap);
842 fb_mqtt_write(mqtt, msg);
843 g_object_unref(msg);
846 FbMqttMessage *
847 fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
849 FbMqttMessage *msg;
850 FbMqttMessagePrivate *priv;
852 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
853 priv = msg->priv;
855 priv->type = type;
856 priv->flags = flags;
857 priv->bytes = g_byte_array_new();
858 priv->local = TRUE;
860 return msg;
863 FbMqttMessage *
864 fb_mqtt_message_new_bytes(GByteArray *bytes)
866 FbMqttMessage *msg;
867 FbMqttMessagePrivate *priv;
868 guint8 *byte;
870 g_return_val_if_fail(bytes != NULL, NULL);
871 g_return_val_if_fail(bytes->len >= 2, NULL);
873 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
874 priv = msg->priv;
876 priv->bytes = bytes;
877 priv->local = FALSE;
878 priv->type = (*bytes->data & 0xF0) >> 4;
879 priv->flags = *bytes->data & 0x0F;
881 /* Skip the fixed header */
882 for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; );
883 priv->offset = byte - bytes->data;
884 priv->pos = priv->offset;
886 return msg;
889 void
890 fb_mqtt_message_reset(FbMqttMessage *msg)
892 FbMqttMessagePrivate *priv;
894 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
895 priv = msg->priv;
897 if (priv->offset > 0) {
898 g_byte_array_remove_range(priv->bytes, 0, priv->offset);
899 priv->offset = 0;
900 priv->pos = 0;
904 const GByteArray *
905 fb_mqtt_message_bytes(FbMqttMessage *msg)
907 FbMqttMessagePrivate *priv;
908 guint i;
909 guint8 byte;
910 guint8 sbuf[4];
911 guint32 size;
913 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
914 priv = msg->priv;
916 i = 0;
917 size = priv->bytes->len - priv->offset;
919 do {
920 if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
921 return NULL;
924 byte = size % 128;
925 size /= 128;
927 if (size > 0) {
928 byte |= 128;
931 sbuf[i++] = byte;
932 } while (size > 0);
934 fb_mqtt_message_reset(msg);
935 g_byte_array_prepend(priv->bytes, sbuf, i);
937 byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F);
938 g_byte_array_prepend(priv->bytes, &byte, sizeof byte);
940 priv->pos = (i + 1) * (sizeof byte);
941 return priv->bytes;
944 gboolean
945 fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
947 FbMqttMessagePrivate *priv;
949 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
950 priv = msg->priv;
952 if ((priv->pos + size) > priv->bytes->len) {
953 return FALSE;
956 if ((data != NULL) && (size > 0)) {
957 memcpy(data, priv->bytes->data + priv->pos, size);
960 priv->pos += size;
961 return TRUE;
964 gboolean
965 fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
967 FbMqttMessagePrivate *priv;
968 guint size;
970 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
971 priv = msg->priv;
972 size = priv->bytes->len - priv->pos;
974 if (G_LIKELY(size > 0)) {
975 g_byte_array_append(bytes, priv->bytes->data + priv->pos,
976 size);
979 return TRUE;
982 gboolean
983 fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
985 return fb_mqtt_message_read(msg, value, sizeof *value);
988 gboolean
989 fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
991 return fb_mqtt_message_read_u16(msg, value);
994 gboolean
995 fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
997 if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
998 return FALSE;
1001 if (value != NULL) {
1002 *value = g_ntohs(*value);
1005 return TRUE;
1008 gboolean
1009 fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
1011 guint8 *data;
1012 guint16 size;
1014 if (!fb_mqtt_message_read_u16(msg, &size)) {
1015 return FALSE;
1018 if (value != NULL) {
1019 data = g_new(guint8, size + 1);
1020 data[size] = 0;
1021 } else {
1022 data = NULL;
1025 if (!fb_mqtt_message_read(msg, data, size)) {
1026 g_free(data);
1027 return FALSE;
1030 if (value != NULL) {
1031 *value = (gchar *) data;
1034 return TRUE;
1037 void
1038 fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
1040 FbMqttMessagePrivate *priv;
1042 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
1043 priv = msg->priv;
1045 g_byte_array_append(priv->bytes, data, size);
1046 priv->pos += size;
1049 void
1050 fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
1052 fb_mqtt_message_write(msg, &value, sizeof value);
1055 void
1056 fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
1058 g_return_if_fail(value != NULL);
1059 fb_mqtt_message_write_u16(msg, ++(*value));
1062 void
1063 fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
1065 value = g_htons(value);
1066 fb_mqtt_message_write(msg, &value, sizeof value);
1069 void
1070 fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
1072 gint16 size;
1074 g_return_if_fail(value != NULL);
1076 size = strlen(value);
1077 fb_mqtt_message_write_u16(msg, size);
1078 fb_mqtt_message_write(msg, value, size);