| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639 |
- import abc
- import decimal
- import io
- from typing import (
- Any,
- Generator,
- )
- from eth_utils import (
- big_endian_to_int,
- to_normalized_address,
- to_tuple,
- )
- from eth_abi.base import (
- BaseCoder,
- parse_tuple_type_str,
- parse_type_str,
- )
- from eth_abi.exceptions import (
- InsufficientDataBytes,
- InvalidPointer,
- NonEmptyPaddingBytes,
- )
- from eth_abi.utils.numeric import (
- TEN,
- abi_decimal_context,
- ceil32,
- )
class ContextFramesBytesIO(io.BytesIO):
    """
    A byte stream which can track a series of contextual frames in a stack. This
    data structure is necessary to perform nested decodings using the
    :py:class:`HeadTailDecoder` since offsets present in head sections are
    relative only to a particular encoded object. These offsets can only be
    used to locate a position in a decoding stream if they are paired with a
    contextual offset that establishes the position of the object in which they
    are found.

    For example, consider the encoding of a value for the following type::

        type: (int,(int,int[]))
        value: (1,(2,[3,3]))

    There are two tuples in this type: one inner and one outer. The inner tuple
    type contains a dynamic type ``int[]`` and, therefore, is itself dynamic.
    This means that its value encoding will be placed in the tail section of the
    outer tuple's encoding. Furthermore, the inner tuple's encoding will,
    itself, contain a tail section with the encoding for ``[3,3]``. All
    together, the encoded value of ``(1,(2,[3,3]))`` would look like this (the
    data values are normally 32 bytes wide but have been truncated to remove the
    redundant zeros at the beginnings of their encodings)::

                      offset data
        --------------------------
             ^            0 0x01
             |           32 0x40 <-- Offset of object A in global frame (64)
        -----|--------------------
        Global frame ^   64 0x02 <-- Beginning of object A (64 w/offset 0 = 64)
             |       |   96 0x40 <-- Offset of object B in frame of object A (64)
        -----|-Object A's frame---
             |       |  128 0x02 <-- Beginning of object B (64 w/offset 64 = 128)
             |       |  160 0x03
             v       v  192 0x03
        --------------------------

    Note that the offset of object B is encoded as 64 which only specifies the
    beginning of its encoded value relative to the beginning of object A's
    encoding. Globally, object B is located at offset 128. In order to make
    sense out of object B's offset, it needs to be positioned in the context of
    its enclosing object's frame (object A).
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Build the stream; all arguments are forwarded to ``io.BytesIO``.
        """
        super().__init__(*args, **kwargs)

        # Stack of ``(offset, return_position)`` pairs, innermost frame last.
        self._frames = []
        # Sum of the offsets of every frame currently on the stack.
        self._total_offset = 0

    def seek_in_frame(self, pos: int, *args: Any, **kwargs: Any) -> None:
        """
        Seeks relative to the total offset of the current contextual frames.

        Extra arguments are passed through to ``seek`` unchanged.
        """
        self.seek(self._total_offset + pos, *args, **kwargs)

    def push_frame(self, offset: int) -> None:
        """
        Pushes a new contextual frame onto the stack with the given offset and a
        return position at the current cursor position then seeks to the new
        total offset.
        """
        self._frames.append((offset, self.tell()))
        self._total_offset += offset

        self.seek_in_frame(0)

    def pop_frame(self) -> None:
        """
        Pops the current contextual frame off of the stack and returns the
        cursor to the frame's return position.

        Raises ``IndexError`` with the message ``"no frames to pop"`` when the
        frame stack is empty.
        """
        try:
            offset, return_pos = self._frames.pop()
        except IndexError:
            # The list's own "pop from empty list" error adds nothing for the
            # caller, so suppress it instead of chaining it implicitly.
            raise IndexError("no frames to pop") from None

        self._total_offset -= offset
        self.seek(return_pos)
class BaseDecoder(BaseCoder, metaclass=abc.ABCMeta):
    """
    Common ancestor of every decoder class. Derive from it to define a custom
    decoder. Subclasses must also implement :any:`BaseCoder.from_type_str`.

    Instances are callable: calling a decoder with a stream is the same as
    calling its ``decode`` method with that stream.
    """

    # Strictness flag read by concrete decoders; ``ByteStringDecoder`` skips
    # its length and padding checks when this is false.
    strict = True

    @abc.abstractmethod
    def decode(self, stream: ContextFramesBytesIO) -> Any:  # pragma: no cover
        """
        Turn the bytes found in ``stream`` into a python value. Implementations
        should raise :any:`exceptions.DecodingError` when no python value can
        be decoded from the given byte stream.
        """

    def __call__(self, stream: ContextFramesBytesIO) -> Any:
        """
        Shorthand for ``self.decode(stream)``.
        """
        return self.decode(stream)
class HeadTailDecoder(BaseDecoder):
    """
    Decodes one dynamic element of a dynamic container (a dynamic array, or a
    sized array or tuple holding dynamic elements). Such an element is stored as
    a pointer (an offset) in the head section of the container's encoding, while
    the value itself lives in the tail section.
    """

    is_dynamic = True

    # Decoder applied to the value once the cursor has been moved to it.
    tail_decoder = None

    def validate(self):
        """
        Raise ``ValueError`` unless a ``tail_decoder`` has been configured.
        """
        super().validate()

        if self.tail_decoder is None:
            raise ValueError("No `tail_decoder` set")

    def decode(self, stream: ContextFramesBytesIO) -> Any:
        """
        Read the offset at the cursor, decode the value it points to with
        ``tail_decoder``, then restore the cursor to just after the offset.
        """
        # Reading the pointer also moves the cursor past it (32 bytes).
        tail_offset = decode_uint_256(stream)

        # Enter the frame of the pointed-to value.
        stream.push_frame(tail_offset)

        # Explicit check kept so type checkers can narrow the attribute.
        decode_tail = self.tail_decoder
        if decode_tail is None:
            raise AssertionError("`tail_decoder` is None")

        decoded = decode_tail(stream)

        # Leave the frame; the cursor goes back to where the head left off.
        stream.pop_frame()

        return decoded
class TupleDecoder(BaseDecoder):
    """
    Decodes a tuple by applying one decoder per component, in order.
    """

    # Sequence of component decoders; required.
    decoders = None

    def __init__(self, **kwargs):
        """
        Wrap every dynamic component decoder in a ``HeadTailDecoder`` and mark
        the tuple itself as dynamic when any component is dynamic.
        """
        super().__init__(**kwargs)

        wrapped = []
        for component in self.decoders:
            if getattr(component, "is_dynamic", False):
                component = HeadTailDecoder(tail_decoder=component)
            wrapped.append(component)
        self.decoders = tuple(wrapped)

        self.is_dynamic = any(getattr(d, "is_dynamic", False) for d in self.decoders)

    def validate(self):
        """
        Raise ``ValueError`` unless ``decoders`` has been configured.
        """
        super().validate()

        if self.decoders is None:
            raise ValueError("No `decoders` set")

    def validate_pointers(self, stream: ContextFramesBytesIO) -> None:
        """
        Check that every pointer in the head lands on a valid stream location.

        The cursor is put back where it started once all pointers have been
        checked. Raises ``InvalidPointer`` for a pointer that lands inside the
        head section or at/after the end of the stream.
        """
        start = stream.tell()

        # Sized arrays contribute ``array_size`` head slots, anything else one.
        head_slots = sum(getattr(decoder, "array_size", 1) for decoder in self.decoders)
        head_end = start + 32 * head_slots
        stream_size = len(stream.getbuffer())

        for decoder in self.decoders:
            if not isinstance(decoder, HeadTailDecoder):
                # Static component: no pointer here, just let its decoder
                # advance the stream past it.
                decoder(stream)
                continue

            # Dynamic component: the next 32 bytes are a pointer.
            offset = decode_uint_256(stream)
            target = start + offset

            if not (head_end <= target < stream_size):
                # The data would sit inside the offsets section or past the
                # end of the stream; neither is acceptable.
                raise InvalidPointer(
                    "Invalid pointer in tuple at location "
                    f"{stream.tell() - 32} in payload"
                )

        # Rewind so the real decoding pass starts from the same place.
        stream.seek(start)

    @to_tuple
    def decode(self, stream: ContextFramesBytesIO) -> Generator[Any, None, None]:
        """
        Validate the head pointers, then yield each component's decoded value.
        """
        self.validate_pointers(stream)

        yield from (decoder(stream) for decoder in self.decoders)

    @parse_tuple_type_str
    def from_type_str(cls, abi_type, registry):
        """
        Build a tuple decoder by looking up a decoder for each component type
        in ``registry``.
        """
        type_strs = (component.to_type_str() for component in abi_type.components)

        return cls(decoders=tuple(registry.get_decoder(t) for t in type_strs))
class SingleDecoder(BaseDecoder):
    """
    Template for decoders that read one chunk of bytes and convert it with
    ``decoder_fn``. Subclasses supply ``read_data_from_stream`` and
    ``validate_padding_bytes`` and may override ``split_data_and_padding``.
    """

    # Callable converting the value bytes into a python value; required.
    decoder_fn = None

    def validate(self):
        """
        Raise ``ValueError`` unless a ``decoder_fn`` has been configured.
        """
        super().validate()

        if self.decoder_fn is None:
            raise ValueError("No `decoder_fn` set")

    def read_data_from_stream(self, stream):
        """
        Hook: return the raw bytes for one value. Not implemented here.
        """
        raise NotImplementedError("Must be implemented by subclasses")

    def split_data_and_padding(self, raw_data):
        """
        Return ``(data, padding_bytes)``; by default all of ``raw_data`` is data
        and the padding is empty.
        """
        return raw_data, b""

    def validate_padding_bytes(self, value, padding_bytes):
        """
        Hook: check the padding for the decoded ``value``. Not implemented here.
        """
        raise NotImplementedError("Must be implemented by subclasses")

    def decode(self, stream):
        """
        Read, split, convert and padding-check one value from ``stream``.
        """
        payload, padding = self.split_data_and_padding(
            self.read_data_from_stream(stream)
        )

        convert = self.decoder_fn
        if convert is None:
            raise AssertionError("`decoder_fn` is None")

        result = convert(payload)
        self.validate_padding_bytes(result, padding)

        return result
class BaseArrayDecoder(BaseDecoder):
    """
    Shared behaviour of the sized and dynamic array decoders.
    """

    # Decoder applied to each array element; required.
    item_decoder = None

    def __init__(self, **kwargs):
        """
        Wrap a dynamic ``item_decoder`` in a ``HeadTailDecoder`` so elements
        are reached through their head pointers.
        """
        super().__init__(**kwargs)

        element_decoder = self.item_decoder
        if element_decoder.is_dynamic:
            self.item_decoder = HeadTailDecoder(tail_decoder=element_decoder)

    def validate(self):
        """
        Raise ``ValueError`` unless an ``item_decoder`` has been configured.
        """
        super().validate()

        if self.item_decoder is None:
            raise ValueError("No `item_decoder` set")

    @parse_type_str(with_arrlist=True)
    def from_type_str(cls, abi_type, registry):
        """
        Return a ``SizedArrayDecoder`` or ``DynamicArrayDecoder`` depending on
        whether the outermost array dimension carries a size.
        """
        item_decoder = registry.get_decoder(abi_type.item_type.to_type_str())

        outer_dimension = abi_type.arrlist[-1]

        if len(outer_dimension) != 1:
            # No size given: the dimension is dynamic.
            return DynamicArrayDecoder(item_decoder=item_decoder)

        # Exactly one entry: the dimension is fixed to that size.
        return SizedArrayDecoder(
            array_size=outer_dimension[0],
            item_decoder=item_decoder,
        )

    def validate_pointers(self, stream: ContextFramesBytesIO, array_size: int) -> None:
        """
        Check that every element pointer lands on a valid stream location.

        Does nothing when the elements are not decoded through a
        ``HeadTailDecoder``. Otherwise reads ``array_size`` pointers, raises
        ``InvalidPointer`` for one that lands inside the offsets section or
        at/after the end of the stream, and finally rewinds the cursor.
        """
        if not isinstance(self.item_decoder, HeadTailDecoder):
            return

        start = stream.tell()
        head_end = start + 32 * array_size
        stream_size = len(stream.getbuffer())

        for _ in range(array_size):
            target = start + decode_uint_256(stream)

            if not (head_end <= target < stream_size):
                # The data would sit inside the offsets section or past the
                # end of the stream; neither is acceptable.
                raise InvalidPointer(
                    "Invalid pointer in array at location "
                    f"{stream.tell() - 32} in payload"
                )

        stream.seek(start)
class SizedArrayDecoder(BaseArrayDecoder):
    """
    Decodes an array whose number of elements is fixed by ``array_size``.
    """

    # Number of elements to decode.
    array_size = None

    def __init__(self, **kwargs):
        """
        A sized array is dynamic exactly when its item decoder is dynamic.
        """
        super().__init__(**kwargs)

        self.is_dynamic = self.item_decoder.is_dynamic

    @to_tuple
    def decode(self, stream):
        """
        Validate the element pointers, then yield ``array_size`` decoded items.
        """
        decode_item = self.item_decoder
        if decode_item is None:
            raise AssertionError("`item_decoder` is None")

        count = self.array_size
        self.validate_pointers(stream, count)

        yield from (decode_item(stream) for _ in range(count))
class DynamicArrayDecoder(BaseArrayDecoder):
    """
    Decodes an array whose element count is stored in the stream ahead of the
    elements.
    """

    # Dynamic arrays are always dynamic, regardless of their elements
    is_dynamic = True

    @to_tuple
    def decode(self, stream):
        """
        Read the element count, then yield that many decoded items from inside
        a frame offset by 32 bytes; the frame is popped after the last item.
        """
        item_count = decode_uint_256(stream)

        # Open a frame 32 bytes further in, i.e. past the count word just read.
        stream.push_frame(32)

        decode_item = self.item_decoder
        if decode_item is None:
            raise AssertionError("`item_decoder` is None")

        self.validate_pointers(stream, item_count)

        yield from (decode_item(stream) for _ in range(item_count))

        stream.pop_frame()
class FixedByteSizeDecoder(SingleDecoder):
    """
    Decoder for values that are read from the stream as a fixed-size chunk of
    ``data_byte_size`` bytes, of which ``value_bit_size // 8`` bytes are the
    value and the rest is zero padding.
    """

    # Callable converting the value bytes into a python value.
    decoder_fn = None
    # Width of the value itself, in bits; must be a multiple of 8.
    value_bit_size = None
    # Number of bytes read from the stream for each value.
    data_byte_size = None
    # True: padding precedes the value bytes. False: padding follows them.
    is_big_endian = None

    def validate(self):
        """
        Raise ``ValueError`` if any required attribute is unset or if the bit
        and byte sizes are inconsistent with each other.
        """
        super().validate()

        if self.value_bit_size is None:
            raise ValueError("`value_bit_size` may not be None")
        if self.data_byte_size is None:
            raise ValueError("`data_byte_size` may not be None")
        if self.decoder_fn is None:
            raise ValueError("`decoder_fn` may not be None")
        if self.is_big_endian is None:
            raise ValueError("`is_big_endian` may not be None")

        if self.value_bit_size % 8 != 0:
            # This must be an f-string, otherwise the placeholder is printed
            # literally instead of the offending size.
            raise ValueError(
                f"Invalid value bit size: {self.value_bit_size}. "
                "Must be a multiple of 8"
            )

        if self.value_bit_size > self.data_byte_size * 8:
            raise ValueError("Value byte size exceeds data size")

    def read_data_from_stream(self, stream):
        """
        Read exactly ``data_byte_size`` bytes from ``stream``.

        Raises ``InsufficientDataBytes`` if the stream yields fewer bytes.
        """
        data = stream.read(self.data_byte_size)

        if len(data) != self.data_byte_size:
            raise InsufficientDataBytes(
                f"Tried to read {self.data_byte_size} bytes, "
                f"only got {len(data)} bytes."
            )

        return data

    def split_data_and_padding(self, raw_data):
        """
        Split ``raw_data`` into ``(data, padding_bytes)``.

        For big-endian decoders the padding is the leading part of the chunk;
        otherwise it is the trailing part.
        """
        value_byte_size = self._get_value_byte_size()
        padding_size = self.data_byte_size - value_byte_size

        if self.is_big_endian:
            padding_bytes = raw_data[:padding_size]
            data = raw_data[padding_size:]
        else:
            data = raw_data[:value_byte_size]
            padding_bytes = raw_data[value_byte_size:]

        return data, padding_bytes

    def validate_padding_bytes(self, value, padding_bytes):
        """
        Raise ``NonEmptyPaddingBytes`` unless the padding is all zero bytes of
        the expected length. ``value`` is not used by this implementation.
        """
        value_byte_size = self._get_value_byte_size()
        padding_size = self.data_byte_size - value_byte_size

        if padding_bytes != b"\x00" * padding_size:
            raise NonEmptyPaddingBytes(
                f"Padding bytes were not empty: {repr(padding_bytes)}"
            )

    def _get_value_byte_size(self):
        """
        Return the width of the value in whole bytes.
        """
        value_byte_size = self.value_bit_size // 8
        return value_byte_size
class Fixed32ByteSizeDecoder(FixedByteSizeDecoder):
    """
    Fixed-size decoder whose values are read as 32-byte chunks.
    """

    data_byte_size = 32
class BooleanDecoder(Fixed32ByteSizeDecoder):
    """
    Decodes ``bool`` values: a single value byte, padded on the left.
    """

    value_bit_size = 8
    is_big_endian = True

    @staticmethod
    def decoder_fn(data):
        """
        Map the value byte to ``True``/``False``.

        Raises ``NonEmptyPaddingBytes`` for any byte other than 0x00 or 0x01.
        """
        if data == b"\x00":
            return False
        if data == b"\x01":
            return True

        raise NonEmptyPaddingBytes(
            f"Boolean must be either 0x0 or 0x1. Got: {repr(data)}"
        )

    @parse_type_str("bool")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder for the ``bool`` type string; takes no configuration.
        """
        return cls()
class AddressDecoder(Fixed32ByteSizeDecoder):
    """
    Decodes ``address`` values: 20 value bytes, padded on the left, converted
    with ``to_normalized_address``.
    """

    # 20 bytes expressed in bits.
    value_bit_size = 20 * 8
    is_big_endian = True
    decoder_fn = staticmethod(to_normalized_address)

    @parse_type_str("address")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder for the ``address`` type string; takes no configuration.
        """
        return cls()
- #
- # Unsigned Integer Decoders
- #
class UnsignedIntegerDecoder(Fixed32ByteSizeDecoder):
    """
    Decodes ``uint<N>`` values: the value bytes are read as a big-endian
    unsigned integer, padded on the left.
    """

    is_big_endian = True
    decoder_fn = staticmethod(big_endian_to_int)

    @parse_type_str("uint")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder whose bit width is taken from the type's ``sub`` part.
        """
        return cls(value_bit_size=abi_type.sub)


# Shared 256-bit unsigned decoder; the other decoders in this module use it to
# read offsets and lengths from the stream.
decode_uint_256 = UnsignedIntegerDecoder(value_bit_size=256)
- #
- # Signed Integer Decoders
- #
class SignedIntegerDecoder(Fixed32ByteSizeDecoder):
    """
    Decodes ``int<N>`` values stored in two's complement, padded on the left.
    """

    is_big_endian = True

    def decoder_fn(self, data):
        """
        Interpret ``data`` as a two's complement integer of ``value_bit_size``
        bits.
        """
        unsigned = big_endian_to_int(data)

        # Values below the sign bit are non-negative and need no adjustment.
        if unsigned < 2 ** (self.value_bit_size - 1):
            return unsigned

        return unsigned - 2**self.value_bit_size

    def validate_padding_bytes(self, value, padding_bytes):
        """
        Raise ``NonEmptyPaddingBytes`` unless the padding is the sign extension
        of ``value``: 0x00 bytes when it is non-negative, 0xff bytes otherwise.
        """
        padding_size = self.data_byte_size - self._get_value_byte_size()

        fill = b"\xff" if value < 0 else b"\x00"

        if padding_bytes != fill * padding_size:
            raise NonEmptyPaddingBytes(
                f"Padding bytes were not empty: {repr(padding_bytes)}"
            )

    @parse_type_str("int")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder whose bit width is taken from the type's ``sub`` part.
        """
        return cls(value_bit_size=abi_type.sub)
- #
- # Bytes1..32
- #
class BytesDecoder(Fixed32ByteSizeDecoder):
    """
    Decodes ``bytes<M>`` values: the value bytes come first and the padding
    follows them. The bytes are returned unchanged.
    """

    is_big_endian = False

    @staticmethod
    def decoder_fn(data):
        """
        Identity conversion: the value bytes are the decoded value.
        """
        return data

    @parse_type_str("bytes")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder for ``sub`` value bytes (``sub * 8`` bits).
        """
        byte_count = abi_type.sub
        return cls(value_bit_size=byte_count * 8)
class BaseFixedDecoder(Fixed32ByteSizeDecoder):
    """
    Shared configuration and validation for the fixed-point decoders.
    """

    # Number of decimal places; must lie in the range (0, 80].
    frac_places = None
    is_big_endian = True

    def validate(self):
        """
        Raise ``ValueError`` if ``frac_places`` is unset or out of range.
        """
        super().validate()

        places = self.frac_places
        if places is None:
            raise ValueError("must specify `frac_places`")

        if not 0 < places <= 80:
            raise ValueError("`frac_places` must be in range (0, 80]")
class UnsignedFixedDecoder(BaseFixedDecoder):
    """
    Decodes ``ufixed<M>x<N>`` values into ``decimal.Decimal``.
    """

    def decoder_fn(self, data):
        """
        Read ``data`` as an unsigned integer and divide it by
        ``10 ** frac_places`` under ``abi_decimal_context``.
        """
        raw_integer = big_endian_to_int(data)

        with decimal.localcontext(abi_decimal_context):
            return decimal.Decimal(raw_integer) / TEN**self.frac_places

    @parse_type_str("ufixed")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder from the ``(bit size, fractional places)`` pair in the
        type's ``sub`` part.
        """
        bits, places = abi_type.sub

        return cls(value_bit_size=bits, frac_places=places)
class SignedFixedDecoder(BaseFixedDecoder):
    """
    Decodes ``fixed<M>x<N>`` values (two's complement) into ``decimal.Decimal``.
    """

    def decoder_fn(self, data):
        """
        Read ``data`` as a two's complement integer of ``value_bit_size`` bits
        and divide it by ``10 ** frac_places`` under ``abi_decimal_context``.
        """
        raw_integer = big_endian_to_int(data)

        # At or above the sign bit the value is negative: undo the wrap-around.
        if raw_integer >= 2 ** (self.value_bit_size - 1):
            raw_integer = raw_integer - 2**self.value_bit_size

        with decimal.localcontext(abi_decimal_context):
            return decimal.Decimal(raw_integer) / TEN**self.frac_places

    def validate_padding_bytes(self, value, padding_bytes):
        """
        Raise ``NonEmptyPaddingBytes`` unless the padding is the sign extension
        of ``value``: 0x00 bytes when it is non-negative, 0xff bytes otherwise.
        """
        padding_size = self.data_byte_size - self._get_value_byte_size()

        fill = b"\xff" if value < 0 else b"\x00"

        if padding_bytes != fill * padding_size:
            raise NonEmptyPaddingBytes(
                f"Padding bytes were not empty: {repr(padding_bytes)}"
            )

    @parse_type_str("fixed")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder from the ``(bit size, fractional places)`` pair in the
        type's ``sub`` part.
        """
        bits, places = abi_type.sub

        return cls(value_bit_size=bits, frac_places=places)
- #
- # String and Bytes
- #
class ByteStringDecoder(SingleDecoder):
    """
    Decodes dynamic ``bytes`` values: a length word followed by the content,
    which is read in a multiple of 32 bytes.
    """

    is_dynamic = True

    @staticmethod
    def decoder_fn(data):
        """
        Identity conversion: the content bytes are the decoded value.
        """
        return data

    def read_data_from_stream(self, stream):
        """
        Read the length word and then the content, returning only the first
        ``length`` bytes of what was read.

        When ``strict`` is true, raises ``InsufficientDataBytes`` if the stream
        ends before the padded length and ``NonEmptyPaddingBytes`` if the bytes
        after the content are not all zero. When ``strict`` is false neither
        check is made.
        """
        content_length = decode_uint_256(stream)
        padded_length = ceil32(content_length)

        chunk = stream.read(padded_length)
        content = chunk[:content_length]

        if not self.strict:
            return content

        if len(chunk) < padded_length:
            raise InsufficientDataBytes(
                f"Tried to read {padded_length} bytes, only got {len(chunk)} bytes"
            )

        padding_bytes = chunk[content_length:]
        if padding_bytes != b"\x00" * (padded_length - content_length):
            raise NonEmptyPaddingBytes(
                f"Padding bytes were not empty: {repr(padding_bytes)}"
            )

        return content

    def validate_padding_bytes(self, value, padding_bytes):
        """
        No-op: padding is already checked in ``read_data_from_stream``.
        """
        pass

    @parse_type_str("bytes")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder for the ``bytes`` type string; takes no configuration.
        """
        return cls()
class StringDecoder(ByteStringDecoder):
    """
    Decodes ``string`` values: read like dynamic ``bytes``, then decoded as
    UTF-8 text.
    """

    def __init__(self, handle_string_errors="strict"):
        """
        ``handle_string_errors`` is passed as the ``errors`` argument of
        ``bytes.decode`` when decoding.
        """
        self.bytes_errors = handle_string_errors
        super().__init__()

    @parse_type_str("string")
    def from_type_str(cls, abi_type, registry):
        """
        Build a decoder for the ``string`` type string with default settings.
        """
        return cls()

    @staticmethod
    def decoder_fn(data, handle_string_errors="strict"):
        """
        Decode ``data`` as UTF-8 using the given error handling scheme.
        """
        return data.decode("utf-8", errors=handle_string_errors)

    def decode(self, stream):
        """
        Read one value from ``stream`` and return it as ``str``, decoding with
        the error handling scheme chosen at construction.
        """
        payload, padding = self.split_data_and_padding(
            self.read_data_from_stream(stream)
        )

        text = self.decoder_fn(payload, self.bytes_errors)
        self.validate_padding_bytes(text, padding)

        return text
|