3 Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5 Copyright (C) 2009, 2013-2017 Genome Research Ltd
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
35 #include <sys/types.h>
38 #include "htslib/hts.h"
39 #include "htslib/bgzf.h"
40 #include "htslib/hfile.h"
41 #include "htslib/thread_pool.h"
42 #include "cram/pooled_alloc.h"
47 #define BLOCK_HEADER_LENGTH 18
48 #define BLOCK_FOOTER_LENGTH 8
51 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
52 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
53 | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
54 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
60 BGZF format is compatible with GZIP. It limits the size of each compressed
61 block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
65 static const uint8_t g_magic
[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
73 #include "htslib/khash.h"
74 KHASH_MAP_INIT_INT64(cache
, cache_t
)
79 typedef struct bgzf_job
{
81 unsigned char comp_data
[BGZF_MAX_BLOCK_SIZE
];
83 unsigned char uncomp_data
[BGZF_MAX_BLOCK_SIZE
];
86 int64_t block_address
;
97 typedef struct bgzf_mtaux_t
{
98 // Memory pool for bgzf_job structs, to avoid many malloc/free
99 pool_alloc_t
*job_pool
;
107 // Output queue holding completed bgzf_jobs
108 hts_tpool_process
*out_queue
;
112 pthread_mutex_t job_pool_m
;
113 int jobs_pending
; // number of jobs waiting
116 int hit_eof
; // r/w entirely within main thread
118 // Message passing to the reader thread; eg seek requests
120 uint64_t block_address
;
122 pthread_mutex_t command_m
; // Set whenever fp is being updated
123 pthread_cond_t command_c
;
124 enum mtaux_cmd command
;
130 uint64_t uaddr
; // offset w.r.t. uncompressed data
131 uint64_t caddr
; // offset w.r.t. compressed data
137 int noffs
, moffs
; // the size of the index, n:used, m:allocated
138 bgzidx1_t
*offs
; // offsets
139 uint64_t ublock_addr
; // offset of the current block (uncompressed data)
142 void bgzf_index_destroy(BGZF
*fp
);
143 int bgzf_index_add_block(BGZF
*fp
);
144 static void mt_destroy(mtaux_t
*mt
);
146 static inline void packInt16(uint8_t *buffer
, uint16_t value
)
149 buffer
[1] = value
>> 8;
152 static inline int unpackInt16(const uint8_t *buffer
)
154 return buffer
[0] | buffer
[1] << 8;
157 static inline void packInt32(uint8_t *buffer
, uint32_t value
)
160 buffer
[1] = value
>> 8;
161 buffer
[2] = value
>> 16;
162 buffer
[3] = value
>> 24;
165 static const char *bgzf_zerr(int errnum
, z_stream
*zs
)
167 static char buffer
[32];
169 /* Return zs->msg if available.
170 zlib doesn't set this very reliably. Looking at the source suggests
171 that it may get set to a useful message for deflateInit2, inflateInit2
172 and inflate when it returns Z_DATA_ERROR. For inflate with other
173 return codes, deflate, deflateEnd and inflateEnd it doesn't appear
174 to be useful. For the likely non-useful cases, the caller should
175 pass NULL into zs. */
177 if (zs
&& zs
->msg
) return zs
->msg
;
179 // gzerror OF((gzFile file, int *errnum)
182 return strerror(errno
);
184 return "invalid parameter/compression level, or inconsistent stream state";
186 return "invalid or incomplete IO";
188 return "out of memory";
190 return "progress temporarily not possible, or in() / out() returned an error";
191 case Z_VERSION_ERROR
:
192 return "zlib version mismatch";
193 case Z_OK
: // 0: maybe gzgets error Z_NULL
195 snprintf(buffer
, sizeof(buffer
), "[%d] unknown", errnum
);
196 return buffer
; // FIXME: Not thread-safe.
200 static BGZF
*bgzf_read_init(hFILE
*hfpr
)
204 ssize_t n
= hpeek(hfpr
, magic
, 18);
205 if (n
< 0) return NULL
;
207 fp
= (BGZF
*)calloc(1, sizeof(BGZF
));
208 if (fp
== NULL
) return NULL
;
211 fp
->uncompressed_block
= malloc(2 * BGZF_MAX_BLOCK_SIZE
);
212 if (fp
->uncompressed_block
== NULL
) { free(fp
); return NULL
; }
213 fp
->compressed_block
= (char *)fp
->uncompressed_block
+ BGZF_MAX_BLOCK_SIZE
;
214 fp
->is_compressed
= (n
==18 && magic
[0]==0x1f && magic
[1]==0x8b);
215 fp
->is_gzip
= ( !fp
->is_compressed
|| ((magic
[3]&4) && memcmp(&magic
[12], "BC\2\0",4)==0) ) ? 0 : 1;
217 fp
->cache
= kh_init(cache
);
222 // get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
223 static int mode2level(const char *mode
)
225 int i
, compress_level
= -1;
226 for (i
= 0; mode
[i
]; ++i
)
227 if (mode
[i
] >= '0' && mode
[i
] <= '9') break;
228 if (mode
[i
]) compress_level
= (int)mode
[i
] - '0';
229 if (strchr(mode
, 'u')) compress_level
= -2;
230 return compress_level
;
232 static BGZF
*bgzf_write_init(const char *mode
)
235 fp
= (BGZF
*)calloc(1, sizeof(BGZF
));
236 if (fp
== NULL
) goto mem_fail
;
238 int compress_level
= mode2level(mode
);
239 if ( compress_level
==-2 )
241 fp
->is_compressed
= 0;
244 fp
->is_compressed
= 1;
246 fp
->uncompressed_block
= malloc(2 * BGZF_MAX_BLOCK_SIZE
);
247 if (fp
->uncompressed_block
== NULL
) goto mem_fail
;
248 fp
->compressed_block
= (char *)fp
->uncompressed_block
+ BGZF_MAX_BLOCK_SIZE
;
250 fp
->compress_level
= compress_level
< 0? Z_DEFAULT_COMPRESSION
: compress_level
; // Z_DEFAULT_COMPRESSION==-1
251 if (fp
->compress_level
> 9) fp
->compress_level
= Z_DEFAULT_COMPRESSION
;
252 if ( strchr(mode
,'g') )
256 fp
->gz_stream
= (z_stream
*)calloc(1,sizeof(z_stream
));
257 if (fp
->gz_stream
== NULL
) goto mem_fail
;
258 fp
->gz_stream
->zalloc
= NULL
;
259 fp
->gz_stream
->zfree
= NULL
;
260 fp
->gz_stream
->msg
= NULL
;
262 int ret
= deflateInit2(fp
->gz_stream
, fp
->compress_level
, Z_DEFLATED
, 15|16, 8, Z_DEFAULT_STRATEGY
);
264 hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret
, fp
->gz_stream
));
271 hts_log_error("%s", strerror(errno
));
275 free(fp
->uncompressed_block
);
282 BGZF
*bgzf_open(const char *path
, const char *mode
)
285 assert(compressBound(BGZF_BLOCK_SIZE
) < BGZF_MAX_BLOCK_SIZE
);
286 if (strchr(mode
, 'r')) {
288 if ((fpr
= hopen(path
, mode
)) == 0) return 0;
289 fp
= bgzf_read_init(fpr
);
290 if (fp
== 0) { hclose_abruptly(fpr
); return NULL
; }
292 } else if (strchr(mode
, 'w') || strchr(mode
, 'a')) {
294 if ((fpw
= hopen(path
, mode
)) == 0) return 0;
295 fp
= bgzf_write_init(mode
);
296 if (fp
== NULL
) return NULL
;
299 else { errno
= EINVAL
; return 0; }
301 fp
->is_be
= ed_is_big();
305 BGZF
*bgzf_dopen(int fd
, const char *mode
)
308 assert(compressBound(BGZF_BLOCK_SIZE
) < BGZF_MAX_BLOCK_SIZE
);
309 if (strchr(mode
, 'r')) {
311 if ((fpr
= hdopen(fd
, mode
)) == 0) return 0;
312 fp
= bgzf_read_init(fpr
);
313 if (fp
== 0) { hclose_abruptly(fpr
); return NULL
; } // FIXME this closes fd
315 } else if (strchr(mode
, 'w') || strchr(mode
, 'a')) {
317 if ((fpw
= hdopen(fd
, mode
)) == 0) return 0;
318 fp
= bgzf_write_init(mode
);
319 if (fp
== NULL
) return NULL
;
322 else { errno
= EINVAL
; return 0; }
324 fp
->is_be
= ed_is_big();
328 BGZF
*bgzf_hopen(hFILE
*hfp
, const char *mode
)
331 assert(compressBound(BGZF_BLOCK_SIZE
) < BGZF_MAX_BLOCK_SIZE
);
332 if (strchr(mode
, 'r')) {
333 fp
= bgzf_read_init(hfp
);
334 if (fp
== NULL
) return NULL
;
335 } else if (strchr(mode
, 'w') || strchr(mode
, 'a')) {
336 fp
= bgzf_write_init(mode
);
337 if (fp
== NULL
) return NULL
;
339 else { errno
= EINVAL
; return 0; }
342 fp
->is_be
= ed_is_big();
346 int bgzf_compress(void *_dst
, size_t *dlen
, const void *src
, size_t slen
, int level
)
350 uint8_t *dst
= (uint8_t*)_dst
;
353 zs
.zalloc
= NULL
; zs
.zfree
= NULL
;
355 zs
.next_in
= (Bytef
*)src
;
357 zs
.next_out
= dst
+ BLOCK_HEADER_LENGTH
;
358 zs
.avail_out
= *dlen
- BLOCK_HEADER_LENGTH
- BLOCK_FOOTER_LENGTH
;
359 int ret
= deflateInit2(&zs
, level
, Z_DEFLATED
, -15, 8, Z_DEFAULT_STRATEGY
); // -15 to disable zlib header/footer
361 hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret
, &zs
));
364 if ((ret
= deflate(&zs
, Z_FINISH
)) != Z_STREAM_END
) {
365 hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret
, ret
== Z_DATA_ERROR
? &zs
: NULL
));
368 if ((ret
= deflateEnd(&zs
)) != Z_OK
) {
369 hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret
, NULL
));
372 *dlen
= zs
.total_out
+ BLOCK_HEADER_LENGTH
+ BLOCK_FOOTER_LENGTH
;
374 memcpy(dst
, g_magic
, BLOCK_HEADER_LENGTH
); // the last two bytes are a place holder for the length of the block
375 packInt16(&dst
[16], *dlen
- 1); // write the compressed length; -1 to fit 2 bytes
377 crc
= crc32(crc32(0L, NULL
, 0L), (Bytef
*)src
, slen
);
378 packInt32((uint8_t*)&dst
[*dlen
- 8], crc
);
379 packInt32((uint8_t*)&dst
[*dlen
- 4], slen
);
383 static int bgzf_gzip_compress(BGZF
*fp
, void *_dst
, size_t *dlen
, const void *src
, size_t slen
, int level
)
385 uint8_t *dst
= (uint8_t*)_dst
;
386 z_stream
*zs
= fp
->gz_stream
;
387 int flush
= slen
? Z_PARTIAL_FLUSH
: Z_FINISH
;
388 zs
->next_in
= (Bytef
*)src
;
391 zs
->avail_out
= *dlen
;
392 int ret
= deflate(zs
, flush
);
393 if (ret
== Z_STREAM_ERROR
) {
394 hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret
, NULL
));
397 if (zs
->avail_in
!= 0) {
398 hts_log_error("Deflate block too large for output buffer");
401 *dlen
= *dlen
- zs
->avail_out
;
405 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
406 static int deflate_block(BGZF
*fp
, int block_length
)
408 size_t comp_size
= BGZF_MAX_BLOCK_SIZE
;
411 ret
= bgzf_compress(fp
->compressed_block
, &comp_size
, fp
->uncompressed_block
, block_length
, fp
->compress_level
);
413 ret
= bgzf_gzip_compress(fp
, fp
->compressed_block
, &comp_size
, fp
->uncompressed_block
, block_length
, fp
->compress_level
);
417 hts_log_debug("Compression error %d", ret
);
418 fp
->errcode
|= BGZF_ERR_ZLIB
;
421 fp
->block_offset
= 0;
425 static int bgzf_uncompress(uint8_t *dst
, size_t *dlen
, const uint8_t *src
, size_t slen
) {
430 zs
.next_in
= (Bytef
*)src
;
432 zs
.next_out
= (Bytef
*)dst
;
433 zs
.avail_out
= *dlen
;
435 int ret
= inflateInit2(&zs
, -15);
437 hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret
, &zs
));
440 if ((ret
= inflate(&zs
, Z_FINISH
)) != Z_STREAM_END
) {
441 hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret
, ret
== Z_DATA_ERROR
? &zs
: NULL
));
442 if ((ret
= inflateEnd(&zs
)) != Z_OK
) {
443 hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret
, NULL
));
447 if ((ret
= inflateEnd(&zs
)) != Z_OK
) {
448 hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret
, NULL
));
451 *dlen
= *dlen
- zs
.avail_out
;
455 // Inflate the block in fp->compressed_block into fp->uncompressed_block
456 static int inflate_block(BGZF
* fp
, int block_length
)
458 size_t dlen
= BGZF_MAX_BLOCK_SIZE
;
459 int ret
= bgzf_uncompress(fp
->uncompressed_block
, &dlen
,
460 (Bytef
*)fp
->compressed_block
+ 18, block_length
- 18);
462 fp
->errcode
|= BGZF_ERR_ZLIB
;
469 static int inflate_gzip_block(BGZF
*fp
, int cached
)
474 if ( !cached
&& fp
->gz_stream
->avail_out
!=0 )
476 fp
->gz_stream
->avail_in
= hread(fp
->fp
, fp
->compressed_block
, BGZF_BLOCK_SIZE
);
477 if ( fp
->gz_stream
->avail_in
<=0 ) return fp
->gz_stream
->avail_in
;
478 if ( fp
->gz_stream
->avail_in
==0 ) break;
479 fp
->gz_stream
->next_in
= fp
->compressed_block
;
484 fp
->gz_stream
->next_out
= (Bytef
*)fp
->uncompressed_block
+ fp
->block_offset
;
485 fp
->gz_stream
->avail_out
= BGZF_MAX_BLOCK_SIZE
- fp
->block_offset
;
486 fp
->gz_stream
->msg
= NULL
;
487 ret
= inflate(fp
->gz_stream
, Z_NO_FLUSH
);
488 if (ret
< 0 && ret
!= Z_BUF_ERROR
) {
489 hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret
, ret
== Z_DATA_ERROR
? fp
->gz_stream
: NULL
));
490 fp
->errcode
|= BGZF_ERR_ZLIB
;
493 unsigned int have
= BGZF_MAX_BLOCK_SIZE
- fp
->gz_stream
->avail_out
;
494 if ( have
) return have
;
496 while ( fp
->gz_stream
->avail_out
== 0 );
498 while (ret
!= Z_STREAM_END
);
499 return BGZF_MAX_BLOCK_SIZE
- fp
->gz_stream
->avail_out
;
502 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
503 static int check_header(const uint8_t *header
)
505 if ( header
[0] != 31 || header
[1] != 139 || header
[2] != 8 ) return -2;
506 return ((header
[3] & 4) != 0
507 && unpackInt16((uint8_t*)&header
[10]) == 6
508 && header
[12] == 'B' && header
[13] == 'C'
509 && unpackInt16((uint8_t*)&header
[14]) == 2) ? 0 : -1;
513 static void free_cache(BGZF
*fp
)
516 khash_t(cache
) *h
= (khash_t(cache
)*)fp
->cache
;
517 if (fp
->is_write
) return;
518 for (k
= kh_begin(h
); k
< kh_end(h
); ++k
)
519 if (kh_exist(h
, k
)) free(kh_val(h
, k
).block
);
520 kh_destroy(cache
, h
);
523 static int load_block_from_cache(BGZF
*fp
, int64_t block_address
)
528 khash_t(cache
) *h
= (khash_t(cache
)*)fp
->cache
;
529 k
= kh_get(cache
, h
, block_address
);
530 if (k
== kh_end(h
)) return 0;
532 if (fp
->block_length
!= 0) fp
->block_offset
= 0;
533 fp
->block_address
= block_address
;
534 fp
->block_length
= p
->size
;
535 // FIXME: why BGZF_MAX_BLOCK_SIZE and not p->size?
536 memcpy(fp
->uncompressed_block
, p
->block
, BGZF_MAX_BLOCK_SIZE
);
537 if ( hseek(fp
->fp
, p
->end_offset
, SEEK_SET
) < 0 )
539 // todo: move the error up
540 hts_log_error("Could not hseek to %"PRId64
"", p
->end_offset
);
546 static void cache_block(BGZF
*fp
, int size
)
551 //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
552 khash_t(cache
) *h
= (khash_t(cache
)*)fp
->cache
;
553 if (BGZF_MAX_BLOCK_SIZE
>= fp
->cache_size
) return;
554 if ((kh_size(h
) + 1) * BGZF_MAX_BLOCK_SIZE
> (uint32_t)fp
->cache_size
) {
555 /* A better way would be to remove the oldest block in the
556 * cache, but here we remove a random one for simplicity. This
557 * should not have a big impact on performance. */
558 for (k
= kh_begin(h
); k
< kh_end(h
); ++k
)
559 if (kh_exist(h
, k
)) break;
561 free(kh_val(h
, k
).block
);
565 k
= kh_put(cache
, h
, fp
->block_address
, &ret
);
566 if (ret
== 0) return; // if this happens, a bug!
568 p
->size
= fp
->block_length
;
569 p
->end_offset
= fp
->block_address
+ size
;
570 p
->block
= (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE
);
571 memcpy(kh_val(h
, k
).block
, fp
->uncompressed_block
, BGZF_MAX_BLOCK_SIZE
);
574 static void free_cache(BGZF
*fp
) {}
575 static int load_block_from_cache(BGZF
*fp
, int64_t block_address
) {return 0;}
576 static void cache_block(BGZF
*fp
, int size
) {}
580 * Absolute htell in this compressed file.
582 * Do not confuse with the external bgzf_tell macro which returns the virtual
585 static off_t
bgzf_htell(BGZF
*fp
) {
587 pthread_mutex_lock(&fp
->mt
->job_pool_m
);
588 off_t pos
= fp
->block_address
+ fp
->block_clength
;
589 pthread_mutex_unlock(&fp
->mt
->job_pool_m
);
592 return htell(fp
->fp
);
596 int bgzf_read_block(BGZF
*fp
)
602 if (fp
->mt
->hit_eof
) {
603 // Further reading at EOF will always return 0
604 fp
->block_length
= 0;
607 r
= hts_tpool_next_result_wait(fp
->mt
->out_queue
);
608 bgzf_job
*j
= r
? (bgzf_job
*)hts_tpool_result_data(r
) : NULL
;
610 if (!j
|| j
->errcode
== BGZF_ERR_MT
) {
611 if (!fp
->mt
->free_block
) {
612 fp
->uncompressed_block
= malloc(2 * BGZF_MAX_BLOCK_SIZE
);
613 if (fp
->uncompressed_block
== NULL
) return -1;
614 fp
->compressed_block
= (char *)fp
->uncompressed_block
+ BGZF_MAX_BLOCK_SIZE
;
615 } // else it's already allocated with malloc, maybe even in-use.
618 hts_tpool_delete_result(r
, 0);
620 goto single_threaded
;
624 fp
->errcode
= j
->errcode
;
629 if (!fp
->last_block_eof
&& !fp
->no_eof_block
) {
630 fp
->no_eof_block
= 1;
631 hts_log_warning("EOF marker is absent. The input is probably truncated");
636 // Zero length blocks in the middle of a file are (wrongly)
637 // considered as EOF by many callers. We work around this by
638 // trying again to see if we hit a genuine EOF.
639 if (!j
->hit_eof
&& j
->uncomp_len
== 0) {
640 fp
->last_block_eof
= 1;
641 hts_tpool_delete_result(r
, 0);
645 // block_length=0 and block_offset set by bgzf_seek.
646 if (fp
->block_length
!= 0) fp
->block_offset
= 0;
647 if (!j
->hit_eof
) fp
->block_address
= j
->block_address
;
648 fp
->block_clength
= j
->comp_len
;
649 fp
->block_length
= j
->uncomp_len
;
650 // bgzf_read() can change fp->block_length
651 fp
->last_block_eof
= (fp
->block_length
== 0);
653 if ( j
->uncomp_len
&& j
->fp
->idx_build_otf
)
655 bgzf_index_add_block(j
->fp
);
656 j
->fp
->idx
->ublock_addr
+= j
->uncomp_len
;
659 // Steal the data block as it's quicker than a memcpy.
660 // We just need to make sure we delay the pool free.
661 if (fp
->mt
->curr_job
) {
662 pthread_mutex_lock(&fp
->mt
->job_pool_m
);
663 pool_free(fp
->mt
->job_pool
, fp
->mt
->curr_job
);
664 pthread_mutex_unlock(&fp
->mt
->job_pool_m
);
666 fp
->uncompressed_block
= j
->uncomp_data
;
667 fp
->mt
->curr_job
= j
;
668 if (fp
->mt
->free_block
) {
669 free(fp
->mt
->free_block
); // clear up last non-mt block
670 fp
->mt
->free_block
= NULL
;
673 hts_tpool_delete_result(r
, 0);
677 uint8_t header
[BLOCK_HEADER_LENGTH
], *compressed_block
;
678 int count
, size
, block_length
, remaining
;
683 // Reading an uncompressed file
684 if ( !fp
->is_compressed
)
686 count
= hread(fp
->fp
, fp
->uncompressed_block
, BGZF_MAX_BLOCK_SIZE
);
687 if (count
< 0) // Error
689 fp
->errcode
|= BGZF_ERR_IO
;
692 else if (count
== 0) // EOF
694 fp
->block_length
= 0;
697 if (fp
->block_length
!= 0) fp
->block_offset
= 0;
698 fp
->block_address
+= count
;
699 fp
->block_length
= count
;
703 // Reading compressed file
704 int64_t block_address
;
705 block_address
= bgzf_htell(fp
);
706 if ( fp
->is_gzip
&& fp
->gz_stream
) // is this is an initialized gzip stream?
708 count
= inflate_gzip_block(fp
, 0);
711 fp
->errcode
|= BGZF_ERR_ZLIB
;
714 fp
->block_length
= count
;
715 fp
->block_address
= block_address
;
718 if (fp
->cache_size
&& load_block_from_cache(fp
, block_address
)) return 0;
720 // loop to skip empty bgzf blocks
723 count
= hread(fp
->fp
, header
, sizeof(header
));
724 if (count
== 0) { // no data read
725 if (!fp
->last_block_eof
&& !fp
->no_eof_block
&& !fp
->is_gzip
) {
726 fp
->no_eof_block
= 1;
727 hts_log_warning("EOF marker is absent. The input is probably truncated");
729 fp
->block_length
= 0;
733 if ( count
!= sizeof(header
) || (ret
=check_header(header
))==-2 )
735 fp
->errcode
|= BGZF_ERR_HEADER
;
741 uint8_t *cblock
= (uint8_t*)fp
->compressed_block
;
742 memcpy(cblock
, header
, sizeof(header
));
743 count
= hread(fp
->fp
, cblock
+sizeof(header
), BGZF_BLOCK_SIZE
- sizeof(header
)) + sizeof(header
);
746 // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
747 // Note: Some of these fields are untested, I did not have appropriate data available
748 if ( header
[3] & 0x4 ) // FLG.FEXTRA
750 nskip
+= unpackInt16(&cblock
[nskip
]) + 2;
752 if ( header
[3] & 0x8 ) // FLG.FNAME
754 while ( nskip
<count
&& cblock
[nskip
] ) nskip
++;
757 if ( header
[3] & 0x10 ) // FLG.FCOMMENT
759 while ( nskip
<count
&& cblock
[nskip
] ) nskip
++;
762 if ( header
[3] & 0x2 ) nskip
+= 2; // FLG.FHCRC
764 /* FIXME: Should handle this better. There's no reason why
765 someone shouldn't include a massively long comment in their
767 if ( nskip
>= count
)
769 fp
->errcode
|= BGZF_ERR_HEADER
;
774 fp
->gz_stream
= (z_stream
*) calloc(1,sizeof(z_stream
));
775 int ret
= inflateInit2(fp
->gz_stream
, -15);
778 hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret
, fp
->gz_stream
));
779 fp
->errcode
|= BGZF_ERR_ZLIB
;
782 fp
->gz_stream
->avail_in
= count
- nskip
;
783 fp
->gz_stream
->next_in
= cblock
+ nskip
;
784 count
= inflate_gzip_block(fp
, 1);
787 fp
->errcode
|= BGZF_ERR_ZLIB
;
790 fp
->block_length
= count
;
791 fp
->block_address
= block_address
;
792 if ( fp
->idx_build_otf
) return -1; // cannot build index for gzip
796 block_length
= unpackInt16((uint8_t*)&header
[16]) + 1; // +1 because when writing this number, we used "-1"
797 if (block_length
< BLOCK_HEADER_LENGTH
)
799 fp
->errcode
|= BGZF_ERR_HEADER
;
802 compressed_block
= (uint8_t*)fp
->compressed_block
;
803 memcpy(compressed_block
, header
, BLOCK_HEADER_LENGTH
);
804 remaining
= block_length
- BLOCK_HEADER_LENGTH
;
805 count
= hread(fp
->fp
, &compressed_block
[BLOCK_HEADER_LENGTH
], remaining
);
806 if (count
!= remaining
) {
807 fp
->errcode
|= BGZF_ERR_IO
;
811 if ((count
= inflate_block(fp
, block_length
)) < 0) {
812 hts_log_debug("Inflate block operation failed: %s", bgzf_zerr(count
, NULL
));
813 fp
->errcode
|= BGZF_ERR_ZLIB
;
816 fp
->last_block_eof
= (count
== 0);
817 if ( count
) break; // otherwise an empty bgzf block
819 if (fp
->block_length
!= 0) fp
->block_offset
= 0; // Do not reset offset if this read follows a seek.
820 fp
->block_address
= block_address
;
821 fp
->block_length
= count
;
822 if ( fp
->idx_build_otf
)
824 bgzf_index_add_block(fp
);
825 fp
->idx
->ublock_addr
+= count
;
827 cache_block(fp
, size
);
831 ssize_t
bgzf_read(BGZF
*fp
, void *data
, size_t length
)
833 ssize_t bytes_read
= 0;
834 uint8_t *output
= (uint8_t*)data
;
835 if (length
<= 0) return 0;
836 assert(fp
->is_write
== 0);
837 while (bytes_read
< length
) {
838 int copy_length
, available
= fp
->block_length
- fp
->block_offset
;
840 if (available
<= 0) {
841 int ret
= bgzf_read_block(fp
);
843 hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", ret
, bytes_read
, length
);
844 fp
->errcode
|= BGZF_ERR_ZLIB
;
847 available
= fp
->block_length
- fp
->block_offset
;
848 if (available
<= 0) break;
850 copy_length
= length
- bytes_read
< available
? length
- bytes_read
: available
;
851 buffer
= (uint8_t*)fp
->uncompressed_block
;
852 memcpy(output
, buffer
+ fp
->block_offset
, copy_length
);
853 fp
->block_offset
+= copy_length
;
854 output
+= copy_length
;
855 bytes_read
+= copy_length
;
857 // For raw gzip streams this avoids short reads.
858 if (fp
->block_offset
== fp
->block_length
) {
859 fp
->block_address
= bgzf_htell(fp
);
860 fp
->block_offset
= fp
->block_length
= 0;
864 fp
->uncompressed_address
+= bytes_read
;
869 ssize_t
bgzf_raw_read(BGZF
*fp
, void *data
, size_t length
)
871 ssize_t ret
= hread(fp
->fp
, data
, length
);
872 if (ret
< 0) fp
->errcode
|= BGZF_ERR_IO
;
878 void *bgzf_encode_func(void *arg
) {
879 bgzf_job
*j
= (bgzf_job
*)arg
;
881 j
->comp_len
= BGZF_MAX_BLOCK_SIZE
;
882 int ret
= bgzf_compress(j
->comp_data
, &j
->comp_len
,
883 j
->uncomp_data
, j
->uncomp_len
,
884 j
->fp
->compress_level
);
886 j
->errcode
|= BGZF_ERR_ZLIB
;
891 // Our input block has already been decoded by bgzf_mt_read_block().
892 // We need to split that into a fetch block (compressed) and make this
893 // do the actual decompression step.
894 void *bgzf_decode_func(void *arg
) {
895 bgzf_job
*j
= (bgzf_job
*)arg
;
897 j
->uncomp_len
= BGZF_MAX_BLOCK_SIZE
;
898 int ret
= bgzf_uncompress(j
->uncomp_data
, &j
->uncomp_len
,
899 j
->comp_data
+18, j
->comp_len
-18);
901 j
->errcode
|= BGZF_ERR_ZLIB
;
907 * Nul function so we can dispatch a job with the correct serial
908 * to mark failure or to indicate an empty read (EOF).
910 void *bgzf_nul_func(void *arg
) { return arg
; }
913 * Takes compressed blocks off the results queue and calls hwrite to
914 * punt them to the output stream.
916 * Returns NULL when no more are left, or -1 on error
918 static void *bgzf_mt_writer(void *vp
) {
919 BGZF
*fp
= (BGZF
*)vp
;
920 mtaux_t
*mt
= fp
->mt
;
923 // Iterates until result queue is shutdown, where it returns NULL.
924 while ((r
= hts_tpool_next_result_wait(mt
->out_queue
))) {
925 bgzf_job
*j
= (bgzf_job
*)hts_tpool_result_data(r
);
928 if (hwrite(fp
->fp
, j
->comp_data
, j
->comp_len
) != j
->comp_len
) {
929 fp
->errcode
|= BGZF_ERR_IO
;
934 * Periodically call hflush (which calls fsync when on a file).
935 * This avoids the fsync being done at the bgzf_close stage,
936 * which can sometimes cause signficant delays. As this is in
937 * a separate thread, spreading the sync delays throughout the
938 * program execution seems better.
939 * Frequency of 1/512 has been chosen by experimentation
940 * across local XFS, NFS and Lustre tests.
942 if (++mt
->flush_pending
% 512 == 0)
943 if (hflush(fp
->fp
) != 0)
947 hts_tpool_delete_result(r
, 0);
949 // Also updated by main thread
950 pthread_mutex_lock(&mt
->job_pool_m
);
951 pool_free(mt
->job_pool
, j
);
953 pthread_mutex_unlock(&mt
->job_pool_m
);
956 if (hflush(fp
->fp
) != 0)
959 hts_tpool_process_destroy(mt
->out_queue
);
964 hts_tpool_process_destroy(mt
->out_queue
);
970 * Reads a compressed block of data using hread and dispatches it to
971 * the thread pool for decompression. This is the analogue of the old
972 * non-threaded bgzf_read_block() function, but without modifying fp
973 * in any way (except for the read offset). All output goes via the
974 * supplied bgzf_job struct.
976 * Returns NULL when no more are left, or -1 on error
978 int bgzf_mt_read_block(BGZF
*fp
, bgzf_job
*j
)
980 uint8_t header
[BLOCK_HEADER_LENGTH
], *compressed_block
;
981 int count
, size
= 0, block_length
, remaining
;
983 // NOTE: Guaranteed to be compressed as we block multi-threading in
984 // uncompressed mode. However it may be gzip compression instead
987 // Reading compressed file
988 int64_t block_address
;
989 block_address
= htell(fp
->fp
);
991 if (fp
->cache_size
&& load_block_from_cache(fp
, block_address
)) return 0;
992 count
= hpeek(fp
->fp
, header
, sizeof(header
));
993 if (count
== 0) // no data read
996 if ( count
!= sizeof(header
) || (ret
=check_header(header
))==-2 )
998 j
->errcode
|= BGZF_ERR_HEADER
;
1002 j
->errcode
|= BGZF_ERR_MT
;
1006 count
= hread(fp
->fp
, header
, sizeof(header
));
1007 if (count
!= sizeof(header
)) // no data read
1011 block_length
= unpackInt16((uint8_t*)&header
[16]) + 1; // +1 because when writing this number, we used "-1"
1012 if (block_length
< BLOCK_HEADER_LENGTH
) {
1013 j
->errcode
|= BGZF_ERR_HEADER
;
1016 compressed_block
= (uint8_t*)j
->comp_data
;
1017 memcpy(compressed_block
, header
, BLOCK_HEADER_LENGTH
);
1018 remaining
= block_length
- BLOCK_HEADER_LENGTH
;
1019 count
= hread(fp
->fp
, &compressed_block
[BLOCK_HEADER_LENGTH
], remaining
);
1020 if (count
!= remaining
) {
1021 j
->errcode
|= BGZF_ERR_IO
;
1025 j
->comp_len
= block_length
;
1026 j
->uncomp_len
= BGZF_MAX_BLOCK_SIZE
;
1027 j
->block_address
= block_address
;
1035 static int bgzf_check_EOF_common(BGZF
*fp
)
1038 off_t offset
= htell(fp
->fp
);
1039 if (hseek(fp
->fp
, -28, SEEK_END
) < 0) {
1040 if (errno
== ESPIPE
) { hclearerr(fp
->fp
); return 2; }
1043 if ( hread(fp
->fp
, buf
, 28) != 28 ) return -1;
1044 if ( hseek(fp
->fp
, offset
, SEEK_SET
) < 0 ) return -1;
1045 return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf
, 28) == 0)? 1 : 0;
1049 * Checks EOF from the reader thread.
1051 static void bgzf_mt_eof(BGZF
*fp
) {
1052 mtaux_t
*mt
= fp
->mt
;
1054 pthread_mutex_lock(&mt
->job_pool_m
);
1055 mt
->eof
= bgzf_check_EOF_common(fp
);
1056 pthread_mutex_unlock(&mt
->job_pool_m
);
1057 pthread_cond_signal(&mt
->command_c
);
1062 * Performs the seek (called by reader thread).
1064 * This simply drains the entire queue, throwing away blocks, seeks,
1065 * and starts it up again. Brute force, but maybe sufficient.
1067 static void bgzf_mt_seek(BGZF
*fp
) {
1068 mtaux_t
*mt
= fp
->mt
;
1070 hts_tpool_process_reset(mt
->out_queue
, 0);
1071 pthread_mutex_lock(&mt
->job_pool_m
);
1075 if (hseek(fp
->fp
, mt
->block_address
, SEEK_SET
) < 0)
1076 mt
->errcode
= BGZF_ERR_IO
;
1078 pthread_mutex_unlock(&mt
->job_pool_m
);
1079 pthread_cond_signal(&mt
->command_c
);
1082 static void *bgzf_mt_reader(void *vp
) {
1083 BGZF
*fp
= (BGZF
*)vp
;
1084 mtaux_t
*mt
= fp
->mt
;
1087 pthread_mutex_lock(&mt
->job_pool_m
);
1088 bgzf_job
*j
= pool_alloc(mt
->job_pool
);
1089 pthread_mutex_unlock(&mt
->job_pool_m
);
1095 while (bgzf_mt_read_block(fp
, j
) == 0) {
1097 hts_tpool_dispatch(mt
->pool
, mt
->out_queue
, bgzf_decode_func
, j
);
1099 // Check for command
1100 pthread_mutex_lock(&mt
->command_m
);
1101 switch (mt
->command
) {
1104 pthread_mutex_unlock(&mt
->command_m
);
1112 pthread_cond_signal(&mt
->command_c
);
1113 pthread_mutex_unlock(&mt
->command_m
);
1114 hts_tpool_process_destroy(mt
->out_queue
);
1120 pthread_mutex_unlock(&mt
->command_m
);
1122 // Allocate buffer for next block
1123 pthread_mutex_lock(&mt
->job_pool_m
);
1124 j
= pool_alloc(mt
->job_pool
);
1125 pthread_mutex_unlock(&mt
->job_pool_m
);
1132 if (j
->errcode
== BGZF_ERR_MT
) {
1133 // Attempt to multi-thread decode a raw gzip stream cannot be done.
1134 // We tear down the multi-threaded decoder and revert to the old code.
1135 hts_tpool_dispatch(mt
->pool
, mt
->out_queue
, bgzf_nul_func
, j
);
1136 hts_tpool_process_ref_decr(mt
->out_queue
);
1137 pthread_exit(&j
->errcode
);
1140 // Dispatch an empty block so EOF is spotted.
1141 // We also use this mechanism for returning errors, in which case
1142 // j->errcode is set already.
1145 hts_tpool_dispatch(mt
->pool
, mt
->out_queue
, bgzf_nul_func
, j
);
1146 if (j
->errcode
!= 0) {
1147 hts_tpool_process_destroy(mt
->out_queue
);
1148 pthread_exit(&j
->errcode
);
1151 // We hit EOF so can stop reading, but we may get a subsequent
1152 // seek request. In this case we need to restart the reader.
1154 // To handle this we wait on a condition variable and then
1155 // monitor the command. (This could be either seek or close.)
1157 pthread_mutex_lock(&mt
->command_m
);
1158 if (mt
->command
== NONE
)
1159 pthread_cond_wait(&mt
->command_c
, &mt
->command_m
);
1160 switch(mt
->command
) {
1162 pthread_mutex_unlock(&mt
->command_m
);
1167 pthread_mutex_unlock(&mt
->command_m
);
1172 pthread_mutex_unlock(&mt
->command_m
);
1176 pthread_cond_signal(&mt
->command_c
);
1177 pthread_mutex_unlock(&mt
->command_m
);
1178 hts_tpool_process_destroy(mt
->out_queue
);
1184 int bgzf_thread_pool(BGZF
*fp
, hts_tpool
*pool
, int qsize
) {
1185 // No gain from multi-threading when not compressed
1186 if (!fp
->is_compressed
)
1190 mt
= (mtaux_t
*)calloc(1, sizeof(mtaux_t
));
1195 mt
->n_threads
= hts_tpool_size(pool
);
1197 qsize
= mt
->n_threads
*2;
1198 if (!(mt
->out_queue
= hts_tpool_process_init(mt
->pool
, qsize
, 0))) {
1202 hts_tpool_process_ref_incr(mt
->out_queue
);
1204 mt
->job_pool
= pool_create(sizeof(bgzf_job
));
1206 pthread_mutex_init(&mt
->job_pool_m
, NULL
);
1207 pthread_mutex_init(&mt
->command_m
, NULL
);
1208 pthread_cond_init(&mt
->command_c
, NULL
);
1209 mt
->flush_pending
= 0;
1210 mt
->jobs_pending
= 0;
1211 mt
->free_block
= fp
->uncompressed_block
; // currently in-use block
1212 pthread_create(&mt
->io_task
, NULL
,
1213 fp
->is_write
? bgzf_mt_writer
: bgzf_mt_reader
, fp
);
1218 int bgzf_mt(BGZF
*fp
, int n_threads
, int n_sub_blks
)
1220 // No gain from multi-threading when not compressed
1221 if (!fp
->is_compressed
|| fp
->is_gzip
)
1224 if (n_threads
< 1) return -1;
1225 hts_tpool
*p
= hts_tpool_init(n_threads
);
1229 if (bgzf_thread_pool(fp
, p
, 0) != 0) {
1230 hts_tpool_destroy(p
);
1234 fp
->mt
->own_pool
= 1;
1239 static void mt_destroy(mtaux_t
*mt
)
1241 pthread_mutex_lock(&mt
->command_m
);
1242 mt
->command
= CLOSE
;
1243 pthread_cond_signal(&mt
->command_c
);
1244 hts_tpool_wake_dispatch(mt
->out_queue
); // unstick the reader
1245 pthread_mutex_unlock(&mt
->command_m
);
1247 // Destroying the queue first forces the writer to exit.
1248 hts_tpool_process_destroy(mt
->out_queue
);
1249 pthread_join(mt
->io_task
, NULL
);
1251 pthread_mutex_destroy(&mt
->job_pool_m
);
1252 pthread_mutex_destroy(&mt
->command_m
);
1253 pthread_cond_destroy(&mt
->command_c
);
1255 pool_free(mt
->job_pool
, mt
->curr_job
);
1258 hts_tpool_destroy(mt
->pool
);
1260 pool_destroy(mt
->job_pool
);
1266 static int mt_queue(BGZF
*fp
)
1268 mtaux_t
*mt
= fp
->mt
;
1270 // Also updated by writer thread
1271 pthread_mutex_lock(&mt
->job_pool_m
);
1272 bgzf_job
*j
= pool_alloc(mt
->job_pool
);
1274 pthread_mutex_unlock(&mt
->job_pool_m
);
1278 j
->uncomp_len
= fp
->block_offset
;
1279 memcpy(j
->uncomp_data
, fp
->uncompressed_block
, j
->uncomp_len
);
1281 // Need non-block vers & job_pending?
1282 hts_tpool_dispatch(mt
->pool
, mt
->out_queue
, bgzf_encode_func
, j
);
1284 fp
->block_offset
= 0;
1288 static int mt_flush_queue(BGZF
*fp
)
1290 mtaux_t
*mt
= fp
->mt
;
1292 // Drain the encoder jobs.
1293 // We cannot use hts_tpool_flush here as it can cause deadlock if
1294 // the queue is full up of decoder tasks. The best solution would
1295 // be to have one input queue per type of job, but we don't right now.
1296 //hts_tpool_flush(mt->pool);
1297 pthread_mutex_lock(&mt
->job_pool_m
);
1298 while (mt
->jobs_pending
!= 0) {
1299 pthread_mutex_unlock(&mt
->job_pool_m
);
1300 usleep(10000); // FIXME: replace by condition variable
1301 pthread_mutex_lock(&mt
->job_pool_m
);
1303 pthread_mutex_unlock(&mt
->job_pool_m
);
1305 // Wait on bgzf_mt_writer to drain the queue
1306 if (hts_tpool_process_flush(mt
->out_queue
) != 0)
1309 return (fp
->errcode
== 0)? 0 : -1;
1312 static int lazy_flush(BGZF
*fp
)
1315 return fp
->block_offset
? mt_queue(fp
) : 0;
1317 return bgzf_flush(fp
);
1320 #else // ~ #ifdef BGZF_MT
1322 int bgzf_mt(BGZF
*fp
, int n_threads
, int n_sub_blks
)
1327 static inline int lazy_flush(BGZF
*fp
)
1329 return bgzf_flush(fp
);
1332 #endif // ~ #ifdef BGZF_MT
1334 int bgzf_flush(BGZF
*fp
)
1336 if (!fp
->is_write
) return 0;
1340 if (fp
->block_offset
) ret
= mt_queue(fp
);
1341 return ret
? ret
: mt_flush_queue(fp
);
1344 while (fp
->block_offset
> 0) {
1346 if ( fp
->idx_build_otf
)
1348 bgzf_index_add_block(fp
);
1349 fp
->idx
->ublock_addr
+= fp
->block_offset
;
1351 block_length
= deflate_block(fp
, fp
->block_offset
);
1352 if (block_length
< 0) {
1353 hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length
, NULL
));
1356 if (hwrite(fp
->fp
, fp
->compressed_block
, block_length
) != block_length
) {
1357 hts_log_error("File write failed (wrong size)");
1358 fp
->errcode
|= BGZF_ERR_IO
; // possibly truncated file
1361 fp
->block_address
+= block_length
;
1366 int bgzf_flush_try(BGZF
*fp
, ssize_t size
)
1368 if (fp
->block_offset
+ size
> BGZF_BLOCK_SIZE
) return lazy_flush(fp
);
1372 ssize_t
bgzf_write(BGZF
*fp
, const void *data
, size_t length
)
1374 if ( !fp
->is_compressed
)
1375 return hwrite(fp
->fp
, data
, length
);
1377 const uint8_t *input
= (const uint8_t*)data
;
1378 ssize_t remaining
= length
;
1379 assert(fp
->is_write
);
1380 while (remaining
> 0) {
1381 uint8_t* buffer
= (uint8_t*)fp
->uncompressed_block
;
1382 int copy_length
= BGZF_BLOCK_SIZE
- fp
->block_offset
;
1383 if (copy_length
> remaining
) copy_length
= remaining
;
1384 memcpy(buffer
+ fp
->block_offset
, input
, copy_length
);
1385 fp
->block_offset
+= copy_length
;
1386 input
+= copy_length
;
1387 remaining
-= copy_length
;
1388 if (fp
->block_offset
== BGZF_BLOCK_SIZE
) {
1389 if (lazy_flush(fp
) != 0) return -1;
1392 return length
- remaining
;
1395 ssize_t
bgzf_block_write(BGZF
*fp
, const void *data
, size_t length
)
1397 if ( !fp
->is_compressed
)
1398 return hwrite(fp
->fp
, data
, length
);
1399 const uint8_t *input
= (const uint8_t*)data
;
1400 ssize_t remaining
= length
;
1401 assert(fp
->is_write
);
1402 uint64_t current_block
; //keep track of current block
1403 uint64_t ublock_size
; // amount of uncompressed data to be fed into next block
1404 while (remaining
> 0) {
1405 current_block
= fp
->idx
->moffs
- fp
->idx
->noffs
;
1406 ublock_size
= fp
->idx
->offs
[current_block
+1].uaddr
-fp
->idx
->offs
[current_block
].uaddr
;
1407 uint8_t* buffer
= (uint8_t*)fp
->uncompressed_block
;
1408 int copy_length
= ublock_size
- fp
->block_offset
;
1409 if (copy_length
> remaining
) copy_length
= remaining
;
1410 memcpy(buffer
+ fp
->block_offset
, input
, copy_length
);
1411 fp
->block_offset
+= copy_length
;
1412 input
+= copy_length
;
1413 remaining
-= copy_length
;
1414 if (fp
->block_offset
== ublock_size
) {
1415 if (lazy_flush(fp
) != 0) return -1;
1416 fp
->idx
->noffs
--; // decrement noffs to track the blocks
1419 return length
- remaining
;
1423 ssize_t
bgzf_raw_write(BGZF
*fp
, const void *data
, size_t length
)
1425 ssize_t ret
= hwrite(fp
->fp
, data
, length
);
1426 if (ret
< 0) fp
->errcode
|= BGZF_ERR_IO
;
1430 int bgzf_close(BGZF
* fp
)
1432 int ret
, block_length
;
1433 if (fp
== 0) return -1;
1434 if (fp
->is_write
&& fp
->is_compressed
) {
1435 if (bgzf_flush(fp
) != 0) return -1;
1436 fp
->compress_level
= -1;
1437 block_length
= deflate_block(fp
, 0); // write an empty block
1438 if (block_length
< 0) {
1439 hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length
, NULL
));
1442 if (hwrite(fp
->fp
, fp
->compressed_block
, block_length
) < 0
1443 || hflush(fp
->fp
) != 0) {
1444 hts_log_error("File write failed");
1445 fp
->errcode
|= BGZF_ERR_IO
;
1451 if (!fp
->mt
->free_block
)
1452 fp
->uncompressed_block
= NULL
;
1458 if (fp
->gz_stream
== NULL
) ret
= Z_OK
;
1459 else if (!fp
->is_write
) ret
= inflateEnd(fp
->gz_stream
);
1460 else ret
= deflateEnd(fp
->gz_stream
);
1462 hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret
, NULL
));
1464 free(fp
->gz_stream
);
1466 ret
= hclose(fp
->fp
);
1467 if (ret
!= 0) return -1;
1468 bgzf_index_destroy(fp
);
1469 free(fp
->uncompressed_block
);
1475 void bgzf_set_cache_size(BGZF
*fp
, int cache_size
)
1477 if (fp
) fp
->cache_size
= cache_size
;
1480 int bgzf_check_EOF(BGZF
*fp
) {
1484 pthread_mutex_lock(&fp
->mt
->command_m
);
1485 fp
->mt
->command
= HAS_EOF
;
1486 pthread_cond_signal(&fp
->mt
->command_c
);
1487 hts_tpool_wake_dispatch(fp
->mt
->out_queue
);
1488 pthread_cond_wait(&fp
->mt
->command_c
, &fp
->mt
->command_m
);
1489 has_eof
= fp
->mt
->eof
;
1490 pthread_mutex_unlock(&fp
->mt
->command_m
);
1492 has_eof
= bgzf_check_EOF_common(fp
);
1495 fp
->no_eof_block
= (has_eof
== 0);
1500 int64_t bgzf_seek(BGZF
* fp
, int64_t pos
, int where
)
1503 int64_t block_address
;
1505 if (fp
->is_write
|| where
!= SEEK_SET
|| fp
->is_gzip
) {
1506 fp
->errcode
|= BGZF_ERR_MISUSE
;
1509 block_offset
= pos
& 0xFFFF;
1510 block_address
= pos
>> 16;
1513 // The reader runs asynchronous and does loops of:
1515 // Check & process command
1516 // Dispatch decode job
1518 // Once at EOF it then switches to loops of
1520 // Process command (possibly switching back to above loop).
1522 // To seek we therefore send the reader thread a SEEK command,
1523 // waking it up if blocked in dispatch and signalling if
1524 // waiting for a command. We then wait for the response so we
1525 // know the seek succeeded.
1526 pthread_mutex_lock(&fp
->mt
->command_m
);
1527 fp
->mt
->hit_eof
= 0;
1528 fp
->mt
->command
= SEEK
;
1529 fp
->mt
->block_address
= block_address
;
1530 pthread_cond_signal(&fp
->mt
->command_c
);
1531 hts_tpool_wake_dispatch(fp
->mt
->out_queue
);
1532 pthread_cond_wait(&fp
->mt
->command_c
, &fp
->mt
->command_m
);
1534 fp
->block_length
= 0; // indicates current block has not been loaded
1535 fp
->block_address
= block_address
;
1536 fp
->block_offset
= block_offset
;
1538 pthread_mutex_unlock(&fp
->mt
->command_m
);
1540 if (hseek(fp
->fp
, block_address
, SEEK_SET
) < 0) {
1541 fp
->errcode
|= BGZF_ERR_IO
;
1544 fp
->block_length
= 0; // indicates current block has not been loaded
1545 fp
->block_address
= block_address
;
1546 fp
->block_offset
= block_offset
;
1552 int bgzf_is_bgzf(const char *fn
)
1557 if ((fp
= hopen(fn
, "r")) == 0) return 0;
1558 n
= hread(fp
, buf
, 16);
1559 if (hclose(fp
) < 0) return 0;
1560 if (n
!= 16) return 0;
1561 return check_header(buf
) == 0? 1 : 0;
1564 int bgzf_compression(BGZF
*fp
)
1566 return (!fp
->is_compressed
)? no_compression
: (fp
->is_gzip
)? gzip
: bgzf
;
1569 int bgzf_getc(BGZF
*fp
)
1571 if (fp
->block_offset
+1 < fp
->block_length
) {
1572 fp
->uncompressed_address
++;
1573 return ((unsigned char*)fp
->uncompressed_block
)[fp
->block_offset
++];
1577 if (fp
->block_offset
>= fp
->block_length
) {
1578 if (bgzf_read_block(fp
) != 0) return -2; /* error */
1579 if (fp
->block_length
== 0) return -1; /* end-of-file */
1581 c
= ((unsigned char*)fp
->uncompressed_block
)[fp
->block_offset
++];
1582 if (fp
->block_offset
== fp
->block_length
) {
1583 fp
->block_address
= bgzf_htell(fp
);
1584 fp
->block_offset
= 0;
1585 fp
->block_length
= 0;
1587 fp
->uncompressed_address
++;
1592 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
1595 int bgzf_getline(BGZF
*fp
, int delim
, kstring_t
*str
)
1600 if (fp
->block_offset
>= fp
->block_length
) {
1601 if (bgzf_read_block(fp
) != 0) { state
= -2; break; }
1602 if (fp
->block_length
== 0) { state
= -1; break; }
1604 unsigned char *buf
= fp
->uncompressed_block
;
1605 for (l
= fp
->block_offset
; l
< fp
->block_length
&& buf
[l
] != delim
; ++l
);
1606 if (l
< fp
->block_length
) state
= 1;
1607 l
-= fp
->block_offset
;
1608 if (str
->l
+ l
+ 1 >= str
->m
) {
1609 str
->m
= str
->l
+ l
+ 2;
1611 str
->s
= (char*)realloc(str
->s
, str
->m
);
1613 memcpy(str
->s
+ str
->l
, buf
+ fp
->block_offset
, l
);
1615 fp
->block_offset
+= l
+ 1;
1616 if (fp
->block_offset
>= fp
->block_length
) {
1617 fp
->block_address
= bgzf_htell(fp
);
1618 fp
->block_offset
= 0;
1619 fp
->block_length
= 0;
1621 } while (state
== 0);
1622 if (str
->l
== 0 && state
< 0) return state
;
1623 fp
->uncompressed_address
+= str
->l
+ 1;
1624 if ( delim
=='\n' && str
->l
>0 && str
->s
[str
->l
-1]=='\r' ) str
->l
--;
1629 void bgzf_index_destroy(BGZF
*fp
)
1631 if ( !fp
->idx
) return;
1632 free(fp
->idx
->offs
);
1635 fp
->idx_build_otf
= 0;
1638 int bgzf_index_build_init(BGZF
*fp
)
1640 bgzf_index_destroy(fp
);
1641 fp
->idx
= (bgzidx_t
*) calloc(1,sizeof(bgzidx_t
));
1642 if ( !fp
->idx
) return -1;
1643 fp
->idx_build_otf
= 1; // build index on the fly
1647 int bgzf_index_add_block(BGZF
*fp
)
1650 if ( fp
->idx
->noffs
> fp
->idx
->moffs
)
1652 fp
->idx
->moffs
= fp
->idx
->noffs
;
1653 kroundup32(fp
->idx
->moffs
);
1654 fp
->idx
->offs
= (bgzidx1_t
*) realloc(fp
->idx
->offs
, fp
->idx
->moffs
*sizeof(bgzidx1_t
));
1655 if ( !fp
->idx
->offs
) return -1;
1657 fp
->idx
->offs
[ fp
->idx
->noffs
-1 ].uaddr
= fp
->idx
->ublock_addr
;
1658 fp
->idx
->offs
[ fp
->idx
->noffs
-1 ].caddr
= fp
->block_address
;
1662 static inline int hwrite_uint64(uint64_t x
, hFILE
*f
)
1664 if (ed_is_big()) x
= ed_swap_8(x
);
1665 if (hwrite(f
, &x
, sizeof(x
)) != sizeof(x
)) return -1;
1669 static char * get_name_suffix(const char *bname
, const char *suffix
)
1671 size_t len
= strlen(bname
) + strlen(suffix
) + 1;
1672 char *buff
= malloc(len
);
1673 if (!buff
) return NULL
;
1674 snprintf(buff
, len
, "%s%s", bname
, suffix
);
1678 int bgzf_index_dump_hfile(BGZF
*fp
, struct hFILE
*idx
, const char *name
)
1680 // Note that the index contains one extra record when indexing files opened
1681 // for reading. The terminating record is not present when opened for writing.
1682 // This is not a bug.
1687 hts_log_error("Called for BGZF handle with no index");
1692 if (bgzf_flush(fp
) != 0) return -1;
1694 if (hwrite_uint64(fp
->idx
->noffs
- 1, idx
) < 0) goto fail
;
1695 for (i
=1; i
<fp
->idx
->noffs
; i
++)
1697 if (hwrite_uint64(fp
->idx
->offs
[i
].caddr
, idx
) < 0) goto fail
;
1698 if (hwrite_uint64(fp
->idx
->offs
[i
].uaddr
, idx
) < 0) goto fail
;
1703 hts_log_error("Error writing to %s : %s", name
? name
: "index", strerror(errno
));
1707 int bgzf_index_dump(BGZF
*fp
, const char *bname
, const char *suffix
)
1709 const char *name
= bname
, *msg
= NULL
;
1714 hts_log_error("Called for BGZF handle with no index");
1721 tmp
= get_name_suffix(bname
, suffix
);
1722 if ( !tmp
) return -1;
1726 idx
= hopen(name
, "wb");
1728 msg
= "Error opening";
1732 if (bgzf_index_dump_hfile(fp
, idx
, name
) != 0) goto fail
;
1734 if (hclose(idx
) < 0)
1737 msg
= "Error on closing";
1746 hts_log_error("%s %s : %s", msg
, name
, strerror(errno
));
1748 if (idx
) hclose_abruptly(idx
);
1753 static inline int hread_uint64(uint64_t *xptr
, hFILE
*f
)
1755 if (hread(f
, xptr
, sizeof(*xptr
)) != sizeof(*xptr
)) return -1;
1756 if (ed_is_big()) ed_swap_8p(xptr
);
1760 int bgzf_index_load_hfile(BGZF
*fp
, struct hFILE
*idx
, const char *name
)
1762 fp
->idx
= (bgzidx_t
*) calloc(1,sizeof(bgzidx_t
));
1763 if (fp
->idx
== NULL
) goto fail
;
1765 if (hread_uint64(&x
, idx
) < 0) goto fail
;
1767 fp
->idx
->noffs
= fp
->idx
->moffs
= x
+ 1;
1768 fp
->idx
->offs
= (bgzidx1_t
*) malloc(fp
->idx
->moffs
*sizeof(bgzidx1_t
));
1769 if (fp
->idx
->offs
== NULL
) goto fail
;
1770 fp
->idx
->offs
[0].caddr
= fp
->idx
->offs
[0].uaddr
= 0;
1773 for (i
=1; i
<fp
->idx
->noffs
; i
++)
1775 if (hread_uint64(&fp
->idx
->offs
[i
].caddr
, idx
) < 0) goto fail
;
1776 if (hread_uint64(&fp
->idx
->offs
[i
].uaddr
, idx
) < 0) goto fail
;
1782 hts_log_error("Error reading %s : %s", name
? name
: "index", strerror(errno
));
1784 free(fp
->idx
->offs
);
1791 int bgzf_index_load(BGZF
*fp
, const char *bname
, const char *suffix
)
1793 const char *name
= bname
, *msg
= NULL
;
1798 tmp
= get_name_suffix(bname
, suffix
);
1799 if ( !tmp
) return -1;
1803 idx
= hopen(name
, "rb");
1805 msg
= "Error opening";
1809 if (bgzf_index_load_hfile(fp
, idx
, name
) != 0) goto fail
;
1811 if (hclose(idx
) != 0) {
1813 msg
= "Error closing";
1822 hts_log_error("%s %s : %s", msg
, name
, strerror(errno
));
1824 if (idx
) hclose_abruptly(idx
);
1829 int bgzf_useek(BGZF
*fp
, long uoffset
, int where
)
1831 if ( !fp
->is_compressed
)
1833 if (hseek(fp
->fp
, uoffset
, SEEK_SET
) < 0)
1835 fp
->errcode
|= BGZF_ERR_IO
;
1838 fp
->block_length
= 0; // indicates current block has not been loaded
1839 fp
->block_address
= uoffset
;
1840 fp
->block_offset
= 0;
1841 if (bgzf_read_block(fp
) < 0) {
1842 fp
->errcode
|= BGZF_ERR_IO
;
1845 fp
->uncompressed_address
= uoffset
;
1851 fp
->errcode
|= BGZF_ERR_IO
;
1856 int ilo
= 0, ihi
= fp
->idx
->noffs
- 1;
1859 int i
= (ilo
+ihi
)*0.5;
1860 if ( uoffset
< fp
->idx
->offs
[i
].uaddr
) ihi
= i
- 1;
1861 else if ( uoffset
>= fp
->idx
->offs
[i
].uaddr
) ilo
= i
+ 1;
1865 if (hseek(fp
->fp
, fp
->idx
->offs
[i
].caddr
, SEEK_SET
) < 0)
1867 fp
->errcode
|= BGZF_ERR_IO
;
1870 fp
->block_length
= 0; // indicates current block has not been loaded
1871 fp
->block_address
= fp
->idx
->offs
[i
].caddr
;
1872 fp
->block_offset
= 0;
1873 if ( bgzf_read_block(fp
) < 0 ) {
1874 fp
->errcode
|= BGZF_ERR_IO
;
1877 if ( uoffset
- fp
->idx
->offs
[i
].uaddr
> 0 )
1879 fp
->block_offset
= uoffset
- fp
->idx
->offs
[i
].uaddr
;
1880 assert( fp
->block_offset
<= fp
->block_length
); // todo: skipped, unindexed, blocks
1882 fp
->uncompressed_address
= uoffset
;
1886 long bgzf_utell(BGZF
*fp
)
1888 return fp
->uncompressed_address
; // currently maintained only when reading
1891 /* prototype is in hfile_internal.h */
1892 struct hFILE
*bgzf_hfile(struct BGZF
*fp
) {