tests: add remaining electrum tests

Signed-off-by: tecnovert <tecnovert@tecnovert.net>
This commit is contained in:
tecnovert
2026-02-08 01:29:37 +02:00
parent 1ca454b269
commit 54ece5dff9
4 changed files with 439 additions and 48 deletions

View File

@@ -639,8 +639,13 @@ def js_bids(self, url_split, post_string: str, is_json: bool) -> bytes:
) )
if have_data_entry(post_data, "debugind"): if have_data_entry(post_data, "debugind"):
main_debug_ind: bool = toBool(
get_data_entry_or(post_data, "maindebugind", True)
)
swap_client.setBidDebugInd( swap_client.setBidDebugInd(
bid_id, int(get_data_entry(post_data, "debugind")) bid_id,
int(get_data_entry(post_data, "debugind")),
add_to_bid=main_debug_ind,
) )
rv = {"bid_id": bid_id.hex()} rv = {"bid_id": bid_id.hex()}
@@ -658,8 +663,13 @@ def js_bids(self, url_split, post_string: str, is_json: bool) -> bytes:
elif have_data_entry(post_data, "abandon"): elif have_data_entry(post_data, "abandon"):
swap_client.abandonBid(bid_id) swap_client.abandonBid(bid_id)
elif have_data_entry(post_data, "debugind"): elif have_data_entry(post_data, "debugind"):
main_debug_ind: bool = toBool(
get_data_entry_or(post_data, "maindebugind", True)
)
swap_client.setBidDebugInd( swap_client.setBidDebugInd(
bid_id, int(get_data_entry(post_data, "debugind")) bid_id,
int(get_data_entry(post_data, "debugind")),
add_to_bid=main_debug_ind,
) )
if have_data_entry(post_data, "show_extra"): if have_data_entry(post_data, "show_extra"):
@@ -668,7 +678,9 @@ def js_bids(self, url_split, post_string: str, is_json: bool) -> bytes:
with_events = True with_events = True
bid, xmr_swap, offer, xmr_offer, events = swap_client.getXmrBidAndOffer(bid_id) bid, xmr_swap, offer, xmr_offer, events = swap_client.getXmrBidAndOffer(bid_id)
assert bid, "Unknown bid ID" if bid is None:
swap_client.log.debug(f"js_bids: Unknown bid id {bid_id.hex()}")
return bytes(json.dumps({"error": "Unknown bid id"}), "UTF-8")
if post_string != "": if post_string != "":
if have_data_entry(post_data, "chainbkeysplit"): if have_data_entry(post_data, "chainbkeysplit"):

View File

@@ -171,7 +171,7 @@ def prepare_balance(
wait_for_amount: float = amount wait_for_amount: float = amount
if not test_balance: if not test_balance:
wait_for_amount += current_balance wait_for_amount += current_balance
delay_iterations = 100 if coin == Coins.NAV else 20 delay_iterations = 100 if coin == Coins.NAV else 30
delay_time = 5 if coin == Coins.NAV else 3 delay_time = 5 if coin == Coins.NAV else 3
wait_for_balance( wait_for_balance(
use_delay_event, use_delay_event,

View File

@@ -209,7 +209,7 @@ def updateThread(cls):
calldogerpc(0, "generatetoaddress", [1, cls.doge_addr]) calldogerpc(0, "generatetoaddress", [1, cls.doge_addr])
except Exception as e: except Exception as e:
print("updateThread error", str(e)) print("updateThread error", str(e))
cls.delay_event.wait(random.randrange(cls.update_min, cls.update_max)) cls.delay_event.wait(random.uniform(cls.update_min, cls.update_max))
def updateThreadXMR(cls): def updateThreadXMR(cls):
@@ -228,7 +228,7 @@ def updateThreadXMR(cls):
) )
except Exception as e: except Exception as e:
print("updateThreadXMR error", str(e)) print("updateThreadXMR error", str(e))
cls.delay_event.wait(random.randrange(cls.xmr_update_min, cls.xmr_update_max)) cls.delay_event.wait(random.uniform(cls.xmr_update_min, cls.xmr_update_max))
def updateThreadDCR(cls): def updateThreadDCR(cls):
@@ -262,7 +262,7 @@ def updateThreadDCR(cls):
logging.warning("updateThreadDCR generate {}".format(e)) logging.warning("updateThreadDCR generate {}".format(e))
except Exception as e: except Exception as e:
print("updateThreadDCR error", str(e)) print("updateThreadDCR error", str(e))
cls.delay_event.wait(random.randrange(cls.dcr_update_min, cls.dcr_update_max)) cls.delay_event.wait(random.uniform(cls.dcr_update_min, cls.dcr_update_max))
def signal_handler(self, sig, frame): def signal_handler(self, sig, frame):

View File

@@ -69,8 +69,6 @@ from basicswap.util.daemon import Daemon
from tests.basicswap.common import ( from tests.basicswap.common import (
prepare_balance, prepare_balance,
stopDaemons, stopDaemons,
waitForNumBids,
waitForNumOffers,
) )
from tests.basicswap.common_xmr import run_prepare, TEST_PATH from tests.basicswap.common_xmr import run_prepare, TEST_PATH
from tests.basicswap.extended.test_xmr_persistent import ( from tests.basicswap.extended.test_xmr_persistent import (
@@ -91,22 +89,37 @@ if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout)) logger.addHandler(logging.StreamHandler(sys.stdout))
def modifyConfig(test_path, i): def modify_config(test_path, i):
if i == 1:
config_path = os.path.join(test_path, f"client{i}", cfg.CONFIG_FILENAME) config_path = os.path.join(test_path, f"client{i}", cfg.CONFIG_FILENAME)
with open(config_path) as fp: with open(config_path) as fp:
settings = json.load(fp) settings = json.load(fp)
if i == 1:
settings["debug_ui"] = True settings["debug_ui"] = True
settings["fetchpricesthread"] = False settings.update(
{
"fetchpricesthread": False,
"check_progress_seconds": 2,
"check_watched_seconds": 3,
"check_expired_seconds": 60,
"check_events_seconds": 1,
"check_xmr_swaps_seconds": 1,
"min_delay_event": 1,
"max_delay_event": 4,
"min_delay_event_short": 1,
"max_delay_event_short": 3,
"min_delay_retry": 2,
"max_delay_retry": 10,
}
)
with open(config_path, "w") as fp: with open(config_path, "w") as fp:
json.dump(settings, fp, indent=4) json.dump(settings, fp, indent=4)
def wait_for_bid_state( def wait_for_bid_state(
delay_event, node_port: int, bid_id: str, state=None, wait_for: int = 40 delay_event, node_port: int, bid_id: str, state=None, wait_for: int = 30
) -> None: ) -> None:
logger.info(f"TEST: wait_for_bid {bid_id}, state {state}") logger.info(f"TEST: wait_for_bid {bid_id}, node {node_port}, state {state}")
pass_state_strs = [] pass_state_strs = []
if isinstance(state, (list, tuple)): if isinstance(state, (list, tuple)):
@@ -129,6 +142,25 @@ def wait_for_bid_state(
raise ValueError(f"wait_for_bid timed out {bid_id}.") raise ValueError(f"wait_for_bid timed out {bid_id}.")
def wait_for_offer(
delay_event, node_port: int, offer_id: str, state=None, wait_for: int = 30
) -> None:
logger.info(f"TEST: wait_for_offer {offer_id}, node {node_port}, state {state}")
for i in range(wait_for):
if delay_event.is_set():
raise ValueError("Test stopped.")
delay_event.wait(1)
try:
rv = read_json_api(node_port, f"offers/{offer_id}")
if any(offer["offer_id"] == offer_id for offer in rv):
return
except Exception as e: # noqa: F841
pass
# logger.debug(f"TEST: wait_for_offer {offer_id}, error {e}")
raise ValueError(f"wait_for_offer timed out {offer_id}.")
class TestFunctions(BaseTestWithPrepare): class TestFunctions(BaseTestWithPrepare):
__test__ = False __test__ = False
@@ -165,17 +197,12 @@ class TestFunctions(BaseTestWithPrepare):
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}" f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
) )
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"] offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
summary = read_json_api(port_node_from) wait_for_offer(self.delay_event, port_node_to, offer_id)
assert summary["num_sent_offers"] == 1 offer = read_json_api(port_node_to, f"offers/{offer_id}")[0]
assert offer["offer_id"] == offer_id
logger.info(f"Waiting for offer: {offer_id}")
waitForNumOffers(self.delay_event, port_node_to, 1)
offers = read_json_api(port_node_to, "offers")
offer = offers[0]
data = { data = {
"offer_id": offer["offer_id"], "offer_id": offer_id,
"amount_from": offer["amount_from"], "amount_from": offer["amount_from"],
"validmins": 60, "validmins": 60,
} }
@@ -185,7 +212,7 @@ class TestFunctions(BaseTestWithPrepare):
self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED
) )
rv = post_json_api(port_node_from, "bids/{}".format(bid_id), {"accept": True}) rv = post_json_api(port_node_from, f"bids/{bid_id}", {"accept": True})
assert rv["bid_state"] in ("Accepted", "Request accepted") assert rv["bid_state"] in ("Accepted", "Request accepted")
logger.info("Completing swap") logger.info("Completing swap")
@@ -195,8 +222,6 @@ class TestFunctions(BaseTestWithPrepare):
wait_for_bid_state( wait_for_bid_state(
self.delay_event, port_node_to, bid_id, BidStates.SWAP_COMPLETED, 240 self.delay_event, port_node_to, bid_id, BidStates.SWAP_COMPLETED, 240
) )
# Wait for bid to be removed from in-progress
waitForNumBids(self.delay_event, port_node_from, 0)
def do_test_02_leader_recover_a_lock_tx( def do_test_02_leader_recover_a_lock_tx(
self, self,
@@ -206,7 +231,7 @@ class TestFunctions(BaseTestWithPrepare):
port_node_to: int = port_node_1, port_node_to: int = port_node_1,
lock_value: int = 12, lock_value: int = 12,
) -> None: ) -> None:
logging.info( logger.info(
f"---------- Test {coin_from.name} ({port_node_from}) to {coin_to.name} ({port_node_to}) leader recovers coin a lock tx" f"---------- Test {coin_from.name} ({port_node_from}) to {coin_to.name} ({port_node_to}) leader recovers coin a lock tx"
) )
@@ -218,7 +243,7 @@ class TestFunctions(BaseTestWithPrepare):
port_bidder: int = port_node_to port_bidder: int = port_node_to
port_leader: int = port_bidder if reverse_bid else port_offerer port_leader: int = port_bidder if reverse_bid else port_offerer
port_follower: int = port_offerer if reverse_bid else port_bidder port_follower: int = port_offerer if reverse_bid else port_bidder
logging.info( logger.info(
f"Offerer, bidder, leader, follower: {port_offerer}, {port_bidder}, {port_leader}, {port_follower}" f"Offerer, bidder, leader, follower: {port_offerer}, {port_bidder}, {port_leader}, {port_follower}"
) )
@@ -239,17 +264,12 @@ class TestFunctions(BaseTestWithPrepare):
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}" f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
) )
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"] offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
summary = read_json_api(port_node_from) wait_for_offer(self.delay_event, port_node_to, offer_id)
assert summary["num_sent_offers"] == 1 offer = read_json_api(port_node_to, f"offers/{offer_id}")[0]
assert offer["offer_id"] == offer_id
logger.info(f"Waiting for offer: {offer_id}")
waitForNumOffers(self.delay_event, port_node_to, 1)
offers = read_json_api(port_node_to, "offers")
offer = offers[0]
data = { data = {
"offer_id": offer["offer_id"], "offer_id": offer_id,
"amount_from": offer["amount_from"], "amount_from": offer["amount_from"],
"validmins": 60, "validmins": 60,
} }
@@ -267,7 +287,6 @@ class TestFunctions(BaseTestWithPrepare):
wait_for_bid_state( wait_for_bid_state(
self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED
) )
rv = post_json_api(port_offerer, f"bids/{bid_id}", {"accept": True}) rv = post_json_api(port_offerer, f"bids/{bid_id}", {"accept": True})
assert rv["bid_state"] in ("Accepted", "Request accepted") assert rv["bid_state"] in ("Accepted", "Request accepted")
@@ -278,11 +297,255 @@ class TestFunctions(BaseTestWithPrepare):
BidStates.XMR_SWAP_FAILED_REFUNDED, BidStates.XMR_SWAP_FAILED_REFUNDED,
240, 240,
) )
wait_for_bid_state(
self.delay_event,
port_follower,
bid_id,
[BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED],
240,
)
def do_test_03_follower_recover_a_lock_tx(
self,
coin_from: Coins,
coin_to: Coins,
port_node_from: int = port_node_0,
port_node_to: int = port_node_1,
lock_value: int = 12,
with_mercy: bool = True,
) -> None:
logger.info(
"---------- Test {} ({}) to {} ({}) follower recovers coin a lock tx{}".format(
coin_from.name,
port_node_from,
coin_to.name,
port_node_to,
" (with mercy tx)" if with_mercy else "",
)
)
# Leader is too slow to recover the coin a lock tx and follower swipes it
# Coin B lock tx remains unspent unless a mercy output revealing the follower's keyshare is sent
ticker_from: str = chainparams[coin_from]["ticker"]
ticker_to: str = chainparams[coin_to]["ticker"]
reverse_bid: bool = True if coin_from in (Coins.XMR,) else False
port_offerer: int = port_node_from
port_bidder: int = port_node_to
port_leader: int = port_bidder if reverse_bid else port_offerer
port_follower: int = port_offerer if reverse_bid else port_bidder
logger.info(
f"Offerer, bidder, leader, follower: {port_offerer}, {port_bidder}, {port_leader}, {port_follower}"
)
amt_from_str = f"{random.uniform(0.5, 10.0):.{8}f}"
amt_to_str = f"{random.uniform(0.5, 10.0):.{8}f}"
data = {
"addr_from": "-1",
"coin_from": ticker_from,
"coin_to": ticker_to,
"amt_from": amt_from_str,
"amt_to": amt_to_str,
"swap_type": "adaptor_sig",
"lock_type": str(int(TxLockTypes.SEQUENCE_LOCK_BLOCKS)),
"lock_blocks": str(lock_value),
}
logger.info(
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
)
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
wait_for_offer(self.delay_event, port_node_to, offer_id)
offer = read_json_api(port_node_to, f"offers/{offer_id}")[0]
assert offer["offer_id"] == offer_id
data = {
"offer_id": offer_id,
"amount_from": offer["amount_from"],
"validmins": 60,
}
rv = post_json_api(port_node_to, "bids/new", data)
bid_id: str = rv["bid_id"]
wait_for_bid_state(self.delay_event, port_leader, bid_id)
wait_for_bid_state(self.delay_event, port_follower, bid_id)
rv = post_json_api(
port_leader,
f"bids/{bid_id}",
{"debugind": DebugTypes.BID_DONT_SPEND_COIN_A_LOCK_REFUND2},
)
assert "bid_state" in rv # Test that the return didn't fail
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{"debugind": DebugTypes.BID_DONT_SPEND_COIN_B_LOCK},
)
assert "bid_state" in rv
for node_port in (port_leader, port_follower):
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{
"debugind": DebugTypes.BID_DONT_SPEND_COIN_B_LOCK,
"maindebugind": False,
},
)
assert "bid_state" in rv
wait_for_bid_state(
self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED
)
rv = post_json_api(port_offerer, f"bids/{bid_id}", {"accept": True})
assert rv["bid_state"] in ("Accepted", "Request accepted")
expect_state = (
(BidStates.XMR_SWAP_NOSCRIPT_TX_REDEEMED, BidStates.SWAP_COMPLETED)
if with_mercy
else (BidStates.BID_STALLED_FOR_TEST, BidStates.XMR_SWAP_FAILED_SWIPED)
)
wait_for_bid_state(
self.delay_event,
port_leader,
bid_id,
expect_state,
240,
)
wait_for_bid_state(
self.delay_event,
port_follower,
bid_id,
[BidStates.XMR_SWAP_FAILED_SWIPED],
240,
)
rv = post_json_api(
port_leader,
f"bids/{bid_id}",
{"show_extra": True, "with_events": True},
)
events = rv["events"]
logger.info(f"Initiator events: {events}")
if with_mercy:
assert any(
event["desc"] == "Lock tx B spend tx published" for event in events
)
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{"show_extra": True, "with_events": True},
)
events = rv["events"]
logger.info(f"Participant events: {events}")
assert any(
event["desc"] == "Lock tx A refund swipe tx published" for event in events
)
def do_test_04_follower_recover_b_lock_tx(
self,
coin_from: Coins,
coin_to: Coins,
port_node_from: int = port_node_0,
port_node_to: int = port_node_1,
lock_value: int = 16,
) -> None:
logger.info(
f"---------- Test {coin_from.name} ({port_node_from}) to {coin_to.name} ({port_node_to}) follower recovers coin b lock tx"
)
ticker_from: str = chainparams[coin_from]["ticker"]
ticker_to: str = chainparams[coin_to]["ticker"]
reverse_bid: bool = True if coin_from in (Coins.XMR,) else False
port_offerer: int = port_node_from
port_bidder: int = port_node_to
port_leader: int = port_bidder if reverse_bid else port_offerer
port_follower: int = port_offerer if reverse_bid else port_bidder
logger.info(
f"Offerer, bidder, leader, follower: {port_offerer}, {port_bidder}, {port_leader}, {port_follower}"
)
amt_from_str = f"{random.uniform(0.5, 10.0):.{8}f}"
amt_to_str = f"{random.uniform(0.5, 10.0):.{8}f}"
data = {
"addr_from": "-1",
"coin_from": ticker_from,
"coin_to": ticker_to,
"amt_from": amt_from_str,
"amt_to": amt_to_str,
"swap_type": "adaptor_sig",
"lock_type": str(int(TxLockTypes.SEQUENCE_LOCK_BLOCKS)),
"lock_blocks": str(lock_value),
}
logger.info(
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
)
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
wait_for_offer(self.delay_event, port_node_to, offer_id)
offer = read_json_api(port_node_to, f"offers/{offer_id}")[0]
assert offer["offer_id"] == offer_id
data = {
"offer_id": offer_id,
"amount_from": offer["amount_from"],
"validmins": 60,
}
rv = post_json_api(port_node_to, "bids/new", data)
bid_id: str = rv["bid_id"]
wait_for_bid_state(self.delay_event, port_follower, bid_id)
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{"debugind": DebugTypes.CREATE_INVALID_COIN_B_LOCK},
)
assert "bid_state" in rv
wait_for_bid_state(
self.delay_event, port_node_from, bid_id, BidStates.BID_RECEIVED
)
rv = post_json_api(port_offerer, f"bids/{bid_id}", {"accept": True})
assert rv["bid_state"] in ("Accepted", "Request accepted")
wait_for_bid_state(
self.delay_event,
port_leader,
bid_id,
BidStates.XMR_SWAP_FAILED_REFUNDED,
240,
)
wait_for_bid_state(
self.delay_event,
port_follower,
bid_id,
BidStates.XMR_SWAP_FAILED_REFUNDED,
240,
)
rv = post_json_api(
port_leader,
f"bids/{bid_id}",
{"show_extra": True, "with_events": True},
)
events = rv["events"]
logger.info(f"Initiator events: {events}")
assert any(event["desc"] == "Detected invalid lock Tx B" for event in events)
assert any(
event["desc"] == "Lock tx A refund spend tx published" for event in events
)
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{"show_extra": True, "with_events": True},
)
events = rv["events"]
logger.info(f"Participant events: {events}")
assert any(event["desc"] == "Lock tx B refund tx published" for event in events)
class Test(TestFunctions): class Test(TestFunctions):
__test__ = True __test__ = True
update_min = 2 update_min = 1.7
daemons = [] daemons = []
test_coin_a = Coins.PART test_coin_a = Coins.PART
@@ -382,7 +645,7 @@ class Test(TestFunctions):
@classmethod @classmethod
def modifyConfig(cls, test_path, i): def modifyConfig(cls, test_path, i):
modifyConfig(test_path, i) modify_config(test_path, i)
@classmethod @classmethod
def setupNodes(cls): def setupNodes(cls):
@@ -426,7 +689,7 @@ class Test(TestFunctions):
prepare_balance( prepare_balance(
self.delay_event, self.delay_event,
self.test_coin_b, self.test_coin_b,
100, 1000,
self.port_node_1, self.port_node_1,
self.port_node_0, self.port_node_0,
True, True,
@@ -445,8 +708,24 @@ class Test(TestFunctions):
self.do_test_01_full_swap(self.test_coin_b, self.test_coin_xmr) self.do_test_01_full_swap(self.test_coin_b, self.test_coin_xmr)
def test_01_c_full_swap_xmr_reverse(self): def test_01_c_full_swap_xmr_reverse(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
1000,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_01_full_swap( self.do_test_01_full_swap(
self.test_coin_xmr, self.test_coin_b, self.port_node_1, self.port_node_0 self.test_coin_xmr, self.test_coin_b, self.port_node_0, self.port_node_1
) )
def test_02_a_leader_recover_a_lock_tx(self): def test_02_a_leader_recover_a_lock_tx(self):
@@ -471,8 +750,108 @@ class Test(TestFunctions):
) )
def test_02_b_leader_recover_a_lock_tx_reverse(self): def test_02_b_leader_recover_a_lock_tx_reverse(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_02_leader_recover_a_lock_tx( self.do_test_02_leader_recover_a_lock_tx(
self.test_coin_xmr, self.test_coin_b, self.port_node_1, self.port_node_0 self.test_coin_xmr, self.test_coin_b, self.port_node_0, self.port_node_1
)
def test_03_a_follower_recover_a_lock_tx(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_b, self.test_coin_xmr, self.port_node_1, self.port_node_0
)
def test_03_b_follower_recover_a_lock_tx_reverse(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_xmr, self.test_coin_b, self.port_node_0, self.port_node_1
)
def test_04_a_follower_recover_b_lock_tx(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_04_follower_recover_b_lock_tx(
self.test_coin_b, self.test_coin_xmr, self.port_node_1, self.port_node_0
)
def test_04_b_follower_recover_b_lock_tx_reverse(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_04_follower_recover_b_lock_tx(
self.test_coin_xmr, self.test_coin_b, self.port_node_0, self.port_node_1
) )