1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Christian Kellner <gicmo@gnome.org>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
31 * SECTION:gbufferedoutputstream
32 * @short_description: Buffered Output Stream
34 * @see_also: #GFilterOutputStream, #GOutputStream
36 * Buffered output stream implements #GFilterOutputStream and provides
37 * for buffered writes.
39 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
41 * To create a buffered output stream, use g_buffered_output_stream_new(),
42 * or g_buffered_output_stream_new_sized() to specify the buffer's size
45 * To get the size of a buffer within a buffered output stream, use
46 * g_buffered_output_stream_get_buffer_size(). To change the size of a
47 * buffered output stream's buffer, use
48 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
49 * size cannot be reduced below the size of the data within the buffer.
52 #define DEFAULT_BUFFER_SIZE 4096
54 struct _GBufferedOutputStreamPrivate
{
67 static void g_buffered_output_stream_set_property (GObject
*object
,
72 static void g_buffered_output_stream_get_property (GObject
*object
,
76 static void g_buffered_output_stream_finalize (GObject
*object
);
79 static gssize
g_buffered_output_stream_write (GOutputStream
*stream
,
82 GCancellable
*cancellable
,
84 static gboolean
g_buffered_output_stream_flush (GOutputStream
*stream
,
85 GCancellable
*cancellable
,
87 static gboolean
g_buffered_output_stream_close (GOutputStream
*stream
,
88 GCancellable
*cancellable
,
91 static void g_buffered_output_stream_write_async (GOutputStream
*stream
,
95 GCancellable
*cancellable
,
96 GAsyncReadyCallback callback
,
98 static gssize
g_buffered_output_stream_write_finish (GOutputStream
*stream
,
101 static void g_buffered_output_stream_flush_async (GOutputStream
*stream
,
103 GCancellable
*cancellable
,
104 GAsyncReadyCallback callback
,
106 static gboolean
g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
107 GAsyncResult
*result
,
109 static void g_buffered_output_stream_close_async (GOutputStream
*stream
,
111 GCancellable
*cancellable
,
112 GAsyncReadyCallback callback
,
114 static gboolean
g_buffered_output_stream_close_finish (GOutputStream
*stream
,
115 GAsyncResult
*result
,
118 G_DEFINE_TYPE (GBufferedOutputStream
,
119 g_buffered_output_stream
,
120 G_TYPE_FILTER_OUTPUT_STREAM
)
124 g_buffered_output_stream_class_init (GBufferedOutputStreamClass
*klass
)
126 GObjectClass
*object_class
;
127 GOutputStreamClass
*ostream_class
;
129 g_type_class_add_private (klass
, sizeof (GBufferedOutputStreamPrivate
));
131 object_class
= G_OBJECT_CLASS (klass
);
132 object_class
->get_property
= g_buffered_output_stream_get_property
;
133 object_class
->set_property
= g_buffered_output_stream_set_property
;
134 object_class
->finalize
= g_buffered_output_stream_finalize
;
136 ostream_class
= G_OUTPUT_STREAM_CLASS (klass
);
137 ostream_class
->write_fn
= g_buffered_output_stream_write
;
138 ostream_class
->flush
= g_buffered_output_stream_flush
;
139 ostream_class
->close_fn
= g_buffered_output_stream_close
;
140 ostream_class
->write_async
= g_buffered_output_stream_write_async
;
141 ostream_class
->write_finish
= g_buffered_output_stream_write_finish
;
142 ostream_class
->flush_async
= g_buffered_output_stream_flush_async
;
143 ostream_class
->flush_finish
= g_buffered_output_stream_flush_finish
;
144 ostream_class
->close_async
= g_buffered_output_stream_close_async
;
145 ostream_class
->close_finish
= g_buffered_output_stream_close_finish
;
147 g_object_class_install_property (object_class
,
149 g_param_spec_uint ("buffer-size",
151 P_("The size of the backend buffer"),
155 G_PARAM_READWRITE
|G_PARAM_CONSTRUCT
|
156 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
158 g_object_class_install_property (object_class
,
160 g_param_spec_boolean ("auto-grow",
162 P_("Whether the buffer should automatically grow"),
165 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
170 * g_buffered_output_stream_get_buffer_size:
171 * @stream: a #GBufferedOutputStream.
173 * Gets the size of the buffer in the @stream.
175 * Returns: the current size of the buffer.
/* Getter for the stream's buffer size: after the type check it returns
 * the len field of the private struct. */
178 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream
*stream
)
180 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), -1);
/* NOTE(review): the guard-failure value is -1. The return type line is
 * not visible in this listing; if it is an unsigned type (e.g. gsize),
 * callers would see a very large size instead of a negative value.
 * Confirm against the declaration. */
182 return stream
->priv
->len
;
186 * g_buffered_output_stream_set_buffer_size:
187 * @stream: a #GBufferedOutputStream.
190 * Sets the size of the internal buffer to @size.
193 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream
*stream
,
196 GBufferedOutputStreamPrivate
*priv
;
199 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
203 if (size
== priv
->len
)
208 size
= MAX (size
, priv
->pos
);
210 buffer
= g_malloc (size
);
211 memcpy (buffer
, priv
->buffer
, priv
->pos
);
212 g_free (priv
->buffer
);
213 priv
->buffer
= buffer
;
219 priv
->buffer
= g_malloc (size
);
224 g_object_notify (G_OBJECT (stream
), "buffer-size");
228 * g_buffered_output_stream_get_auto_grow:
229 * @stream: a #GBufferedOutputStream.
231 * Checks if the buffer automatically grows as data is added.
233 * Returns: %TRUE if the @stream's buffer automatically grows,
237 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream
*stream
)
239 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), FALSE
);
241 return stream
->priv
->auto_grow
;
245 * g_buffered_output_stream_set_auto_grow:
246 * @stream: a #GBufferedOutputStream.
247 * @auto_grow: a #gboolean.
249 * Sets whether or not the @stream's buffer should automatically grow.
250 * If @auto_grow is true, then each write will just make the buffer
251 * larger, and you must manually flush the buffer to actually write out
252 * the data to the underlying stream.
255 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream
*stream
,
258 GBufferedOutputStreamPrivate
*priv
;
259 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
261 auto_grow
= auto_grow
!= FALSE
;
262 if (priv
->auto_grow
!= auto_grow
)
264 priv
->auto_grow
= auto_grow
;
265 g_object_notify (G_OBJECT (stream
), "auto-grow");
270 g_buffered_output_stream_set_property (GObject
*object
,
275 GBufferedOutputStream
*stream
;
277 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
282 g_buffered_output_stream_set_buffer_size (stream
, g_value_get_uint (value
));
286 g_buffered_output_stream_set_auto_grow (stream
, g_value_get_boolean (value
));
290 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
297 g_buffered_output_stream_get_property (GObject
*object
,
302 GBufferedOutputStream
*buffered_stream
;
303 GBufferedOutputStreamPrivate
*priv
;
305 buffered_stream
= G_BUFFERED_OUTPUT_STREAM (object
);
306 priv
= buffered_stream
->priv
;
311 g_value_set_uint (value
, priv
->len
);
315 g_value_set_boolean (value
, priv
->auto_grow
);
319 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
326 g_buffered_output_stream_finalize (GObject
*object
)
328 GBufferedOutputStream
*stream
;
329 GBufferedOutputStreamPrivate
*priv
;
331 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
334 g_free (priv
->buffer
);
336 G_OBJECT_CLASS (g_buffered_output_stream_parent_class
)->finalize (object
);
340 g_buffered_output_stream_init (GBufferedOutputStream
*stream
)
342 stream
->priv
= G_TYPE_INSTANCE_GET_PRIVATE (stream
,
343 G_TYPE_BUFFERED_OUTPUT_STREAM
,
344 GBufferedOutputStreamPrivate
);
349 * g_buffered_output_stream_new:
350 * @base_stream: a #GOutputStream.
352 * Creates a new buffered output stream for a base stream.
354 * Returns: a #GOutputStream for the given @base_stream.
357 g_buffered_output_stream_new (GOutputStream
*base_stream
)
359 GOutputStream
*stream
;
361 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
363 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
364 "base-stream", base_stream
,
371 * g_buffered_output_stream_new_sized:
372 * @base_stream: a #GOutputStream.
375 * Creates a new buffered output stream with a given buffer size.
377 * Returns: a #GOutputStream with an internal buffer set to @size.
380 g_buffered_output_stream_new_sized (GOutputStream
*base_stream
,
383 GOutputStream
*stream
;
385 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
387 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
388 "base-stream", base_stream
,
396 flush_buffer (GBufferedOutputStream
*stream
,
397 GCancellable
*cancellable
,
400 GBufferedOutputStreamPrivate
*priv
;
401 GOutputStream
*base_stream
;
408 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
410 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), FALSE
);
412 res
= g_output_stream_write_all (base_stream
,
419 count
= priv
->pos
- bytes_written
;
422 g_memmove (priv
->buffer
, priv
->buffer
+ bytes_written
, count
);
424 priv
->pos
-= bytes_written
;
430 g_buffered_output_stream_write (GOutputStream
*stream
,
433 GCancellable
*cancellable
,
436 GBufferedOutputStream
*bstream
;
437 GBufferedOutputStreamPrivate
*priv
;
442 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
443 priv
= bstream
->priv
;
445 n
= priv
->len
- priv
->pos
;
447 if (priv
->auto_grow
&& n
< count
)
449 new_size
= MAX (priv
->len
* 2, priv
->len
+ count
);
450 g_buffered_output_stream_set_buffer_size (bstream
, new_size
);
454 res
= flush_buffer (bstream
, cancellable
, error
);
460 n
= priv
->len
- priv
->pos
;
462 count
= MIN (count
, n
);
463 memcpy (priv
->buffer
+ priv
->pos
, buffer
, count
);
470 g_buffered_output_stream_flush (GOutputStream
*stream
,
471 GCancellable
*cancellable
,
474 GBufferedOutputStream
*bstream
;
475 GOutputStream
*base_stream
;
478 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
479 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
481 res
= flush_buffer (bstream
, cancellable
, error
);
486 res
= g_output_stream_flush (base_stream
, cancellable
, error
);
492 g_buffered_output_stream_close (GOutputStream
*stream
,
493 GCancellable
*cancellable
,
496 GBufferedOutputStream
*bstream
;
497 GOutputStream
*base_stream
;
500 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
501 base_stream
= G_FILTER_OUTPUT_STREAM (bstream
)->base_stream
;
502 res
= flush_buffer (bstream
, cancellable
, error
);
504 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
506 /* report the first error but still close the stream */
508 res
= g_output_stream_close (base_stream
, cancellable
, error
);
510 g_output_stream_close (base_stream
, cancellable
, NULL
);
516 /* ************************** */
517 /* Async stuff implementation */
518 /* ************************** */
520 /* TODO: This should be using the base class async ops, not threads */
524 guint flush_stream
: 1;
525 guint close_stream
: 1;
530 free_flush_data (gpointer data
)
532 g_slice_free (FlushData
, data
);
535 /* This function is used by all three (i.e.
536 * _write, _flush, _close) functions since
537 * all of them will need to flush the buffer
538 * and so closing and writing is just a special
539 * case of flushing + some additional stuff */
541 flush_buffer_thread (GSimpleAsyncResult
*result
,
543 GCancellable
*cancellable
)
545 GBufferedOutputStream
*stream
;
546 GOutputStream
*base_stream
;
549 GError
*error
= NULL
;
551 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
552 fdata
= g_simple_async_result_get_op_res_gpointer (result
);
553 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
555 res
= flush_buffer (stream
, cancellable
, &error
);
557 /* if flushing the buffer didn't work don't even bother
558 * to flush the stream but just report that error */
559 if (res
&& fdata
->flush_stream
)
560 res
= g_output_stream_flush (base_stream
, cancellable
, &error
);
562 if (fdata
->close_stream
)
565 /* if flushing the buffer or the stream returned
566 * an error report that first error but still try
567 * to close the stream */
568 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
571 g_output_stream_close (base_stream
, cancellable
, NULL
);
573 res
= g_output_stream_close (base_stream
, cancellable
, &error
);
578 g_simple_async_result_take_error (result
, error
);
591 free_write_data (gpointer data
)
593 g_slice_free (WriteData
, data
);
597 g_buffered_output_stream_write_async (GOutputStream
*stream
,
601 GCancellable
*cancellable
,
602 GAsyncReadyCallback callback
,
605 GBufferedOutputStream
*buffered_stream
;
606 GBufferedOutputStreamPrivate
*priv
;
607 GSimpleAsyncResult
*res
;
610 buffered_stream
= G_BUFFERED_OUTPUT_STREAM (stream
);
611 priv
= buffered_stream
->priv
;
613 wdata
= g_slice_new (WriteData
);
614 wdata
->count
= count
;
615 wdata
->buffer
= buffer
;
617 res
= g_simple_async_result_new (G_OBJECT (stream
),
620 g_buffered_output_stream_write_async
);
622 g_simple_async_result_set_op_res_gpointer (res
, wdata
, free_write_data
);
624 /* if we have space left directly call the
625 * callback (from idle) otherwise schedule a buffer
626 * flush in the thread. In both cases the actual
627 * copying of the data to the buffer will be done in
628 * the write_finish () func since that should
630 if (priv
->len
- priv
->pos
> 0)
632 g_simple_async_result_complete_in_idle (res
);
636 wdata
->fdata
.flush_stream
= FALSE
;
637 wdata
->fdata
.close_stream
= FALSE
;
638 g_simple_async_result_run_in_thread (res
,
642 g_object_unref (res
);
647 g_buffered_output_stream_write_finish (GOutputStream
*stream
,
648 GAsyncResult
*result
,
651 GBufferedOutputStreamPrivate
*priv
;
652 GBufferedOutputStream
*buffered_stream
;
653 GSimpleAsyncResult
*simple
;
657 simple
= G_SIMPLE_ASYNC_RESULT (result
);
658 buffered_stream
= G_BUFFERED_OUTPUT_STREAM (stream
);
659 priv
= buffered_stream
->priv
;
661 g_warn_if_fail (g_simple_async_result_get_source_tag (simple
) ==
662 g_buffered_output_stream_write_async
);
664 wdata
= g_simple_async_result_get_op_res_gpointer (simple
);
666 /* Now do the real copying of data to the buffer */
667 count
= priv
->len
- priv
->pos
;
668 count
= MIN (wdata
->count
, count
);
670 memcpy (priv
->buffer
+ priv
->pos
, wdata
->buffer
, count
);
678 g_buffered_output_stream_flush_async (GOutputStream
*stream
,
680 GCancellable
*cancellable
,
681 GAsyncReadyCallback callback
,
684 GSimpleAsyncResult
*res
;
687 fdata
= g_slice_new (FlushData
);
688 fdata
->flush_stream
= TRUE
;
689 fdata
->close_stream
= FALSE
;
691 res
= g_simple_async_result_new (G_OBJECT (stream
),
694 g_buffered_output_stream_flush_async
);
696 g_simple_async_result_set_op_res_gpointer (res
, fdata
, free_flush_data
);
698 g_simple_async_result_run_in_thread (res
,
702 g_object_unref (res
);
706 g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
707 GAsyncResult
*result
,
710 GSimpleAsyncResult
*simple
;
712 simple
= G_SIMPLE_ASYNC_RESULT (result
);
714 g_warn_if_fail (g_simple_async_result_get_source_tag (simple
) ==
715 g_buffered_output_stream_flush_async
);
/* Asynchronous close entry point. The visible statements allocate a
 * FlushData with close_stream set, create a GSimpleAsyncResult whose
 * last argument is this function, attach the FlushData to it with
 * free_flush_data as the destroy function, pass the result to
 * g_simple_async_result_run_in_thread (), and then drop the local
 * reference.
 * NOTE(review): this listing is missing lines (return type, some
 * parameters, braces, some call arguments), so these comments describe
 * only the statements that are visible. */
721 g_buffered_output_stream_close_async (GOutputStream
*stream
,
723 GCancellable
*cancellable
,
724 GAsyncReadyCallback callback
,
727 GSimpleAsyncResult
*res
;
730 fdata
= g_slice_new (FlushData
);
731 fdata
->close_stream
= TRUE
;
/* NOTE(review): only close_stream is assigned in the visible lines,
 * whereas g_buffered_output_stream_flush_async () assigns both
 * flush_stream and close_stream. flush_buffer_thread () tests
 * `res && fdata->flush_stream`, and g_slice_new () (unlike
 * g_slice_new0 ()) is not expected to zero the allocation. If
 * flush_buffer_thread () is the worker used below (that argument line
 * is not visible), flush_stream looks uninitialized on this path.
 * Confirm, and assign it explicitly if so. */
733 res
= g_simple_async_result_new (G_OBJECT (stream
),
736 g_buffered_output_stream_close_async
);
738 g_simple_async_result_set_op_res_gpointer (res
, fdata
, free_flush_data
);
740 g_simple_async_result_run_in_thread (res
,
/* Release the local reference to the result now that it has been passed
 * to g_simple_async_result_run_in_thread (). */
744 g_object_unref (res
);
748 g_buffered_output_stream_close_finish (GOutputStream
*stream
,
749 GAsyncResult
*result
,
752 GSimpleAsyncResult
*simple
;
754 simple
= G_SIMPLE_ASYNC_RESULT (result
);
756 g_warn_if_fail (g_simple_async_result_get_source_tag (simple
) ==
757 g_buffered_output_stream_close_async
);