1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Christian Kellner <gicmo@gnome.org>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gseekable.h"
33 * SECTION:gbufferedoutputstream
34 * @short_description: Buffered Output Stream
36 * @see_also: #GFilterOutputStream, #GOutputStream
38 * Buffered output stream implements #GFilterOutputStream and provides
39 * for buffered writes.
41 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
43 * To create a buffered output stream, use g_buffered_output_stream_new(),
44 * or g_buffered_output_stream_new_sized() to specify the buffer's size
47 * To get the size of a buffer within a buffered output stream, use
48 * g_buffered_output_stream_get_buffer_size(). To change the size of a
49 * buffered output stream's buffer, use
50 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
51 * size cannot be reduced below the size of the data within the buffer.
54 #define DEFAULT_BUFFER_SIZE 4096
56 struct _GBufferedOutputStreamPrivate
{
69 static void g_buffered_output_stream_set_property (GObject
*object
,
74 static void g_buffered_output_stream_get_property (GObject
*object
,
78 static void g_buffered_output_stream_finalize (GObject
*object
);
81 static gssize
g_buffered_output_stream_write (GOutputStream
*stream
,
84 GCancellable
*cancellable
,
86 static gboolean
g_buffered_output_stream_flush (GOutputStream
*stream
,
87 GCancellable
*cancellable
,
89 static gboolean
g_buffered_output_stream_close (GOutputStream
*stream
,
90 GCancellable
*cancellable
,
93 static void g_buffered_output_stream_flush_async (GOutputStream
*stream
,
95 GCancellable
*cancellable
,
96 GAsyncReadyCallback callback
,
98 static gboolean
g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
101 static void g_buffered_output_stream_close_async (GOutputStream
*stream
,
103 GCancellable
*cancellable
,
104 GAsyncReadyCallback callback
,
106 static gboolean
g_buffered_output_stream_close_finish (GOutputStream
*stream
,
107 GAsyncResult
*result
,
110 static void g_buffered_output_stream_seekable_iface_init (GSeekableIface
*iface
);
111 static goffset
g_buffered_output_stream_tell (GSeekable
*seekable
);
112 static gboolean
g_buffered_output_stream_can_seek (GSeekable
*seekable
);
113 static gboolean
g_buffered_output_stream_seek (GSeekable
*seekable
,
116 GCancellable
*cancellable
,
118 static gboolean
g_buffered_output_stream_can_truncate (GSeekable
*seekable
);
119 static gboolean
g_buffered_output_stream_truncate (GSeekable
*seekable
,
121 GCancellable
*cancellable
,
124 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream
,
125 g_buffered_output_stream
,
126 G_TYPE_FILTER_OUTPUT_STREAM
,
127 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE
,
128 g_buffered_output_stream_seekable_iface_init
))
/* Class initialiser for GBufferedOutputStream.
 *
 * Reserves room for GBufferedOutputStreamPrivate, points the GObject
 * property/finalize hooks and the GOutputStream write/flush/close hooks
 * (synchronous and asynchronous) at the implementations in this file, and
 * installs the "buffer-size" (uint) and "auto-grow" (boolean) properties.
 *
 * NOTE(review): the property-id, nick, range and default arguments of both
 * g_param_spec_*() calls are absent from this listing - confirm against the
 * complete file before changing either property.
 */
132 g_buffered_output_stream_class_init (GBufferedOutputStreamClass
*klass
)
134 GObjectClass
*object_class
;
135 GOutputStreamClass
*ostream_class
;
137 g_type_class_add_private (klass
, sizeof (GBufferedOutputStreamPrivate
));
/* GObject virtual methods */
139 object_class
= G_OBJECT_CLASS (klass
);
140 object_class
->get_property
= g_buffered_output_stream_get_property
;
141 object_class
->set_property
= g_buffered_output_stream_set_property
;
142 object_class
->finalize
= g_buffered_output_stream_finalize
;
/* GOutputStream virtual methods */
144 ostream_class
= G_OUTPUT_STREAM_CLASS (klass
);
145 ostream_class
->write_fn
= g_buffered_output_stream_write
;
146 ostream_class
->flush
= g_buffered_output_stream_flush
;
147 ostream_class
->close_fn
= g_buffered_output_stream_close
;
148 ostream_class
->flush_async
= g_buffered_output_stream_flush_async
;
149 ostream_class
->flush_finish
= g_buffered_output_stream_flush_finish
;
150 ostream_class
->close_async
= g_buffered_output_stream_close_async
;
151 ostream_class
->close_finish
= g_buffered_output_stream_close_finish
;
/* "buffer-size": readable, writable and applied at construction time */
153 g_object_class_install_property (object_class
,
155 g_param_spec_uint ("buffer-size",
157 P_("The size of the backend buffer"),
161 G_PARAM_READWRITE
|G_PARAM_CONSTRUCT
|
162 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
/* "auto-grow": only the static-string flags are visible in this listing */
164 g_object_class_install_property (object_class
,
166 g_param_spec_boolean ("auto-grow",
168 P_("Whether the buffer should automatically grow"),
171 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
176 * g_buffered_output_stream_get_buffer_size:
177 * @stream: a #GBufferedOutputStream.
179 * Gets the size of the buffer in the @stream.
181 * Returns: the current size of the buffer.
184 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream
*stream
)
186 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), -1);
188 return stream
->priv
->len
;
192 * g_buffered_output_stream_set_buffer_size:
193 * @stream: a #GBufferedOutputStream.
196 * Sets the size of the internal buffer to @size.
/* Resizes the internal buffer of @stream to @size and emits a notification
 * for the "buffer-size" property.
 *
 * When a buffer already holds data the requested size is first raised to at
 * least priv->pos, so the priv->pos bytes copied into the new allocation
 * always fit; the old allocation is then freed.  The other visible path
 * simply allocates a fresh buffer of @size bytes.
 *
 * NOTE(review): the `size` parameter line, the `priv` assignment, the
 * statement guarded by `size == priv->len`, the condition choosing between
 * the two allocation paths and any priv->len / priv->pos updates are absent
 * from this listing - confirm against the complete file.
 */
199 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream
*stream
,
202 GBufferedOutputStreamPrivate
*priv
;
205 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
209 if (size
== priv
->len
)
/* never shrink below the amount of data currently buffered */
214 size
= MAX (size
, priv
->pos
);
216 buffer
= g_malloc (size
);
217 memcpy (buffer
, priv
->buffer
, priv
->pos
);
218 g_free (priv
->buffer
);
219 priv
->buffer
= buffer
;
225 priv
->buffer
= g_malloc (size
);
230 g_object_notify (G_OBJECT (stream
), "buffer-size");
234 * g_buffered_output_stream_get_auto_grow:
235 * @stream: a #GBufferedOutputStream.
237 * Checks if the buffer automatically grows as data is added.
239 * Returns: %TRUE if the @stream's buffer automatically grows,
243 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream
*stream
)
245 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), FALSE
);
247 return stream
->priv
->auto_grow
;
251 * g_buffered_output_stream_set_auto_grow:
252 * @stream: a #GBufferedOutputStream.
253 * @auto_grow: a #gboolean.
255 * Sets whether or not the @stream's buffer should automatically grow.
256 * If @auto_grow is true, then each write will just make the buffer
257 * larger, and you must manually flush the buffer to actually write out
258 * the data to the underlying stream.
261 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream
*stream
,
264 GBufferedOutputStreamPrivate
*priv
;
265 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
267 auto_grow
= auto_grow
!= FALSE
;
268 if (priv
->auto_grow
!= auto_grow
)
270 priv
->auto_grow
= auto_grow
;
271 g_object_notify (G_OBJECT (stream
), "auto-grow");
/* GObject set_property handler.
 *
 * Forwards a uint value to g_buffered_output_stream_set_buffer_size() and a
 * boolean value to g_buffered_output_stream_set_auto_grow(); any other
 * property id is reported through G_OBJECT_WARN_INVALID_PROPERTY_ID().
 *
 * NOTE(review): the remaining parameters and the switch/case labels that
 * select between the three paths are absent from this listing.
 */
276 g_buffered_output_stream_set_property (GObject
*object
,
281 GBufferedOutputStream
*stream
;
283 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
288 g_buffered_output_stream_set_buffer_size (stream
, g_value_get_uint (value
));
292 g_buffered_output_stream_set_auto_grow (stream
, g_value_get_boolean (value
));
296 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
/* GObject get_property handler.
 *
 * Stores priv->len into @value as a uint, or priv->auto_grow as a boolean;
 * any other property id is reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID().
 *
 * NOTE(review): the remaining parameters and the switch/case labels that
 * select between the three paths are absent from this listing.
 */
303 g_buffered_output_stream_get_property (GObject
*object
,
308 GBufferedOutputStream
*buffered_stream
;
309 GBufferedOutputStreamPrivate
*priv
;
311 buffered_stream
= G_BUFFERED_OUTPUT_STREAM (object
);
312 priv
= buffered_stream
->priv
;
317 g_value_set_uint (value
, priv
->len
);
321 g_value_set_boolean (value
, priv
->auto_grow
);
325 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
332 g_buffered_output_stream_finalize (GObject
*object
)
334 GBufferedOutputStream
*stream
;
335 GBufferedOutputStreamPrivate
*priv
;
337 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
340 g_free (priv
->buffer
);
342 G_OBJECT_CLASS (g_buffered_output_stream_parent_class
)->finalize (object
);
346 g_buffered_output_stream_init (GBufferedOutputStream
*stream
)
348 stream
->priv
= G_TYPE_INSTANCE_GET_PRIVATE (stream
,
349 G_TYPE_BUFFERED_OUTPUT_STREAM
,
350 GBufferedOutputStreamPrivate
);
355 g_buffered_output_stream_seekable_iface_init (GSeekableIface
*iface
)
357 iface
->tell
= g_buffered_output_stream_tell
;
358 iface
->can_seek
= g_buffered_output_stream_can_seek
;
359 iface
->seek
= g_buffered_output_stream_seek
;
360 iface
->can_truncate
= g_buffered_output_stream_can_truncate
;
361 iface
->truncate_fn
= g_buffered_output_stream_truncate
;
365 * g_buffered_output_stream_new:
366 * @base_stream: a #GOutputStream.
368 * Creates a new buffered output stream for a base stream.
370 * Returns: a #GOutputStream for the given @base_stream.
373 g_buffered_output_stream_new (GOutputStream
*base_stream
)
375 GOutputStream
*stream
;
377 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
379 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
380 "base-stream", base_stream
,
387 * g_buffered_output_stream_new_sized:
388 * @base_stream: a #GOutputStream.
391 * Creates a new buffered output stream with a given buffer size.
393 * Returns: a #GOutputStream with an internal buffer set to @size.
/* Constructs a GBufferedOutputStream on top of @base_stream via
 * g_object_new(); returns NULL (through g_return_val_if_fail) when
 * @base_stream is not a GOutputStream.
 *
 * NOTE(review): the size parameter, the remaining g_object_new() arguments
 * (presumably the "buffer-size" property and the NULL terminator) and the
 * return statement are absent from this listing - confirm against the
 * complete file.
 */
396 g_buffered_output_stream_new_sized (GOutputStream
*base_stream
,
399 GOutputStream
*stream
;
401 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
403 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
404 "base-stream", base_stream
,
/* Pushes buffered data to the base stream with g_output_stream_write_all().
 *
 * Afterwards `count` is the number of buffered bytes that were not written
 * (priv->pos - bytes_written); those bytes are moved to the front of the
 * buffer and priv->pos is reduced by bytes_written, so a partial write
 * leaves the unwritten tail in place for a later attempt.
 *
 * NOTE(review): the error parameter, the arguments passed to
 * g_output_stream_write_all(), the guard (if any) around g_memmove() and the
 * return statement are absent from this listing.
 */
412 flush_buffer (GBufferedOutputStream
*stream
,
413 GCancellable
*cancellable
,
416 GBufferedOutputStreamPrivate
*priv
;
417 GOutputStream
*base_stream
;
424 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
426 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), FALSE
);
428 res
= g_output_stream_write_all (base_stream
,
/* bytes still waiting in the buffer after the write attempt */
435 count
= priv
->pos
- bytes_written
;
438 g_memmove (priv
->buffer
, priv
->buffer
+ bytes_written
, count
);
440 priv
->pos
-= bytes_written
;
/* GOutputStream write_fn implementation: copies caller data into the
 * internal buffer.
 *
 * `n` is the free space left in the buffer (priv->len - priv->pos).  With
 * auto-grow enabled and too little room, the buffer is resized to the larger
 * of twice its length or its length plus @count.  The listing also shows a
 * flush_buffer() call followed by a recomputation of `n`; finally @count is
 * clamped to `n` and that many bytes are copied in at priv->pos.
 *
 * NOTE(review): the buffer/count/error parameters, the conditions around the
 * flush_buffer() call, the error handling, the priv->pos update and the
 * return statement are absent from this listing - confirm against the
 * complete file.
 */
446 g_buffered_output_stream_write (GOutputStream
*stream
,
449 GCancellable
*cancellable
,
452 GBufferedOutputStream
*bstream
;
453 GBufferedOutputStreamPrivate
*priv
;
458 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
459 priv
= bstream
->priv
;
/* free space remaining in the buffer */
461 n
= priv
->len
- priv
->pos
;
463 if (priv
->auto_grow
&& n
< count
)
465 new_size
= MAX (priv
->len
* 2, priv
->len
+ count
);
466 g_buffered_output_stream_set_buffer_size (bstream
, new_size
);
470 res
= flush_buffer (bstream
, cancellable
, error
);
476 n
= priv
->len
- priv
->pos
;
/* accept no more than fits into the buffer */
478 count
= MIN (count
, n
);
479 memcpy (priv
->buffer
+ priv
->pos
, buffer
, count
);
486 g_buffered_output_stream_flush (GOutputStream
*stream
,
487 GCancellable
*cancellable
,
490 GBufferedOutputStream
*bstream
;
491 GOutputStream
*base_stream
;
494 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
495 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
497 res
= flush_buffer (bstream
, cancellable
, error
);
502 res
= g_output_stream_flush (base_stream
, cancellable
, error
);
508 g_buffered_output_stream_close (GOutputStream
*stream
,
509 GCancellable
*cancellable
,
512 GBufferedOutputStream
*bstream
;
513 GOutputStream
*base_stream
;
516 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
517 base_stream
= G_FILTER_OUTPUT_STREAM (bstream
)->base_stream
;
518 res
= flush_buffer (bstream
, cancellable
, error
);
520 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
522 /* report the first error but still close the stream */
524 res
= g_output_stream_close (base_stream
, cancellable
, error
);
526 g_output_stream_close (base_stream
, cancellable
, NULL
);
/* GSeekable tell implementation.
 *
 * For a seekable base stream the reported position is the base stream's own
 * offset plus priv->pos, i.e. it includes the bytes that are still sitting
 * in the internal buffer.
 *
 * NOTE(review): the declaration of base_offset and the statement executed
 * when the base stream is not seekable are absent from this listing.
 */
533 g_buffered_output_stream_tell (GSeekable
*seekable
)
535 GBufferedOutputStream
*bstream
;
536 GBufferedOutputStreamPrivate
*priv
;
537 GOutputStream
*base_stream
;
538 GSeekable
*base_stream_seekable
;
541 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
542 priv
= bstream
->priv
;
544 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
545 if (!G_IS_SEEKABLE (base_stream
))
548 base_stream_seekable
= G_SEEKABLE (base_stream
);
550 base_offset
= g_seekable_tell (base_stream_seekable
);
/* add the bytes buffered here but not yet written to the base stream */
551 return base_offset
+ priv
->pos
;
555 g_buffered_output_stream_can_seek (GSeekable
*seekable
)
557 GOutputStream
*base_stream
;
559 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
560 return G_IS_SEEKABLE (base_stream
) && g_seekable_can_seek (G_SEEKABLE (base_stream
));
564 g_buffered_output_stream_seek (GSeekable
*seekable
,
567 GCancellable
*cancellable
,
570 GBufferedOutputStream
*bstream
;
571 GOutputStream
*base_stream
;
572 GSeekable
*base_stream_seekable
;
575 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
577 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
578 if (!G_IS_SEEKABLE (base_stream
))
580 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_NOT_SUPPORTED
,
581 _("Seek not supported on base stream"));
585 base_stream_seekable
= G_SEEKABLE (base_stream
);
586 flushed
= flush_buffer (bstream
, cancellable
, error
);
590 return g_seekable_seek (base_stream_seekable
, offset
, type
, cancellable
, error
);
594 g_buffered_output_stream_can_truncate (GSeekable
*seekable
)
596 GOutputStream
*base_stream
;
598 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
599 return G_IS_SEEKABLE (base_stream
) && g_seekable_can_truncate (G_SEEKABLE (base_stream
));
603 g_buffered_output_stream_truncate (GSeekable
*seekable
,
605 GCancellable
*cancellable
,
608 GBufferedOutputStream
*bstream
;
609 GOutputStream
*base_stream
;
610 GSeekable
*base_stream_seekable
;
613 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
614 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
615 if (!G_IS_SEEKABLE (base_stream
))
617 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_NOT_SUPPORTED
,
618 _("Truncate not supported on base stream"));
622 base_stream_seekable
= G_SEEKABLE (base_stream
);
624 flushed
= flush_buffer (bstream
, cancellable
, error
);
627 return g_seekable_truncate (base_stream_seekable
, offset
, cancellable
, error
);
630 /* ************************** */
631 /* Async stuff implementation */
632 /* ************************** */
634 /* TODO: This should be using the base class async ops, not threads */
638 guint flush_stream
: 1;
639 guint close_stream
: 1;
644 free_flush_data (gpointer data
)
646 g_slice_free (FlushData
, data
);
649 /* This function is used by all three (i.e.
650 * _write, _flush, _close) functions since
651 * all of them will need to flush the buffer
652 * and so closing and writing is just a special
653 * case of flushing + some addition stuff */
655 flush_buffer_thread (GTask
*task
,
658 GCancellable
*cancellable
)
660 GBufferedOutputStream
*stream
;
661 GOutputStream
*base_stream
;
664 GError
*error
= NULL
;
666 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
668 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
670 res
= flush_buffer (stream
, cancellable
, &error
);
672 /* if flushing the buffer didn't work don't even bother
673 * to flush the stream but just report that error */
674 if (res
&& fdata
->flush_stream
)
675 res
= g_output_stream_flush (base_stream
, cancellable
, &error
);
677 if (fdata
->close_stream
)
680 /* if flushing the buffer or the stream returned
681 * an error report that first error but still try
682 * close the stream */
683 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
686 g_output_stream_close (base_stream
, cancellable
, NULL
);
688 res
= g_output_stream_close (base_stream
, cancellable
, &error
);
693 g_task_return_error (task
, error
);
695 g_task_return_boolean (task
, TRUE
);
699 g_buffered_output_stream_flush_async (GOutputStream
*stream
,
701 GCancellable
*cancellable
,
702 GAsyncReadyCallback callback
,
708 fdata
= g_slice_new (FlushData
);
709 fdata
->flush_stream
= TRUE
;
710 fdata
->close_stream
= FALSE
;
712 task
= g_task_new (stream
, cancellable
, callback
, data
);
713 g_task_set_task_data (task
, fdata
, free_flush_data
);
714 g_task_set_priority (task
, io_priority
);
716 g_task_run_in_thread (task
, flush_buffer_thread
);
717 g_object_unref (task
);
721 g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
722 GAsyncResult
*result
,
725 g_return_val_if_fail (g_task_is_valid (result
, stream
), FALSE
);
727 return g_task_propagate_boolean (G_TASK (result
), error
);
731 g_buffered_output_stream_close_async (GOutputStream
*stream
,
733 GCancellable
*cancellable
,
734 GAsyncReadyCallback callback
,
740 fdata
= g_slice_new (FlushData
);
741 fdata
->close_stream
= TRUE
;
743 task
= g_task_new (stream
, cancellable
, callback
, data
);
744 g_task_set_task_data (task
, fdata
, free_flush_data
);
745 g_task_set_priority (task
, io_priority
);
747 g_task_run_in_thread (task
, flush_buffer_thread
);
748 g_object_unref (task
);
752 g_buffered_output_stream_close_finish (GOutputStream
*stream
,
753 GAsyncResult
*result
,
756 g_return_val_if_fail (g_task_is_valid (result
, stream
), FALSE
);
758 return g_task_propagate_boolean (G_TASK (result
), error
);