Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/hpcgap/demo/parorbit/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 18.9.2025 mit Größe 11 kB image not shown  

SSL parallelorbit3.g   Sprache: unbekannt

 
# This is a third try of a parallel orbit for hpcgap running in threads:
# This time we use individual channels for each worker to distribute
# the work, but we queue a configurable number of chunks for each worker.
# This should avoid the hotspot in the single channel and should be
# translatable to distributed memory without much change.

LoadPackage("orb");

HashServer := function(id,pt,inch,outchs,status,hashsize,chunksize,queuesize)
  # id is the ID of ourselves
  # pt is a sample point
  # hashsize is the hash size
  # inch is our input channel
  # outchs is a list work queues, one for each worker
  # chunksize is the size of a chunk of work
  # queuesize is the target for each work queue
  local Poll,Send,ht,p,pts,queued,r,sent,sumq,target,todo,val,w,work;
  #Print("I am hash #", ThreadID(CurrentThread()), " ", id, "\n");
  ht := HTCreate(pt,rec( hashlen := hashsize ));
  pts := EmptyPlist(hashsize);
  w := Length(outchs);  # number of workers
  target := queuesize * w;
  sent := 0;
  todo := EmptyPlist(1000);
  #sizes := EmptyPlist(1000);
  work := CreateChannel();  # a FIFO queue from us to ourselves
  queued := 0*[1..w];
  sumq := 0;
  Poll := function(wait)
      local r,x;
      if wait then
          r := ReceiveChannel(inch);
      else
          r := TryReceiveChannel(inch,fail);
          if r = fail then return false; fi;
      fi;
      if IsStringRep(r) and r = "exit" then
          MakeReadOnlySingleObj(ht!.els);
          MakeReadOnlySingleObj(ht!.vals);
          SendChannel(status,ht);
          MakeReadOnlySingleObj(pts);
          SendChannel(status,pts);
          #SendChannel(status,sizes);
          return true;
      elif IsInt(r) then
          x := TryReceiveChannel(work,fail);
          if x = fail then
              queued[r] := queued[r] - 1;
              sumq := sumq - 1;
              if queued[r] = 0 and sumq = 0 and Length(todo) = 0 then
                  SendChannel(status,id);   # we are idle
              fi;
          else
              SendChannel(outchs[r],x);
          fi;
          return false;
      else
          #Print("HS ",id," got data ",Length(r),"\n");
          Add(todo,r);
      fi;
      return false;
  end;
  Send := function(e)
      local i,tosend,x;
      tosend := EmptyPlist(e-sent+1);
      Add(tosend,id);
      for i in [sent+1..e] do
          Add(tosend,pts[i]);
      od;
      MakeReadOnlySingleObj(tosend);
      SendChannel(work,tosend);
      #Add(sizes,e-sent);
      sent := e;
      if sumq = target then return; fi;
      i := Random(1,w);
      while true do
          x := TryReceiveChannel(work,fail);
          if x = fail then return; fi;
          if queued[i] < queuesize then
              SendChannel(outchs[i],x);
              queued[i] := queued[i] + 1;
              sumq := sumq+1;
              if sumq = target then return; fi;
          fi;
          i := i + 1;
          if i > w then i := 1; fi;
      od;
  end;

  while true do
      if Poll(true) then return; fi;
      while Length(todo) > 0 do
          r := Remove(todo,1);
          for p in r do
              val := HTValue(ht,p);
              if val = fail then
                  HTAdd(ht,p,true);
                  Add(pts,p);
                  if Length(pts)-sent >= chunksize then
                      Send(sent+chunksize);
                  fi;
              fi;
          od;
          if Length(pts) > sent and sumq < target then
              Send(Length(pts));
          fi;
          if Poll(false) then return; fi;
          if sumq = 0 and Length(todo) = 0 then
              SendChannel(status,id);
          fi;
      od;
  od;
end;

Worker := function(id,gens,op,hashins,myqueue,status,f)
  # id is my ID
  # gens are the generators to act
  # op is the action function
  # hashins is a list of channels, one for each hashserver
  # myqueue is one channel for us to receive work
  # status is the global status channel
  # f is a distribution hash function
  local c,g,j,n,res,t,x;
  #Print("I am worker #",id,"\n");
  n := Length(hashins);
  while true do
      #c := 0;
      #while true do
      #    t := TryReceiveChannel(myqueue,fail);
      #    if t = fail then
      #        c := c + 1;
      #    else
      #        break;
      #    fi;
      #od;
      #if c > 0 then Print("Had to TryReceiveChannel ",c," times.\n"); fi;
      t := ReceiveChannel(myqueue);
      if IsStringRep(t) and t = "exit" then return; fi;
      #Print("Worker got work ",Length(t),"\n");
      res := List([1..n],x->EmptyPlist(QuoInt(Length(t)*Length(gens)*2,n)));
      for j in [2..Length(t)] do
          for g in gens do
              x := op(t[j],g);
              Add(res[f(x)],x);
          od;
      od;
      for j in [1..n] do
          if Length(res[j]) > 0 then
              SendChannel(status,-j);   # hashserver not ready
              MakeReadOnlySingleObj(res[j]);
              SendChannel(hashins[j],res[j]);
              #Print("Worker sent result to HS ",j,"\n");
          fi;
      od;
      SendChannel(hashins[t[1]],id);
      #Print("Worker sent done to HS ",t[1],"\n");
  od;
end;

TimeDiff := function(t,t2)
  return (t2-t)*1.E-9;
end;

ParallelOrbit := function(gens,pt,op,opt)
    local allhashes,allpts,change,h,i,k,pos,ptcopy,q,ready,s,ti,ti2,w,x;
    if not IsBound(opt.nrhash) then opt.nrhash := 1; fi;
    if not IsBound(opt.nrwork) then opt.nrwork := 1; fi;
    if not IsBound(opt.disthf) then opt.disthf := x->1; fi;
    if not IsBound(opt.hashlen) then opt.hashlen := 100001; fi;
    if not IsBound(opt.chunksize) then opt.chunksize := 1000; fi;
    if not IsBound(opt.queuesize) then opt.queuesize := 3; fi;
    if IsGroup(gens) then gens := GeneratorsOfGroup(gens); fi;
    if IsMutable(gens) then MakeImmutable(gens); fi;
    if not(IsReadOnlyObj(gens)) then MakeReadOnlySingleObj(gens); fi;
    if IsMutable(pt) then pt := MakeImmutable(StructuralCopy(pt)); fi;
    if not(IsReadOnlyObj(pt)) then MakeReadOnlySingleObj(pt); fi;

    ti := IO_gettimeofday();
    ptcopy := StructuralCopy(pt);
    ShareObj(ptcopy);
    i := List([1..opt.nrhash],k->CreateChannel());
    MakeReadOnlySingleObj(i);
    q := List([1..opt.nrwork],k->CreateChannel());
    MakeReadOnlySingleObj(q);
    s := CreateChannel();
    h := List([1..opt.nrhash],k->RunTask(HashServer,k,ptcopy,i[k],q,s,
                  opt.hashlen,opt.chunksize,opt.queuesize));
    Print("Hash servers started.\n");
    pos := opt.disthf(pt);
    SendChannel(i[pos],[pt]);
    Print("Seed sent.\n");
    w := List([1..opt.nrwork],
              k->RunTask(Worker,k,gens,op,i,q[k],s,opt.disthf));
    Print("Workers started...\n");
    ready := BlistList([1..opt.nrhash],[1..opt.nrhash]);
    ready[pos] := false;
    while ForAny([1..opt.nrhash],i->ready[i]=false) do
        change := false;
        x := ReceiveChannel(s);
        while true do
            if x = fail then break; 
            elif x < 0 then 
                if ready[-x] = true then 
                    change := true; 
                    #Print("Hash server #",x," got some work.\n");
                fi;
                ready[-x] := false;
            else 
                if ready[x] = false then 
                    change := true; 
                    #Print("Hash server #",x," became idle.\n");
                fi;
                ready[x] := true;
            fi;
            x := TryReceiveChannel(s,fail);
        od;
        # if change then Print("\nCentral: ready is ",ready,"\r"); fi;
    od;
    # Now terminate all workers:
    for k in [1..opt.nrwork] do
        SendChannel(q[k],"exit");
    od;
    Print("Sent exit.\n");
    for k in [1..opt.nrwork] do
        WaitTask(w[k]);
    od;
    Print("All workers done.\n");
    # Now terminate all hashservers:
    allhashes := EmptyPlist(k);
    allpts := EmptyPlist(k);
    #allstats := EmptyPlist(k);
    for k in [1..opt.nrhash] do
        SendChannel(i[k],"exit");
        Add(allhashes,ReceiveChannel(s));
        Add(allpts,ReceiveChannel(s));
        #Add(allstats,ReceiveChannel(s));
        WaitTask(h[k]);
    od;
    Print("All hashservers done.\n");
    ti2 := IO_gettimeofday();
    return rec( allhashes := allhashes, allpts := allpts,
                time := TimeDiff(ti,ti2),
                #allstats := allstats, 
                nrhash := opt.nrhash, 
                nrwork := opt.nrwork );
end;

Measure := function(gens,pt,op,n)
  local Ping,Pong,c1,c2,computebandwidth,g,ht,i,j,k,l,ll,
        lookupbandwidth,t,t1,ti,ti2,timeperlookup,timeperop,times,x;
  if IsGroup(gens) then gens := GeneratorsOfGroup(gens); fi;
  if IsMutable(gens) then MakeImmutable(gens); fi;
  if IsMutable(pt) then pt := MakeImmutable(StructuralCopy(pt)); fi;

  # First measure computation bandwidth to get some idea and some data:
  Print("Measuring computation bandwidth... \c");
  k := Length(gens);
  l := EmptyPlist(n*k);
  l[1] := pt;
  ti := IO_gettimeofday();
  # This does Length(gens)*n operations and keeps the results:
  for j in [1..n] do
      for g in gens do
          Add(l,op(l[j],g));
      od;
  od;
  ti2 := IO_gettimeofday();
  timeperop := TimeDiff(ti,ti2)/(k*n);  # time for one op
  computebandwidth := 1.0/timeperop;
  Print(computebandwidth,"\n");

  # Now hash lookup bandwith:
  Print("Measuring lookup bandwidth... \c");
  ht := HTCreate(pt,rec( hashlen := NextPrimeInt(2*k*n) ));
  # Store things in the hash:
  for j in [1..n*k] do
      x := HTValue(ht,l[j]);
      if x = fail then HTAdd(ht,l[j],j); fi;
  od;
  ti := IO_gettimeofday();
  for j in [1..n*k] do
      x := HTValue(ht,l[j]);
  od;
  ti2 := IO_gettimeofday();
  timeperlookup := TimeDiff(ti,ti2)/(k*n);  # time for one op
  lookupbandwidth := 1.0/timeperlookup;
  Print(lookupbandwidth,"\n");

  # Now transfer data between two threads:
  Ping := function(c,cc,n)
    local i,o,oo;
    o := ReceiveChannel(cc);
    for i in [1..n] do
        SendChannel(c,o);
        oo := ReceiveChannel(cc);
    od;
    SendChannel(c,o);
  end;
  Pong := function(c,cc,n)
    local i,oo;
    for i in [1..n] do
        oo := ReceiveChannel(c);
        SendChannel(cc,oo);
    od;
  end;
  times := [];
  c1 := CreateChannel();
  c2 := CreateChannel();
  Print("Measuring ping pong speed...\n");
  for i in [0..30] do
      if 2^i > Length(l) then break; fi;
      ll := l{[1..2^i]};
      Print(2^i,"... ");
      ti := IO_gettimeofday();
      t1 := CreateThread(Ping,c1,c2,10000000);
      ti2 := IO_gettimeofday();
      t := TimeDiff(ti,ti2)/10000000.0;
      Add(times,t);
      Print(t," ==> ",1.0/t," ping pongs/s.\n");
  od;
  
  return rec( timeperop := timeperop, 
              computebandwidth := computebandwidth,
              timeperlookup := timeperlookup, 
              lookupbandwidth := lookupbandwidth,
              pingpongtimes := times,
              pingpongbandwidth := List(times,x->1.0/x) );
end;

OnRightRO := function(x,g)
  local y;
  y := x*g;
  MakeReadOnlySingleObj(y);
  return y;
end;

OnSubspacesByCanonicalBasisRO := function(x,g)
  local y;
  y := OnSubspacesByCanonicalBasis(x,g);
  MakeReadOnlySingleObj(y);
  return y;
end;

MakeDistributionHF := function(x,n) 
  local hf,data;
  if n = 1 then return x->1; fi;
  hf := ChooseHashFunction(x,n);
  data := hf.data;
  MakeReadOnlySingleObj(data);
  hf := hf.func;
  return y->hf(y,data);
end;

DoStatistics := function(gens,v,op,hash,work,opt)
  local stats,s,h,w,r;
  stats := [];
  for h in hash do
    s := [];
    for w in work do
      opt.nrhash := h;
      opt.nrwork := w;
      opt.disthf := MakeDistributionHF(v,h);
      Print("Doing garbage collection...\c");
      GASMAN("collect");
      Print("\nDoing ",h," hashservers and ",w," workers... \c");
      r := ParallelOrbit(gens,v,op,opt);
      Add(s,r.time);
      Print(r.time,"\n");
    od;
    Add(stats,s);
  od;
  return stats;
end;


m := MathieuGroup(24);
# r := ParallelOrbit(m,1,OnPoints,rec());;
# r := ParallelOrbit(m,[1,2,3,4],OnTuples,
#     rec(nrhash := 2,nrwork := 2, disthf := x -> (x[1] mod 2) + 1));;

[ Verzeichnis aufwärts0.33unsichere Verbindung  Übersetzung europäischer Sprachen durch Browser  ]