Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/hpcgap/demo/parorbit/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 18.9.2025 mit Größe 8 kB image not shown  

Quelle  parorbit-nohashservers.g   Sprache: unbekannt

 
Spracherkennung für: .g vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

# This is a second try of a parallel orbit for hpcgap running in threads:
# This time we use a central channel for work distribution.
LoadPackage("orb");
LoadPackage("io");
Read ("../bench.g");
Read ("logging.g");

TaskPool := ShareObj (rec (nrChunks := 0,
                    chunks := [],
                    currentChunk := [],
                    currentChunkSize := 0,
                    outstandingWork := 0));

DeclareGlobalVariable("hasWork");
MakeReadWriteGVar("hasWork");
DeclareGlobalVariable("HashTables");
MakeReadWriteGVar("HashTables");

taskPoolSemaphore := CreateSemaphore(1);
  
DoWork := function (work, nrTasks, op, gens, distHashFun, results)
  local g, x, i;
  for i in [1..nrTasks] do
    for g in gens do 
      x := op (work[i], g);
      MakeReadOnlySingleObj(x);
      Add(results[distHashFun(x)],x);
    od;
  od;
end;

Worker := function(nrWorkers, op, gens, chunkSize, distHashFun, tracing)
  local lock, haveTaskPoolLock, work, i, hashLock, 
        nrProducedElems, nrTasksToGrab, results, accResults,
        j, res, val, t, tasksToAdd, prevNrChunks, bound;
  if tracing then
    Tracing.InitWorkerLog();
  fi;
  tasksToAdd := 0;
  haveTaskPoolLock := false;
  results := EmptyPlist(nrWorkers);
  nrProducedElems := Length(gens);
  for i in [1..nrWorkers] do
    results[i] := EmptyPlist(nrProducedElems*chunkSize);
  od;
  accResults := EmptyPlist(nrProducedElems*chunkSize);
  work := EmptyPlist(chunkSize);
  while true do
    ###### 1. get work
    # 1.1 lock the task pool
    if not haveTaskPoolLock then
      lock := LOCK(TaskPool);
      haveTaskPoolLock := true;
    fi;
    
    # 1.2 if there are 0 chunks of work in the task pool, and one of the workers is
    #     still processing tasks, we release the lock on the task pool and wait for
    #     task pool semaphore to signal when enough tasks are available
    if TaskPool.nrChunks = 0 and TaskPool.outstandingWork > 0 then
      UNLOCK(lock);
      haveTaskPoolLock := false;
      WaitSemaphore(taskPoolSemaphore);
      lock := LOCK(TaskPool);
      haveTaskPoolLock := true;
    fi;
    
    # 1.3 if we are here, it means that either 
    #     i)  there are > 0 chunks of work in the task pool or
    #     ii) there are 0 chunks of work in the task pool, but we are the only
    #         worker doing work
    #     if i) holds, then grab a chunk of work
    #     if ii) holds, grab an incomplete chunk of work
    if TaskPool.nrChunks = 0 then
      if TaskPool.currentChunkSize > 0 then
        work := AdoptObj(TaskPool.currentChunk);
        TaskPool.currentChunk := MigrateObj(EmptyPlist(chunkSize),TaskPool);
        TaskPool.currentChunkSize := 0;
      else
        if tracing then
          Tracing.Close();
        fi;
        return;
      fi;
    else
      work := AdoptObj(Remove(TaskPool.chunks));
      TaskPool.nrChunks := TaskPool.nrChunks - 1;
    fi;
    
    if tracing then
      Tracing.TraceWorkerGotTask();
    fi;
    TaskPool.outstandingWork := TaskPool.outstandingWork + 1;
    
    # 1.5 if there remains > 0 chunks of work in the task pool,
    #     then signal the taskPoolSemaphore and release the lock on the task pool. 
    #     also, if some workers are currently processing tasks, release the task pool
    #     lock, so they can add their results to the task pool.

    
    if TaskPool.nrChunks > 0 or TaskPool.outstandingWork > 1 then
      if TaskPool.nrChunks > 0 then
        SignalSemaphore(taskPoolSemaphore);
      fi;
      UNLOCK(lock);
      haveTaskPoolLock := false;
    fi;
    
    if tracing then
      Tracing.TraceTaskStarted();
    fi;
    
    # 2. do work (this stores results in results list of lists)
    DoWork(work, Length(work), op, gens, distHashFun, results);
    
    # 3. add results to the hash table and task pool
    
    # 3.1 accumulate all results that have not been seen before in 
    #     accResults list
    for i in [1..nrWorkers] do
      if Length(results[i]) > 0 then
        if tracing then
          Tracing.TraceWorkerBlocked();
        fi;
        hashLock := LOCK(HashTables[i]);
        if tracing then
          Tracing.TraceWorkerResumed();
        fi;
        for j in [1..Length(results[i])] do
          res := Remove(results[i]);
          val := HTValue(HashTables[i], res);
          if val = fail then
            HTAdd (HashTables[i], res, true);
            Add(accResults, res);
          fi;
        od;
        UNLOCK(hashLock);
      fi;
    od;
    
    
    # 3.2 if we are not already holding task pool lock, obtain it
    if not haveTaskPoolLock then
      if tracing then
        Tracing.TraceWorkerBlocked();
      fi;
      lock := LOCK(TaskPool);
      if tracing then
        Tracing.TraceWorkerResumed();
      fi;
      haveTaskPoolLock := true;
    fi;
    
    TaskPool.outstandingWork := TaskPool.outstandingWork - 1;
    tasksToAdd := Length(accResults);
    
    # 3.3 add the tasks from accResults to the task pool
    
    if tasksToAdd > 0 then
      prevNrChunks := TaskPool.nrChunks;
      while not IsEmpty(accResults) do
        if Length(accResults) < chunkSize - TaskPool.currentChunkSize then
          bound := Length(accResults);
        else
          bound := chunkSize - TaskPool.currentChunkSize;
        fi;
        for i in [1..bound] do
          Add (TaskPool.currentChunk, Remove(accResults));
        od;
        if tracing and TaskPool.currentChunkSize = 0 then
          Tracing.TraceTaskCreated();
        fi;
        TaskPool.currentChunkSize := TaskPool.currentChunkSize + bound;
        if TaskPool.currentChunkSize = chunkSize then
          Add (TaskPool.chunks, TaskPool.currentChunk);
          TaskPool.nrChunks := TaskPool.nrChunks + 1;
          TaskPool.currentChunk := MigrateObj(EmptyPlist(chunkSize), TaskPool);
          TaskPool.currentChunkSize := 0;
        fi;
      od;
    fi;
    
    if tracing then
      Tracing.TraceTaskFinished();
      Tracing.TraceWorkerIdle();  
    fi;
    
    # 3.4 if there are now enough tasks (>chunksize) in the
    #     task pool, signal the task pool semaphore
    if TaskPool.nrChunks > 0 and prevNrChunks = 0 then
      SignalSemaphore(taskPoolSemaphore);
    fi;
    
    # 3.5 if there are workers processing tasks, release the task pool
    #     lock so they can add their results to it
    if TaskPool.outstandingWork > 0 then 
      UNLOCK(lock);
      haveTaskPoolLock := false;
    fi;
    
  od;
end;

ParallelOrbit := function (gens, pt, op, opt)
  local i, workers;
  if not IsBound(opt.nrwork) then opt.nrwork := 1; fi;
  if not IsBound(opt.disthf) then opt.disthf := x->1; fi;
  if not IsBound(opt.hashlen) then opt.hashlen := 100001; fi;
  if not IsBound(opt.chunksize) then opt.chunksize := 1000; fi;
  if IsGroup(gens) then gens := GeneratorsOfGroup(gens); fi;
  if IsMutable(gens) then MakeImmutable(gens); fi;
  if not(IsReadOnlyObj(gens)) then MakeReadOnlySingleObj(gens); fi;
  if IsMutable(pt) then pt := MakeImmutable(StructuralCopy(pt)); fi;
  if not(IsReadOnlyObj(pt)) then MakeReadOnlySingleObj(pt); fi;
  
  if not IsBound(opt.tracing) then opt.tracing := false; fi;
  
  if opt.tracing then
    StartLogging();
  fi;
  
  hasWork := CreateChannel();
  HashTables := AtomicList ([]);
  for i in [1..opt.nrwork] do
    HashTables[i] := ShareObj(HTCreate(pt, rec (hashlen := opt.hashlen)));
  od;
  atomic TaskPool do
    TaskPool.nrChunks := 0;
    TaskPool.currentChunk := MigrateObj(EmptyPlist(opt.chunksize), TaskPool);
    Add(TaskPool.currentChunk, MakeReadOnlySingleObj(pt));
    if opt.tracing then
      Tracing.TraceTaskCreated();
    fi;
    TaskPool.currentChunkSize := 1;
  od;
  SendChannel (hasWork, 1);
  workers := List ([1..opt.nrwork], \x -> CreateThread(Worker,opt.nrwork,op,gens,opt.chunksize,opt.disthf,opt.tracing));
  for i in [1..opt.nrwork] do
    WaitThread(workers[i]);
  od;
  
  if opt.tracing then
    StopLogging();
  fi;
  
  return true;
    
end;

OnRightRO := function(x,g)
  local y;
  y := x*g;
  MakeReadOnlySingleObj(y);
  return y;
end;

OnSubspacesByCanonicalBasisRO := function(x,g)
  local y;
  y := OnSubspacesByCanonicalBasis(x,g);
  MakeReadOnlySingleObj(y);
  return y;
end;

MakeDistributionHF := function(x,n) 
  local hf,data;
  hf := ChooseHashFunction(x,n);
  data := hf.data;
  MakeReadOnlySingleObj(data);
  hf := hf.func;
  return y->hf(y,data);
end;

if IsBound(MakeReadOnlySingleObj) then
    OnRightRO := function(x,g)
      local y;
      y := x*g;
      MakeReadOnlySingleObj(y);
      return y;
    end;
else
    OnRightRO := OnRight;
fi;

#Read ("HNdata.g");
#r := Bench( do ParallelOrbit(gens,v,OnRightRO,
#        rec(nrwork := 16, disthf := MakeDistributionHF(v,16)));; od);
        

[ Dauer der Verarbeitung: 0.29 Sekunden  ]