Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  dist_tasks.g   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

TaskStats := ShareObj ( rec (tasksCreated := 0,
                     tasksStolen := 0,
                     tasksExecuted := 0,
                     tasksOffloaded := 0));

BindGlobal ("BLOCK_TYPES", MakeReadOnlyObj( rec (
        BLOCKED_FETCH := 1,
        BLOCKED_WORKER := 2 )));

BindGlobal ("TASK_MANAGER_REQUESTS", MakeReadOnlyObj (rec (
        BLOCK_ME := 1,
        RESUME_IDLE_WORKER := 2,
        RESUME_BLOCKED_WORKER := 3,
        RESUME_SUSPENDED_WORKER := 4,
        SUSPEND_ME := 5,
        FINISH := 6,
        CULL_IDLE_WORKERS := 7,
        FINISH_WORKER := 8,
                            START_WORKERS := 9,
                            STEAL := 10,
                            NO_WORK := 11,
                            UNSUCC_STEAL := 12,
                            GOT_TASK := 13)));
#        TRY_UNBLOCK_TASK := 10,


DeclareGlobalFunction("ProcessHandleBlockedQueue");
DeclareGlobalFunction("SendSteal");

Tasks := AtomicRecord( rec ( Initial := GAPInfo.KernelInfo.NUM_CPUS ,    # initial number of worker threads
                 ReportErrors := true,
                 FirstTask := true,
                 WorkerPool := CreateChannel(),                # pool of idle workers
                 TaskManagerRequests := CreateChannel(),       # task manager requests
                 WorkerSuspensionRequests := CreateChannel(),  # suspend requests from task manager
                 InputChannels := AtomicList ([]),             # list of worker input channels
                 doStealing := false,
                 stealingStopped := false));



TaskPoolData := ShareSpecialObj( rec(
                        TaskPool := [],                               # task pool (list)
                        TaskPoolLen := 0 ));         # length of a task pool

MakeThreadLocal("threadId");

#ReadLib ("logging.g");

TaskStats := ShareSpecialObj ( rec (tasksCreated := 0,
 tasksStolen := 0,
 tasksExecuted := 0,
  tasksOffloaded := 0));

# Task manager is a special thread that supervises the workers
# (starts, blocks, suspends and resumes workers).
DeclareGlobalVariable ("TaskManager");
MakeReadWriteGVar("TaskManager");
DeclareGlobalVariable ("mainThreadChannels");
MakeReadWriteGVar("mainThreadChannels");


PrintTaskManStats := function()
  atomic readonly TaskStats do
    Print ("-----------------\n");
    Print ("Task Manager ", processId, ":\n");
    Print ("Tasks stolen : ", TaskStats.tasksStolen, "\n");
    Print ("Tasks executed : ", TaskStats.tasksExecuted, "\n");
    Print ("Tasks offloaded : ", TaskStats.tasksOffloaded, "\n");
    Print ("-----------------\n");
  od;
end;

GetWorkerInputChannel := function (worker)
  while true do
    if IsBound(Tasks.InputChannels[worker+1]) then
      return Tasks.InputChannels[worker+1];
    fi;
  od;
end;

SubObjectRegions := function(obj)
  local result, objs, subobj;
  result := OBJ_SET();
  objs := CLONE_DELIMITED(obj);
  for subobj in objs do
    if IsRegion(subobj) then
      ADD_OBJ_SET(result, subobj);
    fi;
  od;
  return OBJ_SET_VALUES(result);
end;

# Function executed by each worker thread
Tasks.Worker := function(channels)
  local taskdata, result, toUnblock, resume,
        suspend, unSuspend, p, task, i;

  #Tracing.InitWorkerLog();
  Tasks.InputChannels[ThreadID(CurrentThread())+1] := channels.toworker;
  threadId := ThreadID(CurrentThread());

  while true do
    Unbind (task);
    while not IsBound(task) do
      suspend := TryReceiveChannel (Tasks.WorkerSuspensionRequests, fail);
      if not IsIdenticalObj (suspend, fail) then
        #Tracing.TraceWorkerSuspended();
        SendChannel (Tasks.TaskManagerRequests, rec ( worker:= threadId,
                                                               type := TASK_MANAGER_REQUESTS.SUSPEND_ME));

        unSuspend := ReceiveChannel (channels.toworker);
        if unSuspend=TASK_MANAGER_REQUESTS.FINISH then
          #Tracing.Close();
          SendChannel (Tasks.TaskManagerRequests, rec ( worker := CurrentThread(),
                                                                  type := TASK_MANAGER_REQUESTS.FINISH_WORKER));
          return;
        fi;
      fi;


      p := LOCK(TaskPoolData);
      if IsIdenticalObj (p,fail) then
         Error("Failed to obtain lock for TaskPoolData inside Worker function\n");
      fi;
      if TaskPoolData.TaskPoolLen>0 then
        task := TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen];
        Unbind (TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen]);
        TaskPoolData.TaskPoolLen := TaskPoolData.TaskPoolLen-1;
        #Tracing.TraceWorkerGotTask();
        UNLOCK(p);
      else
        UNLOCK(p);
        SendChannel (Tasks.WorkerPool, channels);
        SendChannel (Tasks.TaskManagerRequests, rec ( worker := threadId,
                                                                type := TASK_MANAGER_REQUESTS.NO_WORK));
        resume := ReceiveChannel (channels.toworker);
        if resume=TASK_MANAGER_REQUESTS.FINISH then
          #Tracing.Close();
          SendChannel (Tasks.TaskManagerRequests (rec ( type := TASK_MANAGER_REQUESTS.FINISH_WORKER,
           worker := threadId)));
          return;
        fi;
      fi;
    od;

    atomic task do
      task.started := true;
      for i in [1..Length(task.adopt)] do
        if task.adopt[i] then
          atomic readwrite task.args[i] do
            ADOPT(task.args[i]);
          od;
        fi;
      od;
      taskdata := rec (func := ADOPT(task.func),
                       args := ADOPT(task.args),
                       async := ADOPT(task.async));
      if MPI_DEBUG.TASKS then
        MPILog(MPI_DEBUG_OUTPUT.LOCAL_TASKS, String(HANDLE_OBJ(task)), " EX");
      fi;
    od;


    atomic TaskStats do
      TaskStats.tasksExecuted := TaskStats.tasksExecuted+1;
    od;

    if IsString(taskdata.func) then
      taskdata.func := VALUE_GLOBAL(taskdata.func);
    fi;


    if taskdata.async then
      CALL_WITH_CATCH(taskdata.func, taskdata.args);
    else
      result := CALL_WITH_CATCH(taskdata.func, taskdata.args);
      #Tracing.TraceTaskFinished();
      if Length(result) = 1 or not result[1] then
        if Length(result) > 1 and Tasks.ReportErrors then
          Print("Task Error: ", result[2], "\n");
        fi;
        result := fail;
      else
        result := result[2];
      fi;

      atomic task do
        task.complete := true;
        if IsBound(task.result) and not IsThreadLocal(task.result) then
          p := LOCK(task.result);
        fi;
        if IsBound(task.result) and IsGlobalObjectHandle(task.result) then
          ShareSpecialObj(result);
          task.result!.obj := result;
          #MigrateObj(result,task);
          task.result!.control.haveObject := true;
          ProcessHandleBlockedQueue(task.result, result);
          UNLOCK(p);
        else
          if IsThreadLocal(result) then
            task.result := MigrateObj (result, task);
            task.adopt_result := true;
          else
            task.result := result;
            task.adopt_result := false;
            #SetImportedTaskResult(task,result);
          fi;
        fi;
        #if IsBound(task.waitingOnMe) then
        #  SendChannel (Tasks.TaskManagerRequests, rec (
        #          type := TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK,
        #                          worker := threadId,
        #                          tasks := task.waitingOnMe));
        #fi;
        while true do
          toUnblock := TryReceiveChannel (task.blockedWorkers, fail);
          if IsIdenticalObj (toUnblock, fail) then
            break;
          else
            SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER,
                                                                  worker := toUnblock.worker));
          fi;
        od;
      od;
    fi;

  od;
end;

# Function called when worker blocks on the result of a task.
Tasks.BlockWorkerThread := function()
  local resume;
  #Tracing.TraceWorkerBlocked();
  if not IsBound(Tasks.InputChannels[threadId+1]) then
    Tasks.InputChannels[threadId+1] := CreateChannel(1);
  fi;
  SendChannel (Tasks.TaskManagerRequests, rec (worker := threadId, type := TASK_MANAGER_REQUESTS.BLOCK_ME));
  resume := ReceiveChannel (GetWorkerInputChannel(threadId));
  if resume<>TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER then
    Error("Error while worker is waiting to resume\n");
  fi;
  #Tracing.TraceWorkerResumed();
end;

# Starts a new worker (called by task manager).
Tasks.StartNewWorkerThread := function()
  local toworker, fromworker, channels, worker;
  toworker := CreateChannel(1);
  fromworker := CreateChannel(1);
  channels := rec(toworker := toworker, fromworker := fromworker);
  MakeReadOnlyObj(channels);
  worker := CreateThread(Tasks.Worker, channels);
  return worker;
end;

# Tasks.Initialize just fires off the task manager.
Tasks.Initialize := function()
  local i, toworker, fromworker, channels;
  TaskManager := CreateThread(Tasks.TaskManagerFunc);
  MakeReadOnlyGVar ("TaskManager");
  mainThreadChannels := rec ( toworker := CreateChannel(1),
                              fromworker := CreateChannel(1));
  MakeReadOnlyGVar("mainThreadChannels");
  Tasks.InputChannels[1] := mainThreadChannels.toworker;
  threadId := 0;
end;

# Creates a task without binding it to a worker
CreateTask := function(arglist)
  local i, channels, task, request, args, adopt, adopted, ds,p,
        addToTaskPool, q;
  args := arglist{[2..Length(arglist)]};
  adopt := AtomicList([]);
  adopted := false;
  for i in [1..Length(args)] do
    if IsThreadLocal(args[i]) then
      adopt[i] := true;
      if not adopted then
        args[i] := ShareSpecialObj(CLONE_REACHABLE(args[i]));
        ds := RegionOf(args[i]);
        p := LOCK(args[i]);
        adopted := true;
      else
        args[i] := MIGRATE(CLONE_REACHABLE(args[i]), ds);
      fi;
    else
      adopt[i] := false;
    fi;
  od;
  if adopted then
    UNLOCK(p);
  fi;
  task :=  ShareSpecialObj (rec (
                   func := arglist[1],
                   args := args,
                   adopt := adopt,
                   async := false,
                   complete := false,
                   started := false,
                   async := false,
                   offloaded := false,
                   blockedWorkers := CreateChannel(),
                   waitingOnMe := ShareSpecialObj([]),
                   ));
  atomic TaskStats do
    TaskStats.tasksCreated := TaskStats.tasksCreated+1;
  od;
  return task;
end;

# Gracefully kill idle tasks.
CullIdleTasks := function()
  local ch, channels;
  channels := MultiReceiveChannel(Tasks.WorkerPool, 1024);
  for ch in channels do
    SendChannel(ch.toworker, TASK_MANAGER_REQUESTS.FINISH);
    WaitThread(ReceiveChannel(ch.fromworker));
  od;

  SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.CULL_IDLE_WORKERS,
                                                        worker := threadId));
end;

ExecuteTask:= atomic function(readwrite task)
  local channels, t, taskdata, worker, tracingTime;

  task.started := true;
  task.complete := false;
  atomic readwrite TaskPoolData do
    TaskPoolData.TaskPoolLen := TaskPoolData.TaskPoolLen+1;
    TaskPoolData.TaskPool[TaskPoolData.TaskPoolLen] := task;
  od;
 # Tracing.TraceTaskCreated();
  if (Tasks.FirstTask) then
    Tasks.FirstTask := false;
    SendChannel (Tasks.TaskManagerRequests, rec (type := TASK_MANAGER_REQUESTS.START_WORKERS,
                                                 noWorkers := Tasks.Initial,
                                                 worker := 0)); # worker id is irrelevant in this case
  fi;
  worker := TryReceiveChannel (Tasks.WorkerPool, fail);
  if IsNotIdenticalObj (worker, fail) then
    SendChannel (worker.toworker, TASK_MANAGER_REQUESTS.RESUME_IDLE_WORKER);
  fi;
  return task;
end;

RunTask:= function(arg)
  local task;
  task := CreateTask(arg);
  ExecuteTask(task);
  return task;
end;

RunAsyncTask := function(arg)
  local task;
  task := Tasks.CreateTask(arg);
  atomic task do
    task.async := true;
  od;
  ExecuteTask(task);
  return task;
end;

MakeTaskAsync := function(task)
  if task.started then
    Error("Cannot make a running task asynchronous");
  fi;
  task.async := true;
end;

ImmediateTask := function(arg)
  local result, task;
  result := CALL_WITH_CATCH(arg[1], arg{[2..Length(arg)]});
  if Length(result) = 1 or not result[1] then
    if Length(result) > 1 and Tasks.ReportErrors then
      Print("Task Error: ", result[2], "\n");
    fi;
    result := fail;
  else
    result := result[2];
  fi;
  task := ShareSpecialObj (rec( started := true, complete := true, async := false,
                  result := result, adopt_result := false ));
  return task;
end;

DelayTask := function(arg)
  local task;
  return Tasks.CreateTask(arg);
end;

WaitTask := function(arg)
  local task, taskresult, i, p;

  atomic readonly arg[1] do
    if Length(arg) = 1 and IsList(arg[1]) then
      arg := arg[1];
    fi;
  od;

  for task in arg do
    atomic readonly task do
      if task.async then
        Error("Cannot wait for an asynchronous task");
      fi;
    od;
  od;
  for task in arg do
    atomic readonly task do
      if (not task.complete) and (not task.started) and (not task.offloaded) then
        ExecuteTask(task);
      fi;
    od;
  od;
  for task in arg do
    p := LOCK(task, false);
    if not task.complete then
      SendChannel (task.blockedWorkers, rec (type := BLOCK_TYPES.BLOCKED_WORKER, worker := threadId));
      UNLOCK(p);
      Tasks.BlockWorkerThread();
      atomic task do
        if task.offloaded then
          atomic readwrite task.result do
            Open(task.result);
          od;
          task.result := GetHandleObj(task.result);
        fi;
      od;
    else
      UNLOCK(p);
    fi;
  od;
end;

WaitTasks := WaitTask;

WaitAnyTask := function(arg)
  local i, len, task, taskresult, channels, ch;

  atomic arg[1] do
    if Length(arg) = 1 and IsList(arg[1]) then
      arg := arg[1];
    fi;
  od;

  len := Length(arg);

  for task in arg do
    LOCK (task, false);
    if task.async then
      UNLOCK(task);
      Error("Cannot wait for an async task");
    fi;
    if not task.started then
      UNLOCK(task);
      ExecuteTask(task);
    fi;
  od;

  while true do
    for i in [1..len] do
      atomic readonly arg[i].complete do
        if arg[i].complete then
          return i;
        fi;
      od;
    od;
  od;

end;

TaskResult := function(task)
  local taskresult, toExecute, toWait, toFetch;
  toExecute := false;
  toWait := false;
  toFetch := false;
  atomic readonly task do
    if task.async then
      Error("Cannot obtain the result of an asynchronous task");
    fi;
    if task.offloaded then
      toFetch := true;
    elif not task.started then
      toExecute := true;
    elif not task.complete then
      toWait := true;
    fi;
  od;
  if toExecute then
    ExecuteTask(task);
  elif toWait then
    WaitTask(task);
  elif toFetch then
    atomic readwrite task do
      atomic readwrite task.result do
        Open(task.result);
      od;
      task.result := GetHandleObj(task.result);
    od;
  fi;
  atomic readonly task do
    if task.adopt_result then
      taskresult :=  CLONE_REACHABLE(task.result);
    else
      taskresult := task.result;
    fi;
  od;
  return taskresult;
end;

TaskStarted := atomic function(readonly task)
  return task.started;
end;

TaskFinished := atomic function(readonly task)
  return task.complete;
end;

TaskIsAsync := function(task)
  return task.async;
end;

## task milestones
#ScheduleTask := function (arg)
#  local task, waitTask;
#  if IsFunction(arg[1]) then
#    task := RunTask(arg);
#  else
#    task := Tasks.CreateTask(arg{[2..Length(arg)]});
#    atomic readwrite task do
#      task.started := true;
#      task.condCount := 0;
#      for waitTask in arg[1] do
#        atomic readonly waitTask do
#          atomic readwrite waitTask.waitingOnMe do
#            if waitTask.complete = false then
#              Add(waitTask.waitingOnMe, task);
#              task.condCount := task.condCount+1;
#            fi;
#          od;
#        od;
#      od;
#      if task.condCount = 0 then
#        ExecuteTask(task);
#      fi;
#    od;
#  fi;
#  return task;
#end;

#NewMilestone := function(contributions)
#  local c;
#  c := Set(contributions);
#  return ShareSpecialObj(rec(
#             achieved := Set([]),
#             targets := Immutable(c),
#             complete := Length(c) = 0,
#             waitingOnMe := ShareSpecialObj([]),
#             ));
#end;

#ContributeToMilestone := function(milestone, contribution)
#  local trigger, notify, t;
#  atomic milestone do
#    if not contribution in milestone.targets then
#      Error("ContributeToMilestone: Milestone does not have such a contribution");
#    fi;
#    if milestone.complete then
#      return;
#    fi;
#    AddSet(milestone.achieved, Immutable(contribution));
#    if Length(milestone.achieved) < Length(milestone.targets) then
#      return;
#    fi;
#    milestone.complete := true;
#    atomic readonly milestone.waitingOnMe do
#      SendChannel (Tasks.TaskManagerRequests, rec (
#              type := TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK,
#              worker := threadId,
#              tasks := milestone.waitingOnMe ));
#    od;
#  od;
#end;

# Function executed by a task manager
Tasks.TaskManagerFunc := function()
  local i, worker,  request, requestType,
        taskToFinish, toUnblock,
        totalTasks, blockedWorkersChannel, taskMap,
        toResume, taskman, target, finishing, finishingState, t;

  finishing := false;
  taskman := rec (startedWorkers := 0,
                  activeWorkers:= 0,
                  blockedWorkers:=0,
                  suspendedWorkers:=0,
                  suspendedWorkersList:=[],
                  allWorkers:=[],
                  stealing:=false,
                  stealRequests:=0);
#                  nextStealingTime:=MicroSeconds());

  while true do
    if finishing and finishingState.myWorkersToFinish=0 then
      PrintTaskManStats();
      return;
    fi;
    if Tasks.doStealing and taskman.stealRequests>0 and (not taskman.stealing) and (not finishing) then
      SendSteal();
      taskman.stealing := true;
    fi;
    request := ReceiveChannel (Tasks.TaskManagerRequests);

    worker := request.worker;
    requestType := request.type;
    if requestType = TASK_MANAGER_REQUESTS.START_WORKERS then
      if taskman.startedWorkers = 0 then
        for i in [1..request.noWorkers] do
          worker := Tasks.StartNewWorkerThread();
          taskman.activeWorkers := taskman.activeWorkers+1;
          taskman.startedWorkers := taskman.startedWorkers+1;
        od;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.BLOCK_ME then # request to block a worker
      taskman.activeWorkers := taskman.activeWorkers-1;
      taskman.blockedWorkers := taskman.blockedWorkers+1;
      if taskman.activeWorkers<Tasks.Initial then
        if taskman.suspendedWorkers>0 then
          toResume := Remove (taskman.suspendedWorkersList);
          taskman.suspendedWorkers := taskman.suspendedWorkers-1;
          SendChannel (GetWorkerInputChannel(toResume), TASK_MANAGER_REQUESTS.RESUME_SUSPENDED_WORKER);
        else
          worker := Tasks.StartNewWorkerThread();
          Add (taskman.allWorkers, ThreadID(worker));
        fi;
        taskman.activeWorkers := taskman.activeWorkers+1;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER then # request to unblock a worker
      if worker<>0 then
      taskman.blockedWorkers := taskman.blockedWorkers-1;
      if taskman.activeWorkers>Tasks.Initial then
          SendChannel (Tasks.WorkerSuspensionRequests, true);
        fi;
      fi;
      SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER);
 #   elif requestType = TASK_MANAGER_REQUESTS.TRY_UNBLOCK_TASK then
 #     atomic readonly request.tasks do
 #       for t in request.tasks do
 #         atomic readwrite t do
 #           t.condCount := t.condCount-1;
 #           if t.condCount = 0 then
 #             ExecuteTask(t);
 #           fi;
 #         od;
 #       od;
 #     od;
    elif requestType = TASK_MANAGER_REQUESTS.SUSPEND_ME then
      taskman.activeWorkers := taskman.activeWorkers-1;
      if taskman.suspendedWorkers>Tasks.Initial then
        SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.FINISH);
      else
        taskman.suspendedWorkers := taskman.suspendedWorkers+1;
        Add (taskman.suspendedWorkersList, worker);
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.CULL_IDLE_WORKERS then
      while not IsEmpty(taskman.suspendedWorkersList) do
        worker := Remove(taskman.suspendedWorkersList);
        taskman.suspendedWorkers := taskman.suspendedWorkers-1;
        SendChannel (GetWorkerInputChannel(worker), TASK_MANAGER_REQUESTS.FINISH);
      od;
    elif requestType = TASK_MANAGER_REQUESTS.NO_WORK then
      taskman.stealRequests := taskman.stealRequests+1;
    elif requestType = TASK_MANAGER_REQUESTS.UNSUCC_STEAL then
      taskman.stealing := false;
    elif requestType = TASK_MANAGER_REQUESTS.GOT_TASK then
      #if taskman.stealRequests>0 then taskman.stealRequests := taskman.stealRequests-1; fi;
      taskman.stealRequests := 0;
      taskman.stealing := false;
    elif requestType = TASK_MANAGER_REQUESTS.FINISH_WORKER then
      WaitThread(worker);
      if finishing then
        finishingState.myWorkersToFinish := finishingState.myWorkersToFinish-1;
      fi;
    elif requestType = TASK_MANAGER_REQUESTS.FINISH then
      finishingState := rec ( myWorkersToFinish := taskman.activeWorkers + taskman.suspendedWorkers );
      if IsBound(MPI_Initialized) and IsBound(MPI_Comm_rank) then
        if processId=0 then
          SendMessage (processId, MESSAGE_TYPES.FINISH);
        fi;
      fi;
      for i in taskman.allWorkers do
        SendChannel (GetWorkerInputChannel(i), TASK_MANAGER_REQUESTS.FINISH);
      od;
      finishing := true;
    else
      Error("Task manager on ", processId, " received unknown request!\n");
    fi;
  od;
end;

Tasks.Initialize();

[ Dauer der Verarbeitung: 0.34 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge