Bug 1943761 - Add class alignment to the mozsearch analysis file. r=asuth
[gecko.git] / other-licenses / 7zstub / src / C / MtCoder.c
blobddc7c0285808931078b3aa7647159fcec9416dd2
1 /* MtCoder.c -- Multi-thread Coder
2 2018-02-21 : Igor Pavlov : Public domain */
4 #include "Precomp.h"
6 #include "MtCoder.h"
8 #ifndef _7ZIP_ST
10 SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
13 UInt64 inSize2 = 0;
14 UInt64 outSize2 = 0;
15 if (inSize != (UInt64)(Int64)-1)
17 inSize2 = inSize - thunk->inSize;
18 thunk->inSize = inSize;
20 if (outSize != (UInt64)(Int64)-1)
22 outSize2 = outSize - thunk->outSize;
23 thunk->outSize = outSize;
25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
31 p->vt.Progress = MtProgressThunk_Progress;
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
41 if (Event_IsCreated(p))
42 return Event_Reset(p);
43 return AutoResetEvent_CreateNotSignaled(p);
47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
53 if (wres == 0)
55 t->stop = False;
56 if (!Thread_WasCreated(&t->thread))
57 wres = Thread_Create(&t->thread, ThreadFunc, t);
58 if (wres == 0)
59 wres = Event_Set(&t->startEvent);
61 if (wres == 0)
62 return SZ_OK;
63 return MY_SRes_HRESULT_FROM_WRes(wres);
67 static void MtCoderThread_Destruct(CMtCoderThread *t)
69 if (Thread_WasCreated(&t->thread))
71 t->stop = 1;
72 Event_Set(&t->startEvent);
73 Thread_Wait(&t->thread);
74 Thread_Close(&t->thread);
77 Event_Close(&t->startEvent);
79 if (t->inBuf)
81 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
82 t->inBuf = NULL;
88 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
90 size_t size = *processedSize;
91 *processedSize = 0;
92 while (size != 0)
94 size_t cur = size;
95 SRes res = ISeqInStream_Read(stream, data, &cur);
96 *processedSize += cur;
97 data += cur;
98 size -= cur;
99 RINOK(res);
100 if (cur == 0)
101 return SZ_OK;
103 return SZ_OK;
108 ThreadFunc2() returns:
109 SZ_OK - in all normal cases (even for stream error or memory allocation error)
110 SZ_ERROR_THREAD - in case of failure in system synch function
113 static SRes ThreadFunc2(CMtCoderThread *t)
115 CMtCoder *mtc = t->mtCoder;
117 for (;;)
119 unsigned bi;
120 SRes res;
121 SRes res2;
122 Bool finished;
123 unsigned bufIndex;
124 size_t size;
125 const Byte *inData;
126 UInt64 readProcessed = 0;
128 RINOK_THREAD(Event_Wait(&mtc->readEvent))
130 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
132 if (mtc->stopReading)
134 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
137 res = MtProgress_GetError(&mtc->mtProgress);
139 size = 0;
140 inData = NULL;
141 finished = True;
143 if (res == SZ_OK)
145 size = mtc->blockSize;
146 if (mtc->inStream)
148 if (!t->inBuf)
150 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
151 if (!t->inBuf)
152 res = SZ_ERROR_MEM;
154 if (res == SZ_OK)
156 res = FullRead(mtc->inStream, t->inBuf, &size);
157 readProcessed = mtc->readProcessed + size;
158 mtc->readProcessed = readProcessed;
160 if (res != SZ_OK)
162 mtc->readRes = res;
163 /* after reading error - we can stop encoding of previous blocks */
164 MtProgress_SetError(&mtc->mtProgress, res);
166 else
167 finished = (size != mtc->blockSize);
169 else
171 size_t rem;
172 readProcessed = mtc->readProcessed;
173 rem = mtc->inDataSize - (size_t)readProcessed;
174 if (size > rem)
175 size = rem;
176 inData = mtc->inData + (size_t)readProcessed;
177 readProcessed += size;
178 mtc->readProcessed = readProcessed;
179 finished = (mtc->inDataSize == (size_t)readProcessed);
183 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
185 res2 = SZ_OK;
187 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
189 res2 = SZ_ERROR_THREAD;
190 if (res == SZ_OK)
192 res = res2;
193 // MtProgress_SetError(&mtc->mtProgress, res);
197 bi = mtc->blockIndex;
199 if (++mtc->blockIndex >= mtc->numBlocksMax)
200 mtc->blockIndex = 0;
202 bufIndex = (unsigned)(int)-1;
204 if (res == SZ_OK)
205 res = MtProgress_GetError(&mtc->mtProgress);
207 if (res != SZ_OK)
208 finished = True;
210 if (!finished)
212 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
213 && mtc->expectedDataSize != readProcessed)
215 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
216 if (res == SZ_OK)
217 mtc->numStartedThreads++;
218 else
220 MtProgress_SetError(&mtc->mtProgress, res);
221 finished = True;
226 if (finished)
227 mtc->stopReading = True;
229 RINOK_THREAD(Event_Set(&mtc->readEvent))
231 if (res2 != SZ_OK)
232 return res2;
234 if (res == SZ_OK)
236 CriticalSection_Enter(&mtc->cs);
237 bufIndex = mtc->freeBlockHead;
238 mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
239 CriticalSection_Leave(&mtc->cs);
241 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
242 mtc->inStream ? t->inBuf : inData, size, finished);
244 // MtProgress_Reinit(&mtc->mtProgress, t->index);
246 if (res != SZ_OK)
247 MtProgress_SetError(&mtc->mtProgress, res);
251 CMtCoderBlock *block = &mtc->blocks[bi];
252 block->res = res;
253 block->bufIndex = bufIndex;
254 block->finished = finished;
257 #ifdef MTCODER__USE_WRITE_THREAD
258 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
259 #else
261 unsigned wi;
263 CriticalSection_Enter(&mtc->cs);
264 wi = mtc->writeIndex;
265 if (wi == bi)
266 mtc->writeIndex = (unsigned)(int)-1;
267 else
268 mtc->ReadyBlocks[bi] = True;
269 CriticalSection_Leave(&mtc->cs);
272 if (wi != bi)
274 if (res != SZ_OK || finished)
275 return 0;
276 continue;
279 if (mtc->writeRes != SZ_OK)
280 res = mtc->writeRes;
282 for (;;)
284 if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
286 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
287 if (res != SZ_OK)
289 mtc->writeRes = res;
290 MtProgress_SetError(&mtc->mtProgress, res);
294 if (++wi >= mtc->numBlocksMax)
295 wi = 0;
297 Bool isReady;
299 CriticalSection_Enter(&mtc->cs);
301 if (bufIndex != (unsigned)(int)-1)
303 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
304 mtc->freeBlockHead = bufIndex;
307 isReady = mtc->ReadyBlocks[wi];
309 if (isReady)
310 mtc->ReadyBlocks[wi] = False;
311 else
312 mtc->writeIndex = wi;
314 CriticalSection_Leave(&mtc->cs);
316 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
318 if (!isReady)
319 break;
323 CMtCoderBlock *block = &mtc->blocks[wi];
324 if (res == SZ_OK && block->res != SZ_OK)
325 res = block->res;
326 bufIndex = block->bufIndex;
327 finished = block->finished;
331 #endif
333 if (finished || res != SZ_OK)
334 return 0;
339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
341 CMtCoderThread *t = (CMtCoderThread *)pp;
342 for (;;)
344 if (Event_Wait(&t->startEvent) != 0)
345 return SZ_ERROR_THREAD;
346 if (t->stop)
347 return 0;
349 SRes res = ThreadFunc2(t);
350 CMtCoder *mtc = t->mtCoder;
351 if (res != SZ_OK)
353 MtProgress_SetError(&mtc->mtProgress, res);
356 #ifndef MTCODER__USE_WRITE_THREAD
358 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
359 if (numFinished == mtc->numStartedThreads)
360 if (Event_Set(&mtc->finishedEvent) != 0)
361 return SZ_ERROR_THREAD;
363 #endif
370 void MtCoder_Construct(CMtCoder *p)
372 unsigned i;
374 p->blockSize = 0;
375 p->numThreadsMax = 0;
376 p->expectedDataSize = (UInt64)(Int64)-1;
378 p->inStream = NULL;
379 p->inData = NULL;
380 p->inDataSize = 0;
382 p->progress = NULL;
383 p->allocBig = NULL;
385 p->mtCallback = NULL;
386 p->mtCallbackObject = NULL;
388 p->allocatedBufsSize = 0;
390 Event_Construct(&p->readEvent);
391 Semaphore_Construct(&p->blocksSemaphore);
393 for (i = 0; i < MTCODER__THREADS_MAX; i++)
395 CMtCoderThread *t = &p->threads[i];
396 t->mtCoder = p;
397 t->index = i;
398 t->inBuf = NULL;
399 t->stop = False;
400 Event_Construct(&t->startEvent);
401 Thread_Construct(&t->thread);
404 #ifdef MTCODER__USE_WRITE_THREAD
405 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
406 Event_Construct(&p->writeEvents[i]);
407 #else
408 Event_Construct(&p->finishedEvent);
409 #endif
411 CriticalSection_Init(&p->cs);
412 CriticalSection_Init(&p->mtProgress.cs);
418 static void MtCoder_Free(CMtCoder *p)
420 unsigned i;
423 p->stopReading = True;
424 if (Event_IsCreated(&p->readEvent))
425 Event_Set(&p->readEvent);
428 for (i = 0; i < MTCODER__THREADS_MAX; i++)
429 MtCoderThread_Destruct(&p->threads[i]);
431 Event_Close(&p->readEvent);
432 Semaphore_Close(&p->blocksSemaphore);
434 #ifdef MTCODER__USE_WRITE_THREAD
435 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
436 Event_Close(&p->writeEvents[i]);
437 #else
438 Event_Close(&p->finishedEvent);
439 #endif
443 void MtCoder_Destruct(CMtCoder *p)
445 MtCoder_Free(p);
447 CriticalSection_Delete(&p->cs);
448 CriticalSection_Delete(&p->mtProgress.cs);
452 SRes MtCoder_Code(CMtCoder *p)
454 unsigned numThreads = p->numThreadsMax;
455 unsigned numBlocksMax;
456 unsigned i;
457 SRes res = SZ_OK;
459 if (numThreads > MTCODER__THREADS_MAX)
460 numThreads = MTCODER__THREADS_MAX;
461 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
463 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
464 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
465 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
467 if (numBlocksMax > MTCODER__BLOCKS_MAX)
468 numBlocksMax = MTCODER__BLOCKS_MAX;
470 if (p->blockSize != p->allocatedBufsSize)
472 for (i = 0; i < MTCODER__THREADS_MAX; i++)
474 CMtCoderThread *t = &p->threads[i];
475 if (t->inBuf)
477 ISzAlloc_Free(p->allocBig, t->inBuf);
478 t->inBuf = NULL;
481 p->allocatedBufsSize = p->blockSize;
484 p->readRes = SZ_OK;
486 MtProgress_Init(&p->mtProgress, p->progress);
488 #ifdef MTCODER__USE_WRITE_THREAD
489 for (i = 0; i < numBlocksMax; i++)
491 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
493 #else
494 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
495 #endif
498 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
500 if (Semaphore_IsCreated(&p->blocksSemaphore))
502 RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
504 RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
507 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
508 p->freeBlockList[i] = i + 1;
509 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
510 p->freeBlockHead = 0;
512 p->readProcessed = 0;
513 p->blockIndex = 0;
514 p->numBlocksMax = numBlocksMax;
515 p->stopReading = False;
517 #ifndef MTCODER__USE_WRITE_THREAD
518 p->writeIndex = 0;
519 p->writeRes = SZ_OK;
520 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
521 p->ReadyBlocks[i] = False;
522 p->numFinishedThreads = 0;
523 #endif
525 p->numStartedThreadsLimit = numThreads;
526 p->numStartedThreads = 0;
528 // for (i = 0; i < numThreads; i++)
530 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
531 RINOK(MtCoderThread_CreateAndStart(nextThread));
534 RINOK_THREAD(Event_Set(&p->readEvent))
536 #ifdef MTCODER__USE_WRITE_THREAD
538 unsigned bi = 0;
540 for (;; bi++)
542 if (bi >= numBlocksMax)
543 bi = 0;
545 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
548 const CMtCoderBlock *block = &p->blocks[bi];
549 unsigned bufIndex = block->bufIndex;
550 Bool finished = block->finished;
551 if (res == SZ_OK && block->res != SZ_OK)
552 res = block->res;
554 if (bufIndex != (unsigned)(int)-1)
556 if (res == SZ_OK)
558 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
559 if (res != SZ_OK)
560 MtProgress_SetError(&p->mtProgress, res);
563 CriticalSection_Enter(&p->cs);
565 p->freeBlockList[bufIndex] = p->freeBlockHead;
566 p->freeBlockHead = bufIndex;
568 CriticalSection_Leave(&p->cs);
571 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
573 if (finished)
574 break;
578 #else
580 WRes wres = Event_Wait(&p->finishedEvent);
581 res = MY_SRes_HRESULT_FROM_WRes(wres);
583 #endif
585 if (res == SZ_OK)
586 res = p->readRes;
588 if (res == SZ_OK)
589 res = p->mtProgress.res;
591 #ifndef MTCODER__USE_WRITE_THREAD
592 if (res == SZ_OK)
593 res = p->writeRes;
594 #endif
596 if (res != SZ_OK)
597 MtCoder_Free(p);
598 return res;
601 #endif