Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/hpcgap/lib/distributed/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 18.9.2025 mit Größe 4 kB image not shown  

Quelle  messageman.g   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

MessageManagerData := rec ( finishing := false,
                            outgoingMessages := 0,
                            processesToFinish := 0);

ProcessFinish := function()
  local i;
  if processId=0 then
    for i in [1..commSize-1] do
      SendMessage (i, MESSAGE_TYPES.FINISH);
    od;
  else
    SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.FINISH, worker := 0 ) );
  fi;
  atomic DistTaskData do
    DistTaskData.finishing := true;
  od;
end;

ProcessStopManagers := function()
  local i;
  if processId = 0 then
    for i in [1..commSize-1] do
      SendMessage (i, MESSAGE_TYPES.STOP_MANAGERS);
    od;
    SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.FINISH, worker := 0 ) );
  else
    SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.FINISH, worker := 0 ) );
  fi;
end;

ProcessMessage := function (message)
  local task, taskdata, outMessage, i, tmp, source, handle, data, l1, toUnblock, msg, res;

  source := message.source;
  if message.type = MESSAGE_TYPES.EVAL_MSG then
    ReadEvalFromString(message.content);
    return false;
  elif message.type = MESSAGE_TYPES.SCHEDULE_MSG then
    ProcessScheduleMsg (message);
    return false;
  elif message.type = MESSAGE_TYPES.STEAL_MSG then
    ProcessSteal (message, source);
    return false;
  elif message.type = MESSAGE_TYPES.STOP_STEALING_MSG then
    Tasks.doStealing := false;
    Tasks.stealingStopped := true;
    PrintTaskManStats();
    return false;
  elif message.type = MESSAGE_TYPES.FINISH then
    ProcessFinish();
    return true;
  elif message.type = MESSAGE_TYPES.STOP_MANAGERS then
    ProcessStopManagers();
    return false;
  #elif message.type = MESSAGE_TYPES.REMOTE_PUSH_OBJ_MSG then
    #ProcessRemotePushObj(message);
    #return false;
  #elif message.type = MESSAGE_TYPES.PROCESS_FINISHED then
    #atomic readwrite DistTaskData do
      #DistTaskData.processesToFinish := DistTaskData.processesToFinish-1;
    #od;
  #elif message.type = MESSAGE_TYPES.REMOTE_COPY_OBJ_MSG then
    #ProcessRemoteCopyObj(message);
    #return false;
  #elif message.type = MESSAGE_TYPES.ACK_MSG then
    #ProcessAckMsg(message);
    #return false;
  elif message.type = MESSAGE_TYPES.GET_OBJ_MSG then
    ProcessGetObjMsg(message);
    return false;
  elif message.type = MESSAGE_TYPES.OBJ_MSG then
    ProcessObjMsg(message);
    return false;
  #elif message.type = MESSAGE_TYPES.FETCH_OBJ then
    #ProcessFetchObj(message);
    #return false;
  #elif message.type = MESSAGE_TYPES.REPLY_OBJ then
    #ProcessReplyObj(message);
    #return false;
  elif message.type = MESSAGE_TYPES.GLOBAL_OBJ_HANDLE_MSG then
    ProcessGlobalObjHandleMsg (message);
    return false;
  elif message.type = MESSAGE_TYPES.SET_BY_HANDLE_MSG then
    ProcessSetByHandleMsg (message);
    return false;
  elif message.type = MESSAGE_TYPES.CHANGE_GLOBAL_COUNT_MSG then
    ProcessChangeGlobalCountMsg (message);
    return false;
  else
    Error ("Unknown message type ", message.type, "\n");
    return true;
  fi;
end;

MessageManagerFunc := function()
  local finished, msg;
  finished := false;
  while not finished do
    #MPI_Probe();
    msg := GetMessage();
    #Print (MSTime(), " :: ", processId, " got a new message of type ", msg.type, " from ", msg.source, "\n");
    finished := ProcessMessage(msg);
    #if MPI_Probe() then
    #  msg := GetMessage();
    #  finished := ProcessMessage(msg);
    #fi;
  od;

end;

StopManagers := function()
  if processId <> 0 then
    Error("StopManagers can only be called from process 0!\n");
  else
    SendMessage (processId, MESSAGE_TYPES.STOP_MANAGERS);
  fi;

end;

FinishProcesses := function ()
  SendMessage (processId, MESSAGE_TYPES.FINISH);
end;

#ParFinish := function ()
#  SendChannel (Tasks.TaskManagerRequests, rec ( type := TASK_MANAGER_REQUESTS.FINISH, worker := 0 ));
#  WaitThread(TaskManager);
#  WaitThread(MessageManager);
#  MPI_Finalize();
#end;

[ Dauer der Verarbeitung: 0.29 Sekunden  (vorverarbeitet)  ]