Merge pull request #4 from tecnovert/electrum_tests

tests: add base for electrum functional tests
This commit is contained in:
Gerlof van Ek
2026-02-06 20:28:49 +01:00
committed by GitHub
6 changed files with 587 additions and 84 deletions

View File

@@ -253,6 +253,14 @@ def parseOfferFormData(swap_client, form_data, page_data, options={}):
get_data_entry(form_data, "valid_for_seconds")
)
if swap_client.debug:
if have_data_entry(form_data, "lock_type"):
parsed_data["lock_type"] = TxLockTypes(
int(get_data_entry(form_data, "lock_type"))
)
if have_data_entry(form_data, "lock_blocks"):
parsed_data["lock_blocks"] = int(get_data_entry(form_data, "lock_blocks"))
try:
if len(errors) == 0 and page_data["swap_style"] == "xmr":
reverse_bid: bool = swap_client.is_reverse_ads_bid(coin_from, coin_to)
@@ -346,7 +354,15 @@ def postNewOfferFromParsed(swap_client, parsed_data):
lock_type = TxLockTypes.ABS_LOCK_TIME
extra_options = {}
lock_value: int = parsed_data.get("lock_seconds", -1)
if swap_client.debug:
if "lock_type" in parsed_data:
lock_type = parsed_data["lock_type"]
if "lock_blocks" in parsed_data:
lock_value = parsed_data["lock_blocks"]
if "fee_from_conf" in parsed_data:
extra_options["from_fee_conf_target"] = parsed_data["fee_from_conf"]
if "fee_from_conf" in parsed_data:
extra_options["from_fee_conf_target"] = parsed_data["fee_from_conf"]
if "from_fee_multiplier_percent" in parsed_data:
@@ -397,7 +413,7 @@ def postNewOfferFromParsed(swap_client, parsed_data):
parsed_data["amt_bid_min"],
swap_type,
lock_type=lock_type,
lock_value=parsed_data["lock_seconds"],
lock_value=lock_value,
addr_send_from=parsed_data["addr_from"],
extra_options=extra_options,
)

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020-2024 tecnovert
# Copyright (c) 2024-2025 The Basicswap developers
# Copyright (c) 2024-2026 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php.
@@ -15,6 +15,7 @@ import subprocess
from urllib.request import urlopen
from .util import read_json_api
from basicswap.basicswap import Coins
from basicswap.rpc import callrpc
from basicswap.util import toBool
from basicswap.contrib.rpcauth import generate_salt, password_to_hmac
@@ -125,6 +126,65 @@ def prepareDataDir(
return node_dir
def prepare_balance(
use_delay_event,
coin,
amount: float,
port_target_node: int,
port_take_from_node: int,
test_balance: bool = True,
) -> None:
if coin == Coins.PART_BLIND:
coin_ticker: str = "PART"
balance_type: str = "blind_balance"
address_type: str = "stealth_address"
type_to: str = "blind"
elif coin == Coins.PART_ANON:
coin_ticker: str = "PART"
balance_type: str = "anon_balance"
address_type: str = "stealth_address"
type_to: str = "anon"
else:
coin_ticker: str = coin.name
balance_type: str = "balance"
address_type: str = "deposit_address"
js_w = read_json_api(port_target_node, "wallets")
current_balance: float = float(js_w[coin_ticker][balance_type])
if test_balance and current_balance >= amount:
return
post_json = {
"value": amount,
"address": js_w[coin_ticker][address_type],
"subfee": False,
}
if coin in (Coins.XMR, Coins.WOW):
post_json["sweepall"] = False
if coin in (Coins.PART_BLIND, Coins.PART_ANON):
post_json["type_to"] = type_to
json_rv = read_json_api(
port_take_from_node,
"wallets/{}/withdraw".format(coin_ticker.lower()),
post_json,
)
assert len(json_rv["txid"]) == 64
wait_for_amount: float = amount
if not test_balance:
wait_for_amount += current_balance
delay_iterations = 100 if coin == Coins.NAV else 20
delay_time = 5 if coin == Coins.NAV else 3
wait_for_balance(
use_delay_event,
"http://127.0.0.1:{}/json/wallets/{}".format(
port_target_node, coin_ticker.lower()
),
balance_type,
wait_for_amount,
iterations=delay_iterations,
delay_time=delay_time,
)
def checkForks(ro):
try:
if "bip9_softforks" in ro:

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2020-2024 tecnovert
# Copyright (c) 2024-2025 The Basicswap developers
# Copyright (c) 2024-2026 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -139,6 +139,7 @@ def run_prepare(
use_rpcauth=False,
extra_settings={},
port_ofs=0,
extra_args=[],
):
config_path = os.path.join(datadir_path, cfg.CONFIG_FILENAME)
@@ -180,7 +181,7 @@ def run_prepare(
"-noextractover",
"-noreleasesizecheck",
"-xmrrestoreheight=0",
]
] + extra_args
if mnemonic_in:
testargs.append(f'-particl_mnemonic="{mnemonic_in}"')
@@ -644,6 +645,7 @@ class XmrTestBase(TestBase):
runSystem.main()
def start_processes(self):
multiprocessing.set_start_method("fork")
self.delay_event.clear()
for i in range(3):

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2024 tecnovert
# Copyright (c) 2024-2025 The Basicswap developers
# Copyright (c) 2024-2026 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -270,7 +270,7 @@ def signal_handler(self, sig, frame):
self.delay_event.set()
def run_thread(self, client_id):
def run_process(client_id):
client_path = os.path.join(test_path, "client{}".format(client_id))
testargs = [
"basicswap-run",
@@ -283,16 +283,14 @@ def run_thread(self, client_id):
def start_processes(self):
multiprocessing.set_start_method("spawn")
self.delay_event.clear()
for i in range(NUM_NODES):
self.processes.append(
multiprocessing.Process(
target=run_thread,
args=(
self,
i,
),
target=run_process,
args=(i,),
)
)
self.processes[-1].start()
@@ -302,7 +300,7 @@ def start_processes(self):
wallets = read_json_api(UI_PORT + 1, "wallets")
if "monero" in TEST_COINS_LIST:
if "monero" in self.test_coins_list:
xmr_auth = None
if os.getenv("XMR_RPC_USER", "") != "":
xmr_auth = (os.getenv("XMR_RPC_USER", ""), os.getenv("XMR_RPC_PWD", ""))
@@ -336,7 +334,7 @@ def start_processes(self):
callbtcrpc(0, "generatetoaddress", [num_blocks, self.btc_addr])
logging.info("BTC blocks: {}".format(callbtcrpc(0, "getblockcount")))
if "litecoin" in TEST_COINS_LIST:
if "litecoin" in self.test_coins_list:
self.ltc_addr = callltcrpc(
0, "getnewaddress", ["mining_addr"], wallet="wallet.dat"
)
@@ -367,7 +365,7 @@ def start_processes(self):
wallet="wallet.dat",
)
if "decred" in TEST_COINS_LIST:
if "decred" in self.test_coins_list:
if RESET_TEST:
_ = calldcrrpc(0, "getnewaddress")
# assert (addr == self.dcr_addr)
@@ -397,7 +395,7 @@ def start_processes(self):
self.update_thread_dcr = threading.Thread(target=updateThreadDCR, args=(self,))
self.update_thread_dcr.start()
if "firo" in TEST_COINS_LIST:
if "firo" in self.test_coins_list:
self.firo_addr = callfirorpc(0, "getnewaddress", ["mining_addr"])
num_blocks: int = 200
have_blocks: int = callfirorpc(0, "getblockcount")
@@ -413,7 +411,7 @@ def start_processes(self):
[num_blocks - have_blocks, self.firo_addr],
)
if "bitcoincash" in TEST_COINS_LIST:
if "bitcoincash" in self.test_coins_list:
self.bch_addr = callbchrpc(
0, "getnewaddress", ["mining_addr"], wallet="wallet.dat"
)
@@ -432,7 +430,7 @@ def start_processes(self):
wallet="wallet.dat",
)
if "dogecoin" in TEST_COINS_LIST:
if "dogecoin" in self.test_coins_list:
self.doge_addr = calldogerpc(0, "getnewaddress", ["mining_addr"])
num_blocks: int = 200
have_blocks: int = calldogerpc(0, "getblockcount")
@@ -446,7 +444,7 @@ def start_processes(self):
0, "generatetoaddress", [num_blocks - have_blocks, self.doge_addr]
)
if "namecoin" in TEST_COINS_LIST:
if "namecoin" in self.test_coins_list:
self.nmc_addr = callnmcrpc(0, "getnewaddress", ["mining_addr", "bech32"])
num_blocks: int = 500
have_blocks: int = callnmcrpc(0, "getblockcount")
@@ -539,7 +537,22 @@ class BaseTestWithPrepare(unittest.TestCase):
firo_addr = None
bch_addr = None
doge_addr = None
initialised = False
test_coins_list = TEST_COINS_LIST
@classmethod
def modifyConfig(cls, test_path, i):
modifyConfig(test_path, i)
@classmethod
def setupNodes(cls):
logging.info(f"Preparing {NUM_NODES} nodes.")
prepare_nodes(
NUM_NODES,
cls.test_coins_list,
True,
{"min_sequence_lock_seconds": 60},
PORT_OFS,
)
@classmethod
def setUpClass(cls):
@@ -550,22 +563,18 @@ class BaseTestWithPrepare(unittest.TestCase):
if os.path.exists(test_path) and not RESET_TEST:
logging.info(f"Continuing with existing directory: {test_path}")
else:
logging.info(f"Preparing {NUM_NODES} nodes.")
prepare_nodes(
NUM_NODES,
TEST_COINS_LIST,
True,
{"min_sequence_lock_seconds": 60},
PORT_OFS,
)
cls.setupNodes()
for i in range(NUM_NODES):
modifyConfig(test_path, i)
cls.modifyConfig(test_path, i)
signal.signal(
signal.SIGINT, lambda signal, frame: signal_handler(cls, signal, frame)
)
start_processes(cls)
waitForServer(cls.delay_event, UI_PORT + 0)
waitForServer(cls.delay_event, UI_PORT + 1)
@classmethod
def tearDownClass(cls):
logging.info("Stopping test")
@@ -585,14 +594,6 @@ class BaseTestWithPrepare(unittest.TestCase):
cls.update_thread_dcr = None
cls.processes = []
def setUp(self):
if self.initialised:
return
start_processes(self)
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
self.initialised = True
class Test(BaseTestWithPrepare):
def test_persistent(self):

View File

@@ -0,0 +1,464 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2026 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
"""
# Ensure Electrumx is installed to a venv in ELECTRUMX_SRC_DIR/venv
# Example setup with default paths:
The leveldb system package may be required to install plyvel:
sudo pacman -S leveldb
cd ~/tmp/
git clone git@github.com:spesmilo/electrumx.git
cd electrumx
python3 -m venv venv
. venv/bin/activate
pip install ".[ujson]"
# Run test
export TEST_PATH=/tmp/test_electrum
mkdir -p ${TEST_PATH}/bin
cp -r ~/tmp/basicswap_bin/* ${TEST_PATH}/bin
export ELECTRUMX_SRC_DIR="~/tmp/electrumx"
export EXTRA_CONFIG_JSON=$(cat <<EOF | jq -r @json
{
"btc0":["txindex=1","rpcworkqueue=1100"]
}
EOF
)
export TEST_COINS_LIST="bitcoin,monero"
export PYTHONPATH=$(pwd)
pytest -v -s --log-cli-level=DEBUG tests/basicswap/test_electrum.py
# Run select test
pytest -v -s --log-cli-level=DEBUG tests/basicswap/test_electrum.py::Test::test_01_b_full_swap_xmr
# Optionally copy coin releases to permanent storage for faster subsequent startups
cp -r ${TEST_PATH}/bin/* ~/tmp/basicswap_bin/
"""
import json
import logging
import os
import random
import shutil
import subprocess
import sys
import unittest
import basicswap.config as cfg
from basicswap.basicswap_util import (
BidStates,
DebugTypes,
TxLockTypes,
strBidState,
)
from basicswap.chainparams import (
Coins,
chainparams,
getCoinIdFromName,
)
from basicswap.util.daemon import Daemon
from tests.basicswap.common import (
prepare_balance,
stopDaemons,
waitForNumBids,
waitForNumOffers,
)
from tests.basicswap.common_xmr import run_prepare, TEST_PATH
from tests.basicswap.extended.test_xmr_persistent import (
BaseTestWithPrepare,
NUM_NODES,
PORT_OFS,
RESET_TEST,
)
from tests.basicswap.mnemonics import mnemonics
from tests.basicswap.util import (
read_json_api,
post_json_api,
)
logger = logging.getLogger()
logger.level = logging.DEBUG
if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout))
def modifyConfig(test_path, i):
if i == 1:
config_path = os.path.join(test_path, f"client{i}", cfg.CONFIG_FILENAME)
with open(config_path) as fp:
settings = json.load(fp)
settings["debug_ui"] = True
settings["fetchpricesthread"] = False
with open(config_path, "w") as fp:
json.dump(settings, fp, indent=4)
class TestFunctions(BaseTestWithPrepare):
__test__ = False
port_node_0 = 12701
port_node_1 = 12702
def do_test_01_full_swap(
self,
coin_from: Coins,
coin_to: Coins,
port_node_from: int = port_node_0,
port_node_to: int = port_node_1,
) -> None:
logger.info(
f"---------- Test {coin_from.name} ({port_node_from}) to {coin_to.name} ({port_node_to})"
)
ticker_from: str = chainparams[coin_from]["ticker"]
ticker_to: str = chainparams[coin_to]["ticker"]
amt_from_str = f"{random.uniform(0.5, 10.0):.{8}f}"
amt_to_str = f"{random.uniform(0.5, 10.0):.{8}f}"
data = {
"addr_from": "-1",
"coin_from": ticker_from,
"coin_to": ticker_to,
"amt_from": amt_from_str,
"amt_to": amt_to_str,
"lockhrs": "24",
"swap_type": "adaptor_sig",
}
logger.info(
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
)
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
summary = read_json_api(port_node_from)
assert summary["num_sent_offers"] == 1
logger.info(f"Waiting for offer: {offer_id}")
waitForNumOffers(self.delay_event, port_node_to, 1)
offers = read_json_api(port_node_to, "offers")
offer = offers[0]
data = {
"offer_id": offer["offer_id"],
"amount_from": offer["amount_from"],
"validmins": 60,
}
post_json_api(port_node_to, "bids/new", data)
waitForNumBids(self.delay_event, port_node_from, 1)
for i in range(20):
bids = read_json_api(port_node_from, "bids")
bid = bids[0]
if bid["bid_state"] == "Received":
break
self.delay_event.wait(1)
assert bid["bid_state"] == "Received"
rv = post_json_api(
port_node_from, "bids/{}".format(bid["bid_id"]), {"accept": True}
)
assert rv["bid_state"] in ("Accepted", "Request accepted")
logger.info("Completing swap")
for i in range(240):
if self.delay_event.is_set():
raise ValueError("Test stopped.")
self.delay_event.wait(4)
rv = read_json_api(port_node_from, "bids/{}".format(bid["bid_id"]))
if rv["bid_state"] == "Completed":
break
assert rv["bid_state"] == "Completed"
# Wait for bid to be removed from in-progress
waitForNumBids(self.delay_event, port_node_from, 0)
def do_test_02_leader_recover_a_lock_tx(
self,
coin_from: Coins,
coin_to: Coins,
port_node_from: int = port_node_0,
port_node_to: int = port_node_1,
lock_value: int = 12,
) -> None:
logging.info(
f"---------- Test {coin_from.name} ({port_node_from}) to {coin_to.name} ({port_node_to}) leader recovers coin a lock tx"
)
ticker_from: str = chainparams[coin_from]["ticker"]
ticker_to: str = chainparams[coin_to]["ticker"]
reverse_bid: bool = True if coin_from in (Coins.XMR,) else False
port_offerer: int = port_node_from
port_bidder: int = port_node_to
port_leader: int = port_bidder if reverse_bid else port_offerer
port_follower: int = port_offerer if reverse_bid else port_bidder
logging.info(
f"Offerer, bidder, leader, follower: {port_offerer}, {port_bidder}, {port_leader}, {port_follower}"
)
amt_from_str = f"{random.uniform(0.5, 10.0):.{8}f}"
amt_to_str = f"{random.uniform(0.5, 10.0):.{8}f}"
data = {
"addr_from": "-1",
"coin_from": ticker_from,
"coin_to": ticker_to,
"amt_from": amt_from_str,
"amt_to": amt_to_str,
"swap_type": "adaptor_sig",
"lock_type": str(int(TxLockTypes.SEQUENCE_LOCK_BLOCKS)),
"lock_blocks": str(lock_value),
}
logger.info(
f"Creating offer {amt_from_str} {ticker_from} -> {amt_to_str} {ticker_to}"
)
offer_id: str = post_json_api(port_node_from, "offers/new", data)["offer_id"]
summary = read_json_api(port_node_from)
assert summary["num_sent_offers"] == 1
logger.info(f"Waiting for offer: {offer_id}")
waitForNumOffers(self.delay_event, port_node_to, 1)
offers = read_json_api(port_node_to, "offers")
offer = offers[0]
data = {
"offer_id": offer["offer_id"],
"amount_from": offer["amount_from"],
"validmins": 60,
}
rv = post_json_api(port_node_to, "bids/new", data)
bid_id: str = rv["bid_id"]
waitForNumBids(self.delay_event, port_node_from, 1)
rv = post_json_api(
port_follower,
f"bids/{bid_id}",
{"debugind": DebugTypes.BID_STOP_AFTER_COIN_A_LOCK},
)
assert "bid_state" in rv # Test that the return didn't fail
for i in range(20):
bid = read_json_api(port_node_from, f"bids/{bid_id}")
if bid["bid_state"] == "Received":
break
self.delay_event.wait(1)
assert bid["bid_state"] == "Received"
rv = post_json_api(port_offerer, f"bids/{bid_id}", {"accept": True})
assert rv["bid_state"] in ("Accepted", "Request accepted")
for i in range(100):
if self.delay_event.is_set():
raise ValueError("Test stopped.")
self.delay_event.wait(4)
rv = read_json_api(port_leader, f"bids/{bid_id}")
if rv["bid_state"] == strBidState(BidStates.XMR_SWAP_FAILED_REFUNDED):
break
assert rv["bid_state"] == strBidState(BidStates.XMR_SWAP_FAILED_REFUNDED)
class Test(TestFunctions):
__test__ = True
update_min = 2
daemons = []
test_coin_a = Coins.PART
test_coin_b = Coins.BTC
test_coin_xmr = Coins.XMR
@classmethod
def addElectrumxDaemon(cls, coin_name: str, node_rpc_port: int, services_port: int):
coin_type: Coins = getCoinIdFromName(coin_name)
ticker: str = chainparams[coin_type]["ticker"]
ticker_lc: str = ticker.lower()
logger.info(f"Starting Electrumx for {ticker}")
ELECTRUMX_SRC_DIR = os.path.expanduser(os.getenv("ELECTRUMX_SRC_DIR"))
if ELECTRUMX_SRC_DIR is None:
raise ValueError("Please set ELECTRUMX_SRC_DIR")
ELECTRUMX_VENV = os.getenv(
"ELECTRUMX_VENV", os.path.join(ELECTRUMX_SRC_DIR, "venv")
)
ELECTRUMX_DATADIR = os.getenv(
f"ELECTRUMX_DATADIR_{ticker}", f"/tmp/electrumx_{ticker_lc}"
)
SSL_CERTFILE = f"{ELECTRUMX_DATADIR}/certfile.crt"
SSL_KEYFILE = f"{ELECTRUMX_DATADIR}/keyfile.key"
if os.path.isdir(ELECTRUMX_DATADIR):
if RESET_TEST:
logger.info("Removing " + ELECTRUMX_DATADIR)
shutil.rmtree(ELECTRUMX_DATADIR)
if not os.path.exists(ELECTRUMX_DATADIR):
os.makedirs(os.path.join(ELECTRUMX_DATADIR, "db"))
with open(os.path.join(ELECTRUMX_DATADIR, "banner"), "w") as fp:
fp.write("TEST BANNER")
try:
stdout = subprocess.check_output(
[
"openssl",
"req",
"-nodes",
"-new",
"-x509",
"-keyout",
SSL_KEYFILE,
"-out",
SSL_CERTFILE,
"-subj",
'/C=CA/ST=Quebec/L=Montreal/O="Poutine LLC"/OU=devops/CN=*.poutine.net\n',
],
text=True,
)
logger.info(f"openssl {stdout}")
except subprocess.CalledProcessError as e:
logger.info(f"Error openssl {e.output}")
electrumx_env = {
"COIN": coin_name.capitalize(),
"NET": "regtest",
"LOG_LEVEL": "debug",
"SERVICES": f"tcp://:{services_port},ssl://:{services_port+1},rpc://",
"CACHE_MB": "400",
"DAEMON_URL": f"http://test_{ticker_lc}_0:test_{ticker_lc}_pwd_0@127.0.0.1:{node_rpc_port}",
"DB_DIRECTORY": f"{ELECTRUMX_DATADIR}/db",
"SSL_CERTFILE": f"{ELECTRUMX_DATADIR}/certfile.crt",
"SSL_KEYFILE": f"{ELECTRUMX_DATADIR}/keyfile.key",
"BANNER_FILE": f"{ELECTRUMX_DATADIR}/banner",
"DAEMON_POLL_INTERVAL_BLOCKS": "1000",
"DAEMON_POLL_INTERVAL_MEMPOOL": "1000",
}
opened_files = []
stdout_dest = open(f"{ELECTRUMX_DATADIR}/electrumx.log", "w")
stderr_dest = stdout_dest
cls.daemons.append(
Daemon(
subprocess.Popen(
[
os.path.join(ELECTRUMX_VENV, "bin", "python"),
os.path.join(ELECTRUMX_SRC_DIR, "electrumx_server"),
],
shell=False,
stdin=subprocess.PIPE,
stdout=stdout_dest,
stderr=stderr_dest,
cwd=ELECTRUMX_SRC_DIR,
env=electrumx_env,
),
[
opened_files,
],
f"electrumx_{ticker_lc}",
)
)
@classmethod
def setUpClass(cls):
cls.addElectrumxDaemon("bitcoin", 32793, 50001)
super(Test, cls).setUpClass()
@classmethod
def modifyConfig(cls, test_path, i):
modifyConfig(test_path, i)
@classmethod
def setupNodes(cls):
logger.info(f"Preparing {NUM_NODES} nodes.")
bins_path = os.path.join(TEST_PATH, "bin")
for i in range(NUM_NODES):
logger.info(f"Preparing node: {i}.")
client_path = os.path.join(TEST_PATH, f"client{i}")
try:
shutil.rmtree(client_path)
except Exception as ex:
logger.warning(f"setupNodes {ex}")
extra_args = []
if i == 1:
extra_args = [
"--btc-mode=electrum",
"--btc-electrum-server=127.0.0.1:50001",
]
run_prepare(
i,
client_path,
bins_path,
cls.test_coins_list,
mnemonics[i] if i < len(mnemonics) else None,
num_nodes=NUM_NODES,
use_rpcauth=True,
extra_settings={"min_sequence_lock_seconds": 10},
port_ofs=PORT_OFS,
extra_args=extra_args,
)
@classmethod
def tearDownClass(cls):
logger.info("Finalising Test")
super().tearDownClass()
stopDaemons(cls.daemons)
def test_01_a_full_swap_xmr(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
self.do_test_01_full_swap(self.test_coin_a, self.test_coin_b)
def test_01_b_full_swap_xmr(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
self.do_test_01_full_swap(self.test_coin_b, self.test_coin_xmr)
def test_01_c_full_swap_xmr_reverse(self):
self.do_test_01_full_swap(
self.test_coin_xmr, self.test_coin_b, self.port_node_1, self.port_node_0
)
def test_02_a_leader_recover_a_lock_tx(self):
prepare_balance(
self.delay_event,
self.test_coin_b,
100,
self.port_node_1,
self.port_node_0,
True,
)
prepare_balance(
self.delay_event,
self.test_coin_xmr,
100,
self.port_node_0,
self.port_node_1,
True,
)
self.do_test_02_leader_recover_a_lock_tx(
self.test_coin_b, Coins.XMR, self.port_node_1, self.port_node_0
)
if __name__ == "__main__":
unittest.main()

View File

@@ -60,6 +60,7 @@ from tests.basicswap.util import (
from tests.basicswap.common import (
callrpc_cli,
prepareDataDir,
prepare_balance,
make_rpc_func,
checkForks,
stopDaemons,
@@ -1055,54 +1056,13 @@ class BaseTest(unittest.TestCase):
port_take_from_node: int,
test_balance: bool = True,
) -> None:
delay_iterations = 100 if coin == Coins.NAV else 20
delay_time = 5 if coin == Coins.NAV else 3
if coin == Coins.PART_BLIND:
coin_ticker: str = "PART"
balance_type: str = "blind_balance"
address_type: str = "stealth_address"
type_to: str = "blind"
elif coin == Coins.PART_ANON:
coin_ticker: str = "PART"
balance_type: str = "anon_balance"
address_type: str = "stealth_address"
type_to: str = "anon"
else:
coin_ticker: str = coin.name
balance_type: str = "balance"
address_type: str = "deposit_address"
js_w = read_json_api(port_target_node, "wallets")
current_balance: float = float(js_w[coin_ticker][balance_type])
if test_balance and current_balance >= amount:
return
post_json = {
"value": amount,
"address": js_w[coin_ticker][address_type],
"subfee": False,
}
if coin in (Coins.XMR, Coins.WOW):
post_json["sweepall"] = False
if coin in (Coins.PART_BLIND, Coins.PART_ANON):
post_json["type_to"] = type_to
json_rv = read_json_api(
port_take_from_node,
"wallets/{}/withdraw".format(coin_ticker.lower()),
post_json,
)
assert len(json_rv["txid"]) == 64
wait_for_amount: float = amount
if not test_balance:
wait_for_amount += current_balance
wait_for_balance(
prepare_balance(
test_delay_event,
"http://127.0.0.1:{}/json/wallets/{}".format(
port_target_node, coin_ticker.lower()
),
balance_type,
wait_for_amount,
iterations=delay_iterations,
delay_time=delay_time,
coin,
amount,
port_target_node,
port_take_from_node,
test_balance,
)