Add p2sh-p2wsh support, add Navcoin tests.

This commit is contained in:
tecnovert
2023-08-29 22:06:16 +02:00
parent 45e49848b1
commit 0b963bffde
23 changed files with 5703 additions and 192 deletions

View File

@@ -44,6 +44,7 @@ from basicswap.contrib.test_framework.messages import (
)
from basicswap.contrib.test_framework.script import (
CScript,
OP_EQUAL,
OP_CHECKLOCKTIMEVERIFY,
OP_CHECKSEQUENCEVERIFY,
)
@@ -54,6 +55,10 @@ logger = logging.getLogger()
class TestFunctions(BaseTest):
base_rpc_port = None
extra_wait_time = 0
node_a_id = 0
node_b_id = 1
def getBalance(self, js_wallets, coin) -> float:
if coin == Coins.PART_BLIND:
@@ -64,6 +69,9 @@ class TestFunctions(BaseTest):
coin_ticker: str = 'PART'
balance_type: str = 'anon_balance'
unconfirmed_name: str = 'anon_pending'
elif coin == Coins.NAV:
coin_wallet = js_wallets[coin.name]
return float(coin_wallet['balance']) + float(coin_wallet['unconfirmed']) + float(coin_wallet['immature'])
else:
coin_ticker: str = coin.name
balance_type: str = 'balance'
@@ -77,7 +85,9 @@ class TestFunctions(BaseTest):
def mineBlock(self, num_blocks=1):
self.callnoderpc('generatetoaddress', [num_blocks, self.btc_addr])
def prepare_balance(self, coin, amount: float, port_target_node: int, port_take_from_node: int) -> None:
def prepare_balance(self, coin, amount: float, port_target_node: int, port_take_from_node: int, test_balance: bool = True) -> None:
delay_iterations = 100 if coin == Coins.NAV else 20
delay_time = 5 if coin == Coins.NAV else 3
if coin == Coins.PART_BLIND:
coin_ticker: str = 'PART'
balance_type: str = 'blind_balance'
@@ -93,7 +103,8 @@ class TestFunctions(BaseTest):
balance_type: str = 'balance'
address_type: str = 'deposit_address'
js_w = read_json_api(port_target_node, 'wallets')
if float(js_w[coin_ticker][balance_type]) >= amount:
current_balance: float = float(js_w[coin_ticker][balance_type])
if test_balance and current_balance >= amount:
return
post_json = {
'value': amount,
@@ -104,33 +115,38 @@ class TestFunctions(BaseTest):
post_json['type_to'] = type_to
json_rv = read_json_api(port_take_from_node, 'wallets/{}/withdraw'.format(coin_ticker.lower()), post_json)
assert (len(json_rv['txid']) == 64)
wait_for_balance(test_delay_event, 'http://127.0.0.1:{}/json/wallets/{}'.format(port_target_node, coin_ticker.lower()), balance_type, amount)
wait_for_amount: float = amount
if not test_balance:
wait_for_amount += current_balance
wait_for_balance(test_delay_event, 'http://127.0.0.1:{}/json/wallets/{}'.format(port_target_node, coin_ticker.lower()), balance_type, wait_for_amount, iterations=delay_iterations, delay_time=delay_time)
def do_test_01_full_swap(self, coin_from: Coins, coin_to: Coins) -> None:
logging.info('---------- Test {} to {}'.format(coin_from.name, coin_to.name))
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[0].scriptless_coins
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[1].ci(coin_to)
ci_part0 = swap_clients[0].ci(Coins.PART)
ci_part1 = swap_clients[1].ci(Coins.PART)
# Offerer sends the offer
# Bidder sends the bid
id_offerer: int = 0
id_bidder: int = 1
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[id_offerer].scriptless_coins
ci_from = swap_clients[id_offerer].ci(coin_from)
ci_to = swap_clients[id_bidder].ci(coin_to)
ci_part0 = swap_clients[id_offerer].ci(Coins.PART)
ci_part1 = swap_clients[id_bidder].ci(Coins.PART)
self.prepare_balance(coin_from, 100.0, 1800 + id_offerer, 1801 if reverse_bid else 1800)
# Leader sends the initial (chain a) lock tx.
# Follower sends the participate (chain b) lock tx.
id_leader: int = 1 if reverse_bid else 0
id_follower: int = 0 if reverse_bid else 1
id_leader: int = id_bidder if reverse_bid else id_offerer
id_follower: int = id_offerer if reverse_bid else id_bidder
logging.info(f'Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}')
js_0 = read_json_api(1800, 'wallets')
js_0 = read_json_api(1800 + id_offerer, 'wallets')
node0_from_before: float = self.getBalance(js_0, coin_from)
js_1 = read_json_api(1801, 'wallets')
js_1 = read_json_api(1800 + id_bidder, 'wallets')
node1_from_before: float = self.getBalance(js_1, coin_from)
node0_sent_messages_before: int = ci_part0.rpc_callback('smsgoutbox', ['count',])['num_messages']
@@ -144,20 +160,20 @@ class TestFunctions(BaseTest):
assert (offer.offer_id == offer_id)
post_json = {'with_extra_info': True}
offer0 = read_json_api(1800, f'offers/{offer_id.hex()}', post_json)[0]
offer1 = read_json_api(1800, f'offers/{offer_id.hex()}', post_json)[0]
offer0 = read_json_api(1800 + id_offerer, f'offers/{offer_id.hex()}', post_json)[0]
offer1 = read_json_api(1800 + id_offerer, f'offers/{offer_id.hex()}', post_json)[0]
from basicswap.util import dumpj
logging.info('offer0 {} '.format(dumpj(offer0)))
logging.info('offer1 {} '.format(dumpj(offer1)))
assert ('lock_time_1' in offer0)
assert ('lock_time_1' in offer1)
bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from)
bid_id = swap_clients[id_bidder].postXmrBid(offer_id, offer.amount_from)
wait_for_bid(test_delay_event, swap_clients[id_offerer], bid_id, BidStates.BID_RECEIVED)
bid0 = read_json_api(1800, f'bids/{bid_id.hex()}')
bid1 = read_json_api(1801, f'bids/{bid_id.hex()}')
bid0 = read_json_api(1800 + id_offerer, f'bids/{bid_id.hex()}')
bid1 = read_json_api(1800 + id_bidder, f'bids/{bid_id.hex()}')
tolerance = 20 if reverse_bid else 0
assert (bid0['ticker_from'] == ci_from.ticker())
@@ -172,7 +188,7 @@ class TestFunctions(BaseTest):
assert (bid1['reverse_bid'] == reverse_bid)
found: bool = False
bids0 = read_json_api(1800, 'bids')
bids0 = read_json_api(1800 + id_offerer, 'bids')
logging.info('bids0 {} '.format(bids0))
for bid in bids0:
logging.info('bid {} '.format(bid))
@@ -186,16 +202,16 @@ class TestFunctions(BaseTest):
swap_clients[id_offerer].acceptBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[id_offerer], bid_id, BidStates.SWAP_COMPLETED, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[id_bidder], bid_id, BidStates.SWAP_COMPLETED, sent=True)
wait_for_bid(test_delay_event, swap_clients[id_offerer], bid_id, BidStates.SWAP_COMPLETED, wait_for=(self.extra_wait_time + 180))
wait_for_bid(test_delay_event, swap_clients[id_bidder], bid_id, BidStates.SWAP_COMPLETED, sent=True, wait_for=(self.extra_wait_time + 30))
amount_from = float(ci_from.format_amount(amt_swap))
js_1_after = read_json_api(1801, 'wallets')
js_1_after = read_json_api(1800 + id_bidder, 'wallets')
node1_from_after = self.getBalance(js_1_after, coin_from)
if coin_from is not Coins.PART: # TODO: staking
assert (node1_from_after > node1_from_before + (amount_from - 0.05))
js_0_after = read_json_api(1800, 'wallets')
js_0_after = read_json_api(1800 + id_offerer, 'wallets')
node0_from_after: float = self.getBalance(js_0_after, coin_from)
# TODO: Discard block rewards
# assert (node0_from_after < node0_from_before - amount_from)
@@ -217,8 +233,8 @@ class TestFunctions(BaseTest):
assert (node1_sent_messages == (4 + split_msgs if reverse_bid else 2 + split_msgs))
post_json = {'show_extra': True}
bid0 = read_json_api(1800, f'bids/{bid_id.hex()}', post_json)
bid1 = read_json_api(1801, f'bids/{bid_id.hex()}', post_json)
bid0 = read_json_api(1800 + id_offerer, f'bids/{bid_id.hex()}', post_json)
bid1 = read_json_api(1800 + id_bidder, f'bids/{bid_id.hex()}', post_json)
logging.info('bid0 {} '.format(dumpj(bid0)))
logging.info('bid1 {} '.format(dumpj(bid1)))
@@ -237,18 +253,19 @@ class TestFunctions(BaseTest):
assert (chain_a_lock_txid is not None)
assert (chain_b_lock_txid is not None)
def do_test_02_leader_recover_a_lock_tx(self, coin_from: Coins, coin_to: Coins) -> None:
def do_test_02_leader_recover_a_lock_tx(self, coin_from: Coins, coin_to: Coins, lock_value: int = 32) -> None:
logging.info('---------- Test {} to {} leader recovers coin a lock tx'.format(coin_from.name, coin_to.name))
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[0].scriptless_coins
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
id_offerer: int = 0
id_bidder: int = 1
id_leader: int = 1 if reverse_bid else 0
id_follower: int = 0 if reverse_bid else 1
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[id_offerer].scriptless_coins
ci_from = swap_clients[id_offerer].ci(coin_from)
ci_to = swap_clients[id_offerer].ci(coin_to)
id_leader: int = id_bidder if reverse_bid else id_offerer
id_follower: int = id_offerer if reverse_bid else id_bidder
logging.info(f'Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}')
js_wl_before = read_json_api(1800 + id_leader, 'wallets')
@@ -258,7 +275,7 @@ class TestFunctions(BaseTest):
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[id_offerer].postOffer(
coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=32)
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=lock_value)
wait_for_offer(test_delay_event, swap_clients[id_bidder], offer_id)
offer = swap_clients[id_bidder].getOffer(offer_id)
@@ -269,8 +286,8 @@ class TestFunctions(BaseTest):
swap_clients[id_offerer].acceptBid(bid_id)
leader_sent_bid: bool = True if reverse_bid else False
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=leader_sent_bid, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, [BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED], sent=(not leader_sent_bid))
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=leader_sent_bid, wait_for=(self.extra_wait_time + 180))
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, [BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED], sent=(not leader_sent_bid), wait_for=(self.extra_wait_time + 30))
js_wl_after = read_json_api(1800 + id_leader, 'wallets')
wl_from_after = self.getBalance(js_wl_after, coin_from)
@@ -278,32 +295,33 @@ class TestFunctions(BaseTest):
# TODO: Discard block rewards
# assert (node0_from_before - node0_from_after < 0.02)
def do_test_03_follower_recover_a_lock_tx(self, coin_from, coin_to, lock_value=32):
def do_test_03_follower_recover_a_lock_tx(self, coin_from, coin_to, lock_value: int = 32):
logging.info('---------- Test {} to {} follower recovers coin a lock tx'.format(coin_from.name, coin_to.name))
# Leader is too slow to recover the coin a lock tx and follower swipes it
# coin b lock tx remains unspent
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[0].scriptless_coins
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
id_offerer: int = 0
id_bidder: int = 1
id_leader: int = 1 if reverse_bid else 0
id_follower: int = 0 if reverse_bid else 1
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[id_offerer].scriptless_coins
ci_from = swap_clients[id_offerer].ci(coin_from)
ci_to = swap_clients[id_offerer].ci(coin_to)
id_leader: int = id_bidder if reverse_bid else id_offerer
id_follower: int = id_offerer if reverse_bid else id_bidder
logging.info(f'Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}')
js_w0_before = read_json_api(1800, 'wallets')
js_w1_before = read_json_api(1801, 'wallets')
js_w0_before = read_json_api(1800 + id_offerer, 'wallets')
js_w1_before = read_json_api(1800 + id_bidder, 'wallets')
amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[id_offerer].postOffer(
coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=lock_value)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
wait_for_offer(test_delay_event, swap_clients[id_bidder], offer_id)
offer = swap_clients[id_bidder].getOffer(offer_id)
bid_id = swap_clients[id_bidder].postXmrBid(offer_id, offer.amount_from)
@@ -315,10 +333,10 @@ class TestFunctions(BaseTest):
swap_clients[id_offerer].acceptBid(bid_id)
leader_sent_bid: bool = True if reverse_bid else False
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.BID_STALLED_FOR_TEST, wait_for=180, sent=leader_sent_bid)
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, BidStates.XMR_SWAP_FAILED_SWIPED, wait_for=80, sent=(not leader_sent_bid))
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.BID_STALLED_FOR_TEST, wait_for=(self.extra_wait_time + 180), sent=leader_sent_bid)
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, BidStates.XMR_SWAP_FAILED_SWIPED, wait_for=(self.extra_wait_time + 80), sent=(not leader_sent_bid))
js_w1_after = read_json_api(1801, 'wallets')
js_w1_after = read_json_api(1800 + id_bidder, 'wallets')
node1_from_before = self.getBalance(js_w1_before, coin_from)
node1_from_after = self.getBalance(js_w1_after, coin_from)
@@ -326,33 +344,36 @@ class TestFunctions(BaseTest):
# TODO: Discard block rewards
# assert (node1_from_after - node1_from_before > (amount_from - 0.02))
swap_clients[0].abandonBid(bid_id)
swap_clients[id_offerer].abandonBid(bid_id)
wait_for_none_active(test_delay_event, 1800)
wait_for_none_active(test_delay_event, 1801)
wait_for_none_active(test_delay_event, 1800 + id_offerer)
wait_for_none_active(test_delay_event, 1800 + id_bidder)
def do_test_04_follower_recover_b_lock_tx(self, coin_from, coin_to):
def do_test_04_follower_recover_b_lock_tx(self, coin_from, coin_to, lock_value: int = 32):
logging.info('---------- Test {} to {} follower recovers coin b lock tx'.format(coin_from.name, coin_to.name))
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[0].scriptless_coins
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
id_offerer: int = 0
id_bidder: int = 1
id_leader: int = 1 if reverse_bid else 0
id_follower: int = 0 if reverse_bid else 1
swap_clients = self.swap_clients
reverse_bid: bool = coin_from in swap_clients[id_offerer].scriptless_coins
ci_from = swap_clients[id_offerer].ci(coin_from)
ci_to = swap_clients[id_offerer].ci(coin_to)
id_offerer: int = id_offerer
id_bidder: int = id_bidder
id_leader: int = id_bidder if reverse_bid else id_offerer
id_follower: int = id_offerer if reverse_bid else id_bidder
logging.info(f'Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}')
js_w0_before = read_json_api(1800, 'wallets')
js_w1_before = read_json_api(1801, 'wallets')
js_w0_before = read_json_api(1800 + id_offerer, 'wallets')
js_w1_before = read_json_api(1800 + id_bidder, 'wallets')
amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[id_offerer].postOffer(
coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=32)
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=lock_value)
wait_for_offer(test_delay_event, swap_clients[id_bidder], offer_id)
offer = swap_clients[id_bidder].getOffer(offer_id)
@@ -363,11 +384,11 @@ class TestFunctions(BaseTest):
swap_clients[id_offerer].acceptBid(bid_id)
leader_sent_bid: bool = True if reverse_bid else False
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=200, sent=leader_sent_bid)
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=(not leader_sent_bid))
wait_for_bid(test_delay_event, swap_clients[id_leader], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=(self.extra_wait_time + 200), sent=leader_sent_bid)
wait_for_bid(test_delay_event, swap_clients[id_follower], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=(not leader_sent_bid), wait_for=(self.extra_wait_time + 30))
js_w0_after = read_json_api(1800, 'wallets')
js_w1_after = read_json_api(1801, 'wallets')
js_w0_after = read_json_api(1800 + id_offerer, 'wallets')
js_w1_after = read_json_api(1800 + id_bidder, 'wallets')
node0_from_before = self.getBalance(js_w0_before, coin_from)
node0_from_after = self.getBalance(js_w0_after, coin_from)
@@ -392,22 +413,26 @@ class TestFunctions(BaseTest):
def do_test_05_self_bid(self, coin_from, coin_to):
logging.info('---------- Test {} to {} same client'.format(coin_from.name, coin_to.name))
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
swap_clients = self.swap_clients
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
ci_from = swap_clients[id_offerer].ci(coin_from)
ci_to = swap_clients[id_offerer].ci(coin_to)
amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[1].postOffer(coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, auto_accept_bids=True)
bid_id = swap_clients[1].postXmrBid(offer_id, amt_swap)
offer_id = swap_clients[id_bidder].postOffer(coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, auto_accept_bids=True)
bid_id = swap_clients[id_bidder].postXmrBid(offer_id, amt_swap)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[id_bidder], bid_id, BidStates.SWAP_COMPLETED, wait_for=(self.extra_wait_time + 180))
class BasicSwapTest(TestFunctions):
def test_001_nested_segwit(self):
# p2sh-p2wpkh
logging.info('---------- Test {} p2sh nested segwit'.format(self.test_coin_from.name))
addr_p2sh_segwit = self.callnoderpc('getnewaddress', ['segwit test', 'p2sh-segwit'])
@@ -446,6 +471,7 @@ class BasicSwapTest(TestFunctions):
assert txid_with_scriptsig == tx_signed_decoded['txid']
def test_002_native_segwit(self):
# p2wpkh
logging.info('---------- Test {} p2sh native segwit'.format(self.test_coin_from.name))
addr_segwit = self.callnoderpc('getnewaddress', ['segwit test', 'bech32'])
@@ -542,6 +568,10 @@ class BasicSwapTest(TestFunctions):
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'witness_v0_scripthash')
tx_spend = CTransaction()
tx_spend.nVersion = ci.txVersion()
tx_spend.vin.append(CTxIn(COutPoint(int(txid, 16), utxo_pos),
@@ -567,6 +597,10 @@ class BasicSwapTest(TestFunctions):
sum_addr += entry['amount']
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_005_watchonly(self):
logging.info('---------- Test {} watchonly'.format(self.test_coin_from.name))
@@ -709,6 +743,102 @@ class BasicSwapTest(TestFunctions):
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
assert (expect_vsize - lock_tx_b_spend_decoded['vsize'] < 10)
def test_011_p2sh(self):
# Not used in bsx for native-segwit coins
logging.info('---------- Test {} p2sh'.format(self.test_coin_from.name))
swap_clients = self.swap_clients
ci = self.swap_clients[0].ci(self.test_coin_from)
script = CScript([2, 2, OP_EQUAL, ])
script_dest = ci.get_p2sh_script_pubkey(script)
tx = CTransaction()
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['csv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'scripthash')
tx_spend = CTransaction()
tx_spend.nVersion = ci.txVersion()
tx_spend.vin.append(CTxIn(COutPoint(int(txid, 16), utxo_pos),
scriptSig=CScript([script,])))
tx_spend.vout.append(ci.txoType()(ci.make_int(1.0999), script_out))
tx_spend_hex = ToHex(tx_spend)
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock(1)
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
sum_addr += entry['amount']
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_012_p2sh_p2wsh(self):
# Not used in bsx for native-segwit coins
logging.info('---------- Test {} p2sh-p2wsh'.format(self.test_coin_from.name))
swap_clients = self.swap_clients
ci = self.swap_clients[0].ci(self.test_coin_from)
script = CScript([2, 2, OP_EQUAL, ])
script_dest = ci.getP2SHP2WSHDest(script)
tx = CTransaction()
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['csv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'scripthash')
tx_spend = CTransaction()
tx_spend.nVersion = ci.txVersion()
tx_spend.vin.append(CTxIn(COutPoint(int(txid, 16), utxo_pos),
scriptSig=ci.getP2SHP2WSHScriptSig(script)))
tx_spend.vout.append(ci.txoType()(ci.make_int(1.0999), script_out))
tx_spend.wit.vtxinwit.append(CTxInWitness())
tx_spend.wit.vtxinwit[0].scriptWitness.stack = [script, ]
tx_spend_hex = ToHex(tx_spend)
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock(1)
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
sum_addr += entry['amount']
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_01_a_full_swap(self):
if not self.has_segwit:
return