#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2020-2024 tecnovert # Copyright (c) 2024-2025 The Basicswap developers # Distributed under the MIT software license, see the accompanying # file LICENSE or http://www.opensource.org/licenses/mit-license.php. import base64 import hashlib import json import logging import mmap import os import shutil import sqlite3 import traceback from io import BytesIO from typing import Dict, List, Optional from basicswap.basicswap_util import ( getVoutByAddress, getVoutByScriptPubKey, ) from basicswap.interface.base import Secp256k1Interface from basicswap.util import ( b2i, ensure, i2b, i2h, ) from basicswap.util.extkey import ExtKeyPair from basicswap.util.script import ( SerialiseNumCompact, decodeScriptNum, getCompactSizeLen, getWitnessElementLen, ) from basicswap.util.address import ( b58decode, b58encode, decodeAddress, decodeWif, pubkeyToAddress, toWIF, ) from basicswap.util.crypto import ( hash160, sha256, ) from coincurve.keys import ( PrivateKey, PublicKey, ) from coincurve.types import ffi from coincurve.ecdsaotves import ( ecdsaotves_enc_sign, ecdsaotves_enc_verify, ecdsaotves_dec_sig, ecdsaotves_rec_enc_key, ) from basicswap.contrib.test_framework import segwit_addr from basicswap.contrib.test_framework.descriptors import descsum_create from basicswap.contrib.test_framework.messages import ( COIN, COutPoint, CTransaction, CTxIn, CTxInWitness, CTxOut, ) from basicswap.contrib.test_framework.script import ( CScript, CScriptOp, OP_0, OP_2, OP_CHECKMULTISIG, OP_CHECKSEQUENCEVERIFY, OP_CHECKSIG, OP_DROP, OP_DUP, OP_ELSE, OP_ENDIF, OP_EQUAL, OP_EQUALVERIFY, OP_HASH160, OP_IF, OP_RETURN, SIGHASH_ALL, SegwitV0SignatureHash, ) from basicswap.basicswap_util import TxLockTypes from basicswap.chainparams import Coins from basicswap.rpc import make_rpc_func, openrpc SEQUENCE_LOCKTIME_GRANULARITY = 9 # 512 seconds SEQUENCE_LOCKTIME_TYPE_FLAG = 1 << 22 SEQUENCE_LOCKTIME_MASK = 0x0000FFFF def ensure_op(v, err_string="Bad opcode"): ensure(v, err_string) def findOutput(tx, script_pk: bytes): for i in range(len(tx.vout)): if tx.vout[i].scriptPubKey == script_pk: return i return None def find_vout_for_address_from_txobj(tx_obj, addr: str) -> int: """ Locate the vout index of the given transaction sending to the given address. Raises runtime error exception if not found. """ for i in range(len(tx_obj["vout"])): scriptPubKey = tx_obj["vout"][i]["scriptPubKey"] if "addresses" in scriptPubKey: if any([addr == a for a in scriptPubKey["addresses"]]): return i elif "address" in scriptPubKey: if addr == scriptPubKey["address"]: return i raise RuntimeError( "Vout not found for address: txid={}, addr={}".format(tx_obj["txid"], addr) ) def extractScriptLockScriptValues(script_bytes: bytes) -> (bytes, bytes): script_len = len(script_bytes) ensure(script_len == 71, "Bad script length") o = 0 ensure_op(script_bytes[o] == OP_2) ensure_op(script_bytes[o + 1] == 33) o += 2 pk1 = script_bytes[o : o + 33] o += 33 ensure_op(script_bytes[o] == 33) o += 1 pk2 = script_bytes[o : o + 33] o += 33 ensure_op(script_bytes[o] == OP_2) ensure_op(script_bytes[o + 1] == OP_CHECKMULTISIG) return pk1, pk2 def extractScriptLockRefundScriptValues(script_bytes: bytes): script_len = len(script_bytes) ensure(script_len > 73, "Bad script length") ensure_op(script_bytes[0] == OP_IF) ensure_op(script_bytes[1] == OP_2) ensure_op(script_bytes[2] == 33) pk1 = script_bytes[3 : 3 + 33] ensure_op(script_bytes[36] == 33) pk2 = script_bytes[37 : 37 + 33] ensure_op(script_bytes[70] == OP_2) ensure_op(script_bytes[71] == OP_CHECKMULTISIG) ensure_op(script_bytes[72] == OP_ELSE) o = 73 csv_val, nb = decodeScriptNum(script_bytes, o) o += nb ensure(script_len == o + 5 + 33, "Bad script length") # Fails if script too long ensure_op(script_bytes[o] == OP_CHECKSEQUENCEVERIFY) o += 1 ensure_op(script_bytes[o] == OP_DROP) o += 1 ensure_op(script_bytes[o] == 33) o += 1 pk3 = script_bytes[o : o + 33] o += 33 ensure_op(script_bytes[o] == OP_CHECKSIG) o += 1 ensure_op(script_bytes[o] == OP_ENDIF) return pk1, pk2, csv_val, pk3 class BTCInterface(Secp256k1Interface): @staticmethod def coin_type(): return Coins.BTC @staticmethod def COIN(): return COIN @staticmethod def exp() -> int: return 8 @staticmethod def nbk() -> int: return 32 @staticmethod def nbK() -> int: # No. of bytes requires to encode a public key return 33 @staticmethod def witnessScaleFactor() -> int: return 4 @staticmethod def txVersion() -> int: return 2 @staticmethod def getTxOutputValue(tx) -> int: rv = 0 for output in tx.vout: rv += output.nValue return rv @staticmethod def est_lock_tx_vsize() -> int: return 110 @staticmethod def xmr_swap_a_lock_spend_tx_vsize() -> int: return 147 @staticmethod def xmr_swap_b_lock_spend_tx_vsize() -> int: return 110 @staticmethod def txoType(): return CTxOut @staticmethod def getExpectedSequence(lockType: int, lockVal: int) -> int: ensure(lockVal >= 1, "Bad lockVal") if lockType == TxLockTypes.SEQUENCE_LOCK_BLOCKS: return lockVal if lockType == TxLockTypes.SEQUENCE_LOCK_TIME: secondsLocked = lockVal # Ensure the locked time is never less than lockVal if secondsLocked % (1 << SEQUENCE_LOCKTIME_GRANULARITY) != 0: secondsLocked += 1 << SEQUENCE_LOCKTIME_GRANULARITY secondsLocked >>= SEQUENCE_LOCKTIME_GRANULARITY return secondsLocked | SEQUENCE_LOCKTIME_TYPE_FLAG raise ValueError("Unknown lock type") @staticmethod def decodeSequence(lock_value: int) -> int: # Return the raw value if lock_value & SEQUENCE_LOCKTIME_TYPE_FLAG: return ( lock_value & SEQUENCE_LOCKTIME_MASK ) << SEQUENCE_LOCKTIME_GRANULARITY return lock_value & SEQUENCE_LOCKTIME_MASK @staticmethod def depth_spendable() -> int: return 0 def __init__(self, coin_settings, network, swap_client=None): super().__init__(network) self._rpc_host = coin_settings.get("rpchost", "127.0.0.1") self._rpcport = coin_settings["rpcport"] self._rpcauth = coin_settings["rpcauth"] self.rpc = make_rpc_func(self._rpcport, self._rpcauth, host=self._rpc_host) self._rpc_wallet = coin_settings.get("wallet_name", "wallet.dat") self._rpc_wallet_watch = coin_settings.get( "watch_wallet_name", self._rpc_wallet ) self.rpc_wallet = make_rpc_func( self._rpcport, self._rpcauth, host=self._rpc_host, wallet=self._rpc_wallet ) if self._rpc_wallet_watch == self._rpc_wallet: self.rpc_wallet_watch = self.rpc_wallet else: self.rpc_wallet_watch = make_rpc_func( self._rpcport, self._rpcauth, host=self._rpc_host, wallet=self._rpc_wallet_watch, ) self.blocks_confirmed = coin_settings["blocks_confirmed"] self.setConfTarget(coin_settings["conf_target"]) self._use_segwit = coin_settings["use_segwit"] self._connection_type = coin_settings["connection_type"] self._sc = swap_client self._log = self._sc.log if self._sc and self._sc.log else logging self._expect_seedid_hex = None self._altruistic = coin_settings.get("altruistic", True) self._use_descriptors = coin_settings.get("use_descriptors", False) # Use hardened account indices to match existing wallet keys, only applies when use_descriptors is True self._use_legacy_key_paths = coin_settings.get("use_legacy_key_paths", False) self._disable_lock_tx_rbf = False def open_rpc(self, wallet=None): return openrpc(self._rpcport, self._rpcauth, wallet=wallet, host=self._rpc_host) def json_request(self, rpc_conn, method, params): try: v = rpc_conn.json_request(method, params) r = json.loads(v.decode("utf-8")) except Exception as ex: traceback.print_exc() raise ValueError("RPC Server Error " + str(ex)) if "error" in r and r["error"] is not None: raise ValueError("RPC error " + str(r["error"])) return r["result"] def close_rpc(self, rpc_conn): rpc_conn.close() def checkWallets(self) -> int: wallets = self.rpc("listwallets") if self._rpc_wallet not in wallets: self._log.debug( f"Wallet: {self._rpc_wallet} not active, attempting to load." ) try: self.rpc_wallet( "loadwallet", [ self._rpc_wallet, ], ) wallets = self.rpc("listwallets") except Exception as e: self._log.debug(f'Error loading wallet "self._rpc_wallet": {e}.') # Wallet name is "" for some LTC and PART installs on older cores if self._rpc_wallet not in wallets and len(wallets) > 0: self._log.warning(f"Changing {self.ticker()} wallet name.") for wallet_name in wallets: # Skip over other expected wallets if wallet_name in ("mweb",): continue change_watchonly_wallet: bool = ( self._rpc_wallet_watch == self._rpc_wallet ) self._rpc_wallet = wallet_name self._log.info( f"Switched {self.ticker()} wallet name to {self._rpc_wallet}." ) self.rpc_wallet = make_rpc_func( self._rpcport, self._rpcauth, host=self._rpc_host, wallet=self._rpc_wallet, ) if change_watchonly_wallet: self.rpc_wallet_watch = self.rpc_wallet break return len(wallets) def testDaemonRPC(self, with_wallet=True) -> None: self.rpc_wallet("getwalletinfo" if with_wallet else "getblockchaininfo") def getDaemonVersion(self): if self._core_version is None: self._core_version = self.rpc("getnetworkinfo")["version"] return self._core_version def getBlockchainInfo(self): return self.rpc("getblockchaininfo") def getChainHeight(self) -> int: return self.rpc("getblockcount") def getMempoolTx(self, txid): return self.rpc("getrawtransaction", [txid.hex()]) def getBlockHeaderFromHeight(self, height): block_hash = self.rpc("getblockhash", [height]) return self.rpc("getblockheader", [block_hash]) def getBlockHeader(self, block_hash): return self.rpc("getblockheader", [block_hash]) def getBlockHeaderAt(self, time: int, block_after=False): blockchaininfo = self.rpc("getblockchaininfo") last_block_header = self.rpc( "getblockheader", [blockchaininfo["bestblockhash"]] ) max_tries = 5000 for i in range(max_tries): prev_block_header = self.rpc( "getblockheader", [last_block_header["previousblockhash"]] ) if prev_block_header["time"] <= time: return last_block_header if block_after else prev_block_header last_block_header = prev_block_header raise ValueError(f"Block header not found at time: {time}") def getWalletAccountPath(self) -> str: # Use a bip44 style path, however the seed (derived from the particl mnemonic keychain) can't be turned into a bip39 mnemonic without the matching entropy purpose: int = 84 # native segwit coin_type: int = self.chainparams_network()["bip44"] account: int = 0 return f"{purpose}h/{coin_type}h/{account}h" def initialiseWallet(self, key_bytes: bytes, restore_time: int = -1) -> None: assert len(key_bytes) == 32 self._have_checked_seed = False if self._use_descriptors: self._log.info("Importing descriptors") ek = ExtKeyPair() ek.set_seed(key_bytes) ek_encoded: str = self.encode_secret_extkey(ek.encode_v()) if self._use_legacy_key_paths: # Match keys from legacy wallets (created from sethdseed) desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)") desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)") else: # Use a bip44 path so the seed can be exported as a mnemonic path: str = self.getWalletAccountPath() desc_external = descsum_create(f"wpkh({ek_encoded}/{path}/0/*)") desc_internal = descsum_create(f"wpkh({ek_encoded}/{path}/1/*)") rv = self.rpc_wallet( "importdescriptors", [ [ {"desc": desc_external, "timestamp": "now", "active": True}, { "desc": desc_internal, "timestamp": "now" if restore_time == -1 else restore_time, "active": True, "internal": True, }, ], ], ) num_successful: int = 0 for entry in rv: if entry.get("success", False) is True: num_successful += 1 if num_successful != 2: self._log.error(f"Failed to import descriptors: {rv}.") raise ValueError("Failed to import descriptors.") else: key_wif = self.encodeKey(key_bytes) try: self.rpc_wallet("sethdseed", [True, key_wif]) except Exception as e: self._log.debug(f"sethdseed failed: {e}") """ # TODO: Find derived key counts if "Already have this key" in str(e): key_id: bytes = self.getSeedHash(key_bytes) self.setActiveKeyChain(key_id) else: """ raise (e) def canExportToElectrum(self) -> bool: # keychains must be unhardened to export into electrum return self._use_descriptors is True and self._use_legacy_key_paths is False def getAccountKey( self, key_bytes: bytes, extkey_prefix: Optional[int] = None, coin_type_overide: Optional[int] = None, ) -> str: # For electrum, must start with zprv to get P2WPKH, addresses # extkey_prefix: 0x04b2430c ek = ExtKeyPair() ek.set_seed(key_bytes) path: str = self.getWalletAccountPath() account_ek = ek.derive_path(path) return self.encode_secret_extkey(account_ek.encode_v(), extkey_prefix) def getWalletKeyChains( self, key_bytes: bytes, extkey_prefix: Optional[int] = None ) -> Dict[str, str]: ek = ExtKeyPair() ek.set_seed(key_bytes) # extkey must contain keydata to derive hardened child keys if self.canExportToElectrum(): path: str = self.getWalletAccountPath() external_extkey = ek.derive_path(f"{path}/0") internal_extkey = ek.derive_path(f"{path}/1") else: # Match keychain paths of legacy wallets external_extkey = ek.derive_path("0h/0h") internal_extkey = ek.derive_path("0h/1h") def encode_extkey(extkey): return self.encode_secret_extkey(extkey.encode_v(), extkey_prefix) rv = { "external": encode_extkey(external_extkey), "internal": encode_extkey(internal_extkey), } return rv def getWalletInfo(self): rv = self.rpc_wallet("getwalletinfo") rv["encrypted"] = "unlocked_until" in rv rv["locked"] = rv.get("unlocked_until", 1) <= 0 rv["locked_utxos"] = len(self.rpc_wallet("listlockunspent")) return rv def getWalletRestoreHeight(self) -> int: if self._use_descriptors: descriptor = self.getActiveDescriptor() if descriptor is None: start_time = 0 else: start_time = descriptor["timestamp"] else: start_time = self.rpc_wallet("getwalletinfo")["keypoololdest"] blockchaininfo = self.getBlockchainInfo() best_block = blockchaininfo["bestblockhash"] chain_synced = round(blockchaininfo["verificationprogress"], 3) if chain_synced < 1.0: raise ValueError(f"{self.coin_name()} chain isn't synced.") self._log.debug(f"Finding block at time: {start_time}") rpc_conn = self.open_rpc() try: block_hash = best_block while True: block_header = self.json_request( rpc_conn, "getblockheader", [block_hash] ) if block_header["time"] < start_time: return block_header["height"] if "previousblockhash" not in block_header: # Genesis block return block_header["height"] block_hash = block_header["previousblockhash"] finally: self.close_rpc(rpc_conn) raise ValueError(f"{self.coin_name()} wallet restore height not found.") def getActiveDescriptor(self): descriptors = self.rpc_wallet("listdescriptors")["descriptors"] for descriptor in descriptors: if ( descriptor["desc"].startswith("wpkh") and descriptor["active"] is True and descriptor["internal"] is False ): return descriptor return None def getWalletSeedID(self) -> str: if self._use_descriptors: descriptor = self.getActiveDescriptor() if descriptor is None: self._log.debug("Could not find active descriptor.") return "Not found" start = descriptor["desc"].find("]") if start < 3: return "Could not parse descriptor" descriptor = descriptor["desc"][start + 1 :] end = descriptor.find("/") if end < 10: return "Could not parse descriptor" extkey = descriptor[:end] extkey_data = b58decode(extkey)[4:-4] extkey_data_hash: bytes = hash160(extkey_data) return extkey_data_hash.hex() wi = self.rpc_wallet("getwalletinfo") return "Not found" if "hdseedid" not in wi else wi["hdseedid"] def checkExpectedSeed(self, expect_seedid: str) -> bool: wallet_seed_id = self.getWalletSeedID() self._expect_seedid_hex = expect_seedid self._have_checked_seed = True return expect_seedid == wallet_seed_id def getNewAddress(self, use_segwit: bool, label: str = "swap_receive") -> str: args = [label] if use_segwit: args.append("bech32") return self.rpc_wallet("getnewaddress", args) def isValidAddress(self, address: str) -> bool: try: rv = self.rpc_wallet("validateaddress", [address]) if rv["isvalid"] is True: return True except Exception as e: # noqa: F841 self._log.debug("validateaddress failed: {}".format(address)) return False def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool: addr_info = self.rpc_wallet("getaddressinfo", [address]) if not or_watch_only: return addr_info["ismine"] if self._use_descriptors: addr_info = self.rpc_wallet_watch("getaddressinfo", [address]) return addr_info["ismine"] or addr_info["iswatchonly"] def checkAddressMine(self, address: str) -> None: addr_info = self.rpc_wallet("getaddressinfo", [address]) ensure(addr_info["ismine"], "ismine is false") if self.sc._restrict_unknown_seed_wallets: ensure( addr_info["hdseedid"] == self._expect_seedid_hex, "unexpected seedid" ) def get_fee_rate(self, conf_target: int = 2) -> (float, str): chain_client_settings = self._sc.getChainClientSettings( self.coin_type() ) # basicswap.json override_feerate = chain_client_settings.get("override_feerate", None) if override_feerate: self._log.debug( f"Fee rate override used for {self.coin_name()}: {override_feerate}" ) return override_feerate, "override_feerate" min_relay_fee = chain_client_settings.get("min_relay_fee", None) def try_get_fee_rate(self, conf_target): try: fee_rate: float = self.rpc_wallet("estimatesmartfee", [conf_target])[ "feerate" ] assert fee_rate > 0.0, "Negative feerate" return fee_rate, "estimatesmartfee" except Exception: try: fee_rate: float = self.rpc_wallet("getwalletinfo")["paytxfee"] assert fee_rate > 0.0, "Non positive feerate" return fee_rate, "paytxfee" except Exception: fee_rate: float = self.rpc("getnetworkinfo")["relayfee"] return fee_rate, "relayfee" fee_rate, rate_src = try_get_fee_rate(self, conf_target) if min_relay_fee and min_relay_fee > fee_rate: self._log.warning( "Feerate {} ({}) is below min relay fee {} for {}".format( self.format_amount(fee_rate, True, 1), rate_src, self.format_amount(min_relay_fee, True, 1), self.coin_name(), ) ) return min_relay_fee, "min_relay_fee" return fee_rate, rate_src def isSegwitAddress(self, address: str) -> bool: return address.startswith(self.chainparams_network()["hrp"] + "1") def decodeAddress(self, address: str) -> bytes: bech32_prefix = self.chainparams_network()["hrp"] if len(bech32_prefix) > 0 and address.startswith(bech32_prefix + "1"): return bytes(segwit_addr.decode(bech32_prefix, address)[1]) return decodeAddress(address)[1:] def pubkey_to_segwit_address(self, pk: bytes) -> str: bech32_prefix = self.chainparams_network()["hrp"] version = 0 pkh = hash160(pk) return segwit_addr.encode(bech32_prefix, version, pkh) def encode_secret_extkey(self, ek_data: bytes, prefix=None) -> str: assert len(ek_data) == 74 if prefix is None: prefix = self.chainparams_network()["ext_secret_key_prefix"] data: bytes = prefix.to_bytes(4, "big") + ek_data checksum = sha256(sha256(data)) return b58encode(data + checksum[0:4]) def encode_public_extkey(self, ek_data: bytes) -> str: assert len(ek_data) == 74 prefix = self.chainparams_network()["ext_public_key_prefix"] data: bytes = prefix.to_bytes(4, "big") + ek_data checksum = sha256(sha256(data)) return b58encode(data + checksum[0:4]) def pkh_to_address(self, pkh: bytes) -> str: # pkh is ripemd160(sha256(pk)) assert len(pkh) == 20 prefix = self.chainparams_network()["pubkey_address"] data = bytes((prefix,)) + pkh checksum = sha256(sha256(data)) return b58encode(data + checksum[0:4]) def sh_to_address(self, sh: bytes) -> str: assert len(sh) == 20 prefix = self.chainparams_network()["script_address"] data = bytes((prefix,)) + sh checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest() return b58encode(data + checksum[0:4]) def encode_p2wsh(self, script: bytes) -> str: bech32_prefix = self.chainparams_network()["hrp"] version = 0 program = script[2:] # strip version and length return segwit_addr.encode(bech32_prefix, version, program) def encodeScriptDest(self, script: bytes) -> str: return self.encode_p2wsh(script) def encode_p2sh(self, script: bytes) -> str: return pubkeyToAddress(self.chainparams_network()["script_address"], script) def pubkey_to_address(self, pk: bytes) -> str: assert len(pk) == 33 return self.pkh_to_address(hash160(pk)) def getAddressHashFromKey(self, key: bytes) -> bytes: pk = self.getPubkey(key) return hash160(pk) def getSeedHash(self, seed: bytes) -> bytes: if self._use_descriptors: ek = ExtKeyPair() ek.set_seed(seed) return hash160(ek.encode_p()) return self.getAddressHashFromKey(seed)[::-1] def encodeKey(self, key_bytes: bytes) -> str: wif_prefix = self.chainparams_network()["key_prefix"] return toWIF(wif_prefix, key_bytes) def encodeSegwitAddress(self, key_hash: bytes) -> str: return segwit_addr.encode(self.chainparams_network()["hrp"], 0, key_hash) def decodeSegwitAddress(self, addr: str) -> bytes: return bytes(segwit_addr.decode(self.chainparams_network()["hrp"], addr)[1]) def decodeKey(self, k: str) -> bytes: return decodeWif(k) def getScriptForPubkeyHash(self, pkh: bytes) -> CScript: # p2wpkh return CScript([OP_0, pkh]) def loadTx(self, tx_bytes: bytes, allow_witness: bool = True) -> CTransaction: # Load tx from bytes to internal representation # Transactions with no inputs require allow_witness set to false to decode correctly tx = CTransaction() tx.deserialize(BytesIO(tx_bytes), allow_witness) return tx def createSCLockTx( self, value: int, script: bytearray, vkbv: bytes = None ) -> bytes: tx = CTransaction() tx.nVersion = self.txVersion() tx.nLockTime = 0 # TODO: match locktimes by core tx.vout.append(self.txoType()(value, self.getScriptDest(script))) return tx.serialize() def fundSCLockTx(self, tx_bytes, feerate, vkbv=None) -> bytes: funded_tx = self.fundTx(tx_bytes, feerate) if self._disable_lock_tx_rbf: tx = self.loadTx(funded_tx) for txi in tx.vin: txi.nSequence = 0xFFFFFFFE funded_tx = tx.serialize_with_witness() return funded_tx def genScriptLockRefundTxScript(self, Kal, Kaf, csv_val) -> CScript: assert len(Kal) == 33 assert len(Kaf) == 33 # fmt: off return CScript([ CScriptOp(OP_IF), 2, Kal, Kaf, 2, CScriptOp(OP_CHECKMULTISIG), CScriptOp(OP_ELSE), csv_val, CScriptOp(OP_CHECKSEQUENCEVERIFY), CScriptOp(OP_DROP), Kaf, CScriptOp(OP_CHECKSIG), CScriptOp(OP_ENDIF)]) # fmt: on def isScriptP2PKH(self, script: bytes) -> bool: if len(script) != 25: return False if script[0] != OP_DUP: return False if script[1] != OP_HASH160: return False if script[2] != 20: return False if script[23] != OP_EQUALVERIFY: return False if script[24] != OP_CHECKSIG: return False return True def isScriptP2WPKH(self, script: bytes) -> bool: if len(script) != 22: return False if script[0] != OP_0: return False if script[1] != 20: return False return True def getScriptDummyWitness(self, script: bytes) -> List[bytes]: if self.isScriptP2WPKH(script): return [bytes(72), bytes(33)] raise ValueError("Unknown script type") def createSCLockRefundTx( self, tx_lock_bytes, script_lock, Kal, Kaf, lock1_value, csv_val, tx_fee_rate, vkbv=None, ): tx_lock = CTransaction() tx_lock = self.loadTx(tx_lock_bytes) output_script = self.getScriptDest(script_lock) locked_n = findOutput(tx_lock, output_script) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx_lock.vout[locked_n].nValue tx_lock.rehash() tx_lock_id_int = tx_lock.sha256 refund_script = self.genScriptLockRefundTxScript(Kal, Kaf, csv_val) tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append( CTxIn( COutPoint(tx_lock_id_int, locked_n), nSequence=lock1_value, scriptSig=self.getScriptScriptSig(script_lock), ) ) tx.vout.append(self.txoType()(locked_coin, self.getScriptDest(refund_script))) dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info( "createSCLockRefundTx {}{}.".format( self._log.id(i2b(tx.sha256)), ( "" if self._log.safe_logs else f":\n fee_rate, vsize, fee: {tx_fee_rate}, {vsize}, {pay_fee}" ), ) ) return tx.serialize(), refund_script, tx.vout[0].nValue def createSCLockRefundSpendTx( self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv=None, ): # Returns the coinA locked coin to the leader # The follower will sign the multisig path with a signature encumbered by the leader's coinB spend pubkey # If the leader publishes the decrypted signature the leader's coinB spend privatekey will be revealed to the follower tx_lock_refund = self.loadTx(tx_lock_refund_bytes) output_script = self.getScriptDest(script_lock_refund) locked_n = findOutput(tx_lock_refund, output_script) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx_lock_refund.vout[locked_n].nValue tx_lock_refund.rehash() tx_lock_refund_hash_int = tx_lock_refund.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append( CTxIn( COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=0, scriptSig=self.getScriptScriptSig(script_lock_refund), ) ) tx.vout.append( self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_refund_to)) ) dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness( script_lock_refund ) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info( "createSCLockRefundSpendTx {}{}.".format( self._log.id(i2b(tx.sha256)), ( "" if self._log.safe_logs else f":\n fee_rate, vsize, fee: {tx_fee_rate}, {vsize}, {pay_fee}" ), ) ) return tx.serialize() def createSCLockRefundSpendToFTx( self, tx_lock_refund_bytes, script_lock_refund, pkh_dest, tx_fee_rate, vkbv=None, kbsf=None, ): # lock refund swipe tx # Sends the coinA locked coin to the follower tx_lock_refund = self.loadTx(tx_lock_refund_bytes) output_script = self.getScriptDest(script_lock_refund) locked_n = findOutput(tx_lock_refund, output_script) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx_lock_refund.vout[locked_n].nValue A, B, lock2_value, C = extractScriptLockRefundScriptValues(script_lock_refund) tx_lock_refund.rehash() tx_lock_refund_hash_int = tx_lock_refund.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append( CTxIn( COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=lock2_value, scriptSig=self.getScriptScriptSig(script_lock_refund), ) ) tx.vout.append( self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest)) ) if self.altruistic() and kbsf: # Add mercy_keyshare tx.vout.append(self.txoType()(0, CScript([OP_RETURN, b"XBSW", kbsf]))) else: self._log.debug( "Not attaching mercy output, have kbsf {}.".format( "true" if kbsf else "false" ) ) dummy_witness_stack = self.getScriptLockRefundSwipeTxDummyWitness( script_lock_refund ) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee tx.rehash() self._log.info( "createSCLockRefundSpendToFTx {}{}.".format( self._log.id(i2b(tx.sha256)), ( "" if self._log.safe_logs else f":\n fee_rate, vsize, fee: {tx_fee_rate}, {vsize}, {pay_fee}" ), ) ) return tx.serialize() def createSCLockSpendTx( self, tx_lock_bytes, script_lock, pkh_dest, tx_fee_rate, vkbv=None, fee_info={} ): tx_lock = self.loadTx(tx_lock_bytes) output_script = self.getScriptDest(script_lock) locked_n = findOutput(tx_lock, output_script) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx_lock.vout[locked_n].nValue tx_lock.rehash() tx_lock_id_int = tx_lock.sha256 tx = CTransaction() tx.nVersion = self.txVersion() tx.vin.append( CTxIn( COutPoint(tx_lock_id_int, locked_n), scriptSig=self.getScriptScriptSig(script_lock), ) ) tx.vout.append( self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest)) ) dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(tx_fee_rate * vsize / 1000) tx.vout[0].nValue = locked_coin - pay_fee fee_info["fee_paid"] = pay_fee fee_info["rate_used"] = tx_fee_rate fee_info["witness_bytes"] = witness_bytes fee_info["vsize"] = vsize tx.rehash() self._log.info( "createSCLockSpendTx {}{}.".format( self._log.id(i2b(tx.sha256)), ( "" if self._log.safe_logs else f":\n fee_rate, vsize, fee: {tx_fee_rate}, {vsize}, {pay_fee}" ), ) ) return tx.serialize() def verifySCLockTx( self, tx_bytes, script_out, swap_value, Kal, Kaf, feerate, check_lock_tx_inputs, vkbv=None, ): # Verify: # # Not necessary to check the lock txn is mineable, as protocol will wait for it to confirm # However by checking early we can avoid wasting time processing unmineable txns # Check fee is reasonable tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info("Verifying lock tx: {}.".format(self._log.id(txid))) ensure(tx.nVersion == self.txVersion(), "Bad version") # locktime must be <= chainheight + 2 # TODO: Locktime is set to 0 to keep compaitibility with older nodes. # Set locktime to current chainheight in createSCLockTx. if tx.nLockTime != 0: current_height: int = self.getChainHeight() if tx.nLockTime > current_height + 2: raise ValueError( f"{self.coin_name()} - Bad nLockTime {tx.nLockTime}, current height {current_height}" ) script_pk = self.getScriptDest(script_out) locked_n = findOutput(tx, script_pk) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx.vout[locked_n].nValue # Check value ensure(locked_coin == swap_value, "Bad locked value") # Check script A, B = extractScriptLockScriptValues(script_out) ensure(A == Kal, "Bad script pubkey") ensure(B == Kaf, "Bad script pubkey") if check_lock_tx_inputs: # TODO: Check that inputs are unspent # Verify fee rate inputs_value = 0 add_bytes = 0 add_witness_bytes = getCompactSizeLen(len(tx.vin)) for pi in tx.vin: ptx = self.rpc("getrawtransaction", [i2h(pi.prevout.hash), True]) prevout = ptx["vout"][pi.prevout.n] inputs_value += self.make_int(prevout["value"]) prevout_type = prevout["scriptPubKey"]["type"] if prevout_type == "witness_v0_keyhash": add_witness_bytes += 107 # sig 72, pk 33 and 2 size bytes add_witness_bytes += getCompactSizeLen(107) else: # Assume P2PKH, TODO more types add_bytes += ( 107 # OP_PUSH72 OP_PUSH33 ) outputs_value = 0 for txo in tx.vout: outputs_value += txo.nValue fee_paid = inputs_value - outputs_value assert fee_paid > 0 vsize = self.getTxVSize(tx, add_bytes, add_witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info( "tx amount, vsize, feerate: %ld, %ld, %ld", locked_coin, vsize, fee_rate_paid, ) if not self.compareFeeRates(fee_rate_paid, feerate): self._log.warning( "feerate paid doesn't match expected: %ld, %ld", fee_rate_paid, feerate, ) # TODO: Display warning to user return txid, locked_n def verifySCLockRefundTx( self, tx_bytes, lock_tx_bytes, script_out, prevout_id, prevout_n, prevout_seq, prevout_script, Kal, Kaf, csv_val_expect, swap_value, feerate, vkbv=None, ): # Verify: # Must have only one input with correct prevout and sequence # Must have only one output to the p2wsh of the lock refund script # Output value must be locked_coin - lock tx fee tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info("Verifying lock refund tx: {}.".format(self._log.id(txid))) ensure(tx.nVersion == self.txVersion(), "Bad version") ensure(tx.nLockTime == 0, "nLockTime not 0") ensure(len(tx.vin) == 1, "tx doesn't have one input") ensure(tx.vin[0].nSequence == prevout_seq, "Bad input nSequence") ensure( tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), "Input scriptsig mismatch", ) ensure( tx.vin[0].prevout.hash == b2i(prevout_id) and tx.vin[0].prevout.n == prevout_n, "Input prevout mismatch", ) ensure(len(tx.vout) == 1, "tx doesn't have one output") script_pk = self.getScriptDest(script_out) locked_n = findOutput(tx, script_pk) ensure(locked_n is not None, "Output not found in tx") locked_coin = tx.vout[locked_n].nValue # Check script and values A, B, csv_val, C = extractScriptLockRefundScriptValues(script_out) ensure(A == Kal, "Bad script pubkey") ensure(B == Kaf, "Bad script pubkey") ensure(csv_val == csv_val_expect, "Bad script csv value") ensure(C == Kaf, "Bad script pubkey") fee_paid = swap_value - locked_coin assert fee_paid > 0 dummy_witness_stack = self.getScriptLockTxDummyWitness(prevout_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info_s( "tx amount, vsize, feerate: %ld, %ld, %ld", locked_coin, vsize, fee_rate_paid, ) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError("Bad fee rate, expected: {}".format(feerate)) return txid, locked_coin, locked_n def verifySCLockRefundSpendTx( self, tx_bytes, lock_refund_tx_bytes, lock_refund_tx_id, prevout_script, Kal, prevout_n, prevout_value, feerate, vkbv=None, ): # Verify: # Must have only one input with correct prevout (n is always 0) and sequence # Must have only one output sending lock refund tx value - fee to leader's address, TODO: follower shouldn't need to verify destination addr tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info("Verifying lock refund spend tx: {}.".format(self._log.id(txid))) ensure(tx.nVersion == self.txVersion(), "Bad version") ensure(tx.nLockTime == 0, "nLockTime not 0") ensure(len(tx.vin) == 1, "tx doesn't have one input") ensure(tx.vin[0].nSequence == 0, "Bad input nSequence") ensure( tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), "Input scriptsig mismatch", ) ensure( tx.vin[0].prevout.hash == b2i(lock_refund_tx_id) and tx.vin[0].prevout.n == 0, "Input prevout mismatch", ) ensure(len(tx.vout) == 1, "tx doesn't have one output") # Destination doesn't matter to the follower """ p2wpkh = CScript([OP_0, hash160(Kal)]) locked_n = findOutput(tx, p2wpkh) ensure(locked_n is not None, 'Output not found in lock refund spend tx') """ tx_value = tx.vout[0].nValue fee_paid = prevout_value - tx_value assert fee_paid > 0 dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness( prevout_script ) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info_s( "tx amount, vsize, feerate: %ld, %ld, %ld", tx_value, vsize, fee_rate_paid ) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError("Bad fee rate, expected: {}".format(feerate)) return True def verifySCLockSpendTx( self, tx_bytes, lock_tx_bytes, lock_tx_script, a_pkhash_f, feerate, vkbv=None ): # Verify: # Must have only one input with correct prevout (n is always 0) and sequence # Must have only one output with destination and amount tx = self.loadTx(tx_bytes) txid = self.getTxid(tx) self._log.info("Verifying lock spend tx: {}.".format(self._log.id(txid))) ensure(tx.nVersion == self.txVersion(), "Bad version") ensure(tx.nLockTime == 0, "nLockTime not 0") ensure(len(tx.vin) == 1, "tx doesn't have one input") lock_tx = self.loadTx(lock_tx_bytes) lock_tx_id = self.getTxid(lock_tx) output_script = self.getScriptDest(lock_tx_script) locked_n = findOutput(lock_tx, output_script) ensure(locked_n is not None, "Output not found in tx") locked_coin = lock_tx.vout[locked_n].nValue ensure(tx.vin[0].nSequence == 0, "Bad input nSequence") ensure( tx.vin[0].scriptSig == self.getScriptScriptSig(lock_tx_script), "Input scriptsig mismatch", ) ensure( tx.vin[0].prevout.hash == b2i(lock_tx_id) and tx.vin[0].prevout.n == locked_n, "Input prevout mismatch", ) ensure(len(tx.vout) == 1, "tx doesn't have one output") p2wpkh = self.getScriptForPubkeyHash(a_pkhash_f) ensure(tx.vout[0].scriptPubKey == p2wpkh, "Bad output destination") # The value of the lock tx output should already be verified, if the fee is as expected the difference will be the correct amount fee_paid = locked_coin - tx.vout[0].nValue assert fee_paid > 0 dummy_witness_stack = self.getScriptLockTxDummyWitness(lock_tx_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info_s( "tx amount, vsize, feerate: %ld, %ld, %ld", tx.vout[0].nValue, vsize, fee_rate_paid, ) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError("Bad fee rate, expected: {}".format(feerate)) return True def signTx( self, key_bytes: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int, ) -> bytes: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash( prevout_script, tx, input_n, SIGHASH_ALL, prevout_value ) eck = PrivateKey(key_bytes) for i in range(10000): # Grind for low-R value if i == 0: nonce = (ffi.NULL, ffi.NULL) else: extra_entropy = i.to_bytes(4, "little") + (b"\0" * 28) nonce = (ffi.NULL, ffi.new("unsigned char [32]", extra_entropy)) sig = eck.sign(sig_hash, hasher=None, custom_nonce=nonce) if len(sig) < 71: return sig + bytes((SIGHASH_ALL,)) raise RuntimeError("sign failed.") def signTxOtVES( self, key_sign: bytes, pubkey_encrypt: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int, ) -> bytes: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash( prevout_script, tx, input_n, SIGHASH_ALL, prevout_value ) return ecdsaotves_enc_sign(key_sign, pubkey_encrypt, sig_hash) def verifyTxOtVES( self, tx_bytes: bytes, ct: bytes, Ks: bytes, Ke: bytes, input_n: int, prevout_script: bytes, prevout_value, ): tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash( prevout_script, tx, input_n, SIGHASH_ALL, prevout_value ) return ecdsaotves_enc_verify(Ks, Ke, sig_hash, ct) def decryptOtVES(self, k: bytes, esig: bytes) -> bytes: return ecdsaotves_dec_sig(k, esig) + bytes((SIGHASH_ALL,)) def recoverEncKey(self, esig, sig, K): return ecdsaotves_rec_enc_key(K, esig, sig[:-1]) # Strip sighash type def verifyTxSig( self, tx_bytes: bytes, sig: bytes, K: bytes, input_n: int, prevout_script: bytes, prevout_value: int, ) -> bool: tx = self.loadTx(tx_bytes) sig_hash = SegwitV0SignatureHash( prevout_script, tx, input_n, SIGHASH_ALL, prevout_value ) pubkey = PublicKey(K) return pubkey.verify(sig[:-1], sig_hash, hasher=None) # Pop the hashtype byte def fundTx(self, tx: bytes, feerate) -> bytes: feerate_str = self.format_amount(feerate) # TODO: Unlock unspents if bid cancelled # TODO: Manually select only segwit prevouts options = { "lockUnspents": True, "feeRate": feerate_str, } rv = self.rpc_wallet("fundrawtransaction", [tx.hex(), options]) tx_bytes: bytes = bytes.fromhex(rv["hex"]) return tx_bytes def getNonSegwitOutputs(self): unspents = self.rpc_wallet("listunspent", [0, 99999999]) nonsegwit_unspents = [] for u in unspents: if u.get("spendable", False) is False: continue if "desc" in u: desc = u["desc"] if self.use_p2shp2wsh(): if not desc.startswith("sh(wpkh"): nonsegwit_unspents.append( { "txid": u["txid"], "vout": u["vout"], "amount": u["amount"], } ) else: if not desc.startswith("wpkh"): nonsegwit_unspents.append( { "txid": u["txid"], "vout": u["vout"], "amount": u["amount"], } ) return nonsegwit_unspents def lockNonSegwitPrevouts(self) -> None: # For tests to_lock = self.getNonSegwitOutputs() if len(to_lock) > 0: self._log.debug(f"Locking {len(to_lock)} non segwit prevouts") self.rpc_wallet("lockunspent", [False, to_lock]) def listInputs(self, tx_bytes: bytes): tx = self.loadTx(tx_bytes) all_locked = self.rpc_wallet("listlockunspent") inputs = [] for pi in tx.vin: txid_hex = i2h(pi.prevout.hash) islocked = any( [ txid_hex == a["txid"] and pi.prevout.n == a["vout"] for a in all_locked ] ) inputs.append( {"txid": txid_hex, "vout": pi.prevout.n, "islocked": islocked} ) return inputs def unlockInputs(self, tx_bytes): tx = self.loadTx(tx_bytes) inputs = [] for pi in tx.vin: inputs.append({"txid": i2h(pi.prevout.hash), "vout": pi.prevout.n}) self.rpc_wallet("lockunspent", [True, inputs]) def signTxWithWallet(self, tx: bytes) -> bytes: rv = self.rpc_wallet("signrawtransactionwithwallet", [tx.hex()]) return bytes.fromhex(rv["hex"]) def signTxWithKey(self, tx: bytes, key: bytes) -> bytes: key_wif = self.encodeKey(key) rv = self.rpc( "signrawtransactionwithkey", [ tx.hex(), [ key_wif, ], ], ) return bytes.fromhex(rv["hex"]) def publishTx(self, tx: bytes): return self.rpc("sendrawtransaction", [tx.hex()]) def encodeTx(self, tx) -> bytes: return tx.serialize() def getTxid(self, tx) -> bytes: if isinstance(tx, str): tx = bytes.fromhex(tx) if isinstance(tx, bytes): tx = self.loadTx(tx) tx.rehash() return i2b(tx.sha256) def getTxOutputPos(self, tx, script): if isinstance(tx, bytes): tx = self.loadTx(tx) script_pk = self.getScriptDest(script) return findOutput(tx, script_pk) def getPubkeyHash(self, K: bytes) -> bytes: return hash160(K) def getScriptDest(self, script): return CScript([OP_0, sha256(script)]) def getP2WSHScriptDest(self, script): return CScript([OP_0, sha256(script)]) def getScriptScriptSig(self, script: bytes) -> bytes: return bytes() def getP2SHP2WSHDest(self, script): script_hash = sha256(script) assert len(script_hash) == 32 p2wsh_hash = hash160(CScript([OP_0, script_hash])) assert len(p2wsh_hash) == 20 return CScript([OP_HASH160, p2wsh_hash, OP_EQUAL]) def getP2SHP2WSHScriptSig(self, script): script_hash = sha256(script) assert len(script_hash) == 32 return CScript( [ CScript( [ OP_0, script_hash, ] ), ] ) def getPkDest(self, K: bytes) -> bytearray: return self.getScriptForPubkeyHash(self.getPubkeyHash(K)) def scanTxOutset(self, dest): return self.rpc("scantxoutset", ["start", ["raw({})".format(dest.hex())]]) def getTransaction(self, txid: bytes): try: return bytes.fromhex(self.rpc("getrawtransaction", [txid.hex()])) except Exception as e: # noqa: F841 # TODO: filter errors return None def getWalletTransaction(self, txid: bytes): try: return bytes.fromhex(self.rpc_wallet("gettransaction", [txid.hex()])["hex"]) except Exception as e: # noqa: F841 # TODO: filter errors return None def setTxSignature(self, tx_bytes: bytes, stack) -> bytes: tx = self.loadTx(tx_bytes) tx.wit.vtxinwit.clear() tx.wit.vtxinwit.append(CTxInWitness()) tx.wit.vtxinwit[0].scriptWitness.stack = stack return tx.serialize() def setTxScriptSig( self, tx_bytes: bytes, input_no: int, script_sig: bytes ) -> bytes: tx = self.loadTx(tx_bytes) tx.vin[0].scriptSig = script_sig return tx.serialize() def stripTxSignature(self, tx_bytes) -> bytes: tx = self.loadTx(tx_bytes) tx.wit.vtxinwit.clear() return tx.serialize() def extractLeaderSig(self, tx_bytes: bytes) -> bytes: tx = self.loadTx(tx_bytes) return tx.wit.vtxinwit[0].scriptWitness.stack[1] def extractFollowerSig(self, tx_bytes: bytes) -> bytes: tx = self.loadTx(tx_bytes) return tx.wit.vtxinwit[0].scriptWitness.stack[2] def createBLockTx(self, Kbs, output_amount, vkbv=None) -> bytes: tx = CTransaction() tx.nVersion = self.txVersion() p2wpkh_script_pk = self.getPkDest(Kbs) tx.vout.append(self.txoType()(output_amount, p2wpkh_script_pk)) return tx.serialize() def encodeSharedAddress(self, Kbv, Kbs): return self.pubkey_to_segwit_address(Kbs) def publishBLockTx( self, kbv, Kbs, output_amount, feerate, unlock_time: int = 0 ) -> bytes: b_lock_tx = self.createBLockTx(Kbs, output_amount) b_lock_tx = self.fundTx(b_lock_tx, feerate) b_lock_tx = self.signTxWithWallet(b_lock_tx) return bytes.fromhex(self.publishTx(b_lock_tx)) def getTxVSize(self, tx, add_bytes: int = 0, add_witness_bytes: int = 0) -> int: wsf = self.witnessScaleFactor() len_full = len(tx.serialize_with_witness()) + add_bytes + add_witness_bytes len_nwit = len(tx.serialize_without_witness()) + add_bytes weight = len_nwit * (wsf - 1) + len_full return (weight + wsf - 1) // wsf def findTxB( self, kbv, Kbs, cb_swap_value: int, cb_block_confirmed: int, restore_height: int, bid_sender: bool, check_amount: bool = True, ): dest_address = ( self.pubkey_to_segwit_address(Kbs) if self.using_segwit() else self.pubkey_to_address(Kbs) ) return self.getLockTxHeight(None, dest_address, cb_swap_value, restore_height) """ raw_dest = self.getPkDest(Kbs) rv = self.scanTxOutset(raw_dest) for utxo in rv['unspents']: if 'height' in utxo and utxo['height'] > 0 and rv['height'] - utxo['height'] > cb_block_confirmed: if self.make_int(utxo['amount']) != cb_swap_value: self._log.warning('Found output to lock tx pubkey of incorrect value: %s', str(utxo['amount'])) else: return {'txid': utxo['txid'], 'vout': utxo['vout'], 'amount': utxo['amount'], 'height': utxo['height']} return None """ def getBLockSpendTxFee(self, tx, fee_rate: int) -> int: witness_bytes = 109 vsize = self.getTxVSize(tx, add_witness_bytes=witness_bytes) pay_fee = round(fee_rate * vsize / 1000) self._log.info_s( f"BLockSpendTx fee_rate, vsize, fee: {fee_rate}, {vsize}, {pay_fee}." ) return pay_fee def spendBLockTx( self, chain_b_lock_txid: bytes, address_to: str, kbv: bytes, kbs: bytes, cb_swap_value: int, b_fee: int, restore_height: int, spend_actual_balance: bool = False, lock_tx_vout=None, ) -> bytes: self._log.info( "spendBLockTx: {} {}\n".format( self._log.id(chain_b_lock_txid), lock_tx_vout ) ) locked_n = lock_tx_vout Kbs = self.getPubkey(kbs) script_pk = self.getPkDest(Kbs) if locked_n is None: wtx = self.rpc_wallet_watch( "gettransaction", [ chain_b_lock_txid.hex(), ], ) lock_tx = self.loadTx(bytes.fromhex(wtx["hex"])) locked_n = findOutput(lock_tx, script_pk) ensure(locked_n is not None, "Output not found in tx") pkh_to = self.decodeAddress(address_to) tx = CTransaction() tx.nVersion = self.txVersion() script_lock = self.getScriptForPubkeyHash(Kbs) chain_b_lock_txid_int = b2i(chain_b_lock_txid) tx.vin.append( CTxIn( COutPoint(chain_b_lock_txid_int, locked_n), nSequence=0, scriptSig=self.getScriptScriptSig(script_lock), ) ) tx.vout.append( self.txoType()(cb_swap_value, self.getScriptForPubkeyHash(pkh_to)) ) pay_fee = self.getBLockSpendTxFee(tx, b_fee) tx.vout[0].nValue = cb_swap_value - pay_fee b_lock_spend_tx = tx.serialize() b_lock_spend_tx = self.signTxWithKey(b_lock_spend_tx, kbs) return bytes.fromhex(self.publishTx(b_lock_spend_tx)) def importWatchOnlyAddress(self, address: str, label: str) -> None: if self._use_descriptors: desc_watch = descsum_create(f"addr({address})") rv = self.rpc_wallet_watch( "importdescriptors", [ [ {"desc": desc_watch, "timestamp": "now", "active": False}, ], ], ) ensure(rv[0]["success"] is True, "importdescriptors failed for watchonly") return self.rpc_wallet("importaddress", [address, label, False]) def isWatchOnlyAddress(self, address: str) -> bool: addr_info = self.rpc_wallet("getaddressinfo", [address]) return addr_info["iswatchonly"] def getSCLockScriptAddress(self, lock_script: bytes) -> str: lock_tx_dest = self.getScriptDest(lock_script) return self.encodeScriptDest(lock_tx_dest) def getLockTxHeight( self, txid, dest_address, bid_amount, rescan_from, find_index: bool = False, vout: int = -1, ): # Add watchonly address and rescan if required if not self.isAddressMine(dest_address, or_watch_only=True): self.importWatchOnlyAddress(dest_address, "bid") self._log.info( "Imported watch-only addr: {}".format(self._log.addr(dest_address)) ) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), rescan_from ) ) self.rpc_wallet("rescanblockchain", [rescan_from]) return_txid = True if txid is None else False if txid is None: txns = self.rpc_wallet_watch( "listunspent", [ 0, 99999999, [ dest_address, ], ], ) for tx in txns: if self.make_int(tx["amount"]) == bid_amount: txid = bytes.fromhex(tx["txid"]) break if txid is None: return None try: # set `include_watchonly` explicitly to `True` to get transactions for watchonly addresses also in BCH tx = self.rpc_wallet_watch("gettransaction", [txid.hex(), True]) block_height = 0 if "blockhash" in tx: block_header = self.rpc("getblockheader", [tx["blockhash"]]) block_height = block_header["height"] rv = { "depth": 0 if "confirmations" not in tx else tx["confirmations"], "height": block_height, } if "mempoolconflicts" in tx and len(tx["mempoolconflicts"]) > 0: rv["conflicts"] = tx["mempoolconflicts"] elif "walletconflicts" in tx and len(tx["walletconflicts"]) > 0: rv["conflicts"] = tx["walletconflicts"] except Exception as e: self._log.debug( "getLockTxHeight gettransaction failed: %s, %s", txid.hex(), str(e) ) return None if find_index: tx_obj = self.rpc("decoderawtransaction", [tx["hex"]]) rv["index"] = find_vout_for_address_from_txobj(tx_obj, dest_address) if return_txid: rv["txid"] = txid.hex() return rv def getOutput(self, txid, dest_script, expect_value, xmr_swap=None): # TODO: Use getrawtransaction if txindex is active utxos = self.rpc( "scantxoutset", ["start", ["raw({})".format(dest_script.hex())]] ) if "height" in utxos: # chain_height not returned by v18 codebase chain_height = utxos["height"] else: chain_height = self.getChainHeight() rv = [] for utxo in utxos["unspents"]: if txid and txid.hex() != utxo["txid"]: continue if expect_value != self.make_int(utxo["amount"]): continue rv.append( { "depth": ( 0 if "height" not in utxo else (chain_height - utxo["height"]) + 1 ), "height": 0 if "height" not in utxo else utxo["height"], "amount": self.make_int(utxo["amount"]), "txid": utxo["txid"], "vout": utxo["vout"], } ) return rv, chain_height def withdrawCoin(self, value: float, addr_to: str, subfee: bool): params = [addr_to, value, "", "", subfee, True, self._conf_target] return self.rpc_wallet("sendtoaddress", params) def signCompact(self, k, message: str) -> bytes: message_hash = sha256(bytes(message, "utf-8")) privkey = PrivateKey(k) return privkey.sign_recoverable(message_hash, hasher=None)[:64] def signRecoverable(self, k, message: str) -> bytes: message_hash = sha256(bytes(message, "utf-8")) privkey = PrivateKey(k) return privkey.sign_recoverable(message_hash, hasher=None) def verifyCompactSig(self, K, message: str, sig) -> None: message_hash = sha256(bytes(message, "utf-8")) pubkey = PublicKey(K) rv = pubkey.verify_compact(sig, message_hash, hasher=None) assert rv is True def verifySigAndRecover(self, sig, message: str) -> bytes: message_hash = sha256(bytes(message, "utf-8")) pubkey = PublicKey.from_signature_and_message(sig, message_hash, hasher=None) return pubkey.format() def verifyMessage( self, address: str, message: str, signature: str, message_magic: str = None ) -> bool: if message_magic is None: message_magic = self.chainparams()["message_magic"] message_bytes = ( SerialiseNumCompact(len(message_magic)) + bytes(message_magic, "utf-8") + SerialiseNumCompact(len(message)) + bytes(message, "utf-8") ) message_hash = sha256(sha256(message_bytes)) signature_bytes = base64.b64decode(signature) rec_id = (signature_bytes[0] - 27) & 3 signature_bytes = signature_bytes[1:] + bytes((rec_id,)) try: pubkey = PublicKey.from_signature_and_message( signature_bytes, message_hash, hasher=None ) except Exception as e: self._log.info("verifyMessage failed: " + str(e)) return False address_hash = self.decodeAddress(address) pubkey_hash = hash160(pubkey.format()) return True if address_hash == pubkey_hash else False def showLockTransfers(self, kbv, Kbs, restore_height): raise ValueError("Unimplemented") def getWitnessStackSerialisedLength(self, witness_stack): length = getCompactSizeLen(len(witness_stack)) for e in witness_stack: length += getWitnessElementLen(len(e)) # See core SerializeTransaction length += 1 # vinDummy length += 1 # flags return length def describeTx(self, tx_hex: str): return self.rpc("decoderawtransaction", [tx_hex]) def getSpendableBalance(self) -> int: return self.make_int(self.rpc_wallet("getbalances")["mine"]["trusted"]) def createUTXO(self, value_sats: int): # Create a new address and send value_sats to it spendable_balance = self.getSpendableBalance() if spendable_balance < value_sats: raise ValueError("Balance too low") address = self.getNewAddress(self._use_segwit, "create_utxo") return ( self.withdrawCoin(self.format_amount(value_sats), address, False), address, ) def createRawFundedTransaction( self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True, ) -> str: txn = self.rpc( "createrawtransaction", [[], {addr_to: self.format_amount(amount)}] ) options = { "lockUnspents": lock_unspents, "conf_target": self._conf_target, } if sub_fee: options["subtractFeeFromOutputs"] = [ 0, ] return self.rpc_wallet("fundrawtransaction", [txn, options])["hex"] def createRawSignedTransaction(self, addr_to, amount) -> str: txn_funded = self.createRawFundedTransaction(addr_to, amount) return self.rpc_wallet("signrawtransactionwithwallet", [txn_funded])["hex"] def getBlockWithTxns(self, block_hash: str): return self.rpc("getblock", [block_hash, 2]) def listUtxos(self): return self.rpc_wallet("listunspent") def getUnspentsByAddr(self): unspent_addr = dict() unspent = self.rpc_wallet("listunspent") for u in unspent: if u.get("spendable", False) is False: continue if "address" not in u: continue if "desc" in u: desc = u["desc"] if self.using_segwit: if self.use_p2shp2wsh(): if not desc.startswith("sh(wpkh"): continue else: if not desc.startswith("wpkh"): continue else: if not desc.startswith("pkh"): continue unspent_addr[u["address"]] = unspent_addr.get( u["address"], 0 ) + self.make_int(u["amount"], r=1) return unspent_addr def getUTXOBalance(self, address: str): sum_unspent = 0 self._log.debug("[rm] scantxoutset start") # scantxoutset is slow ro = self.rpc( "scantxoutset", ["start", ["addr({})".format(address)]] ) # TODO: Use combo(address) where possible self._log.debug("[rm] scantxoutset end") for o in ro["unspents"]: sum_unspent += self.make_int(o["amount"]) return sum_unspent def signMessage(self, address: str, message: str) -> str: return self.rpc_wallet( "signmessage", [address, message], ) def signMessageWithKey(self, key_wif: str, message: str) -> str: return self.rpc("signmessagewithprivkey", [key_wif, message]) def getProofOfFunds(self, amount_for, extra_commit_bytes): # TODO: Lock unspent and use same output/s to fund bid unspent_addr = self.getUnspentsByAddr() sign_for_addr = None for addr, value in unspent_addr.items(): if value >= amount_for: sign_for_addr = addr break ensure( sign_for_addr is not None, "Could not find address with enough funds for proof", ) self._log.debug(f"sign_for_addr {sign_for_addr}") funds_addr: str = sign_for_addr if ( self.using_segwit() ): # TODO: Use isSegwitAddress when scantxoutset can use combo # 'Address does not refer to key' for non p2pkh pkh = self.decodeAddress(sign_for_addr) sign_for_addr = self.pkh_to_address(pkh) self._log.debug(f"sign_for_addr converted {sign_for_addr}") sign_message: str = sign_for_addr + "_swap_proof_" + extra_commit_bytes.hex() if self._use_descriptors: # https://github.com/bitcoin/bitcoin/issues/10542 # https://github.com/bitcoin/bitcoin/issues/26046 priv_keys = self.rpc_wallet( "listdescriptors", [ True, ], ) addr_info = self.rpc_wallet( "getaddressinfo", [ funds_addr, ], ) hdkeypath = addr_info["hdkeypath"] sign_for_address_key = None for descriptor in priv_keys["descriptors"]: if descriptor["active"] is False or descriptor["internal"] is True: continue desc = descriptor["desc"] assert desc.startswith("wpkh(") ext_key = desc[5:].split(")")[0].split("/", 1)[0] ext_key_data = decodeAddress(ext_key)[4:] ci_part = self._sc.ci(Coins.PART) ext_key_data_part = ci_part.encode_secret_extkey(ext_key_data) rv = ci_part.rpc_wallet( "extkey", ["info", ext_key_data_part, hdkeypath] ) extkey_derived = rv["key_info"]["result"] ext_key_data = decodeAddress(extkey_derived)[4:] ek = ExtKeyPair() ek.decode(ext_key_data) sign_for_address_key = self.encodeKey(ek._key) break assert sign_for_address_key is not None signature = self.signMessageWithKey(sign_for_address_key, sign_message) del priv_keys else: signature = self.signMessage(sign_for_addr, sign_message) prove_utxos = [] # TODO: Send specific utxos return (sign_for_addr, signature, prove_utxos) def encodeProofUtxos(self, proof_utxos): packed_utxos = bytes() for utxo in proof_utxos: packed_utxos += utxo[0] + utxo[1].to_bytes(2, "big") return packed_utxos def decodeProofUtxos(self, msg_utxos): proof_utxos = [] if len(msg_utxos) > 0: num_utxos = len(msg_utxos) // 34 p: int = 0 for i in range(num_utxos): proof_utxos.append( ( msg_utxos[p : p + 32], int.from_bytes(msg_utxos[p + 32 : p + 34], "big"), ) ) p += 34 return proof_utxos def verifyProofOfFunds(self, address, signature, utxos, extra_commit_bytes): passed = self.verifyMessage( address, address + "_swap_proof_" + extra_commit_bytes.hex(), signature ) ensure(passed is True, "Proof of funds signature invalid") if self.using_segwit(): address = self.encodeSegwitAddress(decodeAddress(address)[1:]) return self.getUTXOBalance(address) def isWalletEncrypted(self) -> bool: wallet_info = self.rpc_wallet("getwalletinfo") return "unlocked_until" in wallet_info def isWalletLocked(self) -> bool: wallet_info = self.rpc_wallet("getwalletinfo") if "unlocked_until" in wallet_info and wallet_info["unlocked_until"] <= 0: return True return False def isWalletEncryptedLocked(self) -> (bool, bool): wallet_info = self.rpc_wallet("getwalletinfo") encrypted = "unlocked_until" in wallet_info locked = encrypted and wallet_info["unlocked_until"] <= 0 return encrypted, locked def createWallet(self, wallet_name: str, password: str = "") -> None: self.rpc( "createwallet", [wallet_name, False, True, password, False, self._use_descriptors], ) def setActiveWallet(self, wallet_name: str) -> None: # For debugging self.rpc_wallet = make_rpc_func( self._rpcport, self._rpcauth, host=self._rpc_host, wallet=wallet_name ) self._rpc_wallet = wallet_name def newKeypool(self) -> None: self._log.debug("Running newkeypool.") self.rpc_wallet("newkeypool") def encryptWallet(self, password: str, check_seed: bool = True): # Watchonly wallets are not encrypted # Workaround for https://github.com/bitcoin/bitcoin/issues/26607 seed_id_before: str = self.getWalletSeedID() orig_active_descriptors = [] orig_hdchain_bytes = None walletpath = None max_hdchain_key_count: int = 4000000 # Arbitrary chain_client_settings = self._sc.getChainClientSettings( self.coin_type() ) # basicswap.json if ( chain_client_settings.get("manage_daemon", False) and check_seed is True and seed_id_before != "Not found" ): # Store active keys self.rpc("unloadwallet", [self._rpc_wallet]) datadir = chain_client_settings["datadir"] if self._network != "mainnet": datadir = os.path.join(datadir, self._network) try_wallet_path = os.path.join(datadir, self._rpc_wallet) if os.path.exists(try_wallet_path): walletpath = try_wallet_path else: try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet) if os.path.exists(try_wallet_path): walletpath = try_wallet_path walletfilepath = walletpath if os.path.isdir(walletpath): walletfilepath = os.path.join(walletpath, "wallet.dat") if walletpath is None: self._log.warning(f"Unable to find {self.ticker()} wallet path.") else: if self._use_descriptors: orig_active_descriptors = [] with sqlite3.connect(walletfilepath) as conn: c = conn.cursor() rows = c.execute( "SELECT * FROM main WHERE key in (:kext, :kint)", { "kext": bytes.fromhex( "1161637469766565787465726e616c73706b02" ), "kint": bytes.fromhex( "11616374697665696e7465726e616c73706b02" ), }, ) for row in rows: k, v = row orig_active_descriptors.append({"k": k, "v": v}) else: seedid_bytes: bytes = bytes.fromhex(seed_id_before)[::-1] with open(walletfilepath, "rb") as fp: with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm: pos = mm.find(seedid_bytes) while pos != -1: mm.seek(pos - 8) hdchain_bytes = mm.read(12 + 20) version = int.from_bytes(hdchain_bytes[:4], "little") if version == 2: external_counter = int.from_bytes( hdchain_bytes[4:8], "little" ) internal_counter = int.from_bytes( hdchain_bytes[-4:], "little" ) if ( external_counter > 0 and external_counter <= max_hdchain_key_count and internal_counter > 0 and internal_counter <= max_hdchain_key_count ): orig_hdchain_bytes = hdchain_bytes self._log.debug( f"Found hdchain for: {seed_id_before} external_counter: {external_counter}, internal_counter: {internal_counter}." ) break pos = mm.find(seedid_bytes, pos + 1) self.rpc("loadwallet", [self._rpc_wallet]) self.rpc_wallet("encryptwallet", [password]) if check_seed is False or seed_id_before == "Not found" or walletpath is None: return seed_id_after: str = self.getWalletSeedID() if seed_id_before == seed_id_after: return self._log.warning(f"{self.ticker()} wallet seed changed after encryption.") self._log.debug( f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}." ) self.setWalletSeedWarning(True) if chain_client_settings.get("manage_daemon", False) is False: self._log.warning( f"{self.ticker()} manage_daemon is false. Can't attempt to fix." ) return if self._use_descriptors: if len(orig_active_descriptors) < 2: self._log.error( "Could not find original active descriptors for wallet." ) return self._log.info("Attempting to revert to last descriptors.") else: if orig_hdchain_bytes is None: self._log.error("Could not find hdchain for wallet.") return self._log.info("Attempting to revert to last hdchain.") try: # Make a copy of the encrypted wallet before modifying it bkp_path = walletpath + ".bkp" for i in range(100): if not os.path.exists(bkp_path): break bkp_path = walletpath + f".bkp{i}" if os.path.exists(bkp_path): self._log.error("Could not find backup path for wallet.") return self.rpc("unloadwallet", [self._rpc_wallet]) if os.path.isfile(walletpath): shutil.copy(walletpath, bkp_path) else: shutil.copytree(walletpath, bkp_path) hdchain_replaced: bool = False if self._use_descriptors: with sqlite3.connect(walletfilepath) as conn: c = conn.cursor() c.executemany( "UPDATE main SET value = :v WHERE key = :k", orig_active_descriptors, ) conn.commit() else: seedid_after_bytes: bytes = bytes.fromhex(seed_id_after)[::-1] with open(walletfilepath, "r+b") as fp: with mmap.mmap(fp.fileno(), 0) as mm: pos = mm.find(seedid_after_bytes) while pos != -1: mm.seek(pos - 8) hdchain_bytes = mm.read(12 + 20) version = int.from_bytes(hdchain_bytes[:4], "little") if version == 2: external_counter = int.from_bytes( hdchain_bytes[4:8], "little" ) internal_counter = int.from_bytes( hdchain_bytes[-4:], "little" ) if ( external_counter > 0 and external_counter <= max_hdchain_key_count and internal_counter > 0 and internal_counter <= max_hdchain_key_count ): self._log.debug( f"Replacing hdchain for: {seed_id_after} external_counter: {external_counter}, internal_counter: {internal_counter}." ) offset: int = pos - 8 mm.seek(offset) mm.write(orig_hdchain_bytes) self._log.debug( f"hdchain replaced at offset: {offset}." ) hdchain_replaced = True # Can appear multiple times in file, replace all. pos = mm.find(seedid_after_bytes, pos + 1) if hdchain_replaced is False: self._log.error("Could not find new hdchain in wallet.") self.rpc("loadwallet", [self._rpc_wallet]) if hdchain_replaced: self.unlockWallet(password, check_seed=False) seed_id_after_restore: str = self.getWalletSeedID() if seed_id_after_restore == seed_id_before: self.newKeypool() else: self._log.warning( f"Expected seed id not found: {seed_id_before}, have {seed_id_after_restore}." ) self.lockWallet() except Exception as e: self._log.error(f"{self.ticker()} recreating wallet failed: {e}.") if self._sc.debug: self._log.error(traceback.format_exc()) def changeWalletPassword( self, old_password: str, new_password: str, check_seed_if_encrypt: bool = True ): self._log.info("changeWalletPassword - {}".format(self.ticker())) if old_password == "": if self.isWalletEncrypted(): raise ValueError("Old password must be set") return self.encryptWallet(new_password, check_seed=check_seed_if_encrypt) self.rpc_wallet("walletpassphrasechange", [old_password, new_password]) def unlockWallet(self, password: str, check_seed: bool = True) -> None: if password == "": return self._log.info(f"unlockWallet - {self.ticker()}") if self.coin_type() == Coins.BTC: # Recreate wallet if none found # Required when encrypting an existing btc wallet, workaround is to delete the btc wallet and recreate wallets = self.rpc("listwallets") if len(wallets) < 1: self._log.info( f'Creating wallet "{self._rpc_wallet}" for {self.coin_name()}.' ) # wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors self.rpc( "createwallet", [ self._rpc_wallet, False, True, password, False, self._use_descriptors, ], ) # Max timeout value, ~3 years self.rpc_wallet("walletpassphrase", [password, 100000000]) if check_seed: self._sc.checkWalletSeed(self.coin_type()) def lockWallet(self): self._log.info(f"lockWallet - {self.ticker()}") self.rpc_wallet("walletlock") def get_p2sh_script_pubkey(self, script: bytearray) -> bytearray: script_hash = hash160(script) assert len(script_hash) == 20 return CScript([OP_HASH160, script_hash, OP_EQUAL]) def get_p2wsh_script_pubkey(self, script: bytearray) -> bytearray: return CScript([OP_0, sha256(script)]) def findTxnByHash(self, txid_hex: str): # Only works for wallet txns try: rv = self.rpc_wallet("gettransaction", [txid_hex]) except Exception as e: # noqa: F841 self._log.debug( "findTxnByHash getrawtransaction failed: {}".format(txid_hex) ) return None if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed: return {"txid": txid_hex, "amount": 0, "height": rv["blockheight"]} return None def createRedeemTxn( self, prevout, output_addr: str, output_value: int, txn_script: bytes = None ) -> str: tx = CTransaction() tx.nVersion = self.txVersion() prev_txid = b2i(bytes.fromhex(prevout["txid"])) tx.vin.append(CTxIn(COutPoint(prev_txid, prevout["vout"]))) pkh = self.decodeAddress(output_addr) script = self.getScriptForPubkeyHash(pkh) tx.vout.append(self.txoType()(output_value, script)) tx.rehash() return tx.serialize().hex() def createRefundTxn( self, prevout, output_addr: str, output_value: int, locktime: int, sequence: int, txn_script: bytes = None, ) -> str: tx = CTransaction() tx.nVersion = self.txVersion() tx.nLockTime = locktime prev_txid = b2i(bytes.fromhex(prevout["txid"])) tx.vin.append( CTxIn( COutPoint(prev_txid, prevout["vout"]), nSequence=sequence, ) ) pkh = self.decodeAddress(output_addr) script = self.getScriptForPubkeyHash(pkh) tx.vout.append(self.txoType()(output_value, script)) tx.rehash() return tx.serialize().hex() def ensureFunds(self, amount: int) -> None: if self.getSpendableBalance() < amount: raise ValueError("Balance too low") def getHTLCSpendTxVSize(self, redeem: bool = True) -> int: tx_vsize = ( 5 # Add a few bytes, sequence in script takes variable amount of bytes ) if self.using_segwit(): tx_vsize += 143 if redeem else 134 else: tx_vsize += 323 if redeem else 287 return tx_vsize def find_prevout_info(self, txn_hex: str, txn_script: bytes): txjs = self.rpc("decoderawtransaction", [txn_hex]) if self.using_segwit(): p2wsh = self.getScriptDest(txn_script) n = getVoutByScriptPubKey(txjs, p2wsh.hex()) else: addr_to = self.encode_p2sh(txn_script) n = getVoutByAddress(txjs, addr_to) return { "txid": txjs["txid"], "vout": n, "scriptPubKey": txjs["vout"][n]["scriptPubKey"]["hex"], "redeemScript": txn_script.hex(), "amount": txjs["vout"][n]["value"], } def inspectSwipeTx(self, tx: dict): for vout in tx["vout"]: script_bytes = bytes.fromhex(vout["scriptPubKey"]["hex"]) if len(script_bytes) < 39: continue if script_bytes[0] != OP_RETURN: continue script_bytes[0] return script_bytes[7 : 7 + 32] return None def isTxExistsError(self, err_str: str) -> bool: return "Transaction already in block chain" in err_str def isTxNonFinalError(self, err_str: str) -> bool: return "non-BIP68-final" in err_str or "non-final" in err_str def combine_non_segwit_prevouts(self): self._log.info("Combining non-segwit prevouts") if self._use_segwit is False: raise RuntimeError("Not configured to use segwit outputs.") prevouts_to_spend = self.getNonSegwitOutputs() if len(prevouts_to_spend) < 1: raise RuntimeError("No non-segwit outputs found.") total_amount: int = 0 for n, prevout in enumerate(prevouts_to_spend): total_amount += self.make_int(prevout["amount"]) addr_to: str = self.getNewAddress( self._use_segwit, "combine_non_segwit_prevouts" ) txn = self.rpc( "createrawtransaction", [prevouts_to_spend, {addr_to: self.format_amount(total_amount)}], ) fee_rate, rate_src = self.get_fee_rate(self._conf_target) fee_rate_str: str = self.format_amount(fee_rate, True, 1) self._log.debug( f"Using fee rate: {fee_rate_str}, src: {rate_src}, confirms target: {self._conf_target}" ) options = { "add_inputs": False, "subtractFeeFromOutputs": [ 0, ], "feeRate": fee_rate_str, } tx_fee_set = self.rpc_wallet("fundrawtransaction", [txn, options])["hex"] tx_signed = self.rpc_wallet("signrawtransactionwithwallet", [tx_fee_set])["hex"] tx = self.rpc( "decoderawtransaction", [ tx_signed, ], ) self._log.info( "Submitting tx to combine non-segwit prevouts: {}".format( self._log.id(bytes.fromhex(tx["txid"])) ) ) self.rpc( "sendrawtransaction", [ tx_signed, ], ) return tx["txid"] def testBTCInterface(): print("TODO: testBTCInterface") if __name__ == "__main__": testBTCInterface()