signing.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. from typing import (
  2. Optional,
  3. Tuple,
  4. cast,
  5. )
  6. from eth_keys.datatypes import (
  7. PrivateKey,
  8. )
  9. from eth_utils import (
  10. to_bytes,
  11. to_int,
  12. )
  13. from eth_utils.toolz import (
  14. pipe,
  15. )
  16. from eth_account._utils.legacy_transactions import (
  17. ChainAwareUnsignedTransaction,
  18. Transaction,
  19. UnsignedTransaction,
  20. encode_transaction,
  21. serializable_unsigned_transaction_from_dict,
  22. strip_signature,
  23. )
  24. from eth_account.typed_transactions import (
  25. TypedTransaction,
  26. )
  27. from eth_account.types import (
  28. Blobs,
  29. Bytes32,
  30. TransactionDictType,
  31. )
# Offset added to the raw recovery value, together with 2 * chain_id, when a
# chain ID is supplied (see to_eth_v / extract_chain_id).
CHAIN_ID_OFFSET = 35
# Offset added to the raw recovery value when no chain ID is supplied.
V_OFFSET = 27

# signature versions
PERSONAL_SIGN_VERSION = b"E"  # Hex value 0x45
INTENDED_VALIDATOR_SIGN_VERSION = b"\x00"  # Hex value 0x00
STRUCTURED_DATA_SIGN_VERSION = b"\x01"  # Hex value 0x01
  38. def sign_transaction_dict(
  39. eth_key: PrivateKey,
  40. transaction_dict: TransactionDictType,
  41. blobs: Optional[Blobs] = None,
  42. ) -> Tuple[int, int, int, bytes]:
  43. # generate RLP-serializable transaction, with defaults filled
  44. unsigned_transaction = serializable_unsigned_transaction_from_dict(
  45. transaction_dict, blobs=blobs
  46. )
  47. transaction_hash = unsigned_transaction.hash()
  48. # detect chain
  49. if isinstance(unsigned_transaction, UnsignedTransaction):
  50. chain_id = None
  51. (v, r, s) = sign_transaction_hash(eth_key, transaction_hash, chain_id)
  52. elif isinstance(unsigned_transaction, Transaction):
  53. chain_id = unsigned_transaction.v
  54. (v, r, s) = sign_transaction_hash(eth_key, transaction_hash, chain_id)
  55. elif isinstance(unsigned_transaction, TypedTransaction):
  56. # Each transaction type dictates its payload, and consequently,
  57. # all the funky logic around the `v` signature field is both obsolete &&
  58. # incorrect. We want to obtain the raw `v` and delegate
  59. # to the transaction type itself.
  60. (v, r, s) = eth_key.sign_msg_hash(transaction_hash).vrs
  61. else:
  62. # Cannot happen, but better for code to be defensive + self-documenting.
  63. raise TypeError(f"unknown Transaction object: {type(unsigned_transaction)}")
  64. # serialize transaction with rlp
  65. encoded_transaction = encode_transaction(unsigned_transaction, vrs=(v, r, s))
  66. return (v, r, s, encoded_transaction)
  67. def hash_of_signed_transaction(txn_obj: Transaction) -> Bytes32:
  68. """
  69. Regenerate the hash of the signed transaction object.
  70. 1. Infer the chain ID from the signature
  71. 2. Strip out signature from transaction
  72. 3. Annotate the transaction with that ID, if available
  73. 4. Take the hash of the serialized, unsigned, chain-aware transaction
  74. Chain ID inference and annotation is according to EIP-155
  75. See details at https://github.com/ethereum/EIPs/blob/master/EIPS/eip-155.md
  76. :return: the hash of the provided transaction, to be signed
  77. """ # blocklint: URL pragma
  78. (chain_id, _v) = extract_chain_id(txn_obj.v)
  79. unsigned_parts = strip_signature(txn_obj)
  80. if chain_id is None:
  81. signable_transaction = UnsignedTransaction(*unsigned_parts)
  82. else:
  83. extended_transaction = unsigned_parts + [chain_id, 0, 0]
  84. signable_transaction = ChainAwareUnsignedTransaction(*extended_transaction)
  85. return signable_transaction.hash()
  86. def extract_chain_id(raw_v: int) -> Tuple[Optional[int], int]:
  87. """
  88. Extracts chain ID, according to EIP-155.
  89. @return (chain_id, v)
  90. """
  91. above_id_offset = raw_v - CHAIN_ID_OFFSET
  92. if above_id_offset < 0:
  93. if raw_v in {0, 1}:
  94. return (None, raw_v + V_OFFSET)
  95. elif raw_v in {27, 28}:
  96. return (None, raw_v)
  97. else:
  98. raise ValueError(
  99. f"v {repr(raw_v)} is invalid, must be one of: 0, 1, 27, 28, 35+"
  100. )
  101. else:
  102. (chain_id, v_bit) = divmod(above_id_offset, 2)
  103. return (chain_id, v_bit + V_OFFSET)
  104. def to_standard_signature_bytes(ethereum_signature_bytes: bytes) -> bytes:
  105. rs = ethereum_signature_bytes[:-1]
  106. v = to_int(ethereum_signature_bytes[-1])
  107. standard_v = to_standard_v(v)
  108. return rs + to_bytes(standard_v)
  109. def to_standard_v(enhanced_v: int) -> int:
  110. (_chain, chain_naive_v) = extract_chain_id(enhanced_v)
  111. v_standard = chain_naive_v - V_OFFSET
  112. assert v_standard in {0, 1}
  113. return v_standard
  114. def to_eth_v(v_raw: int, chain_id: Optional[int] = None) -> int:
  115. if chain_id is None:
  116. v = v_raw + V_OFFSET
  117. else:
  118. v = v_raw + CHAIN_ID_OFFSET + 2 * chain_id
  119. return v
  120. def sign_transaction_hash(
  121. account: PrivateKey, transaction_hash: Bytes32, chain_id: Optional[int] = None
  122. ) -> Tuple[int, int, int]:
  123. signature = account.sign_msg_hash(transaction_hash)
  124. (v_raw, r, s) = signature.vrs
  125. v = to_eth_v(v_raw, chain_id)
  126. return (v, r, s)
  127. def _pad_to_eth_word(bytes_val: bytes) -> Bytes32:
  128. return bytes_val.rjust(32, b"\0")
  129. def to_bytes32(val: int) -> Bytes32:
  130. return cast(
  131. bytes,
  132. pipe(
  133. val,
  134. to_bytes,
  135. _pad_to_eth_word,
  136. ),
  137. )
  138. def sign_message_hash(
  139. key: PrivateKey, msg_hash: Bytes32
  140. ) -> Tuple[int, int, int, bytes]:
  141. signature = key.sign_msg_hash(msg_hash)
  142. (v_raw, r, s) = signature.vrs
  143. v = to_eth_v(v_raw)
  144. eth_signature_bytes = to_bytes32(r) + to_bytes32(s) + to_bytes(v)
  145. return (v, r, s, eth_signature_bytes)