1 /* MtCoder.c -- Multi-thread Coder
2 2018-02-21 : Igor Pavlov : Public domain */
10 SRes
MtProgressThunk_Progress(const ICompressProgress
*pp
, UInt64 inSize
, UInt64 outSize
)
12 CMtProgressThunk
*thunk
= CONTAINER_FROM_VTBL(pp
, CMtProgressThunk
, vt
);
15 if (inSize
!= (UInt64
)(Int64
)-1)
17 inSize2
= inSize
- thunk
->inSize
;
18 thunk
->inSize
= inSize
;
20 if (outSize
!= (UInt64
)(Int64
)-1)
22 outSize2
= outSize
- thunk
->outSize
;
23 thunk
->outSize
= outSize
;
25 return MtProgress_ProgressAdd(thunk
->mtProgress
, inSize2
, outSize2
);
29 void MtProgressThunk_CreateVTable(CMtProgressThunk
*p
)
31 p
->vt
.Progress
= MtProgressThunk_Progress
;
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
39 static WRes
ArEvent_OptCreate_And_Reset(CEvent
*p
)
41 if (Event_IsCreated(p
))
42 return Event_Reset(p
);
43 return AutoResetEvent_CreateNotSignaled(p
);
47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
ThreadFunc(void *pp
);
50 static SRes
MtCoderThread_CreateAndStart(CMtCoderThread
*t
)
52 WRes wres
= ArEvent_OptCreate_And_Reset(&t
->startEvent
);
56 if (!Thread_WasCreated(&t
->thread
))
57 wres
= Thread_Create(&t
->thread
, ThreadFunc
, t
);
59 wres
= Event_Set(&t
->startEvent
);
63 return MY_SRes_HRESULT_FROM_WRes(wres
);
67 static void MtCoderThread_Destruct(CMtCoderThread
*t
)
69 if (Thread_WasCreated(&t
->thread
))
72 Event_Set(&t
->startEvent
);
73 Thread_Wait(&t
->thread
);
74 Thread_Close(&t
->thread
);
77 Event_Close(&t
->startEvent
);
81 ISzAlloc_Free(t
->mtCoder
->allocBig
, t
->inBuf
);
88 static SRes
FullRead(ISeqInStream
*stream
, Byte
*data
, size_t *processedSize
)
90 size_t size
= *processedSize
;
95 SRes res
= ISeqInStream_Read(stream
, data
, &cur
);
96 *processedSize
+= cur
;
108 ThreadFunc2() returns:
109 SZ_OK - in all normal cases (even for stream error or memory allocation error)
110 SZ_ERROR_THREAD - in case of failure in system synch function
113 static SRes
ThreadFunc2(CMtCoderThread
*t
)
115 CMtCoder
*mtc
= t
->mtCoder
;
126 UInt64 readProcessed
= 0;
128 RINOK_THREAD(Event_Wait(&mtc
->readEvent
))
130 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
132 if (mtc
->stopReading
)
134 return Event_Set(&mtc
->readEvent
) == 0 ? SZ_OK
: SZ_ERROR_THREAD
;
137 res
= MtProgress_GetError(&mtc
->mtProgress
);
145 size
= mtc
->blockSize
;
150 t
->inBuf
= (Byte
*)ISzAlloc_Alloc(mtc
->allocBig
, mtc
->blockSize
);
156 res
= FullRead(mtc
->inStream
, t
->inBuf
, &size
);
157 readProcessed
= mtc
->readProcessed
+ size
;
158 mtc
->readProcessed
= readProcessed
;
163 /* after reading error - we can stop encoding of previous blocks */
164 MtProgress_SetError(&mtc
->mtProgress
, res
);
167 finished
= (size
!= mtc
->blockSize
);
172 readProcessed
= mtc
->readProcessed
;
173 rem
= mtc
->inDataSize
- (size_t)readProcessed
;
176 inData
= mtc
->inData
+ (size_t)readProcessed
;
177 readProcessed
+= size
;
178 mtc
->readProcessed
= readProcessed
;
179 finished
= (mtc
->inDataSize
== (size_t)readProcessed
);
183 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
187 if (Semaphore_Wait(&mtc
->blocksSemaphore
) != 0)
189 res2
= SZ_ERROR_THREAD
;
193 // MtProgress_SetError(&mtc->mtProgress, res);
197 bi
= mtc
->blockIndex
;
199 if (++mtc
->blockIndex
>= mtc
->numBlocksMax
)
202 bufIndex
= (unsigned)(int)-1;
205 res
= MtProgress_GetError(&mtc
->mtProgress
);
212 if (mtc
->numStartedThreads
< mtc
->numStartedThreadsLimit
213 && mtc
->expectedDataSize
!= readProcessed
)
215 res
= MtCoderThread_CreateAndStart(&mtc
->threads
[mtc
->numStartedThreads
]);
217 mtc
->numStartedThreads
++;
220 MtProgress_SetError(&mtc
->mtProgress
, res
);
227 mtc
->stopReading
= True
;
229 RINOK_THREAD(Event_Set(&mtc
->readEvent
))
236 CriticalSection_Enter(&mtc
->cs
);
237 bufIndex
= mtc
->freeBlockHead
;
238 mtc
->freeBlockHead
= mtc
->freeBlockList
[bufIndex
];
239 CriticalSection_Leave(&mtc
->cs
);
241 res
= mtc
->mtCallback
->Code(mtc
->mtCallbackObject
, t
->index
, bufIndex
,
242 mtc
->inStream
? t
->inBuf
: inData
, size
, finished
);
244 // MtProgress_Reinit(&mtc->mtProgress, t->index);
247 MtProgress_SetError(&mtc
->mtProgress
, res
);
251 CMtCoderBlock
*block
= &mtc
->blocks
[bi
];
253 block
->bufIndex
= bufIndex
;
254 block
->finished
= finished
;
257 #ifdef MTCODER__USE_WRITE_THREAD
258 RINOK_THREAD(Event_Set(&mtc
->writeEvents
[bi
]))
263 CriticalSection_Enter(&mtc
->cs
);
264 wi
= mtc
->writeIndex
;
266 mtc
->writeIndex
= (unsigned)(int)-1;
268 mtc
->ReadyBlocks
[bi
] = True
;
269 CriticalSection_Leave(&mtc
->cs
);
274 if (res
!= SZ_OK
|| finished
)
279 if (mtc
->writeRes
!= SZ_OK
)
284 if (res
== SZ_OK
&& bufIndex
!= (unsigned)(int)-1)
286 res
= mtc
->mtCallback
->Write(mtc
->mtCallbackObject
, bufIndex
);
290 MtProgress_SetError(&mtc
->mtProgress
, res
);
294 if (++wi
>= mtc
->numBlocksMax
)
299 CriticalSection_Enter(&mtc
->cs
);
301 if (bufIndex
!= (unsigned)(int)-1)
303 mtc
->freeBlockList
[bufIndex
] = mtc
->freeBlockHead
;
304 mtc
->freeBlockHead
= bufIndex
;
307 isReady
= mtc
->ReadyBlocks
[wi
];
310 mtc
->ReadyBlocks
[wi
] = False
;
312 mtc
->writeIndex
= wi
;
314 CriticalSection_Leave(&mtc
->cs
);
316 RINOK_THREAD(Semaphore_Release1(&mtc
->blocksSemaphore
))
323 CMtCoderBlock
*block
= &mtc
->blocks
[wi
];
324 if (res
== SZ_OK
&& block
->res
!= SZ_OK
)
326 bufIndex
= block
->bufIndex
;
327 finished
= block
->finished
;
333 if (finished
|| res
!= SZ_OK
)
339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE
ThreadFunc(void *pp
)
341 CMtCoderThread
*t
= (CMtCoderThread
*)pp
;
344 if (Event_Wait(&t
->startEvent
) != 0)
345 return SZ_ERROR_THREAD
;
349 SRes res
= ThreadFunc2(t
);
350 CMtCoder
*mtc
= t
->mtCoder
;
353 MtProgress_SetError(&mtc
->mtProgress
, res
);
356 #ifndef MTCODER__USE_WRITE_THREAD
358 unsigned numFinished
= (unsigned)InterlockedIncrement(&mtc
->numFinishedThreads
);
359 if (numFinished
== mtc
->numStartedThreads
)
360 if (Event_Set(&mtc
->finishedEvent
) != 0)
361 return SZ_ERROR_THREAD
;
370 void MtCoder_Construct(CMtCoder
*p
)
375 p
->numThreadsMax
= 0;
376 p
->expectedDataSize
= (UInt64
)(Int64
)-1;
385 p
->mtCallback
= NULL
;
386 p
->mtCallbackObject
= NULL
;
388 p
->allocatedBufsSize
= 0;
390 Event_Construct(&p
->readEvent
);
391 Semaphore_Construct(&p
->blocksSemaphore
);
393 for (i
= 0; i
< MTCODER__THREADS_MAX
; i
++)
395 CMtCoderThread
*t
= &p
->threads
[i
];
400 Event_Construct(&t
->startEvent
);
401 Thread_Construct(&t
->thread
);
404 #ifdef MTCODER__USE_WRITE_THREAD
405 for (i
= 0; i
< MTCODER__BLOCKS_MAX
; i
++)
406 Event_Construct(&p
->writeEvents
[i
]);
408 Event_Construct(&p
->finishedEvent
);
411 CriticalSection_Init(&p
->cs
);
412 CriticalSection_Init(&p
->mtProgress
.cs
);
418 static void MtCoder_Free(CMtCoder
*p
)
423 p->stopReading = True;
424 if (Event_IsCreated(&p->readEvent))
425 Event_Set(&p->readEvent);
428 for (i
= 0; i
< MTCODER__THREADS_MAX
; i
++)
429 MtCoderThread_Destruct(&p
->threads
[i
]);
431 Event_Close(&p
->readEvent
);
432 Semaphore_Close(&p
->blocksSemaphore
);
434 #ifdef MTCODER__USE_WRITE_THREAD
435 for (i
= 0; i
< MTCODER__BLOCKS_MAX
; i
++)
436 Event_Close(&p
->writeEvents
[i
]);
438 Event_Close(&p
->finishedEvent
);
443 void MtCoder_Destruct(CMtCoder
*p
)
447 CriticalSection_Delete(&p
->cs
);
448 CriticalSection_Delete(&p
->mtProgress
.cs
);
452 SRes
MtCoder_Code(CMtCoder
*p
)
454 unsigned numThreads
= p
->numThreadsMax
;
455 unsigned numBlocksMax
;
459 if (numThreads
> MTCODER__THREADS_MAX
)
460 numThreads
= MTCODER__THREADS_MAX
;
461 numBlocksMax
= MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads
);
463 if (p
->blockSize
< ((UInt32
)1 << 26)) numBlocksMax
++;
464 if (p
->blockSize
< ((UInt32
)1 << 24)) numBlocksMax
++;
465 if (p
->blockSize
< ((UInt32
)1 << 22)) numBlocksMax
++;
467 if (numBlocksMax
> MTCODER__BLOCKS_MAX
)
468 numBlocksMax
= MTCODER__BLOCKS_MAX
;
470 if (p
->blockSize
!= p
->allocatedBufsSize
)
472 for (i
= 0; i
< MTCODER__THREADS_MAX
; i
++)
474 CMtCoderThread
*t
= &p
->threads
[i
];
477 ISzAlloc_Free(p
->allocBig
, t
->inBuf
);
481 p
->allocatedBufsSize
= p
->blockSize
;
486 MtProgress_Init(&p
->mtProgress
, p
->progress
);
488 #ifdef MTCODER__USE_WRITE_THREAD
489 for (i
= 0; i
< numBlocksMax
; i
++)
491 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p
->writeEvents
[i
]));
494 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p
->finishedEvent
));
498 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p
->readEvent
));
500 if (Semaphore_IsCreated(&p
->blocksSemaphore
))
502 RINOK_THREAD(Semaphore_Close(&p
->blocksSemaphore
));
504 RINOK_THREAD(Semaphore_Create(&p
->blocksSemaphore
, numBlocksMax
, numBlocksMax
));
507 for (i
= 0; i
< MTCODER__BLOCKS_MAX
- 1; i
++)
508 p
->freeBlockList
[i
] = i
+ 1;
509 p
->freeBlockList
[MTCODER__BLOCKS_MAX
- 1] = (unsigned)(int)-1;
510 p
->freeBlockHead
= 0;
512 p
->readProcessed
= 0;
514 p
->numBlocksMax
= numBlocksMax
;
515 p
->stopReading
= False
;
517 #ifndef MTCODER__USE_WRITE_THREAD
520 for (i
= 0; i
< MTCODER__BLOCKS_MAX
; i
++)
521 p
->ReadyBlocks
[i
] = False
;
522 p
->numFinishedThreads
= 0;
525 p
->numStartedThreadsLimit
= numThreads
;
526 p
->numStartedThreads
= 0;
528 // for (i = 0; i < numThreads; i++)
530 CMtCoderThread
*nextThread
= &p
->threads
[p
->numStartedThreads
++];
531 RINOK(MtCoderThread_CreateAndStart(nextThread
));
534 RINOK_THREAD(Event_Set(&p
->readEvent
))
536 #ifdef MTCODER__USE_WRITE_THREAD
542 if (bi
>= numBlocksMax
)
545 RINOK_THREAD(Event_Wait(&p
->writeEvents
[bi
]))
548 const CMtCoderBlock
*block
= &p
->blocks
[bi
];
549 unsigned bufIndex
= block
->bufIndex
;
550 Bool finished
= block
->finished
;
551 if (res
== SZ_OK
&& block
->res
!= SZ_OK
)
554 if (bufIndex
!= (unsigned)(int)-1)
558 res
= p
->mtCallback
->Write(p
->mtCallbackObject
, bufIndex
);
560 MtProgress_SetError(&p
->mtProgress
, res
);
563 CriticalSection_Enter(&p
->cs
);
565 p
->freeBlockList
[bufIndex
] = p
->freeBlockHead
;
566 p
->freeBlockHead
= bufIndex
;
568 CriticalSection_Leave(&p
->cs
);
571 RINOK_THREAD(Semaphore_Release1(&p
->blocksSemaphore
))
580 WRes wres
= Event_Wait(&p
->finishedEvent
);
581 res
= MY_SRes_HRESULT_FROM_WRes(wres
);
589 res
= p
->mtProgress
.res
;
591 #ifndef MTCODER__USE_WRITE_THREAD