diff --git a/cryptnox_cli/command/command.py b/cryptnox_cli/command/command.py index 0f2d894..d74bbc7 100644 --- a/cryptnox_cli/command/command.py +++ b/cryptnox_cli/command/command.py @@ -55,6 +55,7 @@ def _handle_execution(self, serial_number: int = None) -> int: try: self.serial_number = self.data.serial except AttributeError: + # No serial provided in arguments; keep the passed-in serial_number pass try: card = self._cards[self.serial_number] @@ -67,6 +68,8 @@ def _handle_execution(self, serial_number: int = None) -> int: self.run_execute(card) + return None + def run_execute(self, card) -> int: print(f"Using card with serial number {card.serial_number}") origin = card.origin diff --git a/cryptnox_cli/command/erc_token/initialize.py b/cryptnox_cli/command/erc_token/initialize.py index e2089bb..40c0023 100644 --- a/cryptnox_cli/command/erc_token/initialize.py +++ b/cryptnox_cli/command/erc_token/initialize.py @@ -7,10 +7,10 @@ import gzip import json -import urllib +import urllib.error +import urllib.parse from argparse import Namespace from typing import List -from urllib import parse import cryptnox_sdk_py import requests @@ -202,7 +202,7 @@ def _token_slots() -> list[str]: def _abi() -> str: def uri_validator(x): try: - result = parse.urlparse(x) + result = urllib.parse.urlparse(x) return all([result.scheme, result.netloc]) except Exception: return False diff --git a/cryptnox_cli/command/eth.py b/cryptnox_cli/command/eth.py index 3e2673b..d436608 100644 --- a/cryptnox_cli/command/eth.py +++ b/cryptnox_cli/command/eth.py @@ -169,6 +169,8 @@ def logs(self): endpoint.block_number save_to_config(self.card, self.config) + return None + @staticmethod def _get_logs(event) -> List[Dict[str, Any]]: min_offset = 0 @@ -256,6 +258,8 @@ def _execute(self, card): print(error) return -1 + return None + def _get_endpoint(self, card): config = get_configuration(card) @@ -311,6 +315,8 @@ def _add(self, card): print(f"Contract added to application. Use it with alias:" f" {self.data.alias}") + return None + @staticmethod def _list(card) -> int: config = get_configuration(card) @@ -356,6 +362,8 @@ def _functions(self, card): print(tabulate(tabulate_table, headers=tabulate_header, tablefmt="grid")) + return None + def _call(self, card): config = get_configuration(card) try: @@ -645,3 +653,5 @@ def _send_token(self, card): contract.transfer(card, config["endpoint"], config["network"], config["api_key"], self.data.contract, self.data.address, self.data.amount, self.data.price, self.data.limit, derivation) + + return None diff --git a/cryptnox_cli/command/factory.py b/cryptnox_cli/command/factory.py index ea21bd4..0571d26 100644 --- a/cryptnox_cli/command/factory.py +++ b/cryptnox_cli/command/factory.py @@ -27,7 +27,7 @@ def command(data: Namespace, cards: CardManager = None) -> Command: 'btc', 'card_configuration', 'change_pin', 'change_puk', 'config', 'eth', 'history', 'info', 'initialize', 'seed', 'cards', 'server', 'reset', 'unlock_pin', 'user_key', 'transfer', 'get_xpub', - 'get_clearpubkey', 'decrypt', 'manufacturer_certificate' + 'get_clearpubkey', 'decrypt', 'manufacturer_certificate', 'musig2' ] for module_name in command_modules: diff --git a/cryptnox_cli/command/get_clearpubkey.py b/cryptnox_cli/command/get_clearpubkey.py index 1f08b29..d715b71 100644 --- a/cryptnox_cli/command/get_clearpubkey.py +++ b/cryptnox_cli/command/get_clearpubkey.py @@ -98,6 +98,7 @@ def _execute(self, card) -> int: else: card.derive(key_type, "m/44'/0'/0'") except Exception: + # Best-effort derivation; clear pubkey read below handles failures pass pubkey_bytes = card.get_public_key_clear(derivation, path, compressed) self._print_pubkey_info(key_type, compressed, pubkey_bytes) diff --git a/cryptnox_cli/command/helper/cards.py b/cryptnox_cli/command/helper/cards.py index f048b50..532d70d 100644 --- a/cryptnox_cli/command/helper/cards.py +++ b/cryptnox_cli/command/helper/cards.py @@ -113,6 +113,7 @@ def refresh(self, remote: bool = False) -> None: except cryptnox_sdk_py.exceptions.ReaderException: break except cryptnox_sdk_py.exceptions.CryptnoxException: + # Card at this index is unreadable; skip it and continue scanning pass index += 1 @@ -194,7 +195,7 @@ def _open_card(self, index: int, remote: bool = False) -> cryptnox_sdk_py.Card: if test_response: if index in self._cards_by_index: return self._cards_by_index[index] - except (BaseException, cryptnox_sdk_py.exceptions.ConnectionException): + except (Exception, cryptnox_sdk_py.exceptions.ConnectionException): # Connection is stale, remove it and create new one del _GLOBAL_CONNECTIONS[index] if index in self._cards_by_index: diff --git a/cryptnox_cli/command/helper/config.py b/cryptnox_cli/command/helper/config.py index 54c4ee4..439b037 100644 --- a/cryptnox_cli/command/helper/config.py +++ b/cryptnox_cli/command/helper/config.py @@ -185,6 +185,7 @@ def write_config(card: cryptnox_sdk_py.Card, section: str, key: str, value: str) print(error) return 1 except AttributeError: + # Key not present on the validator instance; fall through to config handling pass try: diff --git a/cryptnox_cli/command/helper/helper_methods.py b/cryptnox_cli/command/helper/helper_methods.py index 576e1ce..cd23830 100644 --- a/cryptnox_cli/command/helper/helper_methods.py +++ b/cryptnox_cli/command/helper/helper_methods.py @@ -102,5 +102,6 @@ def try_eval(value: str) -> Any: try: value = ast.literal_eval(value) except ValueError: + # Not a literal; keep the original string value pass return value diff --git a/cryptnox_cli/command/helper/security.py b/cryptnox_cli/command/helper/security.py index 64081c6..58724b5 100644 --- a/cryptnox_cli/command/helper/security.py +++ b/cryptnox_cli/command/helper/security.py @@ -140,6 +140,7 @@ def check(card, check_seed: bool = True) -> bool: try: result = user_keys.authenticate(card) except NotImplementedError: + # User-key auth unsupported here; fall back to PIN authentication below pass if not result: diff --git a/cryptnox_cli/command/musig2.py b/cryptnox_cli/command/musig2.py new file mode 100644 index 0000000..c93f7c3 --- /dev/null +++ b/cryptnox_cli/command/musig2.py @@ -0,0 +1,479 @@ +# -*- coding: utf-8 -*- +""" +Module containing the ``musig2`` command for MuSig2 (BIP-327) multi-signature +operations with Cryptnox Basic G2 cards. + +The Basic G2 card exposes three extra instructions used here over the encrypted +secure channel: + +* ``C2 00 01`` -- read the card's MuSig2 public key +* ``C7 00 00`` -- generate this card's pair of signing nonces (R1 || R2) +* ``C8 01 00`` -- load a cosigner public key (repeated for every signer) +* ``C8 02 00`` -- produce this card's partial signature + +The byte layout of these calls matches the reference ``cryptnox-cli-musig2`` +scripts, which were validated against the card firmware. All public-key/nonce +aggregation and the Schnorr/Taproot math are done on the host in +:mod:`cryptnox_cli.command.musig2_crypto`. + +Because MuSig2 needs every signer's card and this build targets a single reader, +the command walks the user through inserting each card in turn (the same +"insert card N, press ENTER" flow as the reference scripts). +""" + +import hashlib + +import cryptnox_sdk_py +import requests + +from .command import Command +from .helper.cards import ExitException +from .helper.security import check +from . import musig2_crypto as mc + +try: + import enums +except ImportError: + from .. import enums + + +# MuSig2 applet instructions (see module docstring) +_INS_GET_PUBKEY = [0x00, 0xC2, 0x00, 0x01] +_INS_NONCE_GEN = [0x00, 0xC7, 0x00, 0x00] +_INS_SIGN = 0xC8 +_P1_LOAD_COSIGNER = 0x01 +_P1_PARTIAL_SIGN = 0x02 + +# Length of the partial-signature input: aggR1(65) + aggR2(65) + aggPk(33) + msg(32) +_SIGN_DATA_LENGTH = 195 + +_NETWORKS = { + "testnet": {"hrp": "tb", "api": "https://mempool.space/testnet/api"}, + "mainnet": {"hrp": "bc", "api": "https://mempool.space/api"}, +} + +# Standard dust threshold (sats) below which an output is not economically spendable. +_DUST_LIMIT = 546 + + +class Musig2(Command): + """ + Command for MuSig2 (BIP-327) multi-signature operations on Basic G2 cards. + """ + _name = enums.Command.MUSIG2.value + + # ------------------------------------------------------------------ + # Entry point -- overrides Command.execute() because this command drives + # several cards through a single reader instead of one pre-selected card. + # ------------------------------------------------------------------ + def execute(self, serial_number: int = None) -> int: + self._debug = bool(getattr(self.data, "verbose", False)) + action = getattr(self.data, "action", None) + + try: + if action == "address": + return self._address() + if action == "sign": + return self._sign() + if action == "send": + return self._send() + except KeyboardInterrupt: + print("\nCancelled by user.") + return -1 + except cryptnox_sdk_py.exceptions.GenericException as error: + print(f"\nThe card rejected a MuSig2 command (status 0x{error.status.hex().upper()}).") + print("This usually means the card is not a Basic G2 card with MuSig2 support.") + return -2 + except cryptnox_sdk_py.exceptions.CryptnoxException as error: + print(error) + return -1 + except cryptnox_sdk_py.exceptions.CardClosedException: + print("\nLost contact with the card. Please keep the card on the reader and retry.") + return -1 + except (ExitException, EOFError): + print("\nCancelled by user.") + return -1 + + print("Unknown MuSig2 action. Use 'address', 'sign' or 'send'.") + return -1 + + # _execute is required by the abstract base class but is never reached + # because execute() is overridden for the multi-card flow. + def _execute(self, card) -> int: # pragma: no cover - not used + raise NotImplementedError + + # ------------------------------------------------------------------ + # Card connection / reader helpers (single reader, card swaps) + # ------------------------------------------------------------------ + def _reader_index(self) -> int: + return int(getattr(self.data, "reader", 0) or 0) + + def _connect(self, label: str): + """Open a fresh connection, recognise the card and verify it for signing.""" + connection = cryptnox_sdk_py.Connection(self._reader_index(), self._debug) + card = cryptnox_sdk_py.factory.get_card(connection, self._debug) + + if card.type != ord("B"): + connection.disconnect() + raise cryptnox_sdk_py.exceptions.CryptnoxException( + "MuSig2 is only supported on Cryptnox Basic G2 cards.") + + # Ensure the card is initialised, has a seed and is authenticated (PIN). + check(card) + + print(f" [{label}] connected, serial: {card.serial_number}") + return connection, card + + @staticmethod + def _wait_swap(message: str) -> None: + input(f"\n>>> {message}, then press ENTER...") + + # ------------------------------------------------------------------ + # MuSig2 card instructions + # ------------------------------------------------------------------ + @staticmethod + def _get_pubkey(connection) -> bytes: + return mc.compressed_pubkey(connection.send_encrypted(_INS_GET_PUBKEY, b"")) + + @staticmethod + def _nonce_gen(connection): + response = connection.send_encrypted(_INS_NONCE_GEN, b"") + if len(response) < 66: + raise cryptnox_sdk_py.exceptions.DataException( + "Bad nonce response from card (expected 66 bytes).") + return response[:33], response[33:66] + + @staticmethod + def _load_cosigner(connection, pk: bytes) -> None: + connection.send_encrypted([0x00, _INS_SIGN, _P1_LOAD_COSIGNER, 0x00], pk) + + @staticmethod + def _partial_sign(connection, data: bytes) -> bytes: + return connection.send_encrypted([0x00, _INS_SIGN, _P1_PARTIAL_SIGN, 0x00], data) + + def _sign_with_card(self, connection, sorted_pks, sign_data: bytes) -> bytes: + for pk in sorted_pks: + self._load_cosigner(connection, pk) + return self._partial_sign(connection, sign_data) + + # ------------------------------------------------------------------ + # Phase 1 -- collect every card's public key and nonces + # ------------------------------------------------------------------ + def _collect_pubkeys_and_nonces(self, num_signers: int): + """ + Walk through every card once, reading its public key and nonces. + + :return: tuple (pubkeys, nonces_r1, nonces_r2, last_connection). The last + card's connection is left open so it can sign first in phase 2. + """ + pubkeys, nonces_r1, nonces_r2 = [], [], [] + previous_connection = None + + for index in range(1, num_signers + 1): + if previous_connection is not None: + previous_connection.disconnect() + self._wait_swap(f"Insert CARD {index}") + + print(f"\n--- Phase 1.{index}: public key + nonce generation ---") + connection, _ = self._connect(f"Card{index}") + pubkey = self._get_pubkey(connection) + nonce_r1, nonce_r2 = self._nonce_gen(connection) + print(f" pk{index}: {pubkey.hex()}") + pubkeys.append(pubkey) + nonces_r1.append(nonce_r1) + nonces_r2.append(nonce_r2) + previous_connection = connection + + return pubkeys, nonces_r1, nonces_r2, previous_connection + + # ------------------------------------------------------------------ + # Phase 2 -- collect a partial signature from every card + # ------------------------------------------------------------------ + def _collect_partial_signatures(self, num_signers: int, sorted_pks, sign_data: bytes, + last_connection) -> list: + """Gather one partial signature per card, starting with the inserted last card.""" + partial_sigs = [None] * num_signers + + print(f"\n--- Phase 2.{num_signers}: partial signature (card already inserted) ---") + partial_sigs[num_signers - 1] = self._sign_with_card(last_connection, sorted_pks, sign_data) + print(f" s{num_signers}: {partial_sigs[num_signers - 1].hex()}") + previous_connection = last_connection + + for index in range(1, num_signers): + previous_connection.disconnect() + self._wait_swap(f"Insert CARD {index}") + print(f"\n--- Phase 2.{index}: partial signature ---") + connection, _ = self._connect(f"Card{index}") + partial_sigs[index - 1] = self._sign_with_card(connection, sorted_pks, sign_data) + print(f" s{index}: {partial_sigs[index - 1].hex()}") + previous_connection = connection + + previous_connection.disconnect() + return partial_sigs + + @staticmethod + def _sum_partials(partial_sigs) -> int: + return sum(int.from_bytes(s, "big") for s in partial_sigs) % mc.N + + # ------------------------------------------------------------------ + # Argument helpers + # ------------------------------------------------------------------ + def _num_signers(self) -> int: + num = int(self.data.signers) + if num < 2: + raise cryptnox_sdk_py.exceptions.DataValidationException( + "MuSig2 needs at least 2 signers.") + return num + + def _message_hash(self) -> bytes: + text = getattr(self.data, "text", None) + message = getattr(self.data, "message", None) + if text is not None: + return hashlib.sha256(text.encode()).digest() + if message: + digest = bytes.fromhex(message) + if len(digest) != 32: + raise cryptnox_sdk_py.exceptions.DataValidationException( + "--message must be a 32-byte (64 hex character) value.") + return digest + raise cryptnox_sdk_py.exceptions.DataValidationException( + "Provide a message with --message or --text .") + + def _network(self): + name = getattr(self.data, "network", None) or "testnet" + return name, _NETWORKS[name] + + # ------------------------------------------------------------------ + # Sub-commands + # ------------------------------------------------------------------ + def _aggregate(self, num_signers: int): + """Phase 1 + host-side aggregation shared by all sub-commands.""" + pubkeys, nonces_r1, nonces_r2, last_connection = \ + self._collect_pubkeys_and_nonces(num_signers) + + sorted_pks = sorted(pubkeys) + aggpk_c, aggpk_x = mc.aggregate_pubkey(sorted_pks) + agg_r1, agg_r2 = mc.aggregate_nonces(nonces_r1, nonces_r2) + + print(f"\n Aggregate public key: {aggpk_c.hex()}") + return sorted_pks, aggpk_c, aggpk_x, agg_r1, agg_r2, last_connection + + def _address(self) -> int: + num_signers = self._num_signers() + _, network = self._network() + + print("=" * 60) + print(f" MuSig2 Taproot address -- {num_signers} signers") + print("=" * 60) + + sorted_pks, _, aggpk_x, _, _, last_connection = self._aggregate(num_signers) + last_connection.disconnect() + + _, output_x, _, _ = mc.taproot_output_key(aggpk_x) + address = mc.encode_taproot_address(output_x, network["hrp"]) + + print(f"\n Internal key (x-only): {aggpk_x.hex()}") + print(f" Output key (x-only): {output_x.hex()}") + print(f"\n *** Taproot address: {address} ***") + return 0 + + def _sign(self) -> int: + num_signers = self._num_signers() + message = self._message_hash() + + print("=" * 60) + print(f" MuSig2 sign -- {num_signers} signers") + print("=" * 60) + print(f" Message hash: {message.hex()}") + + sorted_pks, aggpk_c, aggpk_x, agg_r1, agg_r2, last_connection = \ + self._aggregate(num_signers) + + sign_data = mc.point_to_uncompressed(agg_r1) + mc.point_to_uncompressed(agg_r2) \ + + aggpk_c + message + assert len(sign_data) == _SIGN_DATA_LENGTH + + partial_sigs = self._collect_partial_signatures( + num_signers, sorted_pks, sign_data, last_connection) + + s_agg = self._sum_partials(partial_sigs) + nonce_x = mc.effective_nonce_x(agg_r1, agg_r2, aggpk_x, message) + signature = nonce_x + s_agg.to_bytes(32, "big") + + print("\n--- Result ---") + print(f" Signature: {signature.hex()}") + + if mc.schnorr_verify(aggpk_x, message, signature): + print(" BIP-340 Schnorr verification: PASS") + return 0 + + print(" BIP-340 Schnorr verification: FAIL") + return -1 + + def _send(self) -> int: + num_signers = self._num_signers() + network_name, network = self._network() + destination = self.data.address + + print("=" * 60) + print(f" MuSig2 Taproot transaction -- {num_signers} signers ({network_name})") + print("=" * 60) + + sorted_pks, aggpk_c, aggpk_x, agg_r1, agg_r2, last_connection = \ + self._aggregate(num_signers) + + output_c, output_x, tweak, q_even = mc.taproot_output_key(aggpk_x) + our_spk = b"\x51\x20" + output_x + address = mc.encode_taproot_address(output_x, network["hrp"]) + print(f"\n Source Taproot address: {address}") + + # ---- gather UTXOs (host / network) ---- + try: + utxos = self._get_utxos(network["api"], address) + except requests.exceptions.RequestException as error: + last_connection.disconnect() + print(f" Could not reach the blockchain API: {error}") + return -1 + + if not utxos: + last_connection.disconnect() + print("\n No UTXOs found. Fund the address above and try again.") + return -1 + + try: + inputs, outputs, amounts, fee = self._build_outputs( + utxos, destination, our_spk, network["api"]) + except ValueError as error: + last_connection.disconnect() + print(f" {error}") + return -1 + + sighash = mc.compute_sighash_taproot(2, 0, inputs, outputs, 0, amounts, [our_spk]) + print(f" Sighash: {sighash.hex()}") + print(f" Fee: {fee} sats") + + if not self._confirm(network_name): + last_connection.disconnect() + print(" Aborted.") + return -1 + + # ---- card-side partial signatures over the Taproot sighash ---- + p_even = mc.has_even_y(aggpk_c) + card_aggpk = output_c if p_even else mc.negate_compressed(output_c) + sign_data = mc.point_to_uncompressed(agg_r1) + mc.point_to_uncompressed(agg_r2) \ + + card_aggpk + sighash + assert len(sign_data) == _SIGN_DATA_LENGTH + + partial_sigs = self._collect_partial_signatures( + num_signers, sorted_pks, sign_data, last_connection) + + # ---- aggregate + apply the Taproot tweak ---- + s_agg = self._sum_partials(partial_sigs) + nonce_x = mc.effective_nonce_x(agg_r1, agg_r2, mc.xonly(card_aggpk), sighash) + challenge = mc.tagged_hash("BIP0340/challenge", nonce_x + output_x + sighash) + e = int.from_bytes(challenge, "big") % mc.N + if q_even: + s_final = (s_agg + e * tweak) % mc.N + else: + s_final = (s_agg - e * tweak) % mc.N + signature = nonce_x + s_final.to_bytes(32, "big") + + if not mc.schnorr_verify(output_x, sighash, signature): + print("\n ERROR: aggregate signature failed local verification, not broadcasting.") + return -1 + print("\n BIP-340 Schnorr verification: PASS") + + signed_tx = mc.build_signed_tx(2, 0, inputs, outputs, signature) + tx_hex = signed_tx.hex() + + try: + txid = self._broadcast(network["api"], tx_hex) + except requests.exceptions.RequestException as error: + print(f" Broadcast error: {error}") + print(f" Raw transaction: {tx_hex}") + return -1 + + print(f"\n SUCCESS! Transaction id: {txid}") + return 0 + + # ------------------------------------------------------------------ + # Transaction construction (single input, sweep or amount + change) + # ------------------------------------------------------------------ + def _build_outputs(self, utxos, destination, our_spk, api): + fee_rate = self._get_fee_rate(api) + dest_spk = mc.decode_address_to_scriptpubkey(destination) + + # Spend the first UTXO only (keeps the example simple and deterministic). + first = utxos[0] + if len(utxos) > 1: + print(f" NOTE: {len(utxos)} UTXOs found, using only the first one.") + inputs = [(first["txid"], first["vout"], 0xfffffffd)] + amounts = [first["value"]] + input_total = first["value"] + + amount = getattr(self.data, "amount", None) + if amount is not None: + num_outputs = 2 + est_vsize = 10.5 + 58 + 43 * num_outputs + fee = int(est_vsize * fee_rate) + 10 + send_amount = int(amount) + if send_amount <= _DUST_LIMIT: + raise ValueError(f"Amount too small (dust limit is {_DUST_LIMIT} sats).") + change = input_total - send_amount - fee + if change < 0: + raise ValueError( + f"Not enough funds: need {send_amount + fee}, have {input_total}.") + outputs = [(send_amount, dest_spk)] + if change > _DUST_LIMIT: + outputs.append((change, our_spk)) + print(f" Send: {send_amount} sats") + print(f" Change: {change} sats") + else: + fee += change + print(f" Send: {send_amount} sats (dust change {change} added to fee)") + else: + num_outputs = 1 + est_vsize = 10.5 + 58 + 43 * num_outputs + fee = int(est_vsize * fee_rate) + 10 + send_amount = input_total - fee + if send_amount <= _DUST_LIMIT: + raise ValueError( + f"Not enough funds: balance {input_total}, fee {fee}.") + outputs = [(send_amount, dest_spk)] + print(f" Sweep: {send_amount} sats (entire balance minus fee)") + + return inputs, outputs, amounts, fee + + @staticmethod + def _confirm(network_name: str) -> bool: + prompt = "\n Broadcast this transaction" + if network_name == "mainnet": + prompt += " on MAINNET (real funds)" + answer = input(prompt + "? [y/N]: ").strip().lower() + return answer in ("y", "yes") + + # ------------------------------------------------------------------ + # Blockchain API (mempool.space) + # ------------------------------------------------------------------ + @staticmethod + def _get_utxos(api: str, address: str): + response = requests.get(f"{api}/address/{address}/utxo", timeout=30) + response.raise_for_status() + return response.json() + + @staticmethod + def _get_fee_rate(api: str) -> int: + try: + response = requests.get(f"{api}/v1/fees/recommended", timeout=30) + response.raise_for_status() + return response.json().get("halfHourFee", 2) + except requests.exceptions.RequestException: + return 2 + + @staticmethod + def _broadcast(api: str, tx_hex: str) -> str: + response = requests.post(f"{api}/tx", data=tx_hex, timeout=30) + if response.status_code == 200: + return response.text + raise requests.exceptions.HTTPError( + f"broadcast failed ({response.status_code}): {response.text}") diff --git a/cryptnox_cli/command/musig2_crypto.py b/cryptnox_cli/command/musig2_crypto.py new file mode 100644 index 0000000..cce829a --- /dev/null +++ b/cryptnox_cli/command/musig2_crypto.py @@ -0,0 +1,398 @@ +# -*- coding: utf-8 -*- +""" +Host-side cryptography for MuSig2 (BIP-327) signing with Cryptnox Basic G2 cards. + +The Cryptnox Basic G2 card performs the secret-dependent part of a MuSig2 +signature (nonce generation and the partial signature) inside the secure +element. Everything that does not require the private key -- public-key and +nonce aggregation (BIP-327), the BIP-340 Schnorr challenge, the BIP-341 Taproot +tweak and address derivation, transaction serialisation and signature +verification -- is done here on the host. + +The exact algorithm and the byte layout of the data exchanged with the card +mirror the reference scripts in the ``cryptnox-cli-musig2`` repository (the +"oracle"), which were validated against the card firmware. This module +deliberately reproduces that math, including the places where it differs from a +strict BIP-327 implementation, so the host stays compatible with the applet. + +All elliptic-curve operations reuse the secp256k1 primitives that already ship +with the CLI (``cryptnox_cli.lib.cryptos.main``) so no extra dependency such as +``coincurve`` is required. +""" + +import hashlib +import struct + +try: + from lib import cryptos +except ImportError: + from ..lib import cryptos + +# secp256k1 group order, field prime and generator (from the bundled cryptos lib) +N = cryptos.N +P = cryptos.P +G = cryptos.G + + +# ============================================================ +# Tagged hash (BIP340 / BIP341 / BIP327) +# ============================================================ + +def tagged_hash(tag: str, msg: bytes) -> bytes: + """Return the BIP-340 tagged hash ``SHA256(SHA256(tag)||SHA256(tag)||msg)``.""" + tag_hash = hashlib.sha256(tag.encode()).digest() + return hashlib.sha256(tag_hash + tag_hash + msg).digest() + + +# ============================================================ +# EC point helpers -- points are (x, y) integer tuples +# ============================================================ + +def point_from_bytes(data: bytes): + """Decode a compressed (33B), uncompressed (65B) or raw (64B) public key.""" + return cryptos.decode_pubkey(data) + + +def point_to_compressed(point) -> bytes: + """Encode an (x, y) point as a 33-byte compressed public key.""" + return cryptos.encode_pubkey(point, "bin_compressed") + + +def point_to_uncompressed(point) -> bytes: + """Encode an (x, y) point as a 65-byte uncompressed public key.""" + return cryptos.encode_pubkey(point, "bin") + + +def point_add(point_a, point_b): + """Add two secp256k1 points.""" + return cryptos.fast_add(point_a, point_b) + + +def point_mul(point, scalar: int): + """Multiply a secp256k1 point by a scalar.""" + return cryptos.fast_multiply(point, scalar) + + +def point_add_bytes(points): + """Add a list of public keys given as bytes, returning an (x, y) point.""" + decoded = [point_from_bytes(p) for p in points] + acc = decoded[0] + for point in decoded[1:]: + acc = cryptos.fast_add(acc, point) + return acc + + +def point_mul_bytes(pk_bytes: bytes, scalar: int): + """Multiply the public key ``pk_bytes`` by ``scalar``, returning an (x, y) point.""" + return cryptos.fast_multiply(point_from_bytes(pk_bytes), scalar) + + +def compressed_pubkey(card_response: bytes) -> bytes: + """Normalise a card public-key response to 33-byte compressed form.""" + return point_to_compressed(point_from_bytes(card_response)) + + +def xonly(compressed: bytes) -> bytes: + """Return the x-only (32-byte) part of a compressed public key.""" + return compressed[1:33] + + +def has_even_y(compressed: bytes) -> bool: + """True when the compressed public key encodes an even-y point (prefix 0x02).""" + return compressed[0] == 0x02 + + +def lift_x(x_bytes: bytes) -> bytes: + """Return the compressed even-y public key with the given x coordinate.""" + return b"\x02" + x_bytes + + +def negate_compressed(pk33: bytes) -> bytes: + """Negate a point by flipping the parity byte of its compressed encoding.""" + prefix = b"\x03" if pk33[0] == 0x02 else b"\x02" + return prefix + pk33[1:] + + +# ============================================================ +# BIP327 key and nonce aggregation +# ============================================================ + +def keyagg_coeff(key_list_hash: bytes, pk: bytes) -> int: + """KeyAgg coefficient for ``pk`` given the tagged hash of the key list.""" + digest = tagged_hash("KeyAgg coefficient", key_list_hash + pk) + return int.from_bytes(digest, "big") % N + + +def aggregate_pubkey(sorted_pks): + """ + Aggregate the (already sorted) list of 33-byte compressed public keys. + + :return: tuple of (compressed aggregate key, x-only aggregate key) + """ + key_list_hash = tagged_hash("KeyAgg list", b"".join(sorted_pks)) + acc = None + for pk in sorted_pks: + coeff = keyagg_coeff(key_list_hash, pk) + term = point_mul_bytes(pk, coeff) + acc = term if acc is None else point_add(acc, term) + compressed = point_to_compressed(acc) + return compressed, xonly(compressed) + + +def nonce_coeff(aggnonce: bytes, aggpubkey_x: bytes, msg: bytes) -> int: + """BIP-327 nonce coefficient ``b``.""" + digest = tagged_hash("MuSig/noncecoef", aggnonce + aggpubkey_x + msg) + return int.from_bytes(digest, "big") % N + + +def aggregate_nonces(nonces_r1, nonces_r2): + """ + Aggregate per-card nonce points. + + :param nonces_r1: list of 33-byte first-round nonce points + :param nonces_r2: list of 33-byte second-round nonce points + :return: tuple (aggR1_point, aggR2_point) + """ + return point_add_bytes(nonces_r1), point_add_bytes(nonces_r2) + + +def effective_nonce_x(agg_r1, agg_r2, aggpubkey_x: bytes, msg: bytes) -> bytes: + """ + Compute the x-only final nonce ``R = aggR1 + b*aggR2``. + + :param agg_r1: aggregate first-round nonce point + :param agg_r2: aggregate second-round nonce point + :param aggpubkey_x: x-only aggregate public key used in the challenge + :param msg: 32-byte message being signed + :return: 32-byte x coordinate of the final nonce R + """ + agg_r1_c = point_to_compressed(agg_r1) + agg_r2_c = point_to_compressed(agg_r2) + coeff = nonce_coeff(agg_r1_c + agg_r2_c, aggpubkey_x, msg) + b_r2 = point_mul(agg_r2, coeff) + final_r = point_add(agg_r1, b_r2) + return xonly(point_to_compressed(final_r)) + + +# ============================================================ +# BIP340 Schnorr verification (host side, no card) +# ============================================================ + +def _lift_x_point(x_int: int): + """Lift an x coordinate to the even-y curve point, or None if invalid.""" + if x_int >= P: + return None + y_sq = (pow(x_int, 3, P) + 7) % P + y = pow(y_sq, (P + 1) // 4, P) + if pow(y, 2, P) != y_sq: + return None + if y % 2 != 0: + y = P - y + return (x_int, y) + + +def schnorr_verify(pubkey_x: bytes, msg: bytes, signature: bytes) -> bool: + """Verify a 64-byte BIP-340 Schnorr signature against an x-only public key.""" + if len(signature) != 64 or len(pubkey_x) != 32: + return False + + point = _lift_x_point(int.from_bytes(pubkey_x, "big")) + if point is None: + return False + + r = int.from_bytes(signature[:32], "big") + s = int.from_bytes(signature[32:], "big") + if r >= P or s >= N: + return False + + challenge = tagged_hash("BIP0340/challenge", signature[:32] + pubkey_x + msg) + e = int.from_bytes(challenge, "big") % N + + # R = s*G - e*P + s_g = point_mul(G, s) + e_p = point_mul(point, e) + neg_e_p = (e_p[0], (P - e_p[1]) % P) + final_r = point_add(s_g, neg_e_p) + + if final_r[0] == 0 and final_r[1] == 0: + return False + if final_r[1] % 2 != 0: + return False + return final_r[0] == r + + +# ============================================================ +# BIP341 Taproot +# ============================================================ + +def taproot_tweak(internal_key_xonly: bytes) -> int: + """Return the Taproot tweak ``t`` for a key-path-only output.""" + digest = tagged_hash("TapTweak", internal_key_xonly) + return int.from_bytes(digest, "big") % N + + +def taproot_output_key(internal_key_xonly: bytes): + """ + Derive the Taproot output key from the x-only internal (aggregate) key. + + :return: tuple (Q_compressed, Q_xonly, tweak, q_has_even_y) + """ + tweak = taproot_tweak(internal_key_xonly) + internal_point = point_from_bytes(lift_x(internal_key_xonly)) + tweak_point = point_mul(G, tweak) + output_point = point_add(internal_point, tweak_point) + output_compressed = point_to_compressed(output_point) + return output_compressed, xonly(output_compressed), tweak, has_even_y(output_compressed) + + +# ============================================================ +# Bech32 / Bech32m (BIP173 / BIP350) for SegWit & Taproot addresses +# ============================================================ + +_BECH32M_CONST = 0x2bc830a3 +_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" + + +def _bech32_polymod(values): + generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + chk = 1 + for value in values: + top = chk >> 25 + chk = ((chk & 0x1ffffff) << 5) ^ value + for i in range(5): + chk ^= generator[i] if ((top >> i) & 1) else 0 + return chk + + +def _bech32_hrp_expand(hrp): + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + + +def _bech32_create_checksum(hrp, data, const): + values = _bech32_hrp_expand(hrp) + data + polymod = _bech32_polymod(values + [0] * 6) ^ const + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + + +def _convertbits(data, frombits, tobits, pad=True): + acc = 0 + bits = 0 + ret = [] + maxv = (1 << tobits) - 1 + for value in data: + acc = (acc << frombits) | value + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad and bits: + ret.append((acc << (tobits - bits)) & maxv) + return ret + + +def encode_segwit_address(witness_version: int, witness_program: bytes, hrp: str) -> str: + """Encode a SegWit address (bech32 for v0, bech32m for v1+).""" + const = 1 if witness_version == 0 else _BECH32M_CONST + data = [witness_version] + _convertbits(list(witness_program), 8, 5) + checksum = _bech32_create_checksum(hrp, data, const) + return hrp + "1" + "".join(_CHARSET[d] for d in data + checksum) + + +def encode_taproot_address(witness_program: bytes, hrp: str) -> str: + """Encode a Taproot (witness v1, bech32m) address.""" + return encode_segwit_address(1, witness_program, hrp) + + +def _bech32_data(addr: str, hrp: str): + return [_CHARSET.index(c) for c in addr[len(hrp) + 1:]][:-6] + + +def decode_address_to_scriptpubkey(addr: str) -> bytes: + """Convert a SegWit v0 (P2WPKH) or v1 (P2TR) address to its scriptPubKey.""" + lowered = addr.lower() + for hrp in ("bc", "tb"): + if lowered.startswith(hrp + "1q"): + data = _bech32_data(addr, hrp) + witness = bytes(_convertbits(data[1:], 5, 8, False)) + return b"\x00\x14" + witness + if lowered.startswith(hrp + "1p"): + data = _bech32_data(addr, hrp) + witness = bytes(_convertbits(data[1:], 5, 8, False)) + return b"\x51\x20" + witness + raise ValueError(f"Unsupported address: {addr}") + + +# ============================================================ +# Transaction building (BIP341 key-path spend) +# ============================================================ + +def ser_compact_size(value: int) -> bytes: + if value < 0xfd: + return struct.pack(" bytes: + return bytes.fromhex(txid_hex)[::-1] + struct.pack(" bytes: + return struct.pack(" bytes: + """Compute the BIP-341 key-path (SIGHASH_DEFAULT) sighash for one input.""" + prevouts = b"".join(ser_outpoint(txid, vout) for txid, vout, _ in inputs) + hash_prevouts = hashlib.sha256(prevouts).digest() + + amounts_ser = b"".join(struct.pack(" bytes: + """Serialise a single-input Taproot key-path spend with the given witness signature.""" + tx = b"" + tx += struct.pack(" str: add_config_sub_parser(action_sub_parser, "Bitcoin") +def _musig2_options(subparsers, interactive_mode): + def _validate_bech32_address(address: str) -> str: + if re.match('^(bc1|tb1)[a-zA-HJ-NP-Z0-9]{25,90}$', address): + return address + raise argparse.ArgumentTypeError("Not a valid bech32/bech32m BTC address") + + def _add_signers(sub_parser): + sub_parser.add_argument("signers", type=int, help="Number of signer cards") + sub_parser.add_argument("-r", "--reader", type=int, default=0, + help="Reader index to use (default 0)") + + def _add_network(sub_parser): + sub_parser.add_argument("-n", "--network", choices=["testnet", "mainnet"], + default="testnet", help="Bitcoin network (default testnet)") + + musig2_parser = subparsers.add_parser( + enums.Command.MUSIG2.value, + help="MuSig2 (BIP-327) multi-signature operations on Basic G2 cards") + if interactive_mode: + add_pin_option(musig2_parser) + + action_sub_parser = musig2_parser.add_subparsers(dest="action", required=True) + + address_parser = action_sub_parser.add_parser( + "address", help="Derive the shared MuSig2 Taproot address") + _add_signers(address_parser) + _add_network(address_parser) + + sign_parser = action_sub_parser.add_parser( + "sign", help="Produce an aggregate MuSig2 Schnorr signature over a message") + _add_signers(sign_parser) + message_group = sign_parser.add_mutually_exclusive_group(required=True) + message_group.add_argument("-m", "--message", help="32-byte message hash (64 hex chars)") + message_group.add_argument("-t", "--text", help="Text to hash with SHA-256 and sign") + + send_parser = action_sub_parser.add_parser( + "send", help="Build, sign and broadcast a Taproot transaction") + _add_signers(send_parser) + send_parser.add_argument("address", type=_validate_bech32_address, + help="Destination address") + send_parser.add_argument("-a", "--amount", type=int, + help="Amount in satoshis (default: sweep entire balance)") + _add_network(send_parser) + + def _card_configuration(subparsers, interactive_mode): sub_parser = subparsers.add_parser(enums.Command.CARD_CONFIGURATION.value, help="Show card configuration and set PIN-less path, " diff --git a/cryptnox_cli/command/seed.py b/cryptnox_cli/command/seed.py index 2f34f07..a74f110 100644 --- a/cryptnox_cli/command/seed.py +++ b/cryptnox_cli/command/seed.py @@ -96,6 +96,7 @@ def _dual_seed(self, card: cryptnox_sdk_py.Card) -> int: print(error) return -1 except cryptnox_sdk_py.exceptions.DataValidationException: + # Expected when probing dual-seed support without a PIN; safe to proceed pass print("Dual seed generation process starting...") diff --git a/cryptnox_cli/command/transfer.py b/cryptnox_cli/command/transfer.py index e0abac1..2859fb2 100644 --- a/cryptnox_cli/command/transfer.py +++ b/cryptnox_cli/command/transfer.py @@ -10,10 +10,8 @@ try: import enums - from wallet import eth as wallet # noqa: F401 except ImportError: from .. import enums - from ..wallet import eth as wallet # noqa: F401 class Transfer(Command): diff --git a/cryptnox_cli/command/user_keys/authentication.py b/cryptnox_cli/command/user_keys/authentication.py index 3caa2f4..6309895 100644 --- a/cryptnox_cli/command/user_keys/authentication.py +++ b/cryptnox_cli/command/user_keys/authentication.py @@ -21,6 +21,7 @@ try: importlib.import_module("." + submodule, package=__package__) except Exception: + # Optional submodule unavailable in frozen build; skip it pass else: # When running normally, dynamically discover submodules @@ -155,6 +156,7 @@ def delete(name: str, card: cryptnox_sdk_py.Card, puk: str) -> bool: try: user_key.delete() except user_key_base.UserKeyException: + # Best-effort local key removal; proceed to delete it from the card pass card.user_key_delete(user_key.slot_index, puk) diff --git a/cryptnox_cli/command/user_keys/hello/windows_hello.py b/cryptnox_cli/command/user_keys/hello/windows_hello.py index dac23c6..b1d9e22 100644 --- a/cryptnox_cli/command/user_keys/hello/windows_hello.py +++ b/cryptnox_cli/command/user_keys/hello/windows_hello.py @@ -95,6 +95,7 @@ async def _get_user_credentials(account_id: str) -> KeyCredential: return key_result.credential _error_handle(key_result.status) + return None async def _public_key(name: str) -> bytearray: diff --git a/cryptnox_cli/command/user_keys/piv/piv_card.py b/cryptnox_cli/command/user_keys/piv/piv_card.py index 0206438..596d6a0 100644 --- a/cryptnox_cli/command/user_keys/piv/piv_card.py +++ b/cryptnox_cli/command/user_keys/piv/piv_card.py @@ -220,6 +220,7 @@ def is_locked(self, pin_bank: int) -> bool: try: self.verify_pin(pin_bank, "") except PinException: + # Empty PIN rejected as expected; card is not locked pass except PIVCardException as error: if error.sw_code == 0x6983: diff --git a/cryptnox_cli/enums.py b/cryptnox_cli/enums.py index 183f35d..3610200 100644 --- a/cryptnox_cli/enums.py +++ b/cryptnox_cli/enums.py @@ -35,3 +35,4 @@ class Command(Enum): USER_KEY = "user_key" TRANSFER = "transfer" CERTIFICATE = "cert" + MUSIG2 = "musig2" diff --git a/cryptnox_cli/interactive_cli.py b/cryptnox_cli/interactive_cli.py index 2803263..fdfdabe 100644 --- a/cryptnox_cli/interactive_cli.py +++ b/cryptnox_cli/interactive_cli.py @@ -219,6 +219,7 @@ def run(self) -> int: self._cards.refresh(self.port and client is not None) self._card_info = list(self._cards.values())[0].info except IndexError: + # No cards found yet; leave _card_info unset and continue pass self._cards.print_card_list(show_warnings=True, print_with_one_card=True) @@ -294,6 +295,7 @@ def _process_command(self): if execute[0] not in ["use", "exit", "back"]: execute[0:0] = self.subcommand except LookupError: + # Empty command; nothing to prepend, ignore pass else: self._prepare_parser() @@ -312,6 +314,7 @@ def _process_command(self): try: self.parser.parse_args(execute) except SystemExit: + # argparse exits on help/error; keep the interactive loop alive pass UsageParser.throw_error = True else: @@ -399,6 +402,7 @@ def _run_command(self, args: argparse.Namespace, to_always_run: List = None) -> try: self._card_info = self._cards[command.serial_number].info except KeyError: + # Card no longer present; keep previous _card_info pass except (cryptnox_sdk_py.exceptions.CryptnoxException, ExitException, TimeoutException) as error: print(error) diff --git a/cryptnox_cli/lib/cryptos/deterministic.py b/cryptnox_cli/lib/cryptos/deterministic.py index 8a23c84..7045d69 100644 --- a/cryptnox_cli/lib/cryptos/deterministic.py +++ b/cryptnox_cli/lib/cryptos/deterministic.py @@ -224,6 +224,8 @@ def bip32_descend(*args, prefixes=DEFAULT): path = parse_bip32_path(args[1]) elif len(args): key, path = args[0], list(map(int, args[1:])) + else: + raise TypeError("bip32_descend() requires at least one positional argument") for p in path: key = bip32_ckd(key, p, prefixes) return bip32_extract_key(key, prefixes) diff --git a/cryptnox_cli/lib/cryptos/keystore.py b/cryptnox_cli/lib/cryptos/keystore.py index abdd82e..781aada 100644 --- a/cryptnox_cli/lib/cryptos/keystore.py +++ b/cryptnox_cli/lib/cryptos/keystore.py @@ -166,6 +166,7 @@ def get_pubkey_derivation(self, x_pubkey): addr = self.coin.p2sh_scriptaddr(x_pubkey[2:]) if addr in self.addresses: return self.addresses[addr].get('pubkey') + return None def update_password(self, old_password, new_password): self.check_password(old_password) @@ -238,7 +239,7 @@ def derive_pubkey(self, for_change, n): return self.get_pubkey_from_xpub(xpub, (n,), self.bip39_prefixes) @classmethod - def get_pubkey_from_xpub(self, xpub, sequence, bip39_prefixes): + def get_pubkey_from_xpub(cls, xpub, sequence, bip39_prefixes): return bip32_derive_key(xpub, sequence, bip39_prefixes) """needed? @@ -352,7 +353,7 @@ class Hardware_KeyStore(KeyStore, Xpub): max_change_outputs = 1 def __init__(self, d, coin): - Xpub.__init__(self, coin) + Xpub.__init__(self) KeyStore.__init__(self, coin) # Errors and other user interaction is done through the wallet's # handler. The handler is per-window and preserved across @@ -483,7 +484,7 @@ def xpubkey_to_address(x_pubkey, coin): pubkey = x_pubkey elif x_pubkey[0:2] == 'ff': xpub, s = BIP32_KeyStore.parse_xpubkey(x_pubkey) - pubkey = BIP32_KeyStore.get_pubkey_from_xpub(xpub, s) + pubkey = BIP32_KeyStore.get_pubkey_from_xpub(xpub, s, coin.bip39_prefixes) else: raise BaseException("Cannot parse pubkey") address = coin.pubtoaddr(pubkey) @@ -516,11 +517,12 @@ def get_private_keys(text): parts = list(filter(bool, parts)) if bool(parts) and all(bitcoin.is_private_key(x) for x in parts): return parts + return None def is_private_key_list(text): return bool(get_private_keys(text)) -is_mpk = lambda x: is_xpub(x) +is_mpk = is_xpub is_private = lambda x: is_seed(x) or is_xprv(x) or is_private_key_list(x) is_master_key = lambda x: is_xprv(x) or is_xpub(x) is_private_key = lambda x: is_xprv(x) or is_private_key_list(x) @@ -568,7 +570,7 @@ def from_master_key(text, coin): if is_xprv(text, prefixes): k = from_xprv(text, coin) elif is_xpub(text, prefixes): - k = from_xpub(text, coin) + k = from_xpub(text, coin, 'p2pkh') else: raise BaseException('Invalid key') return k diff --git a/cryptnox_cli/lib/cryptos/main.py b/cryptnox_cli/lib/cryptos/main.py index 8e9a584..d82da9f 100644 --- a/cryptnox_cli/lib/cryptos/main.py +++ b/cryptnox_cli/lib/cryptos/main.py @@ -147,7 +147,7 @@ def jacobian_multiply(a, n): return jacobian_multiply(a, n % N) if (n % 2) == 0: return jacobian_double(jacobian_multiply(a, n // 2)) - if (n % 2) == 1: + else: return jacobian_add(jacobian_double(jacobian_multiply(a, n // 2)), a) @@ -333,6 +333,7 @@ def compress(pubkey): return encode_pubkey(decode_pubkey(pubkey, f), 'bin_compressed') elif f == 'hex' or f == 'decimal': return encode_pubkey(decode_pubkey(pubkey, f), 'hex_compressed') + return None def decompress(pubkey): @@ -343,6 +344,7 @@ def decompress(pubkey): return encode_pubkey(decode_pubkey(pubkey, f), 'bin') elif f == 'hex_compressed' or f == 'decimal': return encode_pubkey(decode_pubkey(pubkey, f), 'hex') + return None def privkey_to_pubkey(privkey): @@ -398,7 +400,9 @@ def bin_hash160(string): digest = '' try: digest = hashlib.new('ripemd160', intermed).digest() - except: + except Exception: + # 'ripemd160' may be unavailable (e.g. OpenSSL 3 legacy provider off); + # fall back to the bundled pure-Python implementation. digest = RIPEMD160(intermed).digest() return digest @@ -423,7 +427,9 @@ def sha256(string): def bin_ripemd160(string): try: digest = hashlib.new('ripemd160', string).digest() - except: + except Exception: + # 'ripemd160' may be unavailable (e.g. OpenSSL 3 legacy provider off); + # fall back to the bundled pure-Python implementation. digest = RIPEMD160(string).digest() return digest @@ -541,7 +547,7 @@ def is_privkey(priv): try: get_privkey_format(priv) return True - except: + except Exception: return False @@ -549,7 +555,7 @@ def is_pubkey(pubkey): try: get_pubkey_format(pubkey) return True - except: + except Exception: return False @@ -677,8 +683,9 @@ def subtract(p1, p2): def magicbyte_to_prefix(magicbyte): + # Always return a 2-tuple of the low/high address prefix characters. + # Callers only test membership over these, so a repeated character when + # first == last is harmless and keeps the return shape consistent. first = bin_to_b58check(hash160Low, magicbyte=magicbyte)[0] last = bin_to_b58check(hash160High, magicbyte=magicbyte)[0] - if first == last: - return (first,) return (first, last) diff --git a/cryptnox_cli/lib/cryptos/mnemonic.py b/cryptnox_cli/lib/cryptos/mnemonic.py index e773055..acf784b 100644 --- a/cryptnox_cli/lib/cryptos/mnemonic.py +++ b/cryptnox_cli/lib/cryptos/mnemonic.py @@ -16,8 +16,8 @@ from .specials import * from .wallet_utils import is_new_seed -wordlist_english = [word.strip() for word in - list(open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'english.txt'), 'r'))] +with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'english.txt'), 'r') as _wordlist_file: + wordlist_english = [word.strip() for word in _wordlist_file] ELECTRUM_VERSION = '3.0.5' # version of the client package PROTOCOL_VERSION = '1.1' # protocol version requested @@ -226,6 +226,7 @@ def seed_prefix(seed_type): return SEED_PREFIX_SW elif seed_type == '2fa': return SEED_PREFIX_2FA + return None def seed_type(x): diff --git a/cryptnox_cli/lib/cryptos/ripemd.py b/cryptnox_cli/lib/cryptos/ripemd.py index 86c0b6b..8415a3c 100644 --- a/cryptnox_cli/lib/cryptos/ripemd.py +++ b/cryptnox_cli/lib/cryptos/ripemd.py @@ -52,7 +52,8 @@ try: range = xrange -except: +except NameError: + # Python 3: xrange does not exist; the built-in range is already lazy. pass class RIPEMD160: @@ -164,7 +165,6 @@ def R(a, b, c, d, e, Fj, Kj, sj, rj, X): import struct def RMD160Transform(state, block): #uint32 state[5], uchar block[64] - x = [0]*16 if sys.byteorder == 'little': if is_python2: x = struct.unpack('<16L', ''.join([chr(x) for x in block[0:64]])) @@ -369,8 +369,6 @@ def RMD160Transform(state, block): #uint32 state[5], uchar block[64] state[4] = (state[0] + bb + c) % 0x100000000; state[0] = t % 0x100000000; - pass - def RMD160Update(ctx, inp, inplen): if type(inp) == str: diff --git a/cryptnox_cli/lib/cryptos/stealth.py b/cryptnox_cli/lib/cryptos/stealth.py index ca769d7..6a6c0c2 100644 --- a/cryptnox_cli/lib/cryptos/stealth.py +++ b/cryptnox_cli/lib/cryptos/stealth.py @@ -83,6 +83,9 @@ def mk_stealth_tx_outputs(stealth_addr, value, ephem_privkey, nonce, network='bt raise Exception('Invalid testnet stealth address: ' + stealth_addr) magic_byte_addr = 111 + else: + raise Exception('Unknown network: ' + str(network)) + ephem_pubkey = main.privkey_to_pubkey(ephem_privkey) output0 = {'script': mk_stealth_metadata_script(ephem_pubkey, nonce), 'value': 0} diff --git a/cryptnox_cli/lib/cryptos/transaction.py b/cryptnox_cli/lib/cryptos/transaction.py index 35da75d..e442ad2 100644 --- a/cryptnox_cli/lib/cryptos/transaction.py +++ b/cryptnox_cli/lib/cryptos/transaction.py @@ -78,7 +78,7 @@ def deserialize(tx): if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx): # tx = bytes(bytearray.fromhex(tx)) return json_changebase(deserialize(binascii.unhexlify(tx)), - lambda x: safe_hexlify(x)) + safe_hexlify) # http://stackoverflow.com/questions/4851463/python-closure-write-to-variable-in-parent-scope # Python's scoping rules are demented, requiring me to make pos an object # so that it is call-by-reference @@ -148,7 +148,7 @@ def serialize(txobj, include_witness=True): txobj = bytes_to_hex_string(txobj) o = [] if json_is_base(txobj, 16): - json_changedbase = json_changebase(txobj, lambda x: binascii.unhexlify(x)) + json_changedbase = json_changebase(txobj, binascii.unhexlify) hexlified = safe_hexlify(serialize(json_changedbase, include_witness=include_witness)) return hexlified o.append(encode_4_bytes(txobj["version"])) @@ -178,7 +178,7 @@ def uahf_digest(txobj, i): o = [] if json_is_base(txobj, 16): - txobj = json_changebase(txobj, lambda x: binascii.unhexlify(x)) + txobj = json_changebase(txobj, binascii.unhexlify) o.append(encode(txobj["version"], 256, 4)[::-1]) serialized_ins = [] @@ -373,7 +373,7 @@ def p2wpkh_nested_script(pubkey): def deserialize_script(script): if isinstance(script, str) and re.match('^[0-9a-fA-F]*$', script): return json_changebase(deserialize_script(binascii.unhexlify(script)), - lambda x: safe_hexlify(x)) + safe_hexlify) out, pos = [], 0 while pos < len(script): code = from_byte_to_int(script[pos]) @@ -420,13 +420,13 @@ def serialize_script_unit(unit): def serialize_script(script): if json_is_base(script, 16): return binascii.hexlify(serialize_script(json_changebase(script, - lambda x: binascii.unhexlify(x)))) + binascii.unhexlify))) return ''.join(map(serialize_script_unit, script)) else: def serialize_script(script): if json_is_base(script, 16): return safe_hexlify(serialize_script(json_changebase(script, - lambda x: binascii.unhexlify(x)))) + binascii.unhexlify))) result = bytes() for b in map(serialize_script_unit, script): result += b if isinstance(b, bytes) else bytes(b, 'utf-8') @@ -507,6 +507,4 @@ def select(unspent, value): i += 1 if tv < value: raise Exception("Not enough funds") - unspents = low[:i] - actual_value = sum(unspent['value'] for unspent in unspents) return low[:i] diff --git a/cryptnox_cli/lib/cryptos/wallet.py b/cryptnox_cli/lib/cryptos/wallet.py index 3a8e104..a509642 100644 --- a/cryptnox_cli/lib/cryptos/wallet.py +++ b/cryptnox_cli/lib/cryptos/wallet.py @@ -28,7 +28,7 @@ def __init__(self, keystore, num_addresses=0, last_receiving_index=0, last_chang def privkey(self, address, formt="wif_compressed", password=None): if self.is_watching_only: - return + return None try: addr_derivation = self.addresses[address] except KeyError: @@ -39,7 +39,7 @@ def privkey(self, address, formt="wif_compressed", password=None): def export_privkeys(self, password=None): if self.is_watching_only: - return + return None return { 'receiving': {addr: self.privkey(addr, password=password) for addr in self.receiving_addresses}, 'change': {addr: self.privkey(addr, password=password) for addr in self.change_addresses} @@ -58,6 +58,7 @@ def pubtoaddr(self, pubkey): return self.coin.pubtosegwit(pubkey) elif self.keystore.xtype == "p2wpkh-p2sh": return self.coin.pubtop2w(pubkey) + return None def receiving_address(self, index): pubkey = self.pubkey_receiving(index) @@ -149,4 +150,4 @@ def details(self, password=None): 'xchange': (), 'receiving': [self.account(a, password=password) for a in self.receiving_addresses], 'change': [self.account(a, password=password) for a in self.change_addresses] - } \ No newline at end of file + } diff --git a/cryptnox_cli/lib/cryptos/wallet_utils.py b/cryptnox_cli/lib/cryptos/wallet_utils.py index 4e2579e..f046919 100644 --- a/cryptnox_cli/lib/cryptos/wallet_utils.py +++ b/cryptnox_cli/lib/cryptos/wallet_utils.py @@ -78,7 +78,7 @@ def __str__(self): try: from Cryptodome.Cipher import AES -except: +except ImportError: AES = None diff --git a/cryptnox_cli/wallet/btc.py b/cryptnox_cli/wallet/btc.py index 3a060df..11e57ee 100644 --- a/cryptnox_cli/wallet/btc.py +++ b/cryptnox_cli/wallet/btc.py @@ -6,7 +6,6 @@ import json import re import urllib.parse -import urllib.request import requests from enum import Enum from typing import Union, List, Dict @@ -85,13 +84,15 @@ def get_data(self, endpoint: str, params: Dict = None, data: bytes = None) \ if not parsed.hostname or 'blockcypher.com' not in parsed.hostname: raise ValueError("Invalid URL: must be blockcypher.com domain") - req = urllib.request.Request( - full_url, - headers={'User-Agent': 'Mozilla/5.0'}, - data=data - ) - self.web_rsc = urllib.request.urlopen(req, timeout=30) - self.js_res = json.load(self.web_rsc) + # Use 'requests' (validated allow-listed https URL) instead of urllib, + # which would also honour file:// and other local schemes. + headers = {'User-Agent': 'Mozilla/5.0'} + if data is None: + response = requests.get(full_url, headers=headers, timeout=30) + else: + response = requests.post(full_url, headers=headers, data=data, timeout=30) + response.raise_for_status() + self.js_res = response.json() self.web_rsc = None except Exception as ex: print(ex) @@ -237,19 +238,20 @@ def get_data(self, endpoint: str, params: Dict = None, data: bytes = None) \ headers = {'User-Agent': 'Mozilla/5.0'} if self.api_key: headers['x-token'] = self.api_key - req = urllib.request.Request( - full_url, - headers=headers, - data=data - ) - self.web_rsc = urllib.request.urlopen(req, timeout=30) - b_rep = self.web_rsc.read() + # Use 'requests' (validated allow-listed https URL) instead of urllib, + # which would also honour file:// and other local schemes. + if data is None: + response = requests.get(full_url, headers=headers, timeout=30) + else: + response = requests.post(full_url, headers=headers, data=data, timeout=30) + response.raise_for_status() + b_rep = response.content if len(b_rep) == 64 and b_rep[0] != ord('{'): b_rep = b'{"txid":"' + b_rep + b'"}' self.js_res = json.loads(b_rep) - except urllib.error.HTTPError as error: - raise IOError(f"Error while processing request:\n{error.code} - " - f"{error.read().decode('utf8')}") from error + except requests.exceptions.HTTPError as error: + raise IOError(f"Error while processing request:\n{error.response.status_code} - " + f"{error.response.text}") from error except Exception as error: raise IOError(f"Error while processing request:\n{self.url}{endpoint}?params_enc\n" f"{error}") from error diff --git a/cryptnox_cli/wallet/eth/api.py b/cryptnox_cli/wallet/eth/api.py index 631d3ca..6f85483 100644 --- a/cryptnox_cli/wallet/eth/api.py +++ b/cryptnox_cli/wallet/eth/api.py @@ -80,6 +80,7 @@ def transaction_hash(self, transaction: Dict[str, Any], vrs: bool = False): del transaction["maxFeePerGas"] del transaction["maxPriorityFeePerGas"] except KeyError: + # EIP-1559 fee fields absent (legacy transaction); nothing to remove pass unsigned_transaction = serializable_unsigned_transaction_from_dict(transaction) encoded_transaction = encode_transaction(unsigned_transaction, (self._chain_id, 0, 0)) diff --git a/dev-requirements.txt b/dev-requirements.txt index c0d3069..55dc49e 100644 Binary files a/dev-requirements.txt and b/dev-requirements.txt differ diff --git a/requirements.txt b/requirements.txt index 0f54754..a917212 100644 Binary files a/requirements.txt and b/requirements.txt differ