applicators.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. from typing import (
  2. Any,
  3. Callable,
  4. Dict,
  5. Generator,
  6. List,
  7. Tuple,
  8. Union,
  9. )
  10. import warnings
  11. from .decorators import (
  12. return_arg_type,
  13. )
  14. from .functional import (
  15. to_dict,
  16. )
  17. from .pydantic import (
  18. CamelModel,
  19. )
  20. from .toolz import (
  21. compose,
  22. curry,
  23. )
  24. Formatters = Callable[[List[Any]], List[Any]]
  25. @return_arg_type(2)
  26. def apply_formatter_at_index(
  27. formatter: Callable[..., Any], at_index: int, value: List[Any]
  28. ) -> Generator[List[Any], None, None]:
  29. if at_index + 1 > len(value):
  30. raise IndexError(
  31. f"Not enough values in iterable to apply formatter. Got: {len(value)}. "
  32. f"Need: {at_index + 1}"
  33. )
  34. for index, item in enumerate(value):
  35. if index == at_index:
  36. yield formatter(item)
  37. else:
  38. yield item
  39. def combine_argument_formatters(*formatters: List[Callable[..., Any]]) -> Formatters:
  40. warnings.warn(
  41. DeprecationWarning(
  42. "combine_argument_formatters(formatter1, formatter2)([item1, item2])"
  43. "has been deprecated and will be removed in a subsequent major version "
  44. "release of the eth-utils library. Update your calls to use "
  45. "apply_formatters_to_sequence([formatter1, formatter2], [item1, item2]) "
  46. "instead."
  47. ),
  48. stacklevel=2,
  49. )
  50. _formatter_at_index = curry(apply_formatter_at_index)
  51. return compose( # type: ignore
  52. *(
  53. _formatter_at_index(formatter, index)
  54. for index, formatter in enumerate(formatters)
  55. )
  56. )
  57. @return_arg_type(1)
  58. def apply_formatters_to_sequence(
  59. formatters: List[Any], sequence: List[Any]
  60. ) -> Generator[List[Any], None, None]:
  61. if len(formatters) > len(sequence):
  62. raise IndexError(
  63. f"Too many formatters for sequence: {len(formatters)} formatters for "
  64. f"{repr(sequence)}"
  65. )
  66. elif len(formatters) < len(sequence):
  67. raise IndexError(
  68. f"Too few formatters for sequence: {len(formatters)} formatters for "
  69. f"{repr(sequence)}"
  70. )
  71. else:
  72. for formatter, item in zip(formatters, sequence):
  73. yield formatter(item)
  74. def apply_formatter_if(
  75. condition: Callable[..., bool], formatter: Callable[..., Any], value: Any
  76. ) -> Any:
  77. if condition(value):
  78. return formatter(value)
  79. else:
  80. return value
  81. @to_dict
  82. def apply_formatters_to_dict(
  83. formatters: Dict[Any, Any],
  84. value: Union[Dict[Any, Any], CamelModel],
  85. unaliased: bool = False,
  86. ) -> Generator[Tuple[Any, Any], None, None]:
  87. """
  88. Apply formatters to a dictionary of values. If the value is a pydantic model,
  89. it will be serialized to a dictionary first, taking into account the
  90. ``unaliased`` parameter.
  91. :param formatters: The formatters to apply to the dictionary.
  92. :param value: The dictionary-like object to apply the formatters to.
  93. :param unaliased: If the model is a ``CamelModel``, whether to turn off
  94. serialization by alias (camelCase).
  95. :return: A generator that yields the formatted key-value pairs.
  96. """
  97. if isinstance(value, CamelModel):
  98. value = value.model_dump(by_alias=not unaliased)
  99. for key, item in value.items():
  100. if key in formatters:
  101. try:
  102. yield key, formatters[key](item)
  103. except ValueError as exc:
  104. new_error_message = (
  105. f"Could not format invalid value {repr(item)} as field {repr(key)}"
  106. )
  107. raise ValueError(new_error_message) from exc
  108. except TypeError as exc:
  109. new_error_message = (
  110. f"Could not format invalid type {repr(item)} as field {repr(key)}"
  111. )
  112. raise TypeError(new_error_message) from exc
  113. else:
  114. yield key, item
  115. @return_arg_type(1)
  116. def apply_formatter_to_array(
  117. formatter: Callable[..., Any], value: List[Any]
  118. ) -> Generator[List[Any], None, None]:
  119. for item in value:
  120. yield formatter(item)
  121. def apply_one_of_formatters(
  122. formatter_condition_pairs: Tuple[Tuple[Callable[..., Any], Callable[..., Any]]],
  123. value: Any,
  124. ) -> Any:
  125. for condition, formatter in formatter_condition_pairs:
  126. if condition(value):
  127. return formatter(value)
  128. else:
  129. raise ValueError(
  130. "The provided value did not satisfy any of the formatter conditions"
  131. )
  132. @to_dict
  133. def apply_key_map(
  134. key_mappings: Dict[Any, Any], value: Dict[Any, Any]
  135. ) -> Generator[Tuple[Any, Any], None, None]:
  136. key_conflicts = (
  137. set(value.keys())
  138. .difference(key_mappings.keys())
  139. .intersection(v for k, v in key_mappings.items() if v in value)
  140. )
  141. if key_conflicts:
  142. raise KeyError(
  143. f"Could not apply key map due to conflicting key(s): {key_conflicts}"
  144. )
  145. for key, item in value.items():
  146. if key in key_mappings:
  147. yield key_mappings[key], item
  148. else:
  149. yield key, item