1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 * Author: Christian Kellner <gicmo@gnome.org>
23 #include "gbufferedinputstream.h"
24 #include "ginputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
28 #include "gseekable.h"
35 * SECTION:gbufferedinputstream
36 * @short_description: Buffered Input Stream
38 * @see_also: #GFilterInputStream, #GInputStream
40 * Buffered input stream implements #GFilterInputStream and provides
43 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45 * To create a buffered input stream, use g_buffered_input_stream_new(),
46 * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49 * To get the size of a buffer within a buffered input stream, use
50 * g_buffered_input_stream_get_buffer_size(). To change the size of a
51 * buffered input stream's buffer, use
52 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
53 * cannot be reduced below the size of the data within the buffer.
57 #define DEFAULT_BUFFER_SIZE 4096
59 struct _GBufferedInputStreamPrivate
{
64 GAsyncReadyCallback outstanding_callback
;
72 static void g_buffered_input_stream_set_property (GObject
*object
,
77 static void g_buffered_input_stream_get_property (GObject
*object
,
81 static void g_buffered_input_stream_finalize (GObject
*object
);
84 static gssize
g_buffered_input_stream_skip (GInputStream
*stream
,
86 GCancellable
*cancellable
,
88 static void g_buffered_input_stream_skip_async (GInputStream
*stream
,
91 GCancellable
*cancellable
,
92 GAsyncReadyCallback callback
,
94 static gssize
g_buffered_input_stream_skip_finish (GInputStream
*stream
,
97 static gssize
g_buffered_input_stream_read (GInputStream
*stream
,
100 GCancellable
*cancellable
,
102 static gssize
g_buffered_input_stream_real_fill (GBufferedInputStream
*stream
,
104 GCancellable
*cancellable
,
106 static void g_buffered_input_stream_real_fill_async (GBufferedInputStream
*stream
,
109 GCancellable
*cancellable
,
110 GAsyncReadyCallback callback
,
112 static gssize
g_buffered_input_stream_real_fill_finish (GBufferedInputStream
*stream
,
113 GAsyncResult
*result
,
116 static void g_buffered_input_stream_seekable_iface_init (GSeekableIface
*iface
);
117 static goffset
g_buffered_input_stream_tell (GSeekable
*seekable
);
118 static gboolean
g_buffered_input_stream_can_seek (GSeekable
*seekable
);
119 static gboolean
g_buffered_input_stream_seek (GSeekable
*seekable
,
122 GCancellable
*cancellable
,
124 static gboolean
g_buffered_input_stream_can_truncate (GSeekable
*seekable
);
125 static gboolean
g_buffered_input_stream_truncate (GSeekable
*seekable
,
127 GCancellable
*cancellable
,
130 static void compact_buffer (GBufferedInputStream
*stream
);
132 G_DEFINE_TYPE_WITH_CODE (GBufferedInputStream
,
133 g_buffered_input_stream
,
134 G_TYPE_FILTER_INPUT_STREAM
,
135 G_ADD_PRIVATE (GBufferedInputStream
)
136 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE
,
137 g_buffered_input_stream_seekable_iface_init
))
140 g_buffered_input_stream_class_init (GBufferedInputStreamClass
*klass
)
142 GObjectClass
*object_class
;
143 GInputStreamClass
*istream_class
;
144 GBufferedInputStreamClass
*bstream_class
;
146 object_class
= G_OBJECT_CLASS (klass
);
147 object_class
->get_property
= g_buffered_input_stream_get_property
;
148 object_class
->set_property
= g_buffered_input_stream_set_property
;
149 object_class
->finalize
= g_buffered_input_stream_finalize
;
151 istream_class
= G_INPUT_STREAM_CLASS (klass
);
152 istream_class
->skip
= g_buffered_input_stream_skip
;
153 istream_class
->skip_async
= g_buffered_input_stream_skip_async
;
154 istream_class
->skip_finish
= g_buffered_input_stream_skip_finish
;
155 istream_class
->read_fn
= g_buffered_input_stream_read
;
157 bstream_class
= G_BUFFERED_INPUT_STREAM_CLASS (klass
);
158 bstream_class
->fill
= g_buffered_input_stream_real_fill
;
159 bstream_class
->fill_async
= g_buffered_input_stream_real_fill_async
;
160 bstream_class
->fill_finish
= g_buffered_input_stream_real_fill_finish
;
162 g_object_class_install_property (object_class
,
164 g_param_spec_uint ("buffer-size",
166 P_("The size of the backend buffer"),
170 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT
|
171 G_PARAM_STATIC_NAME
|G_PARAM_STATIC_NICK
|G_PARAM_STATIC_BLURB
));
177 * g_buffered_input_stream_get_buffer_size:
178 * @stream: a #GBufferedInputStream
180 * Gets the size of the input buffer.
182 * Returns: the current buffer size.
185 g_buffered_input_stream_get_buffer_size (GBufferedInputStream
*stream
)
187 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), 0);
189 return stream
->priv
->len
;
193 * g_buffered_input_stream_set_buffer_size:
194 * @stream: a #GBufferedInputStream
197 * Sets the size of the internal buffer of @stream to @size, or to the
198 * size of the contents of the buffer. The buffer can never be resized
199 * smaller than its current contents.
202 g_buffered_input_stream_set_buffer_size (GBufferedInputStream
*stream
,
205 GBufferedInputStreamPrivate
*priv
;
209 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
));
213 if (priv
->len
== size
)
218 in_buffer
= priv
->end
- priv
->pos
;
220 /* Never resize smaller than current buffer contents */
221 size
= MAX (size
, in_buffer
);
223 buffer
= g_malloc (size
);
224 memcpy (buffer
, priv
->buffer
+ priv
->pos
, in_buffer
);
227 priv
->end
= in_buffer
;
228 g_free (priv
->buffer
);
229 priv
->buffer
= buffer
;
236 priv
->buffer
= g_malloc (size
);
239 g_object_notify (G_OBJECT (stream
), "buffer-size");
243 g_buffered_input_stream_set_property (GObject
*object
,
248 GBufferedInputStream
*bstream
;
250 bstream
= G_BUFFERED_INPUT_STREAM (object
);
255 g_buffered_input_stream_set_buffer_size (bstream
, g_value_get_uint (value
));
259 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
265 g_buffered_input_stream_get_property (GObject
*object
,
270 GBufferedInputStreamPrivate
*priv
;
271 GBufferedInputStream
*bstream
;
273 bstream
= G_BUFFERED_INPUT_STREAM (object
);
274 priv
= bstream
->priv
;
279 g_value_set_uint (value
, priv
->len
);
283 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
289 g_buffered_input_stream_finalize (GObject
*object
)
291 GBufferedInputStreamPrivate
*priv
;
292 GBufferedInputStream
*stream
;
294 stream
= G_BUFFERED_INPUT_STREAM (object
);
297 g_free (priv
->buffer
);
299 G_OBJECT_CLASS (g_buffered_input_stream_parent_class
)->finalize (object
);
303 g_buffered_input_stream_seekable_iface_init (GSeekableIface
*iface
)
305 iface
->tell
= g_buffered_input_stream_tell
;
306 iface
->can_seek
= g_buffered_input_stream_can_seek
;
307 iface
->seek
= g_buffered_input_stream_seek
;
308 iface
->can_truncate
= g_buffered_input_stream_can_truncate
;
309 iface
->truncate_fn
= g_buffered_input_stream_truncate
;
313 g_buffered_input_stream_init (GBufferedInputStream
*stream
)
315 stream
->priv
= g_buffered_input_stream_get_instance_private (stream
);
320 * g_buffered_input_stream_new:
321 * @base_stream: a #GInputStream
323 * Creates a new #GInputStream from the given @base_stream, with
324 * a buffer set to the default size (4 kilobytes).
326 * Returns: a #GInputStream for the given @base_stream.
329 g_buffered_input_stream_new (GInputStream
*base_stream
)
331 GInputStream
*stream
;
333 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream
), NULL
);
335 stream
= g_object_new (G_TYPE_BUFFERED_INPUT_STREAM
,
336 "base-stream", base_stream
,
343 * g_buffered_input_stream_new_sized:
344 * @base_stream: a #GInputStream
347 * Creates a new #GBufferedInputStream from the given @base_stream,
348 * with a buffer set to @size.
350 * Returns: a #GInputStream.
353 g_buffered_input_stream_new_sized (GInputStream
*base_stream
,
356 GInputStream
*stream
;
358 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream
), NULL
);
360 stream
= g_object_new (G_TYPE_BUFFERED_INPUT_STREAM
,
361 "base-stream", base_stream
,
362 "buffer-size", (guint
)size
,
369 * g_buffered_input_stream_fill:
370 * @stream: a #GBufferedInputStream
371 * @count: the number of bytes that will be read from the stream
372 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
373 * @error: location to store the error occurring, or %NULL to ignore
375 * Tries to read @count bytes from the stream into the buffer.
376 * Will block during this read.
378 * If @count is zero, returns zero and does nothing. A value of @count
379 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
381 * On success, the number of bytes read into the buffer is returned.
382 * It is not an error if this is not the same as the requested size, as it
383 * can happen e.g. near the end of a file. Zero is returned on end of file
384 * (or if @count is zero), but never otherwise.
386 * If @count is -1 then the attempted read size is equal to the number of
387 * bytes that are required to fill the buffer.
389 * If @cancellable is not %NULL, then the operation can be cancelled by
390 * triggering the cancellable object from another thread. If the operation
391 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
392 * operation was partially finished when the operation was cancelled the
393 * partial result will be returned, without an error.
395 * On error -1 is returned and @error is set accordingly.
397 * For the asynchronous, non-blocking, version of this function, see
398 * g_buffered_input_stream_fill_async().
400 * Returns: the number of bytes read into @stream's buffer, up to @count,
404 g_buffered_input_stream_fill (GBufferedInputStream
*stream
,
406 GCancellable
*cancellable
,
409 GBufferedInputStreamClass
*class;
410 GInputStream
*input_stream
;
413 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), -1);
415 input_stream
= G_INPUT_STREAM (stream
);
419 g_set_error (error
, G_IO_ERROR
, G_IO_ERROR_INVALID_ARGUMENT
,
420 _("Too large count value passed to %s"), G_STRFUNC
);
424 if (!g_input_stream_set_pending (input_stream
, error
))
428 g_cancellable_push_current (cancellable
);
430 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
431 res
= class->fill (stream
, count
, cancellable
, error
);
434 g_cancellable_pop_current (cancellable
);
436 g_input_stream_clear_pending (input_stream
);
442 async_fill_callback_wrapper (GObject
*source_object
,
446 GBufferedInputStream
*stream
= G_BUFFERED_INPUT_STREAM (source_object
);
448 g_input_stream_clear_pending (G_INPUT_STREAM (stream
));
449 (*stream
->priv
->outstanding_callback
) (source_object
, res
, user_data
);
450 g_object_unref (stream
);
454 * g_buffered_input_stream_fill_async:
455 * @stream: a #GBufferedInputStream
456 * @count: the number of bytes that will be read from the stream
457 * @io_priority: the [I/O priority][io-priority] of the request
458 * @cancellable: (nullable): optional #GCancellable object
459 * @callback: (scope async): a #GAsyncReadyCallback
460 * @user_data: (closure): a #gpointer
462 * Reads data into @stream's buffer asynchronously, up to @count size.
463 * @io_priority can be used to prioritize reads. For the synchronous
464 * version of this function, see g_buffered_input_stream_fill().
466 * If @count is -1 then the attempted read size is equal to the number
467 * of bytes that are required to fill the buffer.
470 g_buffered_input_stream_fill_async (GBufferedInputStream
*stream
,
473 GCancellable
*cancellable
,
474 GAsyncReadyCallback callback
,
477 GBufferedInputStreamClass
*class;
478 GError
*error
= NULL
;
480 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
));
486 task
= g_task_new (stream
, cancellable
, callback
, user_data
);
487 g_task_set_source_tag (task
, g_buffered_input_stream_fill_async
);
488 g_task_return_int (task
, 0);
489 g_object_unref (task
);
495 g_task_report_new_error (stream
, callback
, user_data
,
496 g_buffered_input_stream_fill_async
,
497 G_IO_ERROR
, G_IO_ERROR_INVALID_ARGUMENT
,
498 _("Too large count value passed to %s"),
503 if (!g_input_stream_set_pending (G_INPUT_STREAM (stream
), &error
))
505 g_task_report_error (stream
, callback
, user_data
,
506 g_buffered_input_stream_fill_async
,
511 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
513 stream
->priv
->outstanding_callback
= callback
;
514 g_object_ref (stream
);
515 class->fill_async (stream
, count
, io_priority
, cancellable
,
516 async_fill_callback_wrapper
, user_data
);
520 * g_buffered_input_stream_fill_finish:
521 * @stream: a #GBufferedInputStream
522 * @result: a #GAsyncResult
525 * Finishes an asynchronous read.
527 * Returns: a #gssize of the read stream, or %-1 on an error.
530 g_buffered_input_stream_fill_finish (GBufferedInputStream
*stream
,
531 GAsyncResult
*result
,
534 GBufferedInputStreamClass
*class;
536 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), -1);
537 g_return_val_if_fail (G_IS_ASYNC_RESULT (result
), -1);
539 if (g_async_result_legacy_propagate_error (result
, error
))
541 else if (g_async_result_is_tagged (result
, g_buffered_input_stream_fill_async
))
542 return g_task_propagate_int (G_TASK (result
), error
);
544 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
545 return class->fill_finish (stream
, result
, error
);
549 * g_buffered_input_stream_get_available:
550 * @stream: #GBufferedInputStream
552 * Gets the size of the available data within the stream.
554 * Returns: size of the available stream.
557 g_buffered_input_stream_get_available (GBufferedInputStream
*stream
)
559 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), -1);
561 return stream
->priv
->end
- stream
->priv
->pos
;
565 * g_buffered_input_stream_peek:
566 * @stream: a #GBufferedInputStream
567 * @buffer: (array length=count) (element-type guint8): a pointer to
568 * an allocated chunk of memory
572 * Peeks in the buffer, copying data of size @count into @buffer,
573 * offset @offset bytes.
575 * Returns: a #gsize of the number of bytes peeked, or -1 on error.
578 g_buffered_input_stream_peek (GBufferedInputStream
*stream
,
586 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), -1);
587 g_return_val_if_fail (buffer
!= NULL
, -1);
589 available
= g_buffered_input_stream_get_available (stream
);
591 if (offset
> available
)
594 end
= MIN (offset
+ count
, available
);
595 count
= end
- offset
;
597 memcpy (buffer
, stream
->priv
->buffer
+ stream
->priv
->pos
+ offset
, count
);
602 * g_buffered_input_stream_peek_buffer:
603 * @stream: a #GBufferedInputStream
604 * @count: (out): a #gsize to get the number of bytes available in the buffer
606 * Returns the buffer with the currently available bytes. The returned
607 * buffer must not be modified and will become invalid when reading from
608 * the stream or filling the buffer.
610 * Returns: (array length=count) (element-type guint8) (transfer none):
614 g_buffered_input_stream_peek_buffer (GBufferedInputStream
*stream
,
617 GBufferedInputStreamPrivate
*priv
;
619 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), NULL
);
624 *count
= priv
->end
- priv
->pos
;
626 return priv
->buffer
+ priv
->pos
;
630 compact_buffer (GBufferedInputStream
*stream
)
632 GBufferedInputStreamPrivate
*priv
;
637 current_size
= priv
->end
- priv
->pos
;
639 memmove (priv
->buffer
, priv
->buffer
+ priv
->pos
, current_size
);
642 priv
->end
= current_size
;
646 g_buffered_input_stream_real_fill (GBufferedInputStream
*stream
,
648 GCancellable
*cancellable
,
651 GBufferedInputStreamPrivate
*priv
;
652 GInputStream
*base_stream
;
661 in_buffer
= priv
->end
- priv
->pos
;
663 /* Never fill more than can fit in the buffer */
664 count
= MIN (count
, priv
->len
- in_buffer
);
666 /* If requested length does not fit at end, compact */
667 if (priv
->len
- priv
->end
< count
)
668 compact_buffer (stream
);
670 base_stream
= G_FILTER_INPUT_STREAM (stream
)->base_stream
;
671 nread
= g_input_stream_read (base_stream
,
672 priv
->buffer
+ priv
->end
,
684 g_buffered_input_stream_skip (GInputStream
*stream
,
686 GCancellable
*cancellable
,
689 GBufferedInputStream
*bstream
;
690 GBufferedInputStreamPrivate
*priv
;
691 GBufferedInputStreamClass
*class;
692 GInputStream
*base_stream
;
693 gsize available
, bytes_skipped
;
696 bstream
= G_BUFFERED_INPUT_STREAM (stream
);
697 priv
= bstream
->priv
;
699 available
= priv
->end
- priv
->pos
;
701 if (count
<= available
)
707 /* Full request not available, skip all currently available and
708 * request refill for more
713 bytes_skipped
= available
;
716 if (bytes_skipped
> 0)
717 error
= NULL
; /* Ignore further errors if we already read some data */
719 if (count
> priv
->len
)
721 /* Large request, shortcut buffer */
723 base_stream
= G_FILTER_INPUT_STREAM (stream
)->base_stream
;
725 nread
= g_input_stream_skip (base_stream
,
730 if (nread
< 0 && bytes_skipped
== 0)
734 bytes_skipped
+= nread
;
736 return bytes_skipped
;
739 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
740 nread
= class->fill (bstream
, priv
->len
, cancellable
, error
);
744 if (bytes_skipped
== 0)
747 return bytes_skipped
;
750 available
= priv
->end
- priv
->pos
;
751 count
= MIN (count
, available
);
753 bytes_skipped
+= count
;
756 return bytes_skipped
;
760 g_buffered_input_stream_read (GInputStream
*stream
,
763 GCancellable
*cancellable
,
766 GBufferedInputStream
*bstream
;
767 GBufferedInputStreamPrivate
*priv
;
768 GBufferedInputStreamClass
*class;
769 GInputStream
*base_stream
;
770 gsize available
, bytes_read
;
773 bstream
= G_BUFFERED_INPUT_STREAM (stream
);
774 priv
= bstream
->priv
;
776 available
= priv
->end
- priv
->pos
;
778 if (count
<= available
)
780 memcpy (buffer
, priv
->buffer
+ priv
->pos
, count
);
785 /* Full request not available, read all currently available and
786 * request refill for more
789 memcpy (buffer
, priv
->buffer
+ priv
->pos
, available
);
792 bytes_read
= available
;
796 error
= NULL
; /* Ignore further errors if we already read some data */
798 if (count
> priv
->len
)
800 /* Large request, shortcut buffer */
802 base_stream
= G_FILTER_INPUT_STREAM (stream
)->base_stream
;
804 nread
= g_input_stream_read (base_stream
,
805 (char *)buffer
+ bytes_read
,
810 if (nread
< 0 && bytes_read
== 0)
819 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
820 nread
= class->fill (bstream
, priv
->len
, cancellable
, error
);
829 available
= priv
->end
- priv
->pos
;
830 count
= MIN (count
, available
);
832 memcpy ((char *)buffer
+ bytes_read
, (char *)priv
->buffer
+ priv
->pos
, count
);
840 g_buffered_input_stream_tell (GSeekable
*seekable
)
842 GBufferedInputStream
*bstream
;
843 GBufferedInputStreamPrivate
*priv
;
844 GInputStream
*base_stream
;
845 GSeekable
*base_stream_seekable
;
849 bstream
= G_BUFFERED_INPUT_STREAM (seekable
);
850 priv
= bstream
->priv
;
852 base_stream
= G_FILTER_INPUT_STREAM (seekable
)->base_stream
;
853 if (!G_IS_SEEKABLE (base_stream
))
855 base_stream_seekable
= G_SEEKABLE (base_stream
);
857 available
= priv
->end
- priv
->pos
;
858 base_offset
= g_seekable_tell (base_stream_seekable
);
860 return base_offset
- available
;
864 g_buffered_input_stream_can_seek (GSeekable
*seekable
)
866 GInputStream
*base_stream
;
868 base_stream
= G_FILTER_INPUT_STREAM (seekable
)->base_stream
;
869 return G_IS_SEEKABLE (base_stream
) && g_seekable_can_seek (G_SEEKABLE (base_stream
));
873 g_buffered_input_stream_seek (GSeekable
*seekable
,
876 GCancellable
*cancellable
,
879 GBufferedInputStream
*bstream
;
880 GBufferedInputStreamPrivate
*priv
;
881 GInputStream
*base_stream
;
882 GSeekable
*base_stream_seekable
;
884 bstream
= G_BUFFERED_INPUT_STREAM (seekable
);
885 priv
= bstream
->priv
;
887 base_stream
= G_FILTER_INPUT_STREAM (seekable
)->base_stream
;
888 if (!G_IS_SEEKABLE (base_stream
))
890 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_NOT_SUPPORTED
,
891 _("Seek not supported on base stream"));
895 base_stream_seekable
= G_SEEKABLE (base_stream
);
897 if (type
== G_SEEK_CUR
)
899 if (offset
<= priv
->end
- priv
->pos
&& offset
>= -priv
->pos
)
906 offset
-= priv
->end
- priv
->pos
;
910 if (g_seekable_seek (base_stream_seekable
, offset
, type
, cancellable
, error
))
923 g_buffered_input_stream_can_truncate (GSeekable
*seekable
)
929 g_buffered_input_stream_truncate (GSeekable
*seekable
,
931 GCancellable
*cancellable
,
934 g_set_error_literal (error
,
936 G_IO_ERROR_NOT_SUPPORTED
,
937 _("Cannot truncate GBufferedInputStream"));
942 * g_buffered_input_stream_read_byte:
943 * @stream: a #GBufferedInputStream
944 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
945 * @error: location to store the error occurring, or %NULL to ignore
947 * Tries to read a single byte from the stream or the buffer. Will block
950 * On success, the byte read from the stream is returned. On end of stream
951 * -1 is returned but it's not an exceptional error and @error is not set.
953 * If @cancellable is not %NULL, then the operation can be cancelled by
954 * triggering the cancellable object from another thread. If the operation
955 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
956 * operation was partially finished when the operation was cancelled the
957 * partial result will be returned, without an error.
959 * On error -1 is returned and @error is set accordingly.
961 * Returns: the byte read from the @stream, or -1 on end of stream or error.
964 g_buffered_input_stream_read_byte (GBufferedInputStream
*stream
,
965 GCancellable
*cancellable
,
968 GBufferedInputStreamPrivate
*priv
;
969 GBufferedInputStreamClass
*class;
970 GInputStream
*input_stream
;
974 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream
), -1);
977 input_stream
= G_INPUT_STREAM (stream
);
979 if (g_input_stream_is_closed (input_stream
))
981 g_set_error_literal (error
, G_IO_ERROR
, G_IO_ERROR_CLOSED
,
982 _("Stream is already closed"));
986 if (!g_input_stream_set_pending (input_stream
, error
))
989 available
= priv
->end
- priv
->pos
;
993 g_input_stream_clear_pending (input_stream
);
994 return priv
->buffer
[priv
->pos
++];
997 /* Byte not available, request refill for more */
1000 g_cancellable_push_current (cancellable
);
1005 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
1006 nread
= class->fill (stream
, priv
->len
, cancellable
, error
);
1009 g_cancellable_pop_current (cancellable
);
1011 g_input_stream_clear_pending (input_stream
);
1014 return -1; /* error or end of stream */
1016 return priv
->buffer
[priv
->pos
++];
1019 /* ************************** */
1020 /* Async stuff implementation */
1021 /* ************************** */
1024 fill_async_callback (GObject
*source_object
,
1025 GAsyncResult
*result
,
1030 GTask
*task
= user_data
;
1033 res
= g_input_stream_read_finish (G_INPUT_STREAM (source_object
),
1036 g_task_return_error (task
, error
);
1039 GBufferedInputStream
*stream
;
1040 GBufferedInputStreamPrivate
*priv
;
1042 stream
= g_task_get_source_object (task
);
1043 priv
= G_BUFFERED_INPUT_STREAM (stream
)->priv
;
1045 g_assert_cmpint (priv
->end
+ res
, <=, priv
->len
);
1048 g_task_return_int (task
, res
);
1051 g_object_unref (task
);
1055 g_buffered_input_stream_real_fill_async (GBufferedInputStream
*stream
,
1058 GCancellable
*cancellable
,
1059 GAsyncReadyCallback callback
,
1062 GBufferedInputStreamPrivate
*priv
;
1063 GInputStream
*base_stream
;
1067 priv
= stream
->priv
;
1072 in_buffer
= priv
->end
- priv
->pos
;
1074 /* Never fill more than can fit in the buffer */
1075 count
= MIN (count
, priv
->len
- in_buffer
);
1077 /* If requested length does not fit at end, compact */
1078 if (priv
->len
- priv
->end
< count
)
1079 compact_buffer (stream
);
1081 task
= g_task_new (stream
, cancellable
, callback
, user_data
);
1082 g_task_set_source_tag (task
, g_buffered_input_stream_real_fill_async
);
1084 base_stream
= G_FILTER_INPUT_STREAM (stream
)->base_stream
;
1085 g_input_stream_read_async (base_stream
,
1086 priv
->buffer
+ priv
->end
,
1090 fill_async_callback
,
1095 g_buffered_input_stream_real_fill_finish (GBufferedInputStream
*stream
,
1096 GAsyncResult
*result
,
1099 g_return_val_if_fail (g_task_is_valid (result
, stream
), -1);
1101 return g_task_propagate_int (G_TASK (result
), error
);
1106 gssize bytes_skipped
;
1111 free_skip_async_data (gpointer _data
)
1113 SkipAsyncData
*data
= _data
;
1114 g_slice_free (SkipAsyncData
, data
);
1118 large_skip_callback (GObject
*source_object
,
1119 GAsyncResult
*result
,
1122 GTask
*task
= G_TASK (user_data
);
1123 SkipAsyncData
*data
;
1127 data
= g_task_get_task_data (task
);
1130 nread
= g_input_stream_skip_finish (G_INPUT_STREAM (source_object
),
1133 /* Only report the error if we've not already read some data */
1134 if (nread
< 0 && data
->bytes_skipped
== 0)
1135 g_task_return_error (task
, error
);
1139 g_error_free (error
);
1142 data
->bytes_skipped
+= nread
;
1144 g_task_return_int (task
, data
->bytes_skipped
);
1147 g_object_unref (task
);
1151 skip_fill_buffer_callback (GObject
*source_object
,
1152 GAsyncResult
*result
,
1155 GTask
*task
= G_TASK (user_data
);
1156 GBufferedInputStream
*bstream
;
1157 GBufferedInputStreamPrivate
*priv
;
1158 SkipAsyncData
*data
;
1163 bstream
= G_BUFFERED_INPUT_STREAM (source_object
);
1164 priv
= bstream
->priv
;
1166 data
= g_task_get_task_data (task
);
1169 nread
= g_buffered_input_stream_fill_finish (bstream
,
1172 if (nread
< 0 && data
->bytes_skipped
== 0)
1173 g_task_return_error (task
, error
);
1177 g_error_free (error
);
1181 available
= priv
->end
- priv
->pos
;
1182 data
->count
= MIN (data
->count
, available
);
1184 data
->bytes_skipped
+= data
->count
;
1185 priv
->pos
+= data
->count
;
1188 g_task_return_int (task
, data
->bytes_skipped
);
1191 g_object_unref (task
);
1195 g_buffered_input_stream_skip_async (GInputStream
*stream
,
1198 GCancellable
*cancellable
,
1199 GAsyncReadyCallback callback
,
1202 GBufferedInputStream
*bstream
;
1203 GBufferedInputStreamPrivate
*priv
;
1204 GBufferedInputStreamClass
*class;
1205 GInputStream
*base_stream
;
1208 SkipAsyncData
*data
;
1210 bstream
= G_BUFFERED_INPUT_STREAM (stream
);
1211 priv
= bstream
->priv
;
1213 data
= g_slice_new (SkipAsyncData
);
1214 data
->bytes_skipped
= 0;
1215 task
= g_task_new (stream
, cancellable
, callback
, user_data
);
1216 g_task_set_source_tag (task
, g_buffered_input_stream_skip_async
);
1217 g_task_set_task_data (task
, data
, free_skip_async_data
);
1219 available
= priv
->end
- priv
->pos
;
1221 if (count
<= available
)
1225 g_task_return_int (task
, count
);
1226 g_object_unref (task
);
1230 /* Full request not available, skip all currently available
1231 * and request refill for more
1239 data
->bytes_skipped
= available
;
1240 data
->count
= count
;
1242 if (count
> priv
->len
)
1244 /* Large request, shortcut buffer */
1246 base_stream
= G_FILTER_INPUT_STREAM (stream
)->base_stream
;
1248 g_input_stream_skip_async (base_stream
,
1250 io_priority
, cancellable
,
1251 large_skip_callback
,
1256 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream
);
1257 class->fill_async (bstream
, priv
->len
, io_priority
, cancellable
,
1258 skip_fill_buffer_callback
, task
);
1263 g_buffered_input_stream_skip_finish (GInputStream
*stream
,
1264 GAsyncResult
*result
,
1267 g_return_val_if_fail (g_task_is_valid (result
, stream
), -1);
1269 return g_task_propagate_int (G_TASK (result
), error
);