Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/hpcgap/lib/distributed/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 18.9.2025 mit Größe 22 kB image not shown  

Quelle  dist_tasks.g   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

TaskStats := ShareObj ( rec (tasksCreated := 0,
                     tasksStolen := 0,
                     tasksExecuted := 0,
                     tasksOffloaded := 0));

BindGlobal ("BLOCK_TYPES", MakeReadOnlyObj( rec (
        BLOCKED_FETCH := 1,
        BLOCKED_WORKER := 2 )));

BindGlobal ("TASK_MANAGER_REQUESTS", MakeReadOnlyObj (rec (
        BLOCK_ME := 1,
        RESUME_IDLE_WORKER := 2,
        RESUME_BLOCKED_WORKER := 3,
        RESUME_SUSPENDED_WORKER := 4,
        SUSPEND_ME := 5,
        FINISH := 6,
        CULL_IDLE_WORKERS := 7,
        FINISH_WORKER := 8,
                            START_WORKERS := 9,
                            STEAL := 10,
                            NO_WORK := 11,
                            UNSUCC_STEAL := 12,
                            GOT_TASK := 13)));
#        TRY_UNBLOCK_TASK := 10,


DeclareGlobalFunction("ProcessHandleBlockedQueue");
DeclareGlobalFunction("SendSteal");

Tasks := AtomicRecord( rec ( Initial := GAPInfo.KernelInfo.NUM_CPUS ,    # initial number of worker threads
                 ReportErrors := true,
                 FirstTask := true,
                 WorkerPool := CreateChannel(),                # pool of idle workers
                 TaskManagerRequests := CreateChannel(),       # task manager requests
                 WorkerSuspensionRequests := CreateChannel(),  # suspend requests from task manager
                 InputChannels := AtomicList ([]),             # list of worker input channels
                 doStealing := false,
                 stealingStopped := false));



TaskPoolData := ShareSpecialObj( rec(
                        TaskPool := [],                               # task pool (list)
                        TaskPoolLen := 0 ));         # length of a task pool

MakeThreadLocal("threadId");

#ReadLib ("logging.g");

TaskStats := ShareSpecialObj ( rec (tasksCreated := 0,
 tasksStolen := 0,
 tasksExecuted := 0,
  tasksOffloaded := 0));

# Task manager is a special thread that supervises the workers
# (starts, blocks, suspends and resumes workers).
DeclareGlobalVariable ("TaskManager");
MakeReadWriteGVar("TaskManager");
DeclareGlobalVariable ("mainThreadChannels");
MakeReadWriteGVar("mainThreadChannels");


PrintTaskManStats := function()
  atomic readonly TaskStats do
    Print ("-----------------\n");
    Print ("Task Manager ", processId, ":\n");
    Print ("Tasks stolen : ", TaskStats.tasksStolen, "\n");
    Print ("Tasks executed : ", TaskStats.tasksExecuted, "\n");
    Print ("Tasks offloaded : ", TaskStats.tasksOffloaded, "\n");
    Print ("-----------------\n");
  od;
end;

GetWorkerInputChannel := function (worker)
  while true do
    if IsBound(Tasks.InputChannels[worker+1]) then
      return Tasks.InputChannels[worker+1];
    fi;
  od;
end;

SubObjectRegions := function(obj)
  local result, objs, subobj;
  result := OBJ_SET();
  objs := CLONE_DELIMITED(obj);
  for subobj in objs do
    if IsRegion(subobj) then
      ADD_OBJ_SET(result, subobj);
    fi;
  od;
  return OBJ_SET_VALUES(result);
end;

# Function executed by each worker thread
Tasks.Worker := function(channels)
  local taskdata, result, toUnblock, resume,
        suspend, unSuspend, p, task, i;

  #Tracing.InitWorkerLog();
  Tasks.InputChannels[ThreadID(CurrentThread())+1] := channels.toworker;
  threadId := ThreadID(CurrentThread());

  while true do
    Unbind (task);
    while not IsBound(task) do
      suspend := TryReceiveChannel (Tasks.WorkerSuspensionRequests, fail);
      if not IsIdenticalObj (suspend, fail) then
        #Tracing.TraceWorkerSuspended();
        SendChannel (Tasks.TaskManagerRequests, rec ( worker:= threadId,
                                                               type := TASK_MANAGER_REQUESTS.SUSPEND_ME));

        unSuspend := ReceiveChannel (channels.toworker);
        if unSuspend=TASK_MANAGER_REQUESTS.FINISH then
          #Tracing.Close();
          SendChannel (Tasks.TaskManagerRequests, rec ( worker := CurrentThread(),
                                                                  type := TASK_MANAGER_REQUESTS.FINISH_WORKER));
          return;
        fi;
      fi;


      p := LOCK(TaskPoolData);
      if IsIdenticalObj (p,fail) then
         Error("Failed to obtain lock for TaskPoolData inside Worker function\n");
      fi;
      if TaskPoolData.TaskPoolLen>0 then
        task := TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen];
        Unbind (TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen]);
        TaskPoolData.TaskPoolLen := TaskPoolData.TaskPoolLen-1;
        #Tracing.TraceWorkerGotTask();
        UNLOCK(p);
      else
        UNLOCK(p);
        SendChannel (Tasks.WorkerPool, channels);
        SendChannel (Tasks.TaskManagerRequests, rec ( worker := threadId,
                                                                type := TASK_MANAGER_REQUESTS.NO_WORK));
        resume := ReceiveChannel (channels.toworker);
        if resume=TASK_MANAGER_REQUESTS.FINISH then
          #Tracing.Close();
          SendChannel (Tasks.TaskManagerRequests (rec ( type := TASK_MANAGER_REQUESTS.FINISH_WORKER,
           worker := threadId)));
          return;
        fi;
      fi;
    od;

    atomic task do
      task.started := true;
      for i in [1..Length(task.adopt)] do
        if task.adopt[i] then
          atomic readwrite task.args[i] do
            ADOPT(task.args[i]);
          od;
        fi;
      od;
      taskdata := rec (func := ADOPT(task.func),
                       args := ADOPT(task.args),
                       async := ADOPT(task.async));
      if MPI_DEBUG.TASKS then
        MPILog(MPI_DEBUG_OUTPUT.LOCAL_TASKS, String(HANDLE_OBJ(task)), " EX");
      fi;
    od;


    atomic TaskStats do
      TaskStats.tasksExecuted := TaskStats.tasksExecuted+1;
    od;

    if IsString(taskdata.func) then
      taskdata.func := VALUE_GLOBAL(taskdata.func);
    fi;


    if taskdata.async then
      CALL_WITH_CATCH(taskdata.func, taskdata.args);
    else
      result := CALL_WITH_CATCH(taskdata.func, taskdata.args);
      #Tracing.TraceTaskFinished();
      if Length(result) = 1 or not result[1] then
        if Length(result) > 1 and Tasks.ReportErrors then
          Print("Task Error: ", result[2], "\n");
        fi;
        result := fail;
      else
        result := result[2];
      fi;

      atomic task do
        task.complete := true;
        if IsBound(task.result) and not IsThreadLocal(task.result) then
          p := LOCK(task.result);
        fi;
        if IsBound(task.result) and IsGlobalObjectHandle(task.result) then
          ShareSpecialObj(result);
          task.result!.obj := result;
          #MigrateObj(result,task);
          task.result!.control.haveObject := true;
          ProcessHandleBlockedQueue(task.result, result);
          UNLOCK(p);
        else
          if IsThreadLocal(result) then
            task.result := MigrateObj (result, task);
            task.adopt_result := true;
          else
            task.result := result;
            task.adopt_result := false;
            #SetImportedTaskResult(task,result);
          fi;
        fi;
        #if IsBound(task.waitingOnMe) then
        #  SendChannel (Tasks.TaskManagerRequests, rec (
        #          type := TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK,
        #                          worker := threadId,
        #                          tasks := task.waitingOnMe));
        #fi;
        while true do
          toUnblock := TryReceiveChannel (task.blockedWorkers, fail);
          if IsIdenticalObj (toUnblock, fail) then
            break;
          else
            SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER,
                                                                  worker := toUnblock.worker));
          fi;
        od;
      od;
    fi;

  od;
end;

# Function called when worker blocks on the result of a task.
Tasks.BlockWorkerThread := function()
  local resume;
  #Tracing.TraceWorkerBlocked();
  if not IsBound(Tasks.InputChannels[threadId+1]) then
    Tasks.InputChannels[threadId+1] := CreateChannel(1);
  fi;
  SendChannel (Tasks.TaskManagerRequests, rec (worker := threadId, type := TASK_MANAGER_REQUESTS.BLOCK_ME));
  resume := ReceiveChannel (GetWorkerInputChannel(threadId));
  if resume<>TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER then
    Error("Error while worker is waiting to resume\n");
  fi;
  #Tracing.TraceWorkerResumed();
end;

# Starts a new worker (called by task manager).
Tasks.StartNewWorkerThread := function()
  local toworker, fromworker, channels, worker;
  toworker := CreateChannel(1);
  fromworker := CreateChannel(1);
  channels := rec(toworker := toworker, fromworker := fromworker);
  MakeReadOnlyObj(channels);
  worker := CreateThread(Tasks.Worker, channels);
  return worker;
end;

# Tasks.Initialize just fires off the task manager.
Tasks.Initialize := function()
  local i, toworker, fromworker, channels;
  TaskManager := CreateThread(Tasks.TaskManagerFunc);
  MakeReadOnlyGVar ("TaskManager");
  mainThreadChannels := rec ( toworker := CreateChannel(1),
                              fromworker := CreateChannel(1));
  MakeReadOnlyGVar("mainThreadChannels");
  Tasks.InputChannels[1] := mainThreadChannels.toworker;
  threadId := 0;
end;

# Creates a task without binding it to a worker
CreateTask := function(arglist)
  local i, channels, task, request, args, adopt, adopted, ds,p,
        addToTaskPool, q;
  args := arglist{[2..Length(arglist)]};
  adopt := AtomicList([]);
  adopted := false;
  for i in [1..Length(args)] do
    if IsThreadLocal(args[i]) then
      adopt[i] := true;
      if not adopted then
        args[i] := ShareSpecialObj(CLONE_REACHABLE(args[i]));
        ds := RegionOf(args[i]);
        p := LOCK(args[i]);
        adopted := true;
      else
        args[i] := MIGRATE(CLONE_REACHABLE(args[i]), ds);
      fi;
    else
      adopt[i] := false;
    fi;
  od;
  if adopted then
    UNLOCK(p);
  fi;
  task :=  ShareSpecialObj (rec (
                   func := arglist[1],
                   args := args,
                   adopt := adopt,
                   async := false,
                   complete := false,
                   started := false,
                   async := false,
                   offloaded := false,
                   blockedWorkers := CreateChannel(),
                   waitingOnMe := ShareSpecialObj([]),
                   ));
  atomic TaskStats do
    TaskStats.tasksCreated := TaskStats.tasksCreated+1;
  od;
  return task;
end;

# Gracefully kill idle tasks.
CullIdleTasks := function()
  local ch, channels;
  channels := MultiReceiveChannel(Tasks.WorkerPool, 1024);
  for ch in channels do
    SendChannel(ch.toworker, TASK_MANAGER_REQUESTS.FINISH);
    WaitThread(ReceiveChannel(ch.fromworker));
  od;

  SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.CULL_IDLE_WORKERS,
                                                        worker := threadId));
end;

ExecuteTask:= atomic function(readwrite task)
  local channels, t, taskdata, worker, tracingTime;

  task.started := true;
  task.complete := false;
  atomic readwrite TaskPoolData do
    TaskPoolData.TaskPoolLen := TaskPoolData.TaskPoolLen+1;
    TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen] := task;
  od;
 # Tracing.TraceTaskCreated();
  if (Tasks.FirstTask) then
    Tasks.FirstTask := false;
    SendChannel (Tasks.TaskManagerRequests, rec (type := TASK_MANAGER_REQUESTS.START_WORKERS,
                                                 noWorkers := Tasks.Initial,
                                                 worker := 0)); # worker id is irrelevant in this case
  fi;
  worker := TryReceiveChannel (Tasks.WorkerPool, fail);
  if IsNotIdenticalObj (worker, fail) then
    SendChannel (worker.toworker, TASK_MANAGER_REQUESTS.RESUME_IDLE_WORKER);
  fi;
  return task;
end;

RunTask:= function(arg)
  local task;
  task := CreateTask(arg);
  ExecuteTask(task);
  return task;
end;

RunAsyncTask := function(arg)
  local task;
  task := Tasks.CreateTask(arg);
  atomic task do
    task.async := true;
  od;
  ExecuteTask(task);
  return task;
end;

MakeTaskAsync := function(task)
  if task.started then
    Error("Cannot make a running task asynchronous");
  fi;
  task.async := true;
end;

ImmediateTask := function(arg)
  local result, task;
  result := CALL_WITH_CATCH(arg[1], arg{[2..Length(arg)]});
  if Length(result) = 1 or not result[1] then
    if Length(result) > 1 and Tasks.ReportErrors then
      Print("Task Error: ", result[2], "\n");
    fi;
    result := fail;
  else
    result := result[2];
  fi;
  task := ShareSpecialObj (rec( started := true, complete := true, async := false,
                  result := result, adopt_result := false ));
  return task;
end;

DelayTask := function(arg)
  local task;
  return Tasks.CreateTask(arg);
end;

WaitTask := function(arg)
  local task, taskresult, i, p;

  atomic readonly arg[1] do
    if Length(arg) = 1 and IsList(arg[1]) then
      arg := arg[1];
    fi;
  od;

  for task in arg do
    atomic readonly task do
      if task.async then
        Error("Cannot wait for an asynchronous task");
      fi;
    od;
  od;
  for task in arg do
    atomic readonly task do
      if (not task.complete) and (not task.started) and (not task.offloaded) then
        ExecuteTask(task);
      fi;
    od;
  od;
  for task in arg do
    p := LOCK(task, false);
    if not task.complete then
      SendChannel (task.blockedWorkers, rec (type := BLOCK_TYPES.BLOCKED_WORKER, worker := threadId));
      UNLOCK(p);
      Tasks.BlockWorkerThread();
      atomic task do
        if task.offloaded then
          atomic readwrite task.result do
            Open(task.result);
          od;
          task.result := GetHandleObj(task.result);
        fi;
      od;
    else
      UNLOCK(p);
    fi;
  od;
end;

WaitTasks := WaitTask;

WaitAnyTask := function(arg)
  local i, len, task, taskresult, channels, ch;

  atomic arg[1] do
    if Length(arg) = 1 and IsList(arg[1]) then
      arg := arg[1];
    fi;
  od;

  len := Length(arg);

  for task in arg do
    LOCK (task, false);
    if task.async then
      UNLOCK(task);
      Error("Cannot wait for an async task");
    fi;
    if not task.started then
      UNLOCK(task);
      ExecuteTask(task);
    fi;
  od;

  while true do
    for i in [1..len] do
      atomic readonly arg[i].complete do
        if arg[i].complete then
          return i;
        fi;
      od;
    od;
  od;

end;

TaskResult := function(task)
  local taskresult, toExecute, toWait, toFetch;
  toExecute := false;
  toWait := false;
  toFetch := false;
  atomic readonly task do
    if task.async then
      Error("Cannot obtain the result of an asynchronous task");
    fi;
    if task.offloaded then
      toFetch := true;
    elif not task.started then
      toExecute := true;
    elif not task.complete then
      toWait := true;
    fi;
  od;
  if toExecute then
    ExecuteTask(task);
  elif toWait then
    WaitTask(task);
  elif toFetch then
    atomic readwrite task do
      atomic readwrite task.result do
        Open(task.result);
      od;
      task.result := GetHandleObj(task.result);
    od;
  fi;
  atomic readonly task do
    if task.adopt_result then
      taskresult :=  CLONE_REACHABLE(task.result);
    else
      taskresult := task.result;
    fi;
  od;
  return taskresult;
end;

TaskStarted := atomic function(readonly task)
  return task.started;
end;

TaskFinished := atomic function(readonly task)
  return task.complete;
end;

TaskIsAsync := function(task)
  return task.async;
end;

## task milestones
#ScheduleTask := function (arg)
#  local task, waitTask;
#  if IsFunction(arg[1]) then
#    task := RunTask(arg);
#  else
#    task := Tasks.CreateTask(arg{[2..Length(arg)]});
#    atomic readwrite task do
#      task.started := true;
#      task.condCount := 0;
#      for waitTask in arg[1] do
#        atomic readonly waitTask do
#          atomic readwrite waitTask.waitingOnMe do
#            if waitTask.complete = false then
#              Add(waitTask.waitingOnMe, task);
#              task.condCount := task.condCount+1;
#            fi;
#          od;
#        od;
#      od;
#      if task.condCount = 0 then
#        ExecuteTask(task);
#      fi;
#    od;
#  fi;
#  return task;
#end;

#NewMilestone := function(contributions)
#  local c;
#  c := Set(contributions);
#  return ShareSpecialObj(rec(
#             achieved := Set([]),
#             targets := Immutable(c),
#             complete := Length(c) = 0,
#             waitingOnMe := ShareSpecialObj([]),
#             ));
#end;

#ContributeToMilestone := function(milestone, contribution)
#  local trigger, notify, t;
#  atomic milestone do
#    if not contribution in milestone.targets then
#      Error("ContributeToMilestone: Milestone does not have such a contribution");
#    fi;
#    if milestone.complete then
#      return;
#    fi;
#    AddSet(milestone.achieved, Immutable(contribution));
#    if Length(milestone.achieved) < Length(milestone.targets) then
#      return;
#    fi;
#    milestone.complete := true;
#    atomic readonly milestone.waitingOnMe do
#      SendChannel (Tasks.TaskManagerRequests, rec (
#              type := TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK,
#              worker := threadId,
#              tasks := milestone.waitingOnMe ));
#    od;
#  od;
#end;

# Function executed by a task manager
Tasks.TaskManagerFunc := function()
  local i, worker,  request, requestType,
        taskToFinish, toUnblock,
        totalTasks, blockedWorkersChannel, taskMap,
        toResume, taskman, target, finishing, finishingState, t;

  finishing := false;
  taskman := rec (startedWorkers := 0,
                  activeWorkers:= 0,
                  blockedWorkers:=0,
                  suspendedWorkers:=0,
                  suspendedWorkersList:=[],
                  allWorkers:=[],
                  stealing:=false,
                  stealRequests:=0);
#                  nextStealingTime:=MicroSeconds());

  while true do
    if finishing and finishingState.myWorkersToFinish=0 then
      PrintTaskManStats();
      return;
    fi;
    if Tasks.doStealing and taskman.stealRequests>0 and (not taskman.stealing) and (not finishing) then
      SendSteal();
      taskman.stealing := true;
    fi;
    request := ReceiveChannel (Tasks.TaskManagerRequests);

    worker := request.worker;
    requestType := request.type;
    if requestType = TASK_MANAGER_REQUESTS.START_WORKERS then
      if taskman.startedWorkers = 0 then
        for i in [1..request.noWorkers] do
          worker := Tasks.StartNewWorkerThread();
          taskman.activeWorkers := taskman.activeWorkers+1;
          taskman.startedWorkers := taskman.startedWorkers+1;
        od;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.BLOCK_ME then # request to block a worker
      taskman.activeWorkers := taskman.activeWorkers-1;
      taskman.blockedWorkers := taskman.blockedWorkers+1;
      if taskman.activeWorkers<Tasks.Initial then
        if taskman.suspendedWorkers>0 then
          toResume := Remove (taskman.suspendedWorkersList);
          taskman.suspendedWorkers := taskman.suspendedWorkers-1;
          SendChannel (GetWorkerInputChannel(toResume), TASK_MANAGER_REQUESTS.RESUME_SUSPENDED_WORKER);
        else
          worker := Tasks.StartNewWorkerThread();
          Add (taskman.allWorkers, ThreadID(worker));
        fi;
        taskman.activeWorkers := taskman.activeWorkers+1;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER then # request to unblock a worker
      if worker<>0 then
      taskman.blockedWorkers := taskman.blockedWorkers-1;
      if taskman.activeWorkers>Tasks.Initial then
          SendChannel (Tasks.WorkerSuspensionRequests, true);
        fi;
      fi;
      SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER);
 #   elif requestType = TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK then
 #     atomic readonly request.tasks do
 #       for t in request.tasks do
 #         atomic readwrite t do
 #           t.condCount := t.condCount-1;
 #           if t.condCount = 0 then
 #             ExecuteTask(t);
 #           fi;
 #         od;
 #       od;
 #     od;
    elif requestType = TASK_MANAGER_REQUESTS.SUSPEND_ME then
      taskman.activeWorkers := taskman.activeWorkers-1;
      if taskman.suspendedWorkers>Tasks.Initial then
        SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.FINISH);
      else
        taskman.suspendedWorkers := taskman.suspendedWorkers+1;
        Add (taskman.suspendedWorkersList, worker);
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.CULL_IDLE_WORKERS then
      while not IsEmpty(taskman.suspendedWorkersList) do
        worker := Remove(taskman.suspendedWorkersList);
        taskman.suspendedWorkers := taskman.suspendedWorkers-1;
        SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.FINISH);
      od;
    elif requestType = TASK_MANAGER_REQUESTS.NO_WORK then
      taskman.stealRequests := taskman.stealRequests+1;
    elif requestType = TASK_MANAGER_REQUESTS.UNSUCC_STEAL then
      taskman.stealing := false;
    elif requestType = TASK_MANAGER_REQUESTS.GOT_TASK then
      #if taskman.stealRequests>0 then taskman.stealRequests := taskman.stealRequests-1; fi;
      taskman.stealRequests := 0;
      taskman.stealing := false;
    elif requestType = TASK_MANAGER_REQUESTS.FINISH_WORKER then
      WaitThread(worker);
      if finishing then
        finishingState.myWorkersToFinish := finishingState.myWorkersToFinish-1;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.FINISH then
      finishingState := rec ( myWorkersToFinish := taskman.activeWorkers + taskman.suspendedWorkers );
      if IsBound(MPI_Initialized) and IsBound(MPI_Comm_rank) then
        if processId=0 then
          SendMessage (processId, MESSAGE_TYPES.FINISH);
        fi;
      fi;
      for i in taskman.allWorkers do
        SendChannel (GetWorkerInputChannel(i), TASK_MANAGER_REQUESTS.FINISH);
      od;
      finishing := true;
    else
      Error("Task manager on ", processId, " received unknown request!\n");
    fi;
  od;
end;

Tasks.Initialize();

[ Dauer der Verarbeitung: 0.33 Sekunden  (vorverarbeitet)  ]