Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  globalobject.gi   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

# placeholder for helper functions that we don't want to expose
GlobalObjHandles := AtomicRecord ( rec () );

# the map of handles to the objects that the pe owns. used to prevent garbage collection
# of handles and its associated objects
HandlesMap := ShareSpecialObj ([ [], [] ]);

# the map of global addresses to local addresses. used for handles that reside on
# other nodes
GAMap := ShareSpecialObj ( [ [], [] ] );

# the map of tasks to the handles that will hold their results
TaskResultHandles := ShareSpecialObj ( [ [], [] ]);

# the list of handles that a thread has opened.
# used for testing whether the thread has access to a handle it is trying to use
MyHandles := [ [], [] ];
MakeThreadLocal("MyHandles");

# access types for handles. basically, just a list of constants
BindGlobal ("ACCESS_TYPES", MakeReadOnlyObj ( rec (
        READ_ONLY := 1,
        READ_WRITE := 2,
        VOLATILE := 3 )));

# types of non-blocking requests with handles
BindGlobal ("REQUEST_TYPES", MakeReadOnlyObj( rec (
        GET_OBJ := 1,
        CLONE_OBJ := 2,
        PULL_OBJ := 3,
        BLOCKED_FETCH := 4,
        PUSH_OBJ := 5,
        COPY_OBJ := 6,
        ACK := 7)));

DeclareRepresentation( "IsGlobalObjectHandleRep",
                       IsNonAtomicComponentObjectRep,
        ["pe", "localId", "owner", "accessType", "immediate", "obj", "control"] );

GlobalObjectHandleDefaultType :=
  NewType( GlobalObjectHandlesFamily,
           IsGlobalObjectHandleRep and IsGlobalObjectHandle);

InstallMethod (ViewObj, "for global object handles",
        [IsGlobalObjectHandleRep and IsGlobalObjectHandle],
        function (obj)
  local accessType;
  if obj!.control.accessType = ACCESS_TYPES.READ_ONLY then
    accessType := "ro";
  elif obj!.control.accessType = ACCESS_TYPES.READ_WRITE then
    accessType := "rw";
  else
    accessType := "vol";
  fi;
  Print ("< PE=", obj!.pe,", ID=",obj!.localId,", OW=", obj!.owner, ", AC=",accessType,">\n");
  Print ("<control data : REQ=", obj!.control.requested, ", HO=", obj!.control.haveObject, ", BL=",
         obj!.control.blockedOnHandle, ", LC=",obj!.control.localCount,
         ", GC=",obj!.control.globalCount,">\n");
end);


DoPushObj := fail;
ReadLib ("distributed/globalobject_messages.g");
ReadLib ("distributed/globalobject_io.g");

#################################################################################
# functions that deal with requests (created via calls to <...>NonBlocking
# functions)
InstallGlobalFunction (RequestCompleted, atomic function (readonly request)
  return request.completed;
end);

InstallGlobalFunction (WaitRequest, function (request)
  local p;
  p := LOCK (request);
  if request.completed then
    return;
  else
    if not IsBound(request.blockedOnRequest) then
      request.blockedOnRequest := MigrateObj ([], request);
    fi;
    Add (request.blockedOnRequest, threadId);
    UNLOCK(p);
    Tasks.BlockWorkerThread();
  fi;
end);

InstallGlobalFunction (WaitRequests, function (arg)
  local req;
  for req in arg do
    WaitRequest(req);
  od;
end);

InstallGlobalFunction (GetRequestObj, function (request)
  local p;
  p := LOCK(request);
  if request.completed then
    return request.obj;
  else
    UNLOCK(p);
    WaitRequest (request);
    atomic readonly request do
      return request.obj;
    od;
  fi;
end);
#############################################################################

# Some trivial functions to get some handle info
InstallGlobalFunction (GetHandlePE, atomic function( readonly handle )
    return handle!.pe;
end);

InstallGlobalFunction (GetHandleAccessType, atomic function ( readonly handle )
  return handle!.accessType;
end);

GlobalObjHandles.HaveAccessCheck := function(handle)
  local res;
  res := MyLookupHashTable(MyHandles, HANDLE_OBJ(handle));
  if IsIdenticalObj(res, fail) then
    Error ("Trying to access handle that is not opened from thread ", threadId, " on PE ", processId, "\n");
  fi;
end;

# helper function for creating handles
GlobalObjHandles.CreateHandle := function (arg)
  local pe, owner, immediate, accessType, obj, handle, control;
  pe := arg[1];
  owner := arg[2];
  immediate := arg[3];
  accessType := arg[4];
  control := rec ( accessType := accessType,
                   immediate := immediate,
                   requested := false,
                   haveObject := false,
                   globalCount := 0,
                   localCount := 0,
                   blockedOnHandle := [],
                   complete := true);
  handle := Objectify( GlobalObjectHandleDefaultType,
                    rec ( pe := pe, owner := owner, control := control ));
  if IsBound(arg[5]) then
    obj := arg[5];
    control.haveObject := true;
    if immediate then
      handle!.obj := [obj];
    else
      handle!.obj := obj;
    fi;
    if accessType = ACCESS_TYPES.READ_ONLY then
      if not IsReadOnlyObj(obj) then
        atomic readwrite obj do
          MakeReadOnlyObj(obj);
        od;
      fi;
    fi;
  fi;
  return handle;
end;

GlobalObjHandles.CreateHandleFromMsg := function (pe, owner, localId, immediate, accessType)
  local handle;
  handle := GlobalObjHandles.CreateHandle (pe, owner, immediate, accessType);
  handle!.localId := localId;
  return handle;
end;

InstallGlobalFunction (CreateHandleFromObj, function (arg)
  local objToStore, handle, obj, accessType, immediate;

  obj := arg[1];
  if Length(arg)>1 then
    accessType := arg[2];
  else
    accessType := ACCESS_TYPES.READ_ONLY;
  fi;
  if HANDLE_OBJ(obj) mod 4 = 0 then
    immediate := false;
  else
    immediate := true;
  fi;

  if immediate or (not IsThreadLocal(obj)) then
    objToStore := obj;
  else
    objToStore := ShareSpecialObj(obj);
  fi;

  handle := GlobalObjHandles.CreateHandle (processId,
                    processId,
                    immediate,
                    accessType,
                    objToStore);
  handle!.localId := HANDLE_OBJ(handle);
  atomic readwrite GAMap do
    if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.GA_MAP, handle, String(HANDLE_OBJ(handle))); fi;
    MyInsertHashTable(GAMap, MakeReadOnlyObj(rec ( pe := processId, localId := handle!.localId )), handle);
  od;
  if MPI_DEBUG.HANDLE_CREATION then MPILog(MPI_DEBUG_OUTPUT.HANDLE_CREATION, handle); fi;
  ShareSpecialObj(handle);
  return handle;

end);

InstallGlobalFunction (CreateTaskResultHandle, function (task)
  local handle;
  handle := GlobalObjHandles.CreateHandle(processId, processId, false, ACCESS_TYPES.READ_WRITE);
  ShareSpecialObj (handle);
  atomic readwrite handle do
    handle!.localId := HANDLE_OBJ(handle);
  od;
  atomic readwrite task do
    task.result := handle;
  od;
  atomic readwrite HandlesMap do
    MyInsertHashTable (TaskResultHandles, HANDLE_OBJ(handle), handle);
  od;

  return handle;
end);

GlobalObjHandles.GetLocalCount := atomic function (readonly handle)
  return handle!.control.localCount;
end;

GlobalObjHandles.GetGlobalCount := atomic function (readonly handle)
  return handle!.control.globalCount;
end;


GlobalObjHandles.ChangeCount := atomic function (readwrite handle, global, inc)
  local pe;

  if not global then
    handle!.control.localCount := handle!.control.localCount+inc;
    if handle!.control.localCount<0 then
      Error ("Invalid change of local count for a handle!\n");
    else
      return handle!.control.localCount;
    fi;
    if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.CHANGE_COUNT, handle); fi;
  else
    pe := handle!.pe;
    if pe = processId then
      handle!.control.globalCount := handle!.control.globalCount+inc;
      if handle!.control.globalCount<0 then
        Error ("Invalid change of global count for a handle");
      else
        return handle!.control.globalCount;
      fi;
      if MPI_DEBUG.GA_MAP then MPILog(MPI_DEBUG_OUTPUT.CHANGE_COUNT, handle); fi;
    else
      SendChangeGlobalCountMsg (pe, handle, inc);
      return -1;
    fi;
  fi;
end;

GlobalObjHandles.IncreaseGlobalCount := function (handle)
  return GlobalObjHandles.ChangeCount(handle, true, 1);
end;

GlobalObjHandles.DecreaseGlobalCount := function (handle)
  return GlobalObjHandles.ChangeCount(handle, true, -1);
end;

GlobalObjHandles.IncreaseLocalCount := function (handle)
  return GlobalObjHandles.ChangeCount(handle, false, 1);
end;

GlobalObjHandles.DecreaseLocalCount := function (handle)
  return GlobalObjHandles.ChangeCount(handle, false, -1);
end;

InstallGlobalFunction (Destroy, atomic function (readwrite handle)
  local localCount, globalCount;

  localCount := GlobalObjHandles.GetLocalCount(handle);
  globalCount := GlobalObjHandles.GetGlobalCount(handle);

  if localCount<>0 then
    Error("Cannot destroy handle in use on local node\n");
  elif handle!.pe<>processId then
    SendChangeGlobalCountMsg(handle!.pe, handle, -1);
  elif globalCount<>0 then
    Error("Cannot destroy handle referenced from other nodes\n");
  fi;

  if MPI_DEBUG.HANDLE_CREATION then MPILog(MPI_DEBUG_OUTPUT.HANDLE_DELETION, handle); fi;
  atomic HandlesMap do
    MyDeleteHashTable (HandlesMap, handle);
  od;
end);

InstallGlobalFunction (Open, function (globalObjHandle)
  MyInsertHashTable (MyHandles, HANDLE_OBJ(globalObjHandle), 1);
  return GlobalObjHandles.IncreaseLocalCount (globalObjHandle);
end);

InstallGlobalFunction (Close, function (globalObjHandle)
  GlobalObjHandles.HaveAccessCheck(globalObjHandle);
  return GlobalObjHandles.DecreaseLocalCount (globalObjHandle);
end);

# todo : fix GetHandleObjNonBlocking so that it doesn't store the object
#        into the handle, but rather only into the request. GetHandleObj also need
#        to be fixed in this way
InstallGlobalFunction (GetHandleObjNonBlocking, atomic function (readwrite handle)
  local objCopy, request;

  GlobalObjHandles.HaveAccessCheck(handle);
  request := rec ( completed := false, type := REQUEST_TYPES.GET_OBJ, pe := processId, pullObj := false, storeObj := false);
  if not handle!.control.haveObject then
    if handle!.owner = processId then
      if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " obj under eval |"); fi;
      Add (handle!.control.blockedOnHandle, MigrateObj(request, handle));
      return request;
    fi;
    if not handle!.control.requested then
      if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " @@ ", String(handle!.owner)); fi;
      SendGetObjMsg (handle, false, false);
      handle!.control.requested := true;
    fi;
    if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " @@ ", String(handle!.owner), " | "); fi;
    Add (handle!.control.blockedOnHandle, MigrateObj(request, handle));
    return request;
  fi;
  request.completed := true;
  if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " get obj local **"); fi;
  atomic readonly handle!.obj do
    if handle!.control.immediate then
      request.obj := handle!.obj[1];
    else
      request.obj := handle!.obj;
    fi;
  od;
  AdoptObj(request);
  return request;
end);

InstallGlobalFunction (GetHandleObj, function (handle)
  local request;
  request := GetHandleObjNonBlocking(handle);
  WaitRequest(request);
  atomic readwrite request do
    AdoptObj(request);
  od;
  #atomic readwrite request.obj do
    #AdoptObj(request.obj);
  #od;
  return request.obj;
end);

InstallGlobalFunction (SendHandle, atomic function (readwrite handle, pe)
  GlobalObjHandles.HaveAccessCheck(handle);
  GlobalObjHandles.IncreaseGlobalCount(handle);
  SendGlobalObjHandleMsg (handle, pe);
end);

InstallGlobalFunction (SendAndAssignHandle, atomic function (readwrite handle, pe, name)
  GlobalObjHandles.HaveAccessCheck(handle);
  GlobalObjHandles.IncreaseGlobalCount(handle);
  SendGlobalObjHandleMsg (handle, pe, name);
end);

InstallGlobalFunction (SetHandleObj, function (handle, obj)
  local localCount, p;
  GlobalObjHandles.HaveAccessCheck(handle);
  p := LOCK(handle, true);
  if handle!.control.accessType = ACCESS_TYPES.READ_ONLY then
    Error("Cannot change object of a READ_ONLY handle\n");
  fi;
  if handle!.control.haveObject then
    if handle!.control.immediate then
      handle!.obj[1] := obj;
    else
      handle!.obj := obj;
    fi;
  else
    SendSetByHandleMsg (handle, obj);
  fi;
end);

InstallGlobalFunction (SetHandleObjList, function (handle, ind, obj)
  local localCount, p, q, list;
  GlobalObjHandles.HaveAccessCheck(handle);
  q := LOCK(handle, true);
  if handle!.control.accessType = ACCESS_TYPES.READ_ONLY then
    Error("Cannot change object of a READ_ONLY handle\n");
  fi;
  if handle!.control.haveObject then
    list := handle!.obj;
    if not IsThreadLocal(list) then
      p := LOCK(list);
    fi;
    if not IsList(list) then
      Error ("Calling SetHandleObjList for a handle whose object is not list\n");
    fi;
    list[ind] := obj;
  else
    SendSetByHandleListMsg (handle, ind, obj);
  fi;
end);

InstallGlobalFunction (RemoteCopyObj, atomic function (readwrite handle, pe)
  GlobalObjHandles.HaveAccessCheck(handle);
  GlobalObjHandles.IncreaseGlobalCount(handle);
  if handle!.control.accessType = ACCESS_TYPES.READ_WRITE then
    Error ("Cannot call RemoteCopyObj on READ_WRITE handle!\n");
  fi;
  atomic readonly handle!.obj do
    DoSendObj (pe, true, false, handle);
  od;
end);

# this is a bid dodgy...what if object is under evaluation or something?
DoPushObj := function (handle, request)
  if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ->-> ", String(request.pe)); fi;
  if not handle!.control.haveObject then
    if MPI_DEBUG.OBJECT_TRANSFER then MPILog(MPI_DEBUG_OUTPUT.OBJECT_TRANSFER, handle, " ->-> (", String(request.pe),
            ") ==> ", String(handle!.owner)); fi;
    SendMessage (handle!.owner, MESSAGE_TYPES.GET_OBJ_MSG,
            request.pe, handle!.pe, handle!.localId, true, true);
    request.completed := true;
  else
    atomic readonly handle!.obj do
      SendMessage (request.pe, MESSAGE_TYPES.OBJ_MSG,
              handle!.pe,
              handle!.localId,
              handle!.obj,             # object
              true,                   # store object?
              true,                   # object is being pushed?
              handle!.owner,
              handle!.control.immediate,       # handle immediate
              handle!.control.accessType,      # handle access type
              handle!.control.globalCount);    # handle global count
    od;
    request.completed := true;
  fi;

  handle!.control.haveObject := false;
  handle!.owner := request.pe;
  Unbind(handle!.obj);
  return request;
end;

InstallGlobalFunction (RemotePushObjNonBlocking, atomic function (readwrite handle, pe)
  local p, request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := rec (completed := false, type := REQUEST_TYPES.PUSH_OBJ, pe := pe, pullObj := true, storeObj := true);
  DoPushObj(handle, request);
  return request;
end);

InstallGlobalFunction (RemotePushObj, function (handle, pe)
  local request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := RemotePushObjNonBlocking(handle, pe);
  WaitRequest(request);
  if not IsThreadLocal(request) then
    atomic readwrite request do
      AdoptObj(request);
    od;
  fi;
end);

InstallGlobalFunction (RemoteCloneObjNonBlocking, atomic function (readwrite handle)
  local objCopy, request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := rec ( completed := false, type := REQUEST_TYPES.CLONE_OBJ, pe := processId, pullObj := false, storeObj := true );
  if not handle!.control.haveObject then
    if not handle!.control.requested then
      SendGetObjMsg (handle, true, false);
    fi;
    Add (handle!.control.blockedOnHandle, MigrateObj(request, handle));
    return request;
  fi;
  request!.control.completed := true;
  if not IsThreadLocal(request) then
    atomic readwrite request do
      AdoptObj(request);
    od;
  fi;
  return request;
end);

InstallGlobalFunction (RemoteCloneObj, function(handle)
  local request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := RemoteCloneObjNonBlocking(handle);
  WaitRequest(request);
  if not IsThreadLocal(request) then
    atomic readwrite request do
      AdoptObj(request);
    od;
  fi;
end);

InstallGlobalFunction (RemotePullObjNonBlocking, atomic function (readwrite handle)
  local objCopy, request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := rec ( completed := false, type := REQUEST_TYPES.PULL_OBJ, pe := processId, storeObj := true, pullObj := true );
  if not handle!.control.haveObject then
    if not handle!.control.requested then
      SendGetObjMsg (handle, true, true);
    fi;
    Add (handle!.control.blockedOnHandle, MigrateObj(request, handle));
    return request;
  fi;
  request.completed := true;
  if not IsThreadLocal(request) then
    atomic readwrite request do
      AdoptObj(request);
    od;
  fi;
  return request;
end);

InstallGlobalFunction (RemotePullObj, function(handle)
  local request;
  GlobalObjHandles.HaveAccessCheck(handle);
  request := RemotePullObjNonBlocking(handle);
  WaitRequest(request);
  if not IsThreadLocal(request) then
    atomic readwrite request do
      AdoptObj(request);
    od;
  fi;
end);

[ Dauer der Verarbeitung: 0.35 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge