Migrate certificates, icons, logs to XDG dirs
[pidgin-git.git] / libpurple / queuedoutputstream.c
blob182f2953b2ec366497e2e7b253c56a15883b67cb
1 /*
3 * purple
5 * Purple is the legal property of its developers, whose names are too numerous
6 * to list here. Please refer to the COPYRIGHT file distributed with this
7 * source distribution.
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
24 #include "internal.h"
25 #include "queuedoutputstream.h"
27 struct _PurpleQueuedOutputStreamPrivate {
28 GAsyncQueue *queue;
29 GBytes *next;
32 static GObjectClass *parent_class = NULL;
34 #define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
35 (G_TYPE_INSTANCE_GET_PRIVATE((obj), \
36 PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
37 PurpleQueuedOutputStreamPrivate))
39 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
40 G_TYPE_FILTER_OUTPUT_STREAM,
41 G_ADD_PRIVATE(PurpleQueuedOutputStream))
43 static void purple_queued_output_stream_dispose(GObject *object);
44 static gboolean purple_queued_output_stream_flush(GOutputStream *stream,
45 GCancellable *cancellable, GError **error);
46 static void purple_queued_output_stream_flush_async(GOutputStream *stream,
47 int io_priority, GCancellable *cancellable,
48 GAsyncReadyCallback callback, gpointer user_data);
49 static gboolean purple_queued_output_stream_flush_finish(GOutputStream *stream,
50 GAsyncResult *result, GError **error);
52 static void purple_queued_output_stream_start_flush_async(GTask *task);
54 static void
55 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
57 GObjectClass *object_class;
58 GOutputStreamClass *ostream_class;
60 parent_class = g_type_class_peek_parent(klass);
62 object_class = G_OBJECT_CLASS(klass);
63 object_class->dispose = purple_queued_output_stream_dispose;
65 ostream_class = G_OUTPUT_STREAM_CLASS(klass);
66 ostream_class->flush = purple_queued_output_stream_flush;
67 ostream_class->flush_async = purple_queued_output_stream_flush_async;
68 ostream_class->flush_finish = purple_queued_output_stream_flush_finish;
71 PurpleQueuedOutputStream *
72 purple_queued_output_stream_new(GOutputStream *base_stream)
74 PurpleQueuedOutputStream *stream;
76 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
78 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
79 "base-stream", base_stream,
80 NULL);
82 return stream;
85 void
86 purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
87 GBytes *bytes)
89 g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
90 g_return_if_fail(bytes != NULL);
92 g_async_queue_push(stream->priv->queue, g_bytes_ref(bytes));
95 static void
96 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
98 stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
99 stream->priv->queue =
100 g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
103 static void
104 purple_queued_output_stream_dispose(GObject *object)
106 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
108 /* Chain up first in case the stream is flushed */
109 G_OBJECT_CLASS(parent_class)->dispose(object);
111 g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
112 g_clear_pointer(&stream->priv->next, g_bytes_unref);
115 static gboolean
116 purple_queued_output_stream_flush(GOutputStream *stream,
117 GCancellable *cancellable, GError **error)
119 PurpleQueuedOutputStreamPrivate *priv;
120 GOutputStream *base_stream;
121 GBytes *bytes;
122 const void *buffer;
123 gsize count;
124 gsize bytes_written = 0;
125 gboolean ret = TRUE;
127 priv = PURPLE_QUEUED_OUTPUT_STREAM(stream)->priv;
128 base_stream = g_filter_output_stream_get_base_stream(
129 G_FILTER_OUTPUT_STREAM(stream));
131 do {
132 if (priv->next != NULL) {
133 bytes = priv->next;
134 priv->next = NULL;
135 } else {
136 bytes = g_async_queue_try_pop(priv->queue);
139 if (bytes == NULL) {
140 break;
143 buffer = g_bytes_get_data(bytes, &count);
145 ret = g_output_stream_write_all(base_stream, buffer, count,
146 &bytes_written, cancellable, error);
148 if (!ret) {
149 GBytes *queue_bytes;
151 if (bytes_written > 0) {
152 queue_bytes = g_bytes_new_from_bytes(bytes,
153 bytes_written,
154 count - bytes_written);
155 } else {
156 queue_bytes = g_bytes_ref(bytes);
159 priv->next = queue_bytes;
162 g_bytes_unref(bytes);
163 } while (ret);
165 return ret;
168 static void
169 purple_queued_output_stream_flush_async_cb(GObject *source,
170 GAsyncResult *res, gpointer user_data)
172 GTask *task = user_data;
173 PurpleQueuedOutputStream *stream;
174 gssize written;
175 gsize size;
176 GBytes *old_bytes;
177 GError *error = NULL;
179 written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source),
180 res, &error);
182 if (written < 0) {
183 g_task_return_error(task, error);
184 return;
187 stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task));
188 size = g_bytes_get_size(stream->priv->next);
190 old_bytes = stream->priv->next;
191 stream->priv->next = NULL;
193 if (size > (gsize)written) {
194 stream->priv->next = g_bytes_new_from_bytes(old_bytes,
195 written, size - written);
198 g_bytes_unref(old_bytes);
200 purple_queued_output_stream_start_flush_async(task);
203 static void
204 purple_queued_output_stream_start_flush_async(GTask *task)
206 PurpleQueuedOutputStream *stream;
207 GOutputStream *base_stream;
209 stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task));
210 base_stream = g_filter_output_stream_get_base_stream(
211 G_FILTER_OUTPUT_STREAM(stream));
213 if (stream->priv->next == NULL) {
214 stream->priv->next =
215 g_async_queue_try_pop(stream->priv->queue);
217 if (stream->priv->next == NULL) {
218 g_task_return_boolean(task, TRUE);
219 return;
223 g_output_stream_write_bytes_async(base_stream, stream->priv->next,
224 g_task_get_priority(task),
225 g_task_get_cancellable(task),
226 purple_queued_output_stream_flush_async_cb, task);
229 static void
230 purple_queued_output_stream_flush_async(GOutputStream *stream,
231 int io_priority, GCancellable *cancellable,
232 GAsyncReadyCallback callback, gpointer user_data)
234 GTask *task;
236 task = g_task_new(stream, cancellable, callback, user_data);
237 g_task_set_priority(task, io_priority);
239 purple_queued_output_stream_start_flush_async(task);
242 static gboolean
243 purple_queued_output_stream_flush_finish(GOutputStream *stream,
244 GAsyncResult *result, GError **error)
246 g_return_val_if_fail(g_task_is_valid(result, stream), FALSE);
248 return g_task_propagate_boolean(G_TASK(result), error);