Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/GAP/pkg/io/gap/   (Algebra von RWTH Aachen Version 4.15.1©)  Datei vom 21.5.2025 mit Größe 12 kB image not shown  

Quelle  iohub.gi   Sprache: unbekannt

 
#############################################################################
##
##  iohub.gd               GAP 4 package IO
##                                                           Max Neunhoeffer
##
##  Copyright (C) by Max Neunhoeffer
##  This file is free software, see license information at the end.
##
##  This file contains functions for a generic client server framework
##  for GAP
##
##  Main points:
##
##   - handle multiple connections using IO multiplexing
##   - single threaded
##   - use pickling for data transfer
##

InstallMethod( IOHub, "constructor without arguments", [ ],
  function( )
    local s;
    s := rec( sock := fail, inqueue := [], outqueue := [],
              tosend := [], torecv := [], inbuf := [], outbuf := [],
              connections := [], isactive := true ) ;
    Objectify(IOHubType, s);
    return s;
  end );

InstallMethod( AttachServingSocket, "for an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    s!.sock := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    if s!.sock = fail then return fail; fi;
    IO_setsockopt(s!.sock,IO.SOL_SOCKET,IO.SO_REUSEADDR,"\001\001\001\001");
    if IO_bind(s!.sock,IO_MakeIPAddressPort(address,port)) = fail then
        IO_close(s!.sock);
        s!.sock := fail;
        return fail;
    fi;
    if not(IO_listen(s!.sock,5)) then
        IO_close(s!.sock);
        s!.sock := fail;
        return fail;
    fi;
    return s!.sock;
  end );

InstallMethod( CloseConnection, "for an IO hub and a positive integer",
  [ IsIOHub, IsPosInt ],
  function( s, nr )
    # First remove all entries in the queue from or for this connection:
    local i;
    if not(IsBound(s!.connections[nr])) then
        return fail;
    fi;
    if IsBound(s!.connections[nr][1]) then
        i := 1;
        while i <= Length(s!.inqueue) do
            if s!.inqueue[i][1] = nr then
                Remove(s!.inqueue[i]);
            else
                i := i + 1;
            fi;
        od;
        IO_close(s!.connections[nr][1]);
    fi;
    if IsBound(s!.connections[nr][2]) then
        i := 1;
        while i <= Length(s!.outqueue) do
            if s!.outqueue[i][1] = nr then
                Remove(s!.outqueue[i]);
            else
                i := i + 1;
            fi;
        od;
        IO_close(s!.connections[nr][2]);
    fi;
    Unbind(s!.connections[nr]);
    Unbind(s!.tosend[nr]);
    Unbind(s!.torecv[nr]);
    Unbind(s!.inbuf[nr]);
    Unbind(s!.outbuf[nr]);
    Print("Connection #",nr," closed.\n");
  end );

InstallMethod( ShutdownServingSocket, "for an IO hub",
  [IsIOHub],
  function(s)
    if s!.sock <> fail then
        IO_close(s!.sock);
        s!.sock := fail;
    fi;
  end );

InstallMethod( Shutdown, "for an IO hub",
  [ IsIOHub ],
  function( s )
    local i;
    if not(s!.isactive) then return; fi;
    for i in [1..Length(s!.connections)] do
        CloseConnection(s,i);
    od;
    ShutdownServingSocket(s);
    s!.isactive := false;
  end );

InstallMethod( ViewObj, "for a tcp server",
  [ IsIOHub ],
  function( s )
    local nr;
    if s!.isactive then
        Print("<IO hub");
        if s!.sock <> fail then
            Print(" with serving socket");
        fi;
        nr := Number([1..Length(s!.connections)],
                     i->IsBound(s!.connections[i]) and
                        IsBound(s!.connections[i][1]));
        if nr > 0 then
            Print(", reading from ",nr," fds");
        fi;
        nr := Number([1..Length(s!.connections)],
                     i->IsBound(s!.connections[i]) and
                        IsBound(s!.connections[i][2]));
        if nr > 0 then
            Print(", writing to ",nr," fds");
        fi;
        Print(">");
    else
        Print("<IO hub already shut down>");
    fi;
  end );

InstallMethod( NewConnection, "for an IO hub and two integers",
  [ IsIOHub, IsInt, IsInt ],
  function( s, inp, out )
    local i,l;
    if not(s!.isactive) then return fail; fi;
    i := Length(s!.connections)+1;   # do not reuse old connection numbers
    l := [];
    if inp > 0 then l[1] := inp; fi;
    if out > 0 then l[2] := out; fi;
    s!.connections[i] := l;
    s!.tosend[i] := 0;
    s!.torecv[i] := 0;
    s!.inbuf[i] := EmptyString(8);
    s!.outbuf[i] := "";
    return i;
  end );

InstallMethod( NewTCPConnection, "for an IO hub, an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    local t;
    t := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    if IO_connect(t,IO_MakeIPAddressPort(address,port)) = fail then
        IO_close(t);
        return fail;
    fi;
    return NewConnection(s,t,t);
  end );

InstallMethod( AcceptNewConnection, "for an IO hub",
  [ IsIOHub ],
  function( s )
    local t,i;
    if not(s!.isactive) or not(IsBound(s!.sock)) then
        return fail;
    fi;
    t := IO_accept(s!.sock,IO_MakeIPAddressPort("0.0.0.0",0));
    i := NewConnection( s, t, t );
    Print("Got new connection #",i,"...\n");
    return i;
  end );

InstallMethod( GetInput, "for an IO hub and an integer",
  [ IsIOHub, IsInt ],
  function( s, i )
    local p;
    if not(s!.isactive) then return fail; fi;
    if i = 0 then  # get something from any connection
        if Length(s!.inqueue) = 0 then
            return false;
        else
            return Remove(s!.inqueue,1);
        fi;
    else
        p := 1;
        while p <= Length(s!.inqueue) and s!.inqueue[p][1] <> i do
            p := p + 1;
        od;
        if p > Length(s!.inqueue) then
            return false;
        else
            return Remove(s!.inqueue,p);
        fi;
    fi;
  end );

InstallMethod( SubmitOutput, "for an IO hub, a positive integers and an obj",
  [ IsIOHub, IsPosInt, IsStringRep ],
  function( s, i, o )
    if not(IsBound(s!.connections[i]) and IsBound(s!.connections[i][2])) then
        Error("This connection is closed or has no output");
        return fail;
    fi;
    Add(s!.outqueue,[i,o]);
    return true;
  end );

InstallMethod( OutputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s ) return s!.outqueue; end );

InstallMethod( InputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s ) return s!.inqueue; end );

InstallMethod( StoreLenIn8Bytes, "for a string and a len",
  [IsStringRep, IsInt],
  function( st, len )
    local c,i;
    for i in [1..8] do
      c := len mod 256;
      st[i] := CHAR_INT(c);
      len := (len - c) / 256;
    od;
  end );

InstallMethod( GetLenFrom8Bytes, "for a string",
  [IsStringRep],
  function( st )
    local len,i;
    len := 0;
    for i in [8,7..1] do
        len := len * 256 + INT_CHAR(st[i]);
    od;
    return len;
  end );

InstallMethod( DoIO, "for an IO hub", [ IsIOHub ],
  function( s ) return DoIO(s,false); end );

InstallMethod( DoIO, "for an IO hub and a boolean",
  [ IsIOHub, IsBool ],
  function( s, block )
    # This uses select to see to all open connections including the
    # original socket to perform all possible IO on them. New connections
    # are created if needed and those to which network connectivity is
    # lost are closed.
    # Note that this does not automatically call the worker on the input
    # queue.
    # However, it does serve the output queue.
    local activity,bytes,hadactivity,i,infds,inptab,j,len,nr,outfds,outtab,st;

    if not(s!.isactive) then return fail; fi;

    hadactivity := false;
    repeat
        activity := false;
        # First we check whether some output from the queue has to be sent:
        j := 1;
        while j <= Length(s!.outqueue) do
            i := s!.outqueue[j][1];
            if s!.tosend[i] = 0 then   # idle
                st := Concatenation("00000000",s!.outqueue[j][2]);
                # the first 8 will be the length
                len := Length(st);
                StoreLenIn8Bytes(st,len-8);
                s!.outbuf[i] := st;
                s!.tosend[i] := len;
                Remove(s!.outqueue,j);
            else
                j := j + 1;
            fi;
        od;

        # Now do a select:
        infds := EmptyPlist(Length(s!.connections)+1);
        outfds := EmptyPlist(Length(s!.connections));
        inptab := EmptyPlist(Length(s!.connections)+1);
        outtab := EmptyPlist(Length(s!.connections));
        for i in [1..Length(s!.connections)] do
            if IsBound(s!.connections[i]) then
                if IsBound(s!.connections[i][1]) then
                    Add(infds,s!.connections[i][1]);
                    Add(inptab,i);
                fi;
                if IsBound(s!.connections[i][2]) and s!.tosend[i] <> 0 then
                    Add(outfds,s!.connections[i][2]);
                    Add(outtab,i);
                fi;
            fi;
        od;
        if s!.sock <> fail then
            Add(infds,s!.sock);
            Add(inptab,0);
        fi;
        if block and not(hadactivity) then
            nr := IO_select(infds,outfds,[],false,false);
        else
            nr := IO_select(infds,outfds,[],0,0);
        fi;
        if nr > 0 then
            # Look for possible output first:
            for j in [1..Length(outfds)] do
                if outfds[j] <> fail then
                    activity := true;
                    i := outtab[j];
                    bytes := IO_write(s!.connections[i][2],s!.outbuf[i],
                                      Length(s!.outbuf[i])-s!.tosend[i],
                                      s!.tosend[i]);
                    if bytes <= 0 then   # an error
                        CloseConnection(s,i);
                        # maybe we want to have a callback here!
                    else
                        s!.tosend[i] := s!.tosend[i] - bytes;
                        if s!.tosend[i] = 0 then
                            Unbind(s!.outbuf[i]);
                        fi;
                    fi;
                fi;
            od;
            # Now look for possible inputs next:
            # We need to remember that some connections might already
            # me closed by the output routine above!
            for j in [1..Length(infds)] do
                if infds[j] <> fail then
                    activity := true;
                    i := inptab[j];
                    if i = 0 then
                        AcceptNewConnection(s);
                    else
                        if IsBound(s!.connections[i]) then
                            if s!.torecv[i] = 0 then   # read length
                                bytes := IO_read(s!.connections[i][1],
                                   s!.inbuf[i],Length(s!.inbuf[i]),
                                   8-Length(s!.inbuf[i]));
                                if bytes <= 0 then   # an error
                                    CloseConnection(s,i);
                                    # maybe we want to have a callback here!
                                    continue;
                                fi;
                                if Length(s!.inbuf[i]) = 8 then
                                    s!.torecv[i]:=GetLenFrom8Bytes(s!.inbuf[i]);
                                    s!.inbuf[i] := EmptyString(s!.torecv[i]);
                                fi;
                            else   # we are in the reading process
                                bytes := IO_read(s!.connections[i][1],
                                   s!.inbuf[i],Length(s!.inbuf[i]),
                                   s!.torecv[i]-Length(s!.inbuf[i]));
                                if bytes <= 0 then   # an error
                                    CloseConnection(s,i);
                                    # maybe we want to have a callback here!
                                    continue;
                                fi;
                                if Length(s!.inbuf[i]) = s!.torecv[i] then
                                    Add(s!.inqueue,[i,s!.inbuf[i]]);
                                    s!.torecv[i] := 0;
                                    s!.inbuf[i] := EmptyString(8);
                                fi;
                            fi;
                        fi;
                    fi;
                fi;
            od;
        fi;
        if activity then hadactivity := true; fi;
    until activity = false;
    return hadactivity;
  end );


##
##  This program is free software: you can redistribute it and/or modify
##  it under the terms of the GNU General Public License as published by
##  the Free Software Foundation, either version 3 of the License, or
##  (at your option) any later version.
##
##  This program is distributed in the hope that it will be useful,
##  but WITHOUT ANY WARRANTY; without even the implied warranty of
##  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
##  GNU General Public License for more details.
##
##  You should have received a copy of the GNU General Public License
##  along with this program.  If not, see <https://www.gnu.org/licenses/>.
##

[ Dauer der Verarbeitung: 0.38 Sekunden  (vorverarbeitet)  ]