5 * Purple is the legal property of its developers, whose names are too numerous
6 * to list here. Please refer to the COPYRIGHT file distributed with this
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
25 #include "queuedoutputstream.h"
/* Instance-private data for PurpleQueuedOutputStream.
 * NOTE(review): the member list is not present in this excerpt. The code
 * below accesses two members through stream->priv: `queue` (used with the
 * g_async_queue_* calls) and `next` (used with the g_bytes_* calls). */
27 struct _PurpleQueuedOutputStreamPrivate
{
/* Parent class pointer. It is assigned in class_init from
 * g_type_class_peek_parent() and used by dispose to chain up. */
32 static GObjectClass
*parent_class
= NULL
;
/* Looks up the PurpleQueuedOutputStreamPrivate struct of an instance.
 * Instance init uses it to fill in stream->priv. */
34 #define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
35 (G_TYPE_INSTANCE_GET_PRIVATE((obj), \
36 PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
37 PurpleQueuedOutputStreamPrivate))
/* Registers the PurpleQueuedOutputStream type with
 * G_TYPE_FILTER_OUTPUT_STREAM as its parent type, and attaches the private
 * struct via G_ADD_PRIVATE. */
39 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream
, purple_queued_output_stream
,
40 G_TYPE_FILTER_OUTPUT_STREAM
,
41 G_ADD_PRIVATE(PurpleQueuedOutputStream
))
/* Forward declarations.
 * dispose, flush, flush_async and flush_finish are the virtual-method
 * implementations installed in class_init. start_flush_async is the helper
 * called by flush_async and by the async write callback. */
43 static void purple_queued_output_stream_dispose(GObject
*object
);
44 static gboolean
purple_queued_output_stream_flush(GOutputStream
*stream
,
45 GCancellable
*cancellable
, GError
**error
);
46 static void purple_queued_output_stream_flush_async(GOutputStream
*stream
,
47 int io_priority
, GCancellable
*cancellable
,
48 GAsyncReadyCallback callback
, gpointer user_data
);
49 static gboolean
purple_queued_output_stream_flush_finish(GOutputStream
*stream
,
50 GAsyncResult
*result
, GError
**error
);
52 static void purple_queued_output_stream_start_flush_async(GTask
*task
);
/* Class initializer.
 * Records the parent class in parent_class, installs the dispose handler on
 * the GObjectClass, and installs the flush, flush_async and flush_finish
 * implementations on the GOutputStreamClass.
 *
 * klass: the class structure being initialized. */
55 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass
*klass
)
57 GObjectClass
*object_class
;
58 GOutputStreamClass
*ostream_class
;
/* Remember the parent class so dispose can chain up. */
60 parent_class
= g_type_class_peek_parent(klass
);
62 object_class
= G_OBJECT_CLASS(klass
);
63 object_class
->dispose
= purple_queued_output_stream_dispose
;
/* Override the three flush entry points of GOutputStream. */
65 ostream_class
= G_OUTPUT_STREAM_CLASS(klass
);
66 ostream_class
->flush
= purple_queued_output_stream_flush
;
67 ostream_class
->flush_async
= purple_queued_output_stream_flush_async
;
68 ostream_class
->flush_finish
= purple_queued_output_stream_flush_finish
;
/* Creates a PurpleQueuedOutputStream on top of base_stream.
 *
 * base_stream: the stream to wrap; it is passed to g_object_new() as the
 *              "base-stream" property.
 *
 * Returns NULL if base_stream does not pass G_IS_OUTPUT_STREAM().
 * NOTE(review): the rest of the g_object_new() argument list and the return
 * statement are not present in this excerpt. */
71 PurpleQueuedOutputStream
*
72 purple_queued_output_stream_new(GOutputStream
*base_stream
)
74 PurpleQueuedOutputStream
*stream
;
76 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream
), NULL
);
78 stream
= g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM
,
79 "base-stream", base_stream
,
/* Appends bytes to the stream's queue of pending output.
 *
 * stream: the queued output stream to push onto.
 * bytes:  the data to queue. A new reference is taken with g_bytes_ref()
 *         before it is pushed, so the caller keeps its own reference.
 *
 * Does nothing if stream is not a PurpleQueuedOutputStream or bytes is NULL.
 * NOTE(review): part of the signature (the line declaring `bytes`) is not
 * present in this excerpt. */
86 purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream
*stream
,
/* Use the type-check macro here. The cast macro PURPLE_QUEUED_OUTPUT_STREAM()
 * evaluates to the pointer itself, so it only rejects NULL and lets an
 * object of the wrong type through to the priv dereference below. */
89 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream
));
90 g_return_if_fail(bytes
!= NULL
);
92 g_async_queue_push(stream
->priv
->queue
, g_bytes_ref(bytes
));
/* Instance initializer.
 * Caches the private-data pointer in stream->priv and creates an async queue
 * whose destroy notify is g_bytes_unref.
 *
 * stream: the instance being initialized. */
96 purple_queued_output_stream_init(PurpleQueuedOutputStream
*stream
)
98 stream
->priv
= PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream
);
/* NOTE(review): the assignment target of this call is on a line not present
 * in this excerpt; presumably the result is stored in priv->queue. Confirm. */
100 g_async_queue_new_full((GDestroyNotify
)g_bytes_unref
);
/* GObject dispose handler.
 * Chains up to the parent class first, then releases the pending-output
 * queue and the partially written `next` buffer.
 *
 * object: the PurpleQueuedOutputStream being disposed. */
104 purple_queued_output_stream_dispose(GObject
*object
)
106 PurpleQueuedOutputStream
*stream
= PURPLE_QUEUED_OUTPUT_STREAM(object
);
108 /* Chain up first in case the stream is flushed */
109 G_OBJECT_CLASS(parent_class
)->dispose(object
);
/* g_clear_pointer() unrefs a non-NULL value and sets the field to NULL, so
 * running dispose more than once is harmless. */
111 g_clear_pointer(&stream
->priv
->queue
, g_async_queue_unref
);
112 g_clear_pointer(&stream
->priv
->next
, g_bytes_unref
);
/* Synchronous flush implementation (installed as GOutputStreamClass.flush).
 * Writes queued GBytes to the base stream with g_output_stream_write_all().
 *
 * stream:      the queued output stream being flushed.
 * cancellable: passed through to g_output_stream_write_all().
 * error:       passed through to g_output_stream_write_all().
 *
 * NOTE(review): several lines of this function are not present in this
 * excerpt (some declarations, branch/loop scaffolding and the return
 * statement). The comments below describe only the visible statements. */
116 purple_queued_output_stream_flush(GOutputStream
*stream
,
117 GCancellable
*cancellable
, GError
**error
)
119 PurpleQueuedOutputStreamPrivate
*priv
;
120 GOutputStream
*base_stream
;
124 gsize bytes_written
= 0;
/* Look up the private data and the wrapped stream that receives the writes. */
127 priv
= PURPLE_QUEUED_OUTPUT_STREAM(stream
)->priv
;
128 base_stream
= g_filter_output_stream_get_base_stream(
129 G_FILTER_OUTPUT_STREAM(stream
));
/* A buffer left in priv->next is checked before the queue is consulted.
 * NOTE(review): the body of this branch is not visible here. */
132 if (priv
->next
!= NULL
) {
/* Non-blocking pop of the next queued buffer. */
136 bytes
= g_async_queue_try_pop(priv
->queue
);
/* Get the raw data pointer and length, then write it to the base stream. */
143 buffer
= g_bytes_get_data(bytes
, &count
);
145 ret
= g_output_stream_write_all(base_stream
, buffer
, count
,
146 &bytes_written
, cancellable
, error
);
/* Build the buffer to keep for later. After a partial write it is a
 * sub-range of `bytes` with length count - bytes_written; otherwise it is a
 * new reference to `bytes`.
 * NOTE(review): the enclosing condition, the offset argument and the `else`
 * line are not visible here; presumably this runs only when the write
 * failed. Confirm. */
151 if (bytes_written
> 0) {
152 queue_bytes
= g_bytes_new_from_bytes(bytes
,
154 count
- bytes_written
);
156 queue_bytes
= g_bytes_ref(bytes
);
/* Store the remainder in priv->next, which is checked at the top of this
 * function. */
159 priv
->next
= queue_bytes
;
/* Drop the reference held on the buffer that was just processed. */
162 g_bytes_unref(bytes
);
/* Completion callback for the g_output_stream_write_bytes_async() call made
 * in purple_queued_output_stream_start_flush_async().
 *
 * source:    the base stream the write was issued on.
 * res:       the async result of that write.
 * user_data: the GTask driving the flush.
 *
 * NOTE(review): some declarations, the remaining arguments of the finish
 * call and the error-check scaffolding are not present in this excerpt. */
169 purple_queued_output_stream_flush_async_cb(GObject
*source
,
170 GAsyncResult
*res
, gpointer user_data
)
172 GTask
*task
= user_data
;
173 PurpleQueuedOutputStream
*stream
;
177 GError
*error
= NULL
;
/* Collect the result of the asynchronous write. */
179 written
= g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source
),
/* Report the error through the task.
 * NOTE(review): the guarding condition is not visible here; presumably this
 * runs only when the write failed. Confirm. */
183 g_task_return_error(task
, error
);
/* Recover the queued stream from the task and measure the buffer that was
 * being written. */
187 stream
= PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task
));
188 size
= g_bytes_get_size(stream
->priv
->next
);
/* Detach the current buffer so priv->next can be cleared or replaced. */
190 old_bytes
= stream
->priv
->next
;
191 stream
->priv
->next
= NULL
;
/* Partial write: keep the unwritten tail (offset `written`, length
 * size - written) as the new priv->next. */
193 if (size
> (gsize
)written
) {
194 stream
->priv
->next
= g_bytes_new_from_bytes(old_bytes
,
195 written
, size
- written
);
198 g_bytes_unref(old_bytes
);
/* Continue with the remainder or the next queued buffer. */
200 purple_queued_output_stream_start_flush_async(task
);
/* Runs one step of the asynchronous flush driven by task.
 * If no buffer is pending, the task is completed with TRUE. Otherwise an
 * asynchronous write of priv->next is issued on the base stream, with
 * purple_queued_output_stream_flush_async_cb() as its callback.
 *
 * task: the GTask whose source object is the PurpleQueuedOutputStream.
 *
 * NOTE(review): some scaffolding lines are not present in this excerpt. */
204 purple_queued_output_stream_start_flush_async(GTask
*task
)
206 PurpleQueuedOutputStream
*stream
;
207 GOutputStream
*base_stream
;
209 stream
= PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task
));
210 base_stream
= g_filter_output_stream_get_base_stream(
211 G_FILTER_OUTPUT_STREAM(stream
));
/* Nothing left over from a previous write, so try the queue (non-blocking).
 * NOTE(review): the assignment target of the pop is on a line not present in
 * this excerpt; presumably it is stream->priv->next. Confirm. */
213 if (stream
->priv
->next
== NULL
) {
215 g_async_queue_try_pop(stream
->priv
->queue
);
/* Still nothing to write: the flush is complete. */
217 if (stream
->priv
->next
== NULL
) {
218 g_task_return_boolean(task
, TRUE
);
/* Write the pending buffer using the task's priority and cancellable. The
 * task is passed as user data so the callback can continue the flush. */
223 g_output_stream_write_bytes_async(base_stream
, stream
->priv
->next
,
224 g_task_get_priority(task
),
225 g_task_get_cancellable(task
),
226 purple_queued_output_stream_flush_async_cb
, task
);
/* Asynchronous flush implementation (installed as
 * GOutputStreamClass.flush_async).
 * Creates a GTask with stream as its source object, sets its priority to
 * io_priority and starts the flush.
 *
 * stream:      the queued output stream to flush.
 * io_priority: priority stored on the task and later used for the writes.
 * cancellable: cancellable attached to the task.
 * callback:    invoked when the task completes.
 * user_data:   passed to callback.
 *
 * NOTE(review): the declaration of `task` is not present in this excerpt. */
230 purple_queued_output_stream_flush_async(GOutputStream
*stream
,
231 int io_priority
, GCancellable
*cancellable
,
232 GAsyncReadyCallback callback
, gpointer user_data
)
236 task
= g_task_new(stream
, cancellable
, callback
, user_data
);
237 g_task_set_priority(task
, io_priority
);
239 purple_queued_output_stream_start_flush_async(task
);
/* Finishes an asynchronous flush (installed as
 * GOutputStreamClass.flush_finish).
 *
 * stream: the stream the flush was started on.
 * result: the GAsyncResult handed to the flush_async callback.
 * error:  receives the error propagated from the task, if any.
 *
 * Returns FALSE if result is not a valid GTask for stream; otherwise returns
 * the boolean propagated from the task. */
243 purple_queued_output_stream_flush_finish(GOutputStream
*stream
,
244 GAsyncResult
*result
, GError
**error
)
246 g_return_val_if_fail(g_task_is_valid(result
, stream
), FALSE
);
248 return g_task_propagate_boolean(G_TASK(result
), error
);