Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Linux/tools/testing/selftests/bpf/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 2 kB image not shown  

Quelle  generate_udp_fragments.py   Sprache: Python

 
#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
This script helps generate fragmented UDP packets.

While it is technically possible to dynamically generate
fragmented packets in C, it is much harder to read and write
said code. `scapy` is relatively industry standard and really
easy to read / write.

So we choose to write this script that generates a valid C
header. Rerun script and commit generated file after any
modifications.
"""

import argparse
import os

from scapy.all import *


# These constants must stay in sync with `ip_check_defrag.c`
VETH1_ADDR = "172.16.1.200"
VETH0_ADDR6 = "fc00::100"
VETH1_ADDR6 = "fc00::200"
CLIENT_PORT = 48878
SERVER_PORT = 48879
MAGIC_MESSAGE = "THIS IS THE ORIGINAL MESSAGE, PLEASE REASSEMBLE ME"


def print_header(f):
    f.write("// SPDX-License-Identifier: GPL-2.0\n")
    f.write("/* DO NOT EDIT -- this file is generated */\n")
    f.write("\n")
    f.write("#ifndef _IP_CHECK_DEFRAG_FRAGS_H\n")
    f.write("#define _IP_CHECK_DEFRAG_FRAGS_H\n")
    f.write("\n")
    f.write("#include \n")
    f.write("\n")


def print_frags(f, frags, v6):
    for idx, frag in enumerate(frags):
        # 10 bytes per line to keep width in check
        chunks = [frag[i : i + 10] for i in range(0, len(frag), 10)]
        chunks_fmted = [", ".join([str(hex(b)) for b in chunk]) for chunk in chunks]
        suffix = "6" if v6 else ""

        f.write(f"static uint8_t frag{suffix}_{idx}[] = {{\n")
        for chunk in chunks_fmted:
            f.write(f"\t{chunk},\n")
        f.write(f"}};\n")


def print_trailer(f):
    f.write("\n")
    f.write("#endif /* _IP_CHECK_DEFRAG_FRAGS_H */\n")


def main(f):
    # srcip of 0 is filled in by IP_HDRINCL
    sip = "0.0.0.0"
    sip6 = VETH0_ADDR6
    dip = VETH1_ADDR
    dip6 = VETH1_ADDR6
    sport = CLIENT_PORT
    dport = SERVER_PORT
    payload = MAGIC_MESSAGE.encode()

    # Disable UDPv4 checksums to keep code simpler
    pkt = IP(src=sip,dst=dip) / UDP(sport=sport,dport=dport,chksum=0) / Raw(load=payload)
    # UDPv6 requires a checksum
    # Also pin the ipv6 fragment header ID, otherwise it's a random value
    pkt6 = IPv6(src=sip6,dst=dip6) / IPv6ExtHdrFragment(id=0xBEEF) / UDP(sport=sport,dport=dport) / Raw(load=payload)

    frags = [f.build() for f in pkt.fragment(24)]
    frags6 = [f.build() for f in fragment6(pkt6, 72)]

    print_header(f)
    print_frags(f, frags, False)
    print_frags(f, frags6, True)
    print_trailer(f)


if __name__ == "__main__":
    dir = os.path.dirname(os.path.realpath(__file__))
    header = f"{dir}/ip_check_defrag_frags.h"
    with open(header, "w"as f:
        main(f)

Messung V0.5
C=87 H=57 G=73

¤ Dauer der Verarbeitung: 0.0 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.