Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/hpcgap/lib/distributed/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 18.9.2025 mit Größe 16 kB image not shown  

Quelle  globalobject_messages.g   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

CreateBlockedFetch := atomic function (readonly handle, sourceId, storeObj, pullObj)
  local blockedFetch;
  blockedFetch := rec ( type := REQUEST_TYPES.BLOCKED_FETCH,
                        pe := sourceId, storeObj := storeObj, pullObj := pullObj );
  if not IsBound (handle!.control.blockedOnHandle) then
    handle!.control.blockedOnHandle := MigrateObj ([], handle);
  fi;
  Add (handle!.control.blockedOnHandle, MigrateObj (blockedFetch, handle));
end;

# GLOBAL_OBJ_HANDLE_MSG is a message containing a handle, sent when a node
# executes SendHandle function.
# The format of the message is
# *************************
# * GLOBAL_OBJ_HANDLE_MSG *
# * --------------------- *
# * handle                * -- handle being sent
# * --------------------- *
# * name                  * -- name of the global variable to which the handle is
# * --------------------- *    assigned
# *************************

SendGlobalObjHandleMsg := atomic function (readonly handle, target, name)
  # todo : should be SendMessage (target, MESSAGE_TYPES.GLOBAL_OBJ_HANDLE_MSG, handle, name);
  #        (once we implement pickling/unpickling of handles)
  local hanRep;
  hanRep := [ handle!.pe, handle!.owner, handle!.localId, handle!.control.immediate, handle!.control.accessType ];
  SendMessage (target, MESSAGE_TYPES.GLOBAL_OBJ_HANDLE_MSG, hanRep, name);
end;

ProcessGlobalObjHandleMsg := function (message)
  local res, handle, hanRep, name, pe, localId,
        immediate, accessType, owner;
  hanRep := message.content[1];
  if IsBound(message.content[2]) then
    name := message.content[2];
  else
    name := fail;
  fi;
  pe := hanRep[1];
  owner := hanRep[2];
  localId := hanRep[3];
  immediate := hanRep[4];
  accessType := hanRep[5];
  atomic readwrite GAMap do
    if pe<>processId then
      handle := MyLookupHashTable (GAMap, rec (pe := pe, localId := localId));
    else
      handle := OBJ_HANDLE(localId);
    fi;
    if IsIdenticalObj(handle, fail) then
      handle := GlobalObjHandles.CreateHandleFromMsg (pe,
                        owner,
                        localId,
                        immediate,
                        accessType);
      if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.GA_MAP, handle, String(HANDLE_OBJ(handle))); fi;
      ShareSpecialObj(handle);
      MyInsertHashTable(GAMap,
              MakeReadOnlyObj (rec ( pe := pe, localId := localId )),
                                        handle);
    fi;
    if not IsIdenticalObj(name, fail) then
      if not IsBound(name) then
        BindGlobal(name, handle);
        MakeReadOnlyGVar(name);
      else
        MakeReadWriteGVar(name);
        BindGlobal(name, handle);
        MakeReadOnlyGVar(name);
      fi;
    fi;
  od;
  return handle;
end;

# SET_BY_HANDLE_MSG is a message that is sent when a node does
# SetByHandle or SetByHandleList.
# The format of the message is
# *********************
# * SET_BY_HANDLE_MSG *
# * ----------------- *
# * globalAddrPE      * -- global address pe
# * ----------------- *
# * globalAddrLocalId * -- global address local id
# * ------------------*
# * value             * -- value to set the object of a handle to
# * ----------------- *
# * index             * -- index of the list to which handle points to
# *********************    (in the case of SeyByHandleList)

SendSetByHandleMsg := atomic function (readonly handle, value)
  SendMessage (handle!.pe, MESSAGE_TYPES.SET_BY_HANDLE_MSG, handle!.pe, handle!.localId, value);
end;

SendSetByHandleListMsg := atomic function (readonly handle, ind, value)
  SendMessage (handle!.pe, MESSAGE_TYPES.SET_BY_HANDLE_MSG, handle!.pe, handle!.localId, value, ind);
end;

ProcessSetByHandleMsg := function (message)
  local res, obj, handle, forwardPE, ind, p, pe, localId;
  pe := message.content[1];
  localId := message.content[2];
  obj := message.content[3];
  if IsBound(message.content[4]) then
    ind := message.content[4];
  fi;
  if processId = pe then
    handle := OBJ_HANDLE(localId);
  else
    handle := MyLookupHashTable (GAMap, rec ( pe := pe, localId := localId ));
  fi;
  atomic readwrite handle do
    if handle!.owner = processId then
      if handle!.control.immediate then
        handle!.obj[1] := obj;
      elif IsBound(ind) then
        if not IsThreadLocal(handle!.obj) then
          p := LOCK(handle!.obj);
        fi;
        if not IsList(handle!.obj) then
          Error ("SetByHandleList called for non-list object\n");
        fi;
        handle!.obj[ind] := obj;
      else
        handle!.obj := ShareSpecialObj(obj);
      fi;
    else
      forwardPE := handle!.owner;
      if IsBound(ind) then
        SendMessage (forwardPE, MESSAGE_TYPES.SET_BY_HANDLE_MSG, forwardPE, localId, obj, ind);
      else
        SendMessage (forwardPE, MESSAGE_TYPES.SET_BY_HANDLE_MSG, forwardPE, localId, obj);
      fi;
    fi;
  od;
end;

# CHANGE_GLOBAL_COUNT_MSG is a message that is sent when global count
# for a handle needs to be changed (e.g. when handles are destroyed)
# Format of the message is
# ***************************
# * CHANGE_GLOBAL_COUNT_MSG *
# * ----------------------- *
# * globalAddrPE            * -- global address pe
# * ----------------------- *
# * globalAddrLocalId       * -- global address local id
# * ----------------------- *
# * destLocalAddr           * -- local address of a handle on the
# * ----------------------- *    destination node
# * increment               * -- integer increment of global count
# ***************************

SendChangeGlobalCountMsg := atomic function (target, readonly handle, increment)
  SendMessage (target, MESSAGE_TYPES.CHANGE_GLOBAL_COUNT_MSG, handle!.pe, handle!.localId, increment);
end;

ProcessChangeGlobalCountMsg := function (message)
  local pe, localId, handle, inc;
  pe := message.content[1];
  localId := message.content[2];
  if pe = processId then
    handle := OBJ_HANDLE(localId);
  else
    handle := MyLookupHashTable (GAMap, rec ( pe := pe, localId := localId ));
  fi;
  inc := message.content[3];
  GlobalObjHandles.ChangeCount (handle, true, inc);
end;

# GET_OBJ_MSG is the message that is sent when an object needs to be
# fetched/read from a remote node (GetHandleObj, RemotePullObj, RemoteCloneObj)
# Format of the message is
# *********************
# * GET_OBJ_MSG tag   *
# * ----------------  *
# * sourceId          * -- id of the source node that sent the message
# * ----------------  *
# * globalAddrPE      * -- global address PE
# * ----------------  *
# * globalAddrLocalId * -- global address local id
# * ----------------  *
# * storeObj          * -- true if the received object needs to be stored in handle
# * ----------------  *    (RemoteCloneObj, RemotePullObj)
# * pullObj           * -- true if the object needs to be pulled from the remote node
# *********************    (RemotePullObj)

SendGetObjMsg := atomic function (readonly handle, storeObj, pullObj)
  SendMessage (handle!.owner, MESSAGE_TYPES.GET_OBJ_MSG,
          processId, handle!.pe, handle!.localId, storeObj, pullObj);
end;

# auxiliary functions that deal with creation of blocked fetches and
# processing of the queue of blocked requests on a handle
UnblockWaitingThreads := function (request)
  local thread;
  for thread in request.blockedOnRequest do
    SendChannel (Tasks.TaskManagerRequests,
            rec ( worker := thread, type := TASK_MANAGER_REQUESTS.RESUME_BLOCKED_WORKER));
  od;
end;

DoSendObj := fail;

InstallGlobalFunction(ProcessHandleBlockedQueue, atomic function (readwrite handle, obj)
  local toRemove, thread, queue;
  queue := handle!.control.blockedOnHandle;
  handle!.control.blockedOnHandle := MigrateObj ([], handle);
  for toRemove in queue do
    if MPI_DEBUG.OBJECT_TRANSFER then
      if toRemove.pullObj then
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ->-> ", String(toRemove.pe), " - ");
      elif toRemove.storeObj then
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ++ ", String(toRemove.pe), " - ");
      else
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ", String(toRemove.pe)," @@ - ");
      fi;
    fi;
    if toRemove.pe = processId then
      toRemove.obj := obj;
      toRemove.completed := true;
      if IsBound(toRemove.blockedOnRequest) then UnblockWaitingThreads(toRemove); fi;
    else
      DoSendObj (toRemove.pe, toRemove.storeObj, toRemove.pullObj, handle);
    fi;
  od;
end);

DoSendObj := atomic function (sourceId, storeObj, pullObj, readwrite handle)
  if handle!.control.haveObject then
    atomic readonly handle!.obj do
      if processId = sourceId then
        if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " local request"); fi;
        ProcessHandleBlockedQueue (handle, handle!.obj);
      else
        if MPI_DEBUG.OBJECT_TRANSFER then
          if pullObj then
            MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ->-> (", String(sourceId), ") --> ", String(sourceId));
          elif storeObj then
            MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ++ (", String(sourceId), ") --> ", String(sourceId));
          else
            MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ", String(sourceId), " @@ --> ", String(sourceId));
          fi;
        fi;
        SendMessage (sourceId, MESSAGE_TYPES.OBJ_MSG,
                handle!.pe,
                handle!.localId,
                handle!.obj,                 # object
                storeObj,
                pullObj,
                handle!.owner,
                handle!.control.immediate,
                handle!.control.accessType,
                handle!.control.globalCount);
      fi;
    od;
    if pullObj then
      if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " new owner"); fi;
      handle!.owner := sourceId;
      Unbind(handle!.obj);
      handle!.control.haveObject := false;
    fi;
  else
    if handle!.owner = processId then        # object under evaluation
      if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " | (obj under eval)"); fi;
      CreateBlockedFetch (handle, sourceId, storeObj, pullObj);
    else                                  # object resides somewhere else
      if MPI_DEBUG.OBJECT_TRANSFER then
        if pullObj then
          MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ->-> (", String(sourceId), ") ==> ", String(handle!.owner));
        elif storeObj then
          MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ++ (", String(sourceId), ") ==> ", String(handle!.owner));
        else
          MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ", sourceId, " @@ ==> ", String(handle!.owner));
        fi;
      fi;
      SendMessage (handle!.owner, MESSAGE_TYPES.GET_OBJ_MSG, sourceId, handle!.pe, handle!.localId,
              storeObj, pullObj);
    fi;
  fi;
end;


ProcessGetObjMsg := function (message)
  local sourceId, pe, localId, handle, storeObj, pullObj, request;
  sourceId := message.content[1];
  pe := message.content[2];
  localId := message.content[3];
  storeObj := message.content[4];
  pullObj := message.content[5];
  handle := MyLookupHashTable (GAMap, rec ( pe := pe, localId := localId));
  if IsIdenticalObj(handle,fail) then
    Error ("Node ", processId, " does not have handle for (", pe, ",", localId, ")\n");
  fi;
  atomic readwrite handle do
    DoSendObj(sourceId, storeObj, pullObj, handle);
  od;
end;

# i don't think we need ack messages any more, yes?
# ACK_MSG is a message that is sent when an object is migrated from one
# node to the other (using RemotePushObj or RemotePullObj), and when the
# owner of that object needs to change.
# Format of the message is
# *******************
# * ACK_MSG         *
# * --------------- *
# * destLocalAddr   * -- local address of a handle on the
# * --------------- *    destination node (node that receives the message)
# * sourceId        * -- id of the node
# * --------------- *
# * sourceLocalAddr * -- local address of a handle on the source node
# *******************

#ProcessAckMsg := function (message)
#  local remoteId, remoteLocalAddr, myLocalAddr, handle;
#  myLocalAddr := message.content[1];
#  remoteId := message.content[2];
#  remoteLocalAddr := message.content[3];
#  handle := OBJ_HANDLE(myLocalAddr);
#  atomic readwrite handle do
#    handle!.pe := remoteId;
#    handle!.localAddr := remoteLocalAddr;
#    PrintKY10 3AQ (processId, " is updating the local addr of ", HANDLE_OBJ(handle), " to ", remoteLocalAddr, "\n");
#    handle!.control.complete := true;
#    if not IsEmpty(handle!.control.blockedOnHandle) then
#      ProcessHandleBlockedQueue(handle, fail);
#    fi;
#  od;
#  # q : do we need to insert the newly received (pe,addr) pair into GAMap?
#end;


# OBJ_MSG is a message containing an object, which is sent when the object is
# requested from a remote node (GetHandleObj, RemotePullObj, RemoteCloneObj) or
# when the object is copied from the source to the destination node
# (RemoteCopyObj, RemotePushObj).
# In the comments below, 'sender node' is the node that sent the OBJ_MSG message,
# and 'receiver node' is the node that is processing it.
# The message is of the form
# ************************
# * OBJ_MSG tag          *
# * -------------------- *
# * globalAddrPE         * -- global address PE
# * -------------------- *
# * globalAddressLocalId * -- global address local id
# * -------------------- *
# * obj                  * -- object being transferred
# * -------------------- *
# * storeObj             * -- true or false, depending on whether the object needs to
# * -------------------- *    be stored in the dest node (false for GetHandleObj)
# * objPushed            * -- true if object is pushed from the source node (using
# * -------------------- *    RemotePushObj or RemotePullObj), false otherwise
# * owner                * -- owner of the object (relevant when a handle for the
# * -------------------- *    object needs to be created on the receiver)
# * immediate            * -- true if handle is immediate (relevant when a handle
# * -------------------- *    for the object needs to be created on the receiver)
# * accessType           * -- access type of a handle (relevant when a handle for the
# * -------------------- *    object needs to be created on the receiver)
# * globalCount          * -- global count of a handle (relevant when a handle for the
# ************************    object needs to be created on the receiver)

ProcessObjMsg := function (message)
  local pe, localId, obj, handle, id, thread, blocked, pushed,
        immediate, accessType, objPushed,
        storeObject, globalCount, sourceLocalAddr, owner;

  pe := message.content[1];
  localId := message.content[2];
  obj := ShareSpecialObj(message.content[3]);
  storeObject := message.content[4];
  objPushed := message.content[5];
  handle := MyLookupHashTable( GAMap, rec ( pe := pe, localId := localId ));
  if IsIdenticalObj(handle, fail) then
    owner := message.content[6];
    immediate := message.content[7];
    accessType := message.content[8];
    handle := GlobalObjHandles.CreateHandleFromMsg (pe,
                        owner,
                        localId,
                        immediate,
                        accessType);
    atomic readwrite GAMap do
      MyInsertHashTable (GAMap, rec ( pe := pe, localId := localId ), handle );
      if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.GA_MAP, handle, String(HANDLE_OBJ(handle))); fi;
    od;
    ShareSpecialObj(handle);
  fi;
  atomic readwrite handle do
    immediate := handle!.control.immediate;
    if not immediate then
      ShareSpecialObj(obj);
      handle!.obj := obj;
    else
      handle!.obj := [];
      handle!.obj[1] := obj;
      #MigrateObj (handle!.obj, handle);
      atomic readonly obj do
        MigrateObj (obj, handle);
      od;
    fi;
    if storeObject then
      handle!.control.haveObject := true;
    fi;
  od;
  ProcessHandleBlockedQueue (handle, obj);
  # debug stuff
  atomic readonly handle do
    if MPI_DEBUG.OBJECT_TRANSFER then
      if objPushed then
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " obj (", String(message.source), ",->->,M)");
      elif storeObject then
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " obj (", String(message.source), ",++,M)");
      else
        MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " obj (", String(message.source), ",@@,X)");
      fi;
    fi;
  od;
  if objPushed then
    globalCount := message.content[9];
    atomic readwrite handle do
      handle!.control.globalCount := handle!.control.globalCount + globalCount + 1;
      if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.CHANGE_COUNT, handle); fi;
      handle!.owner := processId;
      if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " new owner"); fi;
      atomic readwrite HandlesMap do
        MyInsertHashTable (HandlesMap, HANDLE_OBJ(handle), handle);
      od;
    od;
  fi;
end;

[ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ]