def path_from_usb_dev(dev): """Takes a pyUSB device as argument and returns a string.
The string is a Path representation of the position of the USB device on the USB bus tree.
This path is used to find a USB device on the bus or all devices connected to a HUB.
The path is made up of the number of the USB controller followed be the ports of the HUB tree.""" if dev.port_numbers:
dev_path = ".".join(str(i) for i in dev.port_numbers) return f"{dev.bus}-{dev_path}" return""
HEXDUMP_FILTER = "".join(chr(x).isprintable() and chr(x) or"."for x in range(128)) + "." * 128
class Forwarder:
@staticmethod def _log_hexdump(data): ifnot logging.root.isEnabledFor(logging.TRACE): return
L = 16 for c in range(0, len(data), L):
chars = data[c : c + L]
dump = " ".join(f"{x:02x}"for x in chars)
printable = "".join(HEXDUMP_FILTER[x] for x in chars)
line = f"{c:08x} {dump:{L*3}s} |{printable:{L}s}|"
logging.root.log(logging.TRACE, "%s", line)
dev = usb.core.find(idVendor=vid, idProduct=pid, custom_match=find_filter) if dev isNone: raise ValueError("Device not found")
logging.info(f"found device: {dev.bus}/{dev.address} located at {path_from_usb_dev(dev)}")
# dev.set_configuration() is not necessary since g_multi has only one
usb9pfs = None # g_multi adds 9pfs as last interface
cfg = dev.get_active_configuration() for intf in cfg: # we have to detach the usb-storage driver from multi gadget since # stall option could be set, which will lead to spontaneous port # resets and our transfers will run dead if intf.bInterfaceClass == 0x08: if dev.is_kernel_driver_active(intf.bInterfaceNumber):
dev.detach_kernel_driver(intf.bInterfaceNumber)
if intf.bInterfaceClass == 0xFF and intf.bInterfaceSubClass == 0xFF and intf.bInterfaceProtocol == 0x09:
usb9pfs = intf if usb9pfs isNone: raise ValueError("Interface not found")
self.ep_out = ep_out
self.ep_in = ep_in
self.dev = dev
# create and connect socket
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect(server)
logging.info("connected to server")
def c2s(self): """forward a request from the USB client to the TCP server"""
data = None while data isNone: try:
logging.log(logging.TRACE, "c2s: reading")
data = self.ep_in.read(self.ep_in.wMaxPacketSize) except usb.core.USBTimeoutError:
logging.log(logging.TRACE, "c2s: reading timed out") continue except usb.core.USBError as e: if e.errno == errno.EIO:
logging.debug("c2s: reading failed with %s, retrying", repr(e))
time.sleep(0.5) continue
logging.error("c2s: reading failed with %s, aborting", repr(e)) raise
size = struct.unpack(", data[:4])[0] while len(data) < size:
data += self.ep_in.read(size - len(data))
logging.log(logging.TRACE, "c2s: writing")
self._log_hexdump(data)
self.s.send(data)
logging.debug("c2s: forwarded %i bytes", size)
self.stats["c2s packets"] += 1
self.stats["c2s bytes"] += size
def s2c(self): """forward a response from the TCP server to the USB client"""
logging.log(logging.TRACE, "s2c: reading")
data = self.s.recv(4)
size = struct.unpack(", data[:4])[0] while len(data) < size:
data += self.s.recv(size - len(data))
logging.log(logging.TRACE, "s2c: writing")
self._log_hexdump(data) while data:
written = self.ep_out.write(data) assert written > 0
data = data[written:] if size % self.ep_out.wMaxPacketSize == 0:
logging.log(logging.TRACE, "sending zero length packet")
self.ep_out.write(b"")
logging.debug("s2c: forwarded %i bytes", size)
self.stats["s2c packets"] += 1
self.stats["s2c bytes"] += size
def log_stats(self):
logging.info("statistics:") for k, v in self.stats.items():
logging.info(f" {k+':':14s} {v}")
def log_stats_interval(self, interval=5): if (time.monotonic() - self.stats_logged) < interval: return
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.