def decode_negint(decoder, subtype, shareable_index=None):
    """
    Decode a negative integer (major tag 1).

    The unsigned value ``n`` obtained from ``decode_uint`` is mapped to ``-1 - n``.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: unused here; accepted so all major type decoders share a signature
    :return: the decoded negative integer

    """
    magnitude = decode_uint(decoder, subtype)
    return -1 - magnitude
def decode_bytestring(decoder, subtype, shareable_index=None):
    """
    Decode a byte string (major tag 2).

    When ``decode_uint`` reports a definite length, that many bytes are read and returned
    directly. When it returns ``None`` (indefinite length), chunks are read and concatenated
    until the break byte (255) is encountered.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: unused here; accepted so all major type decoders share a signature
    :return: the decoded byte string; the indefinite length form is returned as an immutable
        ``bytes`` object

    """
    length = decode_uint(decoder, subtype, allow_infinite=True)
    if length is not None:
        return decoder.read(length)

    # Indefinite length: accumulate the chunks in a mutable buffer
    buf = bytearray()
    while True:
        initial_byte = byte_as_integer(decoder.read(1))
        if initial_byte == 255:
            # Hand back an immutable bytes object rather than the bytearray itself: a
            # bytearray is unhashable, so it could not be used as a dictionary key by
            # decode_map(), and the result type would differ from the definite length form.
            return bytes(buf)

        chunk_length = decode_uint(decoder, initial_byte & 31)
        buf.extend(decoder.read(chunk_length))
def decode_string(decoder, subtype, shareable_index=None):
    """
    Decode a text string (major tag 3).

    The raw bytes are obtained through ``decode_bytestring`` and then decoded as UTF-8.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: unused here; accepted so all major type decoders share a signature
    :return: the decoded text

    """
    raw = decode_bytestring(decoder, subtype)
    return raw.decode('utf-8')
def decode_array(decoder, subtype, shareable_index=None):
    """
    Decode an array (major tag 4) into a list.

    A definite length array decodes exactly that many items. An indefinite length array
    (``decode_uint`` returned ``None``) decodes items until ``break_marker`` is produced.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: index under which the resulting list is registered, if any
    :return: the list of decoded items

    """
    result = []
    # The (still empty) list is registered before any item is decoded.
    # NOTE(review): presumably so that nested values can refer back to it -- confirm.
    decoder.set_shareable(shareable_index, result)

    count = decode_uint(decoder, subtype, allow_infinite=True)
    if count is not None:
        for _ in xrange(count):
            result.append(decoder.decode())

        return result

    # Indefinite length
    while True:
        element = decoder.decode()
        if element is break_marker:
            return result

        result.append(element)
def decode_map(decoder, subtype, shareable_index=None):
    """
    Decode a map (major tag 5) into a dictionary.

    A definite length map decodes exactly that many key/value pairs. An indefinite length map
    (``decode_uint`` returned ``None``) decodes pairs until a key decodes to ``break_marker``.
    If the decoder has a truthy ``object_hook``, its return value replaces the dictionary.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: index under which the dictionary is registered, if any
    :return: the dictionary, or whatever ``decoder.object_hook`` returned for it

    """
    mapping = {}
    # The (still empty) dictionary is registered before any entry is decoded.
    decoder.set_shareable(shareable_index, mapping)

    count = decode_uint(decoder, subtype, allow_infinite=True)
    if count is not None:
        for _ in xrange(count):
            key = decoder.decode()
            mapping[key] = decoder.decode()
    else:
        # Indefinite length
        while True:
            key = decoder.decode()
            if key is break_marker:
                break

            mapping[key] = decoder.decode()

    if not decoder.object_hook:
        return mapping

    return decoder.object_hook(decoder, mapping)
def decode_semantic(decoder, subtype, shareable_index=None):
    """
    Decode a semantically tagged value (major tag 6).

    Tag 28 ("shareable") allocates a new shareable index and decodes the next value with it.
    For any other tag the next value is decoded and then passed to the matching entry in
    ``semantic_decoders``, if there is one. Otherwise it is wrapped in a ``CBORTag``, which is
    passed through the decoder's ``tag_hook`` when that is set.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: index forwarded to the semantic decoder or tag hook
    :return: the value produced by the semantic decoder or tag hook, or the ``CBORTag``

    """
    tagnum = decode_uint(decoder, subtype)

    # Special handling for the "shareable" tag
    if tagnum == 28:
        return decoder.decode(decoder._allocate_shareable())

    payload = decoder.decode()
    handler = semantic_decoders.get(tagnum)
    if handler:
        return handler(decoder, payload, shareable_index)

    # No built-in decoder for this tag
    tag = CBORTag(tagnum, payload)
    if not decoder.tag_hook:
        return tag

    return decoder.tag_hook(decoder, tag, shareable_index)
def decode_special(decoder, subtype, shareable_index=None):
    """
    Decode a simple or special value (major tag 7).

    Subtypes below 20 are wrapped in ``CBORSimpleValue``; every other subtype is dispatched
    through the ``special_decoders`` table.

    :param decoder: the decoder instance to read from
    :param subtype: the low 5 bits of the initial byte
    :param shareable_index: unused here; accepted so all major type decoders share a signature
    :return: a ``CBORSimpleValue`` or the result of the matching special decoder

    """
    if subtype >= 20:
        # Major tag 7
        return special_decoders[subtype](decoder)

    # Simple value
    return CBORSimpleValue(subtype)
# # Semantic decoders (major tag 6) #
# Semantic tag 0: interpret ``value`` as a timestamp string.
# ``value`` is matched against ``timestamp_re``; on a match the nine captured groups are unpacked
# and ``tz`` is built from the captured hour/minute offset, or set to ``timezone.utc`` when
# ``offset_h`` is empty.
# NOTE(review): the visible body ends right after ``tz`` is assigned -- nothing is returned and a
# failed match is not handled. The remainder of this function appears to be missing from this
# copy; confirm against the upstream source before relying on it.
def decode_datetime_string(decoder, value, shareable_index=None): # Semantic tag 0
match = timestamp_re.match(value) if match:
year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups() if offset_h:
tz = timezone(timedelta(hours=int(offset_h), minutes=int(offset_m))) else:
tz = timezone.utc
class CBORDecoder(object): """
Deserializes a CBOR encoded byte stream.
:param tag_hook: Callable that takes 3 arguments: the decoder instance, the
:class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any.
This callback is called for any tags for which there is no built-in decoder.
The return value is substituted for the CBORTag object in the deserialized output.
If no hook is set, the CBORTag object itself is returned.
:param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary.
This callback is called for each deserialized :class:`dict` object.
The return value is substituted for the dict in the deserialized output.
If no hook is set, the dict itself is returned. """
def set_shareable(self, index, value):
    """
    Store ``value`` as the shareable for the last encountered shared value marker, if any.

    Nothing happens when ``index`` is ``None``.

    :param index: the value of the ``shared_index`` argument to the decoder
    :param value: the shared value

    """
    if index is None:
        return

    self._shareables[index] = value
def read(self, amount):
    """
    Read exactly ``amount`` bytes from the underlying stream (``self.fp``).

    :param int amount: the number of bytes to read
    :return: the data that was read
    :raises CBORDecodeError: if the stream yields fewer bytes than requested

    """
    data = self.fp.read(amount)
    received = len(data)
    if received >= amount:
        return data

    raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} '
                          'instead)'.format(amount, received))
def decode(self, shareable_index=None):
    """
    Decode the next value from the stream.

    The initial byte is split into the major type (high 3 bits) and the subtype (low 5 bits);
    the major type selects the function from ``major_decoders`` that decodes the value.

    :param shareable_index: index passed on to the major type decoder
    :return: the decoded value
    :raises CBORDecodeError: if there is any problem decoding the stream

    """
    try:
        initial_byte = byte_as_integer(self.fp.read(1))
        # Same split as ``>> 5`` and ``& 31`` for a single byte value
        major_type, subtype = divmod(initial_byte, 32)
    except Exception as exc:
        raise CBORDecodeError('error reading major type at index {}: {}'
                              .format(self.fp.tell(), exc))

    handler = major_decoders[major_type]
    try:
        return handler(self, subtype, shareable_index)
    except CBORDecodeError:
        # Already the right exception type; let it propagate untouched
        raise
    except Exception as exc:
        raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), exc))
def decode_from_bytes(self, buf): """
Wrap the given bytestring as a file and call :meth:`decode` with it as the argument.
This method was intended to be used from the ``tag_hook`` hook when an object needs to be
decoded separately from the rest but while still taking advantage of the shared value
registry.
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.