Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quellcode-Bibliothek analyze.py   Sprache: Python

 
#!/usr/bin/env python3

#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Runs the static rooting analysis
"""

import argparse
import os
import subprocess
import sys
from subprocess import Popen

try:
    from shlex import quote
except ImportError:
    from pipes import quote


def execfile(thefile, globals):
    exec(compile(open(thefile).read(), filename=thefile, mode="exec"), globals)


# Label a string as an output.
class Output(str):
    pass


# Label a string as a pattern for multiple inputs.
class MultiInput(str):
    pass


# Construct a new environment by merging in some settings needed for running the individual scripts.
def env(config):
    # Add config['sixgill_bin'] to $PATH if not already there.
    path = os.environ["PATH"].split(":")
    if dir := config.get("sixgill_bin"):
        if dir not in path:
            path.insert(0, dir)

    return dict(
        os.environ,
        PATH=":".join(path),
        XDB=f"{config['sixgill_bin']}/xdb.so",
        SOURCE=config["source"],
    )


def fill(command, config):
    filled = []
    for s in command:
        try:
            rep = s.format(**config)
        except KeyError:
            print("Substitution failed: %s" % s)
            filled = None
            break

        if isinstance(s, Output):
            filled.append(Output(rep))
        elif isinstance(s, MultiInput):
            N = int(config["jobs"])
            for i in range(1, N + 1):
                filled.append(rep.format(i=i, n=N))
        else:
            filled.append(rep)

    if filled is None:
        raise Exception("substitution failure")

    return tuple(filled)


def print_command(job, config, env=None):
    # Display a command to run that has roughly the same effect as what was
    # actually run. The actual command uses temporary files that get renamed at
    # the end, and run some commands in parallel chunks. The printed command
    # will substitute in the actual output and run in a single chunk, so that
    # it is easier to cut & paste and add a --function flag for debugging.
    cfg = dict(config, n=1, i=1, jobs=1)
    cmd = job_command_with_final_output_names(job)
    cmd = fill(cmd, cfg)

    cmd = [quote(s) for s in cmd]
    if outfile := job.get("redirect-output"):
        cmd.extend([">", quote(outfile.format(**cfg))])
    if HOME := os.environ.get("HOME"):
        cmd = [s.replace(HOME, "~"for s in cmd]

    if env:
        # Try to keep the command as short as possible by only displaying
        # modified environment variable settings.
        e = os.environ
        changed = {key: value for key, value in env.items() if value != e.get(key)}
        if changed:
            settings = []
            for key, value in changed.items():
                if key in e and e[key] in value:
                    # Display modifications as V=prefix${V}suffix when
                    # possible. This can make a huge different for $PATH.
                    start = value.index(e[key])
                    end = start + len(e[key])
                    setting = '%s="%s${%s}%s"' % (key, value[:start], key, value[end:])
                else:
                    setting = '%s="%s"' % (key, value)
                if HOME:
                    setting = setting.replace(HOME, "$HOME")
                settings.append(setting)

            cmd = settings + cmd

    print(" " + " ".join(cmd))


JOBS = {
    "list-dbs": {"command": ["ls""-l"]},
    "rawcalls": {
        "command": [
            "{js}",
            "{analysis_scriptdir}/computeCallgraph.js",
            "{typeInfo}",
            Output("{rawcalls}"),
            "{i}",
            "{n}",
        ],
        "multi-output"True,
        "outputs": ["rawcalls.{i}.of.{n}"],
    },
    "gcFunctions": {
        "command": [
            "{js}",
            "{analysis_scriptdir}/computeGCFunctions.js",
            MultiInput("{rawcalls}"),
            "--outputs",
            Output("{callgraph}"),
            Output("{gcFunctions}"),
            Output("{gcFunctions_list}"),
            Output("{limitedFunctions_list}"),
        ],
        "outputs": [
            "callgraph.txt",
            "gcFunctions.txt",
            "gcFunctions.lst",
            "limitedFunctions.lst",
        ],
    },
    "gcTypes": {
        "command": [
            "{js}",
            "{analysis_scriptdir}/computeGCTypes.js",
            Output("{gcTypes}"),
            Output("{typeInfo}"),
        ],
        "outputs": ["gcTypes.txt""typeInfo.txt"],
    },
    "allFunctions": {
        "command": ["{sixgill_bin}/xdbkeys""src_body.xdb"],
        "redirect-output""allFunctions.txt",
    },
    "hazards": {
        "command": [
            "{js}",
            "{analysis_scriptdir}/analyzeRoots.js",
            "{gcFunctions_list}",
            "{limitedFunctions_list}",
            "{gcTypes}",
            "{typeInfo}",
            "{i}",
            "{n}",
            "tmp.{i}.of.{n}",
        ],
        "multi-output"True,
        "redirect-output""rootingHazards.{i}.of.{n}",
    },
    "gather-hazards": {
        "command": [
            "{js}",
            "{analysis_scriptdir}/mergeJSON.js",
            MultiInput("{hazards}"),
            Output("{all_hazards}"),
        ],
        "outputs": ["rootingHazards.json"],
    },
    "explain": {
        "command": [
            sys.executable,
            "{analysis_scriptdir}/explain.py",
            "{all_hazards}",
            "{gcFunctions}",
            Output("{explained_hazards}"),
            Output("{unnecessary}"),
            Output("{refs}"),
            Output("{html}"),
        ],
        "outputs": ["hazards.txt""unnecessary.txt""refs.txt""hazards.html"],
    },
    "heapwrites": {
        "command": ["{js}""{analysis_scriptdir}/analyzeHeapWrites.js"],
        "redirect-output""heapWriteHazards.txt",
    },
}


# Generator of (i, j, item) tuples corresponding to outputs:
#  - i is just the index of the yielded tuple (a la enumerate())
#  - j is the index of the item in the command list
#  - item is command[j]
def out_indexes(command):
    i = 0
    for j, fragment in enumerate(command):
        if isinstance(fragment, Output):
            yield (i, j, fragment)
            i += 1


def job_command_with_final_output_names(job):
    outfiles = job.get("outputs", [])
    command = list(job["command"])
    for i, j, name in out_indexes(job["command"]):
        command[j] = outfiles[i]
    return command


def run_job(name, config):
    job = JOBS[name]
    outs = job.get("outputs"or job.get("redirect-output")
    print("Running " + name + " to generate " + str(outs))
    if "function" in job:
        job["function"](config, job["redirect-output"])
        return

    N = int(config["jobs"]) if job.get("multi-output"else 1
    config["n"] = N
    jobs = {}
    for i in range(1, N + 1):
        config["i"] = i
        cmd = fill(job["command"], config)
        info = spawn_command(cmd, job, name, config)
        jobs[info["proc"].pid] = info

    if config["verbose"] > 0:
        print_command(job, config, env=env(config))

    final_status = 0
    while jobs:
        pid, status = os.wait()
        final_status = final_status or status
        info = jobs[pid]
        del jobs[pid]
        if "redirect" in info:
            info["redirect"].close()

        # Rename the temporary files to their final names.
        for temp, final in info["rename_map"].items():
            try:
                if config["verbose"] > 1:
                    print("Renaming %s -> %s" % (temp, final))
                os.rename(temp, final)
            except OSError:
                print("Error renaming %s -> %s" % (temp, final))
                raise

    if final_status != 0:
        raise Exception("job {} returned status {}".format(name, final_status))


def spawn_command(cmdspec, job, name, config):
    rename_map = {}

    if "redirect-output" in job:
        stdout_filename = "{}.tmp{}".format(name, config.get("i"""))
        final_outfile = job["redirect-output"].format(**config)
        rename_map[stdout_filename] = final_outfile
        command = cmdspec
    else:
        outfiles = fill(job["outputs"], config)
        stdout_filename = None

        # Replace the Outputs with temporary filenames, and record a mapping
        # from those temp names to their actual final names that will be used
        # if the command succeeds.
        command = list(cmdspec)
        for i, j, raw_name in out_indexes(cmdspec):
            [name] = fill([raw_name], config)
            command[j] = "{}.tmp{}".format(name, config.get("i"""))
            rename_map[command[j]] = outfiles[i]

    sys.stdout.flush()
    info = {"rename_map": rename_map}
    if stdout_filename:
        info["redirect"] = open(stdout_filename, "w")
        info["proc"] = Popen(command, stdout=info["redirect"], env=env(config))
    else:
        info["proc"] = Popen(command, env=env(config))

    if config["verbose"] > 1:
        print("Spawned process {}".format(info["proc"].pid))

    return info


# Default to conservatively assuming 4GB/job.
def max_parallel_jobs(job_size=4 * 2**30):
    """Return the max number of parallel jobs we can run without overfilling
    memory, assuming heavyweight jobs."""
    from_cores = int(subprocess.check_output(["nproc""--ignore=1"]).strip())
    mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    from_mem = round(mem_bytes / job_size)
    return min(from_cores, from_mem)


config = {"analysis_scriptdir": os.path.dirname(__file__)}

defaults = [
    "%s/defaults.py" % config["analysis_scriptdir"],
    "%s/defaults.py" % os.getcwd(),
]

parser = argparse.ArgumentParser(
    description="Statically analyze build tree for rooting hazards."
)
parser.add_argument(
    "step", metavar="STEP", type=str, nargs="?", help="run only step STEP"
)
parser.add_argument(
    "--source", metavar="SOURCE", type=str, nargs="?", help="source code to analyze"
)
parser.add_argument(
    "--js",
    metavar="JSSHELL",
    type=str,
    nargs="?",
    help="full path to ctypes-capable JS shell",
)
parser.add_argument(
    "--first",
    metavar="STEP",
    type=str,
    nargs="?",
    help="execute all jobs starting with STEP",
)
parser.add_argument(
    "--last", metavar="STEP", type=str, nargs="?", help="stop at step STEP"
)
parser.add_argument(
    "--jobs",
    "-j",
    default=None,
    metavar="JOBS",
    type=int,
    help="number of simultaneous analyzeRoots.js jobs",
)
parser.add_argument(
    "--list", const=True, nargs="?", type=bool, help="display available steps"
)
parser.add_argument(
    "--expect-file",
    type=str,
    nargs="?",
    help="deprecated option, temporarily still present for backwards " "compatibility",
)
parser.add_argument(
    "--verbose",
    "-v",
    action="count",
    default=1,
    help="Display cut & paste commands to run individual steps (give twice for more output)",
)
parser.add_argument("--quiet""-q", action="count", default=0, help="Suppress output")

args = parser.parse_args()
args.verbose = max(0, args.verbose - args.quiet)

for default in defaults:
    try:
        execfile(default, config)
        if args.verbose > 1:
            print("Loaded %s" % default)
    except Exception:
        pass

# execfile() used config as the globals for running the
# defaults.py script, and will have set a __builtins__ key as a side effect.
del config["__builtins__"]
data = config.copy()

for k, v in vars(args).items():
    if v is not None:
        data[k] = v

if args.jobs is not None:
    data["jobs"] = args.jobs
if not data.get("jobs"):
    data["jobs"] = max_parallel_jobs()

if "GECKO_PATH" in os.environ:
    data["source"] = os.environ["GECKO_PATH"]
if "SOURCE" in os.environ:
    data["source"] = os.environ["SOURCE"]

steps = [
    "gcTypes",
    "rawcalls",
    "gcFunctions",
    "allFunctions",
    "hazards",
    "gather-hazards",
    "explain",
    "heapwrites",
]

if args.list:
    for step in steps:
        job = JOBS[step]
        outfiles = job.get("outputs"or job.get("redirect-output")
        if outfiles:
            print(
                "%s\n ->%s %s"
                % (step, "*" if job.get("multi-output"else "", outfiles)
            )
        else:
            print(step)
    sys.exit(0)

for step in steps:
    job = JOBS[step]
    if "redirect-output" in job:
        data[step] = job["redirect-output"]
    elif "outputs" in job and "command" in job:
        outfiles = job["outputs"]
        num_outputs = 0
        for i, j, name in out_indexes(job["command"]):
            # Trim the {curly brackets} off of the output keys.
            data[name[1:-1]] = outfiles[i]
            num_outputs += 1
        assert (
            len(outfiles) == num_outputs
        ), 'step "%s": mismatched number of output files (%d) and params (%d)' % (
            step,
            num_outputs,
            len(outfiles),
        )  # NOQA: E501

if args.step:
    if args.first or args.last:
        raise Exception(
            "--first and --last cannot be used when a step argument is given"
        )
    steps = [args.step]
else:
    if args.first:
        steps = steps[steps.index(args.first) :]
    if args.last:
        steps = steps[: steps.index(args.last) + 1]

for step in steps:
    run_job(step, data)

77%


¤ Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.0.4Bemerkung:  ¤

*Bot Zugriff






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge