#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (c) 2020-2024 tecnovert # Copyright (c) 2024-2025 The Basicswap developers # Distributed under the MIT software license, see the accompanying # file LICENSE or http://www.opensource.org/licenses/mit-license.php. import hashlib from enum import IntEnum from typing import List from basicswap.contrib.test_framework.messages import ( CTxOutPart, ) from basicswap.contrib.test_framework.script import ( CScript, OP_DUP, OP_HASH160, OP_EQUALVERIFY, OP_CHECKSIG, ) from basicswap.util import ( ensure, TemporaryError, ) from basicswap.util.script import ( getCompactSizeLen, getWitnessElementLen, ) from basicswap.util.address import ( encodeStealthAddress, ) from basicswap.interface.btc import ( BTCInterface, extractScriptLockScriptValues, extractScriptLockRefundScriptValues, ) from basicswap.chainparams import Coins, chainparams class BalanceTypes(IntEnum): PLAIN = 1 BLIND = 2 ANON = 3 class PARTInterface(BTCInterface): @staticmethod def coin_type(): # Returns the base coin type # ANON and BLIND PART will return Coins.PART return Coins.PART @staticmethod def balance_type(): return BalanceTypes.PLAIN @staticmethod def witnessScaleFactor() -> int: return 2 @staticmethod def txVersion() -> int: return 0xA0 @staticmethod def est_lock_tx_vsize() -> int: return 138 @staticmethod def xmr_swap_a_lock_spend_tx_vsize() -> int: return 200 @staticmethod def xmr_swap_b_lock_spend_tx_vsize() -> int: return 138 @staticmethod def txoType(): return CTxOutPart def __init__(self, coin_settings, network, swap_client=None): super().__init__(coin_settings, network, swap_client) self.setAnonTxRingSize(int(coin_settings.get("anon_tx_ring_size", 12))) def use_tx_vsize(self) -> bool: return True def setAnonTxRingSize(self, value): ensure(value >= 3 and value < 33, "Invalid anon_tx_ring_size value") self._anon_tx_ring_size = value def knownWalletSeed(self): # TODO: Double check return True def getNewAddress(self, use_segwit, label="swap_receive") -> str: return self.rpc_wallet("getnewaddress", [label]) def getNewStealthAddress(self, label="swap_stealth") -> str: return self.rpc_wallet("getnewstealthaddress", [label]) def haveSpentIndex(self): version = self.getDaemonVersion() index_info = self.rpc( "getinsightinfo" if int(str(version)[:2]) > 19 else "getindexinfo" ) return index_info["spentindex"] def initialiseWallet(self, key: bytes, restore_time: int = -1) -> None: raise ValueError("TODO") def withdrawCoin(self, value, addr_to, subfee): params = [addr_to, value, "", "", subfee, "", True, self._conf_target] return self.rpc_wallet("sendtoaddress", params) def sendTypeTo(self, type_from, type_to, value, addr_to, subfee): params = [ type_from, type_to, [ {"address": addr_to, "amount": value, "subfee": subfee}, ], "", "", self._anon_tx_ring_size, 1, False, {"conf_target": self._conf_target}, ] return self.rpc_wallet("sendtypeto", params) def getScriptForPubkeyHash(self, pkh: bytes) -> CScript: return CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG]) def getScriptDummyWitness(self, script: bytes) -> List[bytes]: if self.isScriptP2WPKH(script) or self.isScriptP2PKH(script): return [bytes(72), bytes(33)] raise ValueError("Unknown script type") def formatStealthAddress(self, scan_pubkey, spend_pubkey) -> str: prefix_byte = chainparams[self.coin_type()][self._network]["stealth_key_prefix"] return encodeStealthAddress(prefix_byte, scan_pubkey, spend_pubkey) def getWitnessStackSerialisedLength(self, witness_stack) -> int: length: int = getCompactSizeLen(len(witness_stack)) for e in witness_stack: length += getWitnessElementLen(len(e)) return length def getWalletRestoreHeight(self) -> int: start_time = self.rpc_wallet("getwalletinfo")["keypoololdest"] blockchaininfo = self.getBlockchainInfo() chain_synced = round(blockchaininfo["verificationprogress"], 3) if chain_synced < 1.0: raise ValueError("{} chain isn't synced.".format(self.coin_name())) self._log.debug("Finding block at time: {}".format(start_time)) block_hash = self.rpc("getblockhashafter", [start_time]) block_header = self.rpc("getblockheader", [block_hash]) return block_header["height"] def getHTLCSpendTxVSize(self, redeem: bool = True) -> int: tx_vsize = ( 5 # Add a few bytes, sequence in script takes variable amount of bytes ) tx_vsize += 204 if redeem else 187 return tx_vsize def getUnspentsByAddr(self): unspent_addr = dict() balance_type = self.balance_type() if balance_type == BalanceTypes.PLAIN: unspent = self.rpc_wallet("listunspent") elif balance_type == BalanceTypes.BLIND: unspent = self.rpc_wallet("listunspentblind") else: raise ValueError( f"getUnspentsByAddr not implemented for {balance_type} type" ) for u in unspent: if u["spendable"] is not True: continue if "address" not in u: continue unspent_addr[u["address"]] = unspent_addr.get( u["address"], 0 ) + self.make_int(u["amount"], r=1) return unspent_addr def combine_non_segwit_prevouts(self): raise RuntimeError("No non-segwit outputs found.") def signMessage(self, address: str, message: str) -> str: args = [address, message] if self.getDaemonVersion() > 23020700: message_magic: str = self.chainparams()["message_magic"] args += [ message_magic, ] return self.rpc_wallet("signmessage", args) def signMessageWithKey(self, key_wif: str, message: str) -> str: args = [key_wif, message] if self.getDaemonVersion() > 23020700: message_magic: str = self.chainparams()["message_magic"] args += [ message_magic, ] return self.rpc("signmessagewithprivkey", args) class PARTInterfaceBlind(PARTInterface): def interface_type(self) -> int: return Coins.PART_BLIND @staticmethod def balance_type(): return BalanceTypes.BLIND @staticmethod def est_lock_tx_vsize() -> int: return 980 @staticmethod def xmr_swap_a_lock_spend_tx_vsize() -> int: return 1032 @staticmethod def xmr_swap_b_lock_spend_tx_vsize() -> int: return 980 @staticmethod def compareFeeRates(actual: int, expected: int) -> bool: # Allow the fee to be up to 10% larger than expected if actual < expected - 20: return False if actual > expected + expected * 0.1: return False return True def coin_name(self) -> str: return super().coin_name() + " Blind" def getScriptLockTxNonce(self, data): return hashlib.sha256(data + bytes("locktx", "utf-8")).digest() def getScriptLockRefundTxNonce(self, data): return hashlib.sha256(data + bytes("lockrefundtx", "utf-8")).digest() def findOutputByNonce(self, tx_obj, nonce): blinded_info = None output_n = None for txo in tx_obj["vout"]: if txo["type"] != "blind": continue try: blinded_info = self.rpc( "rewindrangeproof", [txo["rangeproof"], txo["valueCommitment"], nonce.hex()], ) output_n = txo["n"] self.rpc( "rewindrangeproof", [txo["rangeproof"], txo["valueCommitment"], nonce.hex()], ) break except Exception as e: self._log.debug("Searching for locked output: {}".format(str(e))) continue # Should not be possible for commitment not to match v = self.rpc( "verifycommitment", [txo["valueCommitment"], blinded_info["blind"], blinded_info["amount"]], ) ensure(v["result"] is True, "verifycommitment failed") return output_n, blinded_info def createSCLockTx(self, value: int, script: bytearray, vkbv: bytes) -> bytes: # Nonce is derived from vkbv, ephemeral_key isn't used ephemeral_key = self.getNewRandomKey() ephemeral_pubkey = self.getPubkey(ephemeral_key) assert len(ephemeral_pubkey) == 33 nonce = self.getScriptLockTxNonce(vkbv) p2wsh_addr = self.encode_p2wsh(self.getP2WSHScriptDest(script)) inputs = [] outputs = [ { "type": "blind", "amount": self.format_amount(value), "address": p2wsh_addr, "nonce": nonce.hex(), "data": ephemeral_pubkey.hex(), } ] params = [inputs, outputs] rv = self.rpc_wallet("createrawparttransaction", params) return bytes.fromhex(rv["hex"]) def fundSCLockTx(self, tx_bytes: bytes, feerate: int, vkbv: bytes) -> bytes: feerate_str = self.format_amount(feerate) # TODO: unlock unspents if bid cancelled tx_hex = tx_bytes.hex() nonce = self.getScriptLockTxNonce(vkbv) tx_obj = self.rpc("decoderawtransaction", [tx_hex]) assert len(tx_obj["vout"]) == 1 txo = tx_obj["vout"][0] blinded_info = self.rpc( "rewindrangeproof", [txo["rangeproof"], txo["valueCommitment"], nonce.hex()] ) outputs_info = { 0: { "value": blinded_info["amount"], "blind": blinded_info["blind"], "nonce": nonce.hex(), } } options = { "lockUnspents": True, "feeRate": feerate_str, } rv = self.rpc_wallet( "fundrawtransactionfrom", ["blind", tx_hex, {}, outputs_info, options] ) return bytes.fromhex(rv["hex"]) def createSCLockRefundTx( self, tx_lock_bytes, script_lock, Kal, Kaf, lock1_value, csv_val, tx_fee_rate, vkbv, ): lock_tx_obj = self.rpc("decoderawtransaction", [tx_lock_bytes.hex()]) assert self.getTxid(tx_lock_bytes).hex() == lock_tx_obj["txid"] # Nonce is derived from vkbv, ephemeral_key isn't used ephemeral_key = self.getNewRandomKey() ephemeral_pubkey = self.getPubkey(ephemeral_key) assert len(ephemeral_pubkey) == 33 nonce = self.getScriptLockTxNonce(vkbv) output_nonce = self.getScriptLockRefundTxNonce(vkbv) # Find the output of the lock tx to spend spend_n, input_blinded_info = self.findOutputByNonce(lock_tx_obj, nonce) ensure(spend_n is not None, "Output not found in tx") locked_coin = input_blinded_info["amount"] tx_lock_id = lock_tx_obj["txid"] refund_script = self.genScriptLockRefundTxScript(Kal, Kaf, csv_val) p2wsh_addr = self.encode_p2wsh(self.getP2WSHScriptDest(refund_script)) inputs = [ { "txid": tx_lock_id, "vout": spend_n, "sequence": lock1_value, "blindingfactor": input_blinded_info["blind"], } ] outputs = [ { "type": "blind", "amount": locked_coin, "address": p2wsh_addr, "nonce": output_nonce.hex(), "data": ephemeral_pubkey.hex(), } ] params = [inputs, outputs] rv = self.rpc_wallet("createrawparttransaction", params) lock_refund_tx_hex = rv["hex"] # Set dummy witness data for fee estimation dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) dummy_witness_stack = [x.hex() for x in dummy_witness_stack] # Use a junk change pubkey to avoid adding unused keys to the wallet zero_change_key = self.getNewRandomKey() zero_change_pubkey = self.getPubkey(zero_change_key) inputs_info = { "0": { "value": input_blinded_info["amount"], "blind": input_blinded_info["blind"], "witnessstack": dummy_witness_stack, } } outputs_info = rv["amounts"] options = { "changepubkey": zero_change_pubkey.hex(), "feeRate": self.format_amount(tx_fee_rate), "subtractFeeFromOutputs": [ 0, ], } rv = self.rpc_wallet( "fundrawtransactionfrom", ["blind", lock_refund_tx_hex, inputs_info, outputs_info, options], ) lock_refund_tx_hex = rv["hex"] for vout, txo in rv["output_amounts"].items(): if txo["value"] > 0: refunded_value = txo["value"] return bytes.fromhex(lock_refund_tx_hex), refund_script, refunded_value def createSCLockRefundSpendTx( self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv ): # Returns the coinA locked coin to the leader # The follower will sign the multisig path with a signature encumbered by the leader's coinB spend pubkey # If the leader publishes the decrypted signature the leader's coinB spend privatekey will be revealed to the follower lock_refund_tx_obj = self.rpc( "decoderawtransaction", [tx_lock_refund_bytes.hex()] ) # Nonce is derived from vkbv nonce = self.getScriptLockRefundTxNonce(vkbv) # Find the output of the lock refund tx to spend spend_n, input_blinded_info = self.findOutputByNonce(lock_refund_tx_obj, nonce) ensure(spend_n is not None, "Output not found in tx") tx_lock_refund_id = lock_refund_tx_obj["txid"] addr_out = self.pkh_to_address(pkh_refund_to) addr_info = self.rpc_wallet("getaddressinfo", [addr_out]) output_pubkey_hex = addr_info["pubkey"] # Follower won't be able to decode output to check amount, shouldn't matter as fee is public and output is to leader, sum has to balance inputs = [ { "txid": tx_lock_refund_id, "vout": spend_n, "sequence": 0, "blindingfactor": input_blinded_info["blind"], } ] outputs = [ { "type": "blind", "amount": input_blinded_info["amount"], "address": addr_out, "pubkey": output_pubkey_hex, } ] params = [inputs, outputs] rv = self.rpc_wallet("createrawparttransaction", params) lock_refund_spend_tx_hex = rv["hex"] # Set dummy witness data for fee estimation dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness( script_lock_refund ) dummy_witness_stack = [x.hex() for x in dummy_witness_stack] # Use a junk change pubkey to avoid adding unused keys to the wallet zero_change_key = self.getNewRandomKey() zero_change_pubkey = self.getPubkey(zero_change_key) inputs_info = { "0": { "value": input_blinded_info["amount"], "blind": input_blinded_info["blind"], "witnessstack": dummy_witness_stack, } } outputs_info = rv["amounts"] options = { "changepubkey": zero_change_pubkey.hex(), "feeRate": self.format_amount(tx_fee_rate), "subtractFeeFromOutputs": [ 0, ], } rv = self.rpc_wallet( "fundrawtransactionfrom", ["blind", lock_refund_spend_tx_hex, inputs_info, outputs_info, options], ) lock_refund_spend_tx_hex = rv["hex"] return bytes.fromhex(lock_refund_spend_tx_hex) def verifySCLockTx( self, tx_bytes, script_out, swap_value, Kal, Kaf, feerate, check_lock_tx_inputs, vkbv, ): lock_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()]) lock_txid_hex = lock_tx_obj["txid"] self._log.info("Verifying lock tx: {}.".format(self._log.id(lock_txid_hex))) ensure(lock_tx_obj["version"] == self.txVersion(), "Bad version") lock_time: int = lock_tx_obj["locktime"] # locktime must be <= chainheight + 2 # TODO: locktime is set to 0 to keep compaitibility with older nodes. # Set locktime to current chainheight in createSCLockTx. if lock_time != 0: current_height: int = self.getChainHeight() if lock_time > current_height + 2: raise ValueError( f"{self.coin_name()} - Bad nLockTime {lock_time}, current height {current_height}" ) # Find the output of the lock tx to verify nonce = self.getScriptLockTxNonce(vkbv) lock_output_n, blinded_info = self.findOutputByNonce(lock_tx_obj, nonce) ensure(lock_output_n is not None, "Output not found in tx") # Check value locked_txo_value = self.make_int(blinded_info["amount"]) ensure(locked_txo_value == swap_value, "Bad locked value") # Check script lock_txo_scriptpk = bytes.fromhex( lock_tx_obj["vout"][lock_output_n]["scriptPubKey"]["hex"] ) script_pk = self.getP2WSHScriptDest(script_out) ensure(lock_txo_scriptpk == script_pk, "Bad output script") A, B = extractScriptLockScriptValues(script_out) ensure(A == Kal, "Bad script leader pubkey") ensure(B == Kaf, "Bad script follower pubkey") # TODO: Check that inputs are unspent, rangeproofs and commitments sum # Verify fee rate vsize = lock_tx_obj["vsize"] fee_paid = self.make_int(lock_tx_obj["vout"][0]["ct_fee"]) fee_rate_paid = fee_paid * 1000 // vsize self._log.info( "tx amount, vsize, feerate: %ld, %ld, %ld", locked_txo_value, vsize, fee_rate_paid, ) if not self.compareFeeRates(fee_rate_paid, feerate): self._log.warning( "feerate paid doesn't match expected: %ld, %ld", fee_rate_paid, feerate ) # TODO: Display warning to user return bytes.fromhex(lock_txid_hex), lock_output_n def verifySCLockRefundTx( self, tx_bytes, lock_tx_bytes, script_out, prevout_id, prevout_n, prevout_seq, prevout_script, Kal, Kaf, csv_val_expect, swap_value, feerate, vkbv, ): lock_refund_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()]) lock_refund_txid_hex = lock_refund_tx_obj["txid"] self._log.info( "Verifying lock refund tx: {}.".format(self._log.id(lock_refund_txid_hex)) ) ensure(lock_refund_tx_obj["version"] == self.txVersion(), "Bad version") ensure(lock_refund_tx_obj["locktime"] == 0, "Bad nLockTime") ensure(len(lock_refund_tx_obj["vin"]) == 1, "tx doesn't have one input") txin = lock_refund_tx_obj["vin"][0] ensure(txin["sequence"] == prevout_seq, "Bad input nSequence") ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty") ensure( txin["txid"] == prevout_id.hex() and txin["vout"] == prevout_n, "Input prevout mismatch", ) ensure(len(lock_refund_tx_obj["vout"]) == 3, "tx doesn't have three outputs") # Find the output of the lock refund tx to verify nonce = self.getScriptLockRefundTxNonce(vkbv) lock_refund_output_n, blinded_info = self.findOutputByNonce( lock_refund_tx_obj, nonce ) ensure(lock_refund_output_n is not None, "Output not found in tx") lock_refund_txo_value = self.make_int(blinded_info["amount"]) # Check script lock_refund_txo_scriptpk = bytes.fromhex( lock_refund_tx_obj["vout"][lock_refund_output_n]["scriptPubKey"]["hex"] ) script_pk = self.getP2WSHScriptDest(script_out) ensure(lock_refund_txo_scriptpk == script_pk, "Bad output script") A, B, csv_val, C = extractScriptLockRefundScriptValues(script_out) ensure(A == Kal, "Bad script pubkey") ensure(B == Kaf, "Bad script pubkey") ensure(csv_val == csv_val_expect, "Bad script csv value") ensure(C == Kaf, "Bad script pubkey") # Check rangeproofs and commitments sum lock_tx_obj = self.rpc("decoderawtransaction", [lock_tx_bytes.hex()]) prevout = lock_tx_obj["vout"][prevout_n] prevtxns = [ { "txid": prevout_id.hex(), "vout": prevout_n, "scriptPubKey": prevout["scriptPubKey"]["hex"], "amount_commitment": prevout["valueCommitment"], } ] rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns]) ensure(rv["outputs_valid"] is True, "Invalid outputs") ensure(rv["inputs_valid"] is True, "Invalid inputs") # Check value fee_paid = self.make_int(lock_refund_tx_obj["vout"][0]["ct_fee"]) ensure(swap_value - lock_refund_txo_value == fee_paid, "Bad output value") # Check fee rate dummy_witness_stack = self.getScriptLockTxDummyWitness(prevout_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info("vsize, feerate: %ld, %ld", vsize, fee_rate_paid) ensure( self.compareFeeRates(fee_rate_paid, feerate), "Bad fee rate, expected: {}".format(feerate), ) return ( bytes.fromhex(lock_refund_txid_hex), lock_refund_txo_value, lock_refund_output_n, ) def verifySCLockRefundSpendTx( self, tx_bytes, lock_refund_tx_bytes, lock_refund_tx_id, prevout_script, Kal, prevout_n, prevout_value, feerate, vkbv, ): lock_refund_spend_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()]) lock_refund_spend_txid_hex = lock_refund_spend_tx_obj["txid"] self._log.info( "Verifying lock refund spend tx: {}.".format( self._log.id(lock_refund_spend_txid_hex) ) ) ensure(lock_refund_spend_tx_obj["version"] == self.txVersion(), "Bad version") ensure(lock_refund_spend_tx_obj["locktime"] == 0, "Bad nLockTime") ensure(len(lock_refund_spend_tx_obj["vin"]) == 1, "tx doesn't have one input") txin = lock_refund_spend_tx_obj["vin"][0] ensure(txin["sequence"] == 0, "Bad input nSequence") ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty") ensure( txin["txid"] == lock_refund_tx_id.hex() and txin["vout"] == prevout_n, "Input prevout mismatch", ) ensure( len(lock_refund_spend_tx_obj["vout"]) == 3, "tx doesn't have three outputs" ) # Leader picks output destinations # Follower is not concerned with them as they pay to leader # Check rangeproofs and commitments sum lock_refund_tx_obj = self.rpc( "decoderawtransaction", [lock_refund_tx_bytes.hex()] ) prevout = lock_refund_tx_obj["vout"][prevout_n] prevtxns = [ { "txid": lock_refund_tx_id.hex(), "vout": prevout_n, "scriptPubKey": prevout["scriptPubKey"]["hex"], "amount_commitment": prevout["valueCommitment"], } ] rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns]) ensure(rv["outputs_valid"] is True, "Invalid outputs") ensure(rv["inputs_valid"] is True, "Invalid inputs") # Check fee rate dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness( prevout_script ) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes) fee_paid = self.make_int(lock_refund_spend_tx_obj["vout"][0]["ct_fee"]) fee_rate_paid = fee_paid * 1000 // vsize ensure( self.compareFeeRates(fee_rate_paid, feerate), "Bad fee rate, expected: {}".format(feerate), ) return True def getLockTxSwapOutputValue(self, bid, xmr_swap): lock_tx_obj = self.rpc("decoderawtransaction", [xmr_swap.a_lock_tx.hex()]) nonce = self.getScriptLockTxNonce(xmr_swap.vkbv) output_n, _ = self.findOutputByNonce(lock_tx_obj, nonce) ensure(output_n is not None, "Output not found in tx") return bytes.fromhex(lock_tx_obj["vout"][output_n]["valueCommitment"]) def getLockRefundTxSwapOutputValue(self, bid, xmr_swap): lock_refund_tx_obj = self.rpc( "decoderawtransaction", [xmr_swap.a_lock_refund_tx.hex()] ) nonce = self.getScriptLockRefundTxNonce(xmr_swap.vkbv) output_n, _ = self.findOutputByNonce(lock_refund_tx_obj, nonce) ensure(output_n is not None, "Output not found in tx") return bytes.fromhex(lock_refund_tx_obj["vout"][output_n]["valueCommitment"]) def getLockRefundTxSwapOutput(self, xmr_swap): lock_refund_tx_obj = self.rpc( "decoderawtransaction", [xmr_swap.a_lock_refund_tx.hex()] ) nonce = self.getScriptLockRefundTxNonce(xmr_swap.vkbv) output_n, _ = self.findOutputByNonce(lock_refund_tx_obj, nonce) ensure(output_n is not None, "Output not found in tx") return output_n def createSCLockSpendTx( self, tx_lock_bytes: bytes, script_lock: bytes, pk_dest: bytes, tx_fee_rate: int, vkbv: bytes, fee_info={}, ) -> bytes: lock_tx_obj = self.rpc("decoderawtransaction", [tx_lock_bytes.hex()]) lock_txid_hex = lock_tx_obj["txid"] ensure(lock_tx_obj["version"] == self.txVersion(), "Bad version") ensure(lock_tx_obj["locktime"] == 0, "Bad nLockTime") # Find the output of the lock tx to verify nonce = self.getScriptLockTxNonce(vkbv) spend_n, blinded_info = self.findOutputByNonce(lock_tx_obj, nonce) ensure(spend_n is not None, "Output not found in tx") addr_out = self.pubkey_to_address(pk_dest) inputs = [ { "txid": lock_txid_hex, "vout": spend_n, "sequence": 0, "blindingfactor": blinded_info["blind"], } ] outputs = [ { "type": "blind", "amount": blinded_info["amount"], "address": addr_out, "pubkey": pk_dest.hex(), } ] params = [inputs, outputs] rv = self.rpc_wallet("createrawparttransaction", params) lock_spend_tx_hex = rv["hex"] # Set dummy witness data for fee estimation dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock) # Use a junk change pubkey to avoid adding unused keys to the wallet zero_change_key = self.getNewRandomKey() zero_change_pubkey = self.getPubkey(zero_change_key) inputs_info = { "0": { "value": blinded_info["amount"], "blind": blinded_info["blind"], "witnessstack": [x.hex() for x in dummy_witness_stack], } } outputs_info = rv["amounts"] options = { "changepubkey": zero_change_pubkey.hex(), "feeRate": self.format_amount(tx_fee_rate), "subtractFeeFromOutputs": [ 0, ], } rv = self.rpc_wallet( "fundrawtransactionfrom", ["blind", lock_spend_tx_hex, inputs_info, outputs_info, options], ) lock_spend_tx_hex = rv["hex"] lock_spend_tx_obj = self.rpc("decoderawtransaction", [lock_spend_tx_hex]) pay_fee = self.make_int(lock_spend_tx_obj["vout"][0]["ct_fee"]) # lock_spend_tx_hex does not include the dummy witness stack witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize( self.loadTx(bytes.fromhex(lock_spend_tx_hex)), add_witness_bytes=witness_bytes, ) actual_tx_fee_rate = pay_fee * 1000 // vsize self._log.info( "createSCLockSpendTx {}{}.".format( self._log.id(lock_spend_tx_obj["txid"]), ( "" if self._log.safe_logs else f":\n fee_rate, vsize, fee: {actual_tx_fee_rate}, {vsize}, {pay_fee}" ), ) ) fee_info["vsize"] = vsize fee_info["fee_paid"] = pay_fee fee_info["rate_input"] = tx_fee_rate fee_info["rate_actual"] = actual_tx_fee_rate return bytes.fromhex(lock_spend_tx_hex) def verifySCLockSpendTx( self, tx_bytes, lock_tx_bytes, lock_tx_script, a_pk_f, feerate, vkbv ): lock_spend_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()]) lock_spend_txid_hex = lock_spend_tx_obj["txid"] self._log.info( "Verifying lock spend tx: {}.".format(self._log.id(lock_spend_txid_hex)) ) ensure(lock_spend_tx_obj["version"] == self.txVersion(), "Bad version") ensure(lock_spend_tx_obj["locktime"] == 0, "Bad nLockTime") ensure(len(lock_spend_tx_obj["vin"]) == 1, "tx doesn't have one input") lock_tx_obj = self.rpc("decoderawtransaction", [lock_tx_bytes.hex()]) lock_txid_hex = lock_tx_obj["txid"] # Find the output of the lock tx to verify nonce = self.getScriptLockTxNonce(vkbv) spend_n, input_blinded_info = self.findOutputByNonce(lock_tx_obj, nonce) ensure(spend_n is not None, "Output not found in tx") txin = lock_spend_tx_obj["vin"][0] ensure(txin["sequence"] == 0, "Bad input nSequence") ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty") ensure( txin["txid"] == lock_txid_hex and txin["vout"] == spend_n, "Input prevout mismatch", ) ensure(len(lock_spend_tx_obj["vout"]) == 3, "tx doesn't have three outputs") addr_out = self.pubkey_to_address(a_pk_f) privkey = self.rpc_wallet("dumpprivkey", [addr_out]) # Find output: output_blinded_info = None output_n = None for txo in lock_spend_tx_obj["vout"]: if txo["type"] != "blind": continue try: output_blinded_info = self.rpc( "rewindrangeproof", [ txo["rangeproof"], txo["valueCommitment"], privkey, txo["data_hex"], ], ) output_n = txo["n"] break except Exception as e: self._log.debug("Searching for locked output: {}".format(str(e))) pass ensure(output_n is not None, "Output not found in tx") # Commitment v = self.rpc( "verifycommitment", [ lock_spend_tx_obj["vout"][output_n]["valueCommitment"], output_blinded_info["blind"], output_blinded_info["amount"], ], ) ensure(v["result"] is True, "verifycommitment failed") # Check rangeproofs and commitments sum prevout = lock_tx_obj["vout"][spend_n] prevtxns = [ { "txid": lock_txid_hex, "vout": spend_n, "scriptPubKey": prevout["scriptPubKey"]["hex"], "amount_commitment": prevout["valueCommitment"], } ] rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns]) ensure(rv["outputs_valid"] is True, "Invalid outputs") ensure(rv["inputs_valid"] is True, "Invalid inputs") # Check amount fee_paid = self.make_int(lock_spend_tx_obj["vout"][0]["ct_fee"]) amount_difference = self.make_int(input_blinded_info["amount"]) - self.make_int( output_blinded_info["amount"] ) ensure(fee_paid == amount_difference, "Invalid output amount") # Check fee dummy_witness_stack = self.getScriptLockTxDummyWitness(lock_tx_script) witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack) vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes) fee_rate_paid = fee_paid * 1000 // vsize self._log.info("vsize, feerate: %ld, %ld", vsize, fee_rate_paid) if not self.compareFeeRates(fee_rate_paid, feerate): raise ValueError("Bad fee rate, expected: {}".format(feerate)) return True def createSCLockRefundSpendToFTx( self, tx_lock_refund_bytes, script_lock_refund, pkh_dest, tx_fee_rate, vkbv, kbsf=None, ): # lock refund swipe tx # Sends the coinA locked coin to the follower lock_refund_tx_obj = self.rpc( "decoderawtransaction", [tx_lock_refund_bytes.hex()] ) nonce = self.getScriptLockRefundTxNonce(vkbv) # Find the output of the lock refund tx to spend spend_n, input_blinded_info = self.findOutputByNonce(lock_refund_tx_obj, nonce) ensure(spend_n is not None, "Output not found in tx") tx_lock_refund_id = lock_refund_tx_obj["txid"] addr_out = self.pkh_to_address(pkh_dest) addr_info = self.rpc_wallet("getaddressinfo", [addr_out]) output_pubkey_hex = addr_info["pubkey"] A, B, lock2_value, C = extractScriptLockRefundScriptValues(script_lock_refund) # Follower won't be able to decode output to check amount, shouldn't matter as fee is public and output is to leader, sum has to balance inputs = [ { "txid": tx_lock_refund_id, "vout": spend_n, "sequence": lock2_value, "blindingfactor": input_blinded_info["blind"], } ] outputs = [ { "type": "blind", "amount": input_blinded_info["amount"], "address": addr_out, "pubkey": output_pubkey_hex, } ] params = [inputs, outputs] rv = self.rpc_wallet("createrawparttransaction", params) lock_refund_swipe_tx_hex = rv["hex"] # Set dummy witness data for fee estimation dummy_witness_stack = self.getScriptLockRefundSwipeTxDummyWitness( script_lock_refund ) dummy_witness_stack = [x.hex() for x in dummy_witness_stack] # Use a junk change pubkey to avoid adding unused keys to the wallet zero_change_key = self.getNewRandomKey() zero_change_pubkey = self.getPubkey(zero_change_key) inputs_info = { "0": { "value": input_blinded_info["amount"], "blind": input_blinded_info["blind"], "witnessstack": dummy_witness_stack, } } outputs_info = rv["amounts"] options = { "changepubkey": zero_change_pubkey.hex(), "feeRate": self.format_amount(tx_fee_rate), "subtractFeeFromOutputs": [ 0, ], } rv = self.rpc_wallet( "fundrawtransactionfrom", ["blind", lock_refund_swipe_tx_hex, inputs_info, outputs_info, options], ) lock_refund_swipe_tx_hex = rv["hex"] return bytes.fromhex(lock_refund_swipe_tx_hex) def getSpendableBalance(self) -> int: return self.make_int(self.rpc_wallet("getbalances")["mine"]["blind_trusted"]) def publishBLockTx( self, vkbv: bytes, Kbs: bytes, output_amount: int, feerate: int, unlock_time: int = 0, ) -> bytes: Kbv = self.getPubkey(vkbv) sx_addr = self.formatStealthAddress(Kbv, Kbs) self._log.debug("sx_addr: {}".format(sx_addr)) # TODO: Fund from other balances params = [ "blind", "blind", [ {"address": sx_addr, "amount": self.format_amount(output_amount)}, ], "", "", self._anon_tx_ring_size, 1, False, {"conf_target": self._conf_target, "blind_watchonly_visible": True}, ] txid = self.rpc_wallet("sendtypeto", params) return bytes.fromhex(txid) def findTxB( self, kbv, Kbs, cb_swap_value: int, cb_block_confirmed: int, restore_height: int, bid_sender: bool, check_amount: bool = True, ): Kbv = self.getPubkey(kbv) sx_addr = self.formatStealthAddress(Kbv, Kbs) # Tx recipient must import the stealth address as watch only if bid_sender: cb_swap_value *= -1 else: addr_info = self.rpc_wallet("getaddressinfo", [sx_addr]) if not addr_info["iswatchonly"]: wif_scan_key = self.encodeKey(kbv) self.rpc_wallet("importstealthaddress", [wif_scan_key, Kbs.hex()]) self._log.info("Imported watch-only sx_addr: {}".format(sx_addr)) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), restore_height ) ) self.rpc_wallet("rescanblockchain", [restore_height]) params = [{"include_watchonly": True, "search": sx_addr}] txns = self.rpc_wallet("filtertransactions", params) if len(txns) == 1: tx = txns[0] assert ( tx["outputs"][0]["stealth_address"] == sx_addr ) # Should not be possible ensure(tx["outputs"][0]["type"] == "blind", "Output is not anon") if ( self.make_int(tx["outputs"][0]["amount"]) == cb_swap_value or check_amount is False ): height = 0 if tx["confirmations"] > 0: chain_height = self.rpc("getblockcount") height = chain_height - (tx["confirmations"] - 1) return {"txid": tx["txid"], "amount": cb_swap_value, "height": height} else: self._log.warning( "Incorrect amount detected for coin b lock txn: {}".format( tx["txid"] ) ) return -1 return None def spendBLockTx( self, chain_b_lock_txid: bytes, address_to: str, kbv: bytes, kbs: bytes, cb_swap_value: int, b_fee: int, restore_height: int, spend_actual_balance: bool = False, lock_tx_vout=None, ) -> bytes: Kbv = self.getPubkey(kbv) Kbs = self.getPubkey(kbs) sx_addr = self.formatStealthAddress(Kbv, Kbs) addr_info = self.rpc_wallet("getaddressinfo", [sx_addr]) if not addr_info["ismine"]: wif_scan_key = self.encodeKey(kbv) wif_spend_key = self.encodeKey(kbs) self.rpc_wallet("importstealthaddress", [wif_scan_key, wif_spend_key]) self._log.info("Imported spend key for sx_addr: {}".format(sx_addr)) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), restore_height ) ) self.rpc_wallet("rescanblockchain", [restore_height]) # TODO: Remove workaround # utxos = self.rpc_wallet('listunspentblind', [1, 9999999, [sx_addr]]) utxos = [] all_utxos = self.rpc_wallet("listunspentblind", [1, 9999999]) for utxo in all_utxos: if utxo.get("stealth_address", "_") == sx_addr: utxos.append(utxo) if len(utxos) < 1: raise TemporaryError("No spendable outputs") elif len(utxos) > 1: raise ValueError("Too many spendable outputs") utxo = utxos[0] utxo_sats = self.make_int(utxo["amount"]) if spend_actual_balance and utxo_sats != cb_swap_value: self._log.warning( "Spending actual balance {}, not swap value {}.".format( utxo_sats, cb_swap_value ) ) cb_swap_value = utxo_sats inputs = [ {"tx": utxo["txid"], "n": utxo["vout"]}, ] params = [ "blind", "blind", [ { "address": address_to, "amount": self.format_amount(cb_swap_value), "subfee": True, }, ], "", "", self._anon_tx_ring_size, 1, False, {"conf_target": self._conf_target, "inputs": inputs, "show_fee": True}, ] rv = self.rpc_wallet("sendtypeto", params) return bytes.fromhex(rv["txid"]) def findTxnByHash(self, txid_hex): # txindex is enabled for Particl try: rv = self.rpc("getrawtransaction", [txid_hex, True]) except Exception as e: # noqa: F841 self._log.debug( "findTxnByHash getrawtransaction failed: {}".format(txid_hex) ) return None if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed: return {"txid": txid_hex, "amount": 0, "height": rv["height"]} return None def createRawFundedTransaction( self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True, ) -> str: # Estimate lock tx size / fee # self.createSCLockTx vkbv = self.getNewRandomKey() ephemeral_key = self.getNewRandomKey() ephemeral_pubkey = self.getPubkey(ephemeral_key) assert len(ephemeral_pubkey) == 33 nonce = self.getScriptLockTxNonce(vkbv) inputs = [] outputs = [ { "type": "blind", "amount": self.format_amount(amount), "address": addr_to, "nonce": nonce.hex(), "data": ephemeral_pubkey.hex(), } ] params = [inputs, outputs] tx_hex = self.rpc_wallet("createrawparttransaction", params)["hex"] # self.fundSCLockTx tx_obj = self.rpc("decoderawtransaction", [tx_hex]) assert len(tx_obj["vout"]) == 1 txo = tx_obj["vout"][0] blinded_info = self.rpc( "rewindrangeproof", [txo["rangeproof"], txo["valueCommitment"], nonce.hex()] ) outputs_info = { 0: { "value": blinded_info["amount"], "blind": blinded_info["blind"], "nonce": nonce.hex(), } } options = { "lockUnspents": lock_unspents, "conf_target": self._conf_target, } if sub_fee: options["subtractFeeFromOutputs"] = [ 0, ] return self.rpc_wallet( "fundrawtransactionfrom", ["blind", tx_hex, {}, outputs_info, options] )["hex"] class PARTInterfaceAnon(PARTInterface): def interface_type(self) -> int: return Coins.PART_ANON @staticmethod def balance_type(): return BalanceTypes.ANON @staticmethod def est_lock_tx_vsize() -> int: return 1153 @staticmethod def xmr_swap_a_lock_spend_tx_vsize() -> int: raise ValueError("Not possible") @staticmethod def xmr_swap_b_lock_spend_tx_vsize() -> int: # TODO: Estimate with ringsize return 1153 @staticmethod def depth_spendable() -> int: return 12 def coin_name(self) -> str: return super().coin_name() + " Anon" def publishBLockTx( self, kbv: bytes, Kbs: bytes, output_amount: int, feerate: int, unlock_time: int = 0, ) -> bytes: Kbv = self.getPubkey(kbv) sx_addr = self.formatStealthAddress(Kbv, Kbs) # TODO: Fund from other balances params = [ "anon", "anon", [ {"address": sx_addr, "amount": self.format_amount(output_amount)}, ], "", "", self._anon_tx_ring_size, 1, False, {"conf_target": self._conf_target, "blind_watchonly_visible": True}, ] txid = self.rpc_wallet("sendtypeto", params) return bytes.fromhex(txid) def findTxB( self, kbv, Kbs, cb_swap_value, cb_block_confirmed, restore_height, bid_sender, check_amount: bool = True, ): Kbv = self.getPubkey(kbv) sx_addr = self.formatStealthAddress(Kbv, Kbs) self._log.debug("sx_addr: {}".format(sx_addr)) # Tx recipient must import the stealth address as watch only if bid_sender: cb_swap_value *= -1 else: addr_info = self.rpc_wallet("getaddressinfo", [sx_addr]) if not addr_info["iswatchonly"]: wif_scan_key = self.encodeKey(kbv) self.rpc_wallet("importstealthaddress", [wif_scan_key, Kbs.hex()]) self._log.info("Imported watch-only sx_addr: {}".format(sx_addr)) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), restore_height ) ) self.rpc_wallet("rescanblockchain", [restore_height]) params = [{"include_watchonly": True, "search": sx_addr}] txns = self.rpc_wallet("filtertransactions", params) if len(txns) == 1: tx = txns[0] assert ( tx["outputs"][0]["stealth_address"] == sx_addr ) # Should not be possible ensure(tx["outputs"][0]["type"] == "anon", "Output is not anon") if ( self.make_int(tx["outputs"][0]["amount"]) == cb_swap_value or check_amount is False ): height = 0 if tx["confirmations"] > 0: chain_height = self.rpc("getblockcount") height = chain_height - (tx["confirmations"] - 1) return {"txid": tx["txid"], "amount": cb_swap_value, "height": height} else: self._log.warning( "Incorrect amount detected for coin b lock txn: {}".format( tx["txid"] ) ) return -1 return None def spendBLockTx( self, chain_b_lock_txid: bytes, address_to: str, kbv: bytes, kbs: bytes, cb_swap_value: int, b_fee: int, restore_height: int, spend_actual_balance: bool = False, lock_tx_vout=None, ) -> bytes: Kbv = self.getPubkey(kbv) Kbs = self.getPubkey(kbs) sx_addr = self.formatStealthAddress(Kbv, Kbs) addr_info = self.rpc_wallet("getaddressinfo", [sx_addr]) if not addr_info["ismine"]: wif_scan_key = self.encodeKey(kbv) wif_spend_key = self.encodeKey(kbs) self.rpc_wallet("importstealthaddress", [wif_scan_key, wif_spend_key]) self._log.info("Imported spend key for sx_addr: {}".format(sx_addr)) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), restore_height ) ) self.rpc_wallet("rescanblockchain", [restore_height]) autxos = self.rpc_wallet("listunspentanon", [1, 9999999, [sx_addr]]) if len(autxos) < 1: raise TemporaryError("No spendable outputs") elif len(autxos) > 1: raise ValueError("Too many spendable outputs") utxo = autxos[0] utxo_sats = self.make_int(utxo["amount"]) if spend_actual_balance and utxo_sats != cb_swap_value: self._log.warning( "Spending actual balance {}, not swap value {}.".format( utxo_sats, cb_swap_value ) ) cb_swap_value = utxo_sats inputs = [ {"tx": utxo["txid"], "n": utxo["vout"]}, ] params = [ "anon", "anon", [ { "address": address_to, "amount": self.format_amount(cb_swap_value), "subfee": True, }, ], "", "", self._anon_tx_ring_size, 1, False, {"conf_target": self._conf_target, "inputs": inputs, "show_fee": True}, ] rv = self.rpc_wallet("sendtypeto", params) return bytes.fromhex(rv["txid"]) def findTxnByHash(self, txid_hex: str): # txindex is enabled for Particl try: rv = self.rpc("getrawtransaction", [txid_hex, True]) except Exception as e: # noqa: F841 self._log.debug( "findTxnByHash getrawtransaction failed: {}".format(txid_hex) ) return None if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed: return {"txid": txid_hex, "amount": 0, "height": rv["height"]} return None def getSpendableBalance(self) -> int: return self.make_int(self.rpc_wallet("getbalances")["mine"]["anon_trusted"])