test: simplify PIVX test

This commit is contained in:
tecnovert
2026-06-06 21:18:28 +02:00
parent 554d362a45
commit 2b2d14b86a
6 changed files with 177 additions and 408 deletions
@@ -100,6 +100,7 @@ def prepare_balance(
class DOGETest(BaseTestWithPrepare):
__test__ = True
def test_a(self):
amount_from = 10.0
offer_json = {
+161 -401
View File
@@ -11,22 +11,14 @@ basicswap]$ python tests/basicswap/extended/test_pivx.py
"""
import json
import logging
import os
import random
import shutil
import signal
import sys
import threading
import time
import unittest
from coincurve.keys import PrivateKey
import basicswap.config as cfg
from basicswap.basicswap import (
BasicSwap,
Coins,
SwapTypes,
BidStates,
@@ -39,30 +31,27 @@ from basicswap.util import (
from basicswap.basicswap_util import (
TxLockTypes,
)
from basicswap.util.address import (
toWIF,
)
from tests.basicswap.util import (
read_json_api,
)
from tests.basicswap.common import (
callrpc_cli,
checkForks,
stopDaemons,
wait_for_bid,
wait_for_offer,
wait_for_balance,
wait_for_unspent,
wait_for_in_progress,
wait_for_bid_tx_state,
TEST_HTTP_HOST,
TEST_HTTP_PORT,
BASE_PORT,
BASE_RPC_PORT,
BASE_ZMQ_PORT,
PREFIX_SECRET_KEY_REGTEST,
waitForRPC,
make_rpc_func,
)
from tests.basicswap.test_xmr import (
BaseTest,
test_delay_event as delay_event,
callnoderpc,
)
from basicswap.contrib.rpcauth import generate_salt, password_to_hmac
from basicswap.bin.run import startDaemon
from basicswap.bin.prepare import downloadPIVXParams
@@ -72,11 +61,6 @@ if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout))
NUM_NODES = 3
PIVX_NODE = 3
BTC_NODE = 4
delay_event = threading.Event()
stop_test = False
PIVX_BINDIR = os.path.expanduser(
os.getenv("PIVX_BINDIR", os.path.join(cfg.DEFAULT_TEST_BINDIR, "pivx"))
@@ -85,397 +69,174 @@ PIVXD = os.getenv("PIVXD", "pivxd" + cfg.bin_suffix)
PIVX_CLI = os.getenv("PIVX_CLI", "pivx-cli" + cfg.bin_suffix)
PIVX_TX = os.getenv("PIVX_TX", "pivx-tx" + cfg.bin_suffix)
def prepareOtherDir(datadir, nodeId, conf_file="pivx.conf"):
node_dir = os.path.join(datadir, str(nodeId))
if not os.path.exists(node_dir):
os.makedirs(node_dir)
filePath = os.path.join(node_dir, conf_file)
with open(filePath, "w+") as fp:
fp.write("regtest=1\n")
fp.write("[regtest]\n")
fp.write("port=" + str(BASE_PORT + nodeId) + "\n")
fp.write("rpcport=" + str(BASE_RPC_PORT + nodeId) + "\n")
fp.write("daemon=0\n")
fp.write("printtoconsole=0\n")
fp.write("server=1\n")
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("bind=127.0.0.1\n")
fp.write("findpeers=0\n")
fp.write("debug=1\n")
fp.write("debugexclude=libevent\n")
fp.write("fallbackfee=0.01\n")
fp.write("acceptnonstdtxn=0\n")
if conf_file == "pivx.conf":
params_dir = os.path.join(datadir, "pivx-params")
downloadPIVXParams(params_dir)
fp.write(f"paramsdir={params_dir}\n")
if conf_file == "bitcoin.conf":
fp.write("wallet=bsx_wallet\n")
PIVX_BASE_PORT = 34832
PIVX_BASE_RPC_PORT = 35832
PIVX_BASE_ZMQ_PORT = 36832
def prepareDir(datadir, nodeId, network_key, network_pubkey):
node_dir = os.path.join(datadir, str(nodeId))
if not os.path.exists(node_dir):
os.makedirs(node_dir)
filePath = os.path.join(node_dir, "particl.conf")
with open(filePath, "w+") as fp:
fp.write("regtest=1\n")
fp.write("[regtest]\n")
fp.write("port=" + str(BASE_PORT + nodeId) + "\n")
fp.write("rpcport=" + str(BASE_RPC_PORT + nodeId) + "\n")
fp.write("daemon=0\n")
fp.write("printtoconsole=0\n")
fp.write("server=1\n")
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("bind=127.0.0.1\n")
fp.write("findpeers=0\n")
fp.write("debug=1\n")
fp.write("debugexclude=libevent\n")
fp.write("zmqpubsmsg=tcp://127.0.0.1:" + str(BASE_ZMQ_PORT + nodeId) + "\n")
fp.write("wallet=bsx_wallet\n")
fp.write("fallbackfee=0.01\n")
fp.write("acceptnonstdtxn=0\n")
fp.write("minstakeinterval=5\n")
fp.write("stakethreadconddelayms=1000\n")
fp.write("smsgsregtestadjust=0\n")
for i in range(0, NUM_NODES):
if nodeId == i:
continue
fp.write("addnode=127.0.0.1:%d\n" % (BASE_PORT + i))
if nodeId < 2:
fp.write("spentindex=1\n")
fp.write("txindex=1\n")
basicswap_dir = os.path.join(datadir, str(nodeId), "basicswap")
if not os.path.exists(basicswap_dir):
os.makedirs(basicswap_dir)
pivxdatadir = os.path.join(datadir, str(PIVX_NODE))
btcdatadir = os.path.join(datadir, str(BTC_NODE))
settings_path = os.path.join(basicswap_dir, cfg.CONFIG_FILENAME)
settings = {
"debug": True,
"zmqhost": "tcp://127.0.0.1",
"zmqport": BASE_ZMQ_PORT + nodeId,
"htmlhost": TEST_HTTP_HOST,
"htmlport": TEST_HTTP_PORT + nodeId,
"network_key": network_key,
"network_pubkey": network_pubkey,
"chainclients": {
"particl": {
"connection_type": "rpc",
"manage_daemon": False,
"rpcport": BASE_RPC_PORT + nodeId,
"datadir": node_dir,
"bindir": cfg.PARTICL_BINDIR,
"blocks_confirmed": 2, # Faster testing
"wallet_name": "bsx_wallet",
},
"pivx": {
"connection_type": "rpc",
"manage_daemon": False,
"rpcport": BASE_RPC_PORT + PIVX_NODE,
"datadir": pivxdatadir,
"bindir": PIVX_BINDIR,
"use_csv": False,
"use_segwit": False,
"wallet_name": "",
},
"bitcoin": {
"connection_type": "rpc",
"manage_daemon": False,
"rpcport": BASE_RPC_PORT + BTC_NODE,
"datadir": btcdatadir,
"bindir": cfg.BITCOIN_BINDIR,
"use_segwit": True,
"wallet_name": "bsx_wallet",
},
},
"check_progress_seconds": 2,
"check_watched_seconds": 4,
"check_expired_seconds": 60,
"check_events_seconds": 1,
"check_xmr_swaps_seconds": 1,
"min_delay_event": 1,
"max_delay_event": 3,
"min_delay_event_short": 1,
"max_delay_event_short": 3,
"min_delay_retry": 2,
"max_delay_retry": 10,
"restrict_unknown_seed_wallets": False,
"check_updates": False,
}
with open(settings_path, "w") as fp:
json.dump(settings, fp, indent=4)
def partRpc(cmd, node_id=0):
return callrpc_cli(
cfg.PARTICL_BINDIR,
os.path.join(cfg.TEST_DATADIRS, str(node_id)),
"regtest",
cmd,
cfg.PARTICL_CLI,
)
def btcRpc(cmd):
return callrpc_cli(
cfg.BITCOIN_BINDIR,
os.path.join(cfg.TEST_DATADIRS, str(BTC_NODE)),
"regtest",
cmd,
cfg.BITCOIN_CLI,
)
def pivxRpc(cmd):
def pivxCli(cmd, node_id=0):
return callrpc_cli(
PIVX_BINDIR,
os.path.join(cfg.TEST_DATADIRS, str(PIVX_NODE)),
os.path.join(cfg.TEST_DATADIRS, "pivx_" + str(node_id)),
"regtest",
cmd,
PIVX_CLI,
)
def signal_handler(sig, frame):
global stop_test
os.write(sys.stdout.fileno(), f"Signal {sig} detected.\n".encode("utf-8"))
stop_test = True
delay_event.set()
def prepareDataDir(
datadir, node_id, conf_file, dir_prefix, base_p2p_port, base_rpc_port, num_nodes=3
):
node_dir = os.path.join(datadir, dir_prefix + str(node_id))
if not os.path.exists(node_dir):
os.makedirs(node_dir)
cfg_file_path = os.path.join(node_dir, conf_file)
if os.path.exists(cfg_file_path):
return
with open(cfg_file_path, "w+") as fp:
fp.write("regtest=1\n")
fp.write("[regtest]\n")
fp.write("port=" + str(base_p2p_port + node_id) + "\n")
fp.write("rpcport=" + str(base_rpc_port + node_id) + "\n")
def run_coins_loop(cls):
while not stop_test:
try:
pivxRpc("generatetoaddress 1 {}".format(cls.pivx_addr))
btcRpc("generatetoaddress 1 {}".format(cls.btc_addr))
except Exception as e:
logging.warning("run_coins_loop " + str(e))
time.sleep(1.0)
def run_loop(self):
while not stop_test:
for c in self.swap_clients:
c.update()
time.sleep(1)
def make_part_cli_rpc_func(node_id):
node_id = node_id
def rpc_func(method, params=None, wallet=None):
cmd = method
if params:
for p in params:
cmd += ' "' + p + '"'
return partRpc(cmd, node_id)
return rpc_func
class Test(unittest.TestCase):
test_coin_from = Coins.PIVX
@classmethod
def setUpClass(cls):
super().setUpClass()
k = PrivateKey()
cls.network_key = toWIF(PREFIX_SECRET_KEY_REGTEST, k.secret)
cls.network_pubkey = k.public_key.format().hex()
if os.path.isdir(cfg.TEST_DATADIRS):
logging.info("Removing " + cfg.TEST_DATADIRS)
for name in os.listdir(cfg.TEST_DATADIRS):
if name == "pivx-params":
continue
fullpath = os.path.join(cfg.TEST_DATADIRS, name)
if os.path.isdir(fullpath):
shutil.rmtree(fullpath)
else:
os.remove(fullpath)
for i in range(NUM_NODES):
prepareDir(cfg.TEST_DATADIRS, i, cls.network_key, cls.network_pubkey)
prepareOtherDir(cfg.TEST_DATADIRS, PIVX_NODE)
prepareOtherDir(cfg.TEST_DATADIRS, BTC_NODE, "bitcoin.conf")
cls.daemons = []
cls.swap_clients = []
btc_data_dir = os.path.join(cfg.TEST_DATADIRS, str(BTC_NODE))
if os.path.exists(os.path.join(cfg.BITCOIN_BINDIR, "bitcoin-wallet")):
try:
callrpc_cli(
cfg.BITCOIN_BINDIR,
btc_data_dir,
"regtest",
"-wallet=bsx_wallet -legacy create",
"bitcoin-wallet",
)
except Exception:
callrpc_cli(
cfg.BITCOIN_BINDIR,
btc_data_dir,
"regtest",
"-wallet=bsx_wallet create",
"bitcoin-wallet",
)
cls.daemons.append(startDaemon(btc_data_dir, cfg.BITCOIN_BINDIR, cfg.BITCOIND))
logging.info("Started %s %d", cfg.BITCOIND, cls.daemons[-1].handle.pid)
cls.daemons.append(
startDaemon(
os.path.join(cfg.TEST_DATADIRS, str(PIVX_NODE)), PIVX_BINDIR, PIVXD
salt = generate_salt(16)
fp.write(
"rpcauth={}:{}${}\n".format(
"test" + str(node_id),
salt,
password_to_hmac(salt, "test_pass" + str(node_id)),
)
)
logging.info("Started %s %d", PIVXD, cls.daemons[-1].handle.pid)
for i in range(NUM_NODES):
data_dir = os.path.join(cfg.TEST_DATADIRS, str(i))
if os.path.exists(os.path.join(cfg.PARTICL_BINDIR, "particl-wallet")):
try:
callrpc_cli(
cfg.PARTICL_BINDIR,
data_dir,
"regtest",
"-wallet=bsx_wallet -legacy create",
"particl-wallet",
)
except Exception:
callrpc_cli(
cfg.PARTICL_BINDIR,
data_dir,
"regtest",
"-wallet=bsx_wallet create",
"particl-wallet",
)
cls.daemons.append(startDaemon(data_dir, cfg.PARTICL_BINDIR, cfg.PARTICLD))
logging.info("Started %s %d", cfg.PARTICLD, cls.daemons[-1].handle.pid)
fp.write("daemon=0\n")
fp.write("printtoconsole=0\n")
fp.write("server=1\n")
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("bind=127.0.0.1\n")
fp.write("findpeers=0\n")
fp.write("debug=1\n")
fp.write("debugexclude=libevent\n")
for i in range(NUM_NODES):
rpc = make_part_cli_rpc_func(i)
waitForRPC(rpc, delay_event)
if i == 0:
rpc(
"extkeyimportmaster",
[
"abandon baby cabbage dad eager fabric gadget habit ice kangaroo lab absorb"
],
)
elif i == 1:
rpc(
"extkeyimportmaster",
[
"pact mammal barrel matrix local final lecture chunk wasp survey bid various book strong spread fall ozone daring like topple door fatigue limb olympic",
"",
"true",
],
)
rpc("getnewextaddress", ["lblExtTest"])
rpc("rescanblockchain")
else:
rpc("extkeyimportmaster", [rpc("mnemonic", ["new"])["master"]])
rpc(
"walletsettings",
[
"stakingoptions",
json.dumps(
{"stakecombinethreshold": 100, "stakesplitthreshold": 200}
).replace('"', '\\"'),
],
fp.write("fallbackfee=0.01\n")
fp.write("acceptnonstdtxn=0\n")
params_dir = os.path.join(datadir, "pivx-params")
downloadPIVXParams(params_dir)
fp.write(f"paramsdir={params_dir}\n")
for i in range(0, num_nodes):
if node_id == i:
continue
fp.write("addnode=127.0.0.1:{}\n".format(base_p2p_port + i))
return node_dir
class Test(BaseTest):
__test__ = True
test_coin_from = Coins.PIVX
pivx_daemons = []
pivx_addr = None
start_ltc_nodes = False
start_xmr_nodes = False
@classmethod
def prepareExtraDataDir(cls, i):
extra_opts = []
if not cls.restore_instance:
prepareDataDir(
cfg.TEST_DATADIRS,
i,
"pivx.conf",
"pivx_",
base_p2p_port=PIVX_BASE_PORT,
base_rpc_port=PIVX_BASE_RPC_PORT,
)
rpc("reservebalance", ["false"])
basicswap_dir = os.path.join(
os.path.join(cfg.TEST_DATADIRS, str(i)), "basicswap"
cls.pivx_daemons.append(
startDaemon(
os.path.join(cfg.TEST_DATADIRS, "pivx_" + str(i)),
PIVX_BINDIR,
PIVXD,
opts=extra_opts,
)
settings_path = os.path.join(basicswap_dir, cfg.CONFIG_FILENAME)
with open(settings_path) as fs:
settings = json.load(fs)
sc = BasicSwap(
basicswap_dir, settings, "regtest", log_name="BasicSwap{}".format(i)
)
logging.info("Started %s %d", PIVXD, cls.pivx_daemons[-1].handle.pid)
waitForRPC(make_rpc_func(i, base_rpc_port=PIVX_BASE_RPC_PORT), delay_event)
@classmethod
def addPIDInfo(cls, sc, i):
sc.setDaemonPID(Coins.PIVX, cls.pivx_daemons[i].handle.pid)
@classmethod
def prepareExtraCoins(cls):
if cls.restore_instance:
void_block_rewards_pubkey = cls.getRandomPubkey()
cls.pivx_addr = (
cls.swap_clients[0]
.ci(Coins.PIVX)
.pubkey_to_address(void_block_rewards_pubkey)
)
cls.swap_clients.append(sc)
sc.setDaemonPID(Coins.BTC, cls.daemons[0].handle.pid)
sc.setDaemonPID(Coins.PIVX, cls.daemons[1].handle.pid)
sc.setDaemonPID(Coins.PART, cls.daemons[2 + i].handle.pid)
sc.start()
else:
num_blocks = 1352 # CHECKLOCKTIMEVERIFY soft-fork activates at (regtest) block height 1351.
logging.info(f"Mining {num_blocks} pivx blocks")
cls.pivx_addr = pivxCli("getnewaddress mining_addr")
pivxCli(f"generatetoaddress {num_blocks} {cls.pivx_addr}")
waitForRPC(pivxRpc, delay_event)
num_blocks = 1352 # CHECKLOCKTIMEVERIFY soft-fork activates at (regtest) block height 1351.
logging.info("Mining %d pivx blocks", num_blocks)
cls.pivx_addr = pivxRpc("getnewaddress mining_addr")
pivxRpc("generatetoaddress {} {}".format(num_blocks, cls.pivx_addr))
ro = pivxRpc("getblockchaininfo")
try:
assert ro["bip9_softforks"]["csv"]["status"] == "active"
except Exception:
logging.info("pivx: csv is not active")
try:
assert ro["bip9_softforks"]["segwit"]["status"] == "active"
except Exception:
logging.info("pivx: segwit is not active")
waitForRPC(btcRpc, delay_event)
cls.btc_addr = btcRpc("getnewaddress mining_addr bech32")
logging.info("Mining %d Bitcoin blocks to %s", num_blocks, cls.btc_addr)
btcRpc("generatetoaddress {} {}".format(num_blocks, cls.btc_addr))
ro = btcRpc("getblockchaininfo")
checkForks(ro)
signal.signal(signal.SIGINT, signal_handler)
cls.update_thread = threading.Thread(target=run_loop, args=(cls,))
cls.update_thread.start()
cls.coins_update_thread = threading.Thread(target=run_coins_loop, args=(cls,))
cls.coins_update_thread.start()
# Wait for height, or sequencelock is thrown off by genesis blocktime
num_blocks = 3
logging.info("Waiting for Particl chain height %d", num_blocks)
for i in range(60):
particl_blocks = cls.swap_clients[0].callrpc("getblockcount")
print("particl_blocks", particl_blocks)
if particl_blocks >= num_blocks:
break
delay_event.wait(1)
assert particl_blocks >= num_blocks
ro = pivxCli("getblockchaininfo")
try:
assert ro["bip9_softforks"]["csv"]["status"] == "active"
except Exception:
logging.info("pivx: csv is not active")
try:
assert ro["bip9_softforks"]["segwit"]["status"] == "active"
except Exception:
logging.info("pivx: segwit is not active")
@classmethod
def tearDownClass(cls):
global stop_test
logging.info("Finalising")
stop_test = True
cls.update_thread.join()
cls.coins_update_thread.join()
for c in cls.swap_clients:
c.finalise()
stopDaemons(cls.daemons)
cls.swap_clients.clear()
cls.daemons.clear()
logging.info("Finalising PIVX Test")
super().tearDownClass()
stopDaemons(cls.pivx_daemons)
cls.pivx_daemons.clear()
@classmethod
def addCoinSettings(cls, settings, datadir, node_id):
settings["chainclients"]["pivx"] = {
"connection_type": "rpc",
"manage_daemon": False,
"rpcport": PIVX_BASE_RPC_PORT + node_id,
"rpcuser": "test" + str(node_id),
"rpcpassword": "test_pass" + str(node_id),
"datadir": os.path.join(datadir, "pivx_" + str(node_id)),
"bindir": PIVX_BINDIR,
"use_csv": False,
"use_segwit": False,
"wallet_name": "",
}
@classmethod
def coins_loop(cls):
super().coins_loop()
callnoderpc(
0, "generatetoaddress", [1, cls.pivx_addr], base_rpc_port=PIVX_BASE_RPC_PORT
)
@classmethod
def prepareBalances(cls):
super().prepareBalances()
cls.prepare_balance(
cls,
Coins.PIVX,
10000.0,
1801,
1800,
)
def test_02_part_pivx(self):
logging.info("---------- Test PART to PIVX")
swap_clients = self.swap_clients
@@ -718,7 +479,7 @@ class Test(unittest.TestCase):
logging.info("---------- Test {} wallet".format(self.test_coin_from.name))
logging.info("Test withdrawal")
addr = pivxRpc('getnewaddress "Withdrawal test"')
addr = pivxCli('getnewaddress "Withdrawal test"')
wallets = read_json_api(TEST_HTTP_PORT + 0, "wallets")
assert float(wallets[self.test_coin_from.name]["balance"]) > 100
@@ -748,30 +509,30 @@ class Test(unittest.TestCase):
def test_09_v3_tx(self):
logging.info("---------- Test PIVX v3 txns")
generate_addr = pivxRpc('getnewaddress "generate test"')
pivx_addr = pivxRpc('getnewaddress "Sapling test"')
pivx_sapling_addr = pivxRpc('getnewshieldaddress "shield addr"')
generate_addr = pivxCli('getnewaddress "generate test"')
pivx_addr = pivxCli('getnewaddress "Sapling test"')
pivx_sapling_addr = pivxCli('getnewshieldaddress "shield addr"')
pivxRpc(f'sendtoaddress "{pivx_addr}" 6.0')
pivxRpc(f'generatetoaddress 1 "{generate_addr}"')
pivxCli(f'sendtoaddress "{pivx_addr}" 6.0')
pivxCli(f'generatetoaddress 1 "{generate_addr}"')
txid = pivxRpc(
txid = pivxCli(
'shieldsendmany "{}" "[{{\\"address\\": \\"{}\\", \\"amount\\": 1}}]"'.format(
pivx_addr, pivx_sapling_addr
)
)
rtx = pivxRpc(f'getrawtransaction "{txid}" true')
rtx = pivxCli(f'getrawtransaction "{txid}" true')
assert rtx["version"] == 3
block_hash = None
for i in range(15):
rtx = pivxRpc(f'getrawtransaction "{txid}" true')
rtx = pivxCli(f'getrawtransaction "{txid}" true')
if "blockhash" in rtx:
block_hash = rtx["blockhash"]
logging.info(f"Shielded tx confirmed in block {block_hash} after {i}s")
break
if i == 5:
pivxRpc(f'generatetoaddress 1 "{generate_addr}"')
pivxCli(f'generatetoaddress 1 "{generate_addr}"')
delay_event.wait(1)
assert block_hash is not None, "Shielded tx was not confirmed"
@@ -861,7 +622,6 @@ class Test(unittest.TestCase):
value_after_subfee = ci_from.make_int(itx_decoded["vout"][n]["value"])
assert value_after_subfee < swap_value
swap_value = value_after_subfee
wait_for_unspent(delay_event, ci_from, swap_value)
extra_options = {"prefunded_itx": itx}
rate_swap = ci_to.make_int(random.uniform(0.2, 10.0), r=1)
+4 -4
View File
@@ -5,9 +5,9 @@
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import time
import logging
import os
import time
from basicswap.basicswap import (
Coins,
@@ -120,14 +120,14 @@ class Test(BaseTest):
@classmethod
def tearDownClass(cls):
logging.info("Finalising Wownero Test")
super(Test, cls).tearDownClass()
super().tearDownClass()
stopDaemons(cls.wow_daemons)
cls.wow_daemons.clear()
@classmethod
def coins_loop(cls):
super(Test, cls).coins_loop()
super().coins_loop()
if cls.wow_addr is not None:
callrpc_xmr(
@@ -162,7 +162,7 @@ class Test(BaseTest):
startXmrWalletDaemon(node_dir, WOW_BINDIR, WOW_WALLET_RPC, opts=opts)
)
cls.wow_wallet_auth.append(("test{0}".format(i), "test_pass{0}".format(i)))
cls.wow_wallet_auth.append((f"test{i}", f"test_pass{i}"))
waitForWOWNode(i, auth=cls.wow_wallet_auth[i])
+6 -2
View File
@@ -306,7 +306,9 @@ class TestFunctions(BaseTest):
def do_test_02_leader_recover_a_lock_tx(
self, coin_from: Coins, coin_to: Coins, lock_value: int = 32
) -> None:
logging.info(f"---------- Test {coin_from.name} to {coin_to.name} leader recovers coin a lock tx")
logging.info(
f"---------- Test {coin_from.name} to {coin_to.name} leader recovers coin a lock tx"
)
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
@@ -507,7 +509,9 @@ class TestFunctions(BaseTest):
def do_test_04_follower_recover_b_lock_tx(
self, coin_from, coin_to, lock_value: int = 32
):
logging.info(f"---------- Test {coin_from.name} to {coin_to.name} follower recovers coin b lock tx")
logging.info(
f"---------- Test {coin_from.name} to {coin_to.name} follower recovers coin b lock tx"
)
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
+5
View File
@@ -942,6 +942,7 @@ class BaseTest(unittest.TestCase):
)
cls.coins_update_thread.start()
cls.prepareBalances()
except Exception:
traceback.print_exc()
cls.tearDownClass()
@@ -999,6 +1000,10 @@ class BaseTest(unittest.TestCase):
def prepareExtraCoins(cls):
pass
@classmethod
def prepareBalances(cls):
pass
@classmethod
def coins_loop(cls):
if cls.btc_addr is not None:
-1
View File
@@ -11,7 +11,6 @@ import os
import urllib
from urllib.request import urlopen
PORT_OFS = int(os.getenv("PORT_OFS", 1))
UI_PORT = 12700 + PORT_OFS