Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  nsim.py   Sprache: Python

 
# SPDX-License-Identifier: GPL-2.0

import errno
import json
import os
import random
import re
import time
from .utils import cmd, ip


class NetdevSim:
    """
    Class for netdevsim netdevice and its attributes.
    """

    def __init__(self, nsimdev, port_index, ifname, ns=None):
        # In case udev renamed the netdev to according to new schema,
        # check if the name matches the port_index.
        nsimnamere = re.compile(r"eni\d+np(\d+)")
        match = nsimnamere.match(ifname)
        if match and int(match.groups()[0]) != port_index + 1:
            raise Exception("netdevice name mismatches the expected one")

        self.ifname = ifname
        self.nsimdev = nsimdev
        self.port_index = port_index
        self.ns = ns
        self.dfs_dir = "%s/ports/%u/" % (nsimdev.dfs_dir, port_index)
        ret = ip("-j link show dev %s" % ifname, ns=ns)
        self.dev = json.loads(ret.stdout)[0]
        self.ifindex = self.dev["ifindex"]

    def dfs_write(self, path, val):
        self.nsimdev.dfs_write(f'ports/{self.port_index}/' + path, val)


class NetdevSimDev:
    """
    Class for netdevsim bus device and its attributes.
    """
    @staticmethod
    def ctrl_write(path, val):
        fullpath = os.path.join("/sys/bus/netdevsim/", path)
        with open(fullpath, "w"as f:
            f.write(val)

    def dfs_write(self, path, val):
        fullpath = os.path.join(f"/sys/kernel/debug/netdevsim/netdevsim{self.addr}/", path)
        with open(fullpath, "w"as f:
            f.write(val)

    def __init__(self, port_count=1, queue_count=1, ns=None):
        # nsim will spawn in init_net, we'll set to actual ns once we switch it there
        self.ns = None

        if not os.path.exists("/sys/bus/netdevsim"):
            cmd("modprobe netdevsim")

        addr = random.randrange(1 << 15)
        while True:
            try:
                self.ctrl_write("new_device""%u %u %u" % (addr, port_count, queue_count))
            except OSError as e:
                if e.errno == errno.ENOSPC:
                    addr = random.randrange(1 << 15)
                    continue
                raise e
            break
        self.addr = addr

        # As probe of netdevsim device might happen from a workqueue,
        # so wait here until all netdevs appear.
        self.wait_for_netdevs(port_count)

        if ns:
            cmd(f"devlink dev reload netdevsim/netdevsim{addr} netns {ns.name}")
            self.ns = ns

        cmd("udevadm settle", ns=self.ns)
        ifnames = self.get_ifnames()

        self.dfs_dir = "/sys/kernel/debug/netdevsim/netdevsim%u/" % addr

        self.nsims = []
        for port_index in range(port_count):
            self.nsims.append(self._make_port(port_index, ifnames[port_index]))

        self.removed = False

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.remove()

    def _make_port(self, port_index, ifname):
        return NetdevSim(self, port_index, ifname, self.ns)

    def get_ifnames(self):
        ifnames = []
        listdir = cmd(f"ls /sys/bus/netdevsim/devices/netdevsim{self.addr}/net/",
                      ns=self.ns).stdout.split()
        for ifname in listdir:
            ifnames.append(ifname)
        ifnames.sort()
        return ifnames

    def wait_for_netdevs(self, port_count):
        timeout = 5
        timeout_start = time.time()

        while True:
            try:
                ifnames = self.get_ifnames()
            except FileNotFoundError as e:
                ifnames = []
            if len(ifnames) == port_count:
                break
            if time.time() < timeout_start + timeout:
                continue
            raise Exception("netdevices did not appear within timeout")

    def remove(self):
        if not self.removed:
            self.ctrl_write("del_device""%u" % (self.addr, ))
            self.removed = True

    def remove_nsim(self, nsim):
        self.nsims.remove(nsim)
        self.ctrl_write("devices/netdevsim%u/del_port" % (self.addr, ),
                        "%u" % (nsim.port_index, ))

Messung V0.5
C=93 H=92 G=92

¤ Dauer der Verarbeitung: 0.4 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge