1 /* GDBus - GLib D-Bus Library
3 * Copyright (C) 2008-2010 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 * Author: David Zeuthen <davidz@redhat.com>
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
33 #include "ginputstream.h"
34 #include "gmemoryinputstream.h"
35 #include "giostream.h"
36 #include "glib/gstdio.h"
37 #include "gsocketcontrolmessage.h"
38 #include "gsocketconnection.h"
39 #include "gsocketoutputstream.h"
42 #include "gunixfdmessage.h"
43 #include "gunixconnection.h"
44 #include "gunixcredentialsmessage.h"
53 static gboolean
_g_dbus_worker_do_initial_read (gpointer data
);
54 static void schedule_pending_close (GDBusWorker
*worker
);
56 /* ---------------------------------------------------------------------------------------------------- */
59 _g_dbus_hexdump (const gchar
*data
, gsize len
, guint indent
)
64 ret
= g_string_new (NULL
);
66 for (n
= 0; n
< len
; n
+= 16)
68 g_string_append_printf (ret
, "%*s%04x: ", indent
, "", n
);
70 for (m
= n
; m
< n
+ 16; m
++)
72 if (m
> n
&& (m
%4) == 0)
73 g_string_append_c (ret
, ' ');
75 g_string_append_printf (ret
, "%02x ", (guchar
) data
[m
]);
77 g_string_append (ret
, " ");
80 g_string_append (ret
, " ");
82 for (m
= n
; m
< len
&& m
< n
+ 16; m
++)
83 g_string_append_c (ret
, g_ascii_isprint (data
[m
]) ? data
[m
] : '.');
85 g_string_append_c (ret
, '\n');
88 return g_string_free (ret
, FALSE
);
91 /* ---------------------------------------------------------------------------------------------------- */
93 /* Unfortunately ancillary messages are discarded when reading from a
94 * socket using the GSocketInputStream abstraction. So we provide a
95 * very GInputStream-ish API that uses GSocket in this case (very
96 * similar to GSocketInputStream).
104 GSocketControlMessage
***messages
;
106 } ReadWithControlData
;
109 read_with_control_data_free (ReadWithControlData
*data
)
111 g_slice_free (ReadWithControlData
, data
);
115 _g_socket_read_with_control_messages_ready (GSocket
*socket
,
116 GIOCondition condition
,
119 GTask
*task
= user_data
;
120 ReadWithControlData
*data
= g_task_get_task_data (task
);
126 vector
.buffer
= data
->buffer
;
127 vector
.size
= data
->count
;
128 result
= g_socket_receive_message (socket
,
135 g_task_get_cancellable (task
),
138 if (g_error_matches (error
, G_IO_ERROR
, G_IO_ERROR_WOULD_BLOCK
))
140 g_error_free (error
);
144 g_assert (result
>= 0 || error
!= NULL
);
146 g_task_return_int (task
, result
);
148 g_task_return_error (task
, error
);
149 g_object_unref (task
);
155 _g_socket_read_with_control_messages (GSocket
*socket
,
158 GSocketControlMessage
***messages
,
161 GCancellable
*cancellable
,
162 GAsyncReadyCallback callback
,
166 ReadWithControlData
*data
;
169 data
= g_slice_new0 (ReadWithControlData
);
170 data
->buffer
= buffer
;
172 data
->messages
= messages
;
173 data
->num_messages
= num_messages
;
175 task
= g_task_new (socket
, cancellable
, callback
, user_data
);
176 g_task_set_source_tag (task
, _g_socket_read_with_control_messages
);
177 g_task_set_task_data (task
, data
, (GDestroyNotify
) read_with_control_data_free
);
179 if (g_socket_condition_check (socket
, G_IO_IN
))
181 if (!_g_socket_read_with_control_messages_ready (socket
, G_IO_IN
, task
))
185 source
= g_socket_create_source (socket
,
186 G_IO_IN
| G_IO_HUP
| G_IO_ERR
,
188 g_task_attach_source (task
, source
, (GSourceFunc
) _g_socket_read_with_control_messages_ready
);
189 g_source_unref (source
);
193 _g_socket_read_with_control_messages_finish (GSocket
*socket
,
194 GAsyncResult
*result
,
197 g_return_val_if_fail (G_IS_SOCKET (socket
), -1);
198 g_return_val_if_fail (g_task_is_valid (result
, socket
), -1);
200 return g_task_propagate_int (G_TASK (result
), error
);
203 /* ---------------------------------------------------------------------------------------------------- */
205 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
207 static GPtrArray
*ensured_classes
= NULL
;
210 ensure_type (GType gtype
)
212 g_ptr_array_add (ensured_classes
, g_type_class_ref (gtype
));
216 release_required_types (void)
218 g_ptr_array_foreach (ensured_classes
, (GFunc
) g_type_class_unref
, NULL
);
219 g_ptr_array_unref (ensured_classes
);
220 ensured_classes
= NULL
;
224 ensure_required_types (void)
226 g_assert (ensured_classes
== NULL
);
227 ensured_classes
= g_ptr_array_new ();
228 ensure_type (G_TYPE_TASK
);
229 ensure_type (G_TYPE_MEMORY_INPUT_STREAM
);
231 /* ---------------------------------------------------------------------------------------------------- */
235 volatile gint refcount
;
237 GMainContext
*context
;
242 gdbus_shared_thread_func (gpointer user_data
)
244 SharedThreadData
*data
= user_data
;
246 g_main_context_push_thread_default (data
->context
);
247 g_main_loop_run (data
->loop
);
248 g_main_context_pop_thread_default (data
->context
);
250 release_required_types ();
255 /* ---------------------------------------------------------------------------------------------------- */
257 static SharedThreadData
*
258 _g_dbus_shared_thread_ref (void)
260 static gsize shared_thread_data
= 0;
261 SharedThreadData
*ret
;
263 if (g_once_init_enter (&shared_thread_data
))
265 SharedThreadData
*data
;
267 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
268 ensure_required_types ();
270 data
= g_new0 (SharedThreadData
, 1);
273 data
->context
= g_main_context_new ();
274 data
->loop
= g_main_loop_new (data
->context
, FALSE
);
275 data
->thread
= g_thread_new ("gdbus",
276 gdbus_shared_thread_func
,
278 /* We can cast between gsize and gpointer safely */
279 g_once_init_leave (&shared_thread_data
, (gsize
) data
);
282 ret
= (SharedThreadData
*) shared_thread_data
;
283 g_atomic_int_inc (&ret
->refcount
);
288 _g_dbus_shared_thread_unref (SharedThreadData
*data
)
290 /* TODO: actually destroy the shared thread here */
292 g_assert (data
!= NULL
);
293 if (g_atomic_int_dec_and_test (&data
->refcount
))
295 g_main_loop_quit (data
->loop
);
296 //g_thread_join (data->thread);
297 g_main_loop_unref (data
->loop
);
298 g_main_context_unref (data
->context
);
303 /* ---------------------------------------------------------------------------------------------------- */
314 volatile gint ref_count
;
316 SharedThreadData
*shared_thread_data
;
318 /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
319 volatile gint stopped
;
321 /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
322 * only affects messages received from the other peer (since GDBusServer is the
323 * only user) - we might want it to affect messages sent to the other peer too?
326 GDBusCapabilityFlags capabilities
;
327 GQueue
*received_messages_while_frozen
;
330 GCancellable
*cancellable
;
331 GDBusWorkerMessageReceivedCallback message_received_callback
;
332 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback
;
333 GDBusWorkerDisconnectedCallback disconnected_callback
;
336 /* if not NULL, stream is GSocketConnection */
339 /* used for reading */
342 gsize read_buffer_allocated_size
;
343 gsize read_buffer_cur_size
;
344 gsize read_buffer_bytes_wanted
;
345 GUnixFDList
*read_fd_list
;
346 GSocketControlMessage
**read_ancillary_messages
;
347 gint read_num_ancillary_messages
;
349 /* Whether an async write, flush or close, or none of those, is pending.
350 * Only the worker thread may change its value, and only with the write_lock.
351 * Other threads may read its value when holding the write_lock.
352 * The worker thread may read its value at any time.
354 OutputPending output_pending
;
355 /* used for writing */
357 /* queue of MessageToWriteData, protected by write_lock */
359 /* protected by write_lock */
360 guint64 write_num_messages_written
;
361 /* number of messages we'd written out last time we flushed;
362 * protected by write_lock
364 guint64 write_num_messages_flushed
;
365 /* list of FlushData, protected by write_lock */
366 GList
*write_pending_flushes
;
367 /* list of CloseData, protected by write_lock */
368 GList
*pending_close_attempts
;
369 /* no lock - only used from the worker thread */
370 gboolean close_expected
;
373 static void _g_dbus_worker_unref (GDBusWorker
*worker
);
375 /* ---------------------------------------------------------------------------------------------------- */
381 guint64 number_to_wait_for
;
385 struct _MessageToWriteData
;
386 typedef struct _MessageToWriteData MessageToWriteData
;
388 static void message_to_write_data_free (MessageToWriteData
*data
);
390 static void read_message_print_transport_debug (gssize bytes_read
,
391 GDBusWorker
*worker
);
393 static void write_message_print_transport_debug (gssize bytes_written
,
394 MessageToWriteData
*data
);
401 static void close_data_free (CloseData
*close_data
)
403 g_clear_object (&close_data
->task
);
405 _g_dbus_worker_unref (close_data
->worker
);
406 g_slice_free (CloseData
, close_data
);
409 /* ---------------------------------------------------------------------------------------------------- */
412 _g_dbus_worker_ref (GDBusWorker
*worker
)
414 g_atomic_int_inc (&worker
->ref_count
);
419 _g_dbus_worker_unref (GDBusWorker
*worker
)
421 if (g_atomic_int_dec_and_test (&worker
->ref_count
))
423 g_assert (worker
->write_pending_flushes
== NULL
);
425 _g_dbus_shared_thread_unref (worker
->shared_thread_data
);
427 g_object_unref (worker
->stream
);
429 g_mutex_clear (&worker
->read_lock
);
430 g_object_unref (worker
->cancellable
);
431 if (worker
->read_fd_list
!= NULL
)
432 g_object_unref (worker
->read_fd_list
);
434 g_queue_free_full (worker
->received_messages_while_frozen
, (GDestroyNotify
) g_object_unref
);
435 g_mutex_clear (&worker
->write_lock
);
436 g_queue_free_full (worker
->write_queue
, (GDestroyNotify
) message_to_write_data_free
);
437 g_free (worker
->read_buffer
);
444 _g_dbus_worker_emit_disconnected (GDBusWorker
*worker
,
445 gboolean remote_peer_vanished
,
448 if (!g_atomic_int_get (&worker
->stopped
))
449 worker
->disconnected_callback (worker
, remote_peer_vanished
, error
, worker
->user_data
);
453 _g_dbus_worker_emit_message_received (GDBusWorker
*worker
,
454 GDBusMessage
*message
)
456 if (!g_atomic_int_get (&worker
->stopped
))
457 worker
->message_received_callback (worker
, message
, worker
->user_data
);
460 static GDBusMessage
*
461 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker
*worker
,
462 GDBusMessage
*message
)
465 if (!g_atomic_int_get (&worker
->stopped
))
466 ret
= worker
->message_about_to_be_sent_callback (worker
, message
, worker
->user_data
);
472 /* can only be called from private thread with read-lock held - takes ownership of @message */
474 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker
*worker
,
475 GDBusMessage
*message
)
477 if (worker
->frozen
|| g_queue_get_length (worker
->received_messages_while_frozen
) > 0)
480 g_queue_push_tail (worker
->received_messages_while_frozen
, message
);
484 /* not frozen, nor anything in queue */
485 _g_dbus_worker_emit_message_received (worker
, message
);
486 g_object_unref (message
);
490 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
492 unfreeze_in_idle_cb (gpointer user_data
)
494 GDBusWorker
*worker
= user_data
;
495 GDBusMessage
*message
;
497 g_mutex_lock (&worker
->read_lock
);
500 while ((message
= g_queue_pop_head (worker
->received_messages_while_frozen
)) != NULL
)
502 _g_dbus_worker_emit_message_received (worker
, message
);
503 g_object_unref (message
);
505 worker
->frozen
= FALSE
;
509 g_assert (g_queue_get_length (worker
->received_messages_while_frozen
) == 0);
511 g_mutex_unlock (&worker
->read_lock
);
515 /* can be called from any thread */
517 _g_dbus_worker_unfreeze (GDBusWorker
*worker
)
519 GSource
*idle_source
;
520 idle_source
= g_idle_source_new ();
521 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
522 g_source_set_callback (idle_source
,
524 _g_dbus_worker_ref (worker
),
525 (GDestroyNotify
) _g_dbus_worker_unref
);
526 g_source_set_name (idle_source
, "[gio] unfreeze_in_idle_cb");
527 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
528 g_source_unref (idle_source
);
531 /* ---------------------------------------------------------------------------------------------------- */
533 static void _g_dbus_worker_do_read_unlocked (GDBusWorker
*worker
);
535 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
537 _g_dbus_worker_do_read_cb (GInputStream
*input_stream
,
541 GDBusWorker
*worker
= user_data
;
545 g_mutex_lock (&worker
->read_lock
);
547 /* If already stopped, don't even process the reply */
548 if (g_atomic_int_get (&worker
->stopped
))
552 if (worker
->socket
== NULL
)
553 bytes_read
= g_input_stream_read_finish (g_io_stream_get_input_stream (worker
->stream
),
557 bytes_read
= _g_socket_read_with_control_messages_finish (worker
->socket
,
560 if (worker
->read_num_ancillary_messages
> 0)
563 for (n
= 0; n
< worker
->read_num_ancillary_messages
; n
++)
565 GSocketControlMessage
*control_message
= G_SOCKET_CONTROL_MESSAGE (worker
->read_ancillary_messages
[n
]);
571 else if (G_IS_UNIX_FD_MESSAGE (control_message
))
573 GUnixFDMessage
*fd_message
;
577 fd_message
= G_UNIX_FD_MESSAGE (control_message
);
578 fds
= g_unix_fd_message_steal_fds (fd_message
, &num_fds
);
579 if (worker
->read_fd_list
== NULL
)
581 worker
->read_fd_list
= g_unix_fd_list_new_from_array (fds
, num_fds
);
586 for (n
= 0; n
< num_fds
; n
++)
588 /* TODO: really want a append_steal() */
589 g_unix_fd_list_append (worker
->read_fd_list
, fds
[n
], NULL
);
590 (void) g_close (fds
[n
], NULL
);
595 else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message
))
607 "Unexpected ancillary message of type %s received from peer",
608 g_type_name (G_TYPE_FROM_INSTANCE (control_message
)));
609 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
610 g_error_free (error
);
611 g_object_unref (control_message
);
613 while (n
< worker
->read_num_ancillary_messages
)
614 g_object_unref (worker
->read_ancillary_messages
[n
++]);
615 g_free (worker
->read_ancillary_messages
);
619 g_object_unref (control_message
);
621 g_free (worker
->read_ancillary_messages
);
624 if (bytes_read
== -1)
626 if (G_UNLIKELY (_g_dbus_debug_transport ()))
628 _g_dbus_debug_print_lock ();
629 g_print ("========================================================================\n"
630 "GDBus-debug:Transport:\n"
631 " ---- READ ERROR on stream of type %s:\n"
633 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker
->stream
))),
634 g_quark_to_string (error
->domain
), error
->code
,
636 _g_dbus_debug_print_unlock ();
639 /* Every async read that uses this callback uses worker->cancellable
640 * as its GCancellable. worker->cancellable gets cancelled if and only
641 * if the GDBusConnection tells us to close (either via
642 * _g_dbus_worker_stop, which is called on last-unref, or directly),
643 * so a cancelled read must mean our connection was closed locally.
645 * If we're closing, other errors are possible - notably,
646 * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
647 * read in-flight. It seems sensible to treat all read errors during
648 * closing as an expected thing that doesn't trip exit-on-close.
650 * Because close_expected can't be set until we get into the worker
651 * thread, but the cancellable is signalled sooner (from another
652 * thread), we do still need to check the error.
654 if (worker
->close_expected
||
655 g_error_matches (error
, G_IO_ERROR
, G_IO_ERROR_CANCELLED
))
656 _g_dbus_worker_emit_disconnected (worker
, FALSE
, NULL
);
658 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
660 g_error_free (error
);
665 g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
667 g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
))),
668 g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
))),
669 g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
)),
670 G_IO_IN
| G_IO_OUT
| G_IO_HUP
),
675 /* TODO: hmm, hmm... */
681 "Underlying GIOStream returned 0 bytes on an async read");
682 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
683 g_error_free (error
);
687 read_message_print_transport_debug (bytes_read
, worker
);
689 worker
->read_buffer_cur_size
+= bytes_read
;
690 if (worker
->read_buffer_bytes_wanted
== worker
->read_buffer_cur_size
)
692 /* OK, got what we asked for! */
693 if (worker
->read_buffer_bytes_wanted
== 16)
696 /* OK, got the header - determine how many more bytes are needed */
698 message_len
= g_dbus_message_bytes_needed ((guchar
*) worker
->read_buffer
,
701 if (message_len
== -1)
703 g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error
->message
);
704 _g_dbus_worker_emit_disconnected (worker
, FALSE
, error
);
705 g_error_free (error
);
709 worker
->read_buffer_bytes_wanted
= message_len
;
710 _g_dbus_worker_do_read_unlocked (worker
);
714 GDBusMessage
*message
;
717 /* TODO: use connection->priv->auth to decode the message */
719 message
= g_dbus_message_new_from_blob ((guchar
*) worker
->read_buffer
,
720 worker
->read_buffer_cur_size
,
721 worker
->capabilities
,
726 s
= _g_dbus_hexdump (worker
->read_buffer
, worker
->read_buffer_cur_size
, 2);
727 g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT
" bytes\n"
729 "The payload is as follows:\n"
731 worker
->read_buffer_cur_size
,
735 _g_dbus_worker_emit_disconnected (worker
, FALSE
, error
);
736 g_error_free (error
);
741 if (worker
->read_fd_list
!= NULL
)
743 g_dbus_message_set_unix_fd_list (message
, worker
->read_fd_list
);
744 g_object_unref (worker
->read_fd_list
);
745 worker
->read_fd_list
= NULL
;
749 if (G_UNLIKELY (_g_dbus_debug_message ()))
752 _g_dbus_debug_print_lock ();
753 g_print ("========================================================================\n"
754 "GDBus-debug:Message:\n"
755 " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT
" bytes)\n",
756 worker
->read_buffer_cur_size
);
757 s
= g_dbus_message_print (message
, 2);
760 if (G_UNLIKELY (_g_dbus_debug_payload ()))
762 s
= _g_dbus_hexdump (worker
->read_buffer
, worker
->read_buffer_cur_size
, 2);
766 _g_dbus_debug_print_unlock ();
769 /* yay, got a message, go deliver it */
770 _g_dbus_worker_queue_or_deliver_received_message (worker
, message
);
772 /* start reading another message! */
773 worker
->read_buffer_bytes_wanted
= 0;
774 worker
->read_buffer_cur_size
= 0;
775 _g_dbus_worker_do_read_unlocked (worker
);
780 /* didn't get all the bytes we requested - so repeat the request... */
781 _g_dbus_worker_do_read_unlocked (worker
);
785 g_mutex_unlock (&worker
->read_lock
);
787 /* gives up the reference acquired when calling g_input_stream_read_async() */
788 _g_dbus_worker_unref (worker
);
790 /* check if there is any pending close */
791 schedule_pending_close (worker
);
794 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
796 _g_dbus_worker_do_read_unlocked (GDBusWorker
*worker
)
798 /* Note that we do need to keep trying to read even if close_expected is
799 * true, because only failing a read causes us to signal 'closed'.
802 /* if bytes_wanted is zero, it means start reading a message */
803 if (worker
->read_buffer_bytes_wanted
== 0)
805 worker
->read_buffer_cur_size
= 0;
806 worker
->read_buffer_bytes_wanted
= 16;
809 /* ensure we have a (big enough) buffer */
810 if (worker
->read_buffer
== NULL
|| worker
->read_buffer_bytes_wanted
> worker
->read_buffer_allocated_size
)
812 /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
813 worker
->read_buffer_allocated_size
= MAX (worker
->read_buffer_bytes_wanted
, 4096);
814 worker
->read_buffer
= g_realloc (worker
->read_buffer
, worker
->read_buffer_allocated_size
);
817 if (worker
->socket
== NULL
)
818 g_input_stream_read_async (g_io_stream_get_input_stream (worker
->stream
),
819 worker
->read_buffer
+ worker
->read_buffer_cur_size
,
820 worker
->read_buffer_bytes_wanted
- worker
->read_buffer_cur_size
,
823 (GAsyncReadyCallback
) _g_dbus_worker_do_read_cb
,
824 _g_dbus_worker_ref (worker
));
827 worker
->read_ancillary_messages
= NULL
;
828 worker
->read_num_ancillary_messages
= 0;
829 _g_socket_read_with_control_messages (worker
->socket
,
830 worker
->read_buffer
+ worker
->read_buffer_cur_size
,
831 worker
->read_buffer_bytes_wanted
- worker
->read_buffer_cur_size
,
832 &worker
->read_ancillary_messages
,
833 &worker
->read_num_ancillary_messages
,
836 (GAsyncReadyCallback
) _g_dbus_worker_do_read_cb
,
837 _g_dbus_worker_ref (worker
));
841 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
843 _g_dbus_worker_do_initial_read (gpointer data
)
845 GDBusWorker
*worker
= data
;
846 g_mutex_lock (&worker
->read_lock
);
847 _g_dbus_worker_do_read_unlocked (worker
);
848 g_mutex_unlock (&worker
->read_lock
);
852 /* ---------------------------------------------------------------------------------------------------- */
854 struct _MessageToWriteData
857 GDBusMessage
*message
;
866 message_to_write_data_free (MessageToWriteData
*data
)
868 _g_dbus_worker_unref (data
->worker
);
870 g_object_unref (data
->message
);
872 g_slice_free (MessageToWriteData
, data
);
875 /* ---------------------------------------------------------------------------------------------------- */
877 static void write_message_continue_writing (MessageToWriteData
*data
);
879 /* called in private thread shared by all GDBusConnection instances
881 * write-lock is not held on entry
882 * output_pending is PENDING_WRITE on entry
885 write_message_async_cb (GObject
*source_object
,
889 MessageToWriteData
*data
= user_data
;
891 gssize bytes_written
;
894 /* Note: we can't access data->task after calling g_task_return_* () because the
895 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
900 bytes_written
= g_output_stream_write_finish (G_OUTPUT_STREAM (source_object
),
903 if (bytes_written
== -1)
905 g_task_return_error (task
, error
);
906 g_object_unref (task
);
909 g_assert (bytes_written
> 0); /* zero is never returned */
911 write_message_print_transport_debug (bytes_written
, data
);
913 data
->total_written
+= bytes_written
;
914 g_assert (data
->total_written
<= data
->blob_size
);
915 if (data
->total_written
== data
->blob_size
)
917 g_task_return_boolean (task
, TRUE
);
918 g_object_unref (task
);
922 write_message_continue_writing (data
);
928 /* called in private thread shared by all GDBusConnection instances
930 * write-lock is not held on entry
931 * output_pending is PENDING_WRITE on entry
935 on_socket_ready (GSocket
*socket
,
936 GIOCondition condition
,
939 MessageToWriteData
*data
= user_data
;
940 write_message_continue_writing (data
);
941 return FALSE
; /* remove source */
945 /* called in private thread shared by all GDBusConnection instances
947 * write-lock is not held on entry
948 * output_pending is PENDING_WRITE on entry
951 write_message_continue_writing (MessageToWriteData
*data
)
953 GOutputStream
*ostream
;
956 GUnixFDList
*fd_list
;
960 /* Note: we can't access data->task after calling g_task_return_* () because the
961 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
966 ostream
= g_io_stream_get_output_stream (data
->worker
->stream
);
968 fd_list
= g_dbus_message_get_unix_fd_list (data
->message
);
971 g_assert (!g_output_stream_has_pending (ostream
));
972 g_assert_cmpint (data
->total_written
, <, data
->blob_size
);
978 else if (G_IS_SOCKET_OUTPUT_STREAM (ostream
) && data
->total_written
== 0)
980 GOutputVector vector
;
981 GSocketControlMessage
*control_message
;
982 gssize bytes_written
;
985 vector
.buffer
= data
->blob
;
986 vector
.size
= data
->blob_size
;
988 control_message
= NULL
;
989 if (fd_list
!= NULL
&& g_unix_fd_list_get_length (fd_list
) > 0)
991 if (!(data
->worker
->capabilities
& G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING
))
993 g_task_return_new_error (task
,
996 "Tried sending a file descriptor but remote peer does not support this capability");
997 g_object_unref (task
);
1000 control_message
= g_unix_fd_message_new_with_fd_list (fd_list
);
1004 bytes_written
= g_socket_send_message (data
->worker
->socket
,
1008 control_message
!= NULL
? &control_message
: NULL
,
1009 control_message
!= NULL
? 1 : 0,
1011 data
->worker
->cancellable
,
1013 if (control_message
!= NULL
)
1014 g_object_unref (control_message
);
1016 if (bytes_written
== -1)
1018 /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1019 if (g_error_matches (error
, G_IO_ERROR
, G_IO_ERROR_WOULD_BLOCK
))
1022 source
= g_socket_create_source (data
->worker
->socket
,
1023 G_IO_OUT
| G_IO_HUP
| G_IO_ERR
,
1024 data
->worker
->cancellable
);
1025 g_source_set_callback (source
,
1026 (GSourceFunc
) on_socket_ready
,
1028 NULL
); /* GDestroyNotify */
1029 g_source_attach (source
, g_main_context_get_thread_default ());
1030 g_source_unref (source
);
1031 g_error_free (error
);
1034 g_task_return_error (task
, error
);
1035 g_object_unref (task
);
1038 g_assert (bytes_written
> 0); /* zero is never returned */
1040 write_message_print_transport_debug (bytes_written
, data
);
1042 data
->total_written
+= bytes_written
;
1043 g_assert (data
->total_written
<= data
->blob_size
);
1044 if (data
->total_written
== data
->blob_size
)
1046 g_task_return_boolean (task
, TRUE
);
1047 g_object_unref (task
);
1051 write_message_continue_writing (data
);
1057 if (fd_list
!= NULL
)
1059 g_task_return_new_error (task
,
1062 "Tried sending a file descriptor on unsupported stream of type %s",
1063 g_type_name (G_TYPE_FROM_INSTANCE (ostream
)));
1064 g_object_unref (task
);
1069 g_output_stream_write_async (ostream
,
1070 (const gchar
*) data
->blob
+ data
->total_written
,
1071 data
->blob_size
- data
->total_written
,
1073 data
->worker
->cancellable
,
1074 write_message_async_cb
,
1083 /* called in private thread shared by all GDBusConnection instances
1085 * write-lock is not held on entry
1086 * output_pending is PENDING_WRITE on entry
1089 write_message_async (GDBusWorker
*worker
,
1090 MessageToWriteData
*data
,
1091 GAsyncReadyCallback callback
,
1094 data
->task
= g_task_new (NULL
, NULL
, callback
, user_data
);
1095 g_task_set_source_tag (data
->task
, write_message_async
);
1096 data
->total_written
= 0;
1097 write_message_continue_writing (data
);
1100 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1102 write_message_finish (GAsyncResult
*res
,
1105 g_return_val_if_fail (g_task_is_valid (res
, NULL
), FALSE
);
1107 return g_task_propagate_boolean (G_TASK (res
), error
);
1109 /* ---------------------------------------------------------------------------------------------------- */
1111 static void continue_writing (GDBusWorker
*worker
);
1115 GDBusWorker
*worker
;
1120 flush_data_list_complete (const GList
*flushers
,
1121 const GError
*error
)
1125 for (l
= flushers
; l
!= NULL
; l
= l
->next
)
1127 FlushData
*f
= l
->data
;
1129 f
->error
= error
!= NULL
? g_error_copy (error
) : NULL
;
1131 g_mutex_lock (&f
->mutex
);
1132 g_cond_signal (&f
->cond
);
1133 g_mutex_unlock (&f
->mutex
);
1137 /* called in private thread shared by all GDBusConnection instances
1139 * write-lock is not held on entry
1140 * output_pending is PENDING_FLUSH on entry
1143 ostream_flush_cb (GObject
*source_object
,
1147 FlushAsyncData
*data
= user_data
;
1151 g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object
),
1157 if (G_UNLIKELY (_g_dbus_debug_transport ()))
1159 _g_dbus_debug_print_lock ();
1160 g_print ("========================================================================\n"
1161 "GDBus-debug:Transport:\n"
1162 " ---- FLUSHED stream of type %s\n",
1163 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data
->worker
->stream
))));
1164 _g_dbus_debug_print_unlock ();
1168 g_assert (data
->flushers
!= NULL
);
1169 flush_data_list_complete (data
->flushers
, error
);
1170 g_list_free (data
->flushers
);
1173 g_error_free (error
);
1175 /* Make sure we tell folks that we don't have additional
1177 g_mutex_lock (&data
->worker
->write_lock
);
1178 data
->worker
->write_num_messages_flushed
= data
->worker
->write_num_messages_written
;
1179 g_assert (data
->worker
->output_pending
== PENDING_FLUSH
);
1180 data
->worker
->output_pending
= PENDING_NONE
;
1181 g_mutex_unlock (&data
->worker
->write_lock
);
1183 /* OK, cool, finally kick off the next write */
1184 continue_writing (data
->worker
);
1186 _g_dbus_worker_unref (data
->worker
);
1190 /* called in private thread shared by all GDBusConnection instances
1192 * write-lock is not held on entry
1193 * output_pending is PENDING_FLUSH on entry
1196 start_flush (FlushAsyncData
*data
)
1198 g_output_stream_flush_async (g_io_stream_get_output_stream (data
->worker
->stream
),
1200 data
->worker
->cancellable
,
1205 /* called in private thread shared by all GDBusConnection instances
1207 * write-lock is held on entry
1208 * output_pending is PENDING_NONE on entry
1211 message_written_unlocked (GDBusWorker
*worker
,
1212 MessageToWriteData
*message_data
)
1214 if (G_UNLIKELY (_g_dbus_debug_message ()))
1217 _g_dbus_debug_print_lock ();
1218 g_print ("========================================================================\n"
1219 "GDBus-debug:Message:\n"
1220 " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT
" bytes)\n",
1221 message_data
->blob_size
);
1222 s
= g_dbus_message_print (message_data
->message
, 2);
1225 if (G_UNLIKELY (_g_dbus_debug_payload ()))
1227 s
= _g_dbus_hexdump (message_data
->blob
, message_data
->blob_size
, 2);
1228 g_print ("%s\n", s
);
1231 _g_dbus_debug_print_unlock ();
1234 worker
->write_num_messages_written
+= 1;
1237 /* called in private thread shared by all GDBusConnection instances
1239 * write-lock is held on entry
1240 * output_pending is PENDING_NONE on entry
1242 * Returns: non-%NULL, setting @output_pending, if we need to flush now
1244 static FlushAsyncData
*
1245 prepare_flush_unlocked (GDBusWorker
*worker
)
1252 for (l
= worker
->write_pending_flushes
; l
!= NULL
; l
= ll
)
1254 FlushData
*f
= l
->data
;
1257 if (f
->number_to_wait_for
== worker
->write_num_messages_written
)
1259 flushers
= g_list_append (flushers
, f
);
1260 worker
->write_pending_flushes
= g_list_delete_link (worker
->write_pending_flushes
, l
);
1263 if (flushers
!= NULL
)
1265 g_assert (worker
->output_pending
== PENDING_NONE
);
1266 worker
->output_pending
= PENDING_FLUSH
;
1269 if (flushers
!= NULL
)
1271 FlushAsyncData
*data
;
1273 data
= g_new0 (FlushAsyncData
, 1);
1274 data
->worker
= _g_dbus_worker_ref (worker
);
1275 data
->flushers
= flushers
;
1282 /* called in private thread shared by all GDBusConnection instances
1284 * write-lock is not held on entry
1285 * output_pending is PENDING_WRITE on entry
1288 write_message_cb (GObject
*source_object
,
1292 MessageToWriteData
*data
= user_data
;
1295 g_mutex_lock (&data
->worker
->write_lock
);
1296 g_assert (data
->worker
->output_pending
== PENDING_WRITE
);
1297 data
->worker
->output_pending
= PENDING_NONE
;
1300 if (!write_message_finish (res
, &error
))
1302 g_mutex_unlock (&data
->worker
->write_lock
);
1305 _g_dbus_worker_emit_disconnected (data
->worker
, TRUE
, error
);
1306 g_error_free (error
);
1308 g_mutex_lock (&data
->worker
->write_lock
);
1311 message_written_unlocked (data
->worker
, data
);
1313 g_mutex_unlock (&data
->worker
->write_lock
);
1315 continue_writing (data
->worker
);
1317 message_to_write_data_free (data
);
1320 /* called in private thread shared by all GDBusConnection instances
1322 * write-lock is not held on entry
1323 * output_pending is PENDING_CLOSE on entry
1326 iostream_close_cb (GObject
*source_object
,
1330 GDBusWorker
*worker
= user_data
;
1331 GError
*error
= NULL
;
1332 GList
*pending_close_attempts
, *pending_flush_attempts
;
1335 g_io_stream_close_finish (worker
->stream
, res
, &error
);
1337 g_mutex_lock (&worker
->write_lock
);
1339 pending_close_attempts
= worker
->pending_close_attempts
;
1340 worker
->pending_close_attempts
= NULL
;
1342 pending_flush_attempts
= worker
->write_pending_flushes
;
1343 worker
->write_pending_flushes
= NULL
;
1345 send_queue
= worker
->write_queue
;
1346 worker
->write_queue
= g_queue_new ();
1348 g_assert (worker
->output_pending
== PENDING_CLOSE
);
1349 worker
->output_pending
= PENDING_NONE
;
1351 g_mutex_unlock (&worker
->write_lock
);
1353 while (pending_close_attempts
!= NULL
)
1355 CloseData
*close_data
= pending_close_attempts
->data
;
1357 pending_close_attempts
= g_list_delete_link (pending_close_attempts
,
1358 pending_close_attempts
);
1360 if (close_data
->task
!= NULL
)
1363 g_task_return_error (close_data
->task
, g_error_copy (error
));
1365 g_task_return_boolean (close_data
->task
, TRUE
);
1368 close_data_free (close_data
);
1371 g_clear_error (&error
);
1373 /* all messages queued for sending are discarded */
1374 g_queue_free_full (send_queue
, (GDestroyNotify
) message_to_write_data_free
);
1375 /* all queued flushes fail */
1376 error
= g_error_new (G_IO_ERROR
, G_IO_ERROR_CANCELLED
,
1377 _("Operation was cancelled"));
1378 flush_data_list_complete (pending_flush_attempts
, error
);
1379 g_list_free (pending_flush_attempts
);
1380 g_clear_error (&error
);
1382 _g_dbus_worker_unref (worker
);
1385 /* called in private thread shared by all GDBusConnection instances
1387 * write-lock is not held on entry
1388 * output_pending must be PENDING_NONE on entry
1391 continue_writing (GDBusWorker
*worker
)
1393 MessageToWriteData
*data
;
1394 FlushAsyncData
*flush_async_data
;
1397 /* we mustn't try to write two things at once */
1398 g_assert (worker
->output_pending
== PENDING_NONE
);
1400 g_mutex_lock (&worker
->write_lock
);
1403 flush_async_data
= NULL
;
1405 /* if we want to close the connection, that takes precedence */
1406 if (worker
->pending_close_attempts
!= NULL
)
1408 GInputStream
*input
= g_io_stream_get_input_stream (worker
->stream
);
1410 if (!g_input_stream_has_pending (input
))
1412 worker
->close_expected
= TRUE
;
1413 worker
->output_pending
= PENDING_CLOSE
;
1415 g_io_stream_close_async (worker
->stream
, G_PRIORITY_DEFAULT
,
1416 NULL
, iostream_close_cb
,
1417 _g_dbus_worker_ref (worker
));
1422 flush_async_data
= prepare_flush_unlocked (worker
);
1424 if (flush_async_data
== NULL
)
1426 data
= g_queue_pop_head (worker
->write_queue
);
1429 worker
->output_pending
= PENDING_WRITE
;
1433 g_mutex_unlock (&worker
->write_lock
);
1435 /* Note that write_lock is only used for protecting the @write_queue
1436 * and @output_pending fields of the GDBusWorker struct ... which we
1437 * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1439 * Therefore, it's fine to drop it here when calling back into user
1440 * code and then writing the message out onto the GIOStream since this
1441 * function only runs on the worker thread.
1444 if (flush_async_data
!= NULL
)
1446 start_flush (flush_async_data
);
1447 g_assert (data
== NULL
);
1449 else if (data
!= NULL
)
1451 GDBusMessage
*old_message
;
1453 gsize new_blob_size
;
1456 old_message
= data
->message
;
1457 data
->message
= _g_dbus_worker_emit_message_about_to_be_sent (worker
, data
->message
);
1458 if (data
->message
== old_message
)
1460 /* filters had no effect - do nothing */
1462 else if (data
->message
== NULL
)
1464 /* filters dropped message */
1465 g_mutex_lock (&worker
->write_lock
);
1466 worker
->output_pending
= PENDING_NONE
;
1467 g_mutex_unlock (&worker
->write_lock
);
1468 message_to_write_data_free (data
);
1473 /* filters altered the message -> reencode */
1475 new_blob
= g_dbus_message_to_blob (data
->message
,
1477 worker
->capabilities
,
1479 if (new_blob
== NULL
)
1481 /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1482 * the old message instead
1484 g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1485 g_dbus_message_get_serial (data
->message
),
1487 g_error_free (error
);
1491 g_free (data
->blob
);
1492 data
->blob
= (gchar
*) new_blob
;
1493 data
->blob_size
= new_blob_size
;
1497 write_message_async (worker
,
1504 /* called in private thread shared by all GDBusConnection instances
1506 * write-lock is not held on entry
1507 * output_pending may be anything
1510 continue_writing_in_idle_cb (gpointer user_data
)
1512 GDBusWorker
*worker
= user_data
;
1514 /* Because this is the worker thread, we can read this struct member
1515 * without holding the lock: no other thread ever modifies it.
1517 if (worker
->output_pending
== PENDING_NONE
)
1518 continue_writing (worker
);
1524 * @write_data: (transfer full) (nullable):
1525 * @flush_data: (transfer full) (nullable):
1526 * @close_data: (transfer full) (nullable):
1528 * Can be called from any thread
1530 * write_lock is held on entry
1531 * output_pending may be anything
1534 schedule_writing_unlocked (GDBusWorker
*worker
,
1535 MessageToWriteData
*write_data
,
1536 FlushData
*flush_data
,
1537 CloseData
*close_data
)
1539 if (write_data
!= NULL
)
1540 g_queue_push_tail (worker
->write_queue
, write_data
);
1542 if (flush_data
!= NULL
)
1543 worker
->write_pending_flushes
= g_list_prepend (worker
->write_pending_flushes
, flush_data
);
1545 if (close_data
!= NULL
)
1546 worker
->pending_close_attempts
= g_list_prepend (worker
->pending_close_attempts
,
1549 /* If we had output pending, the next bit of output will happen
1550 * automatically when it finishes, so we only need to do this
1551 * if nothing was pending.
1553 * The idle callback will re-check that output_pending is still
1554 * PENDING_NONE, to guard against output starting before the idle.
1556 if (worker
->output_pending
== PENDING_NONE
)
1558 GSource
*idle_source
;
1559 idle_source
= g_idle_source_new ();
1560 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
1561 g_source_set_callback (idle_source
,
1562 continue_writing_in_idle_cb
,
1563 _g_dbus_worker_ref (worker
),
1564 (GDestroyNotify
) _g_dbus_worker_unref
);
1565 g_source_set_name (idle_source
, "[gio] continue_writing_in_idle_cb");
1566 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
1567 g_source_unref (idle_source
);
1572 schedule_pending_close (GDBusWorker
*worker
)
1574 g_mutex_lock (&worker
->write_lock
);
1575 if (worker
->pending_close_attempts
)
1576 schedule_writing_unlocked (worker
, NULL
, NULL
, NULL
);
1577 g_mutex_unlock (&worker
->write_lock
);
1580 /* ---------------------------------------------------------------------------------------------------- */
1582 /* can be called from any thread - steals blob
1584 * write_lock is not held on entry
1585 * output_pending may be anything
1588 _g_dbus_worker_send_message (GDBusWorker
*worker
,
1589 GDBusMessage
*message
,
1593 MessageToWriteData
*data
;
1595 g_return_if_fail (G_IS_DBUS_MESSAGE (message
));
1596 g_return_if_fail (blob
!= NULL
);
1597 g_return_if_fail (blob_len
> 16);
1599 data
= g_slice_new0 (MessageToWriteData
);
1600 data
->worker
= _g_dbus_worker_ref (worker
);
1601 data
->message
= g_object_ref (message
);
1602 data
->blob
= blob
; /* steal! */
1603 data
->blob_size
= blob_len
;
1605 g_mutex_lock (&worker
->write_lock
);
1606 schedule_writing_unlocked (worker
, data
, NULL
, NULL
);
1607 g_mutex_unlock (&worker
->write_lock
);
1610 /* ---------------------------------------------------------------------------------------------------- */
1613 _g_dbus_worker_new (GIOStream
*stream
,
1614 GDBusCapabilityFlags capabilities
,
1615 gboolean initially_frozen
,
1616 GDBusWorkerMessageReceivedCallback message_received_callback
,
1617 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback
,
1618 GDBusWorkerDisconnectedCallback disconnected_callback
,
1621 GDBusWorker
*worker
;
1622 GSource
*idle_source
;
1624 g_return_val_if_fail (G_IS_IO_STREAM (stream
), NULL
);
1625 g_return_val_if_fail (message_received_callback
!= NULL
, NULL
);
1626 g_return_val_if_fail (message_about_to_be_sent_callback
!= NULL
, NULL
);
1627 g_return_val_if_fail (disconnected_callback
!= NULL
, NULL
);
1629 worker
= g_new0 (GDBusWorker
, 1);
1630 worker
->ref_count
= 1;
1632 g_mutex_init (&worker
->read_lock
);
1633 worker
->message_received_callback
= message_received_callback
;
1634 worker
->message_about_to_be_sent_callback
= message_about_to_be_sent_callback
;
1635 worker
->disconnected_callback
= disconnected_callback
;
1636 worker
->user_data
= user_data
;
1637 worker
->stream
= g_object_ref (stream
);
1638 worker
->capabilities
= capabilities
;
1639 worker
->cancellable
= g_cancellable_new ();
1640 worker
->output_pending
= PENDING_NONE
;
1642 worker
->frozen
= initially_frozen
;
1643 worker
->received_messages_while_frozen
= g_queue_new ();
1645 g_mutex_init (&worker
->write_lock
);
1646 worker
->write_queue
= g_queue_new ();
1648 if (G_IS_SOCKET_CONNECTION (worker
->stream
))
1649 worker
->socket
= g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
));
1651 worker
->shared_thread_data
= _g_dbus_shared_thread_ref ();
1654 idle_source
= g_idle_source_new ();
1655 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
1656 g_source_set_callback (idle_source
,
1657 _g_dbus_worker_do_initial_read
,
1658 _g_dbus_worker_ref (worker
),
1659 (GDestroyNotify
) _g_dbus_worker_unref
);
1660 g_source_set_name (idle_source
, "[gio] _g_dbus_worker_do_initial_read");
1661 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
1662 g_source_unref (idle_source
);
1667 /* ---------------------------------------------------------------------------------------------------- */
1669 /* can be called from any thread
1671 * write_lock is not held on entry
1672 * output_pending may be anything
1675 _g_dbus_worker_close (GDBusWorker
*worker
,
1678 CloseData
*close_data
;
1680 close_data
= g_slice_new0 (CloseData
);
1681 close_data
->worker
= _g_dbus_worker_ref (worker
);
1682 close_data
->task
= (task
== NULL
? NULL
: g_object_ref (task
));
1684 /* Don't set worker->close_expected here - we're in the wrong thread.
1685 * It'll be set before the actual close happens.
1687 g_cancellable_cancel (worker
->cancellable
);
1688 g_mutex_lock (&worker
->write_lock
);
1689 schedule_writing_unlocked (worker
, NULL
, NULL
, close_data
);
1690 g_mutex_unlock (&worker
->write_lock
);
1693 /* This can be called from any thread - frees worker. Note that
1694 * callbacks might still happen if called from another thread than the
1695 * worker - use your own synchronization primitive in the callbacks.
1697 * write_lock is not held on entry
1698 * output_pending may be anything
1701 _g_dbus_worker_stop (GDBusWorker
*worker
)
1703 g_atomic_int_set (&worker
->stopped
, TRUE
);
1705 /* Cancel any pending operations and schedule a close of the underlying I/O
1706 * stream in the worker thread
1708 _g_dbus_worker_close (worker
, NULL
);
1710 /* _g_dbus_worker_close holds a ref until after an idle in the worker
1711 * thread has run, so we no longer need to unref in an idle like in
1714 _g_dbus_worker_unref (worker
);
1717 /* ---------------------------------------------------------------------------------------------------- */
1719 /* can be called from any thread (except the worker thread) - blocks
1720 * calling thread until all queued outgoing messages are written and
1721 * the transport has been flushed
1723 * write_lock is not held on entry
1724 * output_pending may be anything
1727 _g_dbus_worker_flush_sync (GDBusWorker
*worker
,
1728 GCancellable
*cancellable
,
1733 guint64 pending_writes
;
1738 g_mutex_lock (&worker
->write_lock
);
1740 /* if the queue is empty, no write is in-flight and we haven't written
1741 * anything since the last flush, then there's nothing to wait for
1743 pending_writes
= g_queue_get_length (worker
->write_queue
);
1745 /* if a write is in-flight, we shouldn't be satisfied until the first
1746 * flush operation that follows it
1748 if (worker
->output_pending
== PENDING_WRITE
)
1749 pending_writes
+= 1;
1751 if (pending_writes
> 0 ||
1752 worker
->write_num_messages_written
!= worker
->write_num_messages_flushed
)
1754 data
= g_new0 (FlushData
, 1);
1755 g_mutex_init (&data
->mutex
);
1756 g_cond_init (&data
->cond
);
1757 data
->number_to_wait_for
= worker
->write_num_messages_written
+ pending_writes
;
1758 g_mutex_lock (&data
->mutex
);
1760 schedule_writing_unlocked (worker
, NULL
, data
, NULL
);
1762 g_mutex_unlock (&worker
->write_lock
);
1766 g_cond_wait (&data
->cond
, &data
->mutex
);
1767 g_mutex_unlock (&data
->mutex
);
1769 /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1770 g_cond_clear (&data
->cond
);
1771 g_mutex_clear (&data
->mutex
);
1772 if (data
->error
!= NULL
)
1775 g_propagate_error (error
, data
->error
);
1783 /* ---------------------------------------------------------------------------------------------------- */
1785 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1786 #define G_DBUS_DEBUG_TRANSPORT (1<<1)
1787 #define G_DBUS_DEBUG_MESSAGE (1<<2)
1788 #define G_DBUS_DEBUG_PAYLOAD (1<<3)
1789 #define G_DBUS_DEBUG_CALL (1<<4)
1790 #define G_DBUS_DEBUG_SIGNAL (1<<5)
1791 #define G_DBUS_DEBUG_INCOMING (1<<6)
1792 #define G_DBUS_DEBUG_RETURN (1<<7)
1793 #define G_DBUS_DEBUG_EMISSION (1<<8)
1794 #define G_DBUS_DEBUG_ADDRESS (1<<9)
1796 static gint _gdbus_debug_flags
= 0;
1799 _g_dbus_debug_authentication (void)
1801 _g_dbus_initialize ();
1802 return (_gdbus_debug_flags
& G_DBUS_DEBUG_AUTHENTICATION
) != 0;
1806 _g_dbus_debug_transport (void)
1808 _g_dbus_initialize ();
1809 return (_gdbus_debug_flags
& G_DBUS_DEBUG_TRANSPORT
) != 0;
1813 _g_dbus_debug_message (void)
1815 _g_dbus_initialize ();
1816 return (_gdbus_debug_flags
& G_DBUS_DEBUG_MESSAGE
) != 0;
1820 _g_dbus_debug_payload (void)
1822 _g_dbus_initialize ();
1823 return (_gdbus_debug_flags
& G_DBUS_DEBUG_PAYLOAD
) != 0;
1827 _g_dbus_debug_call (void)
1829 _g_dbus_initialize ();
1830 return (_gdbus_debug_flags
& G_DBUS_DEBUG_CALL
) != 0;
1834 _g_dbus_debug_signal (void)
1836 _g_dbus_initialize ();
1837 return (_gdbus_debug_flags
& G_DBUS_DEBUG_SIGNAL
) != 0;
1841 _g_dbus_debug_incoming (void)
1843 _g_dbus_initialize ();
1844 return (_gdbus_debug_flags
& G_DBUS_DEBUG_INCOMING
) != 0;
1848 _g_dbus_debug_return (void)
1850 _g_dbus_initialize ();
1851 return (_gdbus_debug_flags
& G_DBUS_DEBUG_RETURN
) != 0;
1855 _g_dbus_debug_emission (void)
1857 _g_dbus_initialize ();
1858 return (_gdbus_debug_flags
& G_DBUS_DEBUG_EMISSION
) != 0;
1862 _g_dbus_debug_address (void)
1864 _g_dbus_initialize ();
1865 return (_gdbus_debug_flags
& G_DBUS_DEBUG_ADDRESS
) != 0;
1868 G_LOCK_DEFINE_STATIC (print_lock
);
1871 _g_dbus_debug_print_lock (void)
1873 G_LOCK (print_lock
);
1877 _g_dbus_debug_print_unlock (void)
1879 G_UNLOCK (print_lock
);
1883 * _g_dbus_initialize:
1885 * Does various one-time init things such as
1887 * - registering the G_DBUS_ERROR error domain
1888 * - parses the G_DBUS_DEBUG environment variable
1891 _g_dbus_initialize (void)
1893 static volatile gsize initialized
= 0;
1895 if (g_once_init_enter (&initialized
))
1897 volatile GQuark g_dbus_error_domain
;
1900 g_dbus_error_domain
= G_DBUS_ERROR
;
1901 (g_dbus_error_domain
); /* To avoid -Wunused-but-set-variable */
1903 debug
= g_getenv ("G_DBUS_DEBUG");
1906 const GDebugKey keys
[] = {
1907 { "authentication", G_DBUS_DEBUG_AUTHENTICATION
},
1908 { "transport", G_DBUS_DEBUG_TRANSPORT
},
1909 { "message", G_DBUS_DEBUG_MESSAGE
},
1910 { "payload", G_DBUS_DEBUG_PAYLOAD
},
1911 { "call", G_DBUS_DEBUG_CALL
},
1912 { "signal", G_DBUS_DEBUG_SIGNAL
},
1913 { "incoming", G_DBUS_DEBUG_INCOMING
},
1914 { "return", G_DBUS_DEBUG_RETURN
},
1915 { "emission", G_DBUS_DEBUG_EMISSION
},
1916 { "address", G_DBUS_DEBUG_ADDRESS
}
1919 _gdbus_debug_flags
= g_parse_debug_string (debug
, keys
, G_N_ELEMENTS (keys
));
1920 if (_gdbus_debug_flags
& G_DBUS_DEBUG_PAYLOAD
)
1921 _gdbus_debug_flags
|= G_DBUS_DEBUG_MESSAGE
;
1924 g_once_init_leave (&initialized
, 1);
1928 /* ---------------------------------------------------------------------------------------------------- */
1931 _g_dbus_compute_complete_signature (GDBusArgInfo
**args
)
1933 const GVariantType
*arg_types
[256];
1937 for (n
= 0; args
[n
] != NULL
; n
++)
1939 /* DBus places a hard limit of 255 on signature length.
1940 * therefore number of args must be less than 256.
1944 arg_types
[n
] = G_VARIANT_TYPE (args
[n
]->signature
);
1946 if G_UNLIKELY (arg_types
[n
] == NULL
)
1952 return g_variant_type_new_tuple (arg_types
, n
);
1955 /* ---------------------------------------------------------------------------------------------------- */
1959 extern BOOL WINAPI
ConvertSidToStringSidA (PSID Sid
, LPSTR
*StringSid
);
1962 _g_dbus_win32_get_user_sid (void)
1966 DWORD token_information_len
;
1973 h
= INVALID_HANDLE_VALUE
;
1975 if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY
, &h
))
1977 g_warning ("OpenProcessToken failed with error code %d", (gint
) GetLastError ());
1981 /* Get length of buffer */
1982 token_information_len
= 0;
1983 if (!GetTokenInformation (h
, TokenUser
, NULL
, 0, &token_information_len
))
1985 if (GetLastError () != ERROR_INSUFFICIENT_BUFFER
)
1987 g_warning ("GetTokenInformation() failed with error code %d", (gint
) GetLastError ());
1991 user
= g_malloc (token_information_len
);
1992 if (!GetTokenInformation (h
, TokenUser
, user
, token_information_len
, &token_information_len
))
1994 g_warning ("GetTokenInformation() failed with error code %d", (gint
) GetLastError ());
1998 psid
= user
->User
.Sid
;
1999 if (!IsValidSid (psid
))
2001 g_warning ("Invalid SID");
2005 if (!ConvertSidToStringSidA (psid
, &sid
))
2007 g_warning ("Invalid SID");
2011 ret
= g_strdup (sid
);
2016 if (h
!= INVALID_HANDLE_VALUE
)
2022 /* ---------------------------------------------------------------------------------------------------- */
2025 _g_dbus_get_machine_id (GError
**error
)
2028 HW_PROFILE_INFOA info
;
2029 char *src
, *dest
, *res
;
2032 if (!GetCurrentHwProfileA (&info
))
2034 char *message
= g_win32_error_message (GetLastError ());
2038 _("Unable to get Hardware profile: %s"), message
);
2043 /* Form: {12340001-4980-1920-6788-123456789012} */
2044 src
= &info
.szHwProfileGuid
[0];
2046 res
= g_malloc (32+1);
2050 for (i
= 0; i
< 8; i
++)
2053 for (i
= 0; i
< 4; i
++)
2056 for (i
= 0; i
< 4; i
++)
2059 for (i
= 0; i
< 4; i
++)
2062 for (i
= 0; i
< 12; i
++)
2069 GError
*first_error
;
2070 /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2073 if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2077 !g_file_get_contents ("/etc/machine-id",
2082 g_propagate_prefixed_error (error
, first_error
,
2083 _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2087 /* ignore the error from the first try, if any */
2088 g_clear_error (&first_error
);
2089 /* TODO: validate value */
2096 /* ---------------------------------------------------------------------------------------------------- */
2099 _g_dbus_enum_to_string (GType enum_type
, gint value
)
2103 GEnumValue
*enum_value
;
2105 klass
= g_type_class_ref (enum_type
);
2106 enum_value
= g_enum_get_value (klass
, value
);
2107 if (enum_value
!= NULL
)
2108 ret
= g_strdup (enum_value
->value_nick
);
2110 ret
= g_strdup_printf ("unknown (value %d)", value
);
2111 g_type_class_unref (klass
);
2115 /* ---------------------------------------------------------------------------------------------------- */
2118 write_message_print_transport_debug (gssize bytes_written
,
2119 MessageToWriteData
*data
)
2121 if (G_LIKELY (!_g_dbus_debug_transport ()))
2124 _g_dbus_debug_print_lock ();
2125 g_print ("========================================================================\n"
2126 "GDBus-debug:Transport:\n"
2127 " >>>> WROTE %" G_GSSIZE_FORMAT
" bytes of message with serial %d and\n"
2128 " size %" G_GSIZE_FORMAT
" from offset %" G_GSIZE_FORMAT
" on a %s\n",
2130 g_dbus_message_get_serial (data
->message
),
2132 data
->total_written
,
2133 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data
->worker
->stream
))));
2134 _g_dbus_debug_print_unlock ();
2139 /* ---------------------------------------------------------------------------------------------------- */
2142 read_message_print_transport_debug (gssize bytes_read
,
2143 GDBusWorker
*worker
)
2147 gint32 message_length
;
2149 if (G_LIKELY (!_g_dbus_debug_transport ()))
2152 size
= bytes_read
+ worker
->read_buffer_cur_size
;
2156 message_length
= g_dbus_message_bytes_needed ((guchar
*) worker
->read_buffer
, size
, NULL
);
2159 switch (worker
->read_buffer
[0])
2163 serial
= GUINT32_FROM_LE (((guint32
*) worker
->read_buffer
)[2]);
2167 serial
= GUINT32_FROM_BE (((guint32
*) worker
->read_buffer
)[2]);
2170 /* an error will be set elsewhere if this happens */
2175 _g_dbus_debug_print_lock ();
2176 g_print ("========================================================================\n"
2177 "GDBus-debug:Transport:\n"
2178 " <<<< READ %" G_GSSIZE_FORMAT
" bytes of message with serial %d and\n"
2179 " size %d to offset %" G_GSIZE_FORMAT
" from a %s\n",
2183 worker
->read_buffer_cur_size
,
2184 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker
->stream
))));
2185 _g_dbus_debug_print_unlock ();
2190 /* ---------------------------------------------------------------------------------------------------- */
2193 _g_signal_accumulator_false_handled (GSignalInvocationHint
*ihint
,
2194 GValue
*return_accu
,
2195 const GValue
*handler_return
,
2198 gboolean continue_emission
;
2199 gboolean signal_return
;
2201 signal_return
= g_value_get_boolean (handler_return
);
2202 g_value_set_boolean (return_accu
, signal_return
);
2203 continue_emission
= signal_return
;
2205 return continue_emission
;