Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  MtCoder.c   Sprache: C

 
/* MtCoder.c -- Multi-thread Coder
2018-02-21 : Igor Pavlov : Public domain */


#include "Precomp.h"

#include "MtCoder.h"

#ifndef _7ZIP_ST

SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
{
  CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
  UInt64 inSize2 = 0;
  UInt64 outSize2 = 0;
  if (inSize != (UInt64)(Int64)-1)
  {
    inSize2 = inSize - thunk->inSize;
    thunk->inSize = inSize;
  }
  if (outSize != (UInt64)(Int64)-1)
  {
    outSize2 = outSize - thunk->outSize;
    thunk->outSize = outSize;
  }
  return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
}


void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  p->vt.Progress = MtProgressThunk_Progress;
}



#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }


static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  if (Event_IsCreated(p))
    return Event_Reset(p);
  return AutoResetEvent_CreateNotSignaled(p);
}


static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);


static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
{
  WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
  if (wres == 0)
  {
    t->stop = False;
    if (!Thread_WasCreated(&t->thread))
      wres = Thread_Create(&t->thread, ThreadFunc, t);
    if (wres == 0)
      wres = Event_Set(&t->startEvent);
  }
  if (wres == 0)
    return SZ_OK;
  return MY_SRes_HRESULT_FROM_WRes(wres);
}


static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    t->stop = 1;
    Event_Set(&t->startEvent);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  if (t->inBuf)
  {
    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
    t->inBuf = NULL;
  }
}



static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t size = *processedSize;
  *processedSize = 0;
  while (size != 0)
  {
    size_t cur = size;
    SRes res = ISeqInStream_Read(stream, data, &cur);
    *processedSize += cur;
    data += cur;
    size -= cur;
    RINOK(res);
    if (cur == 0)
      return SZ_OK;
  }
  return SZ_OK;
}


/*
  ThreadFunc2() returns:
  SZ_OK           - in all normal cases (even for stream error or memory allocation error)
  SZ_ERROR_THREAD - in case of failure in system synch function
*/


static SRes ThreadFunc2(CMtCoderThread *t)
{
  CMtCoder *mtc = t->mtCoder;

  for (;;)
  {
    unsigned bi;
    SRes res;
    SRes res2;
    Bool finished;
    unsigned bufIndex;
    size_t size;
    const Byte *inData;
    UInt64 readProcessed = 0;
    
    RINOK_THREAD(Event_Wait(&mtc->readEvent))

    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */

    if (mtc->stopReading)
    {
      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    }

    res = MtProgress_GetError(&mtc->mtProgress);
    
    size = 0;
    inData = NULL;
    finished = True;

    if (res == SZ_OK)
    {
      size = mtc->blockSize;
      if (mtc->inStream)
      {
        if (!t->inBuf)
        {
          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
          if (!t->inBuf)
            res = SZ_ERROR_MEM;
        }
        if (res == SZ_OK)
        {
          res = FullRead(mtc->inStream, t->inBuf, &size);
          readProcessed = mtc->readProcessed + size;
          mtc->readProcessed = readProcessed;
        }
        if (res != SZ_OK)
        {
          mtc->readRes = res;
          /* after reading error - we can stop encoding of previous blocks */
          MtProgress_SetError(&mtc->mtProgress, res);
        }
        else
          finished = (size != mtc->blockSize);
      }
      else
      {
        size_t rem;
        readProcessed = mtc->readProcessed;
        rem = mtc->inDataSize - (size_t)readProcessed;
        if (size > rem)
          size = rem;
        inData = mtc->inData + (size_t)readProcessed;
        readProcessed += size;
        mtc->readProcessed = readProcessed;
        finished = (mtc->inDataSize == (size_t)readProcessed);
      }
    }

    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */

    res2 = SZ_OK;

    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
    {
      res2 = SZ_ERROR_THREAD;
      if (res == SZ_OK)
      {
        res = res2;
        // MtProgress_SetError(&mtc->mtProgress, res);
      }
    }

    bi = mtc->blockIndex;

    if (++mtc->blockIndex >= mtc->numBlocksMax)
      mtc->blockIndex = 0;

    bufIndex = (unsigned)(int)-1;

    if (res == SZ_OK)
      res = MtProgress_GetError(&mtc->mtProgress);

    if (res != SZ_OK)
      finished = True;

    if (!finished)
    {
      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
          && mtc->expectedDataSize != readProcessed)
      {
        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
        if (res == SZ_OK)
          mtc->numStartedThreads++;
        else
        {
          MtProgress_SetError(&mtc->mtProgress, res);
          finished = True;
        }
      }
    }

    if (finished)
      mtc->stopReading = True;

    RINOK_THREAD(Event_Set(&mtc->readEvent))

    if (res2 != SZ_OK)
      return res2;

    if (res == SZ_OK)
    {
      CriticalSection_Enter(&mtc->cs);
      bufIndex = mtc->freeBlockHead;
      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
      CriticalSection_Leave(&mtc->cs);
      
      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
          mtc->inStream ? t->inBuf : inData, size, finished);
      
      // MtProgress_Reinit(&mtc->mtProgress, t->index);

      if (res != SZ_OK)
        MtProgress_SetError(&mtc->mtProgress, res);
    }

    {
      CMtCoderBlock *block = &mtc->blocks[bi];
      block->res = res;
      block->bufIndex = bufIndex;
      block->finished = finished;
    }
    
    #ifdef MTCODER__USE_WRITE_THREAD
      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
    #else
    {
      unsigned wi;
      {
        CriticalSection_Enter(&mtc->cs);
        wi = mtc->writeIndex;
        if (wi == bi)
          mtc->writeIndex = (unsigned)(int)-1;
        else
          mtc->ReadyBlocks[bi] = True;
        CriticalSection_Leave(&mtc->cs);
      }

      if (wi != bi)
      {
        if (res != SZ_OK || finished)
          return 0;
        continue;
      }

      if (mtc->writeRes != SZ_OK)
        res = mtc->writeRes;

      for (;;)
      {
        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
        {
          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
          if (res != SZ_OK)
          {
            mtc->writeRes = res;
            MtProgress_SetError(&mtc->mtProgress, res);
          }
        }

        if (++wi >= mtc->numBlocksMax)
          wi = 0;
        {
          Bool isReady;

          CriticalSection_Enter(&mtc->cs);
          
          if (bufIndex != (unsigned)(int)-1)
          {
            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
            mtc->freeBlockHead = bufIndex;
          }
          
          isReady = mtc->ReadyBlocks[wi];
          
          if (isReady)
            mtc->ReadyBlocks[wi] = False;
          else
            mtc->writeIndex = wi;
          
          CriticalSection_Leave(&mtc->cs);

          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))

          if (!isReady)
            break;
        }

        {
          CMtCoderBlock *block = &mtc->blocks[wi];
          if (res == SZ_OK && block->res != SZ_OK)
            res = block->res;
          bufIndex = block->bufIndex;
          finished = block->finished;
        }
      }
    }
    #endif
      
    if (finished || res != SZ_OK)
      return 0;
  }
}


static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtCoderThread *t = (CMtCoderThread *)pp;
  for (;;)
  {
    if (Event_Wait(&t->startEvent) != 0)
      return SZ_ERROR_THREAD;
    if (t->stop)
      return 0;
    {
      SRes res = ThreadFunc2(t);
      CMtCoder *mtc = t->mtCoder;
      if (res != SZ_OK)
      {
        MtProgress_SetError(&mtc->mtProgress, res);
      }
      
      #ifndef MTCODER__USE_WRITE_THREAD
      {
        unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
        if (numFinished == mtc->numStartedThreads)
          if (Event_Set(&mtc->finishedEvent) != 0)
            return SZ_ERROR_THREAD;
      }
      #endif
    }
  }
}



void MtCoder_Construct(CMtCoder *p)
{
  unsigned i;
  
  p->blockSize = 0;
  p->numThreadsMax = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  p->progress = NULL;
  p->allocBig = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  for (i = 0; i < MTCODER__THREADS_MAX; i++)
  {
    CMtCoderThread *t = &p->threads[i];
    t->mtCoder = p;
    t->index = i;
    t->inBuf = NULL;
    t->stop = False;
    Event_Construct(&t->startEvent);
    Thread_Construct(&t->thread);
  }

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Construct(&p->writeEvents[i]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}




static void MtCoder_Free(CMtCoder *p)
{
  unsigned i;

  /*
  p->stopReading = True;
  if (Event_IsCreated(&p->readEvent))
    Event_Set(&p->readEvent);
  */


  for (i = 0; i < MTCODER__THREADS_MAX; i++)
    MtCoderThread_Destruct(&p->threads[i]);

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Close(&p->writeEvents[i]);
  #else
    Event_Close(&p->finishedEvent);
  #endif
}


void MtCoder_Destruct(CMtCoder *p)
{
  MtCoder_Free(p);

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&p->mtProgress.cs);
}


SRes MtCoder_Code(CMtCoder *p)
{
  unsigned numThreads = p->numThreadsMax;
  unsigned numBlocksMax;
  unsigned i;
  SRes res = SZ_OK;

  if (numThreads > MTCODER__THREADS_MAX)
    numThreads = MTCODER__THREADS_MAX;
  numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
  
  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;

  if (numBlocksMax > MTCODER__BLOCKS_MAX)
    numBlocksMax = MTCODER__BLOCKS_MAX;

  if (p->blockSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTCODER__THREADS_MAX; i++)
    {
      CMtCoderThread *t = &p->threads[i];
      if (t->inBuf)
      {
        ISzAlloc_Free(p->allocBig, t->inBuf);
        t->inBuf = NULL;
      }
    }
    p->allocatedBufsSize = p->blockSize;
  }

  p->readRes = SZ_OK;

  MtProgress_Init(&p->mtProgress, p->progress);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < numBlocksMax; i++)
    {
      RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
    }
  #else
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  #endif

  {
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));

    if (Semaphore_IsCreated(&p->blocksSemaphore))
    {
      RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
    }
    RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
  }

  for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
    p->freeBlockList[i] = i + 1;
  p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
  p->freeBlockHead = 0;

  p->readProcessed = 0;
  p->blockIndex = 0;
  p->numBlocksMax = numBlocksMax;
  p->stopReading = False;

  #ifndef MTCODER__USE_WRITE_THREAD
    p->writeIndex = 0;
    p->writeRes = SZ_OK;
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      p->ReadyBlocks[i] = False;
    p->numFinishedThreads = 0;
  #endif

  p->numStartedThreadsLimit = numThreads;
  p->numStartedThreads = 0;

  // for (i = 0; i < numThreads; i++)
  {
    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
    RINOK(MtCoderThread_CreateAndStart(nextThread));
  }

  RINOK_THREAD(Event_Set(&p->readEvent))

  #ifdef MTCODER__USE_WRITE_THREAD
  {
    unsigned bi = 0;

    for (;; bi++)
    {
      if (bi >= numBlocksMax)
        bi = 0;

      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))

      {
        const CMtCoderBlock *block = &p->blocks[bi];
        unsigned bufIndex = block->bufIndex;
        Bool finished = block->finished;
        if (res == SZ_OK && block->res != SZ_OK)
          res = block->res;

        if (bufIndex != (unsigned)(int)-1)
        {
          if (res == SZ_OK)
          {
            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
            if (res != SZ_OK)
              MtProgress_SetError(&p->mtProgress, res);
          }
          
          CriticalSection_Enter(&p->cs);
          {
            p->freeBlockList[bufIndex] = p->freeBlockHead;
            p->freeBlockHead = bufIndex;
          }
          CriticalSection_Leave(&p->cs);
        }
        
        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))

        if (finished)
          break;
      }
    }
  }
  #else
  {
    WRes wres = Event_Wait(&p->finishedEvent);
    res = MY_SRes_HRESULT_FROM_WRes(wres);
  }
  #endif

  if (res == SZ_OK)
    res = p->readRes;

  if (res == SZ_OK)
    res = p->mtProgress.res;

  #ifndef MTCODER__USE_WRITE_THREAD
    if (res == SZ_OK)
      res = p->writeRes;
  #endif

  if (res != SZ_OK)
    MtCoder_Free(p);
  return res;
}

#endif

Messung V0.5
C=99 H=84 G=91

¤ Dauer der Verarbeitung: 0.1 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge