diff --git a/pactus/crypto/address.py b/pactus/crypto/address.py index d1a95c7..9ae4b79 100644 --- a/pactus/crypto/address.py +++ b/pactus/crypto/address.py @@ -16,6 +16,7 @@ class AddressType(Enum): VALIDATOR = 1 BLS_ACCOUNT = 2 ED25519_ACCOUNT = 3 + SECP256K1_ACCOUNT = 4 class Address: diff --git a/pactus/crypto/bls/__init__.py b/pactus/crypto/bls/__init__.py index 4c1784c..4da4e2b 100644 --- a/pactus/crypto/bls/__init__.py +++ b/pactus/crypto/bls/__init__.py @@ -3,4 +3,3 @@ from .signature import DST, SIGNATURE_TYPE_BLS, Signature __all__ = ["DST", "SIGNATURE_TYPE_BLS", "PrivateKey", "PublicKey", "Signature"] - diff --git a/pactus/crypto/bls/bls12_381/consts.py b/pactus/crypto/bls/bls12_381/consts.py index 58a1150..ac0e862 100644 --- a/pactus/crypto/bls/bls12_381/consts.py +++ b/pactus/crypto/bls/bls12_381/consts.py @@ -12,8 +12,8 @@ k_final = (p**4 - p**2 + 1) // q # ciphersuite numbers -_gsuite = ( - lambda stype, group, stag: b"BLS_" +_gsuite = lambda stype, group, stag: ( + b"BLS_" + stype + b"_BLS12381G" + group diff --git a/pactus/crypto/ed25519/__init__.py b/pactus/crypto/ed25519/__init__.py index e6e249f..aa96f5e 100644 --- a/pactus/crypto/ed25519/__init__.py +++ b/pactus/crypto/ed25519/__init__.py @@ -3,4 +3,3 @@ from .signature import SIGNATURE_TYPE_ED25519, Signature __all__ = ["SIGNATURE_TYPE_ED25519", "PrivateKey", "PublicKey", "Signature"] - diff --git a/pactus/crypto/secp256k1/__init__.py b/pactus/crypto/secp256k1/__init__.py new file mode 100644 index 0000000..717eabd --- /dev/null +++ b/pactus/crypto/secp256k1/__init__.py @@ -0,0 +1,5 @@ +from .private_key import PrivateKey +from .public_key import PublicKey +from .signature import SIGNATURE_TYPE_SECP256K1, Signature + +__all__ = ["SIGNATURE_TYPE_SECP256K1", "PrivateKey", "PublicKey", "Signature"] diff --git a/pactus/crypto/secp256k1/private_key.py b/pactus/crypto/secp256k1/private_key.py new file mode 100644 index 0000000..6ba71ca --- /dev/null +++ b/pactus/crypto/secp256k1/private_key.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import secp256k1 + +from pactus.crypto.hrp import HRP +from pactus.utils import utils + +from .public_key import PublicKey +from .signature import SIGNATURE_TYPE_SECP256K1, Signature + +PRIVATE_KEY_SIZE = 32 + + +class PrivateKey: + def __init__(self, scalar: secp256k1.PrivateKey) -> None: + self.scalar = scalar + + @classmethod + def from_bytes(cls, buffer: bytes) -> PrivateKey: + sk = secp256k1.PrivateKey() + sk.set_raw_privkey(buffer) + return cls(sk) + + @classmethod + def random(cls) -> PrivateKey: + sk = secp256k1.PrivateKey() + return cls(sk) + + @classmethod + def from_string(cls, text: str) -> PrivateKey: + hrp, typ, data = utils.decode_to_base256_with_type(text) + + if hrp != HRP.PRIVATE_KEY_HRP: + msg = f"Invalid hrp: {hrp}" + raise ValueError(msg) + + if typ != SIGNATURE_TYPE_SECP256K1: + msg = f"Invalid Private key type: {typ}" + raise ValueError(msg) + + if len(data) != PRIVATE_KEY_SIZE: + msg = "Private key data must be 32 bytes long" + raise ValueError(msg) + + sk = secp256k1.PrivateKey() + sk.set_raw_privkey(bytes(data)) + return cls(sk) + + def raw_bytes(self) -> bytes: + # serialize() returns a hex string, convert to bytes + hex_str = self.scalar.serialize() + return bytes.fromhex(hex_str) + + def string(self) -> str: + return utils.encode_from_base256_with_type( + HRP.PRIVATE_KEY_HRP, + SIGNATURE_TYPE_SECP256K1, + self.raw_bytes(), + ) + + def public_key(self) -> PublicKey: + return PublicKey(self.scalar.pubkey) + + def sign(self, msg: bytes) -> Signature: + sig = self.scalar.ecdsa_sign(msg) + sig_compact = self.scalar.ecdsa_serialize_compact(sig) + return Signature(sig_compact) diff --git a/pactus/crypto/secp256k1/public_key.py b/pactus/crypto/secp256k1/public_key.py new file mode 100644 index 0000000..12b5137 --- /dev/null +++ b/pactus/crypto/secp256k1/public_key.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import hashlib + +import secp256k1 +from ripemd.ripemd160 import ripemd160 + +from pactus.crypto.address import Address, AddressType +from pactus.crypto.hrp import HRP +from pactus.utils import utils + +from .signature import SIGNATURE_TYPE_SECP256K1, Signature + +PUBLIC_KEY_SIZE = 33 # Compressed public key + + +class PublicKey: + def __init__(self, pub: secp256k1.PublicKey) -> None: + self.pub = pub + + @classmethod + def from_string(cls, text: str) -> PublicKey: + hrp, typ, data = utils.decode_to_base256_with_type(text) + + if hrp != HRP.PUBLIC_KEY_HRP: + msg = f"Invalid hrp: {hrp}" + raise ValueError(msg) + + if typ != SIGNATURE_TYPE_SECP256K1: + msg = f"Invalid Public key type: {typ}" + raise ValueError(msg) + + if len(data) != PUBLIC_KEY_SIZE: + msg = "Public key data must be 33 bytes long" + raise ValueError(msg) + + pub_key = secp256k1.PublicKey(bytes(data), raw=True) + + return cls(pub_key) + + def raw_bytes(self) -> bytes: + return self.pub.serialize(compressed=True) + + def string(self) -> str: + return utils.encode_from_base256_with_type( + HRP.PUBLIC_KEY_HRP, + SIGNATURE_TYPE_SECP256K1, + self.raw_bytes(), + ) + + def account_address(self) -> Address: + blake2b = hashlib.blake2b(digest_size=32) + blake2b.update(self.raw_bytes()) + hash_256 = blake2b.digest() + hash_160 = ripemd160(hash_256) + + return Address(AddressType.SECP256K1_ACCOUNT, hash_160) + + def verify(self, msg: bytes, sig: Signature) -> bool: + try: + sig_compact = sig.raw_bytes() + sig_deserialized = self.pub.ecdsa_deserialize_compact(sig_compact) + return self.pub.ecdsa_verify(msg, sig_deserialized) + + # ruff: noqa: BLE001 # unable to fix this issue + except Exception: + return False diff --git a/pactus/crypto/secp256k1/signature.py b/pactus/crypto/secp256k1/signature.py new file mode 100644 index 0000000..7b0ea5c --- /dev/null +++ b/pactus/crypto/secp256k1/signature.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +SIGNATURE_SIZE = 64 +SIGNATURE_TYPE_SECP256K1 = 4 + + +class Signature: + def __init__(self, sig: bytes) -> None: + self.sig = sig + + @classmethod + def from_string(cls, text: str) -> Signature: + data = bytes.fromhex(text) + + if len(data) != SIGNATURE_SIZE: + msg = "Signature data must be 64 bytes long" + raise ValueError(msg) + + return cls(data) + + def raw_bytes(self) -> bytes: + return self.sig + + def string(self) -> str: + return self.sig.hex() diff --git a/pactus/crypto/sss/sss.py b/pactus/crypto/sss/sss.py index 8f66994..802fd17 100644 --- a/pactus/crypto/sss/sss.py +++ b/pactus/crypto/sss/sss.py @@ -6,6 +6,7 @@ See the bottom few lines for usage. Tested on Python 2 and 3. """ + from __future__ import annotations import functools @@ -100,7 +101,6 @@ def make_random_shares(secret: int, minimum: int, shares: int, prime: int) -> li return [(i, _eval_at(poly, i, prime)) for i in range(1, shares + 1)] - def recover_secret(shares: list[tuple[int, int]], prime: int) -> int: """ Recover the secret from share points diff --git a/pactus/types/amount.py b/pactus/types/amount.py index 7083f77..de8595d 100644 --- a/pactus/types/amount.py +++ b/pactus/types/amount.py @@ -89,4 +89,3 @@ def round(self: float) -> float: return self - 0.5 return self + 0.5 - diff --git a/setup.py b/setup.py index 89a2338..f5f3a85 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ "grpcio", "grpcio-tools", "cryptography>=43.0", + "secp256k1", "zmq", ], keywords=["pactus", "blockchain", "sdk", "web3", "dapp", "bls", "bech32"], diff --git a/tests/test_crypto_secp256k1.py b/tests/test_crypto_secp256k1.py new file mode 100644 index 0000000..e6273ae --- /dev/null +++ b/tests/test_crypto_secp256k1.py @@ -0,0 +1,154 @@ +import unittest + +from pactus.crypto.secp256k1 import PrivateKey as Secp256k1PrivateKey +from pactus.crypto.secp256k1 import PublicKey as Secp256k1PublicKey +from pactus.crypto.secp256k1 import Signature as Secp256k1Signature + + +class TestSecp256k1Crypto(unittest.TestCase): + def test_private_key_to_public_key(self): + prv_str = "secret1yqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqsd25y3e" + expected_pub_str = ( + "public1yqvdcf32k0vfxgsyet5ldt246q4jaw8scx3sysx0lnstlt6w4m5rc7k3ysjp" + ) + + prv = Secp256k1PrivateKey.from_string(prv_str) + pub = prv.public_key() + pub_str = pub.string() + + self.assertEqual(pub_str, expected_pub_str) + + def test_public_key_to_address(self): + pub_str = "public1yqvdcf32k0vfxgsyet5ldt246q4jaw8scx3sysx0lnstlt6w4m5rc7k3ysjp" + expected_acc_addr_str = "pc1yj7ag28h54jf4e09nnednjhgmg60srnvj7uu39v" + + pub = Secp256k1PublicKey.from_string(pub_str) + acc_add_str = pub.account_address().string() + + self.assertEqual(acc_add_str, expected_acc_addr_str) + + def test_sign_and_verify(self): + prv_str = "secret1yqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqsd25y3e" + prv = Secp256k1PrivateKey.from_string(prv_str) + pub = prv.public_key() + msg = b"pactus" + sig = prv.sign(msg) + + # Verify the signature is correct length (64 bytes) + self.assertEqual(len(sig.raw_bytes()), 64) + + # Verify the signature verifies correctly + self.assertTrue(pub.verify(msg, sig)) + + # Verify an invalid signature is rejected + invalid_sig_bytes = b"\x00" * 64 + invalid_sig = Secp256k1Signature.from_string(invalid_sig_bytes.hex()) + self.assertFalse(pub.verify(msg, invalid_sig)) + + # Verify a signature for a different message is rejected + different_msg = b"different" + sig2 = prv.sign(different_msg) + self.assertFalse(pub.verify(msg, sig2)) + + def test_key_generation(self): + # Test random key generation + prv = Secp256k1PrivateKey.random() + self.assertIsNotNone(prv) + + # Test that we can derive public key from random private key + pub = prv.public_key() + self.assertIsNotNone(pub) + + # Test that we can sign and verify with random key + msg = b"test message" + sig = prv.sign(msg) + self.assertTrue(pub.verify(msg, sig)) + + def test_key_from_bytes(self): + # Test creating private key from bytes + key_bytes = b"\x01" * 32 + prv = Secp256k1PrivateKey.from_bytes(key_bytes) + self.assertEqual(prv.raw_bytes(), key_bytes) + + # Test that the same bytes produce the same public key + prv2 = Secp256k1PrivateKey.from_bytes(key_bytes) + pub1 = prv.public_key() + pub2 = prv2.public_key() + self.assertEqual(pub1.string(), pub2.string()) + + def test_string_encoding_decoding(self): + # Test private key string encoding/decoding + prv = Secp256k1PrivateKey.random() + prv_str = prv.string() + prv_decoded = Secp256k1PrivateKey.from_string(prv_str) + self.assertEqual(prv.raw_bytes(), prv_decoded.raw_bytes()) + + # Test public key string encoding/decoding + pub = prv.public_key() + pub_str = pub.string() + pub_decoded = Secp256k1PublicKey.from_string(pub_str) + self.assertEqual(pub.raw_bytes(), pub_decoded.raw_bytes()) + + def test_signature_from_string(self): + # Test signature creation from hex string + sig_hex = "a" * 128 # 64 bytes = 128 hex chars + sig = Secp256k1Signature.from_string(sig_hex) + self.assertEqual(len(sig.raw_bytes()), 64) + self.assertEqual(sig.string(), sig_hex) + + def test_signature_invalid_length(self): + # Test that invalid signature length raises error + with self.assertRaises(ValueError): + Secp256k1Signature.from_string("00" * 32) # Too short + + with self.assertRaises(ValueError): + Secp256k1Signature.from_string("00" * 100) # Too long + + def test_private_key_invalid_string(self): + # Test that invalid private key string raises error + with self.assertRaises(ValueError): + # Wrong HRP + Secp256k1PrivateKey.from_string( + "invalid1yqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqszqgpqyqsd25y3e" + ) + + with self.assertRaises(ValueError): + # Wrong type + Secp256k1PrivateKey.from_string( + "secret1ry2cqw5yfhmr7ve8nctgzg6wgcyc73xqr2uud486jgsq7wu253egsx6msep" + ) + + def test_public_key_invalid_string(self): + # Test that invalid public key string raises error + with self.assertRaises(ValueError): + # Wrong HRP + Secp256k1PublicKey.from_string( + "invalid1yqvdcf32k0vfxgsyet5ldt246q4jaw8scx3sysx0lnstlt6w4m5rc7k3ysjp" + ) + + with self.assertRaises(ValueError): + # Wrong type + Secp256k1PublicKey.from_string( + "public1ry2cqw5yfhmr7ve8nctgzg6wgcyc73xqr2uud486jgsq7wu253egsx6msep" + ) + + def test_multiple_signatures(self): + # Test that multiple signatures for the same message and key are deterministic + # (secp256k1 library uses deterministic signing with RFC 6979) + prv = Secp256k1PrivateKey.random() + pub = prv.public_key() + msg = b"same message" + + sig1 = prv.sign(msg) + sig2 = prv.sign(msg) + + # Both signatures should verify + self.assertTrue(pub.verify(msg, sig1)) + self.assertTrue(pub.verify(msg, sig2)) + + # Signatures should be deterministic (same message + same key = same signature) + self.assertEqual(sig1.string(), sig2.string()) + + +if __name__ == "__main__": + unittest.main()