#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (c) 2021-2023 tecnovert # Copyright (c) 2024-2025 The Basicswap developers # Distributed under the MIT software license, see the accompanying # file LICENSE or http://www.opensource.org/licenses/mit-license.php. import random import logging import unittest from basicswap.basicswap import ( Coins, SwapTypes, BidStates, DebugTypes, ) from basicswap.basicswap_util import ( TxLockTypes, ) from basicswap.util import ( make_int, format_amount, ) from tests.basicswap.util import ( read_json_api, ) from tests.basicswap.common import ( wait_for_bid, wait_for_offer, wait_for_none_active, wait_for_balance, wait_for_unspent, ) from .test_xmr import BaseTest, test_delay_event logger = logging.getLogger() class Test(BaseTest): __test__ = True test_coin_from = Coins.PART_BLIND has_segwit = True @classmethod def setUpClass(cls): super(Test, cls).setUpClass() js_0 = read_json_api(1800, "wallets/part") node0_blind_before = js_0["blind_balance"] + js_0["blind_unconfirmed"] post_json = { "value": 100, "address": js_0["stealth_address"], "subfee": False, "type_to": "blind", } json_rv = read_json_api(1800, "wallets/part/withdraw", post_json) assert len(json_rv["txid"]) == 64 logging.info("Waiting for blind balance") wait_for_balance( test_delay_event, "http://127.0.0.1:1800/json/wallets/part", "blind_balance", 100.0 + node0_blind_before, ) js_0 = read_json_api(1800, "wallets/part") def ensure_balance(self, coin_type, node_id, amount): tla = "PART" js_w = read_json_api(1800 + node_id, "wallets") print("js_w", js_w) if float(js_w[tla]["blind_balance"]) < amount: post_json = { "value": amount, "type_to": "blind", "address": js_w[tla]["stealth_address"], "subfee": False, } json_rv = read_json_api( 1800, "wallets/{}/withdraw".format(tla.lower()), post_json ) assert len(json_rv["txid"]) == 64 wait_for_balance( test_delay_event, "http://127.0.0.1:{}/json/wallets/{}".format( 1800 + node_id, tla.lower() ), "blind_balance", amount, ) def getBalance(self, js_wallets): return float(js_wallets[Coins.PART.name]["blind_balance"]) + float( js_wallets[Coins.PART.name]["blind_unconfirmed"] ) def getXmrBalance(self, js_wallets): return float(js_wallets[Coins.XMR.name]["unconfirmed"]) + float( js_wallets[Coins.XMR.name]["balance"] ) def test_010_txn_size(self): logging.info("---------- Test {} txn_size".format(self.test_coin_from.name)) self.ensure_balance(self.test_coin_from, 0, 100.0) swap_clients = self.swap_clients ci = swap_clients[0].ci(self.test_coin_from) pi = swap_clients[0].pi(SwapTypes.XMR_SWAP) def wait_for_unspents(delay_event, iterations=20, delay_time=0.5): i = 0 while not delay_event.is_set(): unspents = ci.rpc_wallet("listunspentblind") if len(unspents) >= 1: return delay_event.wait(delay_time) i += 1 if i > iterations: raise ValueError("wait_for_unspents timed out") wait_for_unspents(test_delay_event) amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1) # Record unspents before createSCLockTx as the used ones will be locked unspents = ci.rpc_wallet("listunspentblind") locked_utxos_before = ci.rpc_wallet("listlockunspent") # fee_rate is in sats/kvB fee_rate: int = 1000 vkbv = ci.getNewRandomKey() a = ci.getNewRandomKey() b = ci.getNewRandomKey() A = ci.getPubkey(a) B = ci.getPubkey(b) lock_tx_script = pi.genScriptLockTxScript(ci, A, B) lock_tx = ci.createSCLockTx(amount, lock_tx_script, vkbv) lock_tx = ci.fundSCLockTx(lock_tx, fee_rate, vkbv) lock_tx = ci.signTxWithWallet(lock_tx) unspents_after = ci.rpc_wallet("listunspentblind") locked_utxos_after = ci.rpc_wallet("listlockunspent") assert len(unspents) > len(unspents_after) assert len(locked_utxos_after) > len(locked_utxos_before) lock_tx_decoded = ci.rpc_wallet("decoderawtransaction", [lock_tx.hex()]) txid = lock_tx_decoded["txid"] vsize = lock_tx_decoded["vsize"] expect_fee_int = round(fee_rate * vsize / 1000) ci.rpc_wallet("sendrawtransaction", [lock_tx.hex()]) rv = ci.rpc_wallet("gettransaction", [txid]) wallet_tx_fee = -ci.make_int(rv["details"][0]["fee"]) assert wallet_tx_fee >= expect_fee_int assert wallet_tx_fee - expect_fee_int < 20 addr_out = ci.getNewAddress(True) addrinfo = ci.rpc_wallet( "getaddressinfo", [ addr_out, ], ) pk_out = bytes.fromhex(addrinfo["pubkey"]) fee_info = {} lock_spend_tx = ci.createSCLockSpendTx( lock_tx, lock_tx_script, pk_out, fee_rate, vkbv, fee_info=fee_info ) vsize_estimated: int = fee_info["vsize"] spend_tx_decoded = ci.rpc("decoderawtransaction", [lock_spend_tx.hex()]) txid = spend_tx_decoded["txid"] nonce = ci.getScriptLockTxNonce(vkbv) output_n, _ = ci.findOutputByNonce(lock_tx_decoded, nonce) assert output_n is not None valueCommitment = bytes.fromhex( lock_tx_decoded["vout"][output_n]["valueCommitment"] ) witness_stack = [ b"", ci.signTx(a, lock_spend_tx, 0, lock_tx_script, valueCommitment), ci.signTx(b, lock_spend_tx, 0, lock_tx_script, valueCommitment), lock_tx_script, ] lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack) tx_decoded = ci.rpc("decoderawtransaction", [lock_spend_tx.hex()]) vsize_actual: int = tx_decoded["vsize"] # Note: The fee is set allowing 9 bytes for the encoded fee amount, causing a small overestimate assert vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 10 assert ci.rpc("sendrawtransaction", [lock_spend_tx.hex()]) == txid # Test chain b (no-script) lock tx size v = ci.getNewRandomKey() s = ci.getNewRandomKey() S = ci.getPubkey(s) lock_tx_b_txid = ci.publishBLockTx(v, S, amount, fee_rate) addr_out = ci.getNewStealthAddress() lock_tx_b_spend_txid = None for i in range(20): try: lock_tx_b_spend_txid = ci.spendBLockTx( lock_tx_b_txid, addr_out, v, s, amount, fee_rate, 0 ) break except Exception as e: print("spendBLockTx failed", str(e)) test_delay_event.wait(2) assert lock_tx_b_spend_txid is not None lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid) if lock_tx_b_spend is None: lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid) lock_tx_b_spend_decoded = ci.rpc( "decoderawtransaction", [lock_tx_b_spend.hex()] ) expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize() assert expect_vsize >= lock_tx_b_spend_decoded["vsize"] assert expect_vsize - lock_tx_b_spend_decoded["vsize"] < 10 def test_01_part_xmr(self): logging.info("---------- Test PARTct to XMR") swap_clients = self.swap_clients js_0 = read_json_api(1800, "wallets/part") assert float(js_0["blind_balance"]) > 10.0 node0_blind_before = js_0["blind_balance"] + js_0["blind_unconfirmed"] js_1 = read_json_api(1801, "wallets/part") node1_blind_before = js_1["blind_balance"] + js_1["blind_unconfirmed"] js_1_xmr = read_json_api(1801, "wallets/xmr") amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1) rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1) offer_id = swap_clients[0].postOffer( Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, ) wait_for_offer(test_delay_event, swap_clients[1], offer_id) offers = swap_clients[0].listOffers(filters={"offer_id": offer_id}) offer = offers[0] bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from) wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED) swap_clients[0].acceptXmrBid(bid_id) wait_for_bid( test_delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=180, ) wait_for_bid( test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, ) amount_from = float(format_amount(amt_swap, 8)) js_1 = read_json_api(1801, "wallets/part") node1_blind_after = js_1["blind_balance"] + js_1["blind_unconfirmed"] assert node1_blind_after > node1_blind_before + (amount_from - 0.05) js_0 = read_json_api(1800, "wallets/part") node0_blind_after = js_0["blind_balance"] + js_0["blind_unconfirmed"] assert node0_blind_after < node0_blind_before - amount_from js_1_xmr_after = read_json_api(1801, "wallets/xmr") scale_from = 8 amount_to = int((amt_swap * rate_swap) // (10**scale_from)) amount_to_float = float(format_amount(amount_to, 12)) node1_xmr_after = float(js_1_xmr_after["unconfirmed"]) + float( js_1_xmr_after["balance"] ) node1_xmr_before = float(js_1_xmr["unconfirmed"]) + float(js_1_xmr["balance"]) assert node1_xmr_after > node1_xmr_before + (amount_to_float - 0.02) def test_02_leader_recover_a_lock_tx(self): logging.info("---------- Test PARTct to XMR leader recovers coin a lock tx") swap_clients = self.swap_clients js_w0_before = read_json_api(1800, "wallets") node0_blind_before = self.getBalance(js_w0_before) amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1) rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1) offer_id = swap_clients[0].postOffer( Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=12, ) wait_for_offer(test_delay_event, swap_clients[1], offer_id) offer = swap_clients[1].getOffer(offer_id) bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from) wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED) bid, xmr_swap = swap_clients[0].getXmrBid(bid_id) assert xmr_swap swap_clients[1].setBidDebugInd(bid_id, DebugTypes.BID_STOP_AFTER_COIN_A_LOCK) swap_clients[0].acceptXmrBid(bid_id) wait_for_bid( test_delay_event, swap_clients[0], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=180, ) wait_for_bid( test_delay_event, swap_clients[1], bid_id, [BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED], sent=True, ) js_w0_after = read_json_api(1800, "wallets") node0_blind_after = self.getBalance(js_w0_after) assert node0_blind_before - node0_blind_after < 0.02 def test_03_follower_recover_a_lock_tx(self): logging.info("---------- Test PARTct to XMR follower recovers coin a lock tx") swap_clients = self.swap_clients js_w1_before = read_json_api(1801, "wallets") amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1) rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1) offer_id = swap_clients[0].postOffer( Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=32, ) wait_for_offer(test_delay_event, swap_clients[1], offer_id) offer = swap_clients[1].getOffer(offer_id) bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from) wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED) bid, xmr_swap = swap_clients[0].getXmrBid(bid_id) assert xmr_swap swap_clients[1].setBidDebugInd(bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK) swap_clients[0].setBidDebugInd( bid_id, DebugTypes.BID_DONT_SPEND_COIN_A_LOCK_REFUND2 ) swap_clients[0].acceptXmrBid(bid_id) wait_for_bid( test_delay_event, swap_clients[0], bid_id, (BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED_SWIPED), wait_for=180, ) wait_for_bid( test_delay_event, swap_clients[1], bid_id, BidStates.XMR_SWAP_FAILED_SWIPED, wait_for=80, sent=True, ) js_w1_after = read_json_api(1801, "wallets") node1_blind_before = self.getBalance(js_w1_before) node1_blind_after = self.getBalance(js_w1_after) amount_from = float(format_amount(amt_swap, 8)) assert node1_blind_after - node1_blind_before > (amount_from - 0.02) swap_clients[0].abandonBid(bid_id) swap_clients[1].abandonBid(bid_id) wait_for_none_active(test_delay_event, 1800) wait_for_none_active(test_delay_event, 1801) offerer_key = read_json_api( 1800, "bids/{}".format(bid_id.hex()), {"chainbkeysplit": True} )["splitkey"] data = {"spendchainblocktx": True, "remote_key": offerer_key} redeemed_txid = read_json_api(1801, "bids/{}".format(bid_id.hex()), data)[ "txid" ] assert len(redeemed_txid) == 64 def do_test_04_follower_recover_b_lock_tx(self, coin_from, coin_to): logging.info( "---------- Test {} to {} follower recovers coin b lock tx".format( coin_from.name, coin_to.name ) ) swap_clients = self.swap_clients ci_from = swap_clients[0].ci(coin_from) ci_to = swap_clients[0].ci(coin_to) amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1) rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1) offer_id = swap_clients[0].postOffer( coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=28, ) wait_for_offer(test_delay_event, swap_clients[1], offer_id) offer = swap_clients[1].getOffer(offer_id) bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from) wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED) bid, xmr_swap = swap_clients[0].getXmrBid(bid_id) assert xmr_swap swap_clients[1].setBidDebugInd(bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK) swap_clients[0].acceptXmrBid(bid_id) wait_for_bid( test_delay_event, swap_clients[0], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=180, ) wait_for_bid( test_delay_event, swap_clients[1], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=True, ) def test_04_follower_recover_b_lock_tx(self): js_w0_before = read_json_api(1800, "wallets") js_w1_before = read_json_api(1801, "wallets") self.do_test_04_follower_recover_b_lock_tx(self.test_coin_from, Coins.XMR) js_w0_after = read_json_api(1800, "wallets") js_w1_after = read_json_api(1801, "wallets") node0_blind_before = self.getBalance(js_w0_before) node0_blind_after = self.getBalance(js_w0_after) assert node0_blind_before - node0_blind_after < 0.02 node1_xmr_before = self.getXmrBalance(js_w1_before) node1_xmr_after = self.getXmrBalance(js_w1_after) assert node1_xmr_before - node1_xmr_after < 0.02 def test_04_follower_recover_b_lock_tx_from_part(self): self.ensure_balance(self.test_coin_from, 1, 50.0) self.do_test_04_follower_recover_b_lock_tx(Coins.PART, self.test_coin_from) def do_test_05_self_bid(self, coin_from, coin_to): logging.info( "---------- Test {} to {} same client".format(coin_from.name, coin_to.name) ) swap_clients = self.swap_clients ci_to = swap_clients[0].ci(coin_to) self.ensure_balance(coin_from, 1, 50.0) amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1) rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1) offer_id = swap_clients[1].postOffer( coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, auto_accept_bids=True, ) bid_id = swap_clients[1].postXmrBid(offer_id, amt_swap) wait_for_bid( test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, wait_for=180, ) def test_05_self_bid(self): if not self.has_segwit: return self.do_test_05_self_bid(self.test_coin_from, Coins.XMR) def test_05_self_bid_to_part(self): if not self.has_segwit: return self.do_test_05_self_bid(self.test_coin_from, Coins.PART) def test_05_self_bid_from_part(self): self.do_test_05_self_bid(Coins.PART, self.test_coin_from) def test_06_preselect_inputs(self): raise ValueError("TODO") tla_from = self.test_coin_from.name logging.info("---------- Test {} Preselected inputs".format(tla_from)) swap_clients = self.swap_clients # Prepare balance self.ensure_balance(self.test_coin_from, 2, 100.0) js_w2 = read_json_api(1802, "wallets") post_json = { "value": float(js_w2["PART"]["blind_balance"]), "type_from": "blind", "type_to": "blind", "address": js_w2["PART"]["stealth_address"], "subfee": True, } json_rv = read_json_api(1802, "wallets/{}/withdraw".format("part"), post_json) wait_for_balance( test_delay_event, "http://127.0.0.1:1802/json/wallets/{}".format("part"), "blind_balance", 10.0, ) assert len(json_rv["txid"]) == 64 # Create prefunded ITX ci = swap_clients[2].ci(self.test_coin_from) ci_to = swap_clients[2].ci(Coins.XMR) pi = swap_clients[2].pi(SwapTypes.XMR_SWAP) js_w2 = read_json_api(1802, "wallets") swap_value = ci.make_int(js_w2["PART"]["blind_balance"]) assert swap_value > ci.make_int(95) itx = pi.getFundedInitiateTxTemplate(ci, swap_value, True) itx_decoded = ci.describeTx(itx.hex()) n = pi.findMockVout(ci, itx_decoded) value_after_subfee = ci.make_int(itx_decoded["vout"][n]["value"]) assert value_after_subfee < swap_value swap_value = value_after_subfee wait_for_unspent(test_delay_event, ci, swap_value) extra_options = {"prefunded_itx": itx} rate_swap = ci_to.make_int(random.uniform(0.2, 20.0)) offer_id = swap_clients[2].postOffer( self.test_coin_from, Coins.XMR, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP, extra_options=extra_options, ) wait_for_offer(test_delay_event, swap_clients[1], offer_id) offer = swap_clients[1].getOffer(offer_id) bid_id = swap_clients[1].postBid(offer_id, offer.amount_from) wait_for_bid(test_delay_event, swap_clients[2], bid_id, BidStates.BID_RECEIVED) swap_clients[2].acceptBid(bid_id) wait_for_bid( test_delay_event, swap_clients[2], bid_id, BidStates.SWAP_COMPLETED, wait_for=120, ) wait_for_bid( test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, wait_for=120, ) # Verify expected inputs were used bid, _, _, _, _ = swap_clients[2].getXmrBidAndOffer(bid_id) assert bid.xmr_a_lock_tx wtx = ci.rpc_wallet( "gettransaction", [ bid.xmr_a_lock_tx.txid.hex(), ], ) itx_after = ci.describeTx(wtx["hex"]) assert len(itx_after["vin"]) == len(itx_decoded["vin"]) for i, txin in enumerate(itx_decoded["vin"]): txin_after = itx_after["vin"][i] assert txin["txid"] == txin_after["txid"] assert txin["vout"] == txin_after["vout"] if __name__ == "__main__": unittest.main()