# # This file is part of pyasn1 software. # # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: http://snmplabs.com/pyasn1/license.html # from pyasn1 import debug from pyasn1 import error from pyasn1.codec.ber import eoo from pyasn1.compat.integer import from_bytes from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null from pyasn1.type import base from pyasn1.type import char from pyasn1.type import tag from pyasn1.type import tagmap from pyasn1.type import univ from pyasn1.type import useful
while substrate:
component, substrate = decodeFun(substrate, self.protoComponent,
substrateFun=substrateFun,
allowEoo=True, **options) if component is eoo.endOfOctets: break
# All inner fragments are of the same type, treat them as octet string
substrateFun = self.substrateCollector
header = null
while substrate:
component, substrate = decodeFun(substrate,
self.protoComponent,
substrateFun=substrateFun,
allowEoo=True, **options) if component is eoo.endOfOctets: break
header += component
else: raise error.SubstrateUnderrunError( 'No EOO seen before substrate ends'
)
oid = ()
index = 0
substrateLen = len(head) while index < substrateLen:
subId = head[index]
index += 1 if subId < 128:
oid += (subId,) elif subId > 128: # Construct subid from a number of octets
nextSubId = subId
subId = 0 while nextSubId >= 128:
subId = (subId << 7) + (nextSubId & 0x7F) if index >= substrateLen: raise error.SubstrateUnderrunError( 'Short substrate for sub-OID past %s' % (oid,)
)
nextSubId = head[index]
index += 1
oid += ((subId << 7) + nextSubId,) elif subId == 128: # ASN.1 spec forbids leading zeros (0x80) in OID # encoding, tolerating it opens a vulnerability. See # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf # page 7 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')
# Decode two leading arcs if 0 <= oid[0] <= 39:
oid = (0,) + oid elif 40 <= oid[0] <= 79:
oid = (1, oid[0] - 40) + oid[1:] elif oid[0] >= 80:
oid = (2, oid[0] - 80) + oid[1:] else: raise error.PyAsn1Error('Malformed first OID octet: %s' % head[0])
# Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF # The heuristics is: # * 1+ components of different types -> likely SEQUENCE/SET # * otherwise -> likely SEQUENCE OF/SET OF if len(componentTypes) > 1:
protoComponent = self.protoRecordComponent
asn1Object = protoComponent.clone( # construct tagSet from base tag from prototype ASN.1 object # and additional tags recovered from the substrate
tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
)
if LOG:
LOG('guessed %r container type (pass `asn1Spec` to guide the ' 'decoder)' % asn1Object)
for idx, component in enumerate(components):
asn1Object.setComponentByPosition(
idx, component,
verifyConstraints=False,
matchTags=False, matchConstraints=False
)
return asn1Object, substrate
def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options): if tagSet[0].tagFormat != tag.tagFormatConstructed: raise error.PyAsn1Error('Constructed tag format expected')
if LOG:
LOG('default open types map of component ' '"%s.%s" governed by component "%s.%s"' ':' % (asn1Object.__class__.__name__,
namedType.name,
asn1Object.__class__.__name__,
namedType.openType.name))
for k, v in namedType.openType.items():
LOG('%s -> %r' % (k, v))
if LOG:
LOG('default open types map of component ' '"%s.%s" governed by component "%s.%s"' ':' % (asn1Object.__class__.__name__,
namedType.name,
asn1Object.__class__.__name__,
namedType.openType.name))
for k, v in namedType.openType.items():
LOG('%s -> %r' % (k, v))
if LOG:
LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(header))
# Any components do not inherit initial tag
asn1Spec = self.protoComponent
if substrateFun and substrateFun isnot self.substrateCollector:
asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) return substrateFun(asn1Object, header + substrate, length + len(header))
if LOG:
LOG('assembling constructed serialization')
# All inner fragments are of the same type, treat them as octet string
substrateFun = self.substrateCollector
while substrate:
component, substrate = decodeFun(substrate, asn1Spec,
substrateFun=substrateFun,
allowEoo=True, **options) if component is eoo.endOfOctets: break
header += component
else: raise error.SubstrateUnderrunError( 'No EOO seen before substrate ends'
)
# Put in non-ambiguous types for faster codec lookup.
#
# Every decoder registered in `tagMap` that carries a prototype component
# is also registered in `typeMap` under that prototype's class-level
# `typeId`, so it can be found by type ID as well as by tag set.
for typeDecoder in tagMap.values():
    # Decoders without a prototype component have no type ID to key on
    if typeDecoder.protoComponent is not None:
        typeId = typeDecoder.protoComponent.__class__.typeId
        # Entries already present in `typeMap` take precedence and are
        # never overwritten by this loop
        if typeId is not None and typeId not in typeMap:
            typeMap[typeId] = typeDecoder
# Decoder state-machine states, numbered consecutively from zero in the
# order listed. They are plain ints, unpacked straight from `range`.
(
    stDecodeTag, stDecodeLength,
    stGetValueDecoder, stGetValueDecoderByAsn1Spec, stGetValueDecoderByTag,
    stTryAsExplicitTag,
    stDecodeValue, stDumpRawValue,
    stErrorCondition, stStop,
) = range(10)
if LOG:
LOG('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))
allowEoo = options.pop('allowEoo', False)
# Look for end-of-octets sentinel if allowEoo and self.supportIndefLength: if substrate[:2] == self.__eooSentinel: if LOG:
LOG('end-of-octets sentinel found') return eoo.endOfOctets, substrate[2:]
if isShortTag: # cache short tags
tagCache[firstOctet] = lastTag
if tagSet isNone: if isShortTag: try:
tagSet = tagSetCache[firstOctet]
except KeyError: # base tag not recovered
tagSet = tag.TagSet((), lastTag)
tagSetCache[firstOctet] = tagSet else:
tagSet = tag.TagSet((), lastTag)
else:
tagSet = lastTag + tagSet
state = stDecodeLength
if LOG:
LOG('tag decoded into %s, decoding length' % tagSet)
if state is stDecodeLength: # Decode length ifnot substrate: raise error.SubstrateUnderrunError( 'Short octet stream on length decoding'
)
firstOctet = oct2int(substrate[0])
if firstOctet < 128:
size = 1
length = firstOctet
elif firstOctet > 128:
size = firstOctet & 0x7F # encoded in size bytes
encodedLength = octs2ints(substrate[1:size + 1]) # missing check on maximum size, which shouldn't be a # problem, we can handle more than is possible if len(encodedLength) != size: raise error.SubstrateUnderrunError( '%s<%s at %s' % (size, len(encodedLength), tagSet)
)
length = 0 for lengthOctet in encodedLength:
length <<= 8
length |= lengthOctet
size += 1
else:
size = 1
length = -1
substrate = substrate[size:]
if length == -1: ifnot self.supportIndefLength: raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
if LOG:
LOG('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length])))
if state is stGetValueDecoder: if asn1Spec isNone:
state = stGetValueDecoderByTag
else:
state = stGetValueDecoderByAsn1Spec # # There're two ways of creating subtypes in ASN.1 what influences # decoder operation. These methods are: # 1) Either base types used in or no IMPLICIT tagging has been # applied on subtyping. # 2) Subtype syntax drops base type information (by means of # IMPLICIT tagging. # The first case allows for complete tag recovery from substrate # while the second one requires original ASN.1 type spec for # decoding. # # In either case a set of tags (tagSet) is coming from substrate # in an incremental, tag-by-tag fashion (this is the case of # EXPLICIT tag which is most basic). Outermost tag comes first # from the wire. # if state is stGetValueDecoderByTag: try:
concreteDecoder = tagMap[tagSet]
except KeyError:
concreteDecoder = None
if concreteDecoder:
state = stDecodeValue
else: try:
concreteDecoder = tagMap[tagSet[:1]]
except KeyError:
concreteDecoder = None
if concreteDecoder:
state = stDecodeValue else:
state = stTryAsExplicitTag
if LOG:
LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or"", state is stDecodeValue and'value'or'as explicit tag'))
debug.scope.push(concreteDecoder isNoneand'?'or concreteDecoder.protoComponent.__class__.__name__)
if state is stGetValueDecoderByAsn1Spec:
if asn1Spec.__class__ is tagmap.TagMap: try:
chosenSpec = asn1Spec[tagSet]
except KeyError:
chosenSpec = None
if LOG:
LOG('candidate ASN.1 spec is a map of:')
for firstOctet, v in asn1Spec.presentTypes.items():
LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
if asn1Spec.skipTypes:
LOG('but neither of: ') for firstOctet, v in asn1Spec.skipTypes.items():
LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec isNoneand''or chosenSpec.prettyPrintType(), tagSet))
elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
chosenSpec = asn1Spec if LOG:
LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
else:
chosenSpec = None
if chosenSpec isnotNone: try: # ambiguous type or just faster codec lookup
concreteDecoder = typeMap[chosenSpec.typeId]
if LOG:
LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))
except KeyError: # use base type for codec lookup to recover untagged types
baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) try: # base type or tagged subtype
concreteDecoder = tagMap[baseTagSet]
if LOG:
LOG('value decoder chosen by base %s' % (baseTagSet,))
except KeyError:
concreteDecoder = None
if concreteDecoder:
asn1Spec = chosenSpec
state = stDecodeValue
else:
state = stTryAsExplicitTag
else:
concreteDecoder = None
state = stTryAsExplicitTag
if LOG:
LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or"", state is stDecodeValue and'value'or'as explicit tag'))
debug.scope.push(chosenSpec isNoneand'?'or chosenSpec.__class__.__name__)
if state is stDecodeValue: ifnot options.get('recursiveFlag', True) andnot substrateFun: # deprecate this
substrateFun = lambda a, b, c: (a, b[:c])
if LOG:
LOG('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, isinstance(value, base.Asn1Item) and value.prettyPrint() or value, substrate and debug.hexdump(substrate) or''))
state = stStop break
if state is stTryAsExplicitTag: if (tagSet and
tagSet[0].tagFormat == tag.tagFormatConstructed and
tagSet[0].tagClass != tag.tagClassUniversal): # Assume explicit tagging
concreteDecoder = explicitTagDecoder
state = stDecodeValue
else:
concreteDecoder = None
state = self.defaultErrorState
if LOG:
LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or"", state is stDecodeValue and'value'or'as failure'))
if state is stDumpRawValue:
concreteDecoder = self.defaultRawDecoder
if LOG:
LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
state = stDecodeValue
if state is stErrorCondition: raise error.PyAsn1Error( '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
)
if LOG:
debug.scope.pop()
LOG('decoder left scope %s, call completed' % debug.scope)
return value, substrate
#: Turns BER octet stream into an ASN.1 object.
#:
#: Takes a BER octet-stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
#:
#: Parameters
#: ----------
#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     BER octet-stream
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder.
#:     Depending on the ASN.1 structure being decoded, *asn1Spec* may or
#:     may not be required. The most common reason for it to be required
#:     is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
#:
#: Returns
#: -------
#: : :py:class:`tuple`
#:     A tuple of the pyasn1 object recovered from the BER substrate
#:     (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) and the
#:     unprocessed trailing portion of the *substrate* (may be empty)
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
#:     On decoding errors
#:
#: Examples
#: --------
#: Decode BER serialisation without ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
#: Decode BER serialisation with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder(tagMap, typeMap)
# XXX # non-recursive decoding; return position rather than substrate
Messung V0.5
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
0.108
Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.