def annotate_extack(self, attr_space): """ Make extack more human friendly with attribute information """
# We don't have the ability to parse nests yet, so only do global if'miss-type'in self.extack and'miss-nest'notin self.extack:
miss_type = self.extack['miss-type'] if miss_type in attr_space.attrs_by_val:
spec = attr_space.attrs_by_val[miss_type]
self.extack['miss-type'] = spec['name'] if'doc'in spec:
self.extack['miss-type-doc'] = spec['doc']
def decode(self, ynl, nl_msg, op):
msg = self._decode(nl_msg) if op isNone:
op = ynl.rsp_by_value[msg.cmd()]
fixed_header_size = ynl._struct_size(op.fixed_header)
msg.raw_attrs = NlAttrs(msg.raw, fixed_header_size) return msg
def get_mcast_id(self, mcast_name, mcast_groups): if mcast_name notin mcast_groups: raise Exception(f'Multicast group "{mcast_name}" not present in the spec') return mcast_groups[mcast_name].value
def msghdr_size(self): return 16
class GenlProtocol(NetlinkProtocol): def __init__(self, family_name):
super().__init__(family_name, Netlink.NETLINK_GENERIC)
global genl_family_name_to_id if genl_family_name_to_id isNone:
_genl_load_families()
def get_mcast_id(self, mcast_name, mcast_groups): if mcast_name notin self.genl_family['mcast']: raise Exception(f'Multicast group "{mcast_name}" not present in the family') return self.genl_family['mcast'][mcast_name]
def lookup(self, name): for scope in self.scopes: if name in scope.spec: if name in scope.values: return scope.values[name]
spec_name = scope.spec.yaml['name'] raise Exception(
f"No value for '{name}' in attribute space '{spec_name}'") raise Exception(f"Attribute '{name}' not defined in any attribute-set")
# # YNL implementation details. #
class YnlFamily(SpecFamily): def __init__(self, def_path, schema=None, process_unknown=False,
recv_size=0):
super().__init__(def_path, schema)
try: if self.proto == "netlink-raw":
self.nlproto = NetlinkProtocol(self.yaml['name'],
self.yaml['protonum']) else:
self.nlproto = GenlProtocol(self.yaml['name']) except KeyError: raise Exception(f"Family '{self.yaml['name']}' not supported by the kernel")
self._recv_dbg = False # Note that netlink will use conservative (min) message size for # the first dump recv() on the socket, our setting will only matter # from the second recv() on.
self._recv_size = recv_size if recv_size else 131072 # Netlink will always allocate at least PAGE_SIZE - sizeof(skb_shinfo) # for a message, so smaller receive sizes will lead to truncation. # Note that the min size for other families may be larger than 4k! if self._recv_size < 4000: raise ConfigError()
if attr.is_multi and isinstance(value, list):
attr_payload = b'' for subvalue in value:
attr_payload += self._add_attr(space, name, subvalue, search_attrs) return attr_payload
if attr["type"] == 'nest':
nl_type |= Netlink.NLA_F_NESTED
attr_payload = b''
sub_space = attr['nested-attributes']
sub_attrs = SpaceAttrs(self.attr_sets[sub_space], value, search_attrs) for subname, subvalue in value.items():
attr_payload += self._add_attr(sub_space, subname, subvalue, sub_attrs) elif attr["type"] == 'flag': ifnot value: # If value is absent or false then skip attribute creation. return b''
attr_payload = b'' elif attr["type"] == 'string':
attr_payload = str(value).encode('ascii') + b'\x00' elif attr["type"] == 'binary': if value isNone:
attr_payload = b'' elif isinstance(value, bytes):
attr_payload = value elif isinstance(value, str): if attr.display_hint:
attr_payload = self._from_string(value, attr) else:
attr_payload = bytes.fromhex(value) elif isinstance(value, dict) and attr.struct_name:
attr_payload = self._encode_struct(attr.struct_name, value) elif isinstance(value, list) and attr.sub_type in NlAttr.type_formats:
format = NlAttr.get_format(attr.sub_type)
attr_payload = b''.join([format.pack(x) for x in value]) else: raise Exception(f'Unknown type for binary attribute, value: {value}') elif attr['type'] in NlAttr.type_formats or attr.is_auto_scalar:
scalar = self._get_scalar(attr, value) if attr.is_auto_scalar:
attr_type = attr["type"][0] + ('32'if scalar.bit_length() <= 32 else'64') else:
attr_type = attr["type"]
format = NlAttr.get_format(attr_type, attr.byte_order)
attr_payload = format.pack(scalar) elif attr['type'] in"bitfield32":
scalar_value = self._get_scalar(attr, value["value"])
scalar_selector = self._get_scalar(attr, value["selector"])
attr_payload = struct.pack("II", scalar_value, scalar_selector) elif attr['type'] == 'sub-message':
msg_format, _ = self._resolve_selector(attr, search_attrs)
attr_payload = b'' if msg_format.fixed_header:
attr_payload += self._encode_struct(msg_format.fixed_header, value) if msg_format.attr_set: if msg_format.attr_set in self.attr_sets:
nl_type |= Netlink.NLA_F_NESTED
sub_attrs = SpaceAttrs(msg_format.attr_set, value, search_attrs) for subname, subvalue in value.items():
attr_payload += self._add_attr(msg_format.attr_set,
subname, subvalue, sub_attrs) else: raise Exception(f"Unknown attribute-set '{msg_format.attr_set}'") else: raise Exception(f'Unknown type at {space} {name} {value} {attr["type"]}')
def _get_enum_or_unknown(self, enum, raw): try:
name = enum.entries_by_val[raw].name except KeyError as error: if self.process_unknown:
name = f"Unknown({raw})" else: raise error return name
def _decode_enum(self, raw, attr_spec):
enum = self.consts[attr_spec['enum']] if enum.type == 'flags'or attr_spec.get('enum-as-flags', False):
i = 0
value = set() while raw: if raw & 1:
value.add(self._get_enum_or_unknown(enum, i))
raw >>= 1
i += 1 else:
value = self._get_enum_or_unknown(enum, raw) return value
def _decode_binary(self, attr, attr_spec): if attr_spec.struct_name:
decoded = self._decode_struct(attr.raw, attr_spec.struct_name) elif attr_spec.sub_type:
decoded = attr.as_c_array(attr_spec.sub_type) if'enum'in attr_spec:
decoded = [ self._decode_enum(x, attr_spec) for x in decoded ] elif attr_spec.display_hint:
decoded = [ self._formatted_string(x, attr_spec.display_hint) for x in decoded ] else:
decoded = attr.as_bin() if attr_spec.display_hint:
decoded = self._formatted_string(decoded, attr_spec.display_hint) return decoded
def _rsp_add(self, rsp, name, is_multi, decoded): if is_multi == None: if name in rsp and type(rsp[name]) isnot list:
rsp[name] = [rsp[name]]
is_multi = True else:
is_multi = False
ifnot is_multi:
rsp[name] = decoded elif name in rsp:
rsp[name].append(decoded) else:
rsp[name] = [decoded]
def _resolve_selector(self, attr_spec, search_attrs):
sub_msg = attr_spec.sub_message if sub_msg notin self.sub_msgs: raise Exception(f"No sub-message spec named {sub_msg} for {attr_spec.name}")
sub_msg_spec = self.sub_msgs[sub_msg]
selector = attr_spec.selector
value = search_attrs.lookup(selector) if value notin sub_msg_spec.formats: raise Exception(f"No message format for '{value}' in sub-message spec '{sub_msg}'")
spec = sub_msg_spec.formats[value] return spec, value
def _decode_sub_msg(self, attr, attr_spec, search_attrs):
msg_format, _ = self._resolve_selector(attr_spec, search_attrs)
decoded = {}
offset = 0 if msg_format.fixed_header:
decoded.update(self._decode_struct(attr.raw, msg_format.fixed_header));
offset = self._struct_size(msg_format.fixed_header) if msg_format.attr_set: if msg_format.attr_set in self.attr_sets:
subdict = self._decode(NlAttrs(attr.raw, offset), msg_format.attr_set)
decoded.update(subdict) else: raise Exception(f"Unknown attribute-set '{msg_format.attr_set}' when decoding '{attr_spec.name}'") return decoded
for attr in attrs: try:
attr_spec = attr_space.attrs_by_val[attr.type] except (KeyError, UnboundLocalError): ifnot self.process_unknown: raise Exception(f"Space '{space}' has no attribute with value '{attr.type}'")
attr_name = f"UnknownAttr({attr.type})"
self._rsp_add(rsp, attr_name, None, self._decode_unknown(attr)) continue
def _decode_extack_path(self, attrs, attr_set, offset, target, search_attrs): for attr in attrs: try:
attr_spec = attr_set.attrs_by_val[attr.type] except KeyError: raise Exception(f"Space '{attr_set.name}' has no attribute with value '{attr.type}'") if offset > target: break if offset == target: return'.' + attr_spec.name
if offset + attr.full_len <= target:
offset += attr.full_len continue
pathname = attr_spec.name if attr_spec['type'] == 'nest':
sub_attrs = self.attr_sets[attr_spec['nested-attributes']]
search_attrs = SpaceAttrs(sub_attrs, search_attrs.lookup(attr_spec['name'])) elif attr_spec['type'] == 'sub-message':
msg_format, value = self._resolve_selector(attr_spec, search_attrs) if msg_format isNone: raise Exception(f"Can't resolve sub-message of {attr_spec['name']} for extack")
sub_attrs = self.attr_sets[msg_format.attr_set]
pathname += f"({value})" else: raise Exception(f"Can't dive into {attr.type} ({attr_spec['name']}) for extack")
offset += 4
subpath = self._decode_extack_path(NlAttrs(attr.raw), sub_attrs,
offset, target, search_attrs) if subpath isNone: returnNone return'.' + pathname + subpath
def _struct_size(self, name): if name:
members = self.consts[name].members
size = 0 for m in members: if m.type in ['pad', 'binary']: if m.struct:
size += self._struct_size(m.struct) else:
size += m.len else:
format = NlAttr.get_format(m.type, m.byte_order)
size += format.size return size else: return 0
def _decode_struct(self, data, name):
members = self.consts[name].members
attrs = dict()
offset = 0 for m in members:
value = None if m.type == 'pad':
offset += m.len elif m.type == 'binary': if m.struct:
len = self._struct_size(m.struct)
value = self._decode_struct(data[offset : offset + len],
m.struct)
offset += len else:
value = data[offset : offset + m.len]
offset += m.len else:
format = NlAttr.get_format(m.type, m.byte_order)
[ value ] = format.unpack_from(data, offset)
offset += format.size if value isnotNone: if m.enum:
value = self._decode_enum(value, m) elif m.display_hint:
value = self._formatted_string(value, m.display_hint)
attrs[m.name] = value return attrs
def _encode_struct(self, name, vals):
members = self.consts[name].members
attr_payload = b'' for m in members:
value = vals.pop(m.name) if m.name in vals elseNone if m.type == 'pad':
attr_payload += bytearray(m.len) elif m.type == 'binary': if m.struct: if value isNone:
value = dict()
attr_payload += self._encode_struct(m.struct, value) else: if value isNone:
attr_payload += bytearray(m.len) else:
attr_payload += bytes.fromhex(value) else: if value isNone:
value = 0
format = NlAttr.get_format(m.type, m.byte_order)
attr_payload += format.pack(value) return attr_payload
def _formatted_string(self, raw, display_hint): if display_hint == 'mac':
formatted = ':'.join('%02x' % b for b in raw) elif display_hint == 'hex': if isinstance(raw, int):
formatted = hex(raw) else:
formatted = bytes.hex(raw, ' ') elif display_hint in [ 'ipv4', 'ipv6' ]:
formatted = format(ipaddress.ip_address(raw)) elif display_hint == 'uuid':
formatted = str(uuid.UUID(bytes=raw)) else:
formatted = raw return formatted
def _from_string(self, string, attr_spec): if attr_spec.display_hint in ['ipv4', 'ipv6']:
ip = ipaddress.ip_address(string) if attr_spec['type'] == 'binary':
raw = ip.packed else:
raw = int(ip) else: raise Exception(f"Display hint '{attr_spec.display_hint}' not implemented"
f" when parsing '{attr_spec['name']}'") return raw
def handle_ntf(self, decoded):
msg = dict() if self.include_raw:
msg['raw'] = decoded
op = self.rsp_by_value[decoded.cmd()]
attrs = self._decode(decoded.raw_attrs, op.attr_set.name) if op.fixed_header:
attrs.update(self._decode_struct(decoded.raw, op.fixed_header))
nms = NlMsgs(reply)
self._recv_dbg_print(reply, nms) for nl_msg in nms: if nl_msg.error:
print("Netlink error in ntf!?", os.strerror(-nl_msg.error))
print(nl_msg) continue if nl_msg.done:
print("Netlink done while checking for ntf!?") continue
decoded = self.nlproto.decode(self, nl_msg, None) if decoded.cmd() notin self.async_msg_ids:
print("Unexpected msg id while checking for ntf", decoded) continue
def operation_do_attributes(self, name): """ For a given operation name, find andreturn a supported
set of attributes (as a dict). """
op = self.find_operation(name) ifnot op: returnNone
return op['do']['request']['attributes'].copy()
def _encode_message(self, op, vals, flags, req_seq):
nl_flags = Netlink.NLM_F_REQUEST | Netlink.NLM_F_ACK for flag in flags or []:
nl_flags |= flag
msg = self.nlproto.message(nl_flags, op.req_value, 1, req_seq) if op.fixed_header:
msg += self._encode_struct(op.fixed_header, vals)
search_attrs = SpaceAttrs(op.attr_set, vals) for name, value in vals.items():
msg += self._add_attr(op.attr_set.name, name, value, search_attrs)
msg = _genl_msg_finalize(msg) return msg
done = False
rsp = []
op_rsp = [] whilenot done:
reply = self.sock.recv(self._recv_size)
nms = NlMsgs(reply)
self._recv_dbg_print(reply, nms) for nl_msg in nms: if nl_msg.nl_seq in reqs_by_seq:
(op, vals, req_msg, req_flags) = reqs_by_seq[nl_msg.nl_seq] if nl_msg.extack:
nl_msg.annotate_extack(op.attr_set)
self._decode_extack(req_msg, op, nl_msg.extack, vals) else:
op = None
req_flags = []
if nl_msg.error: raise NlError(nl_msg) if nl_msg.done: if nl_msg.extack:
print("Netlink warning:")
print(nl_msg)
if Netlink.NLM_F_DUMP in req_flags:
rsp.append(op_rsp) elifnot op_rsp:
rsp.append(None) elif len(op_rsp) == 1:
rsp.append(op_rsp[0]) else:
rsp.append(op_rsp)
op_rsp = []
del reqs_by_seq[nl_msg.nl_seq]
done = len(reqs_by_seq) == 0 break
decoded = self.nlproto.decode(self, nl_msg, op)
# Check if this is a reply to our request if nl_msg.nl_seq notin reqs_by_seq or decoded.cmd() != op.rsp_value: if decoded.cmd() in self.async_msg_ids:
self.handle_ntf(decoded) continue else:
print('Unexpected message: ' + repr(decoded)) continue
rsp_msg = self._decode(decoded.raw_attrs, op.attr_set.name) if op.fixed_header:
rsp_msg.update(self._decode_struct(decoded.raw, op.fixed_header))
op_rsp.append(rsp_msg)
return rsp
def _op(self, method, vals, flags=None, dump=False):
req_flags = flags or [] if dump:
req_flags.append(Netlink.NLM_F_DUMP)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.