1 /* MtDec.c -- Multi-thread Decoder
2 2018-03-02 : Igor Pavlov : Public domain */
6 // #define SHOW_DEBUG_INFO
10 #ifdef SHOW_DEBUG_INFO
14 #ifdef SHOW_DEBUG_INFO
/* Debug trace helper: prints the label string s followed by the value d (cast
   to unsigned) on its own line, by passing a printf call to PRF().
   The definition of PRF is not visible in this view of the file.
   NOTE(review): the format uses %d while the argument is cast to unsigned;
   %u would match the cast - confirm before changing. */
20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
/* Attaches the caller-supplied progress callback to the progress tracker by
   storing it in p->progress.
   NOTE(review): this view of the file has lost lines (braces and possibly
   other field initializations); only the p->progress assignment is visible. */
26 void MtProgress_Init(CMtProgress
*p
, ICompressProgress
*progress
)
28 p
->progress
= progress
;
/* Reports the current totals to the progress callback.
   When no error is recorded yet (p->res == SZ_OK) and a callback is set,
   ICompressProgress_Progress is called with p->totalInSize / p->totalOutSize;
   a non-SZ_OK answer from the callback is recorded in p->res as
   SZ_ERROR_PROGRESS.
   No critical-section calls are visible in this function (compare with
   MtProgress_ProgressAdd). NOTE(review): presumably intended for
   single-thread callers, per the _ST suffix; the return statement is not
   visible in this view - confirm it returns p->res. */
35 SRes
MtProgress_Progress_ST(CMtProgress
*p
)
37 if (p
->res
== SZ_OK
&& p
->progress
)
38 if (ICompressProgress_Progress(p
->progress
, p
->totalInSize
, p
->totalOutSize
) != SZ_OK
)
39 p
->res
= SZ_ERROR_PROGRESS
;
/* Adds inSize / outSize to the running totals and reports the new totals to
   the progress callback, all while holding p->cs.
   The callback is only invoked when p->res is still SZ_OK and a callback is
   set; a non-SZ_OK callback result is stored in p->res as SZ_ERROR_PROGRESS.
   NOTE(review): the return statement is not visible in this view; presumably
   the value of p->res read under the lock is returned - confirm. */
44 SRes
MtProgress_ProgressAdd(CMtProgress
*p
, UInt64 inSize
, UInt64 outSize
)
47 CriticalSection_Enter(&p
->cs
);
// Totals are accumulated even when an error is already recorded.
49 p
->totalInSize
+= inSize
;
50 p
->totalOutSize
+= outSize
;
51 if (p
->res
== SZ_OK
&& p
->progress
)
52 if (ICompressProgress_Progress(p
->progress
, p
->totalInSize
, p
->totalOutSize
) != SZ_OK
)
53 p
->res
= SZ_ERROR_PROGRESS
;
56 CriticalSection_Leave(&p
->cs
);
/* Enters and leaves p->cs around its work.
   NOTE(review): the statement between Enter and Leave and the return are not
   visible in this view; presumably p->res is read under the lock and
   returned - confirm against the full source. */
61 SRes
MtProgress_GetError(CMtProgress
*p
)
64 CriticalSection_Enter(&p
->cs
);
66 CriticalSection_Leave(&p
->cs
);
/* Enters and leaves p->cs around its work.
   NOTE(review): the statements between Enter and Leave are not visible in
   this view; presumably res is stored into p->res under the lock (possibly
   only when no error is recorded yet) - confirm against the full source. */
71 void MtProgress_SetError(CMtProgress
*p
, SRes res
)
73 CriticalSection_Enter(&p
->cs
);
76 CriticalSection_Leave(&p
->cs
);
/* Error-propagation wrapper used around threading calls (Event_Wait,
   Event_Set); it simply expands to RINOK(x). */
80 #define RINOK_THREAD(x) RINOK(x)
/* Leaves *p as an existing, non-signaled event:
   - if the event was already created, it is reset via Event_Reset;
   - otherwise a new one is created via AutoResetEvent_CreateNotSignaled.
   Returns the WRes of whichever call was made. */
83 static WRes
ArEvent_OptCreate_And_Reset(CEvent
*p
)
85 if (Event_IsCreated(p
))
86 return Event_Reset(p
);
87 return AutoResetEvent_CreateNotSignaled(p
);
/* Input buffers are allocated as (MTDEC__LINK_DATA_OFFSET + inBufSize) bytes:
   a CMtDecBufLink header followed by the payload.
   MTDEC__LINK_DATA_OFFSET is the header size; MTDEC__DATA_PTR_FROM_LINK
   converts a link (header) pointer to the address of its payload bytes. */
98 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
99 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
/* Forward declaration of the thread entry point: it is passed to
   Thread_Create in MtDecThread_CreateAndStart before its definition appears. */
103 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
ThreadFunc(void *pp
);
/* Prepares the two per-thread events of t, canWrite first and then canRead,
   each via ArEvent_OptCreate_And_Reset (created if missing, otherwise reset).
   NOTE(review): the error checks between the two calls and the return are not
   visible in this view; presumably the first non-zero WRes is returned. */
106 static WRes
MtDecThread_CreateEvents(CMtDecThread
*t
)
108 WRes wres
= ArEvent_OptCreate_And_Reset(&t
->canWrite
);
111 wres
= ArEvent_OptCreate_And_Reset(&t
->canRead
);
/* Prepares the events of worker t and starts its OS thread.
   Steps visible here: MtDecThread_CreateEvents(t), a Thread_WasCreated check,
   and Thread_Create with ThreadFunc as entry point and t as its argument.
   The resulting WRes is converted with MY_SRes_HRESULT_FROM_WRes and returned
   as SRes.
   NOTE(review): the conditions linking these steps are not visible in this
   view; presumably an already-created thread is reused instead of being
   created again - confirm. */
119 static SRes
MtDecThread_CreateAndStart(CMtDecThread
*t
)
121 WRes wres
= MtDecThread_CreateEvents(t
);
122 // wres = 17; // for test
125 if (Thread_WasCreated(&t
->thread
))
127 wres
= Thread_Create(&t
->thread
, ThreadFunc
, t
);
131 return MY_SRes_HRESULT_FROM_WRes(wres
);
/* Releases input buffers owned by thread t.
   Starting at t->inBuf, the next pointer is read from the CMtDecBufLink
   header before the link itself is returned to t->mtDec->alloc, so the link
   is never touched after it has been freed.
   NOTE(review): the loop and the reset of t->inBuf are not visible in this
   view; presumably the whole linked list is walked - confirm. */
135 void MtDecThread_FreeInBufs(CMtDecThread
*t
)
139 void *link
= t
->inBuf
;
143 void *next
= ((CMtDecBufLink
*)link
)->next
;
144 ISzAlloc_Free(t
->mtDec
->alloc
, link
);
/* Shuts down worker t.
   If its thread was created: both events are signaled so a thread blocked in
   Event_Wait can run, then the thread is waited for and its handle closed.
   The canRead and canWrite events are closed afterwards.
   NOTE(review): brace structure is not visible in this view, so whether the
   Event_Close calls sit inside or outside the Thread_WasCreated branch cannot
   be told from here. */
152 static void MtDecThread_CloseThread(CMtDecThread
*t
)
154 if (Thread_WasCreated(&t
->thread
))
156 Event_Set(&t
->canWrite
); /* we can disable it. There are no threads waiting canWrite in normal cases */
157 Event_Set(&t
->canRead
);
158 Thread_Wait(&t
->thread
);
159 Thread_Close(&t
->thread
);
162 Event_Close(&t
->canRead
);
163 Event_Close(&t
->canWrite
);
/* Calls MtDecThread_CloseThread for every slot of p->threads
   (all MTDEC__THREADS_MAX entries, not only the started ones). */
166 static void MtDec_CloseThreads(CMtDec
*p
)
169 for (i
= 0; i
< MTDEC__THREADS_MAX
; i
++)
170 MtDecThread_CloseThread(&p
->threads
[i
]);
/* Tears down worker t: first stops/closes its thread and events
   (MtDecThread_CloseThread), then frees its input buffers
   (MtDecThread_FreeInBufs). */
173 static void MtDecThread_Destruct(CMtDecThread
*t
)
175 MtDecThread_CloseThread(t
);
176 MtDecThread_FreeInBufs(t
);
/* Reads from stream into data.
   On entry *processedSize holds the requested byte count (copied to size);
   each ISeqInStream_Read result (cur) is added to *processedSize.
   NOTE(review): the loop, the reset of *processedSize and the declaration of
   cur are not visible in this view; presumably reads are repeated until size
   bytes were read, the stream returned 0 bytes, or an error occurred. */
181 static SRes
FullRead(ISeqInStream
*stream
, Byte
*data
, size_t *processedSize
)
183 size_t size
= *processedSize
;
188 SRes res
= ISeqInStream_Read(stream
, data
, &cur
);
189 *processedSize
+= cur
;
/* Samples the shared error/interrupt state while holding p->mtProgress.cs.
   *wasInterrupted is set when an interrupt is pending (p->needInterrupt) and
   the caller's interruptIndex is greater than the recorded p->interruptIndex,
   i.e. only callers with a higher index than the recorded one are reported
   as interrupted.
   p->mtProgress.res is copied to res under the same lock.
   NOTE(review): the return statement is not visible in this view; presumably
   res is returned. */
200 static SRes
MtDec_GetError_Spec(CMtDec
*p
, UInt64 interruptIndex
, Bool
*wasInterrupted
)
203 CriticalSection_Enter(&p
->mtProgress
.cs
);
204 *wasInterrupted
= (p
->needInterrupt
&& interruptIndex
> p
->interruptIndex
);
205 res
= p
->mtProgress
.res
;
206 CriticalSection_Leave(&p
->mtProgress
.cs
);
/* Combines a progress update with the error/interrupt query, under a single
   hold of p->mtProgress.cs:
   1. adds inSize / outSize to the shared totals;
   2. if no error is recorded and a callback is set, reports the totals via
      ICompressProgress_Progress, recording SZ_ERROR_PROGRESS on failure;
   3. sets *wasInterrupted when an interrupt is pending and interruptIndex is
      greater than p->interruptIndex;
   4. copies p->mtProgress.res to res.
   NOTE(review): the return statement is not visible in this view; presumably
   res is returned. */
210 static SRes
MtDec_Progress_GetError_Spec(CMtDec
*p
, UInt64 inSize
, UInt64 outSize
, UInt64 interruptIndex
, Bool
*wasInterrupted
)
213 CriticalSection_Enter(&p
->mtProgress
.cs
);
215 p
->mtProgress
.totalInSize
+= inSize
;
216 p
->mtProgress
.totalOutSize
+= outSize
;
217 if (p
->mtProgress
.res
== SZ_OK
&& p
->mtProgress
.progress
)
218 if (ICompressProgress_Progress(p
->mtProgress
.progress
, p
->mtProgress
.totalInSize
, p
->mtProgress
.totalOutSize
) != SZ_OK
)
219 p
->mtProgress
.res
= SZ_ERROR_PROGRESS
;
221 *wasInterrupted
= (p
->needInterrupt
&& interruptIndex
> p
->interruptIndex
);
222 res
= p
->mtProgress
.res
;
224 CriticalSection_Leave(&p
->mtProgress
.cs
);
/* Requests interruption, under p->mtProgress.cs.
   The condition is true when no interrupt is pending yet or when the new
   interruptIndex is smaller than the recorded one; the visible assignments
   store interruptIndex in p->interruptIndex and set p->needInterrupt.
   Together with the "interruptIndex > p->interruptIndex" test in the
   *_GetError_Spec functions, the smallest index seen becomes the threshold.
   NOTE(review): brace structure is not visible in this view; presumably both
   assignments are guarded by the condition. */
229 static void MtDec_Interrupt(CMtDec
*p
, UInt64 interruptIndex
)
231 CriticalSection_Enter(&p
->mtProgress
.cs
);
232 if (!p
->needInterrupt
|| interruptIndex
< p
->interruptIndex
)
234 p
->interruptIndex
= interruptIndex
;
235 p
->needInterrupt
= True
;
237 CriticalSection_Leave(&p
->mtProgress
.cs
);
/* Returns a pointer to the payload area of the cross block.
   cr starts as p->crossBlock; the allocation shown requests
   MTDEC__LINK_DATA_OFFSET + p->inBufSize bytes from p->alloc, the same layout
   as an input buffer link, and the result is converted with
   MTDEC__DATA_PTR_FROM_LINK.
   NOTE(review): the conditions around the allocation are not visible in this
   view; presumably it happens only when p->crossBlock is NULL, the new block
   is stored back into p->crossBlock, and NULL is returned on allocation
   failure - confirm against the full source. */
240 Byte
*MtDec_GetCrossBuff(CMtDec
*p
)
242 Byte
*cr
= p
->crossBlock
;
245 cr
= (Byte
*)ISzAlloc_Alloc(p
->alloc
, MTDEC__LINK_DATA_OFFSET
+ p
->inBufSize
);
250 return MTDEC__DATA_PTR_FROM_LINK(cr
);
255 ThreadFunc2() returns:
256 0 - in all normal cases (even for stream error or memory allocation error)
257 (!= 0) - WRes error returned by a system threading function
/* Minimum number of newly processed bytes before ThreadFunc2 performs another
   progress / interrupt check (compared with ">=" against input and output
   deltas). With the active value (1 << 0) == 1, any non-zero delta qualifies.
   The commented-out alternative (1 << 22) would raise the step to 4 MiB. */
260 // #define MTDEC_ProgessStep (1 << 22)
261 #define MTDEC_ProgessStep (1 << 0)
/* Worker body shared by all decoder threads (called from ThreadFunc1).
   Phases visible in this function, in order:
     1. wait on t->canRead, then take the next block number from p->blockIndex;
     2. READ/PARSE: obtain input either from the cross block or from
        p->inStream (FullRead), hand it to mtCallback->Parse, and move data
        that belongs to the next block into the cross buffer;
     3. mtCallback->PreCode, optional start of one more worker thread, and
        Event_Set on the next thread's canRead;
     4. CODE: feed the buffered links to mtCallback->Code, with periodic
        progress / interrupt checks;
     5. WRITE: wait on t->canWrite, publish error state into p, call
        mtCallback->Write, then signal the next thread's canWrite (or
        threads[0] when decoding restarts or exits).
   Return value: see the "ThreadFunc2() returns" note that precedes this
   function in the file.
   NOTE(review): this view of the file has lost many lines (braces, loops,
   else branches, plain local declarations), so the nesting of the statements
   below cannot be determined from here; the inline notes describe only what
   the visible statements themselves do. */
263 static WRes
ThreadFunc2(CMtDecThread
*t
)
265 CMtDec
*p
= t
->mtDec
;
267 PRF_STR_INT("ThreadFunc2", t
->index
);
269 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
274 Bool wasInterrupted
, isAllocError
, overflow
, finish
;
275 SRes threadingErrorSRes
;
276 Bool needCode
, needWrite
, needContinue
;
278 size_t inDataSize_Start
;
280 // UInt64 inDataSize_Full;
289 Byte
*afterEndData
= NULL
;
290 size_t afterEndData_Size
= 0;
292 Bool canCreateNewThread
= False
;
293 // CMtDecCallbackInfo parse;
294 CMtDecThread
*nextThread
;
296 PRF_STR_INT("Event_Wait(&t->canRead)", t
->index
);
// Block until this thread is signaled that it may read input.
298 RINOK_THREAD(Event_Wait(&t
->canRead
));
302 PRF_STR_INT("after Event_Wait(&t->canRead)", t
->index
);
304 // if (t->index == 3) return 19; // for test
// Take the next block number; it is also passed as interruptIndex to the
// error / interrupt checks below.
306 blockIndex
= p
->blockIndex
++;
308 // PRF(printf("\ncanRead\n"))
310 res
= MtDec_Progress_GetError_Spec(p
, 0, 0, blockIndex
, &wasInterrupted
);
312 finish
= p
->readWasFinished
;
315 isAllocError
= False
;
318 inDataSize_Start
= 0;
320 // inDataSize_Full = 0;
322 if (res
== SZ_OK
&& !wasInterrupted
)
326 CMtDecBufLink
*prev
= NULL
;
327 CMtDecBufLink
*link
= (CMtDecBufLink
*)t
->inBuf
;
// Bytes still pending in the cross block.
328 size_t crossSize
= p
->crossEnd
- p
->crossStart
;
// NOTE(review): crossSize is size_t but is printed with %d (debug builds only).
330 PRF(printf("\ncrossSize = %d\n", crossSize
));
// Same layout as every input buffer: link header followed by inBufSize payload bytes.
336 link
= (CMtDecBufLink
*)ISzAlloc_Alloc(p
->alloc
, MTDEC__LINK_DATA_OFFSET
+ p
->inBufSize
);
340 // p->allocError_for_Read_BlockIndex = blockIndex;
347 // static unsigned g_num = 0;
348 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
352 t
->inBuf
= (void *)link
;
356 Byte
*data
= MTDEC__DATA_PTR_FROM_LINK(link
);
357 Byte
*parseData
= data
;
362 inDataSize
= crossSize
;
363 // inDataSize_Full = inDataSize;
364 inDataSize_Start
= crossSize
;
// Parse directly from the cross block, starting at the unread position.
366 parseData
= MTDEC__DATA_PTR_FROM_LINK(p
->crossBlock
) + p
->crossStart
;
367 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
368 (int)p
->crossStart
, (int)p
->crossEnd
, (int)finish
));
// Fill this link's payload from the input stream.
374 res
= FullRead(p
->inStream
, data
, &size
);
376 // size = 10; // test
379 // inDataSize_Full = inDataSize;
381 inDataSize_Start
= size
;
383 p
->readProcessed
+= size
;
// A read shorter than a full buffer marks the input as finished.
384 finish
= (size
!= p
->inBufSize
);
386 p
->readWasFinished
= True
;
388 // res = E_INVALIDARG; // test
392 // PRF(printf("\nRead error = %d\n", res))
393 // we want to decode all data before error
395 // p->readError_BlockIndex = blockIndex;
396 p
->readWasFinished
= True
;
402 if (inDataSize
- inPrev
>= MTDEC_ProgessStep
)
404 res
= MtDec_Progress_GetError_Spec(p
, 0, 0, blockIndex
, &wasInterrupted
);
405 if (res
!= SZ_OK
|| wasInterrupted
)
412 CMtDecCallbackInfo parse
;
// startCall is true only while no previous link exists (prev == NULL).
414 parse
.startCall
= (prev
== NULL
);
415 parse
.src
= parseData
;
416 parse
.srcSize
= size
;
417 parse
.srcFinished
= finish
;
418 parse
.canCreateNewThread
= True
;
420 // PRF(printf("\nParse size = %d\n", (unsigned)size))
// The callback inspects the data; the code below reads back parse.srcSize,
// parse.state and parse.canCreateNewThread.
422 p
->mtCallback
->Parse(p
->mtCallbackObject
, t
->index
, &parse
);
425 canCreateNewThread
= parse
.canCreateNewThread
;
427 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
430 // parseRes != SZ_OK ||
431 // inDataSize - (size - parse.srcSize) > p->inBlockMax
433 parse
.state
== MTDEC_PARSE_OVERFLOW
437 // Overflow or Parse error - switch from MT decoding to ST decoding
442 PRF(printf("\n Overflow"));
443 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
444 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize
));
448 memcpy(data
, parseData
, size
);
456 memcpy(data
, parseData
, parse
.srcSize
);
457 p
->crossStart
+= parse
.srcSize
;
460 if (parse
.state
!= MTDEC_PARSE_CONTINUE
|| finish
)
462 // we don't need to parse in current thread anymore
464 if (parse
.state
== MTDEC_PARSE_END
)
468 // p->crossFinished = finish;
470 if (parse
.srcSize
== size
)
472 // full parsed - no cross transfer
478 if (parse
.state
== MTDEC_PARSE_END
)
484 memcpy(data
+ parse
.srcSize
, parseData
+ parse
.srcSize
, size
- parse
.srcSize
); // we need all data
// Remember the unparsed tail; it is passed to the Write callback later.
485 afterEndData_Size
= size
- parse
.srcSize
;
486 afterEndData
= parseData
+ parse
.srcSize
;
488 // we reduce data size to required bytes (parsed only)
489 inDataSize
-= (size
- parse
.srcSize
);
491 inDataSize_Start
= parse
.srcSize
;
496 // partial parsed - need cross transfer
498 inDataSize
= parse
.srcSize
; // it's only parsed now
501 // partial parsed - is not in initial cross block - we need to copy new data to cross block
502 Byte
*cr
= MtDec_GetCrossBuff(p
);
506 PRF(printf("\ncross alloc error error\n"));
507 // res = SZ_ERROR_MEM;
509 // p->allocError_for_Read_BlockIndex = blockIndex;
// The unparsed tail of this buffer becomes the new cross-block content.
516 size_t crSize
= size
- parse
.srcSize
;
517 inDataSize
-= crSize
;
518 p
->crossEnd
= crSize
;
520 memcpy(cr
, parseData
+ parse
.srcSize
, crSize
);
524 // inDataSize_Full = inDataSize;
526 inDataSize_Start
= parse
.srcSize
; // it's partial size (parsed only)
533 if (parse
.srcSize
!= size
)
536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res
));
555 res
= MtDec_GetError_Spec(p
, blockIndex
, &wasInterrupted
);
560 if (res
== SZ_OK
&& needCode
&& !wasInterrupted
)
562 codeRes
= p
->mtCallback
->PreCode(p
->mtCallbackObject
, t
->index
);
563 if (codeRes
!= SZ_OK
)
567 // SZ_ERROR_MEM is expected error here.
568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
573 if (res
!= SZ_OK
|| wasInterrupted
)
577 threadingErrorSRes
= SZ_OK
;
// Start one more worker only while below the limit and when the Parse
// callback left canCreateNewThread set.
581 if (p
->numStartedThreads
< p
->numStartedThreads_Limit
&& canCreateNewThread
)
583 SRes res2
= MtDecThread_CreateAndStart(&p
->threads
[p
->numStartedThreads
]);
586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
587 p
->numStartedThreads
++;
591 PRF(printf("\nERROR: numStartedThreads=%d\n", p
->numStartedThreads
));
592 if (p
->numStartedThreads
== 1)
594 // if only one thread is possible, we leave multi-threading code
597 threadingErrorSRes
= res2
;
// Lower the limit to the current count.
600 p
->numStartedThreads_Limit
= p
->numStartedThreads
;
// Threads are used round-robin: the successor of the last started thread is thread 0.
606 unsigned nextIndex
= t
->index
+ 1;
607 nextThread
= &p
->threads
[nextIndex
>= p
->numStartedThreads
? 0 : nextIndex
];
608 RINOK_THREAD(Event_Set(&nextThread
->canRead
))
609 // We have started executing for new iteration (with next thread)
610 // And that next thread now is responsible for possible exit from decoding (threading_code)
614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
618 // - otherwise we stop decoding and exit from ThreadFunc2()
620 // Don't change (finish) variable in the further code
623 // ---------- CODE ----------
630 if (res
== SZ_OK
&& needCode
&& codeRes
== SZ_OK
)
632 Bool isStartBlock
= True
;
633 CMtDecBufLink
*link
= (CMtDecBufLink
*)t
->inBuf
;
641 inSize
= inDataSize_Start
;
// Bytes of this block that have not been passed to Code yet.
644 UInt64 rem
= inDataSize
- inCodePos
;
645 inSize
= p
->inBufSize
;
647 inSize
= (size_t)rem
;
653 codeRes
= p
->mtCallback
->Code(p
->mtCallbackObject
, t
->index
,
654 (const Byte
*)MTDEC__DATA_PTR_FROM_LINK(link
), inSize
,
655 (inCodePos
== inDataSize
), // srcFinished
656 &inCodePos
, &outCodePos
, &stop
);
658 if (codeRes
!= SZ_OK
)
660 PRF(printf("\nCode Interrupt error = %x\n", codeRes
));
661 // we interrupt only later blocks
662 MtDec_Interrupt(p
, blockIndex
);
666 if (stop
|| inCodePos
== inDataSize
)
// Report progress once enough new input or output has accumulated since the
// previous report.
670 const UInt64 inDelta
= inCodePos
- inPrev
;
671 const UInt64 outDelta
= outCodePos
- outPrev
;
672 if (inDelta
>= MTDEC_ProgessStep
|| outDelta
>= MTDEC_ProgessStep
)
675 res
= MtDec_Progress_GetError_Spec(p
, inDelta
, outDelta
, blockIndex
, &wasInterrupted
);
676 if (res
!= SZ_OK
|| wasInterrupted
)
679 outPrev
= outCodePos
;
684 isStartBlock
= False
;
689 // ---------- WRITE ----------
// Block until this thread is signaled that it may write.
691 RINOK_THREAD(Event_Wait(&t
->canWrite
));
694 Bool isErrorMode
= False
;
695 Bool canRecode
= True
;
696 Bool needWriteToStream
= needWrite
;
698 if (p
->exitThread
) return 0; // it's never executed in normal cases
700 if (p
->wasInterrupted
)
701 wasInterrupted
= True
;
704 if (codeRes
!= SZ_OK
) // || !needCode // check it !!!
706 p
->wasInterrupted
= True
;
707 p
->codeRes
= codeRes
;
708 if (codeRes
== SZ_ERROR_MEM
)
712 if (threadingErrorSRes
)
714 p
->wasInterrupted
= True
;
715 p
->threadingErrorSRes
= threadingErrorSRes
;
716 needWriteToStream
= False
;
720 p
->wasInterrupted
= True
;
721 p
->isAllocError
= True
;
722 needWriteToStream
= False
;
726 p
->wasInterrupted
= True
;
728 needWriteToStream
= False
;
// Report whatever was processed since the previous progress update.
740 const UInt64 inDelta
= inCodePos
- inPrev
;
741 const UInt64 outDelta
= outCodePos
- outPrev
;
742 // if (inDelta != 0 || outDelta != 0)
743 res
= MtProgress_ProgressAdd(&p
->mtProgress
, inDelta
, outDelta
);
747 needContinue
= (!finish
);
749 // if (res == SZ_OK && needWrite && !wasInterrupted)
752 // p->inProcessed += inCodePos;
// The needWrite argument is true only when there is no error, output is
// still wanted and no interrupt was seen.
754 res
= p
->mtCallback
->Write(p
->mtCallbackObject
, t
->index
,
755 res
== SZ_OK
&& needWriteToStream
&& !wasInterrupted
, // needWrite
756 afterEndData
, afterEndData_Size
,
760 // res= E_INVALIDARG; // for test
762 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue
));
763 PRF(printf("\nprocessed = %d\n", (unsigned)p
->inProcessed
));
767 PRF(printf("\nWrite error = %d\n", res
));
769 p
->wasInterrupted
= True
;
772 || (!needContinue
&& !finish
))
774 PRF(printf("\nWrite Interrupt error = %x\n", res
));
775 MtDec_Interrupt(p
, blockIndex
);
785 || p
->numFilledThreads
!= 0
// Record this thread's buffered input in the filled-thread bookkeeping that
// MtDec_PrepareRead / MtDec_Read consume.
788 if (p
->numFilledThreads
== 0)
789 p
->filledThreadStart
= t
->index
;
790 if (inDataSize
!= 0 || !finish
)
792 t
->inDataSize_Start
= inDataSize_Start
;
793 t
->inDataSize
= inDataSize
;
794 p
->numFilledThreads
++;
796 PRF(printf("\np->numFilledThreads = %d\n", p
->numFilledThreads
));
797 PRF(printf("p->filledThreadStart = %d\n", p
->filledThreadStart
));
802 RINOK_THREAD(Event_Set(&nextThread
->canWrite
));
808 // we restore decoding with new iteration
809 RINOK_THREAD(Event_Set(&p
->threads
[0].canWrite
));
813 // we exit from decoding
816 p
->exitThread
= True
;
818 RINOK_THREAD(Event_Set(&p
->threads
[0].canRead
));
/* Wrapper around ThreadFunc2 that handles its result.
   pp is the CMtDecThread of the calling worker.
   One path returns p->exitThreadWRes. The remaining statements form the
   error handling for a failed threading call: the first failure code is
   kept in p->exitThreadWRes (only written while it is still 0),
   p->exitThread is set, both events of threads[0] are signaled, and the
   failure is recorded in the shared progress state via MtProgress_SetError.
   NOTE(review): the conditions selecting between these paths, the declaration
   of p and the final return are not visible in this view. */
837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
ThreadFunc1(void *pp
)
841 CMtDecThread
*t
= (CMtDecThread
*)pp
;
844 // fprintf(stdout, "\n%d = %p\n", t->index, &t);
846 res
= ThreadFunc2(t
);
849 return p
->exitThreadWRes
;
851 // it's unexpected situation for some threading function error
852 if (p
->exitThreadWRes
== 0)
853 p
->exitThreadWRes
= res
;
854 PRF(printf("\nthread exit error = %d\n", res
));
855 p
->exitThread
= True
;
// Signal both events of thread 0.
856 Event_Set(&p
->threads
[0].canRead
);
857 Event_Set(&p
->threads
[0].canWrite
);
858 MtProgress_SetError(&p
->mtProgress
, MY_SRes_HRESULT_FROM_WRes(res
));
/* Thread entry point passed to Thread_Create (and also called directly from
   MtDec_Code for the first thread). pp is the worker's CMtDecThread.
   Before delegating to ThreadFunc1 it reserves t->index * 128 bytes with
   alloca and stores the pointer in t->allocaPtr.
   NOTE(review): presumably this shifts each worker's stack by a different
   amount so their frames do not land on identical offsets, and the function
   is marked MY_NO_INLINE so the alloca stays in its own frame - the reason is
   not stated in the visible code; confirm before changing. */
863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
ThreadFunc(void *pp
)
865 CMtDecThread
*t
= (CMtDecThread
*)pp
;
867 // fprintf(stderr, "\n%d = %p - before", t->index, &t);
869 t
->allocaPtr
= alloca(t
->index
* 128);
871 return ThreadFunc1(pp
);
/* Releases buffers that hold no pending data and tells whether data remains
   for MtDec_Read.
   - The cross block is freed when it is allocated but empty
     (crossStart == crossEnd).
   - For every thread slot i, the input buffers are freed when i is greater
     than numStartedThreads, or when the slot's distance from
     filledThreadStart (taken modulo numStartedThreads) is not below
     numFilledThreads, i.e. the slot is outside the range of filled threads.
   Returns non-zero when filled threads or unread cross-block bytes remain.
   NOTE(review): the first test uses "i > numStartedThreads", so the slot with
   i == numStartedThreads is decided by the distance test alone; confirm that
   this is intended. */
875 int MtDec_PrepareRead(CMtDec
*p
)
877 if (p
->crossBlock
&& p
->crossStart
== p
->crossEnd
)
879 ISzAlloc_Free(p
->alloc
, p
->crossBlock
);
880 p
->crossBlock
= NULL
;
885 for (i
= 0; i
< MTDEC__THREADS_MAX
; i
++)
886 if (i
> p
->numStartedThreads
887 || p
->numFilledThreads
<=
888 (i
>= p
->filledThreadStart
?
889 i
- p
->filledThreadStart
:
890 i
+ p
->numStartedThreads
- p
->filledThreadStart
))
891 MtDecThread_FreeInBufs(&p
->threads
[i
]);
894 return (p
->numFilledThreads
!= 0) || (p
->crossStart
!= p
->crossEnd
);
/* Hands back input that the worker threads buffered but did not consume.
   While filled threads remain, data is served from
   p->threads[p->filledThreadStart]:
   - a consumed link is freed after its next pointer has been read;
   - when a thread's inDataSize reaches 0 its buffers are freed,
     numFilledThreads is decremented and filledThreadStart advances, wrapping
     to 0 at numStartedThreads;
   - otherwise a chunk limit is taken from inDataSize_Start (then cleared),
     inDataSize is reduced by the limit, and a pointer to the payload of the
     current link is returned.
   After the threads are drained, the remaining cross-block bytes
   (crossEnd - crossStart) are served from the cross block, which is freed
   here as well.
   NOTE(review): many lines are missing in this view (the stores to *inLim,
   the chunk-size clamping, the final return); presumably *inLim receives the
   number of valid bytes at the returned pointer and NULL signals that no data
   is left - confirm against the full source. */
898 const Byte
*MtDec_Read(CMtDec
*p
, size_t *inLim
)
900 while (p
->numFilledThreads
!= 0)
902 CMtDecThread
*t
= &p
->threads
[p
->filledThreadStart
];
// Read next before freeing the link.
907 void *link
= t
->inBuf
;
908 void *next
= ((CMtDecBufLink
*)link
)->next
;
909 ISzAlloc_Free(p
->alloc
, link
);
913 if (t
->inDataSize
== 0)
915 MtDecThread_FreeInBufs(t
);
916 if (--p
->numFilledThreads
== 0)
// Advance to the next thread, wrapping to 0 at numStartedThreads.
918 if (++p
->filledThreadStart
== p
->numStartedThreads
)
919 p
->filledThreadStart
= 0;
920 t
= &p
->threads
[p
->filledThreadStart
];
925 size_t lim
= t
->inDataSize_Start
;
927 t
->inDataSize_Start
= 0;
930 UInt64 rem
= t
->inDataSize
;
935 t
->inDataSize
-= lim
;
937 return (const Byte
*)MTDEC__DATA_PTR_FROM_LINK(t
->inBuf
);
942 size_t crossSize
= p
->crossEnd
- p
->crossStart
;
945 const Byte
*data
= MTDEC__DATA_PTR_FROM_LINK(p
->crossBlock
) + p
->crossStart
;
954 ISzAlloc_Free(p
->alloc
, p
->crossBlock
);
955 p
->crossBlock
= NULL
;
/* Puts a CMtDec into its initial state.
   Visible defaults: inBufSize = 1 << 18 (256 KiB), numThreadsMax = 0,
   no cross block, no filled threads, no callback / callback object,
   allocatedBufsSize = 0. Every thread slot gets its events and thread handle
   constructed, and the progress critical section is initialized.
   NOTE(review): further field initializations may exist on lines missing
   from this view. */
962 void MtDec_Construct(CMtDec
*p
)
966 p
->inBufSize
= (size_t)1 << 18;
968 p
->numThreadsMax
= 0;
973 // p->inDataSize = 0;
975 p
->crossBlock
= NULL
;
979 p
->numFilledThreads
= 0;
984 p
->mtCallback
= NULL
;
985 p
->mtCallbackObject
= NULL
;
987 p
->allocatedBufsSize
= 0;
989 for (i
= 0; i
< MTDEC__THREADS_MAX
; i
++)
991 CMtDecThread
*t
= &p
->threads
[i
];
995 Event_Construct(&t
->canRead
);
996 Event_Construct(&t
->canWrite
);
997 Thread_Construct(&t
->thread
);
1000 // Event_Construct(&p->finishedEvent);
1002 CriticalSection_Init(&p
->mtProgress
.cs
);
/* Releases the threads and buffers owned by p.
   p->exitThread is set before the workers are woken inside
   MtDecThread_Destruct; ThreadFunc2 checks this flag after waiting on
   canWrite and returns. Every thread slot is then destructed (thread closed,
   input buffers freed) and the cross block is freed and cleared.
   NOTE(review): a guard around the cross-block free may exist on a line
   missing from this view. */
1006 static void MtDec_Free(CMtDec
*p
)
1010 p
->exitThread
= True
;
1012 for (i
= 0; i
< MTDEC__THREADS_MAX
; i
++)
1013 MtDecThread_Destruct(&p
->threads
[i
]);
1015 // Event_Close(&p->finishedEvent);
1019 ISzAlloc_Free(p
->alloc
, p
->crossBlock
);
1020 p
->crossBlock
= NULL
;
/* Deletes the progress critical section that MtDec_Construct initialized.
   NOTE(review): only this call is visible in this view; presumably MtDec_Free
   is called first on a line that is missing here - confirm. */
1025 void MtDec_Destruct(CMtDec
*p
)
1029 CriticalSection_Delete(&p
->mtProgress
.cs
);
1033 SRes
MtDec_Code(CMtDec
*p
)
1039 p
->blockIndex
= 1; // it must be larger than not_defined index (0)
1040 p
->isAllocError
= False
;
1041 p
->overflow
= False
;
1042 p
->threadingErrorSRes
= SZ_OK
;
1044 p
->needContinue
= True
;
1046 p
->readWasFinished
= False
;
1047 p
->needInterrupt
= False
;
1048 p
->interruptIndex
= (UInt64
)(Int64
)-1;
1050 p
->readProcessed
= 0;
1053 p
->wasInterrupted
= False
;
1058 p
->filledThreadStart
= 0;
1059 p
->numFilledThreads
= 0;
1062 unsigned numThreads
= p
->numThreadsMax
;
1063 if (numThreads
> MTDEC__THREADS_MAX
)
1064 numThreads
= MTDEC__THREADS_MAX
;
1065 p
->numStartedThreads_Limit
= numThreads
;
1066 p
->numStartedThreads
= 0;
1069 if (p
->inBufSize
!= p
->allocatedBufsSize
)
1071 for (i
= 0; i
< MTDEC__THREADS_MAX
; i
++)
1073 CMtDecThread
*t
= &p
->threads
[i
];
1075 MtDecThread_FreeInBufs(t
);
1079 ISzAlloc_Free(p
->alloc
, p
->crossBlock
);
1080 p
->crossBlock
= NULL
;
1083 p
->allocatedBufsSize
= p
->inBufSize
;
1086 MtProgress_Init(&p
->mtProgress
, p
->progress
);
1088 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1089 p
->exitThread
= False
;
1090 p
->exitThreadWRes
= 0;
1095 CMtDecThread
*nextThread
= &p
->threads
[p
->numStartedThreads
++];
1096 // wres = MtDecThread_CreateAndStart(nextThread);
1097 wres
= MtDecThread_CreateEvents(nextThread
);
1098 if (wres
== 0) { wres
= Event_Set(&nextThread
->canWrite
);
1099 if (wres
== 0) { wres
= Event_Set(&nextThread
->canRead
);
1100 if (wres
== 0) { wres
= ThreadFunc(nextThread
);
1103 p
->needContinue
= False
;
1104 MtDec_CloseThreads(p
);
1107 // wres = 17; // for test
1108 // wres = Event_Wait(&p->finishedEvent);
1110 sres
= MY_SRes_HRESULT_FROM_WRes(wres
);
1113 p
->threadingErrorSRes
= sres
;
1118 // || p->mtc.codeRes == SZ_ERROR_MEM
1120 || p
->threadingErrorSRes
!= SZ_OK
1123 // p->needContinue = True;
1126 p
->needContinue
= False
;
1128 if (p
->needContinue
)
1131 // if (sres != SZ_OK)