modified: src1/input.c
[GalaxyCodeBases.git] / c_cpp / lib / htslib / bgzf.c
blob7e80b8cfd8ee1cc517a95354c726fe84ed904703
1 /* The MIT License
3 Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5 Copyright (C) 2009, 2013-2017 Genome Research Ltd
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
26 #include <config.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <unistd.h>
33 #include <assert.h>
34 #include <pthread.h>
35 #include <sys/types.h>
36 #include <inttypes.h>
38 #include "htslib/hts.h"
39 #include "htslib/bgzf.h"
40 #include "htslib/hfile.h"
41 #include "htslib/thread_pool.h"
42 #include "cram/pooled_alloc.h"
44 #define BGZF_CACHE
45 #define BGZF_MT
47 #define BLOCK_HEADER_LENGTH 18
48 #define BLOCK_FOOTER_LENGTH 8
51 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
52 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
53 | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
54 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
55 BGZF extension:
56 ^ ^ ^ ^
57 | | | |
58 FLG.EXTRA XLEN B C
60 BGZF format is compatible with GZIP. It limits the size of each compressed
61 block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
62 records the size.
65 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
67 #ifdef BGZF_CACHE
68 typedef struct {
69 int size;
70 uint8_t *block;
71 int64_t end_offset;
72 } cache_t;
73 #include "htslib/khash.h"
74 KHASH_MAP_INIT_INT64(cache, cache_t)
75 #endif
77 #ifdef BGZF_MT
79 typedef struct bgzf_job {
80 BGZF *fp;
81 unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
82 size_t comp_len;
83 unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
84 size_t uncomp_len;
85 int errcode;
86 int64_t block_address;
87 int hit_eof;
88 } bgzf_job;
90 enum mtaux_cmd {
91 NONE = 0,
92 SEEK,
93 HAS_EOF,
94 CLOSE,
97 typedef struct bgzf_mtaux_t {
98 // Memory pool for bgzf_job structs, to avoid many malloc/free
99 pool_alloc_t *job_pool;
100 bgzf_job *curr_job;
102 // Thread pool
103 int n_threads;
104 int own_pool;
105 hts_tpool *pool;
107 // Output queue holding completed bgzf_jobs
108 hts_tpool_process *out_queue;
110 // I/O thread.
111 pthread_t io_task;
112 pthread_mutex_t job_pool_m;
113 int jobs_pending; // number of jobs waiting
114 int flush_pending;
115 void *free_block;
116 int hit_eof; // r/w entirely within main thread
118 // Message passing to the reader thread; eg seek requests
119 int errcode;
120 uint64_t block_address;
121 int eof;
122 pthread_mutex_t command_m; // Set whenever fp is being updated
123 pthread_cond_t command_c;
124 enum mtaux_cmd command;
125 } mtaux_t;
126 #endif
128 typedef struct
130 uint64_t uaddr; // offset w.r.t. uncompressed data
131 uint64_t caddr; // offset w.r.t. compressed data
133 bgzidx1_t;
135 struct __bgzidx_t
137 int noffs, moffs; // the size of the index, n:used, m:allocated
138 bgzidx1_t *offs; // offsets
139 uint64_t ublock_addr; // offset of the current block (uncompressed data)
142 void bgzf_index_destroy(BGZF *fp);
143 int bgzf_index_add_block(BGZF *fp);
144 static void mt_destroy(mtaux_t *mt);
146 static inline void packInt16(uint8_t *buffer, uint16_t value)
148 buffer[0] = value;
149 buffer[1] = value >> 8;
152 static inline int unpackInt16(const uint8_t *buffer)
154 return buffer[0] | buffer[1] << 8;
157 static inline void packInt32(uint8_t *buffer, uint32_t value)
159 buffer[0] = value;
160 buffer[1] = value >> 8;
161 buffer[2] = value >> 16;
162 buffer[3] = value >> 24;
165 static const char *bgzf_zerr(int errnum, z_stream *zs)
167 static char buffer[32];
169 /* Return zs->msg if available.
170 zlib doesn't set this very reliably. Looking at the source suggests
171 that it may get set to a useful message for deflateInit2, inflateInit2
172 and inflate when it returns Z_DATA_ERROR. For inflate with other
173 return codes, deflate, deflateEnd and inflateEnd it doesn't appear
174 to be useful. For the likely non-useful cases, the caller should
175 pass NULL into zs. */
177 if (zs && zs->msg) return zs->msg;
179 // gzerror OF((gzFile file, int *errnum)
180 switch (errnum) {
181 case Z_ERRNO:
182 return strerror(errno);
183 case Z_STREAM_ERROR:
184 return "invalid parameter/compression level, or inconsistent stream state";
185 case Z_DATA_ERROR:
186 return "invalid or incomplete IO";
187 case Z_MEM_ERROR:
188 return "out of memory";
189 case Z_BUF_ERROR:
190 return "progress temporarily not possible, or in() / out() returned an error";
191 case Z_VERSION_ERROR:
192 return "zlib version mismatch";
193 case Z_OK: // 0: maybe gzgets error Z_NULL
194 default:
195 snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
196 return buffer; // FIXME: Not thread-safe.
200 static BGZF *bgzf_read_init(hFILE *hfpr)
202 BGZF *fp;
203 uint8_t magic[18];
204 ssize_t n = hpeek(hfpr, magic, 18);
205 if (n < 0) return NULL;
207 fp = (BGZF*)calloc(1, sizeof(BGZF));
208 if (fp == NULL) return NULL;
210 fp->is_write = 0;
211 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
212 if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
213 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
214 fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
215 fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
216 #ifdef BGZF_CACHE
217 fp->cache = kh_init(cache);
218 #endif
219 return fp;
222 // get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
223 static int mode2level(const char *mode)
225 int i, compress_level = -1;
226 for (i = 0; mode[i]; ++i)
227 if (mode[i] >= '0' && mode[i] <= '9') break;
228 if (mode[i]) compress_level = (int)mode[i] - '0';
229 if (strchr(mode, 'u')) compress_level = -2;
230 return compress_level;
232 static BGZF *bgzf_write_init(const char *mode)
234 BGZF *fp;
235 fp = (BGZF*)calloc(1, sizeof(BGZF));
236 if (fp == NULL) goto mem_fail;
237 fp->is_write = 1;
238 int compress_level = mode2level(mode);
239 if ( compress_level==-2 )
241 fp->is_compressed = 0;
242 return fp;
244 fp->is_compressed = 1;
246 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
247 if (fp->uncompressed_block == NULL) goto mem_fail;
248 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
250 fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
251 if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
252 if ( strchr(mode,'g') )
254 // gzip output
255 fp->is_gzip = 1;
256 fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
257 if (fp->gz_stream == NULL) goto mem_fail;
258 fp->gz_stream->zalloc = NULL;
259 fp->gz_stream->zfree = NULL;
260 fp->gz_stream->msg = NULL;
262 int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
263 if (ret!=Z_OK) {
264 hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
265 goto fail;
268 return fp;
270 mem_fail:
271 hts_log_error("%s", strerror(errno));
273 fail:
274 if (fp != NULL) {
275 free(fp->uncompressed_block);
276 free(fp->gz_stream);
277 free(fp);
279 return NULL;
282 BGZF *bgzf_open(const char *path, const char *mode)
284 BGZF *fp = 0;
285 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
286 if (strchr(mode, 'r')) {
287 hFILE *fpr;
288 if ((fpr = hopen(path, mode)) == 0) return 0;
289 fp = bgzf_read_init(fpr);
290 if (fp == 0) { hclose_abruptly(fpr); return NULL; }
291 fp->fp = fpr;
292 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
293 hFILE *fpw;
294 if ((fpw = hopen(path, mode)) == 0) return 0;
295 fp = bgzf_write_init(mode);
296 if (fp == NULL) return NULL;
297 fp->fp = fpw;
299 else { errno = EINVAL; return 0; }
301 fp->is_be = ed_is_big();
302 return fp;
305 BGZF *bgzf_dopen(int fd, const char *mode)
307 BGZF *fp = 0;
308 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
309 if (strchr(mode, 'r')) {
310 hFILE *fpr;
311 if ((fpr = hdopen(fd, mode)) == 0) return 0;
312 fp = bgzf_read_init(fpr);
313 if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
314 fp->fp = fpr;
315 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
316 hFILE *fpw;
317 if ((fpw = hdopen(fd, mode)) == 0) return 0;
318 fp = bgzf_write_init(mode);
319 if (fp == NULL) return NULL;
320 fp->fp = fpw;
322 else { errno = EINVAL; return 0; }
324 fp->is_be = ed_is_big();
325 return fp;
328 BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
330 BGZF *fp = NULL;
331 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
332 if (strchr(mode, 'r')) {
333 fp = bgzf_read_init(hfp);
334 if (fp == NULL) return NULL;
335 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
336 fp = bgzf_write_init(mode);
337 if (fp == NULL) return NULL;
339 else { errno = EINVAL; return 0; }
341 fp->fp = hfp;
342 fp->is_be = ed_is_big();
343 return fp;
346 int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
348 uint32_t crc;
349 z_stream zs;
350 uint8_t *dst = (uint8_t*)_dst;
352 // compress the body
353 zs.zalloc = NULL; zs.zfree = NULL;
354 zs.msg = NULL;
355 zs.next_in = (Bytef*)src;
356 zs.avail_in = slen;
357 zs.next_out = dst + BLOCK_HEADER_LENGTH;
358 zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
359 int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
360 if (ret!=Z_OK) {
361 hts_log_error("Call to deflateInit2 failed: %s", bgzf_zerr(ret, &zs));
362 return -1;
364 if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
365 hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
366 return -1;
368 if ((ret = deflateEnd(&zs)) != Z_OK) {
369 hts_log_error("Call to deflateEnd failed: %s", bgzf_zerr(ret, NULL));
370 return -1;
372 *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
373 // write the header
374 memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
375 packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
376 // write the footer
377 crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
378 packInt32((uint8_t*)&dst[*dlen - 8], crc);
379 packInt32((uint8_t*)&dst[*dlen - 4], slen);
380 return 0;
383 static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
385 uint8_t *dst = (uint8_t*)_dst;
386 z_stream *zs = fp->gz_stream;
387 int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
388 zs->next_in = (Bytef*)src;
389 zs->avail_in = slen;
390 zs->next_out = dst;
391 zs->avail_out = *dlen;
392 int ret = deflate(zs, flush);
393 if (ret == Z_STREAM_ERROR) {
394 hts_log_error("Deflate operation failed: %s", bgzf_zerr(ret, NULL));
395 return -1;
397 if (zs->avail_in != 0) {
398 hts_log_error("Deflate block too large for output buffer");
399 return -1;
401 *dlen = *dlen - zs->avail_out;
402 return 0;
405 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
406 static int deflate_block(BGZF *fp, int block_length)
408 size_t comp_size = BGZF_MAX_BLOCK_SIZE;
409 int ret;
410 if ( !fp->is_gzip )
411 ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
412 else
413 ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
415 if ( ret != 0 )
417 hts_log_debug("Compression error %d", ret);
418 fp->errcode |= BGZF_ERR_ZLIB;
419 return -1;
421 fp->block_offset = 0;
422 return comp_size;
425 static int bgzf_uncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen) {
426 z_stream zs;
427 zs.zalloc = NULL;
428 zs.zfree = NULL;
429 zs.msg = NULL;
430 zs.next_in = (Bytef*)src;
431 zs.avail_in = slen;
432 zs.next_out = (Bytef*)dst;
433 zs.avail_out = *dlen;
435 int ret = inflateInit2(&zs, -15);
436 if (ret != Z_OK) {
437 hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, &zs));
438 return -1;
440 if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
441 hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
442 if ((ret = inflateEnd(&zs)) != Z_OK) {
443 hts_log_warning("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
445 return -1;
447 if ((ret = inflateEnd(&zs)) != Z_OK) {
448 hts_log_error("Call to inflateEnd failed: %s", bgzf_zerr(ret, NULL));
449 return -1;
451 *dlen = *dlen - zs.avail_out;
452 return 0;
455 // Inflate the block in fp->compressed_block into fp->uncompressed_block
456 static int inflate_block(BGZF* fp, int block_length)
458 size_t dlen = BGZF_MAX_BLOCK_SIZE;
459 int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
460 (Bytef*)fp->compressed_block + 18, block_length - 18);
461 if (ret < 0) {
462 fp->errcode |= BGZF_ERR_ZLIB;
463 return -1;
466 return dlen;
469 static int inflate_gzip_block(BGZF *fp, int cached)
471 int ret = Z_OK;
474 if ( !cached && fp->gz_stream->avail_out!=0 )
476 fp->gz_stream->avail_in = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
477 if ( fp->gz_stream->avail_in<=0 ) return fp->gz_stream->avail_in;
478 if ( fp->gz_stream->avail_in==0 ) break;
479 fp->gz_stream->next_in = fp->compressed_block;
481 else cached = 0;
484 fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
485 fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
486 fp->gz_stream->msg = NULL;
487 ret = inflate(fp->gz_stream, Z_NO_FLUSH);
488 if (ret < 0 && ret != Z_BUF_ERROR) {
489 hts_log_error("Inflate operation failed: %s", bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
490 fp->errcode |= BGZF_ERR_ZLIB;
491 return -1;
493 unsigned int have = BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
494 if ( have ) return have;
496 while ( fp->gz_stream->avail_out == 0 );
498 while (ret != Z_STREAM_END);
499 return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
502 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
503 static int check_header(const uint8_t *header)
505 if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
506 return ((header[3] & 4) != 0
507 && unpackInt16((uint8_t*)&header[10]) == 6
508 && header[12] == 'B' && header[13] == 'C'
509 && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
512 #ifdef BGZF_CACHE
513 static void free_cache(BGZF *fp)
515 khint_t k;
516 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
517 if (fp->is_write) return;
518 for (k = kh_begin(h); k < kh_end(h); ++k)
519 if (kh_exist(h, k)) free(kh_val(h, k).block);
520 kh_destroy(cache, h);
523 static int load_block_from_cache(BGZF *fp, int64_t block_address)
525 khint_t k;
526 cache_t *p;
528 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
529 k = kh_get(cache, h, block_address);
530 if (k == kh_end(h)) return 0;
531 p = &kh_val(h, k);
532 if (fp->block_length != 0) fp->block_offset = 0;
533 fp->block_address = block_address;
534 fp->block_length = p->size;
535 // FIXME: why BGZF_MAX_BLOCK_SIZE and not p->size?
536 memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE);
537 if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
539 // todo: move the error up
540 hts_log_error("Could not hseek to %"PRId64"", p->end_offset);
541 exit(1);
543 return p->size;
546 static void cache_block(BGZF *fp, int size)
548 int ret;
549 khint_t k;
550 cache_t *p;
551 //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
552 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
553 if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
554 if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
555 /* A better way would be to remove the oldest block in the
556 * cache, but here we remove a random one for simplicity. This
557 * should not have a big impact on performance. */
558 for (k = kh_begin(h); k < kh_end(h); ++k)
559 if (kh_exist(h, k)) break;
560 if (k < kh_end(h)) {
561 free(kh_val(h, k).block);
562 kh_del(cache, h, k);
565 k = kh_put(cache, h, fp->block_address, &ret);
566 if (ret == 0) return; // if this happens, a bug!
567 p = &kh_val(h, k);
568 p->size = fp->block_length;
569 p->end_offset = fp->block_address + size;
570 p->block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
571 memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
573 #else
574 static void free_cache(BGZF *fp) {}
575 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
576 static void cache_block(BGZF *fp, int size) {}
577 #endif
580 * Absolute htell in this compressed file.
582 * Do not confuse with the external bgzf_tell macro which returns the virtual
583 * offset.
585 static off_t bgzf_htell(BGZF *fp) {
586 if (fp->mt) {
587 pthread_mutex_lock(&fp->mt->job_pool_m);
588 off_t pos = fp->block_address + fp->block_clength;
589 pthread_mutex_unlock(&fp->mt->job_pool_m);
590 return pos;
591 } else {
592 return htell(fp->fp);
596 int bgzf_read_block(BGZF *fp)
598 hts_tpool_result *r;
600 if (fp->mt) {
601 again:
602 if (fp->mt->hit_eof) {
603 // Further reading at EOF will always return 0
604 fp->block_length = 0;
605 return 0;
607 r = hts_tpool_next_result_wait(fp->mt->out_queue);
608 bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
610 if (!j || j->errcode == BGZF_ERR_MT) {
611 if (!fp->mt->free_block) {
612 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
613 if (fp->uncompressed_block == NULL) return -1;
614 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
615 } // else it's already allocated with malloc, maybe even in-use.
616 mt_destroy(fp->mt);
617 fp->mt = NULL;
618 hts_tpool_delete_result(r, 0);
620 goto single_threaded;
623 if (j->errcode) {
624 fp->errcode = j->errcode;
625 return -1;
628 if (j->hit_eof) {
629 if (!fp->last_block_eof && !fp->no_eof_block) {
630 fp->no_eof_block = 1;
631 hts_log_warning("EOF marker is absent. The input is probably truncated");
633 fp->mt->hit_eof = 1;
636 // Zero length blocks in the middle of a file are (wrongly)
637 // considered as EOF by many callers. We work around this by
638 // trying again to see if we hit a genuine EOF.
639 if (!j->hit_eof && j->uncomp_len == 0) {
640 fp->last_block_eof = 1;
641 hts_tpool_delete_result(r, 0);
642 goto again;
645 // block_length=0 and block_offset set by bgzf_seek.
646 if (fp->block_length != 0) fp->block_offset = 0;
647 if (!j->hit_eof) fp->block_address = j->block_address;
648 fp->block_clength = j->comp_len;
649 fp->block_length = j->uncomp_len;
650 // bgzf_read() can change fp->block_length
651 fp->last_block_eof = (fp->block_length == 0);
653 if ( j->uncomp_len && j->fp->idx_build_otf )
655 bgzf_index_add_block(j->fp);
656 j->fp->idx->ublock_addr += j->uncomp_len;
659 // Steal the data block as it's quicker than a memcpy.
660 // We just need to make sure we delay the pool free.
661 if (fp->mt->curr_job) {
662 pthread_mutex_lock(&fp->mt->job_pool_m);
663 pool_free(fp->mt->job_pool, fp->mt->curr_job);
664 pthread_mutex_unlock(&fp->mt->job_pool_m);
666 fp->uncompressed_block = j->uncomp_data;
667 fp->mt->curr_job = j;
668 if (fp->mt->free_block) {
669 free(fp->mt->free_block); // clear up last non-mt block
670 fp->mt->free_block = NULL;
673 hts_tpool_delete_result(r, 0);
674 return 0;
677 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
678 int count, size, block_length, remaining;
680 single_threaded:
681 size = 0;
683 // Reading an uncompressed file
684 if ( !fp->is_compressed )
686 count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
687 if (count < 0) // Error
689 fp->errcode |= BGZF_ERR_IO;
690 return -1;
692 else if (count == 0) // EOF
694 fp->block_length = 0;
695 return 0;
697 if (fp->block_length != 0) fp->block_offset = 0;
698 fp->block_address += count;
699 fp->block_length = count;
700 return 0;
703 // Reading compressed file
704 int64_t block_address;
705 block_address = bgzf_htell(fp);
706 if ( fp->is_gzip && fp->gz_stream ) // is this is an initialized gzip stream?
708 count = inflate_gzip_block(fp, 0);
709 if ( count<0 )
711 fp->errcode |= BGZF_ERR_ZLIB;
712 return -1;
714 fp->block_length = count;
715 fp->block_address = block_address;
716 return 0;
718 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
720 // loop to skip empty bgzf blocks
721 while (1)
723 count = hread(fp->fp, header, sizeof(header));
724 if (count == 0) { // no data read
725 if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
726 fp->no_eof_block = 1;
727 hts_log_warning("EOF marker is absent. The input is probably truncated");
729 fp->block_length = 0;
730 return 0;
732 int ret;
733 if ( count != sizeof(header) || (ret=check_header(header))==-2 )
735 fp->errcode |= BGZF_ERR_HEADER;
736 return -1;
738 if ( ret==-1 )
740 // GZIP, not BGZF
741 uint8_t *cblock = (uint8_t*)fp->compressed_block;
742 memcpy(cblock, header, sizeof(header));
743 count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
744 int nskip = 10;
746 // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
747 // Note: Some of these fields are untested, I did not have appropriate data available
748 if ( header[3] & 0x4 ) // FLG.FEXTRA
750 nskip += unpackInt16(&cblock[nskip]) + 2;
752 if ( header[3] & 0x8 ) // FLG.FNAME
754 while ( nskip<count && cblock[nskip] ) nskip++;
755 nskip++;
757 if ( header[3] & 0x10 ) // FLG.FCOMMENT
759 while ( nskip<count && cblock[nskip] ) nskip++;
760 nskip++;
762 if ( header[3] & 0x2 ) nskip += 2; // FLG.FHCRC
764 /* FIXME: Should handle this better. There's no reason why
765 someone shouldn't include a massively long comment in their
766 gzip stream. */
767 if ( nskip >= count )
769 fp->errcode |= BGZF_ERR_HEADER;
770 return -1;
773 fp->is_gzip = 1;
774 fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
775 int ret = inflateInit2(fp->gz_stream, -15);
776 if (ret != Z_OK)
778 hts_log_error("Call to inflateInit2 failed: %s", bgzf_zerr(ret, fp->gz_stream));
779 fp->errcode |= BGZF_ERR_ZLIB;
780 return -1;
782 fp->gz_stream->avail_in = count - nskip;
783 fp->gz_stream->next_in = cblock + nskip;
784 count = inflate_gzip_block(fp, 1);
785 if ( count<0 )
787 fp->errcode |= BGZF_ERR_ZLIB;
788 return -1;
790 fp->block_length = count;
791 fp->block_address = block_address;
792 if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
793 return 0;
795 size = count;
796 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
797 if (block_length < BLOCK_HEADER_LENGTH)
799 fp->errcode |= BGZF_ERR_HEADER;
800 return -1;
802 compressed_block = (uint8_t*)fp->compressed_block;
803 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
804 remaining = block_length - BLOCK_HEADER_LENGTH;
805 count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
806 if (count != remaining) {
807 fp->errcode |= BGZF_ERR_IO;
808 return -1;
810 size += count;
811 if ((count = inflate_block(fp, block_length)) < 0) {
812 hts_log_debug("Inflate block operation failed: %s", bgzf_zerr(count, NULL));
813 fp->errcode |= BGZF_ERR_ZLIB;
814 return -1;
816 fp->last_block_eof = (count == 0);
817 if ( count ) break; // otherwise an empty bgzf block
819 if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
820 fp->block_address = block_address;
821 fp->block_length = count;
822 if ( fp->idx_build_otf )
824 bgzf_index_add_block(fp);
825 fp->idx->ublock_addr += count;
827 cache_block(fp, size);
828 return 0;
831 ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
833 ssize_t bytes_read = 0;
834 uint8_t *output = (uint8_t*)data;
835 if (length <= 0) return 0;
836 assert(fp->is_write == 0);
837 while (bytes_read < length) {
838 int copy_length, available = fp->block_length - fp->block_offset;
839 uint8_t *buffer;
840 if (available <= 0) {
841 int ret = bgzf_read_block(fp);
842 if (ret != 0) {
843 hts_log_error("Read block operation failed with error %d after %zd of %zu bytes", ret, bytes_read, length);
844 fp->errcode |= BGZF_ERR_ZLIB;
845 return -1;
847 available = fp->block_length - fp->block_offset;
848 if (available <= 0) break;
850 copy_length = length - bytes_read < available? length - bytes_read : available;
851 buffer = (uint8_t*)fp->uncompressed_block;
852 memcpy(output, buffer + fp->block_offset, copy_length);
853 fp->block_offset += copy_length;
854 output += copy_length;
855 bytes_read += copy_length;
857 // For raw gzip streams this avoids short reads.
858 if (fp->block_offset == fp->block_length) {
859 fp->block_address = bgzf_htell(fp);
860 fp->block_offset = fp->block_length = 0;
864 fp->uncompressed_address += bytes_read;
866 return bytes_read;
869 ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
871 ssize_t ret = hread(fp->fp, data, length);
872 if (ret < 0) fp->errcode |= BGZF_ERR_IO;
873 return ret;
876 #ifdef BGZF_MT
878 void *bgzf_encode_func(void *arg) {
879 bgzf_job *j = (bgzf_job *)arg;
881 j->comp_len = BGZF_MAX_BLOCK_SIZE;
882 int ret = bgzf_compress(j->comp_data, &j->comp_len,
883 j->uncomp_data, j->uncomp_len,
884 j->fp->compress_level);
885 if (ret != 0)
886 j->errcode |= BGZF_ERR_ZLIB;
888 return arg;
891 // Our input block has already been decoded by bgzf_mt_read_block().
892 // We need to split that into a fetch block (compressed) and make this
893 // do the actual decompression step.
894 void *bgzf_decode_func(void *arg) {
895 bgzf_job *j = (bgzf_job *)arg;
897 j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
898 int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
899 j->comp_data+18, j->comp_len-18);
900 if (ret != 0)
901 j->errcode |= BGZF_ERR_ZLIB;
903 return arg;
907 * Nul function so we can dispatch a job with the correct serial
908 * to mark failure or to indicate an empty read (EOF).
910 void *bgzf_nul_func(void *arg) { return arg; }
913 * Takes compressed blocks off the results queue and calls hwrite to
914 * punt them to the output stream.
916 * Returns NULL when no more are left, or -1 on error
918 static void *bgzf_mt_writer(void *vp) {
919 BGZF *fp = (BGZF *)vp;
920 mtaux_t *mt = fp->mt;
921 hts_tpool_result *r;
923 // Iterates until result queue is shutdown, where it returns NULL.
924 while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
925 bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
926 assert(j);
928 if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len) {
929 fp->errcode |= BGZF_ERR_IO;
930 goto err;
934 * Periodically call hflush (which calls fsync when on a file).
935 * This avoids the fsync being done at the bgzf_close stage,
936 * which can sometimes cause signficant delays. As this is in
937 * a separate thread, spreading the sync delays throughout the
938 * program execution seems better.
939 * Frequency of 1/512 has been chosen by experimentation
940 * across local XFS, NFS and Lustre tests.
942 if (++mt->flush_pending % 512 == 0)
943 if (hflush(fp->fp) != 0)
944 goto err;
947 hts_tpool_delete_result(r, 0);
949 // Also updated by main thread
950 pthread_mutex_lock(&mt->job_pool_m);
951 pool_free(mt->job_pool, j);
952 mt->jobs_pending--;
953 pthread_mutex_unlock(&mt->job_pool_m);
956 if (hflush(fp->fp) != 0)
957 goto err;
959 hts_tpool_process_destroy(mt->out_queue);
961 return NULL;
963 err:
964 hts_tpool_process_destroy(mt->out_queue);
965 return (void *)-1;
970 * Reads a compressed block of data using hread and dispatches it to
971 * the thread pool for decompression. This is the analogue of the old
972 * non-threaded bgzf_read_block() function, but without modifying fp
973 * in any way (except for the read offset). All output goes via the
974 * supplied bgzf_job struct.
976 * Returns NULL when no more are left, or -1 on error
978 int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
980 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
981 int count, size = 0, block_length, remaining;
983 // NOTE: Guaranteed to be compressed as we block multi-threading in
984 // uncompressed mode. However it may be gzip compression instead
985 // of bgzf.
987 // Reading compressed file
988 int64_t block_address;
989 block_address = htell(fp->fp);
991 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
992 count = hpeek(fp->fp, header, sizeof(header));
993 if (count == 0) // no data read
994 return -1;
995 int ret;
996 if ( count != sizeof(header) || (ret=check_header(header))==-2 )
998 j->errcode |= BGZF_ERR_HEADER;
999 return -1;
1001 if (ret == -1) {
1002 j->errcode |= BGZF_ERR_MT;
1003 return -1;
1006 count = hread(fp->fp, header, sizeof(header));
1007 if (count != sizeof(header)) // no data read
1008 return -1;
1010 size = count;
1011 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1012 if (block_length < BLOCK_HEADER_LENGTH) {
1013 j->errcode |= BGZF_ERR_HEADER;
1014 return -1;
1016 compressed_block = (uint8_t*)j->comp_data;
1017 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1018 remaining = block_length - BLOCK_HEADER_LENGTH;
1019 count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1020 if (count != remaining) {
1021 j->errcode |= BGZF_ERR_IO;
1022 return -1;
1024 size += count;
1025 j->comp_len = block_length;
1026 j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1027 j->block_address = block_address;
1028 j->fp = fp;
1029 j->errcode = 0;
1031 return 0;
1035 static int bgzf_check_EOF_common(BGZF *fp)
1037 uint8_t buf[28];
1038 off_t offset = htell(fp->fp);
1039 if (hseek(fp->fp, -28, SEEK_END) < 0) {
1040 if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1041 else return -1;
1043 if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1044 if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1045 return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1049 * Checks EOF from the reader thread.
1051 static void bgzf_mt_eof(BGZF *fp) {
1052 mtaux_t *mt = fp->mt;
1054 pthread_mutex_lock(&mt->job_pool_m);
1055 mt->eof = bgzf_check_EOF_common(fp);
1056 pthread_mutex_unlock(&mt->job_pool_m);
1057 pthread_cond_signal(&mt->command_c);
1062 * Performs the seek (called by reader thread).
1064 * This simply drains the entire queue, throwing away blocks, seeks,
1065 * and starts it up again. Brute force, but maybe sufficient.
1067 static void bgzf_mt_seek(BGZF *fp) {
1068 mtaux_t *mt = fp->mt;
1070 hts_tpool_process_reset(mt->out_queue, 0);
1071 pthread_mutex_lock(&mt->job_pool_m);
1072 mt->command = NONE;
1073 mt->errcode = 0;
1075 if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1076 mt->errcode = BGZF_ERR_IO;
1078 pthread_mutex_unlock(&mt->job_pool_m);
1079 pthread_cond_signal(&mt->command_c);
1082 static void *bgzf_mt_reader(void *vp) {
1083 BGZF *fp = (BGZF *)vp;
1084 mtaux_t *mt = fp->mt;
1086 restart:
1087 pthread_mutex_lock(&mt->job_pool_m);
1088 bgzf_job *j = pool_alloc(mt->job_pool);
1089 pthread_mutex_unlock(&mt->job_pool_m);
1090 j->errcode = 0;
1091 j->comp_len = 0;
1092 j->uncomp_len = 0;
1093 j->hit_eof = 0;
1095 while (bgzf_mt_read_block(fp, j) == 0) {
1096 // Dispatch
1097 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_decode_func, j);
1099 // Check for command
1100 pthread_mutex_lock(&mt->command_m);
1101 switch (mt->command) {
1102 case SEEK:
1103 bgzf_mt_seek(fp);
1104 pthread_mutex_unlock(&mt->command_m);
1105 goto restart;
1107 case HAS_EOF:
1108 bgzf_mt_eof(fp);
1109 break;
1111 case CLOSE:
1112 pthread_cond_signal(&mt->command_c);
1113 pthread_mutex_unlock(&mt->command_m);
1114 hts_tpool_process_destroy(mt->out_queue);
1115 pthread_exit(NULL);
1117 default:
1118 break;
1120 pthread_mutex_unlock(&mt->command_m);
1122 // Allocate buffer for next block
1123 pthread_mutex_lock(&mt->job_pool_m);
1124 j = pool_alloc(mt->job_pool);
1125 pthread_mutex_unlock(&mt->job_pool_m);
1126 j->errcode = 0;
1127 j->comp_len = 0;
1128 j->uncomp_len = 0;
1129 j->hit_eof = 0;
1132 if (j->errcode == BGZF_ERR_MT) {
1133 // Attempt to multi-thread decode a raw gzip stream cannot be done.
1134 // We tear down the multi-threaded decoder and revert to the old code.
1135 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1136 hts_tpool_process_ref_decr(mt->out_queue);
1137 pthread_exit(&j->errcode);
1140 // Dispatch an empty block so EOF is spotted.
1141 // We also use this mechanism for returning errors, in which case
1142 // j->errcode is set already.
1144 j->hit_eof = 1;
1145 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1146 if (j->errcode != 0) {
1147 hts_tpool_process_destroy(mt->out_queue);
1148 pthread_exit(&j->errcode);
1151 // We hit EOF so can stop reading, but we may get a subsequent
1152 // seek request. In this case we need to restart the reader.
1154 // To handle this we wait on a condition variable and then
1155 // monitor the command. (This could be either seek or close.)
1156 for (;;) {
1157 pthread_mutex_lock(&mt->command_m);
1158 if (mt->command == NONE)
1159 pthread_cond_wait(&mt->command_c, &mt->command_m);
1160 switch(mt->command) {
1161 default:
1162 pthread_mutex_unlock(&mt->command_m);
1163 break;
1165 case SEEK:
1166 bgzf_mt_seek(fp);
1167 pthread_mutex_unlock(&mt->command_m);
1168 goto restart;
1170 case HAS_EOF:
1171 bgzf_mt_eof(fp);
1172 pthread_mutex_unlock(&mt->command_m);
1173 continue;
1175 case CLOSE:
1176 pthread_cond_signal(&mt->command_c);
1177 pthread_mutex_unlock(&mt->command_m);
1178 hts_tpool_process_destroy(mt->out_queue);
1179 pthread_exit(NULL);
1184 int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1185 // No gain from multi-threading when not compressed
1186 if (!fp->is_compressed)
1187 return 0;
1189 mtaux_t *mt;
1190 mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1191 if (!mt) return -1;
1192 fp->mt = mt;
1194 mt->pool = pool;
1195 mt->n_threads = hts_tpool_size(pool);
1196 if (!qsize)
1197 qsize = mt->n_threads*2;
1198 if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0))) {
1199 free(mt);
1200 return -1;
1202 hts_tpool_process_ref_incr(mt->out_queue);
1204 mt->job_pool = pool_create(sizeof(bgzf_job));
1206 pthread_mutex_init(&mt->job_pool_m, NULL);
1207 pthread_mutex_init(&mt->command_m, NULL);
1208 pthread_cond_init(&mt->command_c, NULL);
1209 mt->flush_pending = 0;
1210 mt->jobs_pending = 0;
1211 mt->free_block = fp->uncompressed_block; // currently in-use block
1212 pthread_create(&mt->io_task, NULL,
1213 fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1215 return 0;
1218 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1220 // No gain from multi-threading when not compressed
1221 if (!fp->is_compressed || fp->is_gzip)
1222 return 0;
1224 if (n_threads < 1) return -1;
1225 hts_tpool *p = hts_tpool_init(n_threads);
1226 if (!p)
1227 return -1;
1229 if (bgzf_thread_pool(fp, p, 0) != 0) {
1230 hts_tpool_destroy(p);
1231 return -1;
1234 fp->mt->own_pool = 1;
1236 return 0;
1239 static void mt_destroy(mtaux_t *mt)
1241 pthread_mutex_lock(&mt->command_m);
1242 mt->command = CLOSE;
1243 pthread_cond_signal(&mt->command_c);
1244 hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1245 pthread_mutex_unlock(&mt->command_m);
1247 // Destroying the queue first forces the writer to exit.
1248 hts_tpool_process_destroy(mt->out_queue);
1249 pthread_join(mt->io_task, NULL);
1251 pthread_mutex_destroy(&mt->job_pool_m);
1252 pthread_mutex_destroy(&mt->command_m);
1253 pthread_cond_destroy(&mt->command_c);
1254 if (mt->curr_job)
1255 pool_free(mt->job_pool, mt->curr_job);
1257 if (mt->own_pool)
1258 hts_tpool_destroy(mt->pool);
1260 pool_destroy(mt->job_pool);
1262 free(mt);
1263 fflush(stderr);
1266 static int mt_queue(BGZF *fp)
1268 mtaux_t *mt = fp->mt;
1270 // Also updated by writer thread
1271 pthread_mutex_lock(&mt->job_pool_m);
1272 bgzf_job *j = pool_alloc(mt->job_pool);
1273 mt->jobs_pending++;
1274 pthread_mutex_unlock(&mt->job_pool_m);
1276 j->fp = fp;
1277 j->errcode = 0;
1278 j->uncomp_len = fp->block_offset;
1279 memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1281 // Need non-block vers & job_pending?
1282 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_encode_func, j);
1284 fp->block_offset = 0;
1285 return 0;
1288 static int mt_flush_queue(BGZF *fp)
1290 mtaux_t *mt = fp->mt;
1292 // Drain the encoder jobs.
1293 // We cannot use hts_tpool_flush here as it can cause deadlock if
1294 // the queue is full up of decoder tasks. The best solution would
1295 // be to have one input queue per type of job, but we don't right now.
1296 //hts_tpool_flush(mt->pool);
1297 pthread_mutex_lock(&mt->job_pool_m);
1298 while (mt->jobs_pending != 0) {
1299 pthread_mutex_unlock(&mt->job_pool_m);
1300 usleep(10000); // FIXME: replace by condition variable
1301 pthread_mutex_lock(&mt->job_pool_m);
1303 pthread_mutex_unlock(&mt->job_pool_m);
1305 // Wait on bgzf_mt_writer to drain the queue
1306 if (hts_tpool_process_flush(mt->out_queue) != 0)
1307 return -1;
1309 return (fp->errcode == 0)? 0 : -1;
1312 static int lazy_flush(BGZF *fp)
1314 if (fp->mt)
1315 return fp->block_offset ? mt_queue(fp) : 0;
1316 else
1317 return bgzf_flush(fp);
1320 #else // ~ #ifdef BGZF_MT
1322 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1324 return 0;
1327 static inline int lazy_flush(BGZF *fp)
1329 return bgzf_flush(fp);
1332 #endif // ~ #ifdef BGZF_MT
1334 int bgzf_flush(BGZF *fp)
1336 if (!fp->is_write) return 0;
1337 #ifdef BGZF_MT
1338 if (fp->mt) {
1339 int ret = 0;
1340 if (fp->block_offset) ret = mt_queue(fp);
1341 return ret ? ret : mt_flush_queue(fp);
1343 #endif
1344 while (fp->block_offset > 0) {
1345 int block_length;
1346 if ( fp->idx_build_otf )
1348 bgzf_index_add_block(fp);
1349 fp->idx->ublock_addr += fp->block_offset;
1351 block_length = deflate_block(fp, fp->block_offset);
1352 if (block_length < 0) {
1353 hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1354 return -1;
1356 if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1357 hts_log_error("File write failed (wrong size)");
1358 fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1359 return -1;
1361 fp->block_address += block_length;
1363 return 0;
1366 int bgzf_flush_try(BGZF *fp, ssize_t size)
1368 if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1369 return 0;
1372 ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1374 if ( !fp->is_compressed )
1375 return hwrite(fp->fp, data, length);
1377 const uint8_t *input = (const uint8_t*)data;
1378 ssize_t remaining = length;
1379 assert(fp->is_write);
1380 while (remaining > 0) {
1381 uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1382 int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
1383 if (copy_length > remaining) copy_length = remaining;
1384 memcpy(buffer + fp->block_offset, input, copy_length);
1385 fp->block_offset += copy_length;
1386 input += copy_length;
1387 remaining -= copy_length;
1388 if (fp->block_offset == BGZF_BLOCK_SIZE) {
1389 if (lazy_flush(fp) != 0) return -1;
1392 return length - remaining;
1395 ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
1397 if ( !fp->is_compressed )
1398 return hwrite(fp->fp, data, length);
1399 const uint8_t *input = (const uint8_t*)data;
1400 ssize_t remaining = length;
1401 assert(fp->is_write);
1402 uint64_t current_block; //keep track of current block
1403 uint64_t ublock_size; // amount of uncompressed data to be fed into next block
1404 while (remaining > 0) {
1405 current_block = fp->idx->moffs - fp->idx->noffs;
1406 ublock_size = fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr;
1407 uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1408 int copy_length = ublock_size - fp->block_offset;
1409 if (copy_length > remaining) copy_length = remaining;
1410 memcpy(buffer + fp->block_offset, input, copy_length);
1411 fp->block_offset += copy_length;
1412 input += copy_length;
1413 remaining -= copy_length;
1414 if (fp->block_offset == ublock_size) {
1415 if (lazy_flush(fp) != 0) return -1;
1416 fp->idx->noffs--; // decrement noffs to track the blocks
1419 return length - remaining;
1423 ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
1425 ssize_t ret = hwrite(fp->fp, data, length);
1426 if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1427 return ret;
1430 int bgzf_close(BGZF* fp)
1432 int ret, block_length;
1433 if (fp == 0) return -1;
1434 if (fp->is_write && fp->is_compressed) {
1435 if (bgzf_flush(fp) != 0) return -1;
1436 fp->compress_level = -1;
1437 block_length = deflate_block(fp, 0); // write an empty block
1438 if (block_length < 0) {
1439 hts_log_debug("Deflate block operation failed: %s", bgzf_zerr(block_length, NULL));
1440 return -1;
1442 if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
1443 || hflush(fp->fp) != 0) {
1444 hts_log_error("File write failed");
1445 fp->errcode |= BGZF_ERR_IO;
1446 return -1;
1449 #ifdef BGZF_MT
1450 if (fp->mt) {
1451 if (!fp->mt->free_block)
1452 fp->uncompressed_block = NULL;
1453 mt_destroy(fp->mt);
1455 #endif
1456 if ( fp->is_gzip )
1458 if (fp->gz_stream == NULL) ret = Z_OK;
1459 else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
1460 else ret = deflateEnd(fp->gz_stream);
1461 if (ret != Z_OK) {
1462 hts_log_error("Call to inflateEnd/deflateEnd failed: %s", bgzf_zerr(ret, NULL));
1464 free(fp->gz_stream);
1466 ret = hclose(fp->fp);
1467 if (ret != 0) return -1;
1468 bgzf_index_destroy(fp);
1469 free(fp->uncompressed_block);
1470 free_cache(fp);
1471 free(fp);
1472 return 0;
1475 void bgzf_set_cache_size(BGZF *fp, int cache_size)
1477 if (fp) fp->cache_size = cache_size;
1480 int bgzf_check_EOF(BGZF *fp) {
1481 int has_eof;
1483 if (fp->mt) {
1484 pthread_mutex_lock(&fp->mt->command_m);
1485 fp->mt->command = HAS_EOF;
1486 pthread_cond_signal(&fp->mt->command_c);
1487 hts_tpool_wake_dispatch(fp->mt->out_queue);
1488 pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1489 has_eof = fp->mt->eof;
1490 pthread_mutex_unlock(&fp->mt->command_m);
1491 } else {
1492 has_eof = bgzf_check_EOF_common(fp);
1495 fp->no_eof_block = (has_eof == 0);
1497 return has_eof;
1500 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
1502 int block_offset;
1503 int64_t block_address;
1505 if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
1506 fp->errcode |= BGZF_ERR_MISUSE;
1507 return -1;
1509 block_offset = pos & 0xFFFF;
1510 block_address = pos >> 16;
1512 if (fp->mt) {
1513 // The reader runs asynchronous and does loops of:
1514 // Read block
1515 // Check & process command
1516 // Dispatch decode job
1518 // Once at EOF it then switches to loops of
1519 // Wait for command
1520 // Process command (possibly switching back to above loop).
1522 // To seek we therefore send the reader thread a SEEK command,
1523 // waking it up if blocked in dispatch and signalling if
1524 // waiting for a command. We then wait for the response so we
1525 // know the seek succeeded.
1526 pthread_mutex_lock(&fp->mt->command_m);
1527 fp->mt->hit_eof = 0;
1528 fp->mt->command = SEEK;
1529 fp->mt->block_address = block_address;
1530 pthread_cond_signal(&fp->mt->command_c);
1531 hts_tpool_wake_dispatch(fp->mt->out_queue);
1532 pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1534 fp->block_length = 0; // indicates current block has not been loaded
1535 fp->block_address = block_address;
1536 fp->block_offset = block_offset;
1538 pthread_mutex_unlock(&fp->mt->command_m);
1539 } else {
1540 if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
1541 fp->errcode |= BGZF_ERR_IO;
1542 return -1;
1544 fp->block_length = 0; // indicates current block has not been loaded
1545 fp->block_address = block_address;
1546 fp->block_offset = block_offset;
1549 return 0;
1552 int bgzf_is_bgzf(const char *fn)
1554 uint8_t buf[16];
1555 int n;
1556 hFILE *fp;
1557 if ((fp = hopen(fn, "r")) == 0) return 0;
1558 n = hread(fp, buf, 16);
1559 if (hclose(fp) < 0) return 0;
1560 if (n != 16) return 0;
1561 return check_header(buf) == 0? 1 : 0;
1564 int bgzf_compression(BGZF *fp)
1566 return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
1569 int bgzf_getc(BGZF *fp)
1571 if (fp->block_offset+1 < fp->block_length) {
1572 fp->uncompressed_address++;
1573 return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1576 int c;
1577 if (fp->block_offset >= fp->block_length) {
1578 if (bgzf_read_block(fp) != 0) return -2; /* error */
1579 if (fp->block_length == 0) return -1; /* end-of-file */
1581 c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1582 if (fp->block_offset == fp->block_length) {
1583 fp->block_address = bgzf_htell(fp);
1584 fp->block_offset = 0;
1585 fp->block_length = 0;
1587 fp->uncompressed_address++;
1588 return c;
1591 #ifndef kroundup32
1592 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
1593 #endif
1595 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
1597 int l, state = 0;
1598 str->l = 0;
1599 do {
1600 if (fp->block_offset >= fp->block_length) {
1601 if (bgzf_read_block(fp) != 0) { state = -2; break; }
1602 if (fp->block_length == 0) { state = -1; break; }
1604 unsigned char *buf = fp->uncompressed_block;
1605 for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
1606 if (l < fp->block_length) state = 1;
1607 l -= fp->block_offset;
1608 if (str->l + l + 1 >= str->m) {
1609 str->m = str->l + l + 2;
1610 kroundup32(str->m);
1611 str->s = (char*)realloc(str->s, str->m);
1613 memcpy(str->s + str->l, buf + fp->block_offset, l);
1614 str->l += l;
1615 fp->block_offset += l + 1;
1616 if (fp->block_offset >= fp->block_length) {
1617 fp->block_address = bgzf_htell(fp);
1618 fp->block_offset = 0;
1619 fp->block_length = 0;
1621 } while (state == 0);
1622 if (str->l == 0 && state < 0) return state;
1623 fp->uncompressed_address += str->l + 1;
1624 if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
1625 str->s[str->l] = 0;
1626 return str->l;
1629 void bgzf_index_destroy(BGZF *fp)
1631 if ( !fp->idx ) return;
1632 free(fp->idx->offs);
1633 free(fp->idx);
1634 fp->idx = NULL;
1635 fp->idx_build_otf = 0;
1638 int bgzf_index_build_init(BGZF *fp)
1640 bgzf_index_destroy(fp);
1641 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1642 if ( !fp->idx ) return -1;
1643 fp->idx_build_otf = 1; // build index on the fly
1644 return 0;
1647 int bgzf_index_add_block(BGZF *fp)
1649 fp->idx->noffs++;
1650 if ( fp->idx->noffs > fp->idx->moffs )
1652 fp->idx->moffs = fp->idx->noffs;
1653 kroundup32(fp->idx->moffs);
1654 fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1655 if ( !fp->idx->offs ) return -1;
1657 fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
1658 fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
1659 return 0;
1662 static inline int hwrite_uint64(uint64_t x, hFILE *f)
1664 if (ed_is_big()) x = ed_swap_8(x);
1665 if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
1666 return 0;
1669 static char * get_name_suffix(const char *bname, const char *suffix)
1671 size_t len = strlen(bname) + strlen(suffix) + 1;
1672 char *buff = malloc(len);
1673 if (!buff) return NULL;
1674 snprintf(buff, len, "%s%s", bname, suffix);
1675 return buff;
1678 int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1680 // Note that the index contains one extra record when indexing files opened
1681 // for reading. The terminating record is not present when opened for writing.
1682 // This is not a bug.
1684 int i;
1686 if (!fp->idx) {
1687 hts_log_error("Called for BGZF handle with no index");
1688 errno = EINVAL;
1689 return -1;
1692 if (bgzf_flush(fp) != 0) return -1;
1694 if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
1695 for (i=1; i<fp->idx->noffs; i++)
1697 if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
1698 if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1700 return 0;
1702 fail:
1703 hts_log_error("Error writing to %s : %s", name ? name : "index", strerror(errno));
1704 return -1;
1707 int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
1709 const char *name = bname, *msg = NULL;
1710 char *tmp = NULL;
1711 hFILE *idx = NULL;
1713 if (!fp->idx) {
1714 hts_log_error("Called for BGZF handle with no index");
1715 errno = EINVAL;
1716 return -1;
1719 if ( suffix )
1721 tmp = get_name_suffix(bname, suffix);
1722 if ( !tmp ) return -1;
1723 name = tmp;
1726 idx = hopen(name, "wb");
1727 if ( !idx ) {
1728 msg = "Error opening";
1729 goto fail;
1732 if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
1734 if (hclose(idx) < 0)
1736 idx = NULL;
1737 msg = "Error on closing";
1738 goto fail;
1741 free(tmp);
1742 return 0;
1744 fail:
1745 if (msg != NULL) {
1746 hts_log_error("%s %s : %s", msg, name, strerror(errno));
1748 if (idx) hclose_abruptly(idx);
1749 free(tmp);
1750 return -1;
1753 static inline int hread_uint64(uint64_t *xptr, hFILE *f)
1755 if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
1756 if (ed_is_big()) ed_swap_8p(xptr);
1757 return 0;
1760 int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1762 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1763 if (fp->idx == NULL) goto fail;
1764 uint64_t x;
1765 if (hread_uint64(&x, idx) < 0) goto fail;
1767 fp->idx->noffs = fp->idx->moffs = x + 1;
1768 fp->idx->offs = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
1769 if (fp->idx->offs == NULL) goto fail;
1770 fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
1772 int i;
1773 for (i=1; i<fp->idx->noffs; i++)
1775 if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
1776 if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1779 return 0;
1781 fail:
1782 hts_log_error("Error reading %s : %s", name ? name : "index", strerror(errno));
1783 if (fp->idx) {
1784 free(fp->idx->offs);
1785 free(fp->idx);
1786 fp->idx = NULL;
1788 return -1;
1791 int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
1793 const char *name = bname, *msg = NULL;
1794 char *tmp = NULL;
1795 hFILE *idx = NULL;
1796 if ( suffix )
1798 tmp = get_name_suffix(bname, suffix);
1799 if ( !tmp ) return -1;
1800 name = tmp;
1803 idx = hopen(name, "rb");
1804 if ( !idx ) {
1805 msg = "Error opening";
1806 goto fail;
1809 if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
1811 if (hclose(idx) != 0) {
1812 idx = NULL;
1813 msg = "Error closing";
1814 goto fail;
1817 free(tmp);
1818 return 0;
1820 fail:
1821 if (msg != NULL) {
1822 hts_log_error("%s %s : %s", msg, name, strerror(errno));
1824 if (idx) hclose_abruptly(idx);
1825 free(tmp);
1826 return -1;
1829 int bgzf_useek(BGZF *fp, long uoffset, int where)
1831 if ( !fp->is_compressed )
1833 if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
1835 fp->errcode |= BGZF_ERR_IO;
1836 return -1;
1838 fp->block_length = 0; // indicates current block has not been loaded
1839 fp->block_address = uoffset;
1840 fp->block_offset = 0;
1841 if (bgzf_read_block(fp) < 0) {
1842 fp->errcode |= BGZF_ERR_IO;
1843 return -1;
1845 fp->uncompressed_address = uoffset;
1846 return 0;
1849 if ( !fp->idx )
1851 fp->errcode |= BGZF_ERR_IO;
1852 return -1;
1855 // binary search
1856 int ilo = 0, ihi = fp->idx->noffs - 1;
1857 while ( ilo<=ihi )
1859 int i = (ilo+ihi)*0.5;
1860 if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
1861 else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
1862 else break;
1864 int i = ilo-1;
1865 if (hseek(fp->fp, fp->idx->offs[i].caddr, SEEK_SET) < 0)
1867 fp->errcode |= BGZF_ERR_IO;
1868 return -1;
1870 fp->block_length = 0; // indicates current block has not been loaded
1871 fp->block_address = fp->idx->offs[i].caddr;
1872 fp->block_offset = 0;
1873 if ( bgzf_read_block(fp) < 0 ) {
1874 fp->errcode |= BGZF_ERR_IO;
1875 return -1;
1877 if ( uoffset - fp->idx->offs[i].uaddr > 0 )
1879 fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
1880 assert( fp->block_offset <= fp->block_length ); // todo: skipped, unindexed, blocks
1882 fp->uncompressed_address = uoffset;
1883 return 0;
1886 long bgzf_utell(BGZF *fp)
1888 return fp->uncompressed_address; // currently maintained only when reading
1891 /* prototype is in hfile_internal.h */
1892 struct hFILE *bgzf_hfile(struct BGZF *fp) {
1893 return fp->fp;