1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 * Author: Christian Kellner <gicmo@gnome.org>
22 #include "gbufferedoutputstream.h"
23 #include "goutputstream.h"
24 #include "gseekable.h"
31 * SECTION:gbufferedoutputstream
32 * @short_description: Buffered Output Stream
34 * @see_also: #GFilterOutputStream, #GOutputStream
36 * Buffered output stream implements #GFilterOutputStream and provides
37 * for buffered writes.
39 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
41 * To create a buffered output stream, use g_buffered_output_stream_new(),
42 * or g_buffered_output_stream_new_sized() to specify the buffer's size at construction.
45 * To get the size of a buffer within a buffered output stream, use
46 * g_buffered_output_stream_get_buffer_size(). To change the size of a
47 * buffered output stream's buffer, use
48 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
49 * size cannot be reduced below the size of the data within the buffer.
52 #define DEFAULT_BUFFER_SIZE 4096
54 struct _GBufferedOutputStreamPrivate
{
67 static void g_buffered_output_stream_set_property (GObject
*object
,
72 static void g_buffered_output_stream_get_property (GObject
*object
,
76 static void g_buffered_output_stream_finalize (GObject
*object
);
79 static gssize
g_buffered_output_stream_write (GOutputStream
*stream
,
82 GCancellable
*cancellable
,
84 static gboolean
g_buffered_output_stream_flush (GOutputStream
*stream
,
85 GCancellable
*cancellable
,
87 static gboolean
g_buffered_output_stream_close (GOutputStream
*stream
,
88 GCancellable
*cancellable
,
91 static void g_buffered_output_stream_flush_async (GOutputStream
*stream
,
93 GCancellable
*cancellable
,
94 GAsyncReadyCallback callback
,
96 static gboolean
g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
99 static void g_buffered_output_stream_close_async (GOutputStream
*stream
,
101 GCancellable
*cancellable
,
102 GAsyncReadyCallback callback
,
104 static gboolean
g_buffered_output_stream_close_finish (GOutputStream
*stream
,
105 GAsyncResult
*result
,
108 static void g_buffered_output_stream_seekable_iface_init (GSeekableIface
*iface
);
109 static goffset
g_buffered_output_stream_tell (GSeekable
*seekable
);
110 static gboolean
g_buffered_output_stream_can_seek (GSeekable
*seekable
);
111 static gboolean
g_buffered_output_stream_seek (GSeekable
*seekable
,
114 GCancellable
*cancellable
,
116 static gboolean
g_buffered_output_stream_can_truncate (GSeekable
*seekable
);
117 static gboolean
g_buffered_output_stream_truncate (GSeekable
*seekable
,
119 GCancellable
*cancellable
,
122 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream
,
123 g_buffered_output_stream
,
124 G_TYPE_FILTER_OUTPUT_STREAM
,
125 G_ADD_PRIVATE (GBufferedOutputStream
)
126 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE
,
127 g_buffered_output_stream_seekable_iface_init
))
131 g_buffered_output_stream_class_init (GBufferedOutputStreamClass
*klass
)
133 GObjectClass
*object_class
;
134 GOutputStreamClass
*ostream_class
;
136 object_class
= G_OBJECT_CLASS (klass
);
137 object_class
->get_property
= g_buffered_output_stream_get_property
;
138 object_class
->set_property
= g_buffered_output_stream_set_property
;
139 object_class
->finalize
= g_buffered_output_stream_finalize
;
141 ostream_class
= G_OUTPUT_STREAM_CLASS (klass
);
142 ostream_class
->write_fn
= g_buffered_output_stream_write
;
143 ostream_class
->flush
= g_buffered_output_stream_flush
;
144 ostream_class
->close_fn
= g_buffered_output_stream_close
;
145 ostream_class
->flush_async
= g_buffered_output_stream_flush_async
;
146 ostream_class
->flush_finish
= g_buffered_output_stream_flush_finish
;
147 ostream_class
->close_async
= g_buffered_output_stream_close_async
;
148 ostream_class
->close_finish
= g_buffered_output_stream_close_finish
;
150 g_object_class_install_property (object_class
,
152 g_param_spec_uint ("buffer-size",
154 P_("The size of the backend buffer"),
158 G_PARAM_READWRITE
|G_PARAM_CONSTRUCT
|
159 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
161 g_object_class_install_property (object_class
,
163 g_param_spec_boolean ("auto-grow",
165 P_("Whether the buffer should automatically grow"),
168 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
173 * g_buffered_output_stream_get_buffer_size:
174 * @stream: a #GBufferedOutputStream.
176 * Gets the size of the buffer in the @stream.
178 * Returns: the current size of the buffer.
181 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream
*stream
)
183 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), -1);
185 return stream
->priv
->len
;
189 * g_buffered_output_stream_set_buffer_size:
190 * @stream: a #GBufferedOutputStream.
193 * Sets the size of the internal buffer to @size.
196 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream
*stream
,
199 GBufferedOutputStreamPrivate
*priv
;
202 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
206 if (size
== priv
->len
)
211 size
= MAX (size
, priv
->pos
);
213 buffer
= g_malloc (size
);
214 memcpy (buffer
, priv
->buffer
, priv
->pos
);
215 g_free (priv
->buffer
);
216 priv
->buffer
= buffer
;
222 priv
->buffer
= g_malloc (size
);
227 g_object_notify (G_OBJECT (stream
), "buffer-size");
231 * g_buffered_output_stream_get_auto_grow:
232 * @stream: a #GBufferedOutputStream.
234 * Checks if the buffer automatically grows as data is added.
236 * Returns: %TRUE if the @stream's buffer automatically grows,
240 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream
*stream
)
242 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
), FALSE
);
244 return stream
->priv
->auto_grow
;
248 * g_buffered_output_stream_set_auto_grow:
249 * @stream: a #GBufferedOutputStream.
250 * @auto_grow: a #gboolean.
252 * Sets whether or not the @stream's buffer should automatically grow.
253 * If @auto_grow is true, then each write will just make the buffer
254 * larger, and you must manually flush the buffer to actually write out
255 * the data to the underlying stream.
258 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream
*stream
,
261 GBufferedOutputStreamPrivate
*priv
;
262 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream
));
264 auto_grow
= auto_grow
!= FALSE
;
265 if (priv
->auto_grow
!= auto_grow
)
267 priv
->auto_grow
= auto_grow
;
268 g_object_notify (G_OBJECT (stream
), "auto-grow");
273 g_buffered_output_stream_set_property (GObject
*object
,
278 GBufferedOutputStream
*stream
;
280 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
285 g_buffered_output_stream_set_buffer_size (stream
, g_value_get_uint (value
));
289 g_buffered_output_stream_set_auto_grow (stream
, g_value_get_boolean (value
));
293 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
300 g_buffered_output_stream_get_property (GObject
*object
,
305 GBufferedOutputStream
*buffered_stream
;
306 GBufferedOutputStreamPrivate
*priv
;
308 buffered_stream
= G_BUFFERED_OUTPUT_STREAM (object
);
309 priv
= buffered_stream
->priv
;
314 g_value_set_uint (value
, priv
->len
);
318 g_value_set_boolean (value
, priv
->auto_grow
);
322 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
329 g_buffered_output_stream_finalize (GObject
*object
)
331 GBufferedOutputStream
*stream
;
332 GBufferedOutputStreamPrivate
*priv
;
334 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
337 g_free (priv
->buffer
);
339 G_OBJECT_CLASS (g_buffered_output_stream_parent_class
)->finalize (object
);
343 g_buffered_output_stream_init (GBufferedOutputStream
*stream
)
345 stream
->priv
= g_buffered_output_stream_get_instance_private (stream
);
349 g_buffered_output_stream_seekable_iface_init (GSeekableIface
*iface
)
351 iface
->tell
= g_buffered_output_stream_tell
;
352 iface
->can_seek
= g_buffered_output_stream_can_seek
;
353 iface
->seek
= g_buffered_output_stream_seek
;
354 iface
->can_truncate
= g_buffered_output_stream_can_truncate
;
355 iface
->truncate_fn
= g_buffered_output_stream_truncate
;
359 * g_buffered_output_stream_new:
360 * @base_stream: a #GOutputStream.
362 * Creates a new buffered output stream for a base stream.
364 * Returns: a #GOutputStream for the given @base_stream.
367 g_buffered_output_stream_new (GOutputStream
*base_stream
)
369 GOutputStream
*stream
;
371 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
373 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
374 "base-stream", base_stream
,
381 * g_buffered_output_stream_new_sized:
382 * @base_stream: a #GOutputStream.
385 * Creates a new buffered output stream with a given buffer size.
387 * Returns: a #GOutputStream with an internal buffer set to @size.
390 g_buffered_output_stream_new_sized (GOutputStream
*base_stream
,
393 GOutputStream
*stream
;
395 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), NULL
);
397 stream
= g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM
,
398 "base-stream", base_stream
,
406 flush_buffer (GBufferedOutputStream
*stream
,
407 GCancellable
*cancellable
,
410 GBufferedOutputStreamPrivate
*priv
;
411 GOutputStream
*base_stream
;
418 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
420 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream
), FALSE
);
422 res
= g_output_stream_write_all (base_stream
,
429 count
= priv
->pos
- bytes_written
;
432 memmove (priv
->buffer
, priv
->buffer
+ bytes_written
, count
);
434 priv
->pos
-= bytes_written
;
440 g_buffered_output_stream_write (GOutputStream
*stream
,
443 GCancellable
*cancellable
,
446 GBufferedOutputStream
*bstream
;
447 GBufferedOutputStreamPrivate
*priv
;
452 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
453 priv
= bstream
->priv
;
455 n
= priv
->len
- priv
->pos
;
457 if (priv
->auto_grow
&& n
< count
)
459 new_size
= MAX (priv
->len
* 2, priv
->len
+ count
);
460 g_buffered_output_stream_set_buffer_size (bstream
, new_size
);
464 res
= flush_buffer (bstream
, cancellable
, error
);
470 n
= priv
->len
- priv
->pos
;
472 count
= MIN (count
, n
);
473 memcpy (priv
->buffer
+ priv
->pos
, buffer
, count
);
480 g_buffered_output_stream_flush (GOutputStream
*stream
,
481 GCancellable
*cancellable
,
484 GBufferedOutputStream
*bstream
;
485 GOutputStream
*base_stream
;
488 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
489 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
491 res
= flush_buffer (bstream
, cancellable
, error
);
496 res
= g_output_stream_flush (base_stream
, cancellable
, error
);
502 g_buffered_output_stream_close (GOutputStream
*stream
,
503 GCancellable
*cancellable
,
506 GBufferedOutputStream
*bstream
;
507 GOutputStream
*base_stream
;
510 bstream
= G_BUFFERED_OUTPUT_STREAM (stream
);
511 base_stream
= G_FILTER_OUTPUT_STREAM (bstream
)->base_stream
;
512 res
= flush_buffer (bstream
, cancellable
, error
);
514 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
516 /* report the first error but still close the stream */
518 res
= g_output_stream_close (base_stream
, cancellable
, error
);
520 g_output_stream_close (base_stream
, cancellable
, NULL
);
527 g_buffered_output_stream_tell (GSeekable
*seekable
)
529 GBufferedOutputStream
*bstream
;
530 GBufferedOutputStreamPrivate
*priv
;
531 GOutputStream
*base_stream
;
532 GSeekable
*base_stream_seekable
;
535 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
536 priv
= bstream
->priv
;
538 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
539 if (!G_IS_SEEKABLE (base_stream
))
542 base_stream_seekable
= G_SEEKABLE (base_stream
);
544 base_offset
= g_seekable_tell (base_stream_seekable
);
545 return base_offset
+ priv
->pos
;
549 g_buffered_output_stream_can_seek (GSeekable
*seekable
)
551 GOutputStream
*base_stream
;
553 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
554 return G_IS_SEEKABLE (base_stream
) && g_seekable_can_seek (G_SEEKABLE (base_stream
));
558 g_buffered_output_stream_seek (GSeekable
*seekable
,
561 GCancellable
*cancellable
,
564 GBufferedOutputStream
*bstream
;
565 GOutputStream
*base_stream
;
566 GSeekable
*base_stream_seekable
;
569 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
571 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
572 if (!G_IS_SEEKABLE (base_stream
))
574 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_NOT_SUPPORTED
,
575 _("Seek not supported on base stream"));
579 base_stream_seekable
= G_SEEKABLE (base_stream
);
580 flushed
= flush_buffer (bstream
, cancellable
, error
);
584 return g_seekable_seek (base_stream_seekable
, offset
, type
, cancellable
, error
);
588 g_buffered_output_stream_can_truncate (GSeekable
*seekable
)
590 GOutputStream
*base_stream
;
592 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
593 return G_IS_SEEKABLE (base_stream
) && g_seekable_can_truncate (G_SEEKABLE (base_stream
));
597 g_buffered_output_stream_truncate (GSeekable
*seekable
,
599 GCancellable
*cancellable
,
602 GBufferedOutputStream
*bstream
;
603 GOutputStream
*base_stream
;
604 GSeekable
*base_stream_seekable
;
607 bstream
= G_BUFFERED_OUTPUT_STREAM (seekable
);
608 base_stream
= G_FILTER_OUTPUT_STREAM (seekable
)->base_stream
;
609 if (!G_IS_SEEKABLE (base_stream
))
611 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_NOT_SUPPORTED
,
612 _("Truncate not supported on base stream"));
616 base_stream_seekable
= G_SEEKABLE (base_stream
);
618 flushed
= flush_buffer (bstream
, cancellable
, error
);
621 return g_seekable_truncate (base_stream_seekable
, offset
, cancellable
, error
);
624 /* ************************** */
625 /* Async stuff implementation */
626 /* ************************** */
628 /* TODO: This should be using the base class async ops, not threads */
632 guint flush_stream
: 1;
633 guint close_stream
: 1;
638 free_flush_data (gpointer data
)
640 g_slice_free (FlushData
, data
);
643 /* This function is used by both async entry points
644 * (_flush_async and _close_async), since both of
645 * them need to flush the buffer; closing is just
646 * a special case of flushing plus some additional
647 * stuff */
649 flush_buffer_thread (GTask
*task
,
652 GCancellable
*cancellable
)
654 GBufferedOutputStream
*stream
;
655 GOutputStream
*base_stream
;
658 GError
*error
= NULL
;
660 stream
= G_BUFFERED_OUTPUT_STREAM (object
);
662 base_stream
= G_FILTER_OUTPUT_STREAM (stream
)->base_stream
;
664 res
= flush_buffer (stream
, cancellable
, &error
);
666 /* if flushing the buffer didn't work don't even bother
667 * to flush the stream but just report that error */
668 if (res
&& fdata
->flush_stream
)
669 res
= g_output_stream_flush (base_stream
, cancellable
, &error
);
671 if (fdata
->close_stream
)
674 /* if flushing the buffer or the stream returned
675 * an error report that first error but still try
676 * close the stream */
677 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream
)))
680 g_output_stream_close (base_stream
, cancellable
, NULL
);
682 res
= g_output_stream_close (base_stream
, cancellable
, &error
);
687 g_task_return_error (task
, error
);
689 g_task_return_boolean (task
, TRUE
);
693 g_buffered_output_stream_flush_async (GOutputStream
*stream
,
695 GCancellable
*cancellable
,
696 GAsyncReadyCallback callback
,
702 fdata
= g_slice_new0 (FlushData
);
703 fdata
->flush_stream
= TRUE
;
704 fdata
->close_stream
= FALSE
;
706 task
= g_task_new (stream
, cancellable
, callback
, data
);
707 g_task_set_source_tag (task
, g_buffered_output_stream_flush_async
);
708 g_task_set_task_data (task
, fdata
, free_flush_data
);
709 g_task_set_priority (task
, io_priority
);
711 g_task_run_in_thread (task
, flush_buffer_thread
);
712 g_object_unref (task
);
716 g_buffered_output_stream_flush_finish (GOutputStream
*stream
,
717 GAsyncResult
*result
,
720 g_return_val_if_fail (g_task_is_valid (result
, stream
), FALSE
);
722 return g_task_propagate_boolean (G_TASK (result
), error
);
726 g_buffered_output_stream_close_async (GOutputStream
*stream
,
728 GCancellable
*cancellable
,
729 GAsyncReadyCallback callback
,
735 fdata
= g_slice_new0 (FlushData
);
736 fdata
->close_stream
= TRUE
;
738 task
= g_task_new (stream
, cancellable
, callback
, data
);
739 g_task_set_source_tag (task
, g_buffered_output_stream_close_async
);
740 g_task_set_task_data (task
, fdata
, free_flush_data
);
741 g_task_set_priority (task
, io_priority
);
743 g_task_run_in_thread (task
, flush_buffer_thread
);
744 g_object_unref (task
);
748 g_buffered_output_stream_close_finish (GOutputStream
*stream
,
749 GAsyncResult
*result
,
752 g_return_val_if_fail (g_task_is_valid (result
, stream
), FALSE
);
754 return g_task_propagate_boolean (G_TASK (result
), error
);