2 * stream.c: svn_stream operations
4 * ====================================================================
5 * Copyright (c) 2000-2004, 2006 CollabNet. All rights reserved.
7 * This software is licensed as described in the file COPYING, which
8 * you should have received as part of this distribution. The terms
9 * are also available at http://subversion.tigris.org/license-1.html.
10 * If newer versions of this license are posted there, you may use a
11 * newer version instead, at your option.
13 * This software consists of voluntary contributions made by many
14 * individuals. For exact contribution history, see the revision
15 * history and logs, available at http://subversion.tigris.org/.
16 * ====================================================================
19 #include "svn_private_config.h"
25 #include <apr_pools.h>
26 #include <apr_strings.h>
27 #include <apr_file_io.h>
28 #include <apr_errno.h>
33 #include "svn_pools.h"
35 #include "svn_error.h"
36 #include "svn_string.h"
42 svn_read_fn_t read_fn
;
43 svn_write_fn_t write_fn
;
44 svn_close_fn_t close_fn
;
49 /*** Generic streams. ***/
52 svn_stream_create(void *baton
, apr_pool_t
*pool
)
56 stream
= apr_palloc(pool
, sizeof(*stream
));
57 stream
->baton
= baton
;
58 stream
->read_fn
= NULL
;
59 stream
->write_fn
= NULL
;
60 stream
->close_fn
= NULL
;
66 svn_stream_set_baton(svn_stream_t
*stream
, void *baton
)
68 stream
->baton
= baton
;
73 svn_stream_set_read(svn_stream_t
*stream
, svn_read_fn_t read_fn
)
75 stream
->read_fn
= read_fn
;
80 svn_stream_set_write(svn_stream_t
*stream
, svn_write_fn_t write_fn
)
82 stream
->write_fn
= write_fn
;
87 svn_stream_set_close(svn_stream_t
*stream
, svn_close_fn_t close_fn
)
89 stream
->close_fn
= close_fn
;
94 svn_stream_read(svn_stream_t
*stream
, char *buffer
, apr_size_t
*len
)
96 assert(stream
->read_fn
!= NULL
);
97 return stream
->read_fn(stream
->baton
, buffer
, len
);
102 svn_stream_write(svn_stream_t
*stream
, const char *data
, apr_size_t
*len
)
104 assert(stream
->write_fn
!= NULL
);
105 return stream
->write_fn(stream
->baton
, data
, len
);
110 svn_stream_close(svn_stream_t
*stream
)
112 if (stream
->close_fn
== NULL
)
114 return stream
->close_fn(stream
->baton
);
119 svn_stream_printf(svn_stream_t
*stream
,
129 message
= apr_pvsprintf(pool
, fmt
, ap
);
132 len
= strlen(message
);
133 return svn_stream_write(stream
, message
, &len
);
138 svn_stream_printf_from_utf8(svn_stream_t
*stream
,
139 const char *encoding
,
144 const char *message
, *translated
;
149 message
= apr_pvsprintf(pool
, fmt
, ap
);
152 SVN_ERR(svn_utf_cstring_from_utf8_ex2(&translated
, message
, encoding
,
155 len
= strlen(translated
);
157 return svn_stream_write(stream
, translated
, &len
);
162 svn_stream_readline(svn_stream_t
*stream
,
163 svn_stringbuf_t
**stringbuf
,
171 svn_stringbuf_t
*str
= svn_stringbuf_create("", pool
);
173 /* Since we're reading one character at a time, let's at least
174 optimize for the 90% case. 90% of the time, we can avoid the
175 stringbuf ever having to realloc() itself if we start it out at
177 svn_stringbuf_ensure(str
, 80);
183 SVN_ERR(svn_stream_read(stream
, &c
, &numbytes
));
186 /* a 'short' read means the stream has run out. */
197 svn_stringbuf_appendbytes(str
, &c
, 1);
201 svn_stringbuf_chop(str
, match
- eol
);
207 svn_error_t
*svn_stream_copy2(svn_stream_t
*from
, svn_stream_t
*to
,
208 svn_cancel_func_t cancel_func
,
212 char *buf
= apr_palloc(pool
, SVN__STREAM_CHUNK_SIZE
);
215 /* Read and write chunks until we get a short read, indicating the
216 end of the stream. (We can't get a short write without an
217 associated error.) */
220 len
= SVN__STREAM_CHUNK_SIZE
;
223 SVN_ERR(cancel_func(cancel_baton
));
225 SVN_ERR(svn_stream_read(from
, buf
, &len
));
227 SVN_ERR(svn_stream_write(to
, buf
, &len
));
228 if (len
!= SVN__STREAM_CHUNK_SIZE
)
234 svn_error_t
*svn_stream_copy(svn_stream_t
*from
, svn_stream_t
*to
,
237 return svn_stream_copy2(from
, to
, NULL
, NULL
, pool
);
241 svn_stream_contents_same(svn_boolean_t
*same
,
242 svn_stream_t
*stream1
,
243 svn_stream_t
*stream2
,
246 char *buf1
= apr_palloc(pool
, SVN__STREAM_CHUNK_SIZE
);
247 char *buf2
= apr_palloc(pool
, SVN__STREAM_CHUNK_SIZE
);
248 apr_size_t bytes_read1
= SVN__STREAM_CHUNK_SIZE
;
249 apr_size_t bytes_read2
= SVN__STREAM_CHUNK_SIZE
;
251 *same
= TRUE
; /* assume TRUE, until disproved below */
252 while (bytes_read1
== SVN__STREAM_CHUNK_SIZE
253 && bytes_read2
== SVN__STREAM_CHUNK_SIZE
)
255 SVN_ERR(svn_stream_read(stream1
, buf1
, &bytes_read1
));
256 SVN_ERR(svn_stream_read(stream2
, buf2
, &bytes_read2
));
258 if ((bytes_read1
!= bytes_read2
)
259 || (memcmp(buf1
, buf2
, bytes_read1
)))
271 /*** Generic readable empty stream ***/
274 read_handler_empty(void *baton
, char *buffer
, apr_size_t
*len
)
282 write_handler_empty(void *baton
, const char *data
, apr_size_t
*len
)
289 svn_stream_empty(apr_pool_t
*pool
)
291 svn_stream_t
*stream
;
293 stream
= svn_stream_create(NULL
, pool
);
294 svn_stream_set_read(stream
, read_handler_empty
);
295 svn_stream_set_write(stream
, write_handler_empty
);
302 /*** Ownership detaching stream ***/
305 read_handler_disown(void *baton
, char *buffer
, apr_size_t
*len
)
307 return svn_stream_read((svn_stream_t
*)baton
, buffer
, len
);
311 write_handler_disown(void *baton
, const char *buffer
, apr_size_t
*len
)
313 return svn_stream_write((svn_stream_t
*)baton
, buffer
, len
);
318 svn_stream_disown(svn_stream_t
*stream
, apr_pool_t
*pool
)
320 svn_stream_t
*s
= svn_stream_create(stream
, pool
);
322 svn_stream_set_read(s
, read_handler_disown
);
323 svn_stream_set_write(s
, write_handler_disown
);
330 /*** Generic stream for APR files ***/
338 read_handler_apr(void *baton
, char *buffer
, apr_size_t
*len
)
340 struct baton_apr
*btn
= baton
;
343 err
= svn_io_file_read_full(btn
->file
, buffer
, *len
, len
, btn
->pool
);
344 if (err
&& APR_STATUS_IS_EOF(err
->apr_err
))
346 svn_error_clear(err
);
355 write_handler_apr(void *baton
, const char *data
, apr_size_t
*len
)
357 struct baton_apr
*btn
= baton
;
359 return svn_io_file_write_full(btn
->file
, data
, *len
, len
, btn
->pool
);
363 close_handler_apr(void *baton
)
365 struct baton_apr
*btn
= baton
;
367 return svn_io_file_close(btn
->file
, btn
->pool
);
372 svn_stream_from_aprfile2(apr_file_t
*file
,
373 svn_boolean_t disown
,
376 struct baton_apr
*baton
;
377 svn_stream_t
*stream
;
380 return svn_stream_empty(pool
);
382 baton
= apr_palloc(pool
, sizeof(*baton
));
385 stream
= svn_stream_create(baton
, pool
);
386 svn_stream_set_read(stream
, read_handler_apr
);
387 svn_stream_set_write(stream
, write_handler_apr
);
390 svn_stream_set_close(stream
, close_handler_apr
);
396 svn_stream_from_aprfile(apr_file_t
*file
, apr_pool_t
*pool
)
398 return svn_stream_from_aprfile2(file
, TRUE
, pool
);
403 /* Compressed stream support */
405 #define ZBUFFER_SIZE 4096 /* The size of the buffer the
406 compressed stream uses to read from
407 the substream. Basically an
408 arbitrary value, picked to be about
412 z_stream
*in
; /* compressed stream for reading */
413 z_stream
*out
; /* compressed stream for writing */
414 svn_read_fn_t read
; /* substream's read function */
415 svn_write_fn_t write
; /* substream's write function */
416 svn_close_fn_t close
; /* substream's close function */
417 void *read_buffer
; /* buffer used for reading from
419 int read_flush
; /* what flush mode to use while
421 apr_pool_t
*pool
; /* The pool this baton is allocated
423 void *subbaton
; /* The substream's baton */
426 /* zlib alloc function. opaque is the pool we need. */
428 zalloc(voidpf opaque
, uInt items
, uInt size
)
430 apr_pool_t
*pool
= opaque
;
432 return apr_palloc(pool
, items
* size
);
435 /* zlib free function */
437 zfree(voidpf opaque
, voidpf address
)
439 /* Empty, since we allocate on the pool */
442 /* Converts a zlib error to an svn_error_t. zerr is the error code,
443 function is the function name, and stream is the z_stream we are
446 zerr_to_svn_error(int zerr
, const char *function
, z_stream
*stream
)
457 status
= SVN_ERR_STREAM_MALFORMED_DATA
;
458 message
= "stream error";
463 message
= "out of memory";
468 message
= "buffer error";
471 case Z_VERSION_ERROR
:
472 status
= SVN_ERR_STREAM_UNRECOGNIZED_DATA
;
473 message
= "version error";
477 status
= SVN_ERR_STREAM_MALFORMED_DATA
;
478 message
= "corrupted data";
482 status
= SVN_ERR_STREAM_UNRECOGNIZED_DATA
;
487 if (stream
->msg
!= NULL
)
488 return svn_error_createf(status
, NULL
, "zlib (%s): %s: %s", function
,
489 message
, stream
->msg
);
491 return svn_error_createf(status
, NULL
, "zlib (%s): %s", function
,
495 /* Helper function to figure out the sync mode */
497 read_helper_gz(svn_read_fn_t read_fn
,
500 uInt
*len
, int *zflush
)
502 uInt orig_len
= *len
;
504 /* There's no reason this value should grow bigger than the range of
505 uInt, but Subversion's API requires apr_size_t. */
506 apr_size_t apr_len
= (apr_size_t
) *len
;
508 SVN_ERR((*read_fn
)(baton
, buffer
, &apr_len
));
510 /* Type cast back to uInt type that zlib uses. On LP64 platforms
511 apr_size_t will be bigger than uInt. */
512 *len
= (uInt
) apr_len
;
514 /* I wanted to use Z_FINISH here, but we need to know our buffer is
516 *zflush
= (*len
) < orig_len
? Z_SYNC_FLUSH
: Z_SYNC_FLUSH
;
521 /* Handle reading from a compressed stream */
523 read_handler_gz(void *baton
, char *buffer
, apr_size_t
*len
)
525 struct zbaton
*btn
= baton
;
530 btn
->in
= apr_palloc(btn
->pool
, sizeof(z_stream
));
531 btn
->in
->zalloc
= zalloc
;
532 btn
->in
->zfree
= zfree
;
533 btn
->in
->opaque
= btn
->pool
;
534 btn
->read_buffer
= apr_palloc(btn
->pool
, ZBUFFER_SIZE
);
535 btn
->in
->next_in
= btn
->read_buffer
;
536 btn
->in
->avail_in
= ZBUFFER_SIZE
;
538 SVN_ERR(read_helper_gz(btn
->read
, btn
->subbaton
, btn
->read_buffer
,
539 &btn
->in
->avail_in
, &btn
->read_flush
));
541 zerr
= inflateInit(btn
->in
);
542 SVN_ERR(zerr_to_svn_error(zerr
, "inflateInit", btn
->in
));
545 btn
->in
->next_out
= (Bytef
*) buffer
;
546 btn
->in
->avail_out
= *len
;
548 while (btn
->in
->avail_out
> 0)
550 if (btn
->in
->avail_in
<= 0)
552 btn
->in
->avail_in
= ZBUFFER_SIZE
;
553 btn
->in
->next_in
= btn
->read_buffer
;
554 SVN_ERR(read_helper_gz(btn
->read
, btn
->subbaton
, btn
->read_buffer
,
555 &btn
->in
->avail_in
, &btn
->read_flush
));
558 zerr
= inflate(btn
->in
, btn
->read_flush
);
559 if (zerr
== Z_STREAM_END
)
561 else if (zerr
!= Z_OK
)
562 return zerr_to_svn_error(zerr
, "inflate", btn
->in
);
565 *len
-= btn
->in
->avail_out
;
569 /* Compress data and write it to the substream */
571 write_handler_gz(void *baton
, const char *buffer
, apr_size_t
*len
)
573 struct zbaton
*btn
= baton
;
576 apr_size_t buf_size
, write_len
;
579 if (btn
->out
== NULL
)
581 btn
->out
= apr_palloc(btn
->pool
, sizeof(z_stream
));
582 btn
->out
->zalloc
= zalloc
;
583 btn
->out
->zfree
= zfree
;
584 btn
->out
->opaque
= btn
->pool
;
586 zerr
= deflateInit(btn
->out
, Z_DEFAULT_COMPRESSION
);
587 SVN_ERR(zerr_to_svn_error(zerr
, "deflateInit", btn
->out
));
590 /* The largest buffer we should need is 0.1% larger than the
591 compressed data, + 12 bytes. This info comes from zlib.h. */
592 buf_size
= *len
+ (*len
/ 1000) + 13;
593 subpool
= svn_pool_create(btn
->pool
);
594 write_buf
= apr_palloc(subpool
, buf_size
);
596 btn
->out
->next_in
= (Bytef
*) buffer
; /* Casting away const! */
597 btn
->out
->avail_in
= *len
;
599 while (btn
->out
->avail_in
> 0)
601 btn
->out
->next_out
= write_buf
;
602 btn
->out
->avail_out
= buf_size
;
604 zerr
= deflate(btn
->out
, Z_NO_FLUSH
);
605 SVN_ERR(zerr_to_svn_error(zerr
, "deflate", btn
->out
));
606 write_len
= buf_size
- btn
->out
->avail_out
;
608 SVN_ERR(btn
->write(btn
->subbaton
, write_buf
, &write_len
));
611 svn_pool_destroy(subpool
);
616 /* Handle flushing and closing the stream */
618 close_handler_gz(void *baton
)
620 struct zbaton
*btn
= baton
;
625 zerr
= inflateEnd(btn
->in
);
626 SVN_ERR(zerr_to_svn_error(zerr
, "inflateEnd", btn
->in
));
629 if (btn
->out
!= NULL
)
632 apr_size_t write_len
;
634 buf
= apr_palloc(btn
->pool
, ZBUFFER_SIZE
);
638 btn
->out
->next_out
= buf
;
639 btn
->out
->avail_out
= ZBUFFER_SIZE
;
641 zerr
= deflate(btn
->out
, Z_FINISH
);
642 if (zerr
!= Z_STREAM_END
&& zerr
!= Z_OK
)
643 return zerr_to_svn_error(zerr
, "deflate", btn
->out
);
644 write_len
= ZBUFFER_SIZE
- btn
->out
->avail_out
;
646 SVN_ERR(btn
->write(btn
->subbaton
, buf
, &write_len
));
647 if (zerr
== Z_STREAM_END
)
651 zerr
= deflateEnd(btn
->out
);
652 SVN_ERR(zerr_to_svn_error(zerr
, "deflateEnd", btn
->out
));
655 if (btn
->close
!= NULL
)
656 return btn
->close(btn
->subbaton
);
663 svn_stream_compressed(svn_stream_t
*stream
, apr_pool_t
*pool
)
665 struct svn_stream_t
*zstream
;
666 struct zbaton
*baton
;
668 assert(stream
!= NULL
);
670 baton
= apr_palloc(pool
, sizeof(*baton
));
671 baton
->in
= baton
->out
= NULL
;
672 baton
->read
= stream
->read_fn
;
673 baton
->write
= stream
->write_fn
;
674 baton
->close
= stream
->close_fn
;
675 baton
->subbaton
= stream
->baton
;
677 baton
->read_buffer
= NULL
;
678 baton
->read_flush
= Z_SYNC_FLUSH
;
680 zstream
= svn_stream_create(baton
, pool
);
681 svn_stream_set_read(zstream
, read_handler_gz
);
682 svn_stream_set_write(zstream
, write_handler_gz
);
683 svn_stream_set_close(zstream
, close_handler_gz
);
689 /* MD5 checked stream support */
691 struct md5_stream_baton
693 apr_md5_ctx_t read_ctx
, write_ctx
;
694 const unsigned char **read_digest
;
695 const unsigned char **write_digest
;
696 unsigned char read_digest_buf
[APR_MD5_DIGESTSIZE
];
697 unsigned char write_digest_buf
[APR_MD5_DIGESTSIZE
];
700 /* True if more data should be read when closing the stream. */
701 svn_boolean_t read_more
;
703 /* Pool to allocate read buffer from. */
708 read_handler_md5(void *baton
, char *buffer
, apr_size_t
*len
)
710 struct md5_stream_baton
*btn
= baton
;
711 apr_size_t saved_len
= *len
;
713 SVN_ERR(svn_stream_read(btn
->proxy
, buffer
, len
));
715 if (btn
->read_digest
)
717 apr_status_t apr_err
= apr_md5_update(&btn
->read_ctx
, buffer
, *len
);
720 return svn_error_create(apr_err
, NULL
, NULL
);
723 if (saved_len
!= *len
)
724 btn
->read_more
= FALSE
;
731 write_handler_md5(void *baton
, const char *buffer
, apr_size_t
*len
)
733 struct md5_stream_baton
*btn
= baton
;
735 if (btn
->write_digest
&& *len
> 0)
737 apr_status_t apr_err
= apr_md5_update(&btn
->write_ctx
, buffer
, *len
);
740 return svn_error_create(apr_err
, NULL
, NULL
);
743 return svn_stream_write(btn
->proxy
, buffer
, len
);
748 close_handler_md5(void *baton
)
750 struct md5_stream_baton
*btn
= baton
;
752 /* If we're supposed to drain the stream, do so before finalizing the
756 char *buf
= apr_palloc(btn
->pool
, SVN__STREAM_CHUNK_SIZE
);
757 apr_size_t len
= SVN__STREAM_CHUNK_SIZE
;
761 SVN_ERR(read_handler_md5(baton
, buf
, &len
));
763 while (btn
->read_more
);
766 if (btn
->read_digest
)
769 = apr_md5_final(btn
->read_digest_buf
, &btn
->read_ctx
);
772 return svn_error_create(apr_err
, NULL
, NULL
);
774 *btn
->read_digest
= btn
->read_digest_buf
;
777 if (btn
->write_digest
)
780 = apr_md5_final(btn
->write_digest_buf
, &btn
->write_ctx
);
783 return svn_error_create(apr_err
, NULL
, NULL
);
785 *btn
->write_digest
= btn
->write_digest_buf
;
788 return svn_stream_close(btn
->proxy
);
793 svn_stream_checksummed(svn_stream_t
*stream
,
794 const unsigned char **read_digest
,
795 const unsigned char **write_digest
,
796 svn_boolean_t read_all
,
800 struct md5_stream_baton
*baton
;
802 if (! read_digest
&& ! write_digest
)
805 baton
= apr_palloc(pool
, sizeof(*baton
));
806 apr_md5_init(&baton
->read_ctx
);
807 apr_md5_init(&baton
->write_ctx
);
808 baton
->read_digest
= read_digest
;
809 baton
->write_digest
= write_digest
;
810 baton
->proxy
= stream
;
811 baton
->read_more
= read_all
;
814 s
= svn_stream_create(baton
, pool
);
815 svn_stream_set_read(s
, read_handler_md5
);
816 svn_stream_set_write(s
, write_handler_md5
);
817 svn_stream_set_close(s
, close_handler_md5
);
824 /* Miscellaneous stream functions. */
825 struct string_stream_baton
827 svn_stringbuf_t
*str
;
832 read_handler_string(void *baton
, char *buffer
, apr_size_t
*len
)
834 struct string_stream_baton
*btn
= baton
;
835 apr_size_t left_to_read
= btn
->str
->len
- btn
->amt_read
;
837 *len
= (*len
> left_to_read
) ? left_to_read
: *len
;
838 memcpy(buffer
, btn
->str
->data
+ btn
->amt_read
, *len
);
839 btn
->amt_read
+= *len
;
844 write_handler_string(void *baton
, const char *data
, apr_size_t
*len
)
846 struct string_stream_baton
*btn
= baton
;
848 svn_stringbuf_appendbytes(btn
->str
, data
, *len
);
853 svn_stream_from_stringbuf(svn_stringbuf_t
*str
,
856 svn_stream_t
*stream
;
857 struct string_stream_baton
*baton
;
860 return svn_stream_empty(pool
);
862 baton
= apr_palloc(pool
, sizeof(*baton
));
865 stream
= svn_stream_create(baton
, pool
);
866 svn_stream_set_read(stream
, read_handler_string
);
867 svn_stream_set_write(stream
, write_handler_string
);
873 svn_stream_for_stdout(svn_stream_t
**out
, apr_pool_t
*pool
)
875 apr_file_t
*stdout_file
;
876 apr_status_t apr_err
;
878 apr_err
= apr_file_open_stdout(&stdout_file
, pool
);
880 return svn_error_wrap_apr(apr_err
, "Can't open stdout");
882 *out
= svn_stream_from_aprfile(stdout_file
, pool
);