3 * Purple is the legal property of its developers, whose names are too numerous
4 * to list here. Please refer to the COPYRIGHT file distributed with this
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
24 #include <glib/gprintf.h>
29 #include "eventloop.h"
30 #include "glibcompat.h"
31 #include "purple-gio.h"
32 #include "queuedoutputstream.h"
41 GBufferedInputStream
*input
;
42 PurpleQueuedOutputStream
*output
;
43 GCancellable
*cancellable
;
56 * Represents an MQTT connection.
64 G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt
, fb_mqtt
, G_TYPE_OBJECT
);
68 FbMqttMessageType type
;
69 FbMqttMessageFlags flags
;
76 } FbMqttMessagePrivate
;
81 * Represents a reader/writer for an MQTT message.
86 FbMqttMessagePrivate
*priv
;
89 G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage
, fb_mqtt_message
, G_TYPE_OBJECT
);
91 static void fb_mqtt_read_packet(FbMqtt
*mqtt
);
94 fb_mqtt_dispose(GObject
*obj
)
96 FbMqtt
*mqtt
= FB_MQTT(obj
);
97 FbMqttPrivate
*priv
= mqtt
->priv
;
100 g_byte_array_free(priv
->rbuf
, TRUE
);
104 fb_mqtt_class_init(FbMqttClass
*klass
)
106 GObjectClass
*gklass
= G_OBJECT_CLASS(klass
);
108 gklass
->dispose
= fb_mqtt_dispose
;
111 * @mqtt: The #FbMqtt.
113 * Emitted upon the successful completion of the connection
114 * process. This is emitted as a result of #fb_mqtt_connect().
116 g_signal_new("connect",
117 G_TYPE_FROM_CLASS(klass
),
126 * @mqtt: The #FbMqtt.
127 * @error: The #GError.
129 * Emitted whenever an error is hit within the #FbMqtt. This
130 * should close the #FbMqtt with #fb_mqtt_close().
132 g_signal_new("error",
133 G_TYPE_FROM_CLASS(klass
),
142 * @mqtt: The #FbMqtt.
144 * Emitted upon the successful opening of the remote socket.
145 * This is emitted as a result of #fb_mqtt_open(). This should
146 * call #fb_mqtt_connect().
149 G_TYPE_FROM_CLASS(klass
),
158 * @mqtt: The #FbMqtt.
160 * @pload: The payload.
162 * Emitted upon an incoming message from the steam.
164 g_signal_new("publish",
165 G_TYPE_FROM_CLASS(klass
),
170 2, G_TYPE_STRING
, G_TYPE_BYTE_ARRAY
);
174 fb_mqtt_init(FbMqtt
*mqtt
)
176 FbMqttPrivate
*priv
= fb_mqtt_get_instance_private(mqtt
);
180 priv
->rbuf
= g_byte_array_new();
184 fb_mqtt_message_dispose(GObject
*obj
)
186 FbMqttMessagePrivate
*priv
= FB_MQTT_MESSAGE(obj
)->priv
;
188 if ((priv
->bytes
!= NULL
) && priv
->local
) {
189 g_byte_array_free(priv
->bytes
, TRUE
);
194 fb_mqtt_message_class_init(FbMqttMessageClass
*klass
)
196 GObjectClass
*gklass
= G_OBJECT_CLASS(klass
);
198 gklass
->dispose
= fb_mqtt_message_dispose
;
202 fb_mqtt_message_init(FbMqttMessage
*msg
)
204 FbMqttMessagePrivate
*priv
= fb_mqtt_message_get_instance_private(msg
);
210 fb_mqtt_error_quark(void)
214 if (G_UNLIKELY(q
== 0)) {
215 q
= g_quark_from_static_string("fb-mqtt-error-quark");
222 fb_mqtt_new(PurpleConnection
*gc
)
227 g_return_val_if_fail(PURPLE_IS_CONNECTION(gc
), NULL
);
229 mqtt
= g_object_new(FB_TYPE_MQTT
, NULL
);
237 fb_mqtt_close(FbMqtt
*mqtt
)
241 g_return_if_fail(FB_IS_MQTT(mqtt
));
245 g_source_remove(priv
->tev
);
249 if (priv
->cancellable
!= NULL
) {
250 g_cancellable_cancel(priv
->cancellable
);
251 g_clear_object(&priv
->cancellable
);
254 if (priv
->conn
!= NULL
) {
255 purple_gio_graceful_close(priv
->conn
,
256 G_INPUT_STREAM(priv
->input
),
257 G_OUTPUT_STREAM(priv
->output
));
258 g_clear_object(&priv
->input
);
259 g_clear_object(&priv
->output
);
260 g_clear_object(&priv
->conn
);
263 priv
->connected
= FALSE
;
264 g_byte_array_set_size(priv
->rbuf
, 0);
268 fb_mqtt_take_error(FbMqtt
*mqtt
, GError
*err
, const gchar
*prefix
)
270 if (g_error_matches(err
, G_IO_ERROR
, G_IO_ERROR_CANCELLED
)) {
271 /* Return as cancelled means the connection is closing */
276 /* Now we can check for programming errors */
277 g_return_if_fail(FB_IS_MQTT(mqtt
));
279 if (prefix
!= NULL
) {
280 g_prefix_error(&err
, "%s: ", prefix
);
283 g_signal_emit_by_name(mqtt
, "error", err
);
288 fb_mqtt_error(FbMqtt
*mqtt
, FbMqttError error
, const gchar
*format
, ...)
293 g_return_if_fail(FB_IS_MQTT(mqtt
));
295 va_start(ap
, format
);
296 err
= g_error_new_valist(FB_MQTT_ERROR
, error
, format
, ap
);
299 g_signal_emit_by_name(mqtt
, "error", err
);
304 fb_mqtt_cb_timeout(gpointer data
)
307 FbMqttPrivate
*priv
= mqtt
->priv
;
310 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
, _("Connection timed out"));
315 fb_mqtt_timeout_clear(FbMqtt
*mqtt
)
317 FbMqttPrivate
*priv
= mqtt
->priv
;
320 g_source_remove(priv
->tev
);
326 fb_mqtt_timeout(FbMqtt
*mqtt
)
328 FbMqttPrivate
*priv
= mqtt
->priv
;
330 fb_mqtt_timeout_clear(mqtt
);
331 priv
->tev
= g_timeout_add(FB_MQTT_TIMEOUT_CONN
,
332 fb_mqtt_cb_timeout
, mqtt
);
336 fb_mqtt_cb_ping(gpointer data
)
340 FbMqttPrivate
*priv
= mqtt
->priv
;
342 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ
, 0);
343 fb_mqtt_write(mqtt
, msg
);
347 fb_mqtt_timeout(mqtt
);
352 fb_mqtt_ping(FbMqtt
*mqtt
)
354 FbMqttPrivate
*priv
= mqtt
->priv
;
356 fb_mqtt_timeout_clear(mqtt
);
357 priv
->tev
= g_timeout_add(FB_MQTT_TIMEOUT_PING
,
358 fb_mqtt_cb_ping
, mqtt
);
362 fb_mqtt_cb_fill(GObject
*source
, GAsyncResult
*res
, gpointer data
)
364 GBufferedInputStream
*input
= G_BUFFERED_INPUT_STREAM(source
);
369 ret
= g_buffered_input_stream_fill_finish(input
, res
, &err
);
373 err
= g_error_new_literal(G_IO_ERROR
,
374 G_IO_ERROR_CONNECTION_CLOSED
,
375 _("Connection closed"));
378 fb_mqtt_take_error(mqtt
, err
, _("Failed to read fixed header"));
382 fb_mqtt_read_packet(mqtt
);
386 fb_mqtt_cb_read_packet(GObject
*source
, GAsyncResult
*res
, gpointer data
)
394 ret
= g_input_stream_read_finish(G_INPUT_STREAM(source
), res
, &err
);
398 err
= g_error_new_literal(G_IO_ERROR
,
399 G_IO_ERROR_CONNECTION_CLOSED
,
400 _("Connection closed"));
403 fb_mqtt_take_error(mqtt
, err
, _("Failed to read packet data"));
410 if (priv
->remz
> 0) {
411 g_input_stream_read_async(G_INPUT_STREAM(source
),
413 priv
->rbuf
->len
- priv
->remz
, priv
->remz
,
414 G_PRIORITY_DEFAULT
, priv
->cancellable
,
415 fb_mqtt_cb_read_packet
, mqtt
);
419 msg
= fb_mqtt_message_new_bytes(priv
->rbuf
);
421 if (G_UNLIKELY(msg
== NULL
)) {
422 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
,
423 _("Failed to parse message"));
427 fb_mqtt_read(mqtt
, msg
);
430 /* Read another packet if connection wasn't reset in fb_mqtt_read() */
431 if (fb_mqtt_connected(mqtt
, FALSE
)) {
432 fb_mqtt_read_packet(mqtt
);
437 fb_mqtt_read_packet(FbMqtt
*mqtt
)
439 FbMqttPrivate
*priv
= mqtt
->priv
;
447 buf
= g_buffered_input_stream_peek_buffer(priv
->input
, &count
);
449 /* Start at 1 to skip the first byte */
454 /* Not enough data yet, try again later */
455 g_buffered_input_stream_fill_async(priv
->input
, -1,
456 G_PRIORITY_DEFAULT
, priv
->cancellable
,
457 fb_mqtt_cb_fill
, mqtt
);
461 byte
= *(buf
+ pos
++);
463 size
+= (byte
& 127) * mult
;
465 } while ((byte
& 128) != 0);
467 /* Add header to size */
470 g_byte_array_set_size(priv
->rbuf
, size
);
473 /* TODO: Use g_input_stream_read_all_async() when available. */
474 /* TODO: Alternately, it would be nice to let the
475 * FbMqttMessage directly use the GBufferedInputStream
476 * buffer instead of copying it, provided it's consumed
477 * before the next read.
479 g_input_stream_read_async(G_INPUT_STREAM(priv
->input
),
480 priv
->rbuf
->data
, priv
->rbuf
->len
,
481 G_PRIORITY_DEFAULT
, priv
->cancellable
,
482 fb_mqtt_cb_read_packet
, mqtt
);
486 fb_mqtt_read(FbMqtt
*mqtt
, FbMqttMessage
*msg
)
490 FbMqttMessagePrivate
*mriv
;
496 g_return_if_fail(FB_IS_MQTT(mqtt
));
497 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg
));
501 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO
, mriv
->bytes
,
502 "Reading %d (flags: 0x%0X)",
503 mriv
->type
, mriv
->flags
);
505 switch (mriv
->type
) {
506 case FB_MQTT_MESSAGE_TYPE_CONNACK
:
507 if (!fb_mqtt_message_read_byte(msg
, NULL
) ||
508 !fb_mqtt_message_read_byte(msg
, &chr
))
513 if (chr
!= FB_MQTT_ERROR_SUCCESS
) {
514 fb_mqtt_error(mqtt
, chr
, _("Connection failed (%u)"),
519 priv
->connected
= TRUE
;
521 g_signal_emit_by_name(mqtt
, "connect");
524 case FB_MQTT_MESSAGE_TYPE_PUBLISH
:
525 if (!fb_mqtt_message_read_str(msg
, &str
)) {
529 if ((mriv
->flags
& FB_MQTT_MESSAGE_FLAG_QOS1
) ||
530 (mriv
->flags
& FB_MQTT_MESSAGE_FLAG_QOS2
))
532 if (mriv
->flags
& FB_MQTT_MESSAGE_FLAG_QOS1
) {
533 chr
= FB_MQTT_MESSAGE_TYPE_PUBACK
;
535 chr
= FB_MQTT_MESSAGE_TYPE_PUBREC
;
538 if (!fb_mqtt_message_read_mid(msg
, &mid
)) {
543 nsg
= fb_mqtt_message_new(chr
, 0);
544 fb_mqtt_message_write_u16(nsg
, mid
);
545 fb_mqtt_write(mqtt
, nsg
);
549 wytes
= g_byte_array_new();
550 fb_mqtt_message_read_r(msg
, wytes
);
551 g_signal_emit_by_name(mqtt
, "publish", str
, wytes
);
552 g_byte_array_free(wytes
, TRUE
);
556 case FB_MQTT_MESSAGE_TYPE_PUBREL
:
557 if (!fb_mqtt_message_read_mid(msg
, &mid
)) {
561 nsg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP
, 0);
562 fb_mqtt_message_write_u16(nsg
, mid
); /* Message identifier */
563 fb_mqtt_write(mqtt
, nsg
);
567 case FB_MQTT_MESSAGE_TYPE_PINGRESP
:
571 case FB_MQTT_MESSAGE_TYPE_PUBACK
:
572 case FB_MQTT_MESSAGE_TYPE_PUBCOMP
:
573 case FB_MQTT_MESSAGE_TYPE_SUBACK
:
574 case FB_MQTT_MESSAGE_TYPE_UNSUBACK
:
578 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
,
579 _("Unknown packet (%u)"), mriv
->type
);
583 /* Since no case returned, there was a parse error. */
584 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
,
585 _("Failed to parse message"));
589 fb_mqtt_cb_push_bytes(GObject
*source
, GAsyncResult
*res
, gpointer data
)
591 PurpleQueuedOutputStream
*stream
= PURPLE_QUEUED_OUTPUT_STREAM(source
);
595 if (!purple_queued_output_stream_push_bytes_finish(stream
,
597 purple_queued_output_stream_clear_queue(stream
);
599 fb_mqtt_take_error(mqtt
, err
, _("Failed to write data"));
605 fb_mqtt_write(FbMqtt
*mqtt
, FbMqttMessage
*msg
)
607 const GByteArray
*bytes
;
608 FbMqttMessagePrivate
*mriv
;
612 g_return_if_fail(FB_IS_MQTT(mqtt
));
613 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg
));
617 bytes
= fb_mqtt_message_bytes(msg
);
619 if (G_UNLIKELY(bytes
== NULL
)) {
620 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
,
621 _("Failed to format data"));
625 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO
, mriv
->bytes
,
626 "Writing %d (flags: 0x%0X)",
627 mriv
->type
, mriv
->flags
);
629 /* TODO: Would be nice to refactor this to not require copying bytes */
630 gbytes
= g_bytes_new(bytes
->data
, bytes
->len
);
631 purple_queued_output_stream_push_bytes_async(priv
->output
, gbytes
,
632 G_PRIORITY_DEFAULT
, priv
->cancellable
,
633 fb_mqtt_cb_push_bytes
, mqtt
);
634 g_bytes_unref(gbytes
);
638 fb_mqtt_cb_open(GObject
*source
, GAsyncResult
*res
, gpointer data
)
642 GSocketConnection
*conn
;
645 conn
= g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source
),
649 fb_mqtt_take_error(mqtt
, err
, NULL
);
653 fb_mqtt_timeout_clear(mqtt
);
656 priv
->conn
= G_IO_STREAM(conn
);
657 priv
->input
= G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
658 g_io_stream_get_input_stream(priv
->conn
)));
659 priv
->output
= purple_queued_output_stream_new(
660 g_io_stream_get_output_stream(priv
->conn
));
662 fb_mqtt_read_packet(mqtt
);
664 g_signal_emit_by_name(mqtt
, "open");
668 fb_mqtt_open(FbMqtt
*mqtt
, const gchar
*host
, gint port
)
672 GSocketClient
*client
;
675 g_return_if_fail(FB_IS_MQTT(mqtt
));
678 acc
= purple_connection_get_account(priv
->gc
);
681 client
= purple_gio_socket_client_new(acc
, &err
);
683 if (client
== NULL
) {
684 fb_mqtt_take_error(mqtt
, err
, NULL
);
688 priv
->cancellable
= g_cancellable_new();
690 g_socket_client_set_tls(client
, TRUE
);
691 g_socket_client_connect_to_host_async(client
, host
, port
,
692 priv
->cancellable
, fb_mqtt_cb_open
, mqtt
);
693 g_object_unref(client
);
695 fb_mqtt_timeout(mqtt
);
699 fb_mqtt_connect(FbMqtt
*mqtt
, guint8 flags
, const GByteArray
*pload
)
703 g_return_if_fail(!fb_mqtt_connected(mqtt
, FALSE
));
704 g_return_if_fail(pload
!= NULL
);
706 /* Facebook always sends a CONNACK, use QoS1 */
707 flags
|= FB_MQTT_CONNECT_FLAG_QOS1
;
709 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT
, 0);
710 fb_mqtt_message_write_str(msg
, FB_MQTT_NAME
); /* Protocol name */
711 fb_mqtt_message_write_byte(msg
, FB_MQTT_LEVEL
); /* Protocol level */
712 fb_mqtt_message_write_byte(msg
, flags
); /* Flags */
713 fb_mqtt_message_write_u16(msg
, FB_MQTT_KA
); /* Keep alive */
715 fb_mqtt_message_write(msg
, pload
->data
, pload
->len
);
716 fb_mqtt_write(mqtt
, msg
);
718 fb_mqtt_timeout(mqtt
);
723 fb_mqtt_connected(FbMqtt
*mqtt
, gboolean error
)
728 g_return_val_if_fail(FB_IS_MQTT(mqtt
), FALSE
);
730 connected
= (priv
->conn
!= NULL
) && priv
->connected
;
732 if (!connected
&& error
) {
733 fb_mqtt_error(mqtt
, FB_MQTT_ERROR_GENERAL
,
741 fb_mqtt_disconnect(FbMqtt
*mqtt
)
745 if (G_UNLIKELY(!fb_mqtt_connected(mqtt
, FALSE
))) {
749 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT
, 0);
750 fb_mqtt_write(mqtt
, msg
);
756 fb_mqtt_publish(FbMqtt
*mqtt
, const gchar
*topic
, const GByteArray
*pload
)
761 g_return_if_fail(FB_IS_MQTT(mqtt
));
762 g_return_if_fail(fb_mqtt_connected(mqtt
, FALSE
));
765 /* Message identifier not required, but for consistency use QoS1 */
766 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH
,
767 FB_MQTT_MESSAGE_FLAG_QOS1
);
769 fb_mqtt_message_write_str(msg
, topic
); /* Message topic */
770 fb_mqtt_message_write_mid(msg
, &priv
->mid
); /* Message identifier */
773 fb_mqtt_message_write(msg
, pload
->data
, pload
->len
);
776 fb_mqtt_write(mqtt
, msg
);
781 fb_mqtt_subscribe(FbMqtt
*mqtt
, const gchar
*topic1
, guint16 qos1
, ...)
789 g_return_if_fail(FB_IS_MQTT(mqtt
));
790 g_return_if_fail(fb_mqtt_connected(mqtt
, FALSE
));
793 /* Facebook requires a message identifier, use QoS1 */
794 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE
,
795 FB_MQTT_MESSAGE_FLAG_QOS1
);
797 fb_mqtt_message_write_mid(msg
, &priv
->mid
); /* Message identifier */
798 fb_mqtt_message_write_str(msg
, topic1
); /* First topics */
799 fb_mqtt_message_write_byte(msg
, qos1
); /* First QoS value */
803 while ((topic
= va_arg(ap
, const gchar
*)) != NULL
) {
804 qos
= va_arg(ap
, guint
);
805 fb_mqtt_message_write_str(msg
, topic
); /* Remaining topics */
806 fb_mqtt_message_write_byte(msg
, qos
); /* Remaining QoS values */
811 fb_mqtt_write(mqtt
, msg
);
816 fb_mqtt_unsubscribe(FbMqtt
*mqtt
, const gchar
*topic1
, ...)
823 g_return_if_fail(FB_IS_MQTT(mqtt
));
824 g_return_if_fail(fb_mqtt_connected(mqtt
, FALSE
));
827 /* Facebook requires a message identifier, use QoS1 */
828 msg
= fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE
,
829 FB_MQTT_MESSAGE_FLAG_QOS1
);
831 fb_mqtt_message_write_mid(msg
, &priv
->mid
); /* Message identifier */
832 fb_mqtt_message_write_str(msg
, topic1
); /* First topic */
834 va_start(ap
, topic1
);
836 while ((topic
= va_arg(ap
, const gchar
*)) != NULL
) {
837 fb_mqtt_message_write_str(msg
, topic
); /* Remaining topics */
842 fb_mqtt_write(mqtt
, msg
);
847 fb_mqtt_message_new(FbMqttMessageType type
, FbMqttMessageFlags flags
)
850 FbMqttMessagePrivate
*priv
;
852 msg
= g_object_new(FB_TYPE_MQTT_MESSAGE
, NULL
);
857 priv
->bytes
= g_byte_array_new();
864 fb_mqtt_message_new_bytes(GByteArray
*bytes
)
867 FbMqttMessagePrivate
*priv
;
870 g_return_val_if_fail(bytes
!= NULL
, NULL
);
871 g_return_val_if_fail(bytes
->len
>= 2, NULL
);
873 msg
= g_object_new(FB_TYPE_MQTT_MESSAGE
, NULL
);
878 priv
->type
= (*bytes
->data
& 0xF0) >> 4;
879 priv
->flags
= *bytes
->data
& 0x0F;
881 /* Skip the fixed header */
882 for (byte
= priv
->bytes
->data
+ 1; (*(byte
++) & 128) != 0; );
883 priv
->offset
= byte
- bytes
->data
;
884 priv
->pos
= priv
->offset
;
890 fb_mqtt_message_reset(FbMqttMessage
*msg
)
892 FbMqttMessagePrivate
*priv
;
894 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg
));
897 if (priv
->offset
> 0) {
898 g_byte_array_remove_range(priv
->bytes
, 0, priv
->offset
);
905 fb_mqtt_message_bytes(FbMqttMessage
*msg
)
907 FbMqttMessagePrivate
*priv
;
913 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg
), NULL
);
917 size
= priv
->bytes
->len
- priv
->offset
;
920 if (G_UNLIKELY(i
>= G_N_ELEMENTS(sbuf
))) {
934 fb_mqtt_message_reset(msg
);
935 g_byte_array_prepend(priv
->bytes
, sbuf
, i
);
937 byte
= ((priv
->type
& 0x0F) << 4) | (priv
->flags
& 0x0F);
938 g_byte_array_prepend(priv
->bytes
, &byte
, sizeof byte
);
940 priv
->pos
= (i
+ 1) * (sizeof byte
);
945 fb_mqtt_message_read(FbMqttMessage
*msg
, gpointer data
, guint size
)
947 FbMqttMessagePrivate
*priv
;
949 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg
), FALSE
);
952 if ((priv
->pos
+ size
) > priv
->bytes
->len
) {
956 if ((data
!= NULL
) && (size
> 0)) {
957 memcpy(data
, priv
->bytes
->data
+ priv
->pos
, size
);
965 fb_mqtt_message_read_r(FbMqttMessage
*msg
, GByteArray
*bytes
)
967 FbMqttMessagePrivate
*priv
;
970 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg
), FALSE
);
972 size
= priv
->bytes
->len
- priv
->pos
;
974 if (G_LIKELY(size
> 0)) {
975 g_byte_array_append(bytes
, priv
->bytes
->data
+ priv
->pos
,
983 fb_mqtt_message_read_byte(FbMqttMessage
*msg
, guint8
*value
)
985 return fb_mqtt_message_read(msg
, value
, sizeof *value
);
989 fb_mqtt_message_read_mid(FbMqttMessage
*msg
, guint16
*value
)
991 return fb_mqtt_message_read_u16(msg
, value
);
995 fb_mqtt_message_read_u16(FbMqttMessage
*msg
, guint16
*value
)
997 if (!fb_mqtt_message_read(msg
, value
, sizeof *value
)) {
1001 if (value
!= NULL
) {
1002 *value
= g_ntohs(*value
);
1009 fb_mqtt_message_read_str(FbMqttMessage
*msg
, gchar
**value
)
1014 if (!fb_mqtt_message_read_u16(msg
, &size
)) {
1018 if (value
!= NULL
) {
1019 data
= g_new(guint8
, size
+ 1);
1025 if (!fb_mqtt_message_read(msg
, data
, size
)) {
1030 if (value
!= NULL
) {
1031 *value
= (gchar
*) data
;
1038 fb_mqtt_message_write(FbMqttMessage
*msg
, gconstpointer data
, guint size
)
1040 FbMqttMessagePrivate
*priv
;
1042 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg
));
1045 g_byte_array_append(priv
->bytes
, data
, size
);
1050 fb_mqtt_message_write_byte(FbMqttMessage
*msg
, guint8 value
)
1052 fb_mqtt_message_write(msg
, &value
, sizeof value
);
1056 fb_mqtt_message_write_mid(FbMqttMessage
*msg
, guint16
*value
)
1058 g_return_if_fail(value
!= NULL
);
1059 fb_mqtt_message_write_u16(msg
, ++(*value
));
1063 fb_mqtt_message_write_u16(FbMqttMessage
*msg
, guint16 value
)
1065 value
= g_htons(value
);
1066 fb_mqtt_message_write(msg
, &value
, sizeof value
);
1070 fb_mqtt_message_write_str(FbMqttMessage
*msg
, const gchar
*value
)
1074 g_return_if_fail(value
!= NULL
);
1076 size
= strlen(value
);
1077 fb_mqtt_message_write_u16(msg
, size
);
1078 fb_mqtt_message_write(msg
, value
, size
);