| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- from eth_utils import (
- big_endian_to_int,
- )
- from eth_keys.datatypes import (
- BaseSignature,
- NonRecoverableSignature,
- PrivateKey,
- PublicKey,
- Signature,
- )
- from eth_keys.exceptions import (
- BadSignature,
- )
- from eth_keys.utils import (
- der,
- )
- from eth_keys.utils.numeric import (
- coerce_low_s,
- )
- from eth_keys.validation import (
- validate_uncompressed_public_key_bytes,
- )
- from .base import (
- BaseECCBackend,
- )
- def is_coincurve_available() -> bool:
- try:
- import coincurve # noqa: F401
- except ImportError:
- return False
- else:
- return True
- class CoinCurveECCBackend(BaseECCBackend):
- def __init__(self) -> None:
- try:
- import coincurve
- except ImportError:
- raise ImportError(
- "The CoinCurveECCBackend requires the coincurve "
- "library which is not available for import."
- )
- self.keys = coincurve.keys
- self.ecdsa = coincurve.ecdsa
- super().__init__()
- def ecdsa_sign(self, msg_hash: bytes, private_key: PrivateKey) -> Signature:
- private_key_bytes = private_key.to_bytes()
- signature_bytes = self.keys.PrivateKey(private_key_bytes).sign_recoverable(
- msg_hash,
- hasher=None,
- )
- signature = Signature(signature_bytes, backend=self)
- return signature
- def ecdsa_sign_non_recoverable(
- self, msg_hash: bytes, private_key: PrivateKey
- ) -> NonRecoverableSignature:
- private_key_bytes = private_key.to_bytes()
- der_encoded_signature = self.keys.PrivateKey(private_key_bytes).sign(
- msg_hash,
- hasher=None,
- )
- rs = der.two_int_sequence_decoder(der_encoded_signature)
- signature = NonRecoverableSignature(rs=rs, backend=self)
- return signature
- def ecdsa_verify(
- self, msg_hash: bytes, signature: BaseSignature, public_key: PublicKey
- ) -> bool:
- # coincurve rejects signatures with a high s,
- # so convert to the equivalent low s form
- low_s = coerce_low_s(signature.s)
- der_encoded_signature = der.two_int_sequence_encoder(signature.r, low_s)
- coincurve_public_key = self.keys.PublicKey(b"\x04" + public_key.to_bytes())
- return coincurve_public_key.verify(
- der_encoded_signature,
- msg_hash,
- hasher=None,
- )
- def ecdsa_recover(self, msg_hash: bytes, signature: Signature) -> PublicKey:
- signature_bytes = signature.to_bytes()
- try:
- public_key_bytes = self.keys.PublicKey.from_signature_and_message(
- signature_bytes,
- msg_hash,
- hasher=None,
- ).format(compressed=False)[1:]
- except Exception as err:
- raise BadSignature(str(err))
- public_key = PublicKey(public_key_bytes, backend=self)
- return public_key
- def private_key_to_public_key(self, private_key: PrivateKey) -> PublicKey:
- public_key_bytes = self.keys.PrivateKey(
- private_key.to_bytes()
- ).public_key.format(compressed=False,)[1:]
- return PublicKey(public_key_bytes, backend=self)
- def decompress_public_key_bytes(self, compressed_public_key_bytes: bytes) -> bytes:
- public_key = self.keys.PublicKey(compressed_public_key_bytes)
- return public_key.format(compressed=False)[1:]
- def compress_public_key_bytes(self, uncompressed_public_key_bytes: bytes) -> bytes:
- validate_uncompressed_public_key_bytes(uncompressed_public_key_bytes)
- point = (
- big_endian_to_int(uncompressed_public_key_bytes[:32]),
- big_endian_to_int(uncompressed_public_key_bytes[32:]),
- )
- public_key = self.keys.PublicKey.from_point(*point)
- return public_key.format(compressed=True)
|