Reformat tests with black.

This commit is contained in:
tecnovert
2024-11-15 17:21:20 +02:00
parent b484827c15
commit 7b03ce4769
38 changed files with 10345 additions and 5226 deletions

View File

@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2023 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -51,45 +52,65 @@ class Test(BaseTest):
def setUpClass(cls):
super(Test, cls).setUpClass()
js_0 = read_json_api(1800, 'wallets/part')
node0_blind_before = js_0['blind_balance'] + js_0['blind_unconfirmed']
js_0 = read_json_api(1800, "wallets/part")
node0_blind_before = js_0["blind_balance"] + js_0["blind_unconfirmed"]
post_json = {
'value': 100,
'address': js_0['stealth_address'],
'subfee': False,
'type_to': 'blind',
"value": 100,
"address": js_0["stealth_address"],
"subfee": False,
"type_to": "blind",
}
json_rv = json.loads(post_json_req('http://127.0.0.1:1800/json/wallets/part/withdraw', post_json))
assert (len(json_rv['txid']) == 64)
json_rv = json.loads(
post_json_req("http://127.0.0.1:1800/json/wallets/part/withdraw", post_json)
)
assert len(json_rv["txid"]) == 64
logging.info('Waiting for blind balance')
wait_for_balance(test_delay_event, 'http://127.0.0.1:1800/json/wallets/part', 'blind_balance', 100.0 + node0_blind_before)
js_0 = read_json_api(1800, 'wallets/part')
logging.info("Waiting for blind balance")
wait_for_balance(
test_delay_event,
"http://127.0.0.1:1800/json/wallets/part",
"blind_balance",
100.0 + node0_blind_before,
)
js_0 = read_json_api(1800, "wallets/part")
def ensure_balance(self, coin_type, node_id, amount):
tla = 'PART'
js_w = read_json_api(1800 + node_id, 'wallets')
print('js_w', js_w)
if float(js_w[tla]['blind_balance']) < amount:
tla = "PART"
js_w = read_json_api(1800 + node_id, "wallets")
print("js_w", js_w)
if float(js_w[tla]["blind_balance"]) < amount:
post_json = {
'value': amount,
'type_to': 'blind',
'address': js_w[tla]['stealth_address'],
'subfee': False,
"value": amount,
"type_to": "blind",
"address": js_w[tla]["stealth_address"],
"subfee": False,
}
json_rv = read_json_api(1800, 'wallets/{}/withdraw'.format(tla.lower()), post_json)
assert (len(json_rv['txid']) == 64)
wait_for_balance(test_delay_event, 'http://127.0.0.1:{}/json/wallets/{}'.format(1800 + node_id, tla.lower()), 'blind_balance', amount)
json_rv = read_json_api(
1800, "wallets/{}/withdraw".format(tla.lower()), post_json
)
assert len(json_rv["txid"]) == 64
wait_for_balance(
test_delay_event,
"http://127.0.0.1:{}/json/wallets/{}".format(
1800 + node_id, tla.lower()
),
"blind_balance",
amount,
)
def getBalance(self, js_wallets):
return float(js_wallets[Coins.PART.name]['blind_balance']) + float(js_wallets[Coins.PART.name]['blind_unconfirmed'])
return float(js_wallets[Coins.PART.name]["blind_balance"]) + float(
js_wallets[Coins.PART.name]["blind_unconfirmed"]
)
def getXmrBalance(self, js_wallets):
return float(js_wallets[Coins.XMR.name]['unconfirmed']) + float(js_wallets[Coins.XMR.name]['balance'])
return float(js_wallets[Coins.XMR.name]["unconfirmed"]) + float(
js_wallets[Coins.XMR.name]["balance"]
)
def test_010_txn_size(self):
logging.info('---------- Test {} txn_size'.format(self.test_coin_from.name))
logging.info("---------- Test {} txn_size".format(self.test_coin_from.name))
self.ensure_balance(self.test_coin_from, 0, 100.0)
@@ -101,20 +122,21 @@ class Test(BaseTest):
nonlocal ci
i = 0
while not delay_event.is_set():
unspents = ci.rpc_wallet('listunspentblind')
unspents = ci.rpc_wallet("listunspentblind")
if len(unspents) >= 1:
return
delay_event.wait(delay_time)
i += 1
if i > iterations:
raise ValueError('wait_for_unspents timed out')
raise ValueError("wait_for_unspents timed out")
wait_for_unspents(test_delay_event)
amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)
# Record unspents before createSCLockTx as the used ones will be locked
unspents = ci.rpc_wallet('listunspentblind')
locked_utxos_before = ci.rpc_wallet('listlockunspent')
unspents = ci.rpc_wallet("listunspentblind")
locked_utxos_before = ci.rpc_wallet("listlockunspent")
# fee_rate is in sats/kvB
fee_rate: int = 1000
@@ -131,53 +153,61 @@ class Test(BaseTest):
lock_tx = ci.fundSCLockTx(lock_tx, fee_rate, vkbv)
lock_tx = ci.signTxWithWallet(lock_tx)
unspents_after = ci.rpc_wallet('listunspentblind')
locked_utxos_after = ci.rpc_wallet('listlockunspent')
unspents_after = ci.rpc_wallet("listunspentblind")
locked_utxos_after = ci.rpc_wallet("listlockunspent")
assert (len(unspents) > len(unspents_after))
assert (len(locked_utxos_after) > len(locked_utxos_before))
lock_tx_decoded = ci.rpc_wallet('decoderawtransaction', [lock_tx.hex()])
txid = lock_tx_decoded['txid']
assert len(unspents) > len(unspents_after)
assert len(locked_utxos_after) > len(locked_utxos_before)
lock_tx_decoded = ci.rpc_wallet("decoderawtransaction", [lock_tx.hex()])
txid = lock_tx_decoded["txid"]
vsize = lock_tx_decoded['vsize']
vsize = lock_tx_decoded["vsize"]
expect_fee_int = round(fee_rate * vsize / 1000)
expect_fee = ci.format_amount(expect_fee_int)
ci.rpc_wallet('sendrawtransaction', [lock_tx.hex()])
rv = ci.rpc_wallet('gettransaction', [txid])
wallet_tx_fee = -ci.make_int(rv['details'][0]['fee'])
ci.rpc_wallet("sendrawtransaction", [lock_tx.hex()])
rv = ci.rpc_wallet("gettransaction", [txid])
wallet_tx_fee = -ci.make_int(rv["details"][0]["fee"])
assert (wallet_tx_fee >= expect_fee_int)
assert (wallet_tx_fee - expect_fee_int < 20)
assert wallet_tx_fee >= expect_fee_int
assert wallet_tx_fee - expect_fee_int < 20
addr_out = ci.getNewAddress(True)
addrinfo = ci.rpc_wallet('getaddressinfo', [addr_out,])
pk_out = bytes.fromhex(addrinfo['pubkey'])
addrinfo = ci.rpc_wallet(
"getaddressinfo",
[
addr_out,
],
)
pk_out = bytes.fromhex(addrinfo["pubkey"])
fee_info = {}
lock_spend_tx = ci.createSCLockSpendTx(lock_tx, lock_tx_script, pk_out, fee_rate, vkbv, fee_info=fee_info)
vsize_estimated: int = fee_info['vsize']
lock_spend_tx = ci.createSCLockSpendTx(
lock_tx, lock_tx_script, pk_out, fee_rate, vkbv, fee_info=fee_info
)
vsize_estimated: int = fee_info["vsize"]
spend_tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
txid = spend_tx_decoded['txid']
spend_tx_decoded = ci.rpc("decoderawtransaction", [lock_spend_tx.hex()])
txid = spend_tx_decoded["txid"]
nonce = ci.getScriptLockTxNonce(vkbv)
output_n, _ = ci.findOutputByNonce(lock_tx_decoded, nonce)
assert (output_n is not None)
valueCommitment = bytes.fromhex(lock_tx_decoded['vout'][output_n]['valueCommitment'])
assert output_n is not None
valueCommitment = bytes.fromhex(
lock_tx_decoded["vout"][output_n]["valueCommitment"]
)
witness_stack = [
b'',
b"",
ci.signTx(a, lock_spend_tx, 0, lock_tx_script, valueCommitment),
ci.signTx(b, lock_spend_tx, 0, lock_tx_script, valueCommitment),
lock_tx_script,
]
lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack)
tx_decoded = ci.rpc('decoderawtransaction', [lock_spend_tx.hex()])
vsize_actual: int = tx_decoded['vsize']
tx_decoded = ci.rpc("decoderawtransaction", [lock_spend_tx.hex()])
vsize_actual: int = tx_decoded["vsize"]
# Note: The fee is set allowing 9 bytes for the encoded fee amount, causing a small overestimate
assert (vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 10)
assert (ci.rpc('sendrawtransaction', [lock_spend_tx.hex()]) == txid)
assert vsize_actual <= vsize_estimated and vsize_estimated - vsize_actual < 10
assert ci.rpc("sendrawtransaction", [lock_spend_tx.hex()]) == txid
# Test chain b (no-script) lock tx size
v = ci.getNewSecretKey()
@@ -189,40 +219,50 @@ class Test(BaseTest):
lock_tx_b_spend_txid = None
for i in range(20):
try:
lock_tx_b_spend_txid = ci.spendBLockTx(lock_tx_b_txid, addr_out, v, s, amount, fee_rate, 0)
lock_tx_b_spend_txid = ci.spendBLockTx(
lock_tx_b_txid, addr_out, v, s, amount, fee_rate, 0
)
break
except Exception as e:
print('spendBLockTx failed', str(e))
print("spendBLockTx failed", str(e))
test_delay_event.wait(2)
assert (lock_tx_b_spend_txid is not None)
assert lock_tx_b_spend_txid is not None
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
lock_tx_b_spend_decoded = ci.rpc('decoderawtransaction', [lock_tx_b_spend.hex()])
lock_tx_b_spend_decoded = ci.rpc(
"decoderawtransaction", [lock_tx_b_spend.hex()]
)
expect_vsize: int = ci.xmr_swap_b_lock_spend_tx_vsize()
assert (expect_vsize >= lock_tx_b_spend_decoded['vsize'])
assert (expect_vsize - lock_tx_b_spend_decoded['vsize'] < 10)
assert expect_vsize >= lock_tx_b_spend_decoded["vsize"]
assert expect_vsize - lock_tx_b_spend_decoded["vsize"] < 10
def test_01_part_xmr(self):
logging.info('---------- Test PARTct to XMR')
logging.info("---------- Test PARTct to XMR")
swap_clients = self.swap_clients
js_0 = read_json_api(1800, 'wallets/part')
assert (float(js_0['blind_balance']) > 10.0)
node0_blind_before = js_0['blind_balance'] + js_0['blind_unconfirmed']
js_0 = read_json_api(1800, "wallets/part")
assert float(js_0["blind_balance"]) > 10.0
node0_blind_before = js_0["blind_balance"] + js_0["blind_unconfirmed"]
js_1 = read_json_api(1801, 'wallets/part')
node1_blind_before = js_1['blind_balance'] + js_1['blind_unconfirmed']
js_1 = read_json_api(1801, "wallets/part")
node1_blind_before = js_1["blind_balance"] + js_1["blind_unconfirmed"]
js_0_xmr = read_json_api(1800, 'wallets/xmr')
js_1_xmr = read_json_api(1801, 'wallets/xmr')
js_1_xmr = read_json_api(1801, "wallets/xmr")
amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1)
rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1)
offer_id = swap_clients[0].postOffer(Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP)
offer_id = swap_clients[0].postOffer(
Coins.PART_BLIND,
Coins.XMR,
amt_swap,
rate_swap,
amt_swap,
SwapTypes.XMR_SWAP,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offers = swap_clients[0].listOffers(filters={'offer_id': offer_id})
offers = swap_clients[0].listOffers(filters={"offer_id": offer_id})
offer = offers[0]
bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from)
@@ -231,40 +271,60 @@ class Test(BaseTest):
swap_clients[0].acceptXmrBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=180,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
)
amount_from = float(format_amount(amt_swap, 8))
js_1 = read_json_api(1801, 'wallets/part')
node1_blind_after = js_1['blind_balance'] + js_1['blind_unconfirmed']
assert (node1_blind_after > node1_blind_before + (amount_from - 0.05))
js_1 = read_json_api(1801, "wallets/part")
node1_blind_after = js_1["blind_balance"] + js_1["blind_unconfirmed"]
assert node1_blind_after > node1_blind_before + (amount_from - 0.05)
js_0 = read_json_api(1800, 'wallets/part')
node0_blind_after = js_0['blind_balance'] + js_0['blind_unconfirmed']
assert (node0_blind_after < node0_blind_before - amount_from)
js_0 = read_json_api(1800, "wallets/part")
node0_blind_after = js_0["blind_balance"] + js_0["blind_unconfirmed"]
assert node0_blind_after < node0_blind_before - amount_from
js_0_xmr_after = read_json_api(1800, 'wallets/xmr')
js_1_xmr_after = read_json_api(1801, 'wallets/xmr')
js_1_xmr_after = read_json_api(1801, "wallets/xmr")
scale_from = 8
amount_to = int((amt_swap * rate_swap) // (10 ** scale_from))
amount_to = int((amt_swap * rate_swap) // (10**scale_from))
amount_to_float = float(format_amount(amount_to, 12))
node1_xmr_after = float(js_1_xmr_after['unconfirmed']) + float(js_1_xmr_after['balance'])
node1_xmr_before = float(js_1_xmr['unconfirmed']) + float(js_1_xmr['balance'])
assert (node1_xmr_after > node1_xmr_before + (amount_to_float - 0.02))
node1_xmr_after = float(js_1_xmr_after["unconfirmed"]) + float(
js_1_xmr_after["balance"]
)
node1_xmr_before = float(js_1_xmr["unconfirmed"]) + float(js_1_xmr["balance"])
assert node1_xmr_after > node1_xmr_before + (amount_to_float - 0.02)
def test_02_leader_recover_a_lock_tx(self):
logging.info('---------- Test PARTct to XMR leader recovers coin a lock tx')
logging.info("---------- Test PARTct to XMR leader recovers coin a lock tx")
swap_clients = self.swap_clients
js_w0_before = read_json_api(1800, 'wallets')
js_w0_before = read_json_api(1800, "wallets")
node0_blind_before = self.getBalance(js_w0_before)
amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1)
rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1)
offer_id = swap_clients[0].postOffer(
Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=12)
Coins.PART_BLIND,
Coins.XMR,
amt_swap,
rate_swap,
amt_swap,
SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
lock_value=12,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
@@ -273,31 +333,49 @@ class Test(BaseTest):
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED)
bid, xmr_swap = swap_clients[0].getXmrBid(bid_id)
assert (xmr_swap)
assert xmr_swap
swap_clients[1].setBidDebugInd(bid_id, DebugTypes.BID_STOP_AFTER_COIN_A_LOCK)
swap_clients[0].acceptXmrBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, [BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED], sent=True)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.XMR_SWAP_FAILED_REFUNDED,
wait_for=180,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
[BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED],
sent=True,
)
js_w0_after = read_json_api(1800, 'wallets')
js_w0_after = read_json_api(1800, "wallets")
node0_blind_after = self.getBalance(js_w0_after)
assert (node0_blind_before - node0_blind_after < 0.02)
assert node0_blind_before - node0_blind_after < 0.02
def test_03_follower_recover_a_lock_tx(self):
logging.info('---------- Test PARTct to XMR follower recovers coin a lock tx')
logging.info("---------- Test PARTct to XMR follower recovers coin a lock tx")
swap_clients = self.swap_clients
js_w0_before = read_json_api(1800, 'wallets')
js_w1_before = read_json_api(1801, 'wallets')
js_w1_before = read_json_api(1801, "wallets")
amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1)
rate_swap = make_int(random.uniform(0.2, 20.0), scale=12, r=1)
offer_id = swap_clients[0].postOffer(
Coins.PART_BLIND, Coins.XMR, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=32)
Coins.PART_BLIND,
Coins.XMR,
amt_swap,
rate_swap,
amt_swap,
SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
lock_value=32,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
@@ -306,22 +384,37 @@ class Test(BaseTest):
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED)
bid, xmr_swap = swap_clients[0].getXmrBid(bid_id)
assert (xmr_swap)
assert xmr_swap
swap_clients[1].setBidDebugInd(bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK)
swap_clients[0].setBidDebugInd(bid_id, DebugTypes.BID_DONT_SPEND_COIN_A_LOCK_REFUND)
swap_clients[0].setBidDebugInd(
bid_id, DebugTypes.BID_DONT_SPEND_COIN_A_LOCK_REFUND
)
swap_clients[0].acceptXmrBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_STALLED_FOR_TEST, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.XMR_SWAP_FAILED_SWIPED, wait_for=80, sent=True)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.BID_STALLED_FOR_TEST,
wait_for=180,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.XMR_SWAP_FAILED_SWIPED,
wait_for=80,
sent=True,
)
js_w1_after = read_json_api(1801, 'wallets')
js_w1_after = read_json_api(1801, "wallets")
node1_blind_before = self.getBalance(js_w1_before)
node1_blind_after = self.getBalance(js_w1_after)
amount_from = float(format_amount(amt_swap, 8))
assert (node1_blind_after - node1_blind_before > (amount_from - 0.02))
assert node1_blind_after - node1_blind_before > (amount_from - 0.02)
swap_clients[0].abandonBid(bid_id)
swap_clients[1].abandonBid(bid_id)
@@ -329,20 +422,29 @@ class Test(BaseTest):
wait_for_none_active(test_delay_event, 1800)
wait_for_none_active(test_delay_event, 1801)
data = parse.urlencode({
'chainbkeysplit': True
}).encode()
offerer_key = json.loads(urlopen('http://127.0.0.1:1800/json/bids/{}'.format(bid_id.hex()), data=data).read())['splitkey']
data = parse.urlencode({"chainbkeysplit": True}).encode()
offerer_key = json.loads(
urlopen(
"http://127.0.0.1:1800/json/bids/{}".format(bid_id.hex()), data=data
).read()
)["splitkey"]
data = parse.urlencode({
'spendchainblocktx': True,
'remote_key': offerer_key
}).encode()
redeemed_txid = json.loads(urlopen('http://127.0.0.1:1801/json/bids/{}'.format(bid_id.hex()), data=data).read())['txid']
assert (len(redeemed_txid) == 64)
data = parse.urlencode(
{"spendchainblocktx": True, "remote_key": offerer_key}
).encode()
redeemed_txid = json.loads(
urlopen(
"http://127.0.0.1:1801/json/bids/{}".format(bid_id.hex()), data=data
).read()
)["txid"]
assert len(redeemed_txid) == 64
def do_test_04_follower_recover_b_lock_tx(self, coin_from, coin_to):
logging.info('---------- Test {} to {} follower recovers coin b lock tx'.format(coin_from.name, coin_to.name))
logging.info(
"---------- Test {} to {} follower recovers coin b lock tx".format(
coin_from.name, coin_to.name
)
)
swap_clients = self.swap_clients
ci_from = swap_clients[0].ci(coin_from)
@@ -351,8 +453,15 @@ class Test(BaseTest):
amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[0].postOffer(
coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS, lock_value=28)
coin_from,
coin_to,
amt_swap,
rate_swap,
amt_swap,
SwapTypes.XMR_SWAP,
lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
lock_value=28,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
@@ -361,36 +470,50 @@ class Test(BaseTest):
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED)
bid, xmr_swap = swap_clients[0].getXmrBid(bid_id)
assert (xmr_swap)
assert xmr_swap
swap_clients[1].setBidDebugInd(bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK)
swap_clients[0].acceptXmrBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[0], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, wait_for=180)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.XMR_SWAP_FAILED_REFUNDED, sent=True)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.XMR_SWAP_FAILED_REFUNDED,
wait_for=180,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.XMR_SWAP_FAILED_REFUNDED,
sent=True,
)
def test_04_follower_recover_b_lock_tx(self):
js_w0_before = read_json_api(1800, 'wallets')
js_w1_before = read_json_api(1801, 'wallets')
js_w0_before = read_json_api(1800, "wallets")
js_w1_before = read_json_api(1801, "wallets")
self.do_test_04_follower_recover_b_lock_tx(self.test_coin_from, Coins.XMR)
js_w0_after = read_json_api(1800, 'wallets')
js_w1_after = read_json_api(1801, 'wallets')
js_w0_after = read_json_api(1800, "wallets")
js_w1_after = read_json_api(1801, "wallets")
node0_blind_before = self.getBalance(js_w0_before)
node0_blind_after = self.getBalance(js_w0_after)
assert (node0_blind_before - node0_blind_after < 0.02)
assert node0_blind_before - node0_blind_after < 0.02
node1_xmr_before = self.getXmrBalance(js_w1_before)
node1_xmr_after = self.getXmrBalance(js_w1_after)
assert (node1_xmr_before - node1_xmr_after < 0.02)
assert node1_xmr_before - node1_xmr_after < 0.02
def test_04_follower_recover_b_lock_tx_from_part(self):
self.ensure_balance(self.test_coin_from, 1, 50.0)
self.do_test_04_follower_recover_b_lock_tx(Coins.PART, self.test_coin_from)
def do_test_05_self_bid(self, coin_from, coin_to):
logging.info('---------- Test {} to {} same client'.format(coin_from.name, coin_to.name))
logging.info(
"---------- Test {} to {} same client".format(coin_from.name, coin_to.name)
)
swap_clients = self.swap_clients
ci_to = swap_clients[0].ci(coin_to)
@@ -400,10 +523,24 @@ class Test(BaseTest):
amt_swap = make_int(random.uniform(0.1, 2.0), scale=8, r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[1].postOffer(coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP, auto_accept_bids=True)
offer_id = swap_clients[1].postOffer(
coin_from,
coin_to,
amt_swap,
rate_swap,
amt_swap,
SwapTypes.XMR_SWAP,
auto_accept_bids=True,
)
bid_id = swap_clients[1].postXmrBid(offer_id, amt_swap)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, wait_for=180)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=180,
)
def test_05_self_bid(self):
if not self.has_segwit:
@@ -419,45 +556,58 @@ class Test(BaseTest):
self.do_test_05_self_bid(Coins.PART, self.test_coin_from)
def test_06_preselect_inputs(self):
raise ValueError('TODO')
raise ValueError("TODO")
tla_from = self.test_coin_from.name
logging.info('---------- Test {} Preselected inputs'.format(tla_from))
logging.info("---------- Test {} Preselected inputs".format(tla_from))
swap_clients = self.swap_clients
# Prepare balance
self.ensure_balance(self.test_coin_from, 2, 100.0)
js_w2 = read_json_api(1802, 'wallets')
js_w2 = read_json_api(1802, "wallets")
post_json = {
'value': float(js_w2['PART']['blind_balance']),
'type_from': 'blind',
'type_to': 'blind',
'address': js_w2['PART']['stealth_address'],
'subfee': True,
"value": float(js_w2["PART"]["blind_balance"]),
"type_from": "blind",
"type_to": "blind",
"address": js_w2["PART"]["stealth_address"],
"subfee": True,
}
json_rv = read_json_api(1802, 'wallets/{}/withdraw'.format('part'), post_json)
wait_for_balance(test_delay_event, 'http://127.0.0.1:1802/json/wallets/{}'.format('part'), 'blind_balance', 10.0)
assert (len(json_rv['txid']) == 64)
json_rv = read_json_api(1802, "wallets/{}/withdraw".format("part"), post_json)
wait_for_balance(
test_delay_event,
"http://127.0.0.1:1802/json/wallets/{}".format("part"),
"blind_balance",
10.0,
)
assert len(json_rv["txid"]) == 64
# Create prefunded ITX
ci = swap_clients[2].ci(self.test_coin_from)
ci_to = swap_clients[2].ci(Coins.XMR)
pi = swap_clients[2].pi(SwapTypes.XMR_SWAP)
js_w2 = read_json_api(1802, 'wallets')
swap_value = ci.make_int(js_w2['PART']['blind_balance'])
assert (swap_value > ci.make_int(95))
js_w2 = read_json_api(1802, "wallets")
swap_value = ci.make_int(js_w2["PART"]["blind_balance"])
assert swap_value > ci.make_int(95)
itx = pi.getFundedInitiateTxTemplate(ci, swap_value, True)
itx_decoded = ci.describeTx(itx.hex())
n = pi.findMockVout(ci, itx_decoded)
value_after_subfee = ci.make_int(itx_decoded['vout'][n]['value'])
assert (value_after_subfee < swap_value)
value_after_subfee = ci.make_int(itx_decoded["vout"][n]["value"])
assert value_after_subfee < swap_value
swap_value = value_after_subfee
wait_for_unspent(test_delay_event, ci, swap_value)
extra_options = {'prefunded_itx': itx}
extra_options = {"prefunded_itx": itx}
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0))
offer_id = swap_clients[2].postOffer(self.test_coin_from, Coins.XMR, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP, extra_options=extra_options)
offer_id = swap_clients[2].postOffer(
self.test_coin_from,
Coins.XMR,
swap_value,
rate_swap,
swap_value,
SwapTypes.XMR_SWAP,
extra_options=extra_options,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
@@ -466,20 +616,38 @@ class Test(BaseTest):
wait_for_bid(test_delay_event, swap_clients[2], bid_id, BidStates.BID_RECEIVED)
swap_clients[2].acceptBid(bid_id)
wait_for_bid(test_delay_event, swap_clients[2], bid_id, BidStates.SWAP_COMPLETED, wait_for=120)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, wait_for=120)
wait_for_bid(
test_delay_event,
swap_clients[2],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=120,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=120,
)
# Verify expected inputs were used
bid, _, _, _, _ = swap_clients[2].getXmrBidAndOffer(bid_id)
assert (bid.xmr_a_lock_tx)
wtx = ci.rpc_wallet('gettransaction', [bid.xmr_a_lock_tx.txid.hex(),])
itx_after = ci.describeTx(wtx['hex'])
assert (len(itx_after['vin']) == len(itx_decoded['vin']))
for i, txin in enumerate(itx_decoded['vin']):
txin_after = itx_after['vin'][i]
assert (txin['txid'] == txin_after['txid'])
assert (txin['vout'] == txin_after['vout'])
assert bid.xmr_a_lock_tx
wtx = ci.rpc_wallet(
"gettransaction",
[
bid.xmr_a_lock_tx.txid.hex(),
],
)
itx_after = ci.describeTx(wtx["hex"])
assert len(itx_after["vin"]) == len(itx_decoded["vin"])
for i, txin in enumerate(itx_decoded["vin"]):
txin_after = itx_after["vin"][i]
assert txin["txid"] == txin_after["txid"]
assert txin["vout"] == txin_after["vout"]
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()