5 * Purple is the legal property of its developers, whose names are too numerous
6 * to list here. Please refer to the COPYRIGHT file distributed with this
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
25 #include "queuedoutputstream.h"
28 * PurpleQueuedOutputStream:
30 * An implementation of #GFilterOutputStream which allows queuing data for
31 * output. This allows data to be queued while other data is being output.
32 * Therefore, data doesn't have to be manually stored while waiting for
33 * stream operations to finish.
35 * To create a queued output stream, use #purple_queued_output_stream_new().
37 * To queue data, use #purple_queued_output_stream_push_bytes_async().
39 * If there's a fatal stream error, it's suggested to clear the remaining
40 * bytes queued with #purple_queued_output_stream_clear_queue() to avoid
41 * excessive errors returned in
42 * #purple_queued_output_stream_push_bytes_async()'s async callback.
44 struct _PurpleQueuedOutputStream
46 GFilterOutputStream parent
;
52 gboolean pending_queued
;
53 } PurpleQueuedOutputStreamPrivate
;
55 G_DEFINE_TYPE_WITH_PRIVATE(PurpleQueuedOutputStream
,
56 purple_queued_output_stream
, G_TYPE_FILTER_OUTPUT_STREAM
)
58 /******************************************************************************
60 *****************************************************************************/
62 static void purple_queued_output_stream_start_push_bytes_async(GTask
*task
);
65 purple_queued_output_stream_push_bytes_async_cb(GObject
*source
,
66 GAsyncResult
*res
, gpointer user_data
)
68 GTask
*task
= G_TASK(user_data
);
69 PurpleQueuedOutputStream
*stream
= g_task_get_source_object(task
);
70 PurpleQueuedOutputStreamPrivate
*priv
= purple_queued_output_stream_get_instance_private(stream
);
76 written
= g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source
),
79 bytes
= g_task_get_task_data(task
);
80 size
= g_bytes_get_size(bytes
);
83 /* Error occurred, return error */
84 g_task_return_error(task
, error
);
85 g_clear_object(&task
);
86 } else if (size
> written
) {
87 /* Partial write, prepare to send remaining data */
88 bytes
= g_bytes_new_from_bytes(bytes
, written
, size
- written
);
89 g_task_set_task_data(task
, bytes
,
90 (GDestroyNotify
)g_bytes_unref
);
92 /* Full write, this task is finished */
93 g_task_return_boolean(task
, TRUE
);
94 g_clear_object(&task
);
97 /* If g_task_return_* was called in this function, the callback
98 * may have cleared the stream. If so, there will be no remaining
99 * tasks to process here.
103 /* Any queued data left? */
104 task
= g_async_queue_try_pop(priv
->queue
);
108 /* More to process */
109 purple_queued_output_stream_start_push_bytes_async(task
);
112 priv
->pending_queued
= FALSE
;
113 g_output_stream_clear_pending(G_OUTPUT_STREAM(stream
));
118 purple_queued_output_stream_start_push_bytes_async(GTask
*task
)
120 PurpleQueuedOutputStream
*stream
= g_task_get_source_object(task
);
121 GOutputStream
*base_stream
;
123 base_stream
= g_filter_output_stream_get_base_stream(
124 G_FILTER_OUTPUT_STREAM(stream
));
126 g_output_stream_write_bytes_async(base_stream
,
127 g_task_get_task_data(task
),
128 g_task_get_priority(task
),
129 g_task_get_cancellable(task
),
130 purple_queued_output_stream_push_bytes_async_cb
,
134 /******************************************************************************
135 * GObject Implementation
136 *****************************************************************************/
139 purple_queued_output_stream_dispose(GObject
*object
)
141 PurpleQueuedOutputStream
*stream
= PURPLE_QUEUED_OUTPUT_STREAM(object
);
142 PurpleQueuedOutputStreamPrivate
*priv
= purple_queued_output_stream_get_instance_private(stream
);
144 g_clear_pointer(&priv
->queue
, g_async_queue_unref
);
146 G_OBJECT_CLASS(purple_queued_output_stream_parent_class
)->dispose(object
);
150 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass
*klass
)
152 GObjectClass
*obj_class
= G_OBJECT_CLASS(klass
);
154 obj_class
->dispose
= purple_queued_output_stream_dispose
;
158 purple_queued_output_stream_init(PurpleQueuedOutputStream
*stream
)
160 PurpleQueuedOutputStreamPrivate
*priv
= purple_queued_output_stream_get_instance_private(stream
);
161 priv
->queue
= g_async_queue_new_full((GDestroyNotify
)g_bytes_unref
);
162 priv
->pending_queued
= FALSE
;
165 /******************************************************************************
167 *****************************************************************************/
169 PurpleQueuedOutputStream
*
170 purple_queued_output_stream_new(GOutputStream
*base_stream
)
172 PurpleQueuedOutputStream
*stream
;
174 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream
), NULL
);
176 stream
= g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM
,
177 "base-stream", base_stream
,
184 purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream
*stream
,
185 GBytes
*bytes
, int io_priority
, GCancellable
*cancellable
,
186 GAsyncReadyCallback callback
, gpointer user_data
)
189 gboolean set_pending
;
190 GError
*error
= NULL
;
191 PurpleQueuedOutputStreamPrivate
*priv
= NULL
;
193 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream
));
194 g_return_if_fail(bytes
!= NULL
);
196 priv
= purple_queued_output_stream_get_instance_private(stream
);
198 task
= g_task_new(stream
, cancellable
, callback
, user_data
);
199 g_task_set_task_data(task
, g_bytes_ref(bytes
),
200 (GDestroyNotify
)g_bytes_unref
);
201 g_task_set_source_tag(task
,
202 purple_queued_output_stream_push_bytes_async
);
203 g_task_set_priority(task
, io_priority
);
205 set_pending
= g_output_stream_set_pending(
206 G_OUTPUT_STREAM(stream
), &error
);
208 /* Since we're allowing queuing requests without blocking,
209 * it's not an error to be pending while processing queued operations.
211 if (!set_pending
&& (!g_error_matches(error
,
212 G_IO_ERROR
, G_IO_ERROR_PENDING
) ||
213 !priv
->pending_queued
)) {
214 g_task_return_error(task
, error
);
215 g_object_unref(task
);
219 priv
->pending_queued
= TRUE
;
222 /* Start processing if there were no pending operations */
223 purple_queued_output_stream_start_push_bytes_async(task
);
225 /* Otherwise queue the data */
226 g_async_queue_push(priv
->queue
, task
);
231 purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream
*stream
,
232 GAsyncResult
*result
, GError
**error
)
234 g_return_val_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream
), FALSE
);
235 g_return_val_if_fail(g_task_is_valid(result
, stream
), FALSE
);
236 g_return_val_if_fail(g_async_result_is_tagged(result
,
237 purple_queued_output_stream_push_bytes_async
), FALSE
);
239 return g_task_propagate_boolean(G_TASK(result
), error
);
243 purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream
*stream
)
246 PurpleQueuedOutputStreamPrivate
*priv
= NULL
;
248 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream
));
250 priv
= purple_queued_output_stream_get_instance_private(stream
);
252 while ((task
= g_async_queue_try_pop(priv
->queue
)) != NULL
) {
253 g_task_return_new_error(task
, G_IO_ERROR
, G_IO_ERROR_CANCELLED
,
254 "PurpleQueuedOutputStream queue cleared");
255 g_object_unref(task
);