Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/testing/mozbase/mozdevice/mozdevice/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 179 kB image not shown  

Quelle  adb.py   Sprache: Python

 
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

import io
import os
import posixpath
import re
import shlex
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from shutil import copytree
from threading import Thread

from . import version_codes

# Test root shared by ADBDevice instances that are created with
# share_test_root=True. ADBDevice.__init__ assigns it from its test_root
# argument while no value has been recorded yet.
_TEST_ROOT = None


class ADBProcess:
    """ADBProcess encapsulates the data related to executing the adb process.

    :param list args: command argument list used to spawn the process.
    :param bool use_stdout_pipe: if True, stdout is a utf-8 text pipe which
        is consumed by iterating over the ADBProcess. Otherwise stdout and
        stderr are written to a binary temporary file whose content is
        available through the :attr:`stdout` property.
    :param timeout: number of seconds after which a background thread kills
        the process and sets :attr:`timedout`. No such thread is started if
        the value is None or 0.
    """

    def __init__(self, args, use_stdout_pipe=False, timeout=None):
        #: command argument list.
        self.args = args
        Popen_args = {}

        #: Temporary file handle to be used for stdout.
        if use_stdout_pipe:
            self.stdout_file = subprocess.PIPE
            # Reading utf-8 from the stdout pipe
            Popen_args["encoding"] = "utf-8"
        else:
            self.stdout_file = tempfile.NamedTemporaryFile(mode="w+b")
        Popen_args["stdout"] = self.stdout_file

        #: boolean indicating if the command timed out.
        self.timedout = None

        #: exitcode of the process.
        self.exitcode = None

        #: subprocess Process object used to execute the command.
        # stderr is merged into stdout.
        Popen_args["stderr"] = subprocess.STDOUT
        self.proc = subprocess.Popen(args, **Popen_args)

        # If a timeout is set, then create a thread responsible for killing the
        # process, as well as updating the exitcode and timedout status.
        def timeout_thread(adb_process, timeout):
            start_time = time.time()
            polling_interval = 0.001
            adb_process.exitcode = adb_process.proc.poll()
            while (time.time() - start_time) <= float(
                timeout
            ) and adb_process.exitcode is None:
                time.sleep(polling_interval)
                adb_process.exitcode = adb_process.proc.poll()

            if adb_process.exitcode is None:
                adb_process.proc.kill()
                adb_process.timedout = True
                adb_process.exitcode = adb_process.proc.poll()

        if timeout:
            Thread(target=timeout_thread, args=(self, timeout), daemon=True).start()

    @property
    def stdout(self):
        """Return the contents of stdout.

        Only valid when stdout is collected in the temporary file; an empty
        string is returned if that file is missing or already closed.
        """
        assert self.stdout_file != subprocess.PIPE
        if not self.stdout_file or self.stdout_file.closed:
            content = ""
        else:
            self.stdout_file.seek(0, os.SEEK_SET)
            content = self.stdout_file.read().rstrip().decode()
        return content

    def __str__(self):
        # Remove -s <serialno> from the error message to allow bug suggestions
        # to be independent of the individual failing device.
        arg_string = " ".join(self.args)
        arg_string = re.sub(r" -s [\w-]+", "", arg_string)
        return "args: {}, exitcode: {}, stdout: {}".format(
            arg_string,
            self.exitcode,
            self.stdout,
        )

    def __iter__(self):
        """Iterate over the lines of stdout; requires use_stdout_pipe=True."""
        assert self.stdout_file == subprocess.PIPE
        return self

    def __next__(self):
        """Return the next line of the stdout pipe.

        Once the pipe is exhausted, wait for the process to end (or to be
        flagged as timed out) before raising StopIteration.
        """
        assert self.stdout_file == subprocess.PIPE
        try:
            return next(self.proc.stdout)
        except StopIteration:
            # Poll here as well: without a timeout no background thread
            # updates exitcode. Stop waiting once the timeout thread has
            # flagged the process, since timedout is never reset and the
            # exitcode may still be None right after the kill.
            while self.exitcode is None and not self.timedout:
                self.exitcode = self.proc.poll()
                if self.exitcode is None:
                    time.sleep(0.001)
            raise


# ADBError and ADBTimeoutError are treated differently in order that
# ADBTimeoutErrors can be handled distinctly from ADBErrors.


class ADBError(Exception):
    """Error raised for adb failures that are usually recoverable.

    It signals that a command executed on a device exited with a
    non-zero exitcode, or that an unexpected error condition occurred.
    Generally an ADBError can be handled and the device can continue
    to be used.
    """


class ADBProcessError(ADBError):
    """ADBError variant which carries the ADBProcess that failed.

    The error message is the string form of the process; the process
    itself is kept in the *adb_process* member.
    """

    def __init__(self, adb_process):
        super().__init__(str(adb_process))
        self.adb_process = adb_process


class ADBListDevicesError(ADBError):
    """ADBError raised when listing the devices reveals a problem,
    typically missing permissions.

    The device information gathered so far is stored in the *devices*
    member.
    """

    def __init__(self, msg, devices):
        super().__init__(msg)
        self.devices = devices


class ADBTimeoutError(Exception):
    """Error raised when a host command or shell command takes longer
    than the specified timeout to execute.

    The timeout value is set in the ADBCommand constructor and is 300
    seconds by default. This error is typically fatal since the host is
    having problems communicating with the device. You may be able to
    recover by rebooting, but this is not guaranteed.

    Recovery options are:

    * Killing and restarting the adb server via
      ::

          adb kill-server; adb start-server

    * Rebooting the device manually.
    * Rebooting the host.
    """


class ADBDeviceFactoryError(Exception):
    """Error raised when the ADBDeviceFactory finds itself in an
    inconsistent state.
    """


class ADBCommand:
    """ADBCommand provides a basic interface to adb commands
    which is used to provide the 'command' methods for the
    classes ADBHost and ADBDevice.

    ADBCommand should only be used as the base class for other
    classes and should not be instantiated directly. To enforce this
    restriction calling ADBCommand's constructor will raise a
    NotImplementedError exception.

    :param str adb: path to adb executable. Defaults to 'adb'.
    :param str adb_host: host of the adb server.
    :param int adb_port: port of the adb server.
    :param str logger_name: logging logger name. Defaults to 'adb'.
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError.  This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :param bool use_root: Use root if available on device
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`

    ::

       from mozdevice import ADBCommand

       try:
           adbcommand = ADBCommand()
       except NotImplementedError:
           print("ADBCommand can not be instantiated.")
    """

    def __init__(
        self,
        adb="adb",
        adb_host=None,
        adb_port=None,
        logger_name="adb",
        timeout=300,
        verbose=False,
        use_root=True,
    ):
        if self.__class__ == ADBCommand:
            raise NotImplementedError

        self._logger = self._get_logger(logger_name, verbose)
        self._verbose = verbose
        self._use_root = use_root
        self._adb_path = adb
        self._adb_host = adb_host
        self._adb_port = adb_port
        self._timeout = timeout
        self._polling_interval = 0.001
        self._adb_version = ""

        self._logger.debug(f"{self.__class__.__name__}: {self.__dict__}")

        # catch early a missing or non executable adb command
        # and get the adb version while we are at it.
        try:
            output = subprocess.Popen(
                [adb, "version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            ).communicate()
            self._adb_version = self._parse_adb_version(output[0])

            # Compare numerically: as plain strings "1.0.100" would sort
            # before "1.0.36".
            if self._version_key(self._adb_version) < self._version_key("1.0.36"):
                raise ADBError(
                    "adb version %s less than minimum 1.0.36" % self._adb_version
                )

        except Exception as exc:
            raise ADBError(f"{exc}: {adb} is not executable.") from exc

    @staticmethod
    def _parse_adb_version(output):
        """Extract the version from the stdout of ``adb version``.

        :param output: stdout of ``adb version`` as bytes or str.
        :return: str - the text following "Android Debug Bridge version"
            on the first line, stripped of surrounding whitespace.
        :raises: :exc:`ValueError` if the output does not start with the
            expected banner.
        """
        if isinstance(output, bytes):
            output = output.decode("utf-8", "replace")
        match = re.match(r"Android Debug Bridge version (.*)", output)
        if match is None:
            raise ValueError("unexpected output of adb version: %r" % output)
        return match.group(1).strip()

    @staticmethod
    def _version_key(version):
        """Return the leading dotted numeric part of *version* as a tuple
        of ints suitable for comparison, or an empty tuple if there is none.
        """
        match = re.match(r"\d+(?:\.\d+)*", version)
        if match is None:
            return ()
        return tuple(int(part) for part in match.group(0).split("."))

    def _get_logger(self, logger_name, verbose):
        """Return a logger named *logger_name*.

        A mozlog logger is used when mozlog can be imported, otherwise a
        standard library logger is returned. The level is DEBUG when
        *verbose* is true and INFO otherwise.
        """
        logger = None
        level = "DEBUG" if verbose else "INFO"
        try:
            import mozlog

            logger = mozlog.get_default_logger(logger_name)
            if not logger:
                if sys.__stdout__.isatty():
                    defaults = {"mach": sys.stdout}
                else:
                    defaults = {"tbpl": sys.stdout}
                logger = mozlog.commandline.setup_logging(
                    logger_name, {}, defaults, formatter_defaults={"level": level}
                )
        except ImportError:
            pass

        if logger is None:
            import logging

            logger = logging.getLogger(logger_name)
            logger.setLevel(level)
        return logger

    # Host Command methods

    def command(self, cmds, device_serial=None, timeout=None):
        """Executes an adb command on the host.

        :param list cmds: The command and its arguments to be
            executed.
        :param str device_serial: The device's
            serial number if the adb command is to be executed against
            a specific device. If it is not specified, ANDROID_SERIAL
            from the environment will be used if it is set.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBCommand constructor is used.
        :return: :class:`ADBProcess`

        command() provides a low level interface for executing
        commands on the host via adb.

        command() executes on the host in such a fashion that stdout
        of the adb process is a file handle on the host and
        the exit code is available as the exit code of the adb
        process.

        The caller provides a list containing commands, as well as a
        timeout period in seconds.

        A subprocess is spawned to execute adb with stdout and stderr
        directed to a temporary file. If the process takes longer than
        the specified timeout, the process is terminated and the
        returned ADBProcess has its timedout attribute set to True.

        It is the caller's responsibility to clean up by closing
        the stdout temporary file.
        """
        args = [self._adb_path]
        device_serial = device_serial or os.environ.get("ANDROID_SERIAL")
        if self._adb_host:
            args.extend(["-H", self._adb_host])
        if self._adb_port:
            args.extend(["-P", str(self._adb_port)])
        if device_serial:
            args.extend(["-s", device_serial, "wait-for-device"])
        args.extend(cmds)

        adb_process = ADBProcess(args)

        if timeout is None:
            timeout = self._timeout

        # Poll until the process exits or the timeout elapses.
        start_time = time.time()
        adb_process.exitcode = adb_process.proc.poll()
        while (time.time() - start_time) <= float(
            timeout
        ) and adb_process.exitcode is None:
            time.sleep(self._polling_interval)
            adb_process.exitcode = adb_process.proc.poll()
        if adb_process.exitcode is None:
            adb_process.proc.kill()
            adb_process.timedout = True
            adb_process.exitcode = adb_process.proc.poll()

        # Rewind so the caller can read the output from the beginning.
        adb_process.stdout_file.seek(0, os.SEEK_SET)

        return adb_process

    def command_output(self, cmds, device_serial=None, timeout=None):
        """Executes an adb command on the host returning stdout.

        :param list cmds: The command and its arguments to be
            executed.
        :param str device_serial: The device's
            serial number if the adb command is to be executed against
            a specific device. If it is not specified, ANDROID_SERIAL
            from the environment will be used if it is set.
        :param int timeout: The maximum time in seconds
            for any spawned adb process to complete before throwing
            an ADBTimeoutError.
            This timeout is per adb call. The total time spent
            may exceed this value. If it is not specified, the value
            set in the ADBCommand constructor is used.
        :return: str - content of stdout.
        :raises: :exc:`ADBTimeoutError` if the command timed out.
                 :exc:`ADBError` (as :exc:`ADBProcessError`) if the
                 command exited with a non-zero exitcode.
        """
        adb_process = None
        try:
            # Need to force the use of the ADBCommand class's command
            # since ADBDevice will redefine command and call its
            # own version otherwise.
            adb_process = ADBCommand.command(
                self, cmds, device_serial=device_serial, timeout=timeout
            )
            if adb_process.timedout:
                raise ADBTimeoutError("%s" % adb_process)
            if adb_process.exitcode:
                raise ADBProcessError(adb_process)
            output = adb_process.stdout
            if self._verbose:
                self._logger.debug(
                    "command_output: %s, "
                    "timeout: %s, "
                    "timedout: %s, "
                    "exitcode: %s, output: %s"
                    % (
                        " ".join(adb_process.args),
                        timeout,
                        adb_process.timedout,
                        adb_process.exitcode,
                        output,
                    )
                )

            return output
        finally:
            if adb_process and isinstance(adb_process.stdout_file, io.IOBase):
                adb_process.stdout_file.close()


class ADBHost(ADBCommand):
    """ADBHost provides a basic interface to adb host commands
    which do not target a specific device.

    :param str adb: path to adb executable. Defaults to 'adb'.
    :param str adb_host: host of the adb server.
    :param int adb_port: port of the adb server.
    :param logger_name: logging logger name. Defaults to 'adb'.
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError.  This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`

    ::

       from mozdevice import ADBHost

       adbhost = ADBHost()
       adbhost.start_server()
    """

    def __init__(
        self,
        adb="adb",
        adb_host=None,
        adb_port=None,
        logger_name="adb",
        timeout=300,
        verbose=False,
    ):
        ADBCommand.__init__(
            self,
            adb=adb,
            adb_host=adb_host,
            adb_port=adb_port,
            logger_name=logger_name,
            timeout=timeout,
            verbose=verbose,
            use_root=True,
        )

    def command(self, cmds, timeout=None):
        """Executes an adb command on the host.

        :param list cmds: The command and its arguments to be
            executed.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :return: :class:`ADBProcess`

        command() provides a low level interface for executing
        commands on the host via adb.

        command() executes on the host in such a fashion that stdout
        of the adb process is a file handle on the host and
        the exit code is available as the exit code of the adb
        process.

        The caller provides a list containing commands, as well as a
        timeout period in seconds.

        A subprocess is spawned to execute adb with stdout and stderr
        directed to a temporary file. If the process takes longer than
        the specified timeout, the process is terminated.

        It is the caller's responsibility to clean up by closing
        the stdout temporary file.
        """
        return ADBCommand.command(self, cmds, timeout=timeout)

    def command_output(self, cmds, timeout=None):
        """Executes an adb command on the host returning stdout.

        :param list cmds: The command and its arguments to be
            executed.
        :param int timeout: The maximum time in seconds
            for any spawned adb process to complete before throwing
            an ADBTimeoutError.
            This timeout is per adb call. The total time spent
            may exceed this value. If it is not specified, the value
            set in the ADBHost constructor is used.
        :return: str - content of stdout.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        return ADBCommand.command_output(self, cmds, timeout=timeout)

    def start_server(self, timeout=None):
        """Starts the adb server.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`

        Attempting to use start_server with any adb_host value other than None
        will fail with an ADBError exception.

        You will need to start the server on the remote host via the command:

        .. code-block:: shell

            adb -a fork-server server

        If you wish the remote adb server to restart automatically, you can
        enclose the command in a loop as in:

        .. code-block:: shell

            while true; do
              adb -a fork-server server
            done
        """
        self.command_output(["start-server"], timeout=timeout)

    def kill_server(self, timeout=None):
        """Kills the adb server.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        self.command_output(["kill-server"], timeout=timeout)

    def devices(self, timeout=None):
        """Executes adb devices -l and returns a list of objects describing attached devices.

        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBHost constructor is used.
        :return: list of dicts, one per listed device, as shown below.
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBListDevicesError`
                 :exc:`ADBError`

        The output of adb devices -l

        ::

            $ adb devices -l
            List of devices attached
            b313b945               device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw

        is parsed and placed into an object as in

        ::

            [{'device_serial': 'b313b945', 'state': 'device', 'product': 'd2vzw',
              'usb': '1-7', 'device': 'd2vzw', 'model': 'SCH_I535' }]
        """
        # b313b945               device usb:1-7 product:d2vzw model:SCH_I535 device:d2vzw
        # from Android system/core/adb/transport.c statename()
        re_device_info = re.compile(
            r"([^\s]+)\s+(offline|bootloader|device|host|recovery|sideload|"
            "no permissions|unauthorized|unknown)"
        )
        devices = []
        lines = self.command_output(["devices", "-l"], timeout=timeout).splitlines()
        for line in lines:
            # Skip the header, with or without trailing whitespace.
            if line.strip() == "List of devices attached":
                continue
            match = re_device_info.match(line)
            if match:
                device = {"device_serial": match.group(1), "state": match.group(2)}
                # Everything after the state is a list of key:value fields.
                remainder = line[match.end(2) :].strip()
                if remainder:
                    try:
                        device.update(
                            dict(field.split(":") for field in remainder.split())
                        )
                    except ValueError:
                        # A field did not have exactly one ":"; keep the
                        # serial and state that were already parsed.
                        self._logger.warning(
                            "devices: Unable to parse remainder for device %s" % line
                        )
                devices.append(device)
        for device in devices:
            if device["state"] == "no permissions":
                raise ADBListDevicesError(
                    "No permissions to detect devices. You should restart the"
                    " adb server as root:\n"
                    "\n# adb kill-server\n# adb start-server\n"
                    "\nor maybe configure your udev rules.",
                    devices,
                )
        return devices


# Registry of the ADBDevice instances created by ADBDeviceFactory, keyed by
# the device serial number of each instance.
ADBDEVICES = {}


def ADBDeviceFactory(
    device=None,
    adb="adb",
    adb_host=None,
    adb_port=None,
    test_root=None,
    logger_name="adb",
    timeout=300,
    verbose=False,
    device_ready_retry_wait=20,
    device_ready_retry_attempts=3,
    use_root=True,
    share_test_root=True,
    run_as_package=None,
):
    """Return the :class:`ADBDevice` for a device, creating it on first use.

    The factory enforces the requirement that only one
    :class:`ADBDevice` be created for each attached device, re-using
    the instance recorded in ADBDEVICES when there is one. It takes the
    identical arguments as the :class:`ADBDevice` constructor. If
    *device* is not given, ANDROID_SERIAL from the environment is used.
    When *test_root* is given, the content of the device's test_root is
    removed so that tests start with an empty directory.

    :return: :class:`ADBDevice`
    :raises: :exc:`ADBDeviceFactoryError`
             :exc:`ADBError`
             :exc:`ADBTimeoutError`

    """
    device = device or os.environ.get("ANDROID_SERIAL")
    serial_given = device is not None

    if serial_given and device in ADBDEVICES:
        # An ADBDevice already exists for this serial number: re-use it.
        adbdevice = ADBDEVICES[device]
    elif not serial_given and ADBDEVICES:
        # No serial number was specified but an ADBDevice was created
        # earlier, so there must be exactly one device connected and its
        # ADBDevice can be re-used.
        known_serials = list(ADBDEVICES)
        assert (
            len(known_serials) == 1
        ), "Only one device may be connected if the device serial number is not specified."
        adbdevice = ADBDEVICES[known_serials[0]]
    elif (serial_given and device not in ADBDEVICES) or (
        not serial_given and not ADBDEVICES
    ):
        # First request for this device: create and register its ADBDevice.
        adbdevice = ADBDevice(
            device=device,
            adb=adb,
            adb_host=adb_host,
            adb_port=adb_port,
            test_root=test_root,
            logger_name=logger_name,
            timeout=timeout,
            verbose=verbose,
            device_ready_retry_wait=device_ready_retry_wait,
            device_ready_retry_attempts=device_ready_retry_attempts,
            use_root=use_root,
            share_test_root=share_test_root,
            run_as_package=run_as_package,
        )
        ADBDEVICES[adbdevice._device_serial] = adbdevice
    else:
        raise ADBDeviceFactoryError(
            "Inconsistent ADBDeviceFactory: device: %s, ADBDEVICES: %s"
            % (device, ADBDEVICES)
        )

    # Empty the test root before testing begins.
    if test_root:
        test_root_content = posixpath.join(adbdevice.test_root, "*")
        adbdevice.rm(test_root_content, recursive=True, force=True, timeout=timeout)

    # The requested verbosity may differ from the one the instance was
    # created with; if so, update it and rebuild the logger to match.
    if adbdevice._verbose != verbose:
        adbdevice._verbose = verbose
        adbdevice._logger = adbdevice._get_logger(adbdevice._logger.name, verbose)
    return adbdevice


class ADBDevice(ADBCommand):
    """ADBDevice provides methods which can be used to interact with the
    associated Android-based device.

    :param str device: When a string is passed in device, it
        is interpreted as the device serial number. This form is not
        compatible with devices containing a ":" in the serial; in
        this case ValueError will be raised.  When a dictionary is
        passed it must have one or both of the keys "device_serial"
        and "usb". This is compatible with the dictionaries in the
        list returned by ADBHost.devices(). If the value of
        device_serial is a valid serial not containing a ":" it will
        be used to identify the device, otherwise the value of the usb
        key, prefixed with "usb:" is used.  If None is passed and
        there is exactly one device attached to the host, that device
        is used. If None is passed and ANDROID_SERIAL is set in the environment,
        that device is used. If there is more than one device attached and
        device is None and ANDROID_SERIAL is not set in the environment, ValueError
        is raised. If no device is attached the constructor will block
        until a device is attached or the timeout is reached.
    :param str adb_host: host of the adb server to connect to.
    :param int adb_port: port of the adb server to connect to.
    :param str test_root: value containing the test root to be
        used on the device. This value will be shared among all
        instances of ADBDevice if share_test_root is True.
    :param str logger_name: logging logger name. Defaults to 'adb'
    :param int timeout: The default maximum time in
        seconds for any spawned adb process to complete before
        throwing an ADBTimeoutError.  This timeout is per adb call. The
        total time spent may exceed this value. If it is not
        specified, the value defaults to 300.
    :param bool verbose: provide verbose output
    :param int device_ready_retry_wait: number of seconds to wait
        between attempts to check if the device is ready after a
        reboot.
    :param integer device_ready_retry_attempts: number of attempts when
        checking if a device is ready.
    :param bool use_root: Use root if it is available on device
    :param bool share_test_root: True if instance should share the
        same test_root value with other ADBInstances. Defaults to True.
    :param str run_as_package: Name of package to be used in run-as in lieu of
        using su.
    :raises: :exc:`ADBError`
             :exc:`ADBTimeoutError`
             :exc:`ValueError`

    ::

       from mozdevice import ADBDevice

       adbdevice = ADBDevice()
       print(adbdevice.list_files("/mnt/sdcard"))
       if adbdevice.process_exist("org.mozilla.geckoview.test_runner"):
           print("org.mozilla.geckoview.test_runner is running")
    """

    SOCKET_DIRECTION_REVERSE = "reverse"

    SOCKET_DIRECTION_FORWARD = "forward"

    # BUILTINS is used to determine which commands can not be executed
    # via su or run-as. This set of possible builtin commands was
    # obtained from `man builtin` on Linux.
    BUILTINS = set(
        [
            "alias",
            "bg",
            "bind",
            "break",
            "builtin",
            "caller",
            "cd",
            "command",
            "compgen",
            "complete",
            "compopt",
            "continue",
            "declare",
            "dirs",
            "disown",
            "echo",
            "enable",
            "eval",
            "exec",
            "exit",
            "export",
            "false",
            "fc",
            "fg",
            "getopts",
            "hash",
            "help",
            "history",
            "jobs",
            "kill",
            "let",
            "local",
            "logout",
            "mapfile",
            "popd",
            "printf",
            "pushd",
            "pwd",
            "read",
            "readonly",
            "return",
            "set",
            "shift",
            "shopt",
            "source",
            "suspend",
            "test",
            "times",
            "trap",
            "true",
            "type",
            "typeset",
            "ulimit",
            "umask",
            "unalias",
            "unset",
            "wait",
        ]
    )

    def __init__(
        self,
        device=None,
        adb="adb",
        adb_host=None,
        adb_port=None,
        test_root=None,
        logger_name="adb",
        timeout=300,
        verbose=False,
        device_ready_retry_wait=20,
        device_ready_retry_attempts=3,
        use_root=True,
        share_test_root=True,
        run_as_package=None,
    ):
        global _TEST_ROOT

        ADBCommand.__init__(
            self,
            adb=adb,
            adb_host=adb_host,
            adb_port=adb_port,
            logger_name=logger_name,
            timeout=timeout,
            verbose=verbose,
            use_root=use_root,
        )
        self._logger.info("Using adb %s" % self._adb_version)
        self._device_serial = self._get_device_serial(device)
        self._initial_test_root = test_root
        self._share_test_root = share_test_root
        if share_test_root and not _TEST_ROOT:
            _TEST_ROOT = test_root
        self._test_root = None
        self._run_as_package = None
        # Cache packages debuggable state.
        self._debuggable_packages = {}
        self._device_ready_retry_wait = device_ready_retry_wait
        self._device_ready_retry_attempts = device_ready_retry_attempts
        self._have_root_shell = False
        self._have_su = False
        self._have_android_su = False
        self._selinux = None
        self._re_internal_storage = None

        self._wait_for_boot_completed(timeout=timeout)

        # Record the start time of the ADBDevice initialization so we can
        # determine if we should abort with an ADBTimeoutError if it is
        # taking too long.
        start_time = time.time()

        # Attempt to get the Android version as early as possible in order
        # to work around differences in determining adb command exit codes
        # in Android before and after Android 7.
        self.version = 0
        while self.version < 1 and (time.time() - start_time) <= float(timeout):
            try:
                version = self.get_prop("ro.build.version.sdk", timeout=timeout)
                self.version = int(version)
            except ValueError:
                self._logger.info("unexpected ro.build.version.sdk: '%s'" % version)
                time.sleep(2)
        if self.version < 1:
            # note slightly different meaning to the ADBTimeoutError here (and above):
            # failed to get valid (numeric) version string in all attempts in allowed time
            raise ADBTimeoutError(
                "ADBDevice: unable to determine ro.build.version.sdk."
            )

        self._mkdir_p = None
        # Force the use of /system/bin/ls or /system/xbin/ls in case
        # there is /sbin/ls which embeds ansi escape codes to colorize
        # the output.  Detect if we are using busybox ls. We want each
        # entry on a single line and we don't want . or ..
        ls_dir = "/system"

        # Using self.is_file is problematic on emulators either
        # using ls or test to check for their existence.
        # Executing the command to detect its existence works around
        # any issues with ls or test.
        boot_completed = False
        while not boot_completed and (time.time() - start_time) <= float(timeout):
            try:
                self.shell_output(
                    "/system/bin/ls /system/bin/ls", timeout=timeout, attempts=3
                )
                boot_completed = True
                self._ls = "/system/bin/ls"
            except ADBError as e1:
                self._logger.debug(f"detect /system/bin/ls {e1}")
                try:
                    self.shell_output(
                        "/system/xbin/ls /system/xbin/ls", timeout=timeout, attempts=3
                    )
                    boot_completed = True
                    self._ls = "/system/xbin/ls"
                except ADBError as e2:
                    self._logger.debug(f"detect /system/xbin/ls : {e2}")
            if not boot_completed:
                time.sleep(2)
        if not boot_completed:
            raise ADBError("ADBDevice.__init__: ls could not be found")

        # A race condition can occur especially with emulators where
        # the device appears to be available but it has not completed
        # mounting the sdcard. We can work around this by checking if
        # the sdcard is missing when we attempt to ls it and retrying
        # if it is not yet available.
        boot_completed = False
        while not boot_completed and (time.time() - start_time) <= float(timeout):
            try:
                self.shell_output(
                    f"{self._ls} -1A {ls_dir}", timeout=timeout, attempts=3
                )
                boot_completed = True
                self._ls += " -1A"
            except ADBError as e:
                self._logger.debug(f"detect ls -1A: {e}")
                if "No such file or directory" not in str(e):
                    boot_completed = True
                    self._ls += " -a"
            if not boot_completed:
                time.sleep(2)
        if not boot_completed:
            raise ADBTimeoutError("ADBDevice: /sdcard not found.")

        self._logger.info("%s supported" % self._ls)

        # builtin commands which do not exist as separate programs can
        # not be executed using su or run-as.  Remove builtin commands
        # from self.BUILTINS which also exist as separate programs so
        # that we will be able to execute them using su or run-as if
        # necessary.
        remove_builtins = set()
        for builtin in self.BUILTINS:
            try:
                # It is important not to retry ls (with attempts=) here
                # since many of these calls are expected to fail:
                # additional retries can introduce a significant delay.
                self.ls("/system/*bin/%s" % builtin, timeout=timeout)
                self._logger.debug("Removing %s from BUILTINS" % builtin)
                remove_builtins.add(builtin)
            except ADBError:
                pass
        self.BUILTINS.difference_update(remove_builtins)

        # Do we have cp?
        boot_completed = False
        while not boot_completed and (time.time() - start_time) <= float(timeout):
            try:
                self.shell_output("cp --help", timeout=timeout)
                boot_completed = True
                self._have_cp = True
            except ADBError as e:
                if "not found" in str(e):
                    self._have_cp = False
                    boot_completed = True
                elif "known option" in str(e):
                    self._have_cp = True
                    boot_completed = True
                elif "invalid option" in str(e):
                    self._have_cp = True
                    boot_completed = True
            if not boot_completed:
                time.sleep(2)
        if not boot_completed:
            raise ADBTimeoutError("ADBDevice: cp not found.")
        self._logger.info("Native cp support: %s" % self._have_cp)

        # Do we have chmod -R?
        try:
            self._chmod_R = False
            re_recurse = re.compile(r"[-]R")
            chmod_output = self.shell_output("chmod --help", timeout=timeout)
            match = re_recurse.search(chmod_output)
            if match:
                self._chmod_R = True
        except ADBError as e:
            self._logger.debug(f"Check chmod -R: {e}")
            match = re_recurse.search(str(e))
            if match:
                self._chmod_R = True
        self._logger.info(f"Native chmod -R support: {self._chmod_R}")

        # Do we have chown -R?
        try:
            self._chown_R = False
            chown_output = self.shell_output("chown --help", timeout=timeout)
            match = re_recurse.search(chown_output)
            if match:
                self._chown_R = True
        except ADBError as e:
            self._logger.debug(f"Check chown -R: {e}")
        self._logger.info(f"Native chown -R support: {self._chown_R}")

        try:
            cleared = self.shell_bool('logcat -P ""', timeout=timeout)
        except ADBError:
            cleared = False
        if not cleared:
            self._logger.info("Unable to turn off logcat chatty")

        # Do we have pidof?
        if self.version < version_codes.N:
            # unexpected pidof behavior observed on Android 6 in bug 1514363
            self._have_pidof = False
        else:
            boot_completed = False
            while not boot_completed and (time.time() - start_time) <= float(timeout):
                try:
                    self.shell_output("pidof --help", timeout=timeout)
                    boot_completed = True
                    self._have_pidof = True
                except ADBError as e:
                    if "not found" in str(e):
                        self._have_pidof = False
                        boot_completed = True
                    elif "known option" in str(e):
                        self._have_pidof = True
                        boot_completed = True
                if not boot_completed:
                    time.sleep(2)
            if not boot_completed:
                raise ADBTimeoutError("ADBDevice: pidof not found.")
        # Bug 1529960 observed pidof intermittently returning no results for a
        # running process on the 7.0 x86_64 emulator.

        characteristics = self.get_prop("ro.build.characteristics", timeout=timeout)

        abi = self.get_prop("ro.product.cpu.abi", timeout=timeout)
        self._have_flaky_pidof = (
            self.version == version_codes.N
            and abi == "x86_64"
            and "emulator" in characteristics
        )
        self._logger.info(
            "Native {} pidof support: {}".format(
                "flaky" if self._have_flaky_pidof else "normal", self._have_pidof
            )
        )

        if self._use_root:
            # Detect if root is available, but do not fail if it is not.
            # Catch exceptions due to the potential for segfaults
            # calling su when using an improperly rooted device.

            self._check_adb_root(timeout=timeout)

            if not self._have_root_shell:
                # To work around bug 1525401 where su -c id will return an
                # exitcode of 1 if selinux permissive is not already in effect,
                # we need su to turn off selinux prior to checking for su.
                # We can use shell() directly to prevent the non-zero exitcode
                # from raising an ADBError.
                # Note: We are assuming su -c is supported and do not attempt to
                # use su 0.
                adb_process = self.shell("su -c setenforce 0")
                self._logger.info(
                    "su -c setenforce 0 exitcode %s, stdout: %s"
                    % (adb_process.proc.poll(), adb_process.proc.stdout)
                )

                uid = "uid=0"
                # Do we have a 'Superuser' sh like su?
                try:
                    if self.shell_output("su -c id", timeout=timeout).find(uid) != -1:
                        self._have_su = True
                        self._logger.info("su -c supported")
                except ADBError as e:
                    self._logger.debug(f"Check for su -c failed: {e}")

                # Check if Android's su 0 command works.
                # If we already have detected su -c support, we can skip this check.
                try:
                    if (
                        not self._have_su
                        and self.shell_output("su 0 id", timeout=timeout).find(uid)
                        != -1
                    ):
                        self._have_android_su = True
                        self._logger.info("su 0 supported")
                except ADBError as e:
                    self._logger.debug(f"Check for su 0 failed: {e}")

        # Guarantee that /data/local/tmp exists and is accessible to all.
        # It is a fatal error if /data/local/tmp does not exist and can not be created.
        if not self.exists("/data/local/tmp", timeout=timeout):
            # parents=True is required on emulator, where exist() may be flaky
            self.mkdir("/data/local/tmp", parents=True, timeout=timeout)

        # Beginning in Android 8.1 /data/anr/traces.txt no longer contains
        # a single file traces.txt but instead will contain individual files
        # for each stack.
        # See https://github.com/aosp-mirror/platform_build/commit/
        # fbba7fe06312241c7eb8c592ec2ac630e4316d55
        stack_trace_dir = self.shell_output(
            "getprop dalvik.vm.stack-trace-dir", timeout=timeout, attempts=3
        )
        if not stack_trace_dir:
            stack_trace_file = self.shell_output(
                "getprop dalvik.vm.stack-trace-file", timeout=timeout, attempts=3
            )
            if stack_trace_file:
                stack_trace_dir = posixpath.dirname(stack_trace_file)
            else:
                stack_trace_dir = "/data/anr"
        self.stack_trace_dir = stack_trace_dir
        self.enforcing = "Permissive"
        self.run_as_package = run_as_package

        self._logger.debug("ADBDevice: %s" % self.__dict__)

    @property
    def is_rooted(self):
        return self._have_root_shell or self._have_su or self._have_android_su

    def _wait_for_boot_completed(self, timeout=None):
        """Internal method to wait for boot to complete.

        Wait for sys.boot_completed=1 and raise ADBError if boot does
        not complete within retry attempts.

        :param int timeout: The default maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value defaults to 300.
        :raises: :exc:`ADBError`
        """
        for attempt in range(self._device_ready_retry_attempts):
            sys_boot_completed = self.shell_output(
                "getprop sys.boot_completed", timeout=timeout, attempts=3
            )
            if sys_boot_completed == "1":
                break
            time.sleep(self._device_ready_retry_wait)
        if sys_boot_completed != "1":
            raise ADBError("Failed to complete boot in time")

    def _get_device_serial(self, device):
        device = device or os.environ.get("ANDROID_SERIAL")
        if device is None:
            devices = ADBHost(
                adb=self._adb_path, adb_host=self._adb_host, adb_port=self._adb_port
            ).devices()
            if len(devices) > 1:
                raise ValueError(
                    "ADBDevice called with multiple devices "
                    "attached and no device specified"
                )
            if len(devices) == 0:
                raise ADBError("No connected devices found.")
            device = devices[0]

        # Allow : in device serial if it matches a tcpip device serial.
        re_device_serial_tcpip = re.compile(r"[^:]+:[0-9]+$")

        def is_valid_serial(serial):
            return (
                serial.startswith("usb:")
                or re_device_serial_tcpip.match(serial) is not None
                or ":" not in serial
            )

        if isinstance(device, str):
            # Treat this as a device serial
            if not is_valid_serial(device):
                raise ValueError(
                    "Device serials containing ':' characters are "
                    "invalid. Pass the output from "
                    "ADBHost.devices() for the device instead"
                )
            return device

        serial = device.get("device_serial")
        if serial is not None and is_valid_serial(serial):
            return serial
        usb = device.get("usb")
        if usb is not None:
            return "usb:%s" % usb

        raise ValueError("Unable to get device serial")

    def _check_root_user(self, timeout=None):
        uid = "uid=0"
        # Is shell already running as root?
        try:
            if self.shell_output("id", timeout=timeout).find(uid) != -1:
                self._logger.info("adbd running as root")
                return True
        except ADBError:
            self._logger.debug("Check for root user failed")
        return False

    def _check_adb_root(self, timeout=None):
        self._have_root_shell = self._check_root_user(timeout=timeout)

        # Exclude these devices from checking for a root shell due to
        # potential hangs.
        exclude_set = set()
        exclude_set.add("E5823")  # Sony Xperia Z5 Compact (E5823)
        # Do we need to run adb root to get a root shell?
        if not self._have_root_shell:
            if self.get_prop("ro.product.model"in exclude_set:
                self._logger.warning(
                    "your device was excluded from attempting adb root."
                )
            else:
                try:
                    self.command_output(["root"], timeout=timeout)
                    self._have_root_shell = self._check_root_user(timeout=timeout)
                    if self._have_root_shell:
                        self._logger.info("adbd restarted as root")
                    else:
                        self._logger.info("adbd not restarted as root")
                except ADBError:
                    self._logger.debug("Check for root adbd failed")

    def pidof(self, app_name, timeout=None):
        """
        Return a list of pids for all extant processes running within the
        specified application package.

        :param str app_name: The name of the application package to examine
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per
            adb call. The total time spent may exceed this
            value. If it is not specified, the value set
            in the ADBDevice constructor is used.
        :return: List of integers containing the pid(s) of the various processes.
        :raises: :exc:`ADBTimeoutError`
        """
        if self._have_pidof:
            try:
                pid_output = self.shell_output("pidof %s" % app_name, timeout=timeout)
                re_pids = re.compile(r"[0-9]+")
                pids = re_pids.findall(pid_output)
                if self._have_flaky_pidof and not pids:
                    time.sleep(0.1)
                    pid_output = self.shell_output(
                        "pidof %s" % app_name, timeout=timeout
                    )
                    pids = re_pids.findall(pid_output)
            except ADBError:
                pids = []
        else:
            procs = self.get_process_list(timeout=timeout)
            # limit the comparion to the first 75 characters due to a
            # limitation in processname length in android.
            pids = [proc[0] for proc in procs if proc[1] == app_name[:75]]

        return [int(pid) for pid in pids]

    def _sync(self, timeout=None):
        """Sync the file system using shell_output in order that exceptions
        are raised to the caller."""
        self.shell_output("sync", timeout=timeout, attempts=3)

    @staticmethod
    def _should_quote(arg):
        """Utility function if command argument should be quoted."""
        if not arg:
            return False
        if arg[0] == "'" and arg[-1] == "'" or arg[0] == '"' and arg[-1] == '"':
            # Already quoted
            return False
        re_quotable_chars = re.compile(r"[ ()\"&'\];]")
        return re_quotable_chars.search(arg)

    @staticmethod
    def _quote(arg):
        """Utility function to return quoted version of command argument."""
        if hasattr(shlex, "quote"):
            quote = shlex.quote
        else:

            def quote(arg):
                arg = arg or ""
                re_unsafe = re.compile(r"[^\w@%+=:,./-]")
                if re_unsafe.search(arg):
                    arg = "'" + arg.replace("'""'\"'\"'") + "'"
                return arg

        return quote(arg)

    @staticmethod
    def _escape_command_line(cmds):
        """Utility function which takes a list of command arguments and returns
        escaped and quoted version of the command as a string.
        """
        assert isinstance(cmds, list)
        # This is identical to shlex.join in Python 3.8. We can
        # replace it should we ever get Python 3.8 as a minimum.
        quoted_cmd = " ".join([ADBDevice._quote(arg) for arg in cmds])

        return quoted_cmd

    @staticmethod
    def _get_exitcode(file_obj):
        """Get the exitcode from the last line of the file_obj for shell
        commands executed on Android prior to Android 7.
        """
        re_returncode = re.compile(r"adb_returncode=([0-9]+)")
        file_obj.seek(0, os.SEEK_END)

        line = ""
        length = file_obj.tell()
        offset = 1
        while length - offset >= 0:
            file_obj.seek(-offset, os.SEEK_END)
            char = file_obj.read(1).decode()
            if not char:
                break
            if char != "\r" and char != "\n":
                line = char + line
            elif line:
                # we have collected everything up to the beginning of the line
                break
            offset += 1
        match = re_returncode.match(line)
        if match:
            exitcode = int(match.group(1))
            # Set the position in the file to the position of the
            # adb_returncode and truncate it from the output.
            file_obj.seek(-1, os.SEEK_CUR)
            file_obj.truncate()
        else:
            exitcode = None
            # We may have a situation where the adb_returncode= is not
            # at the end of the output. This happens at least in the
            # failure jit-tests on arm. To work around this
            # possibility, we can search the entire output for the
            # appropriate match.
            file_obj.seek(0, os.SEEK_SET)
            for line in file_obj:
                line_str = line.decode()
                match = re_returncode.search(line_str)
                if match:
                    exitcode = int(match.group(1))
                    break
            # Reset the position in the file to the end.
            file_obj.seek(0, os.SEEK_END)

        return exitcode

    def is_path_internal_storage(self, path, timeout=None):
        """
        Return True if the path matches an internal storage path
        as defined by either '/sdcard''/mnt/sdcard'or any of the
        .*_STORAGE environment variables on the device otherwise False.

        :param str path: The path to test.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
            total time spent may exceed this value. If it is not
            specified, the value set in the ADBDevice constructor is used.
        :return: boolean
        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        if not self._re_internal_storage:
            storage_dirs = set(["/mnt/sdcard""/sdcard"])
            re_STORAGE = re.compile("([^=]+STORAGE)=(.*)")
            lines = self.shell_output("set", timeout=timeout).split()
            for line in lines:
                m = re_STORAGE.match(line.strip())
                if m and m.group(2):
                    storage_dirs.add(m.group(2))
            self._re_internal_storage = re.compile("/|".join(list(storage_dirs)) + "/")
        return self._re_internal_storage.match(path) is not None

    def is_package_debuggable(self, package):
        if not package:
            return False

        if not self.is_app_installed(package):
            self._logger.warning(
                "Can not check if package %s is debuggable as it is not installed."
                % package
            )
            return False

        if package in self._debuggable_packages:
            return self._debuggable_packages[package]

        try:
            self.shell_output("run-as %s ls /system" % package)
            self._debuggable_packages[package] = True
        except ADBError as e:
            self._debuggable_packages[package] = False
            self._logger.warning(f"Package {package} is not debuggable: {str(e)}")
        return self._debuggable_packages[package]

    @property
    def package_dir(self):
        if not self._run_as_package:
            return None
        # If we have a debuggable app and can use its directory to
        # locate the test_root, this returns the location of the app's
        # directory. If it is not located in the default location this
        # will not be correct.
        return "/data/data/%s" % self._run_as_package

    @property
    def run_as_package(self):
        """Returns the name of the package which will be used in run-as to change
        the effective user executing a command."""
        return self._run_as_package

    @run_as_package.setter
    def run_as_package(self, value):
        if self._have_root_shell or self._have_su or self._have_android_su:
            # When we have root available, use that instead of run-as.
            return

        if self._run_as_package == value:
            # Do nothing if the value doesn't change.
            return

        if not value:
            if self._test_root:
                # Make sure the old test_root is clean without using
                # the test_root property getter.
                self.rm(
                    posixpath.join(self._test_root, "*"), recursive=True, force=True
                )
            self._logger.info(
                "Setting run_as_package to None. Resetting test root from %s to %s"
                % (self._test_root, self._initial_test_root)
            )
            self._run_as_package = None
            # We must set _run_as_package to None before assigning to
            # self.test_root in order to prevent attempts to use
            # run-as.
            self.test_root = self._initial_test_root
            if self._test_root:
                # Make sure the new test_root is clean.
                self.rm(
                    posixpath.join(self._test_root, "*"), recursive=True, force=True
                )
            return

        if not self.is_package_debuggable(value):
            self._logger.warning(
                "Can not set run_as_package to %s since it is not debuggable." % value
            )
            # Since we are attempting to set run_as_package assume
            # that we are not rooted and do not include
            # /data/local/tmp as an option when checking for possible
            # test_root paths using external storage.
            paths = [
                "/storage/emulated/0/Android/data/%s/test_root" % value,
                "/sdcard/test_root",
                "/mnt/sdcard/test_root",
            ]
            self._try_test_root_candidates(paths)
            return

        # Require these devices to have Verify bytecode turned off due to failures with run-as.
        include_set = set()
        include_set.add("SM-G973F")  # Samsung S10g SM-G973F

        if (
            self.get_prop("ro.product.model"in include_set
            and self.shell_output("settings get global art_verifier_verify_debuggable")
            == "1"
        ):
            self._logger.warning(
                """Your device has Verify bytecode of debuggable apps set which
            causes problems attempting to use run-as to delegate command execution to debuggable
            apps. You must turn this setting off in Developer options on your device.
            """
            )
            raise ADBError(
                "Verify bytecode of debuggable apps must be turned off to use run-as"
            )

        self._logger.info("Setting run_as_package to %s" % value)

        self._run_as_package = value
        old_test_root = self._test_root
        new_test_root = posixpath.join(self.package_dir, "test_root")
        if old_test_root != new_test_root:
            try:
                # Make sure the old test_root is clean.
                if old_test_root:
                    self.rm(
                        posixpath.join(old_test_root, "*"), recursive=True, force=True
                    )
                self.test_root = posixpath.join(self.package_dir, "test_root")
                # Make sure the new test_root is clean.
                self.rm(posixpath.join(self.test_root, "*"), recursive=True, force=True)
            except ADBError as e:
                # There was a problem using run-as to initialize
                # the new test_root in the app's directory.
                # Restore the old test root and raise an ADBError.
                self._run_as_package = None
                self.test_root = old_test_root
                self._logger.warning(
                    "Exception %s setting test_root to %s. "
                    "Resetting test_root to %s."
                    % (str(e), new_test_root, old_test_root)
                )
                raise ADBError(
                    "Unable to initialize test root while setting run_as_package %s"
                    % value
                )

    def enable_run_as_for_path(self, path):
        return self._run_as_package is not None and path.startswith(self.package_dir)

    @property
    def test_root(self):
        """
        The test_root property returns the directory on the device where
        temporary test files are stored.

        The first time test_root it is called it determines and caches a value
        for the test root on the device. It determines the appropriate test
        root by attempting to create a 'proof' directory on each of a list of
        directories and returning the first successful directory as the
        test_root value. The cached value for the test_root will be shared
        by subsequent instances of ADBDevice if self._share_test_root is True.

        The default list of directories checked by test_root are:

        If the device is rooted:
            - /data/local/tmp/test_root

        If run_as_package is not available and the device is not rooted:

            - /data/local/tmp/test_root
            - /sdcard/test_root
            - /storage/sdcard/test_root
            - /mnt/sdcard/test_root

        You may override the default list by providing a test_root argument to
        the :class:`ADBDevice` constructor which will then be used when
        attempting to create the 'proof' directory.

        :raises: :exc:`ADBTimeoutError`
                 :exc:`ADBError`
        """
        if self._test_root is not None:
            self._logger.debug("Using cached test_root %s" % self._test_root)
            return self._test_root

        if self.run_as_package is not None:
            raise ADBError(
                "run_as_package is %s however test_root is None" % self.run_as_package
            )

        if self._share_test_root and _TEST_ROOT:
            self._logger.debug(
                "Attempting to use shared test_root %s" % self._test_root
            )
            paths = [_TEST_ROOT]
        elif self._initial_test_root is not None:
            self._logger.debug(
                "Attempting to use initial test_root %s" % self._test_root
            )
            paths = [self._initial_test_root]
        else:
            # Android 10's scoped storage means we can no longer
            # reliably host profiles and tests on the sdcard though it
            # depends on the device. See
            https://developer.android.com/training/data-storage#scoped-storage
            # Also see RunProgram in
            # python/mozbuild/mozbuild/mach_commands.py where they
            # choose /data/local/tmp as the default location for the
            # profile because GeckoView only takes its configuration
            # file from /data/local/tmp.  Since we have not specified
            # a run_as_package yet, assume we may be attempting to use
            # a shell program which creates files owned by the shell
            # user and which would work using /data/local/tmp/ even if
            # the device is not rooted. Fall back to external storage
            # if /data/local/tmp is not available.
            paths = ["/data/local/tmp/test_root"]
            if not self.is_rooted:
                # Note that /sdcard may be accessible while
                # /mnt/sdcard is not.
                paths.extend(
                    [
                        "/sdcard/test_root",
                        "/storage/sdcard/test_root",
                        "/mnt/sdcard/test_root",
                    ]
                )

        return self._try_test_root_candidates(paths)

    @test_root.setter
    def test_root(self, value):
        # Cache the requested test root so that
        # other invocations of ADBDevice will pick
        # up the same value.
        global _TEST_ROOT
        if self._test_root == value:
            return
        self._logger.debug(f"Setting test_root from {self._test_root} to {value}")
        old_test_root = self._test_root
        self._test_root = value
        if self._share_test_root:
            _TEST_ROOT = value
        if not value:
            return
        if not self._try_test_root(value):
            self._test_root = old_test_root
            raise ADBError("Unable to set test_root to %s" % value)
        readme = posixpath.join(value, "README")
        if not self.is_file(readme):
            tmpf = tempfile.NamedTemporaryFile(mode="w", delete=False)
            tmpf.write(
                "This directory is used by mozdevice to contain all content "
                "related to running tests on this device.\n"
            )
            tmpf.close()
            try:
                self.push(tmpf.name, readme)
            finally:
                if tmpf:
                    os.unlink(tmpf.name)

    def _try_test_root_candidates(self, paths):
        max_attempts = 3
        for test_root in paths:
            for attempt in range(1, max_attempts + 1):
                self._logger.debug(
                    "Setting test root to %s attempt %d of %d"
                    % (test_root, attempt, max_attempts)
                )

                if self._try_test_root(test_root):
                    if not self._test_root:
                        # Cache the detected test_root so that we can
                        # restore the value without having re-run
                        # _try_test_root.
                        self._initial_test_root = test_root
                    self._test_root = test_root
                    self._logger.info("Setting test_root to %s" % self._test_root)
                    return self._test_root

                self._logger.debug(
                    "_setup_test_root: "
                    "Attempt %d of %d failed to set test_root to %s"
                    % (attempt, max_attempts, test_root)
                )

                if attempt != max_attempts:
                    time.sleep(20)

        raise ADBError(
            "Unable to set up test root using paths: [%s]" % ", ".join(paths)
        )

    def _try_test_root(self, test_root):
        try:
            if not self.is_dir(test_root):
                self.mkdir(test_root, parents=True)
            proof_dir = posixpath.join(test_root, "proof")
            if self.is_dir(proof_dir):
                self.rm(proof_dir, recursive=True)
            self.mkdir(proof_dir)
            self.rm(proof_dir, recursive=True)
        except ADBError as e:
            self._logger.warning(f"{test_root} is not writable: {str(e)}")
            return False

        return True

    # Host Command methods

    def command(self, cmds, timeout=None):
        """Executes an adb command on the host against the device.

        :param list cmds: The command and its arguments to be
            executed.
        :param int timeout: The maximum time in
            seconds for any spawned adb process to complete before
            throwing an ADBTimeoutError.  This timeout is per adb call. The
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=91 H=91 G=90

¤ Dauer der Verarbeitung: 0.27 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereitgestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.