def limit_to_number(name): """
Turn a string limit like u32-max or s64-min into its numerical value """ if name[0] == 'u'and name.endswith('-min'): return 0
width = int(name[1:-4]) if name[0] == 's':
width -= 1
value = (1 << width) - 1 if name[0] == 's'and name.endswith('-min'):
value = -value - 1 return value
class BaseNlLib: def get_family_id(self): return'ys->family_id'
if nested:
self.nested_attrs = nested if self.nested_attrs == family.name:
self.nested_render_name = c_lower(f"{family.ident_name}") else:
self.nested_render_name = c_lower(f"{family.ident_name}_{self.nested_attrs}")
if self.nested_attrs in self.family.consts:
self.nested_struct_type = 'struct ' + self.nested_render_name + '_' else:
self.nested_struct_type = 'struct ' + self.nested_render_name
self.c_name = c_lower(self.name) if self.c_name in _C_KW:
self.c_name += '_' if self.c_name[0].isdigit():
self.c_name = '_' + self.c_name
# Added by resolve():
self.enum_name = None
delattr(self, "enum_name")
def _get_real_attr(self): # if the attr is for a subset return the "real" attr (just one down, does not recurse) return self.family.attr_sets[self.attr_set.subset_of][self.name]
def set_request(self):
self.request = True if self.attr_set.subset_of:
self._get_real_attr().set_request()
def set_reply(self):
self.reply = True if self.attr_set.subset_of:
self._get_real_attr().set_reply()
def get_limit(self, limit, default=None):
value = self.checks.get(limit, default) if value isNone: return value if isinstance(value, int): return value if value in self.family.consts: return self.family.consts[value]["value"] return limit_to_number(value)
def get_limit_str(self, limit, default=None, suffix=''):
value = self.checks.get(limit, default) if value isNone: return'' if isinstance(value, int): return str(value) + suffix if value in self.family.consts:
const = self.family.consts[value] if const.get('header'): return c_upper(value) return c_upper(f"{self.family['name']}-{value}") return c_upper(value)
def presence_member(self, space, type_filter): if self.presence_type() != type_filter: return
if self.presence_type() == 'present':
pfx = '__'if space == 'user'else'' return f"{pfx}u32 {self.c_name}:1;"
if self.presence_type() in {'len', 'count'}:
pfx = '__'if space == 'user'else'' return f"{pfx}u32 {self.c_name};"
def _complex_member_type(self, ri): returnNone
def free_needs_iter(self): returnFalse
def _free_lines(self, ri, var, ref): if self.is_multi_val() or self.presence_type() in {'count', 'len'}: return [f'free({var}->{ref}{self.c_name});'] return []
def free(self, ri, var, ref):
lines = self._free_lines(ri, var, ref) for line in lines:
ri.cw.p(line)
def arg_member(self, ri):
member = self._complex_member_type(ri) if member:
spc = ' 'if member[-1] != '*'else''
arg = [member + spc + '*' + self.c_name] if self.presence_type() == 'count':
arg += ['unsigned int n_' + self.c_name] return arg raise Exception(f"Struct member not implemented for class type {self.type}")
def struct_member(self, ri):
member = self._complex_member_type(ri) if member:
ptr = '*'if self.is_multi_val() else'' if self.is_recursive_for_op(ri):
ptr = '*'
spc = ' 'if member[-1] != '*'else''
ri.cw.p(f"{member}{spc}{ptr}{self.c_name};") return
members = self.arg_member(ri) for one in members:
ri.cw.p(one + ';')
def _attr_put_line(self, ri, var, line):
presence = self.presence_type() if presence in {'present', 'len'}:
ri.cw.p(f"if ({var}->_{presence}.{self.c_name})")
ri.cw.p(f"{line};")
def _attr_put_simple(self, ri, var, put_type):
line = f"ynl_attr_put_{put_type}(nlh, {self.enum_name}, {var}->{self.c_name})"
self._attr_put_line(ri, var, line)
def attr_put(self, ri, var): raise Exception(f"Put not implemented for class type {self.type}")
def _attr_get(self, ri, var): raise Exception(f"Attr get not implemented for class type {self.type}")
def attr_get(self, ri, var, first):
lines, init_lines, local_vars = self._attr_get(ri, var) if type(lines) is str:
lines = [lines] if type(init_lines) is str:
init_lines = [init_lines]
kw = 'if'if first else'else if'
ri.cw.block_start(line=f"{kw} (type == {self.enum_name})") if local_vars: for local in local_vars:
ri.cw.p(local)
ri.cw.nl()
if init_lines:
ri.cw.nl() for line in init_lines:
ri.cw.p(line)
for line in lines:
ri.cw.p(line)
ri.cw.block_end() returnTrue
def _setter_lines(self, ri, member, presence): raise Exception(f"Setter not implemented for class type {self.type}")
def setter(self, ri, space, direction, deref=False, ref=None, var="req"):
ref = (ref if ref else []) + [self.c_name]
member = f"{var}->{'.'.join(ref)}"
local_vars = [] if self.free_needs_iter():
local_vars += ['unsigned int i;']
code = []
presence = '' for i in range(0, len(ref)):
presence = f"{var}->{'.'.join(ref[:i] + [''])}_present.{ref[i]}" # Every layer below last is a nest, so we know it uses bit presence # last layer is "self" and may be a complex type if i == len(ref) - 1 and self.presence_type() != 'present':
presence = f"{var}->{'.'.join(ref[:i] + [''])}_{self.presence_type()}.{ref[i]}" continue
code.append(presence + ' = 1;')
ref_path = '.'.join(ref[:-1]) if ref_path:
ref_path += '.'
code += self._free_lines(ri, var, ref_path)
code += self._setter_lines(ri, member, presence)
func_name = f"{op_prefix(ri, direction, deref=deref)}_set_{'_'.join(ref)}"
free = bool([x for x in code if'free('in x])
alloc = bool([x for x in code if'alloc('in x]) if free andnot alloc:
func_name = '__' + func_name
ri.cw.write_func('static inline void', func_name, local_vars=local_vars,
body=code,
args=[f'{type_name(ri, direction, deref=deref)} *{var}'] + self.arg_member(ri))
class TypeUnused(Type): def presence_type(self): return''
# Classic families have some funny enums, don't bother # computing checks, since we only need them for kernel policies ifnot family.is_classic():
self._init_checks()
for _, attr in ri.family.pure_nested_structs[self.nested_attrs].member_list(): if attr.is_recursive(): continue
attr.setter(ri, self.nested_attrs, direction, deref=deref, ref=ref,
var=var)
def _attr_typol(self):
typol = f'.type = YNL_PT_NEST, .nest = &{self.nested_render_name}_nest, '
typol += '.is_submsg = 1, ' # Reverse-parsing of the policy (ynl_err_walk() in ynl.c) does not # support external selectors. No family uses sub-messages with external # selector for requests so this is fine for now. ifnot self.selector.is_external():
typol += f'.selector_type = {self.attr_set[self["selector"]].value} ' return typol
class Selector: def __init__(self, msg_attr, attr_set):
self.name = msg_attr["selector"]
if self.name in attr_set:
self.attr = attr_set[self.name]
self.attr.is_selector = True
self._external = False else: # The selector will need to get passed down thru the structs
self.attr = None
self._external = True
def set_attr(self, attr):
self.attr = attr
def is_external(self): return self._external
class Struct: def __init__(self, family, space_name, type_list=None, fixed_header=None,
inherited=None, submsg=None):
self.family = family
self.space_name = space_name
self.attr_set = family.attr_sets[space_name] # Use list to catch comparisons with empty sets
self._inherited = inherited if inherited isnotNoneelse []
self.inherited = []
self.fixed_header = None if fixed_header:
self.fixed_header = 'struct ' + c_lower(fixed_header)
self.submsg = submsg
self.nested = type_list isNone if family.name == c_lower(space_name):
self.render_name = c_lower(family.ident_name) else:
self.render_name = c_lower(family.ident_name + '-' + space_name)
self.struct_name = 'struct ' + self.render_name if self.nested and space_name in family.consts:
self.struct_name += '_'
self.ptr_name = self.struct_name + ' *' # All attr sets this one contains, directly or multiple levels down
self.child_nests = set()
self.request = False
self.reply = False
self.recursive = False
self.in_multi_val = False# used by a MultiAttr or and legacy arrays
self.attr_list = []
self.attrs = dict() if type_list isnotNone: for t in type_list:
self.attr_list.append((t, self.attr_set[t]),) else: for t in self.attr_set:
self.attr_list.append((t, self.attr_set[t]),)
max_val = 0
self.attr_max_val = None for name, attr in self.attr_list: if attr.value >= max_val:
max_val = attr.value
self.attr_max_val = attr
self.attrs[name] = attr
def set_inherited(self, new_inherited): if self._inherited != new_inherited: raise Exception("Inheriting different members not supported")
self.inherited = [c_lower(x) for x in sorted(self._inherited)]
def external_selectors(self):
sels = [] for name, attr in self.attr_list: if isinstance(attr, TypeSubMessage) and attr.selector.is_external():
sels.append(attr.selector) return sels
def free_needs_iter(self): for _, attr in self.attr_list: if attr.free_needs_iter(): returnTrue returnFalse
# Added by resolve:
self.c_name = None
delattr(self, "c_name")
def resolve(self):
self.c_name = c_lower(self.name) if self.c_name in _C_KW:
self.c_name += '_' if self.c_name == self.family.c_name:
self.c_name = ''
def new_attr(self, elem, value): if elem['type'] in scalars:
t = TypeScalar(self.family, self, elem, value) elif elem['type'] == 'unused':
t = TypeUnused(self.family, self, elem, value) elif elem['type'] == 'pad':
t = TypePad(self.family, self, elem, value) elif elem['type'] == 'flag':
t = TypeFlag(self.family, self, elem, value) elif elem['type'] == 'string':
t = TypeString(self.family, self, elem, value) elif elem['type'] == 'binary': if'struct'in elem:
t = TypeBinaryStruct(self.family, self, elem, value) elif elem.get('sub-type') in scalars:
t = TypeBinaryScalarArray(self.family, self, elem, value) else:
t = TypeBinary(self.family, self, elem, value) elif elem['type'] == 'bitfield32':
t = TypeBitfield32(self.family, self, elem, value) elif elem['type'] == 'nest':
t = TypeNest(self.family, self, elem, value) elif elem['type'] == 'indexed-array'and'sub-type'in elem: if elem["sub-type"] in ['binary', 'nest', 'u32']:
t = TypeArrayNest(self.family, self, elem, value) else: raise Exception(f'new_attr: unsupported sub-type {elem["sub-type"]}') elif elem['type'] == 'nest-type-value':
t = TypeNestTypeValue(self.family, self, elem, value) elif elem['type'] == 'sub-message':
t = TypeSubMessage(self.family, self, elem, value) else: raise Exception(f"No typed class for type {elem['type']}")
if'multi-attr'in elem and elem['multi-attr']:
t = TypeMultiAttr(self.family, self, elem, value, t)
return t
class Operation(SpecOperation): def __init__(self, family, yaml, req_value, rsp_value): # Fill in missing operation properties (for fixed hdr-only msgs) for mode in ['do', 'dump', 'event']: for direction in ['request', 'reply']: try:
yaml[mode][direction].setdefault('attributes', []) except KeyError: pass
self.hooks = dict() for when in ['pre', 'post']:
self.hooks[when] = dict() for op_mode in ['do', 'dump']:
self.hooks[when][op_mode] = dict()
self.hooks[when][op_mode]['set'] = set()
self.hooks[when][op_mode]['list'] = []
def _mark_notify(self): for op in self.msgs.values(): if'notify'in op:
self.ops[op['notify']].mark_has_ntf()
# Fake a 'do' equivalent of all events, so that we can render their response parsing def _mock_up_events(self): for op in self.yaml['operations']['list']: if'event'in op:
op['do'] = { 'reply': { 'attributes': op['event']['attributes']
}
}
def _load_root_sets(self): for op_name, op in self.msgs.items(): if'attribute-set'notin op: continue
req_attrs = set()
rsp_attrs = set() for op_mode in ['do', 'dump']: if op_mode in op and'request'in op[op_mode]:
req_attrs.update(set(op[op_mode]['request']['attributes'])) if op_mode in op and'reply'in op[op_mode]:
rsp_attrs.update(set(op[op_mode]['reply']['attributes'])) if'event'in op:
rsp_attrs.update(set(op['event']['attributes']))
def _sort_pure_types(self): # Try to reorder according to dependencies
pns_key_list = list(self.pure_nested_structs.keys())
pns_key_seen = set()
rounds = len(pns_key_list) ** 2 # it's basically bubble sort for _ in range(rounds): if len(pns_key_list) == 0: break
name = pns_key_list.pop(0)
finished = True for _, spec in self.attr_sets[name].items(): if'nested-attributes'in spec:
nested = spec['nested-attributes'] elif'sub-message'in spec:
nested = spec.sub_message else: continue
# If the unknown nest we hit is recursive it's fine, it'll be a pointer if self.pure_nested_structs[nested].recursive: continue if nested notin pns_key_seen: # Dicts are sorted, this will make struct last
struct = self.pure_nested_structs.pop(name)
self.pure_nested_structs[name] = struct
finished = False break if finished:
pns_key_seen.add(name) else:
pns_key_list.append(name)
def _load_nested_set_nest(self, spec):
inherit = set()
nested = spec['nested-attributes'] if nested notin self.root_sets: if nested notin self.pure_nested_structs:
self.pure_nested_structs[nested] = \
Struct(self, nested, inherited=inherit,
fixed_header=spec.get('fixed-header')) else: raise Exception(f'Using attr set as root and nested not supported - {nested}')
if'type-value'in spec: if nested in self.root_sets: raise Exception("Inheriting members to a space used as root not supported")
inherit.update(set(spec['type-value'])) elif spec['type'] == 'indexed-array':
inherit.add('idx')
self.pure_nested_structs[nested].set_inherited(inherit)
return nested
def _load_nested_set_submsg(self, spec): # Fake the struct type for the sub-message itself # its not a attr_set but codegen wants attr_sets.
submsg = self.sub_msgs[spec["sub-message"]]
nested = submsg.name
while len(attr_set_queue):
a_set = attr_set_queue.pop(0) for attr, spec in self.attr_sets[a_set].items(): if'nested-attributes'in spec:
nested = self._load_nested_set_nest(spec) elif'sub-message'in spec:
nested = self._load_nested_set_submsg(spec) else: continue
if nested notin attr_set_seen:
attr_set_queue.append(nested)
attr_set_seen.add(nested)
for root_set, rs_members in self.root_sets.items(): for attr, spec in self.attr_sets[root_set].items(): if'nested-attributes'in spec:
nested = spec['nested-attributes'] elif'sub-message'in spec:
nested = spec.sub_message else:
nested = None
if nested: if attr in rs_members['request']:
self.pure_nested_structs[nested].request = True if attr in rs_members['reply']:
self.pure_nested_structs[nested].reply = True
if spec.is_multi_val():
child = self.pure_nested_structs.get(nested)
child.in_multi_val = True
self._sort_pure_types()
# Propagate the request / reply / recursive for attr_set, struct in reversed(self.pure_nested_structs.items()): for _, spec in self.attr_sets[attr_set].items(): if attr_set in struct.child_nests:
struct.recursive = True
struct.child_nests.add(child_name)
child = self.pure_nested_structs.get(child_name) if child: ifnot child.recursive:
struct.child_nests.update(child.child_nests)
child.request |= struct.request
child.reply |= struct.reply if spec.is_multi_val():
child.in_multi_val = True
self._sort_pure_types()
def _load_attr_use(self): for _, struct in self.pure_nested_structs.items(): if struct.request: for _, arg in struct.member_list():
arg.set_request() if struct.reply: for _, arg in struct.member_list():
arg.set_reply()
for root_set, rs_members in self.root_sets.items(): for attr, spec in self.attr_sets[root_set].items(): if attr in rs_members['request']:
spec.set_request() if attr in rs_members['reply']:
spec.set_reply()
def _load_selector_passing(self): def all_structs(): for k, v in reversed(self.pure_nested_structs.items()): yield k, v for k, _ in self.root_sets.items(): yield k, None# we don't have a struct, but it must be terminal
for attr_set, struct in all_structs(): for _, spec in self.attr_sets[attr_set].items(): if'nested-attributes'in spec:
child_name = spec['nested-attributes'] elif'sub-message'in spec:
child_name = spec.sub_message else: continue
child = self.pure_nested_structs.get(child_name) for selector in child.external_selectors(): if selector.name in self.attr_sets[attr_set]:
sel_attr = self.attr_sets[attr_set][selector.name]
selector.set_attr(sel_attr) else: raise Exception("Passing selector thru more than one layer not supported")
def _load_global_policy(self):
global_set = set()
attr_set_name = None for op_name, op in self.ops.items(): ifnot op: continue if'attribute-set'notin op: continue
if attr_set_name isNone:
attr_set_name = op['attribute-set'] if attr_set_name != op['attribute-set']: raise Exception('For a global policy all ops must use the same set')
for op_mode in ['do', 'dump']: if op_mode in op:
req = op[op_mode].get('request') if req:
global_set.update(req.get('attributes', []))
self.global_policy = []
self.global_policy_set = attr_set_name for attr in self.attr_sets[attr_set_name]: if attr in global_set:
self.global_policy.append(attr)
def _load_hooks(self): for op in self.ops.values(): for op_mode in ['do', 'dump']: if op_mode notin op: continue for when in ['pre', 'post']: if when notin op[op_mode]: continue
name = op[op_mode][when] if name in self.hooks[when][op_mode]['set']: continue
self.hooks[when][op_mode]['set'].add(name)
self.hooks[when][op_mode]['list'].append(name)
class RenderInfo: def __init__(self, cw, family, ku_space, op, op_mode, attr_set=None):
self.family = family
self.nl = cw.nlib
self.ku_space = ku_space
self.op_mode = op_mode
self.op = op
fixed_hdr = op.fixed_header if op elseNone
self.fixed_hdr_len = 'ys->family->hdr_len' if op and op.fixed_header: if op.fixed_header != family.fixed_header: if family.is_classic():
self.fixed_hdr_len = f"sizeof(struct {c_lower(fixed_hdr)})" else: raise Exception(f"Per-op fixed header not supported, yet")
# 'do' and 'dump' response parsing is identical
self.type_consistent = True
self.type_oneside = False if op_mode != 'do'and'dump'in op: if'do'in op: if ('reply'in op['do']) != ('reply'in op["dump"]):
self.type_consistent = False elif'reply'in op['do'] and op["do"]["reply"] != op["dump"]["reply"]:
self.type_consistent = False else:
self.type_consistent = True
self.type_oneside = True
def close_out_file(self): if self._out == os.sys.stdout: return # Avoid modifying the file if contents didn't change
self._out.flush() ifnot self._overwrite and os.path.isfile(self._out_file): if filecmp.cmp(self._out.name, self._out_file, shallow=False): return with open(self._out_file, 'w+') as out_file:
self._out.seek(0)
shutil.copyfileobj(self._out, out_file)
self._out.close()
self._out = os.sys.stdout
@classmethod def _is_cond(cls, line): return line.startswith('if') or line.startswith('while') or line.startswith('for')
def p(self, line, add_ind=0): if self._block_end:
self._block_end = False if line.startswith('else'):
line = '} ' + line else:
self._out.write('\t' * self._ind + '}\n')
if self._nl:
self._out.write('\n')
self._nl = False
ind = self._ind if line[-1] == ':':
ind -= 1 if self._silent_block:
ind += 1
self._silent_block = line.endswith(')') and CodeWriter._is_cond(line)
self._silent_block |= line.strip() == 'else' if line[0] == '#':
ind = 0 if add_ind:
ind += add_ind
self._out.write('\t' * ind + line + '\n')
def nl(self):
self._nl = True
def block_start(self, line=''): if line:
line = line + ' '
self.p(line + '{')
self._ind += 1
def block_end(self, line=''): if line and line[0] notin {';', ','}:
line = ' ' + line
self._ind -= 1
self._nl = False ifnot line: # Delay printing closing bracket in case "else" comes next if self._block_end:
self._out.write('\t' * (self._ind + 1) + '}\n')
self._block_end = True else:
self.p('}' + line)
def write_doc_line(self, doc, indent=True):
words = doc.split()
line = ' *' for word in words: if len(line) + len(word) >= 79:
self.p(line)
line = ' *' if indent:
line += ' '
line += ' ' + word
self.p(line)
def writes_defines(self, defines):
longest = 0 for define in defines: if len(define[0]) > longest:
longest = len(define[0])
longest = ((longest + 8) // 8) * 8 for define in defines:
line = '#define ' + define[0]
line += '\t' * ((longest - len(define[0]) + 7) // 8) if type(define[1]) is int:
line += str(define[1]) elif type(define[1]) is str:
line += '"' + define[1] + '"'
self.p(line)
def write_struct_init(self, members):
longest = max([len(x[0]) for x in members])
longest += 1 # because we prepend a .
longest = ((longest + 8) // 8) * 8 for one in members:
line = '.' + one[0]
line += '\t' * ((longest - len(one[0]) - 1 + 7) // 8)
line += '= ' + str(one[1]) + ','
self.p(line)
def ifdef_block(self, config):
config_option = None if config:
config_option = 'CONFIG_' + c_upper(config) if self._ifdef_block == config_option: return
if self._ifdef_block:
self.p('#endif /* ' + self._ifdef_block + ' */') if config_option:
self.p('#ifdef ' + config_option)
self._ifdef_block = config_option
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.