2 * Copyright 2018-2022 NXP.
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
7 * Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
10 * Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or
12 * other materials provided with the distribution.
14 * Neither the name of the NXP Semiconductor nor the names of its
15 * contributors may be used to endorse or promote products derived from this
16 * software without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
52 #define stat_os _stat64
53 #elif defined(__APPLE__)
57 #define stat_os stat64
61 static map
<string
, shared_ptr
<FileBuffer
>> g_filebuffer_map
;
62 static mutex g_mutex_map
;
63 static bool g_small_memory
= true;
65 #define MAGIC_PATH '>'
67 string g_current_dir
= ">";
69 void set_current_dir(const string
&dir
)
71 g_current_dir
= MAGIC_PATH
;
76 int DataBuffer:: resize(size_t sz
)
78 if (m_allocate_way
!= ALLOCATION_WAYS::MALLOC
)
80 set_last_err_string("data buffer ref can't resize");
87 m_pDatabuffer
= (uint8_t*)realloc(m_pDatabuffer
, sz
);
90 set_last_err_string("fail alloc memory");
100 int DataBuffer::ref_other_buffer(std::shared_ptr
<FileBuffer
> p
, size_t offset
, size_t size
)
105 if (p
->m_allocate_way
== FileBuffer::ALLOCATION_WAYS::SEGMENT
)
107 shared_ptr
<FragmentBlock
> blk
;
108 blk
= p
->get_map_it(offset
);
109 if (offset
+ size
< blk
->m_output_offset
+ blk
->m_actual_size
)
111 m_pDatabuffer
= blk
->m_data
.data() + offset
- blk
->m_output_offset
;
120 m_pDatabuffer
= p
->data() + offset
;
125 m_allocate_way
= ALLOCATION_WAYS::REF
;
133 friend class DataBuffer
;
134 friend class FileBuffer
;
135 virtual int get_file_timesample(const string
&filename
, uint64_t *ptime
) = 0;
136 virtual int load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
) = 0;
137 virtual bool exist(const string
&backfile
, const string
&filename
) = 0;
138 virtual int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) = 0;
140 virtual int Decompress(const string
& /*backfifle*/, shared_ptr
<FileBuffer
> /*outp*/) { return 0; };
141 virtual bool seekable(const string
& /*backfile*/) { return false; }
142 virtual std::shared_ptr
<FragmentBlock
> ScanCompressblock(const string
& /*backfile*/, size_t& /*input_offset*/, size_t& /*output_offset*/) { return NULL
; };
143 virtual int PreloadWorkThread(shared_ptr
<FileBuffer
>outp
);
145 virtual int split(const string
&filename
, string
*outbackfile
, string
*outfilename
, bool dir
=false)
147 string path
= str_to_upper(filename
);
148 if (m_ext
== nullptr || strlen(m_ext
) == 0)
152 size_t pos
= path
.rfind("/");
153 if(pos
== string::npos
)
155 *outbackfile
= MAGIC_PATH
;
156 *outbackfile
+= "./";
157 *outfilename
= filename
;
159 *outbackfile
= filename
.substr(0, pos
);
160 if(filename
.size() >= pos
+ 1)
161 *outfilename
= filename
.substr(pos
+ 1);
163 outfilename
->clear();
167 *outbackfile
= filename
;
175 size_t pos
= path
.rfind(ext
);
176 if (pos
== string::npos
)
178 string err
= "can't find ext name in path: ";
180 set_last_err_string(err
);
184 *outbackfile
= filename
.substr(0, pos
+ strlen(m_ext
));
186 if(filename
.size() >= pos
+ strlen(m_ext
) + 1)
187 *outfilename
= filename
.substr(pos
+ strlen(m_ext
) + 1);
189 outfilename
->clear();
194 const char * m_ext
= nullptr;
195 const char * m_Prefix
= nullptr;
197 bool m_small_pool
= false;
200 static class FSFlat
: public FSBasic
203 FSFlat() { m_ext
= ""; }
204 int get_file_timesample(const string
&filename
, uint64_t *ptime
) override
207 if (stat_os(filename
.c_str() + 1, &st
))
209 set_last_err_string("stat_os failure");
213 *ptime
= st
.st_mtime
;
218 bool exist(const string
&backfile
, const string
& /*filename*/) override
223 if (backfile
[0] != MAGIC_PATH
)
226 return stat_os(backfile
.c_str() + off
, &st
) == 0 && ((st
.st_mode
& S_IFDIR
) == 0);
229 int load(const string
&backfile
, const string
&/*filename*/, shared_ptr
<FileBuffer
> p
) override
232 if (stat_os(backfile
.c_str() + 1, &st
))
234 set_last_err_string("stat_os failure");
239 if (p
->mapfile(backfile
.substr(1), st
.st_size
))
242 p
->m_available_size
= st
.st_size
;
244 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_LOADED
| FILEBUFFER_FLAG_NEVER_FREE
);
245 p
->m_request_cv
.notify_all();
250 int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) override
254 if(stat_os(backfile
.c_str() + 1, &st
))
259 if(st
.st_mode
& S_IFDIR
)
262 string str
= backfile
.substr(1);
263 if (filename
.empty())
266 str
+= "/" + filename
;
269 HANDLE handle
= FindFirstFile(str
.c_str(), &fd
);
273 if (handle
== INVALID_HANDLE_VALUE
)
275 string path
= backfile
+ "/" + fd
.cFileName
;
276 if(fd
.dwFileAttributes
&FILE_ATTRIBUTE_DIRECTORY
)
278 fn(path
.c_str() + 1, p
);
279 } while (FindNextFile(handle
, &fd
));
284 dir
= opendir(backfile
.c_str() + 1);
286 while ((dp
=readdir(dir
)) != nullptr)
288 string name
= dp
->d_name
;
289 if(name
.substr(0, filename
.size()) == filename
|| filename
.empty())
291 string path
= backfile
+ "/" + name
;
292 if(dp
->d_type
== DT_DIR
)
294 fn(path
.c_str() + 1, p
);
302 return fn(backfile
.c_str() + 1, p
);
308 class FSNetwork
: public FSBasic
314 int split(const string
&filename
, string
*outbackfile
, string
*outfilename
, bool /*dir = false*/) override
316 if (m_Prefix
== nullptr)
319 if (filename
.size() < strlen(m_Prefix
))
322 string path
= str_to_upper(filename
);
323 if (path
.compare(1, strlen(m_Prefix
), m_Prefix
) == 0)
326 pos
= filename
.find('/', 1 + strlen(m_Prefix
));
328 *outbackfile
= filename
.substr(1 + strlen(m_Prefix
), pos
- 1 - strlen(m_Prefix
));
331 cpos
= outbackfile
->find(':');
332 if (cpos
!= string::npos
)
334 m_Port
= str_to_uint32(outbackfile
->substr(cpos
+ 1));
336 *outbackfile
= outbackfile
->substr(0, cpos
);
338 *outfilename
= filename
.substr(pos
);
347 static class FSHttp
: public FSNetwork
350 FSHttp() { m_Prefix
= "HTTP://"; m_Port
= 80; }
351 int load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
) override
;
352 virtual bool exist(const string
&backfile
, const string
&filename
) override
354 shared_ptr
<HttpStream
> http
= make_shared
<HttpStream
>();
356 if (http
->HttpGetHeader(backfile
, filename
, m_Port
, typeid(*this) != typeid(FSHttp
)))
361 int for_each_ls(uuu_ls_file
/*fn*/, const string
&/*backfile*/, const string
&/*filename*/, void * /*p*/) override
{ return 0; };
362 int get_file_timesample(const string
&/*filename*/, uint64_t * /*ptime*/) override
{ return 0; };
363 int http_load(shared_ptr
<HttpStream
> http
, shared_ptr
<FileBuffer
> p
, string filename
);
366 static class FSHttps
: public FSHttp
369 FSHttps() { m_Prefix
= "HTTPS://"; m_Port
= 443; }
372 int FSHttp::http_load(shared_ptr
<HttpStream
> http
, shared_ptr
<FileBuffer
> p
, string filename
)
374 size_t max
= 0x10000;
377 ut
.type
= uuu_notify::NOTIFY_DOWNLOAD_START
;
378 ut
.str
= (char*)filename
.c_str();
381 ut
.type
= uuu_notify::NOTIFY_TRANS_SIZE
;
382 ut
.total
= p
->size();
385 for (size_t i
= 0; i
< p
->size() && !p
->m_reset_stream
; i
+= max
)
387 size_t sz
= p
->size() - i
;
390 if (http
->HttpDownload((char*)(p
->data() + i
), sz
) < 0)
392 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_ERROR_BIT
);
393 p
->m_request_cv
.notify_all();
396 p
->m_available_size
= i
+ sz
;
397 p
->m_request_cv
.notify_all();
399 ut
.type
= uuu_notify::NOTIFY_TRANS_POS
;
404 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_LOADED
| FILEBUFFER_FLAG_NEVER_FREE
);
405 p
->m_request_cv
.notify_all();
407 ut
.type
= uuu_notify::NOTIFY_DOWNLOAD_END
;
408 ut
.str
= (char*)filename
.c_str();
413 class FSBackFile
: public FSBasic
416 int get_file_timesample(const string
&filename
, uint64_t *ptime
) override
;
420 static class FSZip
: public FSBackFile
423 FSZip() { m_ext
= ".ZIP"; };
424 int load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
) override
;
425 bool exist(const string
&backfile
, const string
&filename
) override
;
426 int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) override
;
429 static class FSTar
: public FSBackFile
432 FSTar() {m_ext
= ".TAR"; };
433 int load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
) override
;
434 bool exist(const string
&backfile
, const string
&filename
) override
;
435 int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) override
;
439 static class FSFat
: public FSBackFile
442 FSFat() { m_ext
= ".SDCARD"; };
443 int load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
) override
;
444 bool exist(const string
&backfile
, const string
&filename
) override
;
445 int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) override
;
452 virtual int set_input_buff(void* p
, size_t sz
) = 0;
453 virtual int set_output_buff(void* p
, size_t sz
) = 0;
454 virtual size_t get_input_pos() = 0;
455 virtual size_t get_output_pos() = 0;
456 virtual int decompress() = 0;
458 virtual size_t get_default_input_size() { return 0x1000; }
459 virtual size_t decompress_size(const string
& /*backfile*/) { return 0; }
462 class FSCompressStream
: public FSBackFile
465 FSCompressStream() { m_small_pool
= g_small_memory
; }
466 int load(const string
& backfile
, const string
& filename
, shared_ptr
<FileBuffer
>outp
) override
;
467 bool exist(const string
& backfile
, const string
& filename
) override
;
468 int for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
) override
;
469 int Decompress(const string
& backfile
, shared_ptr
<FileBuffer
>outp
) override
;
470 virtual std::shared_ptr
<CommonStream
> create_stream() { return nullptr; };
474 class Bz2stream
: public CommonStream
477 size_t m_in_size
= 0;
478 size_t m_out_size
= 0;
481 Bz2stream() { memset(&m_strm
, 0, sizeof(m_strm
)); BZ2_bzDecompressInit(&m_strm
, 0, 0); }
484 BZ2_bzDecompressEnd(&m_strm
);
486 virtual int set_input_buff(void* p
, size_t sz
) override
488 m_strm
.next_in
= (char*)p
;
489 m_strm
.avail_in
= m_in_size
= sz
;
492 virtual int set_output_buff(void* p
, size_t sz
) override
494 m_strm
.next_out
= (char*)p
;
495 m_strm
.avail_out
= m_out_size
= sz
;
498 virtual size_t get_input_pos() override
500 return m_in_size
- m_strm
.avail_in
;
502 virtual size_t get_output_pos() override
504 return m_out_size
- m_strm
.avail_out
;
506 virtual int decompress() override
508 return BZ2_bzDecompress(&m_strm
);
511 virtual size_t get_default_input_size() override
{ return 0x10000; }
514 static class FSBz2
: public FSCompressStream
517 FSBz2() { m_ext
= ".BZ2"; };
518 virtual bool seekable(const string
& backfile
) override
;
519 virtual std::shared_ptr
<CommonStream
> create_stream() override
{ return std::make_shared
<Bz2stream
>(); }
520 virtual std::shared_ptr
<FragmentBlock
> ScanCompressblock(const string
& backfile
, size_t& input_offset
, size_t& output_offset
) override
;
524 class Gzstream
: public CommonStream
527 size_t m_in_size
= 0;
528 size_t m_out_size
= 0;
533 memset(&m_strm
, 0, sizeof(m_strm
));
534 inflateInit2(&m_strm
, 15 + 16);
540 virtual int set_input_buff(void* p
, size_t sz
) override
542 m_strm
.next_in
= (Bytef
*)p
;
543 m_strm
.avail_in
= m_in_size
= sz
;
546 virtual int set_output_buff(void* p
, size_t sz
) override
548 m_strm
.next_out
= (Bytef
*)p
;
549 m_strm
.avail_out
= m_out_size
= sz
;
552 virtual size_t get_input_pos() override
554 return m_in_size
- m_strm
.avail_in
;
556 virtual size_t get_output_pos() override
558 return m_out_size
- m_strm
.avail_out
;
560 virtual int decompress() override
562 return inflate(&m_strm
, Z_SYNC_FLUSH
);
565 virtual size_t get_default_input_size() override
{ return 0x10000; }
568 static class FSGz
: public FSCompressStream
571 FSGz() { m_ext
= ".GZ"; };
572 virtual std::shared_ptr
<CommonStream
> create_stream() { return std::make_shared
<Gzstream
>(); }
575 class ZstdStream
:public CommonStream
578 ZSTD_outBuffer m_output
= { 0, 0, 0 };
579 ZSTD_inBuffer m_input
= { 0, 0, 0 };
582 virtual int set_input_buff(void* p
, size_t sz
) override
589 virtual int set_output_buff(void* p
, size_t sz
) override
596 virtual size_t get_input_pos() override
600 virtual size_t get_output_pos() override
605 virtual int decompress() override
607 return ZSTD_decompressStream(m_dctx
, &m_output
, &m_input
);
610 virtual size_t get_default_input_size() override
612 return ZSTD_DStreamInSize();
615 size_t decompress_size(const string
& backfile
) override
617 shared_ptr
<FileBuffer
> inp
= get_file_buffer(backfile
, true);
622 size_t sz
= ZSTD_DStreamInSize();
623 shared_ptr
<DataBuffer
> pb
= inp
->request_data(0, sz
);
626 size_t decompressed_sz
= ZSTD_getFrameContentSize(pb
->data(), sz
);
628 return decompressed_sz
;
632 m_dctx
= ZSTD_createDCtx();
634 virtual ~ZstdStream()
636 ZSTD_freeDCtx(m_dctx
);
640 static class FSzstd
: public FSCompressStream
643 FSzstd() { m_ext
= ".ZST"; };
644 virtual std::shared_ptr
<CommonStream
> create_stream() { return make_shared
<ZstdStream
>(); };
650 vector
<FSBasic
*> m_pFs
;
653 m_pFs
.push_back(&g_fsflat
);
654 m_pFs
.push_back(&g_fszip
);
655 m_pFs
.push_back(&g_fstar
);
656 m_pFs
.push_back(&g_fsbz2
);
657 m_pFs
.push_back(&g_fsfat
);
658 m_pFs
.push_back(&g_fsgz
);
659 m_pFs
.push_back(&g_FSzstd
);
660 m_pFs
.push_back(&g_fshttps
);
661 m_pFs
.push_back(&g_fshttp
);
664 int get_file_timesample(const string
&filename
, uint64_t *ptimesample
)
666 if (ptimesample
== nullptr)
668 set_last_err_string("ptimesample is null\n");
672 for (size_t i
= 0; i
< m_pFs
.size(); i
++)
674 if (!m_pFs
[i
]->get_file_timesample(filename
, ptimesample
))
681 int for_each_ls(uuu_ls_file fn
, string path
, void *p
)
683 for (int i
= m_pFs
.size() -1; i
>= 0; i
--)
685 string back
, filename
;
686 if (m_pFs
[i
]->split(path
, &back
, &filename
, true) == 0)
687 if(m_pFs
[i
]->for_each_ls(fn
, back
, filename
, p
)==0)
695 bool exist(const string
&filename
)
697 for (size_t i
= 0; i
< m_pFs
.size(); i
++)
700 if (m_pFs
[i
]->split(filename
, &back
, &fn
) == 0)
701 if (m_pFs
[i
]->exist(back
, fn
))
707 bool need_small_mem(const string
& filename
)
709 for (size_t i
= 0; i
< m_pFs
.size(); i
++)
712 if (m_pFs
[i
]->split(filename
, &back
, &fn
) == 0)
713 if (m_pFs
[i
]->exist(back
, fn
))
714 return m_pFs
[i
]->m_small_pool
;
718 int load(const string
&filename
, shared_ptr
<FileBuffer
> p
)
720 for (size_t i
= 0; i
< m_pFs
.size(); i
++)
723 if (m_pFs
[i
]->split(filename
, &back
, &fn
) == 0) {
724 if (m_pFs
[i
]->load(back
, fn
, p
) == 0)
730 err
= "fail open file: ";
732 set_last_err_string(err
);
737 int FSBackFile::get_file_timesample(const string
&filename
, uint64_t *ptime
)
740 if (split(filename
, &back
, &file
))
743 return g_fs_data
.get_file_timesample(back
, ptime
);
746 bool FSZip::exist(const string
&backfile
, const string
&filename
)
749 if (zip
.Open(backfile
))
752 return zip
.check_file_exist(filename
);
755 int FSZip::for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
)
759 if (zip
.Open(backfile
))
762 for(auto it
= zip
.m_filemap
.begin(); it
!=zip
.m_filemap
.end(); ++it
)
764 if(it
->first
.substr(0, filename
.size()) == filename
|| filename
.empty())
766 string name
= backfile
;
769 fn(name
.c_str()+1, p
);
776 int zip_async_load(string zipfile
, string fn
, shared_ptr
<FileBuffer
> buff
)
778 std::lock_guard
<mutex
> lock(buff
->m_async_mutex
);
781 if (zip
.Open(zipfile
))
784 if(zip
.get_file_buff(fn
, buff
))
787 buff
->m_available_size
= buff
->m_DataSize
;
788 atomic_fetch_or(&buff
->m_dataflags
, FILEBUFFER_FLAG_LOADED
);
790 buff
->m_request_cv
.notify_all();
794 int FSZip::load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
)
798 if (zip
.Open(backfile
))
801 if (!zip
.check_file_exist(filename
))
804 if(zip
.get_file_buff(filename
, p
))
807 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_LOADED
);
808 p
->m_request_cv
.notify_all();
813 bool FSTar::exist(const string
&backfile
, const string
&filename
)
816 if (tar
.Open(backfile
))
819 return tar
.check_file_exist(filename
);
823 int FSTar::for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
)
827 if (tar
.Open(backfile
))
830 for(auto it
= tar
.m_filemap
.begin(); it
!=tar
.m_filemap
.end(); ++it
)
832 if(it
->first
.substr(0, filename
.size()) == filename
|| filename
.empty())
834 string name
= backfile
;
837 fn(name
.c_str()+1, p
);
844 int FSTar::load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
)
847 if (tar
.Open(backfile
))
850 if (!tar
.check_file_exist(filename
))
853 if(tar
.get_file_buff(filename
, p
))
855 p
->m_available_size
= p
->m_DataSize
;
856 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_LOADED
);
857 p
->m_request_cv
.notify_all();
861 bool FSFat::exist(const string
&backfile
, const string
&filename
)
864 if (fat
.Open(backfile
))
868 return fat
.m_filemap
.find(filename
) != fat
.m_filemap
.end();
871 int FSFat::load(const string
&backfile
, const string
&filename
, shared_ptr
<FileBuffer
> p
)
874 if (fat
.Open(backfile
))
879 if(fat
.get_file_buff(filename
, p
))
882 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_LOADED
);
883 p
->m_request_cv
.notify_all();
888 int FSFat::for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&filename
, void *p
)
891 if (fat
.Open(backfile
))
896 for(auto it
= fat
.m_filemap
.begin(); it
!= fat
.m_filemap
.end(); ++it
)
898 if(it
->first
.substr(0, filename
.size()) == filename
|| filename
.empty())
900 string name
= backfile
;
903 fn(name
.c_str()+1, p
);
909 bool FSCompressStream::exist(const string
&backfile
, const string
&filename
)
912 if (!g_fs_data
.exist(backfile
))
921 int FSCompressStream::for_each_ls(uuu_ls_file fn
, const string
&backfile
, const string
&/*filename*/, void *p
)
924 if(!g_fs_data
.exist(backfile
))
928 str
= backfile
+ "/*";
930 fn(str
.c_str() + 1, p
);
934 class Bz2FragmentBlock
: public FragmentBlock
937 virtual ~Bz2FragmentBlock() {}
938 int DataConvert() override
940 std::lock_guard
<mutex
> lock(m_mutex
);
942 m_actual_size
= m_output_size
;
943 m_data
.resize(m_output_size
);
945 shared_ptr
<DataBuffer
> input
= m_input
->request_data(m_input_offset
, m_input_sz
);
948 unsigned int len
= m_output_size
;
949 m_ret
= BZ2_bzBuffToBuffDecompress((char*)m_data
.data(),
951 (char*)input
->data(),
957 m_data
.resize(m_actual_size
);
959 assert(m_output_size
>= m_actual_size
);
961 atomic_fetch_or(&m_dataflags
, (int)CONVERT_DONE
);
966 shared_ptr
<FragmentBlock
> FSBz2::ScanCompressblock(const string
& backfile
, size_t& input_offset
, size_t& output_offset
)
968 shared_ptr
<FileBuffer
> pbz
;
970 pbz
= get_file_buffer(backfile
, true);
971 if (pbz
== nullptr) {
975 size_t request_size
= 1 * 1000 * 1000;
976 shared_ptr
<DataBuffer
> pd
= pbz
->request_data(input_offset
, request_size
);
980 uint8_t* p1
= pd
->data();
982 size_t sz
= min(request_size
- 10, pd
->size());
984 for (size_t i
= 0; i
< sz
; i
++)
986 uint16_t* header
= (uint16_t*)p1
++;
987 if (*header
== 0x5a42) //"BZ"
989 uint32_t* magic1
= (uint32_t*)&pd
->at(i
+ 4);
990 if (*magic1
== 0x26594131 && pd
->at(i
+ 2) == 'h') //PI 3.1415926
992 uint16_t* magic2
= (uint16_t*)&pd
->at(i
+ 8);
993 if (*magic2
== 0x5953)
995 shared_ptr
<FragmentBlock
> p
= shared_ptr
<FragmentBlock
>(new Bz2FragmentBlock
);
998 p
->m_actual_size
= 0;
1000 p
->m_input_offset
= input_offset
+ i
;
1001 p
->m_output_offset
= output_offset
;
1002 p
->m_output_size
= (pd
->at(i
+ 3) - '0') * 100 * 1000; /* not l024 for bz2 */
1003 p
->m_input_sz
= request_size
;
1005 input_offset
+= i
+ 8;
1007 output_offset
+= p
->m_output_size
;
1017 bool FSBz2::seekable(const string
& backfile
)
1019 shared_ptr
<FileBuffer
> file
= get_file_buffer(backfile
, true);
1020 shared_ptr
<DataBuffer
> p
= file
->request_data(0, 1024*1024);
1026 uint8_t* ptr
= p
->data();
1028 for (size_t i
= 0; i
< p
->size(); i
++)
1030 if (ptr
[0] == 'B' && ptr
[1] == 'Z' && ptr
[2] == 'h' && ptr
[4] == '1' && ptr
[5] == 'A' && ptr
[6] == 'Y' && ptr
[7] == '&' && ptr
[8] == 'S' && ptr
[9] == 'Y')
1043 int FSCompressStream::Decompress(const string
& backfile
, shared_ptr
<FileBuffer
>outp
)
1045 shared_ptr
<CommonStream
> cs
= create_stream();
1049 ssize_t lastRet
= 0;
1050 size_t outOffset
= 0;
1051 shared_ptr
<FileBuffer
> inp
= get_file_buffer(backfile
, true);
1057 size_t sz
= cs
->decompress_size(backfile
);
1061 atomic_fetch_or(&outp
->m_dataflags
, FILEBUFFER_FLAG_KNOWN_SIZE
);
1064 std::shared_ptr
<DataBuffer
> buff
;
1065 buff
= inp
->request_data(0, 0x1000);
1071 ut
.type
= uuu_notify::NOTIFY_DECOMPRESS_START
;
1072 ut
.str
= (char*)backfile
.c_str();
1075 shared_ptr
<FragmentBlock
> blk
;
1076 blk
= outp
->get_map_it(0, true);
1078 blk
= outp
->request_new_blk();
1081 lock_guard
<mutex
> l(outp
->m_seg_map_mutex
);
1082 outp
->m_last_db
= blk
;
1085 cs
->set_output_buff(blk
->data(), blk
->m_output_size
);
1087 while ((buff
= inp
->request_data(offset
, cs
->get_default_input_size())))
1092 //ZSTD_inBuffer input = { buff->data(), buff->size(), 0 };
1093 cs
->set_input_buff(buff
->data(), buff
->size());
1094 /* Given a valid frame, zstd won't consume the last byte of the frame
1095 * until it has flushed all of the decompressed data of the frame.
1096 * Therefore, instead of checking if the return code is 0, we can
1097 * decompress just check if input.pos < input.size.
1099 while (cs
->get_input_pos() < min(buff
->size(), inp
->m_DataSize
- offset
))
1101 /* The return code is zero if the frame is complete, but there may
1102 * be multiple frames concatenated together. Zstd will automatically
1103 * reset the context when a frame is complete. Still, calling
1104 * ZSTD_DCtx_reset() can be useful to reset the context to a clean
1105 * state, for instance if the last decompression call returned an
1108 size_t old
= cs
->get_output_pos();
1109 ssize_t
const ret
= cs
->decompress(); //ZSTD_decompressStream(dctx, &output, &input);
1110 lastRet
= min(lastRet
, ret
);
1111 outOffset
+= cs
->get_output_pos() - old
;
1115 set_last_err_string("decompress error");
1116 outp
->m_request_cv
.notify_all();
1121 blk
->m_actual_size
= cs
->get_output_pos();
1123 atomic_fetch_or(&blk
->m_dataflags
, (int)FragmentBlock::CONVERT_PARTIAL
);
1125 ut
.type
= uuu_notify::NOTIFY_DECOMPRESS_POS
;
1126 ut
.index
= outOffset
;
1128 outp
->m_available_size
= outOffset
;
1129 outp
->m_request_cv
.notify_all();
1132 if (cs
->get_output_pos() == blk
->m_output_size
)
1134 atomic_fetch_or(&blk
->m_dataflags
, (int)FragmentBlock::CONVERT_DONE
);
1135 if (!(cs
->get_input_pos() == buff
->size() &&
1136 buff
->size() == (inp
->size() - offset
)))
1138 blk
= outp
->get_map_it(outOffset
, true);
1140 blk
= outp
->request_new_blk();
1141 cs
->set_output_buff(blk
->data(), blk
->m_output_size
);
1144 lock_guard
<mutex
> l(outp
->m_seg_map_mutex
);
1145 outp
->m_last_db
= blk
;
1150 if (outp
->m_reset_stream
)
1152 outp
->m_reset_stream
= false;
1157 offset
+= cs
->get_default_input_size();
1159 outp
->resize(outOffset
);
1161 atomic_fetch_or(&blk
->m_dataflags
, (int)FragmentBlock::CONVERT_DONE
);
1162 atomic_fetch_or(&outp
->m_dataflags
, FILEBUFFER_FLAG_LOADED
);
1164 outp
->m_request_cv
.notify_all();
1170 uint64_t get_file_timesample(string filename
)
1173 g_fs_data
.get_file_timesample(filename
, &time
);
1177 shared_ptr
<FileBuffer
> get_file_buffer(string filename
, bool async
)
1179 filename
= remove_quota(filename
);
1181 if (!filename
.empty() && filename
[0] != MAGIC_PATH
)
1183 if (filename
== "..")
1184 filename
= g_current_dir
.substr(0, g_current_dir
.size() - 1);
1186 filename
= g_current_dir
+ filename
;
1192 path
.replace('\\', '/');
1198 std::lock_guard
<mutex
> lock(g_mutex_map
);
1199 find
= (g_filebuffer_map
.find(filename
) == g_filebuffer_map
.end());
1204 shared_ptr
<FileBuffer
> p(new FileBuffer
);
1206 if (p
->reload(filename
, async
))
1210 std::lock_guard
<mutex
> lock(g_mutex_map
);
1211 g_filebuffer_map
[filename
] = p
;
1217 shared_ptr
<FileBuffer
> p
;
1219 std::lock_guard
<mutex
> lock(g_mutex_map
);
1220 p
= g_filebuffer_map
[filename
];
1222 if (p
->m_timesample
!= get_file_timesample(filename
))
1223 if (p
->reload(filename
, async
))
1228 if (!p
->IsLoaded() && !async
)
1230 std::lock_guard
<mutex
> lock(p
->m_async_mutex
);
1232 if(p
->m_async_thread
.joinable())
1233 p
->m_async_thread
.join();
1245 FileBuffer::FileBuffer()
1247 m_pDatabuffer
= nullptr;
1251 m_available_size
= 0;
1254 FileBuffer::FileBuffer(void *p
, size_t sz
)
1256 m_pDatabuffer
= nullptr;
1260 m_pDatabuffer
= (uint8_t*)malloc(sz
);
1261 m_MemSize
= m_DataSize
= sz
;
1263 memcpy(m_pDatabuffer
, p
, sz
);
1266 atomic_fetch_or(&m_dataflags
, FILEBUFFER_FLAG_LOADED
);
1269 FileBuffer::~FileBuffer()
1271 m_reset_stream
= true;
1273 if(m_async_thread
.joinable())
1274 m_async_thread
.join();
1278 if(m_allocate_way
== ALLOCATION_WAYS::MMAP
)
1280 if(m_allocate_way
== ALLOCATION_WAYS::MALLOC
)
1281 free(m_pDatabuffer
);
1285 int FileBuffer::mapfile(const string
&filename
, size_t sz
)
1289 m_Request
.StructureVersion
= REQUEST_OPLOCK_CURRENT_VERSION
;
1290 m_Request
.StructureLength
= sizeof(REQUEST_OPLOCK_INPUT_BUFFER
);
1291 m_Request
.RequestedOplockLevel
= (OPLOCK_LEVEL_CACHE_READ
| OPLOCK_LEVEL_CACHE_HANDLE
);
1292 m_Request
.Flags
= REQUEST_OPLOCK_INPUT_FLAG_REQUEST
;
1294 REQUEST_OPLOCK_OUTPUT_BUFFER Response
;
1296 m_OverLapped
.hEvent
= CreateEvent(nullptr, TRUE
, FALSE
, nullptr);
1297 ResetEvent(m_OverLapped
.hEvent
);
1299 m_file_handle
= CreateFile(filename
.c_str(),
1301 FILE_SHARE_READ
| FILE_SHARE_WRITE
,
1304 FILE_ATTRIBUTE_NORMAL
| FILE_FLAG_RANDOM_ACCESS
| FILE_FLAG_OVERLAPPED
,
1307 if (m_file_handle
== INVALID_HANDLE_VALUE
)
1309 string err
= "Create File Failure ";
1311 set_last_err_string(err
);
1315 BOOL bSuccess
= DeviceIoControl(m_file_handle
,
1316 FSCTL_REQUEST_OPLOCK
,
1324 if (bSuccess
|| GetLastError() == ERROR_IO_PENDING
)
1326 m_file_monitor
= thread(file_overwrite_monitor
, filename
, this);
1329 m_file_map
= CreateFileMapping(m_file_handle
,
1330 nullptr, PAGE_READONLY
, 0, 0, nullptr);
1332 if (m_file_map
== INVALID_HANDLE_VALUE
)
1334 set_last_err_string("Fail create Map");
1338 m_pDatabuffer
= (uint8_t *)MapViewOfFile(m_file_map
, FILE_MAP_READ
, 0, 0, sz
);
1341 m_allocate_way
= ALLOCATION_WAYS::MMAP
;
1344 int fd
= open(filename
.c_str(), O_RDONLY
);
1348 err
+= "xx Failure open file: ";
1350 set_last_err_string(err
);
1354 m_pDatabuffer
= (uint8_t *)mmap64(0, sz
, PROT_READ
, MAP_SHARED
, fd
, 0);
1355 if (m_pDatabuffer
== MAP_FAILED
) {
1356 m_pDatabuffer
= nullptr;
1357 set_last_err_string("mmap failure\n");
1362 m_allocate_way
= ALLOCATION_WAYS::MMAP
;
1369 set_last_err_string("mmap file failure");
1373 int FileBuffer::ref_other_buffer(shared_ptr
<FileBuffer
> p
, size_t offset
, size_t size
)
1375 m_pDatabuffer
= p
->data() + offset
;
1376 m_DataSize
= m_MemSize
= size
;
1377 m_available_size
= m_DataSize
;
1378 m_allocate_way
= ALLOCATION_WAYS::REF
;
1381 atomic_fetch_or(&m_dataflags
, FILEBUFFER_FLAG_LOADED
);
1385 int FileBuffer::reload(string filename
, bool async
)
1388 if(!g_fs_data
.exist(filename
))
1391 if (g_fs_data
.need_small_mem(filename
))
1392 m_allocate_way
= ALLOCATION_WAYS::SEGMENT
;
1394 m_async_thread
= thread(&FS_DATA::load
, &g_fs_data
, filename
, shared_from_this());
1398 if(g_fs_data
.load(filename
, shared_from_this()))
1401 m_timesample
= get_file_timesample(filename
);
1402 m_filename
= filename
;
1407 int FileBuffer::request_data(std::vector
<uint8_t> &data
, size_t offset
, size_t sz
)
1410 ret
= request_data(data
.data(), offset
, sz
);
1421 void FileBuffer::truncate_old_data_in_pool()
1423 if (!g_small_memory
)
1427 std::unique_lock
<std::mutex
> lock(this->m_seg_map_mutex
);
1429 if (m_last_request_offset
< m_total_buffer_size
/2)
1432 size_t off
= m_last_request_offset
- m_total_buffer_size
/2;
1434 for (auto it
= m_seg_map
.lower_bound(off
); it
!= m_seg_map
.end(); it
++)
1436 auto blk
= it
->second
;
1437 std::unique_lock
<std::mutex
> lock(blk
->m_mutex
);
1439 if ((blk
->m_dataflags
& FragmentBlock::CONVERT_DONE
)
1440 /* && !(blk->m_dataflags & FragmentBlock::USING)*/
1443 blk
->m_dataflags
= 0;
1444 blk
->m_actual_size
= 0;
1446 blk
->m_data
.swap(v
);
1451 int64_t FileBuffer::request_data_from_segment(void *data
, size_t offset
, size_t sz
)
1453 size_t return_sz
= 0;
1457 m_last_request_offset
= offset
;
1458 std::unique_lock
<std::mutex
> lck(m_request_cv_mutex
);
1460 shared_ptr
<FragmentBlock
> blk
;
1462 m_pool_load_cv
.notify_all();
1464 while (!(blk
= get_map_it(offset
)))
1468 if(offset
>= this->m_DataSize
)
1471 auto now
= std::chrono::system_clock::now();
1472 m_request_cv
.wait_until(lck
, now
+ 500ms
);
1476 shared_ptr
<FragmentBlock
> last_decompress_db
;
1478 std::unique_lock
<std::mutex
> lock(m_seg_map_mutex
);
1479 last_decompress_db
= m_last_db
;
1483 std::unique_lock
<std::mutex
> lock(blk
->m_mutex
);
1485 if (blk
->m_actual_size
>= (offset
+ sz
- blk
->m_output_offset
))
1488 if (!(m_dataflags
& FILEBUFFER_FLAG_PARTIAL_RELOADABLE
))
1490 if (last_decompress_db
)
1492 if (offset
< last_decompress_db
->m_output_offset
&& !(blk
->m_dataflags
& FragmentBlock::CONVERT_DONE
))
1494 m_reset_stream
= true;
1503 if ((blk
->m_dataflags
& FragmentBlock::CONVERT_DONE
))
1505 atomic_fetch_or(&blk
->m_dataflags
, (int)FragmentBlock::USING
);
1509 auto now
= std::chrono::system_clock::now();
1510 m_request_cv
.wait_until(lck
, now
+ 500ms
);
1516 m_available_size
= 0;
1517 this->m_async_thread
.join();
1518 m_reset_stream
= false;
1520 this->reload(m_filename
, true);
1525 std::unique_lock
<std::mutex
> lock(blk
->m_mutex
);
1527 size_t off
= offset
- blk
->m_output_offset
;
1529 assert(offset
>= blk
->m_output_offset
);
1531 size_t item_sz
= blk
->m_actual_size
- off
;
1533 if (off
> blk
->m_actual_size
)
1538 memcpy(data
, blk
->data() + off
, sz
);
1539 atomic_fetch_and(&blk
->m_dataflags
, ~FragmentBlock::USING
);
1544 else if (item_sz
== 0)
1550 memcpy(data
, blk
->m_data
.data() + off
, item_sz
);
1551 data
= ((uint8_t*)data
) + item_sz
;
1555 return_sz
+= item_sz
;
1563 int64_t FileBuffer::request_data(void *data
, size_t offset
, size_t sz
)
1565 bool needlock
= false;
1570 if (offset
>= this->size())
1572 set_last_err_string("request offset exceed memory size");
1576 if (this->m_allocate_way
== FileBuffer::ALLOCATION_WAYS::SEGMENT
)
1577 return request_data_from_segment(data
, offset
, sz
);
1582 if (this->m_allocate_way
== FileBuffer::ALLOCATION_WAYS::SEGMENT
)
1583 return request_data_from_segment(data
, offset
, sz
);
1585 std::unique_lock
<std::mutex
> lck(m_request_cv_mutex
);
1586 while ((offset
+ sz
> m_available_size
) && !IsLoaded())
1590 set_last_err_string("Async request data error");
1593 m_request_cv
.wait(lck
);
1598 if (offset
> m_available_size
)
1600 set_last_err_string("request offset execeed memory size");
1608 if (offset
+ size
>= m_available_size
)
1609 size
= m_available_size
- offset
;
1611 if (needlock
) m_data_mutex
.lock();
1615 memcpy(data
, this->data() + offset
, size
);
1620 set_last_err_string("Out of memory");
1621 ret
= ERR_OUT_MEMORY
;
1623 if (needlock
) m_data_mutex
.unlock();
1628 std::shared_ptr
<FragmentBlock
> FileBuffer::request_new_blk()
1630 if (m_allocate_way
== ALLOCATION_WAYS::SEGMENT
)
1632 if (m_seg_map
.empty())
1634 std::shared_ptr
<FragmentBlock
> p(new FragmentBlock
);
1635 lock_guard
<mutex
> lock(m_seg_map_mutex
);
1636 p
->m_output_size
= m_seg_blk_size
;
1637 p
->m_data
.resize(m_seg_blk_size
);
1646 truncate_old_data_in_pool();
1649 lock_guard
<mutex
> lock(m_seg_map_mutex
);
1650 shared_ptr
<FragmentBlock
> p
= m_seg_map
.begin()->second
;
1651 offset
= p
->m_output_offset
;
1654 while (offset
> m_last_request_offset
+ m_total_buffer_size
)
1658 std::unique_lock
<std::mutex
> lck(m_pool_load_cv_mutex
);
1659 m_pool_load_cv
.wait(lck
);
1664 lock_guard
<mutex
> lock(m_seg_map_mutex
);
1665 shared_ptr
<FragmentBlock
> p
= m_seg_map
.begin()->second
;
1666 offset
= p
->m_output_offset
;
1669 offset
+= m_seg_blk_size
;
1671 std::shared_ptr
<FragmentBlock
> p(new FragmentBlock
);
1672 p
->m_output_size
= m_seg_blk_size
;
1673 p
->m_output_offset
= offset
;
1674 p
->m_data
.resize(m_seg_blk_size
);
1677 lock_guard
<mutex
> lock(m_seg_map_mutex
);
1678 m_seg_map
[offset
] = p
;
1685 std::shared_ptr
<FragmentBlock
> p(new FragmentBlock
);
1686 p
->m_pData
= this->m_pDatabuffer
;
1687 p
->m_output_size
= this->m_MemSize
;
1692 std::shared_ptr
<DataBuffer
> FileBuffer::request_data(size_t offset
, size_t sz
)
1694 shared_ptr
<DataBuffer
> p(new DataBuffer
);
1696 if (IsLoaded() && IsRefable())
1698 if (offset
>= this->size())
1700 set_last_err_string("request offset bigger than file size");
1705 if (offset
+ sz
> this->size())
1706 size
= this->size() - offset
;
1708 if (!p
->ref_other_buffer(shared_from_this(), offset
, size
))
1713 sz
= size() - offset
;
1716 int64_t ret
= request_data(p
->m_pDatabuffer
, offset
, sz
);
1724 int FileBuffer::reserve(size_t sz
)
1726 assert(m_allocate_way
== ALLOCATION_WAYS::MALLOC
);
1730 m_pDatabuffer
= (uint8_t*)realloc(m_pDatabuffer
, sz
);
1733 if (m_pDatabuffer
== nullptr)
1735 set_last_err_string("Out of memory\n");
1743 int FileBuffer::resize(size_t sz
)
1745 if (this->m_allocate_way
== ALLOCATION_WAYS::SEGMENT
)
1751 if (this->m_allocate_way
== ALLOCATION_WAYS::REF
)
1753 if (sz
> m_DataSize
)
1761 int ret
= reserve(sz
);
1767 int FileBuffer::swap(FileBuffer
&a
)
1769 std::swap(m_pDatabuffer
, a
.m_pDatabuffer
);
1770 std::swap(m_DataSize
, a
.m_DataSize
);
1771 std::swap(m_MemSize
, a
.m_MemSize
);
1772 std::swap(m_allocate_way
, a
.m_allocate_way
);
1777 int FileBuffer::unmapfile()
1782 UnmapViewOfFile(m_pDatabuffer
);
1783 m_pDatabuffer
= nullptr;
1784 CloseHandle(m_file_map
);
1785 CloseHandle(m_file_handle
);
1786 SetEvent(m_OverLapped
.hEvent
);
1788 if (m_file_monitor
.joinable())
1789 m_file_monitor
.join();
1791 CloseHandle(m_OverLapped
.hEvent
);
1792 m_OverLapped
.hEvent
= m_file_map
= m_file_handle
= INVALID_HANDLE_VALUE
;
1794 munmap(m_pDatabuffer
, m_DataSize
);
1796 m_pDatabuffer
= nullptr;
1801 bool check_file_exist(string filename
, bool /*start_async_load*/)
1804 fn
+= remove_quota(filename
);
1806 if (!fn
.empty() && fn
[0] != MAGIC_PATH
)
1809 path
+= g_current_dir
.substr(0, g_current_dir
.size() - 1);
1811 path
+= g_current_dir
+ fn
;
1817 path
.replace('\\', '/');
1821 return g_fs_data
.exist(path
);
1826 int file_overwrite_monitor(string filename
, FileBuffer
*p
)
1828 WaitForSingleObject(p
->m_OverLapped
.hEvent
, INFINITE
);
1834 if(p
->m_pDatabuffer
&& p
->get_m_allocate_way() == FileBuffer::ALLOCATION_WAYS::MMAP
)
1836 std::lock_guard
<mutex
> lock(g_mutex_map
);
1837 p
->m_file_monitor
.detach(); /*Detach itself, erase will delete p*/
1838 if(g_filebuffer_map
.find(str
) != g_filebuffer_map
.end())
1839 g_filebuffer_map
.erase(str
);
1846 int uuu_for_each_ls_file(uuu_ls_file fn
, const char *file_path
, void *p
)
1851 string f
= file_path
;
1856 }else if( f
[0] == '/')
1865 path
.replace('\\', '/');
1868 return g_fs_data
.for_each_ls(fn
, f
, p
);
1871 int FSCompressStream::load(const string
& backfile
, const string
& filename
, shared_ptr
<FileBuffer
>outp
)
1873 if (!g_fs_data
.exist(backfile
))
1876 str
= "Failure open file:";
1878 set_last_err_string(str
);
1881 if (filename
!= "*")
1884 string decompressed_name
= backfile
+ star
;
1885 shared_ptr
<FileBuffer
> decompressed_file
= get_file_buffer(decompressed_name
);
1887 tar
.Open(decompressed_name
);
1888 if (tar
.get_file_buff(filename
, outp
))
1890 outp
->m_available_size
= outp
->m_DataSize
;
1893 if (seekable(backfile
))
1896 size_t decompress_off
= 0;
1897 shared_ptr
<FragmentBlock
> p
;
1898 size_t total_size
= 0;
1900 atomic_fetch_or(&outp
->m_dataflags
, FILEBUFFER_FLAG_PARTIAL_RELOADABLE
);
1902 int nthread
= thread::hardware_concurrency();
1904 vector
<thread
> threads
;
1906 for (int i
= 0; i
< nthread
; i
++)
1908 threads
.push_back(thread(&FSCompressStream::PreloadWorkThread
, this, outp
));
1911 while ((p
= ScanCompressblock(backfile
, offset
, decompress_off
)))
1917 lock_guard
<mutex
> lock(outp
->m_seg_map_mutex
);
1918 outp
->m_seg_map
[p
->m_output_offset
] = p
;
1921 outp
->m_request_cv
.notify_all();
1922 outp
->m_pool_load_cv
.notify_all();
1924 total_size
= p
->m_output_offset
+ p
->m_output_size
;
1926 if (outp
->m_reset_stream
)
1930 outp
->m_DataSize
= total_size
;
1932 atomic_fetch_or(&outp
->m_dataflags
, FILEBUFFER_FLAG_KNOWN_SIZE
| FILEBUFFER_FLAG_SEG_DONE
);
1933 outp
->m_request_cv
.notify_one();
1935 for (int i
= 0; i
< nthread
; i
++)
1942 return Decompress(backfile
, outp
);
1945 int FSBasic::PreloadWorkThread(shared_ptr
<FileBuffer
>outp
)
1947 while (!outp
->m_reset_stream
)
1949 size_t request_offset
= outp
->m_last_request_offset
;
1951 lock_guard
<mutex
> lock(outp
->m_seg_map_mutex
);
1952 if (!outp
->m_offset_request
.empty())
1954 request_offset
= outp
->m_offset_request
.front();
1955 outp
->m_offset_request
.pop();
1959 shared_ptr
<FragmentBlock
> blk
;
1962 lock_guard
<mutex
> lock(outp
->m_seg_map_mutex
);
1964 auto low
= outp
->m_seg_map
.begin();
1966 low
= outp
->m_seg_map
.lower_bound(request_offset
);
1969 while (low
!= outp
->m_seg_map
.end() )
1971 if (!(low
->second
->m_dataflags
& (FragmentBlock::CONVERT_START
)))
1976 if (g_small_memory
&& count
>= 5)
1980 if (low
!= outp
->m_seg_map
.end()) {
1981 if (!(low
->second
->m_dataflags
& FragmentBlock::CONVERT_START
))
1983 atomic_fetch_or(&low
->second
->m_dataflags
, (int)FragmentBlock::CONVERT_START
);
1990 (blk
&& (blk
->m_dataflags
& FragmentBlock::CONVERT_DONE
))
1993 std::unique_lock
<std::mutex
> lck(outp
->m_pool_load_cv_mutex
);
1994 outp
->m_pool_load_cv
.wait(lck
);
1998 outp
->truncate_old_data_in_pool();
2000 if (blk
->DataConvert() < 0)
2002 // todo error handle;
2006 atomic_fetch_or(&blk
->m_dataflags
, (int)FragmentBlock::CONVERT_DONE
);
2007 outp
->m_request_cv
.notify_all();
2012 int FSHttp::load(const string
& backfile
, const string
& filename
, shared_ptr
<FileBuffer
> p
)
2014 shared_ptr
<HttpStream
> http
= make_shared
<HttpStream
>();
2016 if (http
->HttpGetHeader(backfile
, filename
, m_Port
, typeid(*this) == typeid(FSHttps
)))
2019 size_t sz
= http
->HttpGetFileSize();
2023 atomic_fetch_or(&p
->m_dataflags
, FILEBUFFER_FLAG_KNOWN_SIZE
);
2024 p
->m_request_cv
.notify_all();
2026 return http_load(http
, p
, backfile
);
2029 void uuu_set_small_mem(uint32_t val
)
2031 g_small_memory
= !!val
;
2034 void clean_up_filemap()
2036 for (auto it
: g_filebuffer_map
)
2038 it
.second
->m_reset_stream
= true;
2039 it
.second
->m_pool_load_cv
.notify_all();
2040 if (it
.second
->m_async_thread
.joinable())
2041 it
.second
->m_async_thread
.join();
2043 g_filebuffer_map
.clear();