Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/tools/verification/rvgen/rvgen/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 7 kB image not shown  

Quelle  ltl2k.py   Sprache: Python

 
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only

from pathlib import Path
from . import generator
from . import ltl2ba

# Maximum width, in columns, of a generated line (see line_len() for how
# the width is measured).
COLUMN_LIMIT = 100

def line_len(line: str) -> int:
    """Return the width of *line* with every tab counted as 8 columns.

    This is an upper bound: a tab is always charged 8 columns, regardless
    of the column it starts at.
    """
    tabs = line.count('\t')
    # len() already counts each tab once; add the 7 remaining columns.
    return tabs * 7 + len(line)

def break_long_line(line: str, indent='') -> list[str]:
    """Split *line* at spaces so that each piece fits in COLUMN_LIMIT.

    Args:
        line: the line to split.
        indent: prefix prepended to every continuation line.

    Returns:
        The list of resulting lines; empty if *line* is empty. The space a
        line is broken at is dropped.

    A piece that has no space to break at is emitted unbroken, even if it
    is wider than COLUMN_LIMIT, instead of looping forever.
    """
    result = []
    # Length of the prefix that must never be split off: the indent that was
    # just prepended to a continuation line. Breaking inside it would
    # re-create the very same line and never make progress.
    protected = 0
    while line_len(line) > COLUMN_LIMIT:
        # Drop as many trailing characters as the line is too wide by; the
        # break point must be inside what is left.
        window = line[:COLUMN_LIMIT - line_len(line)]
        # Do not break inside leading whitespace either: that would only
        # emit a blank line.
        floor = max(protected, len(line) - len(line.lstrip()))
        i = window.rfind(' ', floor)
        if i < 0:
            # Nothing fits within the limit: break at the first opportunity
            # past it, or give up if the rest contains no space at all.
            i = line.find(' ', max(floor, len(window)))
            if i < 0:
                break
        result.append(line[:i])
        line = indent + line[i + 1:]
        protected = len(indent)
    if line:
        result.append(line)
    return result

def build_condition_string(node: ltl2ba.GraphNode):
    """Render the labels of *node* as a parenthesised C condition.

    The labels are joined with " && " in sorted order. A node without
    labels yields "(true)".
    """
    if node.labels:
        return "(" + " && ".join(sorted(node.labels)) + ")"
    return "(true)"

def abbreviate_atoms(atoms: list[str]) -> list[str]:
    """Build a short lower-case name for every atom in *atoms*.

    Each atom is split into the longest prefix it shares with at least one
    other atom and the remainder that follows it. Both parts are shortened
    separately and concatenated. Shortening lower-cases the text, splits it
    at '_', drops the filler words "is", "by", "or" and "and", and keeps
    the first two characters of every remaining word.

    Args:
        atoms: the atom names.

    Returns:
        The abbreviations, in the same order as *atoms*.
    """
    skip = ("is", "by", "or", "and")

    def shorten(s: str) -> str:
        return '_'.join(x[:2] for x in s.lower().split('_') if x not in skip)

    abbrs = []
    for atom in atoms:
        # Shrink the prefix until another atom starts with it as well. The
        # atom itself always matches, hence "> 1". With no other match the
        # loop runs out at i == 0, i.e. an empty shared part.
        for i in range(len(atom), -1, -1):
            if sum(a.startswith(atom[:i]) for a in atoms) > 1:
                break
        share = atom[:i]
        unique = atom[i:]
        abbrs.append(shorten(share) + shorten(unique))
    return abbrs

class ltl2k(generator.Monitor):
    """Generate C code for a monitor described by an LTL specification.

    The specification is converted by ltl2ba.create_graph() into a list of
    atoms, a Buchi automaton and the LTL expression nodes. The fill_*()
    methods render pieces of C source from them.
    """
    template_dir = "ltl2k"

    def __init__(self, file_path, MonitorType, extra_params=None):
        """Read and parse the LTL specification stored in *file_path*.

        Args:
            file_path: path of the file containing the LTL specification.
            MonitorType: type of the monitor; only "per_task" is accepted.
            extra_params: optional dict passed on to generator.Monitor. Its
                "model_name" entry, if set, is used as the monitor name;
                otherwise the name is the stem of *file_path*.

        Raises:
            NotImplementedError: if MonitorType is not "per_task".
        """
        if MonitorType != "per_task":
            raise NotImplementedError("Only per_task monitor is supported for LTL")
        # Use a fresh dict per call rather than a shared mutable default.
        if extra_params is None:
            extra_params = {}
        super().__init__(extra_params)
        with open(file_path) as f:
            self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
        self.atoms_abbr = abbreviate_atoms(self.atoms)
        self.name = extra_params.get("model_name")
        if not self.name:
            self.name = Path(file_path).stem

    def _fill_states(self) -> list[str]:
        """Return the lines of the C enum listing the automaton states."""
        buf = [
            "enum ltl_buchi_state {",
        ]

        for node in self.ba:
            buf.append("\tS%i," % node.id)
        buf.append("\tRV_NUM_BA_STATES")
        buf.append("};")
        buf.append("static_assert(RV_NUM_BA_STATES <= RV_MAX_BA_STATES);")
        return buf

    def _fill_atoms(self):
        """Return the lines of the C enum listing the atoms in sorted order."""
        buf = ["enum ltl_atom {"]
        for a in sorted(self.atoms):
            buf.append("\tLTL_%s," % a)
        buf.append("\tLTL_NUM_ATOM")
        buf.append("};")
        buf.append("static_assert(LTL_NUM_ATOM <= RV_MAX_LTL_ATOM);")
        return buf

    def _fill_atoms_to_string(self):
        """Return the lines of the C function mapping an atom to its name."""
        buf = [
            "static const char *ltl_atom_str(enum ltl_atom atom)",
            "{",
            "\tstatic const char *const names[] = {"
        ]

        # The array is indexed by enum ltl_atom, whose members are emitted in
        # sorted atom order by _fill_atoms(). Emit the names in that same
        # order so that they line up even if self.atoms is not sorted.
        for _, name in sorted(zip(self.atoms, self.atoms_abbr)):
            buf.append("\t\t\"%s\"," % name)

        buf.extend([
            "\t};",
            "",
            "\treturn names[atom];",
            "}"
        ])
        return buf

    def _fill_atom_values(self, required_values):
        """Return C declarations of the boolean values named in *required_values*.

        Args:
            required_values: names of the values that the generated
                conditions refer to. The set itself is not modified.

        Returns:
            The declaration lines, wrapped with break_long_line(). Atoms are
            declared first, followed by the expressions built from them.
        """
        # Work on a copy: operands are added below and the caller's set
        # should not change behind its back.
        required_values = set(required_values)

        buf = []
        # NOTE(review): assumes self.ltl lists every expression before its
        # operands, so that operands required by an expression are seen
        # later in this loop - confirm against ltl2ba.
        for node in self.ltl:
            if str(node) not in required_values:
                continue

            if isinstance(node.op, ltl2ba.AndOp):
                buf.append("\tbool %s = %s && %s;" % (node, node.op.left, node.op.right))
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.OrOp):
                buf.append("\tbool %s = %s || %s;" % (node, node.op.left, node.op.right))
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.NotOp):
                buf.append("\tbool %s = !%s;" % (node, node.op.child))
                required_values.add(str(node.op.child))

        for atom in self.atoms:
            if atom.lower() not in required_values:
                continue
            buf.append("\tbool %s = test_bit(LTL_%s, mon->atoms);" % (atom.lower(), atom))

        # Declarations were collected users-first; C needs operands first.
        buf.reverse()

        buf2 = []
        for line in buf:
            buf2.extend(break_long_line(line, "\t "))
        return buf2

    def _fill_transitions(self):
        """Return the lines of the C function computing the possible next states."""
        buf = [
            "static void",
            "ltl_possible_next_states(struct ltl_monitor *mon, unsigned int state, unsigned long *next)",
            "{"
        ]

        # Every label guarding a transition must be declared up front.
        required_values = set()
        for node in self.ba:
            for o in sorted(node.outgoing):
                required_values |= o.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.extend([
            "",
            "\tswitch (state) {"
        ])

        for node in self.ba:
            buf.append("\tcase S%i:" % node.id)

            for o in sorted(node.outgoing):
                line   = "\t\tif "
                indent = "\t\t "

                line += build_condition_string(o)
                lines = break_long_line(line, indent)
                buf.extend(lines)

                buf.append("\t\t\t__set_bit(S%i, next);" % o.id)
            buf.append("\t\tbreak;")
        buf.extend([
            "\t}",
            "}"
        ])

        return buf

    def _fill_start(self):
        """Return the lines of the C function setting the initial states."""
        buf = [
            "static void ltl_start(struct task_struct *task, struct ltl_monitor *mon)",
            "{"
        ]

        # Only the labels of initial nodes are evaluated here.
        required_values = set()
        for node in self.ba:
            if node.init:
                required_values |= node.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.append("")

        for node in self.ba:
            if not node.init:
                continue

            line   = "\tif "
            indent = "\t "

            line += build_condition_string(node)
            lines = break_long_line(line, indent)
            buf.extend(lines)

            buf.append("\t\t__set_bit(S%i, mon->states);" % node.id)
        buf.append("}")
        return buf

    def fill_tracepoint_handlers_skel(self):
        """Return the skeleton of an example tracepoint handler as C text."""
        buff = []
        buff.append("static void handle_example_event(void *data, /* XXX: fill header */)")
        buff.append("{")
        buff.append("\tltl_atom_update(task, LTL_%s, true/false);" % self.atoms[0])
        buff.append("}")
        buff.append("")
        return '\n'.join(buff)

    def fill_tracepoint_attach_probe(self):
        """Return the C statement attaching the example handler."""
        return "\trv_attach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_example_event);" \
                % self.name

    def fill_tracepoint_detach_helper(self):
        """Return the C statement detaching the example handler."""
        # Must name the same handler as the skeleton and the attach statement.
        return "\trv_detach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_example_event);" \
                % self.name

    def fill_atoms_init(self):
        """Return one placeholder ltl_atom_set() C statement per atom."""
        buff = []
        for a in self.atoms:
            buff.append("\tltl_atom_set(mon, LTL_%s, true/false);" % a)
        return '\n'.join(buff)

    def fill_model_h(self):
        """Return the complete text of the generated model header."""
        buf = [
            "/* SPDX-License-Identifier: GPL-2.0 */",
            "",
            "/*",
            " * C implementation of Buchi automaton, automatically generated by",
            " * tools/verification/rvgen from the linear temporal logic specification.",
            " * For further information, see kernel documentation:",
            " * Documentation/trace/rv/linear_temporal_logic.rst",
            " */",
            "",
            # The generated code relies on RV_MAX_BA_STATES, RV_MAX_LTL_ATOM
            # and struct ltl_monitor, so it needs the RV header.
            "#include <linux/rv.h>",
            "",
            "#define MONITOR_NAME " + self.name,
            ""
        ]

        buf.extend(self._fill_atoms())
        buf.append('')

        buf.extend(self._fill_atoms_to_string())
        buf.append('')

        buf.extend(self._fill_states())
        buf.append('')

        buf.extend(self._fill_start())
        buf.append('')

        buf.extend(self._fill_transitions())
        buf.append('')

        return '\n'.join(buf)

    def fill_monitor_class_type(self):
        """Return the C identifier of the monitor class type."""
        return "LTL_MON_EVENTS_ID"

    def fill_monitor_class(self):
        """Return the C identifier of the monitor class."""
        return "ltl_monitor_id"

    def fill_main_c(self):
        """Return the main C file with the atom initialisation filled in."""
        main_c = super().fill_main_c()
        main_c = main_c.replace("%%ATOMS_INIT%%", self.fill_atoms_init())

        return main_c

Messung V0.5
C=99 H=66 G=83

¤ Dauer der Verarbeitung: 0.10 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereitgestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.