Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/other-licenses/7zstub/src/C/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 29 kB image not shown  

Quelle  MtDec.c   Sprache: C

 
/* MtDec.c -- Multi-thread Decoder
2018-03-02 : Igor Pavlov : Public domain */


#include "Precomp.h"

// #define SHOW_DEBUG_INFO

// #include <stdio.h>

#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif

#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))

#include "MtDec.h"

#ifndef _7ZIP_ST

void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->progress = progress;
  p->res = SZ_OK;
  p->totalInSize = 0;
  p->totalOutSize = 0;
}


SRes MtProgress_Progress_ST(CMtProgress *p)
{
  if (p->res == SZ_OK && p->progress)
    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
      p->res = SZ_ERROR_PROGRESS;
  return p->res;
}


SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
{
  SRes res;
  CriticalSection_Enter(&p->cs);
  
  p->totalInSize += inSize;
  p->totalOutSize += outSize;
  if (p->res == SZ_OK && p->progress)
    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
      p->res = SZ_ERROR_PROGRESS;
  res = p->res;
  
  CriticalSection_Leave(&p->cs);
  return res;
}


SRes MtProgress_GetError(CMtProgress *p)
{
  SRes res;
  CriticalSection_Enter(&p->cs);
  res = p->res;
  CriticalSection_Leave(&p->cs);
  return res;
}


void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  if (p->res == SZ_OK)
    p->res = res;
  CriticalSection_Leave(&p->cs);
}


#define RINOK_THREAD(x) RINOK(x)


static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  if (Event_IsCreated(p))
    return Event_Reset(p);
  return AutoResetEvent_CreateNotSignaled(p);
}



typedef struct
{
  void *next;
  void *pad[3];
} CMtDecBufLink;

#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)



static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);


static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
  WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
  if (wres == 0)
  {
    wres = ArEvent_OptCreate_And_Reset(&t->canRead);
    if (wres == 0)
      return SZ_OK;
  }
  return wres;
}


static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
  WRes wres = MtDecThread_CreateEvents(t);
  // wres = 17; // for test
  if (wres == 0)
  {
    if (Thread_WasCreated(&t->thread))
      return SZ_OK;
    wres = Thread_Create(&t->thread, ThreadFunc, t);
    if (wres == 0)
      return SZ_OK;
  }
  return MY_SRes_HRESULT_FROM_WRes(wres);
}


void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  if (t->inBuf)
  {
    void *link = t->inBuf;
    t->inBuf = NULL;
    do
    {
      void *next = ((CMtDecBufLink *)link)->next;
      ISzAlloc_Free(t->mtDec->alloc, link);
      link = next;
    }
    while (link);
  }
}


static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
    Event_Set(&t->canRead);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}

static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned i;
  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    MtDecThread_CloseThread(&p->threads[i]);
}

static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}



static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t size = *processedSize;
  *processedSize = 0;
  while (size != 0)
  {
    size_t cur = size;
    SRes res = ISeqInStream_Read(stream, data, &cur);
    *processedSize += cur;
    data += cur;
    size -= cur;
    RINOK(res);
    if (cur == 0)
      return SZ_OK;
  }
  return SZ_OK;
}


static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, Bool *wasInterrupted)
{
  SRes res;
  CriticalSection_Enter(&p->mtProgress.cs);
  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  res = p->mtProgress.res;
  CriticalSection_Leave(&p->mtProgress.cs);
  return res;
}

static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, Bool *wasInterrupted)
{
  SRes res;
  CriticalSection_Enter(&p->mtProgress.cs);

  p->mtProgress.totalInSize += inSize;
  p->mtProgress.totalOutSize += outSize;
  if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
    if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
      p->mtProgress.res = SZ_ERROR_PROGRESS;

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  res = p->mtProgress.res;
  
  CriticalSection_Leave(&p->mtProgress.cs);

  return res;
}

static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);
  if (!p->needInterrupt || interruptIndex < p->interruptIndex)
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }
  CriticalSection_Leave(&p->mtProgress.cs);
}

Byte *MtDec_GetCrossBuff(CMtDec *p)
{
  Byte *cr = p->crossBlock;
  if (!cr)
  {
    cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    if (!cr)
      return NULL;
    p->crossBlock = cr;
  }
  return MTDEC__DATA_PTR_FROM_LINK(cr);
}


/*
  ThreadFunc2() returns:
  0      - in all normal cases (even for stream error or memory allocation error)
  (!= 0) - WRes error return by system threading function
*/


// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)

static WRes ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("ThreadFunc2", t->index);

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    Bool wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    Bool needCode, needWrite, needContinue;
    
    size_t inDataSize_Start;
    UInt64 inDataSize;
    // UInt64 inDataSize_Full;
    
    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;
    
    Byte *afterEndData = NULL;
    size_t afterEndData_Size = 0;

    Bool canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);

    RINOK_THREAD(Event_Wait(&t->canRead));
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);

    // if (t->index == 3) return 19; // for test

    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        size_t crossSize = p->crossEnd - p->crossStart;

        PRF(printf("\ncrossSize = %d\n", crossSize));

        for (;;)
        {
          if (!link)
          {
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;
              
              res = FullRead(p->inStream, data, &size);
              
              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;
              
              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              // PRF(printf("\nParse size = %d\n", (unsigned)size))

              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
              
              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }
                
                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  p->crossStart = 0;
                  p->crossEnd = 0;

                  if (crossSize != 0)
                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
                  afterEndData_Size = size - parse.srcSize;
                  afterEndData = parseData + parse.srcSize;

                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= (size - parse.srcSize);
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  finish = False;
                  break;
                }
              }

              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }
          
          prev = link;
          link = link->next;

          if (crossSize != 0)
          {
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }
    
    if (res != SZ_OK || wasInterrupted)
      finish = True;
    
    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    if (!finish)
    {
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave muti-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }
      
      if (!finish)
      {
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from ThreadFunc2()

    // Don't change (finish) variable in the further code


    // ---------- CODE ----------

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      Bool isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      for (;;)
      {
        size_t inSize;
        int stop;

        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);
        
        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;
  
        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }


    // ---------- WRITE ----------
   
    RINOK_THREAD(Event_Wait(&t->canWrite));

  {
    Bool isErrorMode = False;
    Bool canRecode = True;
    Bool needWriteToStream = needWrite;

    if (p->exitThread) return 0; // it's never executed in normal cases

    if (p->wasInterrupted)
      wasInterrupted = True;
    else
    {
      if (codeRes != SZ_OK) // || !needCode // check it !!!
      {
        p->wasInterrupted = True;
        p->codeRes = codeRes;
        if (codeRes == SZ_ERROR_MEM)
          isAllocError = True;
      }
      
      if (threadingErrorSRes)
      {
        p->wasInterrupted = True;
        p->threadingErrorSRes = threadingErrorSRes;
        needWriteToStream = False;
      }
      if (isAllocError)
      {
        p->wasInterrupted = True;
        p->isAllocError = True;
        needWriteToStream = False;
      }
      if (overflow)
      {
        p->wasInterrupted = True;
        p->overflow = True;
        needWriteToStream = False;
      }
    }

    if (needCode)
    {
      if (wasInterrupted)
      {
        inCodePos = 0;
        outCodePos = 0;
      }
      {
        const UInt64 inDelta = inCodePos - inPrev;
        const UInt64 outDelta = outCodePos - outPrev;
        // if (inDelta != 0 || outDelta != 0)
        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
      }
    }

    needContinue = (!finish);

    // if (res == SZ_OK && needWrite && !wasInterrupted)
    if (needWrite)
    {
      // p->inProcessed += inCodePos;

      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
          afterEndData, afterEndData_Size,
          &needContinue,
          &canRecode);
      
      // res= E_INVALIDARG; // for test

      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

      if (res != SZ_OK)
      {
        PRF(printf("\nWrite error = %d\n", res));
        isErrorMode = True;
        p->wasInterrupted = True;
      }
      if (res != SZ_OK
          || (!needContinue && !finish))
      {
        PRF(printf("\nWrite Interrupt error = %x\n", res));
        MtDec_Interrupt(p, blockIndex);
      }
    }

    if (canRecode)
    if (!needCode
        || res != SZ_OK
        || p->wasInterrupted
        || codeRes != SZ_OK
        || wasInterrupted
        || p->numFilledThreads != 0
        || isErrorMode)
    {
      if (p->numFilledThreads == 0)
        p->filledThreadStart = t->index;
      if (inDataSize != 0 || !finish)
      {
        t->inDataSize_Start = inDataSize_Start;
        t->inDataSize = inDataSize;
        p->numFilledThreads++;
      }
      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    }

    if (!finish)
    {
      RINOK_THREAD(Event_Set(&nextThread->canWrite));
    }
    else
    {
      if (needContinue)
      {
        // we restore decoding with new iteration
        RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
      }
      else
      {
        // we exit from decoding
        if (t->index == 0)
          return SZ_OK;
        p->exitThread = True;
      }
      RINOK_THREAD(Event_Set(&p->threads[0].canRead));
    }
  }
  }
}

#ifdef _WIN32
#define USE_ALLOCA
#endif

#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
#include <stdlib.h>
#endif
#endif


static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
{
  WRes res;

  CMtDecThread *t = (CMtDecThread *)pp;
  CMtDec *p;

  // fprintf(stdout, "\n%d = %p\n", t->index, &t);

  res = ThreadFunc2(t);
  p = t->mtDec;
  if (res == 0)
    return p->exitThreadWRes;
  {
    // it's unexpected situation for some threading function error
    if (p->exitThreadWRes == 0)
      p->exitThreadWRes = res;
    PRF(printf("\nthread exit error = %d\n", res));
    p->exitThread = True;
    Event_Set(&p->threads[0].canRead);
    Event_Set(&p->threads[0].canWrite);
    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
  }
  return res;
}

static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;

  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  #ifdef USE_ALLOCA
  t->allocaPtr = alloca(t->index * 128);
  #endif
  return ThreadFunc1(pp);
}


int MtDec_PrepareRead(CMtDec *p)
{
  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
    
  {
    unsigned i;
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
      if (i > p->numStartedThreads
          || p->numFilledThreads <=
            (i >= p->filledThreadStart ?
              i - p->filledThreadStart :
              i + p->numStartedThreads - p->filledThreadStart))
        MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}

    
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  while (p->numFilledThreads != 0)
  {
    CMtDecThread *t = &p->threads[p->filledThreadStart];
    
    if (*inLim != 0)
    {
      {
        void *link = t->inBuf;
        void *next = ((CMtDecBufLink *)link)->next;
        ISzAlloc_Free(p->alloc, link);
        t->inBuf = next;
      }
      
      if (t->inDataSize == 0)
      {
        MtDecThread_FreeInBufs(t);
        if (--p->numFilledThreads == 0)
          break;
        if (++p->filledThreadStart == p->numStartedThreads)
          p->filledThreadStart = 0;
        t = &p->threads[p->filledThreadStart];
      }
    }
    
    {
      size_t lim = t->inDataSize_Start;
      if (lim != 0)
        t->inDataSize_Start = 0;
      else
      {
        UInt64 rem = t->inDataSize;
        lim = p->inBufSize;
        if (lim > rem)
          lim = (size_t)rem;
      }
      t->inDataSize -= lim;
      *inLim = lim;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    }
  }

  {
    size_t crossSize = p->crossEnd - p->crossStart;
    if (crossSize != 0)
    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = crossSize;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
    *inLim = 0;
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }
    return NULL;
  }
}


void MtDec_Construct(CMtDec *p)
{
  unsigned i;
  
  p->inBufSize = (size_t)1 << 18;

  p->numThreadsMax = 0;

  p->inStream = NULL;
  
  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  p->numFilledThreads = 0;

  p->progress = NULL;
  p->alloc = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *t = &p->threads[i];
    t->mtDec = p;
    t->index = i;
    t->inBuf = NULL;
    Event_Construct(&t->canRead);
    Event_Construct(&t->canWrite);
    Thread_Construct(&t->thread);
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}


static void MtDec_Free(CMtDec *p)
{
  unsigned i;

  p->exitThread = True;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    MtDecThread_Destruct(&p->threads[i]);

  // Event_Close(&p->finishedEvent);

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}


void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}


SRes MtDec_Code(CMtDec *p)
{
  unsigned i;

  p->inProcessed = 0;

  p->blockIndex = 1; // it must be larger than not_defined index (0)
  p->isAllocError = False;
  p->overflow = False;
  p->threadingErrorSRes = SZ_OK;

  p->needContinue = True;

  p->readWasFinished = False;
  p->needInterrupt = False;
  p->interruptIndex = (UInt64)(Int64)-1;

  p->readProcessed = 0;
  p->readRes = SZ_OK;
  p->codeRes = SZ_OK;
  p->wasInterrupted = False;

  p->crossStart = 0;
  p->crossEnd = 0;

  p->filledThreadStart = 0;
  p->numFilledThreads = 0;

  {
    unsigned numThreads = p->numThreadsMax;
    if (numThreads > MTDEC__THREADS_MAX)
      numThreads = MTDEC__THREADS_MAX;
    p->numStartedThreads_Limit = numThreads;
    p->numStartedThreads = 0;
  }

  if (p->inBufSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    {
      CMtDecThread *t = &p->threads[i];
      if (t->inBuf)
        MtDecThread_FreeInBufs(t);
    }
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }

    p->allocatedBufsSize = p->inBufSize;
  }

  MtProgress_Init(&p->mtProgress, p->progress);

  // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  p->exitThread = False;
  p->exitThreadWRes = 0;

  {
    WRes wres;
    WRes sres;
    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
    // wres = MtDecThread_CreateAndStart(nextThread);
    wres = MtDecThread_CreateEvents(nextThread);
    if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
    if (wres == 0) { wres = Event_Set(&nextThread->canRead);
    if (wres == 0) { wres = ThreadFunc(nextThread);
    if (wres != 0)
    {
      p->needContinue = False;
      MtDec_CloseThreads(p);
    }}}}

    // wres = 17; // for test
    // wres = Event_Wait(&p->finishedEvent);

    sres = MY_SRes_HRESULT_FROM_WRes(wres);

    if (sres != 0)
      p->threadingErrorSRes = sres;

    if (
        // wres == 0
        // wres != 0
        // || p->mtc.codeRes == SZ_ERROR_MEM
        p->isAllocError
        || p->threadingErrorSRes != SZ_OK
        || p->overflow)
    {
      // p->needContinue = True;
    }
    else
      p->needContinue = False;
    
    if (p->needContinue)
      return SZ_OK;

    // if (sres != SZ_OK)
      return sres;
    // return E_FAIL;
  }
}

#endif

Messung V0.5
C=97 H=92 G=94

¤ Dauer der Verarbeitung: 0.37 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.