coins: Add LTC MWEB wallet

This commit is contained in:
tecnovert
2023-12-29 15:36:00 +02:00
parent 7547587d4e
commit 38fa498b0b
29 changed files with 987 additions and 517 deletions

View File

@@ -672,7 +672,7 @@ class Test(unittest.TestCase):
# Verify expected inputs were used
bid, offer = swap_clients[2].getBidAndOffer(bid_id)
assert (bid.initiate_tx)
wtx = ci_from.rpc_callback('gettransaction', [bid.initiate_tx.txid.hex(),])
wtx = ci_from.rpc_wallet('gettransaction', [bid.initiate_tx.txid.hex(),])
itx_after = ci_from.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -902,7 +902,7 @@ class Test(TestFunctions):
# Verify expected inputs were used
bid, offer = swap_clients[2].getBidAndOffer(bid_id)
assert (bid.initiate_tx)
wtx = ci_from.rpc_callback('gettransaction', [bid.initiate_tx.txid.hex(),])
wtx = ci_from.rpc('gettransaction', [bid.initiate_tx.txid.hex(),])
itx_after = ci_from.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -677,7 +677,7 @@ class Test(unittest.TestCase):
# Verify expected inputs were used
bid, offer = swap_clients[2].getBidAndOffer(bid_id)
assert (bid.initiate_tx)
wtx = ci_from.rpc_callback('gettransaction', [bid.initiate_tx.txid.hex(),])
wtx = ci_from.rpc('gettransaction', [bid.initiate_tx.txid.hex(),])
itx_after = ci_from.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -60,6 +60,16 @@ class TestFunctions(BaseTest):
node_a_id = 0
node_b_id = 1
def callnoderpc(self, method, params=[], wallet=None, node_id=0):
return callnoderpc(node_id, method, params, wallet, self.base_rpc_port)
def mineBlock(self, num_blocks=1):
self.callnoderpc('generatetoaddress', [num_blocks, self.btc_addr])
def check_softfork_active(self, feature_name):
deploymentinfo = self.callnoderpc('getdeploymentinfo')
assert (deploymentinfo['deployments'][feature_name]['active'] is True)
def getBalance(self, js_wallets, coin) -> float:
if coin == Coins.PART_BLIND:
coin_ticker: str = 'PART'
@@ -79,12 +89,6 @@ class TestFunctions(BaseTest):
return float(js_wallets[coin_ticker][balance_type]) + float(js_wallets[coin_ticker][unconfirmed_name])
def callnoderpc(self, method, params=[], wallet=None, node_id=0):
return callnoderpc(node_id, method, params, wallet, self.base_rpc_port)
def mineBlock(self, num_blocks=1):
self.callnoderpc('generatetoaddress', [num_blocks, self.btc_addr])
def prepare_balance(self, coin, amount: float, port_target_node: int, port_take_from_node: int, test_balance: bool = True) -> None:
delay_iterations = 100 if coin == Coins.NAV else 20
delay_time = 5 if coin == Coins.NAV else 3
@@ -149,8 +153,8 @@ class TestFunctions(BaseTest):
js_1 = read_json_api(1800 + id_bidder, 'wallets')
node1_from_before: float = self.getBalance(js_1, coin_from)
node0_sent_messages_before: int = ci_part0.rpc_callback('smsgoutbox', ['count',])['num_messages']
node1_sent_messages_before: int = ci_part1.rpc_callback('smsgoutbox', ['count',])['num_messages']
node0_sent_messages_before: int = ci_part0.rpc('smsgoutbox', ['count',])['num_messages']
node1_sent_messages_before: int = ci_part1.rpc('smsgoutbox', ['count',])['num_messages']
amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
@@ -224,8 +228,8 @@ class TestFunctions(BaseTest):
if False: # TODO: set stakeaddress and xmr rewards to non wallet addresses
assert (node1_to_after < node1_to_before - amount_to_float)
node0_sent_messages_after: int = ci_part0.rpc_callback('smsgoutbox', ['count',])['num_messages']
node1_sent_messages_after: int = ci_part1.rpc_callback('smsgoutbox', ['count',])['num_messages']
node0_sent_messages_after: int = ci_part0.rpc('smsgoutbox', ['count',])['num_messages']
node1_sent_messages_after: int = ci_part1.rpc('smsgoutbox', ['count',])['num_messages']
node0_sent_messages: int = node0_sent_messages_after - node0_sent_messages_before
node1_sent_messages: int = node1_sent_messages_after - node1_sent_messages_before
split_msgs: int = 2 if (ci_from.curve_type() != Curves.secp256k1 or ci_to.curve_type() != Curves.secp256k1) else 0
@@ -434,21 +438,22 @@ class BasicSwapTest(TestFunctions):
def test_001_nested_segwit(self):
# p2sh-p2wpkh
logging.info('---------- Test {} p2sh nested segwit'.format(self.test_coin_from.name))
ci = self.swap_clients[0].ci(self.test_coin_from)
addr_p2sh_segwit = self.callnoderpc('getnewaddress', ['segwit test', 'p2sh-segwit'])
addr_info = self.callnoderpc('getaddressinfo', [addr_p2sh_segwit, ])
addr_p2sh_segwit = ci.rpc_wallet('getnewaddress', ['segwit test', 'p2sh-segwit'])
addr_info = ci.rpc_wallet('getaddressinfo', [addr_p2sh_segwit, ])
assert addr_info['script'] == 'witness_v0_keyhash'
txid = self.callnoderpc('sendtoaddress', [addr_p2sh_segwit, 1.0])
txid = ci.rpc_wallet('sendtoaddress', [addr_p2sh_segwit, 1.0])
assert len(txid) == 64
self.mineBlock()
ro = self.callnoderpc('scantxoutset', ['start', ['addr({})'.format(addr_p2sh_segwit)]])
ro = ci.rpc('scantxoutset', ['start', ['addr({})'.format(addr_p2sh_segwit)]])
assert (len(ro['unspents']) == 1)
assert (ro['unspents'][0]['txid'] == txid)
tx_wallet = self.callnoderpc('gettransaction', [txid, ])['hex']
tx = self.callnoderpc('decoderawtransaction', [tx_wallet, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])['hex']
tx = ci.rpc('decoderawtransaction', [tx_wallet, ])
prevout_n = -1
for txo in tx['vout']:
@@ -457,14 +462,14 @@ class BasicSwapTest(TestFunctions):
break
assert prevout_n > -1
tx_funded = self.callnoderpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_p2sh_segwit: 0.99}])
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = self.callnoderpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = self.callnoderpc('decoderawtransaction', [tx_signed, ])
tx_funded = ci.rpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_p2sh_segwit: 0.99}])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = ci.rpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = ci.rpc('decoderawtransaction', [tx_signed, ])
assert tx_funded_decoded['txid'] != tx_signed_decoded['txid']
# Add scriptsig for txids to match
addr_p2sh_segwit_info = self.callnoderpc('getaddressinfo', [addr_p2sh_segwit, ])
addr_p2sh_segwit_info = ci.rpc_wallet('getaddressinfo', [addr_p2sh_segwit, ])
decoded_tx = FromHex(CTransaction(), tx_funded)
decoded_tx.vin[0].scriptSig = bytes.fromhex('16' + addr_p2sh_segwit_info['hex'])
txid_with_scriptsig = decoded_tx.rehash()
@@ -473,18 +478,19 @@ class BasicSwapTest(TestFunctions):
def test_002_native_segwit(self):
# p2wpkh
logging.info('---------- Test {} p2sh native segwit'.format(self.test_coin_from.name))
ci = self.swap_clients[0].ci(self.test_coin_from)
addr_segwit = self.callnoderpc('getnewaddress', ['segwit test', 'bech32'])
addr_info = self.callnoderpc('getaddressinfo', [addr_segwit, ])
addr_segwit = ci.rpc_wallet('getnewaddress', ['segwit test', 'bech32'])
addr_info = ci.rpc_wallet('getaddressinfo', [addr_segwit, ])
assert addr_info['iswitness'] is True
txid = self.callnoderpc('sendtoaddress', [addr_segwit, 1.0])
txid = ci.rpc_wallet('sendtoaddress', [addr_segwit, 1.0])
assert len(txid) == 64
tx_wallet = self.callnoderpc('gettransaction', [txid, ])['hex']
tx = self.callnoderpc('decoderawtransaction', [tx_wallet, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])['hex']
tx = ci.rpc('decoderawtransaction', [tx_wallet, ])
self.mineBlock()
ro = self.callnoderpc('scantxoutset', ['start', ['addr({})'.format(addr_segwit)]])
ro = ci.rpc('scantxoutset', ['start', ['addr({})'.format(addr_segwit)]])
assert (len(ro['unspents']) == 1)
assert (ro['unspents'][0]['txid'] == txid)
@@ -495,19 +501,17 @@ class BasicSwapTest(TestFunctions):
break
assert prevout_n > -1
tx_funded = self.callnoderpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_segwit: 0.99}])
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = self.callnoderpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = self.callnoderpc('decoderawtransaction', [tx_signed, ])
tx_funded = ci.rpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_segwit: 0.99}])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = ci.rpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = ci.rpc('decoderawtransaction', [tx_signed, ])
assert tx_funded_decoded['txid'] == tx_signed_decoded['txid']
def test_003_cltv(self):
logging.info('---------- Test {} cltv'.format(self.test_coin_from.name))
ci = self.swap_clients[0].ci(self.test_coin_from)
deploymentinfo = self.callnoderpc('getdeploymentinfo')
bip65_active = deploymentinfo['deployments']['bip65']['active']
assert (bip65_active)
self.check_softfork_active('bip65')
chain_height = self.callnoderpc('getblockcount')
script = CScript([chain_height + 3, OP_CHECKLOCKTIMEVERIFY, ])
@@ -517,12 +521,12 @@ class BasicSwapTest(TestFunctions):
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
tx_funded = ci.rpc_wallet('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = ci.rpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['cltv test', 'bech32'])
addr_out = ci.rpc_wallet('getnewaddress', ['cltv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
@@ -548,15 +552,15 @@ class BasicSwapTest(TestFunctions):
self.mineBlock(5)
try:
txid = self.callnoderpc('sendrawtransaction', [tx_spend_invalid_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_invalid_hex, ])
except Exception as e:
assert ('Locktime requirement not satisfied' in str(e))
else:
assert False, 'Should fail'
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock()
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
ro = ci.rpc_wallet('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
@@ -564,7 +568,7 @@ class BasicSwapTest(TestFunctions):
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_004_csv(self):
@@ -572,6 +576,8 @@ class BasicSwapTest(TestFunctions):
swap_clients = self.swap_clients
ci = self.swap_clients[0].ci(self.test_coin_from)
self.check_softfork_active('bip66')
script = CScript([3, OP_CHECKSEQUENCEVERIFY, ])
script_dest = ci.getScriptDest(script)
@@ -579,17 +585,17 @@ class BasicSwapTest(TestFunctions):
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
tx_funded = ci.rpc_wallet('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = ci.rpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['csv test', 'bech32'])
addr_out = ci.rpc_wallet('getnewaddress', ['csv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
prev_tx = ci.rpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'witness_v0_scripthash')
tx_spend = CTransaction()
@@ -601,16 +607,16 @@ class BasicSwapTest(TestFunctions):
tx_spend.wit.vtxinwit[0].scriptWitness.stack = [script, ]
tx_spend_hex = ToHex(tx_spend)
try:
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_hex, ])
except Exception as e:
assert ('non-BIP68-final' in str(e))
else:
assert False, 'Should fail'
self.mineBlock(3)
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock(1)
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
ro = ci.rpc_wallet('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
@@ -618,20 +624,22 @@ class BasicSwapTest(TestFunctions):
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_005_watchonly(self):
logging.info('---------- Test {} watchonly'.format(self.test_coin_from.name))
ci = self.swap_clients[0].ci(self.test_coin_from)
ci1 = self.swap_clients[1].ci(self.test_coin_from)
addr = self.callnoderpc('getnewaddress', ['watchonly test', 'bech32'])
ro = self.callnoderpc('importaddress', [addr, '', False], node_id=1)
txid = self.callnoderpc('sendtoaddress', [addr, 1.0])
tx_hex = self.callnoderpc('getrawtransaction', [txid, ])
self.callnoderpc('sendrawtransaction', [tx_hex, ], node_id=1)
ro = self.callnoderpc('gettransaction', [txid, ], node_id=1)
addr = ci.rpc_wallet('getnewaddress', ['watchonly test', 'bech32'])
ro = ci1.rpc_wallet('importaddress', [addr, '', False])
txid = ci.rpc_wallet('sendtoaddress', [addr, 1.0])
tx_hex = ci.rpc('getrawtransaction', [txid, ])
ci1.rpc_wallet('sendrawtransaction', [tx_hex, ])
ro = ci1.rpc_wallet('gettransaction', [txid, ])
assert (ro['txid'] == txid)
balances = self.callnoderpc('getbalances', node_id=1)
balances = ci1.rpc_wallet('getbalances')
assert (balances['watchonly']['trusted'] + balances['watchonly']['untrusted_pending'] >= 1.0)
def test_006_getblock_verbosity(self):
@@ -643,6 +651,7 @@ class BasicSwapTest(TestFunctions):
def test_007_hdwallet(self):
logging.info('---------- Test {} hdwallet'.format(self.test_coin_from.name))
ci = self.swap_clients[0].ci(self.test_coin_from)
test_seed = '8e54a313e6df8918df6d758fafdbf127a115175fdd2238d0e908dd8093c9ac3b'
test_wif = self.swap_clients[0].ci(self.test_coin_from).encodeKey(bytes.fromhex(test_seed))
@@ -657,7 +666,7 @@ class BasicSwapTest(TestFunctions):
self.swap_clients[0].initialiseWallet(Coins.BTC, raise_errors=True)
assert self.swap_clients[0].checkWalletSeed(Coins.BTC) is True
for i in range(1500):
self.callnoderpc('getnewaddress')
ci.rpc_wallet('getnewaddress')
assert self.swap_clients[0].checkWalletSeed(Coins.BTC) is True
rv = read_json_api(1800, 'getcoinseed', {'coin': 'XMR'})
@@ -667,38 +676,44 @@ class BasicSwapTest(TestFunctions):
logging.info('---------- Test {} gettxout'.format(self.test_coin_from.name))
swap_client = self.swap_clients[0]
ci = swap_client.ci(self.test_coin_from)
addr_1 = self.callnoderpc('getnewaddress', ['gettxout test 1',])
txid = self.callnoderpc('sendtoaddress', [addr_1, 1.0])
addr_1 = ci.rpc_wallet('getnewaddress', ['gettxout test 1',])
txid = ci.rpc_wallet('sendtoaddress', [addr_1, 1.0])
assert len(txid) == 64
self.mineBlock()
unspents = self.callnoderpc('listunspent', [0, 999999999, [addr_1,]])
unspents = ci.rpc_wallet('listunspent', [0, 999999999, [addr_1,]])
assert (len(unspents) == 1)
utxo = unspents[0]
txout = self.callnoderpc('gettxout', [utxo['txid'], utxo['vout']])
assert (addr_1 == txout['scriptPubKey']['address'])
txout = ci.rpc('gettxout', [utxo['txid'], utxo['vout']])
if 'address' in txout:
assert (addr_1 == txout['scriptPubKey']['address'])
else:
assert (addr_1 in txout['scriptPubKey']['addresses'])
# Spend
addr_2 = self.callnoderpc('getnewaddress', ['gettxout test 2',])
tx_funded = self.callnoderpc('createrawtransaction', [[{'txid': utxo['txid'], 'vout': utxo['vout']}], {addr_2: 0.99}])
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded,])['hex']
self.callnoderpc('sendrawtransaction', [tx_signed,])
addr_2 = ci.rpc_wallet('getnewaddress', ['gettxout test 2',])
tx_funded = ci.rpc('createrawtransaction', [[{'txid': utxo['txid'], 'vout': utxo['vout']}], {addr_2: 0.99}])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded,])['hex']
ci.rpc('sendrawtransaction', [tx_signed,])
# utxo should be unavailable when spent in the mempool
txout = self.callnoderpc('gettxout', [utxo['txid'], utxo['vout']])
txout = ci.rpc('gettxout', [utxo['txid'], utxo['vout']])
assert (txout is None)
def test_009_scantxoutset(self):
logging.info('---------- Test {} scantxoutset'.format(self.test_coin_from.name))
addr_1 = self.callnoderpc('getnewaddress', ['scantxoutset test', ])
txid = self.callnoderpc('sendtoaddress', [addr_1, 1.0])
ci = self.swap_clients[0].ci(self.test_coin_from)
addr_1 = ci.rpc_wallet('getnewaddress', ['scantxoutset test', ])
txid = ci.rpc_wallet('sendtoaddress', [addr_1, 1.0])
assert len(txid) == 64
self.mineBlock()
ro = self.callnoderpc('scantxoutset', ['start', ['addr({})'.format(addr_1)]])
ro = ci.rpc('scantxoutset', ['start', ['addr({})'.format(addr_1)]])
assert (len(ro['unspents']) == 1)
assert (ro['unspents'][0]['txid'] == txid)
@@ -712,7 +727,7 @@ class BasicSwapTest(TestFunctions):
amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)
# Record unspents before createSCLockTx as the used ones will be locked
unspents = self.callnoderpc('listunspent')
unspents = ci.rpc_wallet('listunspent')
# fee_rate is in sats/kvB
fee_rate: int = 1000
@@ -728,10 +743,10 @@ class BasicSwapTest(TestFunctions):
lock_tx = ci.fundSCLockTx(lock_tx, fee_rate)
lock_tx = ci.signTxWithWallet(lock_tx)
unspents_after = self.callnoderpc('listunspent')
unspents_after = ci.rpc_wallet('listunspent')
assert (len(unspents) > len(unspents_after))
tx_decoded = self.callnoderpc('decoderawtransaction', [lock_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_tx.hex()])
txid = tx_decoded['txid']
vsize = tx_decoded['vsize']
@@ -752,8 +767,8 @@ class BasicSwapTest(TestFunctions):
break
fee_value = in_value - out_value
self.callnoderpc('sendrawtransaction', [lock_tx.hex()])
rv = self.callnoderpc('gettransaction', [txid])
ci.rpc('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc_wallet('gettransaction', [txid])
wallet_tx_fee = -ci.make_int(rv['fee'])
assert (wallet_tx_fee == fee_value)
@@ -765,7 +780,7 @@ class BasicSwapTest(TestFunctions):
lock_spend_tx = ci.createSCLockSpendTx(lock_tx, lock_tx_script, pkh_out, fee_rate, fee_info=fee_info)
vsize_estimated: int = fee_info['vsize']
tx_decoded = self.callnoderpc('decoderawtransaction', [lock_spend_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
txid = tx_decoded['txid']
witness_stack = [
@@ -775,11 +790,11 @@ class BasicSwapTest(TestFunctions):
lock_tx_script,
]
lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack)
tx_decoded = self.callnoderpc('decoderawtransaction', [lock_spend_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
vsize_actual: int = tx_decoded['vsize']
assert (vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 4)
assert (self.callnoderpc('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
assert (ci.rpc('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
expect_vsize: int = ci.xmr_swap_a_lock_spend_tx_vsize()
assert (expect_vsize >= vsize_actual)
@@ -796,7 +811,7 @@ class BasicSwapTest(TestFunctions):
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
lock_tx_b_spend_decoded = self.callnoderpc('decoderawtransaction', [lock_tx_b_spend.hex()])
lock_tx_b_spend_decoded = ci.rpc('decoderawtransaction', [lock_tx_b_spend.hex()])
expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize()
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
@@ -816,17 +831,17 @@ class BasicSwapTest(TestFunctions):
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
tx_funded = ci.rpc_wallet('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = ci.rpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['csv test', 'bech32'])
addr_out = ci.rpc_wallet('getnewaddress', ['csv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
prev_tx = ci.rpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'scripthash')
tx_spend = CTransaction()
@@ -836,9 +851,9 @@ class BasicSwapTest(TestFunctions):
tx_spend.vout.append(ci.txoType()(ci.make_int(1.0999), script_out))
tx_spend_hex = ToHex(tx_spend)
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock(1)
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
ro = ci.rpc_wallet('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
@@ -846,7 +861,7 @@ class BasicSwapTest(TestFunctions):
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_012_p2sh_p2wsh(self):
@@ -863,17 +878,17 @@ class BasicSwapTest(TestFunctions):
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = self.callnoderpc('fundrawtransaction', [tx_hex])
tx_funded = ci.rpc_wallet('fundrawtransaction', [tx_hex])
utxo_pos = 0 if tx_funded['changepos'] == 1 else 1
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = self.callnoderpc('sendrawtransaction', [tx_signed, ])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded['hex'], ])['hex']
txid = ci.rpc('sendrawtransaction', [tx_signed, ])
addr_out = self.callnoderpc('getnewaddress', ['csv test', 'bech32'])
addr_out = ci.rpc_wallet('getnewaddress', ['csv test', 'bech32'])
pkh = ci.decodeSegwitAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
# Double check output type
prev_tx = self.callnoderpc('decoderawtransaction', [tx_signed, ])
prev_tx = ci.rpc('decoderawtransaction', [tx_signed, ])
assert (prev_tx['vout'][utxo_pos]['scriptPubKey']['type'] == 'scripthash')
tx_spend = CTransaction()
@@ -885,9 +900,9 @@ class BasicSwapTest(TestFunctions):
tx_spend.wit.vtxinwit[0].scriptWitness.stack = [script, ]
tx_spend_hex = ToHex(tx_spend)
txid = self.callnoderpc('sendrawtransaction', [tx_spend_hex, ])
txid = ci.rpc('sendrawtransaction', [tx_spend_hex, ])
self.mineBlock(1)
ro = self.callnoderpc('listreceivedbyaddress', [0, ])
ro = ci.rpc_wallet('listreceivedbyaddress', [0, ])
sum_addr = 0
for entry in ro:
if entry['address'] == addr_out:
@@ -895,7 +910,7 @@ class BasicSwapTest(TestFunctions):
assert (sum_addr == 1.0999)
# Ensure tx was mined
tx_wallet = self.callnoderpc('gettransaction', [txid, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])
assert (len(tx_wallet['blockhash']) == 64)
def test_01_a_full_swap(self):
@@ -1045,7 +1060,7 @@ class BasicSwapTest(TestFunctions):
# Verify expected inputs were used
bid, _, _, _, _ = swap_clients[2].getXmrBidAndOffer(bid_id)
assert (bid.xmr_a_lock_tx)
wtx = ci.rpc_callback('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
wtx = ci.rpc_wallet('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
itx_after = ci.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2022 tecnovert
# Copyright (c) 2021-2023 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -24,9 +24,10 @@ from tests.basicswap.common import (
wait_for_bid,
wait_for_offer,
wait_for_in_progress,
TEST_HTTP_PORT,
LTC_BASE_RPC_PORT,
)
from .test_btc_xmr import BasicSwapTest, test_delay_event
from .test_btc_xmr import BasicSwapTest, test_delay_event, callnoderpc
logger = logging.getLogger()
@@ -37,9 +38,20 @@ class TestLTC(BasicSwapTest):
start_ltc_nodes = True
base_rpc_port = LTC_BASE_RPC_PORT
@classmethod
def prepareExtraCoins(cls):
logging.info('Mining {} chain to height 1352 to activate CVS (BIP66)'.format(cls.test_coin_from.name))
chain_height = callnoderpc(0, 'getblockcount', base_rpc_port=LTC_BASE_RPC_PORT)
num_blocks: int = 1352 - chain_height
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT)
def mineBlock(self, num_blocks=1):
self.callnoderpc('generatetoaddress', [num_blocks, self.ltc_addr])
def check_softfork_active(self, feature_name):
deploymentinfo = self.callnoderpc('getblockchaininfo')
assert (deploymentinfo['softforks'][feature_name]['active'] is True)
def test_001_nested_segwit(self):
logging.info('---------- Test {} p2sh nested segwit'.format(self.test_coin_from.name))
logging.info('Skipped')
@@ -47,17 +59,18 @@ class TestLTC(BasicSwapTest):
def test_002_native_segwit(self):
logging.info('---------- Test {} p2sh native segwit'.format(self.test_coin_from.name))
addr_segwit = self.callnoderpc('getnewaddress', ['segwit test', 'bech32'])
addr_info = self.callnoderpc('getaddressinfo', [addr_segwit, ])
ci = self.swap_clients[0].ci(self.test_coin_from)
addr_segwit = ci.rpc_wallet('getnewaddress', ['segwit test', 'bech32'])
addr_info = ci.rpc_wallet('getaddressinfo', [addr_segwit, ])
assert addr_info['iswitness'] is True
txid = self.callnoderpc('sendtoaddress', [addr_segwit, 1.0])
txid = ci.rpc_wallet('sendtoaddress', [addr_segwit, 1.0])
assert len(txid) == 64
tx_wallet = self.callnoderpc('gettransaction', [txid, ])['hex']
tx = self.callnoderpc('decoderawtransaction', [tx_wallet, ])
tx_wallet = ci.rpc_wallet('gettransaction', [txid, ])['hex']
tx = ci.rpc('decoderawtransaction', [tx_wallet, ])
self.mineBlock()
ro = self.callnoderpc('scantxoutset', ['start', ['addr({})'.format(addr_segwit)]])
ro = ci.rpc('scantxoutset', ['start', ['addr({})'.format(addr_segwit)]])
assert (len(ro['unspents']) == 1)
assert (ro['unspents'][0]['txid'] == txid)
@@ -68,10 +81,10 @@ class TestLTC(BasicSwapTest):
break
assert prevout_n > -1
tx_funded = self.callnoderpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_segwit: 0.99}])
tx_signed = self.callnoderpc('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = self.callnoderpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = self.callnoderpc('decoderawtransaction', [tx_signed, ])
tx_funded = ci.rpc('createrawtransaction', [[{'txid': txid, 'vout': prevout_n}], {addr_segwit: 0.99}])
tx_signed = ci.rpc_wallet('signrawtransactionwithwallet', [tx_funded, ])['hex']
tx_funded_decoded = ci.rpc('decoderawtransaction', [tx_funded, ])
tx_signed_decoded = ci.rpc('decoderawtransaction', [tx_signed, ])
assert tx_funded_decoded['txid'] == tx_signed_decoded['txid']
def test_007_hdwallet(self):
@@ -108,6 +121,115 @@ class TestLTC(BasicSwapTest):
assert (js_0['num_swapping'] == 0 and js_0['num_watched_outputs'] == 0)
assert (js_1['num_swapping'] == 0 and js_1['num_watched_outputs'] == 0)
def test_21_mweb(self):
logging.info('---------- Test MWEB {}'.format(self.test_coin_from.name))
swap_clients = self.swap_clients
ci0 = swap_clients[0].ci(self.test_coin_from)
ci1 = swap_clients[1].ci(self.test_coin_from)
mweb_addr_0 = ci0.rpc_wallet('getnewaddress', ['mweb addr test 0', 'mweb'])
mweb_addr_1 = ci1.rpc_wallet('getnewaddress', ['mweb addr test 1', 'mweb'])
addr_info0 = ci0.rpc_wallet('getaddressinfo', [mweb_addr_0,])
assert (addr_info0['ismweb'] is True)
addr_info1 = ci1.rpc_wallet('getaddressinfo', [mweb_addr_1,])
assert (addr_info1['ismweb'] is True)
txid = ci0.rpc_wallet('sendtoaddress', [mweb_addr_0, 10.0])
self.mineBlock()
txns = ci0.rpc_wallet('listtransactions')
utxos = ci0.rpc_wallet('listunspent')
balances = ci0.rpc_wallet('getbalances')
wi = ci0.rpc_wallet('getwalletinfo')
txid = ci0.rpc_wallet('sendtoaddress', [mweb_addr_1, 10.0])
self.mineBlock()
txns = ci1.rpc_wallet('listtransactions')
utxos = ci1.rpc_wallet('listunspent')
balances = ci1.rpc_wallet('getbalances')
wi = ci1.rpc_wallet('getwalletinfo')
mweb_tx = None
for utxo in utxos:
if utxo.get('address', '') == mweb_addr_1:
mweb_tx = utxo
assert (mweb_tx is not None)
tx = ci1.rpc_wallet('gettransaction', [mweb_tx['txid'],])
blockhash = tx['blockhash']
block = ci1.rpc('getblock', [blockhash, 3])
block = ci1.rpc('getblock', [blockhash, 0])
# TODO
def test_22_mweb_balance(self):
logging.info('---------- Test MWEB balance {}'.format(self.test_coin_from.name))
swap_clients = self.swap_clients
ci_mweb = swap_clients[0].ci(Coins.LTC_MWEB)
mweb_addr_0 = ci_mweb.getNewAddress()
addr_info0 = ci_mweb.rpc_wallet('getaddressinfo', [mweb_addr_0,])
assert (addr_info0['ismweb'] is True)
ltc_addr = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc/nextdepositaddr')
ltc_mweb_addr = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc_mweb/nextdepositaddr')
ltc_mweb_addr2 = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc/newmwebaddress')
assert (ci_mweb.rpc_wallet('getaddressinfo', [ltc_addr,])['ismweb'] is False)
assert (ci_mweb.rpc_wallet('getaddressinfo', [ltc_mweb_addr,])['ismweb'] is True)
assert (ci_mweb.rpc_wallet('getaddressinfo', [ltc_mweb_addr2,])['ismweb'] is True)
post_json = {
'value': 10,
'address': ltc_mweb_addr,
'subfee': False,
}
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc/withdraw', post_json)
assert (len(json_rv['txid']) == 64)
self.mineBlock()
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc', post_json)
assert (json_rv['mweb_balance'] == 10.0)
mweb_address = json_rv['mweb_address']
post_json = {
'value': 11,
'address': mweb_address,
'subfee': False,
}
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc/withdraw', post_json)
assert (len(json_rv['txid']) == 64)
self.mineBlock()
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc_mweb', post_json)
assert (json_rv['mweb_balance'] == 21.0)
assert (json_rv['mweb_address'] == mweb_address)
ltc_address = json_rv['deposit_address']
# Check that spending the mweb balance takes from the correct wallet
post_json = {
'value': 1,
'address': ltc_address,
'subfee': False,
'type_from': 'mweb',
}
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc/withdraw', post_json)
assert (len(json_rv['txid']) == 64)
json_rv = read_json_api(TEST_HTTP_PORT + 0, 'wallets/ltc', post_json)
assert (json_rv['mweb_balance'] <= 20.0)
if __name__ == '__main__':
unittest.main()

View File

@@ -101,7 +101,7 @@ class Test(BaseTest):
nonlocal ci
i = 0
while not delay_event.is_set():
unspents = ci.rpc_callback('listunspentblind')
unspents = ci.rpc_wallet('listunspentblind')
if len(unspents) >= 1:
return
delay_event.wait(delay_time)
@@ -113,8 +113,8 @@ class Test(BaseTest):
amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)
# Record unspents before createSCLockTx as the used ones will be locked
unspents = ci.rpc_callback('listunspentblind')
locked_utxos_before = ci.rpc_callback('listlockunspent')
unspents = ci.rpc_wallet('listunspentblind')
locked_utxos_before = ci.rpc_wallet('listlockunspent')
# fee_rate is in sats/kvB
fee_rate: int = 1000
@@ -131,33 +131,33 @@ class Test(BaseTest):
lock_tx = ci.fundSCLockTx(lock_tx, fee_rate, vkbv)
lock_tx = ci.signTxWithWallet(lock_tx)
unspents_after = ci.rpc_callback('listunspentblind')
locked_utxos_after = ci.rpc_callback('listlockunspent')
unspents_after = ci.rpc_wallet('listunspentblind')
locked_utxos_after = ci.rpc_wallet('listlockunspent')
assert (len(unspents) > len(unspents_after))
assert (len(locked_utxos_after) > len(locked_utxos_before))
lock_tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_tx.hex()])
lock_tx_decoded = ci.rpc_wallet('decoderawtransaction', [lock_tx.hex()])
txid = lock_tx_decoded['txid']
vsize = lock_tx_decoded['vsize']
expect_fee_int = round(fee_rate * vsize / 1000)
expect_fee = ci.format_amount(expect_fee_int)
ci.rpc_callback('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc_callback('gettransaction', [txid])
ci.rpc_wallet('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc_wallet('gettransaction', [txid])
wallet_tx_fee = -ci.make_int(rv['details'][0]['fee'])
assert (wallet_tx_fee >= expect_fee_int)
assert (wallet_tx_fee - expect_fee_int < 20)
addr_out = ci.getNewAddress(True)
addrinfo = ci.rpc_callback('getaddressinfo', [addr_out,])
addrinfo = ci.rpc_wallet('getaddressinfo', [addr_out,])
pk_out = bytes.fromhex(addrinfo['pubkey'])
fee_info = {}
lock_spend_tx = ci.createSCLockSpendTx(lock_tx, lock_tx_script, pk_out, fee_rate, vkbv, fee_info=fee_info)
vsize_estimated: int = fee_info['vsize']
spend_tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_spend_tx.hex()])
spend_tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
txid = spend_tx_decoded['txid']
nonce = ci.getScriptLockTxNonce(vkbv)
@@ -172,12 +172,12 @@ class Test(BaseTest):
lock_tx_script,
]
lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack)
tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_spend_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
vsize_actual: int = tx_decoded['vsize']
# Note: The fee is set allowing 9 bytes for the encoded fee amount, causing a small overestimate
assert (vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 10)
assert (ci.rpc_callback('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
assert (ci.rpc('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
# Test chain b (no-script) lock tx size
v = ci.getNewSecretKey()
@@ -198,7 +198,7 @@ class Test(BaseTest):
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
lock_tx_b_spend_decoded = ci.rpc_callback('decoderawtransaction', [lock_tx_b_spend.hex()])
lock_tx_b_spend_decoded = ci.rpc('decoderawtransaction', [lock_tx_b_spend.hex()])
expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize()
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
@@ -472,7 +472,7 @@ class Test(BaseTest):
# Verify expected inputs were used
bid, _, _, _, _ = swap_clients[2].getXmrBidAndOffer(bid_id)
assert (bid.xmr_a_lock_tx)
wtx = ci.rpc_callback('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
wtx = ci.rpc_wallet('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
itx_after = ci.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -80,10 +80,10 @@ class Test(BaseTest):
super(Test, cls).setUpClass()
btc_addr1 = callnoderpc(1, 'getnewaddress', ['initial funds', 'bech32'], base_rpc_port=BTC_BASE_RPC_PORT)
ltc_addr1 = callnoderpc(1, 'getnewaddress', ['initial funds', 'bech32'], base_rpc_port=LTC_BASE_RPC_PORT)
ltc_addr1 = callnoderpc(1, 'getnewaddress', ['initial funds', 'bech32'], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
callnoderpc(0, 'sendtoaddress', [btc_addr1, 1000], base_rpc_port=BTC_BASE_RPC_PORT)
callnoderpc(0, 'sendtoaddress', [ltc_addr1, 1000], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'sendtoaddress', [ltc_addr1, 1000], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
wait_for_balance(test_delay_event, 'http://127.0.0.1:1801/json/wallets/btc', 'balance', 1000.0)
wait_for_balance(test_delay_event, 'http://127.0.0.1:1801/json/wallets/ltc', 'balance', 1000.0)
@@ -182,6 +182,9 @@ class Test(BaseTest):
rv = read_json_api(1800, 'automationstrategies/1')
assert (rv['label'] == 'Accept All')
sx_addr = read_json_api(1800, 'wallets/part/newstealthaddress')
assert (callnoderpc(0, 'getaddressinfo', [sx_addr, ])['isstealthaddress'] is True)
def test_004_validateSwapType(self):
logging.info('---------- Test validateSwapType')
@@ -570,7 +573,7 @@ class Test(BaseTest):
def test_12_withdrawal(self):
logging.info('---------- Test LTC withdrawals')
ltc_addr = callnoderpc(0, 'getnewaddress', ['Withdrawal test', 'legacy'], base_rpc_port=LTC_BASE_RPC_PORT)
ltc_addr = callnoderpc(0, 'getnewaddress', ['Withdrawal test', 'legacy'], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
wallets0 = read_json_api(TEST_HTTP_PORT + 0, 'wallets')
assert (float(wallets0['LTC']['balance']) > 100)
@@ -712,7 +715,7 @@ class Test(BaseTest):
# Verify expected inputs were used
bid, offer = swap_clients[2].getBidAndOffer(bid_id)
assert (bid.initiate_tx)
wtx = ci.rpc_callback('gettransaction', [bid.initiate_tx.txid.hex(),])
wtx = ci.rpc('gettransaction', [bid.initiate_tx.txid.hex(),])
itx_after = ci.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):

View File

@@ -530,29 +530,29 @@ class BaseTest(unittest.TestCase):
if cls.start_ltc_nodes:
num_blocks = 400
cls.ltc_addr = callnoderpc(0, 'getnewaddress', ['mining_addr', 'bech32'], base_rpc_port=LTC_BASE_RPC_PORT)
cls.ltc_addr = callnoderpc(0, 'getnewaddress', ['mining_addr', 'bech32'], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
logging.info('Mining %d Litecoin blocks to %s', num_blocks, cls.ltc_addr)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
num_blocks = 31
cls.ltc_addr = cls.swap_clients[0].ci(Coins.LTC).pubkey_to_address(void_block_rewards_pubkey)
logging.info('Mining %d Litecoin blocks to %s', num_blocks, cls.ltc_addr)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
# https://github.com/litecoin-project/litecoin/issues/807
# Block 432 is when MWEB activates. It requires a peg-in. You'll need to generate an mweb address and send some coins to it. Then it will allow you to mine the next block.
mweb_addr = callnoderpc(2, 'getnewaddress', ['mweb_addr', 'mweb'], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'sendtoaddress', [mweb_addr, 1], base_rpc_port=LTC_BASE_RPC_PORT)
mweb_addr = callnoderpc(2, 'getnewaddress', ['mweb_addr', 'mweb'], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
callnoderpc(0, 'sendtoaddress', [mweb_addr, 1], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
ltc_addr1 = callnoderpc(1, 'getnewaddress', ['initial addr'], base_rpc_port=LTC_BASE_RPC_PORT)
ltc_addr1 = callnoderpc(1, 'getnewaddress', ['initial addr'], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
for i in range(5):
callnoderpc(0, 'sendtoaddress', [ltc_addr1, 100], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'sendtoaddress', [ltc_addr1, 100], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
num_blocks = 69
cls.ltc_addr = cls.swap_clients[0].ci(Coins.LTC).pubkey_to_address(void_block_rewards_pubkey)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT)
callnoderpc(0, 'generatetoaddress', [num_blocks, cls.ltc_addr], base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat')
checkForks(callnoderpc(0, 'getblockchaininfo', base_rpc_port=LTC_BASE_RPC_PORT))
checkForks(callnoderpc(0, 'getblockchaininfo', base_rpc_port=LTC_BASE_RPC_PORT, wallet='wallet.dat'))
num_blocks = 100
if cls.start_xmr_nodes:
@@ -682,7 +682,7 @@ class Test(BaseTest):
amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)
# Record unspents before createSCLockTx as the used ones will be locked
unspents = ci.rpc_callback('listunspent')
unspents = ci.rpc('listunspent')
# fee_rate is in sats/kvB
fee_rate: int = 1000
@@ -698,10 +698,10 @@ class Test(BaseTest):
lock_tx = ci.fundSCLockTx(lock_tx, fee_rate)
lock_tx = ci.signTxWithWallet(lock_tx)
unspents_after = ci.rpc_callback('listunspent')
unspents_after = ci.rpc('listunspent')
assert (len(unspents) > len(unspents_after))
tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_tx.hex()])
txid = tx_decoded['txid']
vsize = tx_decoded['vsize']
@@ -722,8 +722,8 @@ class Test(BaseTest):
break
fee_value = in_value - out_value
ci.rpc_callback('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc_callback('gettransaction', [txid])
ci.rpc('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc('gettransaction', [txid])
wallet_tx_fee = -ci.make_int(rv['fee'])
assert (wallet_tx_fee == fee_value)
@@ -735,7 +735,7 @@ class Test(BaseTest):
lock_spend_tx = ci.createSCLockSpendTx(lock_tx, lock_tx_script, pkh_out, fee_rate, fee_info=fee_info)
vsize_estimated: int = fee_info['vsize']
tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_spend_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
txid = tx_decoded['txid']
witness_stack = [
@@ -745,11 +745,11 @@ class Test(BaseTest):
lock_tx_script,
]
lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack)
tx_decoded = ci.rpc_callback('decoderawtransaction', [lock_spend_tx.hex()])
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
vsize_actual: int = tx_decoded['vsize']
assert (vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 4)
assert (ci.rpc_callback('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
assert (ci.rpc('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
expect_vsize: int = ci.xmr_swap_a_lock_spend_tx_vsize()
assert (expect_vsize >= vsize_actual)
@@ -766,7 +766,7 @@ class Test(BaseTest):
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
lock_tx_b_spend_decoded = ci.rpc_callback('decoderawtransaction', [lock_tx_b_spend.hex()])
lock_tx_b_spend_decoded = ci.rpc('decoderawtransaction', [lock_tx_b_spend.hex()])
expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize()
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
@@ -1354,7 +1354,7 @@ class Test(BaseTest):
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
lock_tx_b_spend_decoded = ci.rpc_callback('decoderawtransaction', [lock_tx_b_spend.hex()])
lock_tx_b_spend_decoded = ci.rpc('decoderawtransaction', [lock_tx_b_spend.hex()])
expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize()
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
@@ -1501,7 +1501,7 @@ class Test(BaseTest):
# Verify expected inputs were used
bid, _, _, _, _ = swap_clients[2].getXmrBidAndOffer(bid_id)
assert (bid.xmr_a_lock_tx)
wtx = ci.rpc_callback('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
wtx = ci.rpc('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
itx_after = ci.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):