Files
basicswap/tests/basicswap/extended/test_pivx.py
T
2026-05-08 17:10:27 +02:00

1001 lines
32 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Copyright (c) 2024-2025 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
"""
basicswap]$ python tests/basicswap/extended/test_pivx.py
"""
import json
import logging
import os
import random
import shutil
import signal
import sys
import threading
import time
import unittest
from coincurve.keys import PrivateKey
import basicswap.config as cfg
from basicswap.basicswap import (
BasicSwap,
Coins,
SwapTypes,
BidStates,
TxStates,
DebugTypes,
)
from basicswap.util import (
COIN,
)
from basicswap.basicswap_util import (
TxLockTypes,
)
from basicswap.util.address import (
toWIF,
)
from tests.basicswap.util import (
read_json_api,
)
from tests.basicswap.common import (
callrpc_cli,
checkForks,
stopDaemons,
wait_for_bid,
wait_for_offer,
wait_for_balance,
wait_for_unspent,
wait_for_in_progress,
wait_for_bid_tx_state,
TEST_HTTP_HOST,
TEST_HTTP_PORT,
BASE_PORT,
BASE_RPC_PORT,
BASE_ZMQ_PORT,
PREFIX_SECRET_KEY_REGTEST,
waitForRPC,
)
from basicswap.bin.run import startDaemon
from basicswap.bin.prepare import downloadPIVXParams
# Root logger: log everything at DEBUG and mirror it to stdout, unless a
# handler has already been installed by whoever imported this module.
logger = logging.getLogger()
logger.level = logging.DEBUG
if not logger.handlers:
    stdout_handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(stdout_handler)

# Node ids double as datadir names and as offsets added to the base ports.
NUM_NODES = 3  # Particl nodes 0..NUM_NODES-1, each paired with a BasicSwap client
PIVX_NODE = 3  # id of the single PIVX daemon
BTC_NODE = 4  # id of the single Bitcoin daemon

# Shared by the wait helpers and set by signal_handler to interrupt them.
delay_event = threading.Event()
# Polled by the background loops; set True to make them exit.
stop_test = False

# PIVX binaries; each value can be overridden through the environment.
PIVX_BINDIR = os.path.expanduser(
    os.getenv("PIVX_BINDIR", os.path.join(cfg.DEFAULT_TEST_BINDIR, "pivx"))
)
PIVXD = os.getenv("PIVXD", "pivxd" + cfg.bin_suffix)
PIVX_CLI = os.getenv("PIVX_CLI", "pivx-cli" + cfg.bin_suffix)
PIVX_TX = os.getenv("PIVX_TX", "pivx-tx" + cfg.bin_suffix)
def prepareOtherDir(datadir, nodeId, conf_file="pivx.conf"):
    """Create the data directory and daemon config file for a non-Particl node.

    Writes ``<datadir>/<nodeId>/<conf_file>`` with regtest settings whose
    p2p and RPC ports are ``BASE_PORT + nodeId`` and ``BASE_RPC_PORT + nodeId``.

    Args:
        datadir: Root directory holding one sub-directory per node.
        nodeId: Node id; used as the sub-directory name and port offset.
        conf_file: Config file name. ``"pivx.conf"`` additionally fetches the
            PIVX params into ``<datadir>/pivx-params`` and points the daemon
            at them; ``"bitcoin.conf"`` additionally selects ``bsx_wallet``.
    """
    node_dir = os.path.join(datadir, str(nodeId))
    # exist_ok replaces the separate exists() check, which was race-prone.
    os.makedirs(node_dir, exist_ok=True)

    conf_lines = [
        "regtest=1",
        "[regtest]",
        "port=" + str(BASE_PORT + nodeId),
        "rpcport=" + str(BASE_RPC_PORT + nodeId),
        "daemon=0",
        "printtoconsole=0",
        "server=1",
        "discover=0",
        "listenonion=0",
        "bind=127.0.0.1",
        "findpeers=0",
        "debug=1",
        "debugexclude=libevent",
        "fallbackfee=0.01",
        "acceptnonstdtxn=0",
    ]

    if conf_file == "pivx.conf":
        params_dir = os.path.join(datadir, "pivx-params")
        downloadPIVXParams(params_dir)
        conf_lines.append(f"paramsdir={params_dir}")

    if conf_file == "bitcoin.conf":
        conf_lines.append("wallet=bsx_wallet")

    # Written in one go, after the params download, so a failed download
    # does not leave a half-written config behind.
    with open(os.path.join(node_dir, conf_file), "w+") as fp:
        fp.writelines(line + "\n" for line in conf_lines)
def prepareDir(datadir, nodeId, network_key, network_pubkey):
    """Create the datadir, ``particl.conf`` and BasicSwap settings for a Particl node.

    Args:
        datadir: Root directory holding one sub-directory per node.
        nodeId: Particl node id; used as the sub-directory name and as the
            offset added to the base p2p/RPC/ZMQ/HTTP ports.
        network_key: Stored under ``network_key`` in the BasicSwap settings.
        network_pubkey: Stored under ``network_pubkey`` in the BasicSwap settings.
    """
    node_dir = os.path.join(datadir, str(nodeId))
    # exist_ok replaces the separate exists() check, which was race-prone.
    os.makedirs(node_dir, exist_ok=True)

    conf_lines = [
        "regtest=1",
        "[regtest]",
        "port=" + str(BASE_PORT + nodeId),
        "rpcport=" + str(BASE_RPC_PORT + nodeId),
        "daemon=0",
        "printtoconsole=0",
        "server=1",
        "discover=0",
        "listenonion=0",
        "bind=127.0.0.1",
        "findpeers=0",
        "debug=1",
        "debugexclude=libevent",
        "zmqpubsmsg=tcp://127.0.0.1:" + str(BASE_ZMQ_PORT + nodeId),
        "wallet=bsx_wallet",
        "fallbackfee=0.01",
        "acceptnonstdtxn=0",
        "minstakeinterval=5",
        "smsgsregtestadjust=0",
    ]

    # Connect every Particl node to all of its peers (but not to itself).
    conf_lines.extend(
        "addnode=127.0.0.1:%d" % (BASE_PORT + i)
        for i in range(NUM_NODES)
        if i != nodeId
    )

    # Only the first two nodes get the extra indices.
    if nodeId < 2:
        conf_lines.append("spentindex=1")
        conf_lines.append("txindex=1")

    with open(os.path.join(node_dir, "particl.conf"), "w+") as fp:
        fp.writelines(line + "\n" for line in conf_lines)

    basicswap_dir = os.path.join(datadir, str(nodeId), "basicswap")
    os.makedirs(basicswap_dir, exist_ok=True)

    # All BasicSwap clients share the single PIVX and Bitcoin daemons.
    pivxdatadir = os.path.join(datadir, str(PIVX_NODE))
    btcdatadir = os.path.join(datadir, str(BTC_NODE))
    settings_path = os.path.join(basicswap_dir, cfg.CONFIG_FILENAME)
    settings = {
        "debug": True,
        "zmqhost": "tcp://127.0.0.1",
        "zmqport": BASE_ZMQ_PORT + nodeId,
        "htmlhost": TEST_HTTP_HOST,
        "htmlport": TEST_HTTP_PORT + nodeId,
        "network_key": network_key,
        "network_pubkey": network_pubkey,
        "chainclients": {
            "particl": {
                "connection_type": "rpc",
                "manage_daemon": False,
                "rpcport": BASE_RPC_PORT + nodeId,
                "datadir": node_dir,
                "bindir": cfg.PARTICL_BINDIR,
                "blocks_confirmed": 2,  # Faster testing
                "wallet_name": "bsx_wallet",
            },
            "pivx": {
                "connection_type": "rpc",
                "manage_daemon": False,
                "rpcport": BASE_RPC_PORT + PIVX_NODE,
                "datadir": pivxdatadir,
                "bindir": PIVX_BINDIR,
                "use_csv": False,
                "use_segwit": False,
                "wallet_name": "",
            },
            "bitcoin": {
                "connection_type": "rpc",
                "manage_daemon": False,
                "rpcport": BASE_RPC_PORT + BTC_NODE,
                "datadir": btcdatadir,
                "bindir": cfg.BITCOIN_BINDIR,
                "use_segwit": True,
                "wallet_name": "bsx_wallet",
            },
        },
        "check_progress_seconds": 2,
        "check_watched_seconds": 4,
        "check_expired_seconds": 60,
        "check_events_seconds": 1,
        "check_xmr_swaps_seconds": 1,
        "min_delay_event": 1,
        "max_delay_event": 3,
        "min_delay_event_short": 1,
        "max_delay_event_short": 3,
        "min_delay_retry": 2,
        "max_delay_retry": 10,
        "restrict_unknown_seed_wallets": False,
        "check_updates": False,
    }

    with open(settings_path, "w") as fp:
        json.dump(settings, fp, indent=4)
def partRpc(cmd, node_id=0):
    """Run ``cmd`` through the Particl CLI against regtest Particl node ``node_id``.

    Returns whatever ``callrpc_cli`` returns for the command.
    """
    node_datadir = os.path.join(cfg.TEST_DATADIRS, str(node_id))
    return callrpc_cli(
        cfg.PARTICL_BINDIR, node_datadir, "regtest", cmd, cfg.PARTICL_CLI
    )
def btcRpc(cmd):
    """Run ``cmd`` through the Bitcoin CLI against the regtest Bitcoin node.

    Returns whatever ``callrpc_cli`` returns for the command.
    """
    node_datadir = os.path.join(cfg.TEST_DATADIRS, str(BTC_NODE))
    return callrpc_cli(
        cfg.BITCOIN_BINDIR, node_datadir, "regtest", cmd, cfg.BITCOIN_CLI
    )
def pivxRpc(cmd):
    """Run ``cmd`` through the PIVX CLI against the regtest PIVX node.

    Returns whatever ``callrpc_cli`` returns for the command.
    """
    node_datadir = os.path.join(cfg.TEST_DATADIRS, str(PIVX_NODE))
    return callrpc_cli(PIVX_BINDIR, node_datadir, "regtest", cmd, PIVX_CLI)
def signal_handler(sig, frame):
    """Signal callback: announce the signal and ask all test loops to stop.

    Sets the module-level ``stop_test`` flag and ``delay_event``.
    """
    global stop_test
    # Raw write to the stdout file descriptor rather than print()/logging.
    notice = f"Signal {sig} detected.\n".encode("utf-8")
    os.write(sys.stdout.fileno(), notice)
    stop_test = True
    delay_event.set()
def run_coins_loop(cls):
    """Background loop: mine one PIVX and one Bitcoin block roughly every second.

    Blocks are mined to ``cls.pivx_addr`` and ``cls.btc_addr``. The loop runs
    until the module-level ``stop_test`` flag is set; RPC failures are logged
    and the loop keeps going.
    """
    while not stop_test:
        try:
            pivxRpc("generatetoaddress 1 {}".format(cls.pivx_addr))
            btcRpc("generatetoaddress 1 {}".format(cls.btc_addr))
        except Exception as e:
            logging.warning("run_coins_loop %s", e)
        # Wait on the shared event rather than sleeping so that the signal
        # handler (which sets it) wakes this thread immediately.
        delay_event.wait(1.0)
def run_loop(self):
    """Background loop: call ``update()`` on every swap client about once a second.

    ``self`` is the object holding ``swap_clients``. The loop runs until the
    module-level ``stop_test`` flag is set.
    """
    while not stop_test:
        for c in self.swap_clients:
            c.update()
        # Wait on the shared event rather than sleeping so that the signal
        # handler (which sets it) wakes this thread immediately.
        delay_event.wait(1)
def make_part_cli_rpc_func(node_id):
    """Return an ``rpc_func(method, params=None, wallet=None)`` bound to a Particl node.

    The returned function appends each entry of ``params`` to ``method`` as a
    double-quoted argument and passes the resulting command line to
    ``partRpc`` for ``node_id``. ``wallet`` is accepted but not used.
    """

    def rpc_func(method, params=None, wallet=None):
        quoted_args = "".join(' "' + arg + '"' for arg in (params or ()))
        return partRpc(method + quoted_args, node_id)

    return rpc_func
class Test(unittest.TestCase):
    """End-to-end PIVX swap tests run against local regtest daemons.

    ``setUpClass`` starts one Bitcoin daemon, one PIVX daemon and
    ``NUM_NODES`` Particl daemons, attaches a BasicSwap client to each
    Particl node, and launches background threads that drive the clients
    and mine PIVX/Bitcoin blocks. The HTTP API of client ``n`` listens on
    ``TEST_HTTP_PORT + n``.
    """

    test_coin_from = Coins.PIVX

    @classmethod
    def setUpClass(cls):
        """Start the whole test environment, cleaning up if any step fails."""
        super().setUpClass()

        # Defined before anything can fail so that tearDownClass works on a
        # partially built environment.
        cls.daemons = []
        cls.swap_clients = []
        cls.update_thread = None
        cls.coins_update_thread = None

        try:
            cls._start_environment()
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises, so
            # shut down already-started daemons and threads here.
            logging.exception("setUpClass failed, shutting down test environment")
            cls.tearDownClass()
            raise

    @classmethod
    def _start_environment(cls):
        """Prepare datadirs, start daemons and swap clients, mine initial blocks."""
        k = PrivateKey()
        cls.network_key = toWIF(PREFIX_SECRET_KEY_REGTEST, k.secret)
        cls.network_pubkey = k.public_key.format().hex()

        if os.path.isdir(cfg.TEST_DATADIRS):
            logging.info("Removing " + cfg.TEST_DATADIRS)
            for name in os.listdir(cfg.TEST_DATADIRS):
                # Keep the downloaded PIVX params between runs.
                if name == "pivx-params":
                    continue
                fullpath = os.path.join(cfg.TEST_DATADIRS, name)
                if os.path.isdir(fullpath):
                    shutil.rmtree(fullpath)
                else:
                    os.remove(fullpath)

        for i in range(NUM_NODES):
            prepareDir(cfg.TEST_DATADIRS, i, cls.network_key, cls.network_pubkey)

        prepareOtherDir(cfg.TEST_DATADIRS, PIVX_NODE)
        prepareOtherDir(cfg.TEST_DATADIRS, BTC_NODE, "bitcoin.conf")

        btc_data_dir = os.path.join(cfg.TEST_DATADIRS, str(BTC_NODE))
        if os.path.exists(os.path.join(cfg.BITCOIN_BINDIR, "bitcoin-wallet")):
            # Try a legacy wallet first, fall back to the tool's default.
            try:
                callrpc_cli(
                    cfg.BITCOIN_BINDIR,
                    btc_data_dir,
                    "regtest",
                    "-wallet=bsx_wallet -legacy create",
                    "bitcoin-wallet",
                )
            except Exception:
                callrpc_cli(
                    cfg.BITCOIN_BINDIR,
                    btc_data_dir,
                    "regtest",
                    "-wallet=bsx_wallet create",
                    "bitcoin-wallet",
                )
        # Daemon order matters: index 0 is BTC, 1 is PIVX, 2+i is Particl node i.
        cls.daemons.append(startDaemon(btc_data_dir, cfg.BITCOIN_BINDIR, cfg.BITCOIND))
        logging.info("Started %s %d", cfg.BITCOIND, cls.daemons[-1].handle.pid)
        cls.daemons.append(
            startDaemon(
                os.path.join(cfg.TEST_DATADIRS, str(PIVX_NODE)), PIVX_BINDIR, PIVXD
            )
        )
        logging.info("Started %s %d", PIVXD, cls.daemons[-1].handle.pid)

        for i in range(NUM_NODES):
            data_dir = os.path.join(cfg.TEST_DATADIRS, str(i))
            if os.path.exists(os.path.join(cfg.PARTICL_BINDIR, "particl-wallet")):
                try:
                    callrpc_cli(
                        cfg.PARTICL_BINDIR,
                        data_dir,
                        "regtest",
                        "-wallet=bsx_wallet -legacy create",
                        "particl-wallet",
                    )
                except Exception:
                    callrpc_cli(
                        cfg.PARTICL_BINDIR,
                        data_dir,
                        "regtest",
                        "-wallet=bsx_wallet create",
                        "particl-wallet",
                    )
            cls.daemons.append(startDaemon(data_dir, cfg.PARTICL_BINDIR, cfg.PARTICLD))
            logging.info("Started %s %d", cfg.PARTICLD, cls.daemons[-1].handle.pid)

        for i in range(NUM_NODES):
            rpc = make_part_cli_rpc_func(i)
            waitForRPC(rpc, delay_event)
            # Nodes 0 and 1 import fixed mnemonics; other nodes get a fresh one.
            if i == 0:
                rpc(
                    "extkeyimportmaster",
                    [
                        "abandon baby cabbage dad eager fabric gadget habit ice kangaroo lab absorb"
                    ],
                )
            elif i == 1:
                rpc(
                    "extkeyimportmaster",
                    [
                        "pact mammal barrel matrix local final lecture chunk wasp survey bid various book strong spread fall ozone daring like topple door fatigue limb olympic",
                        "",
                        "true",
                    ],
                )
                rpc("getnewextaddress", ["lblExtTest"])
                rpc("rescanblockchain")
            else:
                rpc("extkeyimportmaster", [rpc("mnemonic", ["new"])["master"]])
            rpc(
                "walletsettings",
                [
                    "stakingoptions",
                    # Quotes are escaped because the JSON travels on a CLI command line.
                    json.dumps(
                        {"stakecombinethreshold": 100, "stakesplitthreshold": 200}
                    ).replace('"', '\\"'),
                ],
            )
            rpc("reservebalance", ["false"])

            basicswap_dir = os.path.join(
                os.path.join(cfg.TEST_DATADIRS, str(i)), "basicswap"
            )
            settings_path = os.path.join(basicswap_dir, cfg.CONFIG_FILENAME)
            with open(settings_path) as fs:
                settings = json.load(fs)
            sc = BasicSwap(
                basicswap_dir, settings, "regtest", log_name="BasicSwap{}".format(i)
            )
            cls.swap_clients.append(sc)
            sc.setDaemonPID(Coins.BTC, cls.daemons[0].handle.pid)
            sc.setDaemonPID(Coins.PIVX, cls.daemons[1].handle.pid)
            sc.setDaemonPID(Coins.PART, cls.daemons[2 + i].handle.pid)
            sc.start()

        waitForRPC(pivxRpc, delay_event)
        num_blocks = 1352  # CHECKLOCKTIMEVERIFY soft-fork activates at (regtest) block height 1351.
        logging.info("Mining %d pivx blocks", num_blocks)
        cls.pivx_addr = pivxRpc("getnewaddress mining_addr")
        pivxRpc("generatetoaddress {} {}".format(num_blocks, cls.pivx_addr))

        # The fork checks below are informational only; failures are logged.
        ro = pivxRpc("getblockchaininfo")
        try:
            assert ro["bip9_softforks"]["csv"]["status"] == "active"
        except Exception:
            logging.info("pivx: csv is not active")
        try:
            assert ro["bip9_softforks"]["segwit"]["status"] == "active"
        except Exception:
            logging.info("pivx: segwit is not active")

        waitForRPC(btcRpc, delay_event)
        cls.btc_addr = btcRpc("getnewaddress mining_addr bech32")
        logging.info("Mining %d Bitcoin blocks to %s", num_blocks, cls.btc_addr)
        btcRpc("generatetoaddress {} {}".format(num_blocks, cls.btc_addr))

        ro = btcRpc("getblockchaininfo")
        checkForks(ro)

        signal.signal(signal.SIGINT, signal_handler)
        cls.update_thread = threading.Thread(target=run_loop, args=(cls,))
        cls.update_thread.start()

        cls.coins_update_thread = threading.Thread(target=run_coins_loop, args=(cls,))
        cls.coins_update_thread.start()

        # Wait for height, or sequencelock is thrown off by genesis blocktime
        num_blocks = 3
        logging.info("Waiting for Particl chain height %d", num_blocks)
        for i in range(60):
            particl_blocks = cls.swap_clients[0].callrpc("getblockcount")
            logging.info("particl_blocks %s", particl_blocks)
            if particl_blocks >= num_blocks:
                break
            delay_event.wait(1)
        assert particl_blocks >= num_blocks

    @classmethod
    def tearDownClass(cls):
        """Stop background threads, swap clients and daemons.

        Also used by ``setUpClass`` to clean up after a failed setup, so it
        tolerates threads that were never started.
        """
        global stop_test
        logging.info("Finalising")
        stop_test = True
        for worker in (cls.update_thread, cls.coins_update_thread):
            if worker is not None:
                worker.join()
        try:
            for c in cls.swap_clients:
                c.finalise()
        finally:
            # Always stop the daemons, even if a client fails to finalise.
            stopDaemons(cls.daemons)
            cls.swap_clients.clear()
            cls.daemons.clear()

        super().tearDownClass()

    def _assert_nodes_idle(self, *node_ids):
        """Assert the given nodes report no swaps in progress and no watched outputs."""
        for node_id in node_ids:
            js = read_json_api(TEST_HTTP_PORT + node_id)
            assert js["num_swapping"] == 0 and js["num_watched_outputs"] == 0

    def test_02_part_pivx(self):
        """Swap PART for PIVX: node 0 offers, node 1 bids."""
        logging.info("---------- Test PART to PIVX")
        swap_clients = self.swap_clients

        offer_id = swap_clients[0].postOffer(
            Coins.PART,
            Coins.PIVX,
            100 * COIN,
            0.1 * COIN,
            100 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_TIME,
        )

        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id)
        swap_clients[0].acceptBid(bid_id)

        wait_for_in_progress(delay_event, swap_clients[1], bid_id, sent=True)

        wait_for_bid(
            delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=60
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=60,
        )

        self._assert_nodes_idle(0, 1)

    def test_03_pivx_part(self):
        """Swap PIVX for PART: node 1 offers, node 0 bids."""
        logging.info("---------- Test PIVX to PART")
        swap_clients = self.swap_clients

        offer_id = swap_clients[1].postOffer(
            Coins.PIVX,
            Coins.PART,
            10 * COIN,
            9.0 * COIN,
            10 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_TIME,
        )

        wait_for_offer(delay_event, swap_clients[0], offer_id)
        offer = swap_clients[0].getOffer(offer_id)
        bid_id = swap_clients[0].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[1], bid_id)
        swap_clients[1].acceptBid(bid_id)

        wait_for_in_progress(delay_event, swap_clients[0], bid_id, sent=True)

        wait_for_bid(
            delay_event,
            swap_clients[0],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=60,
        )
        wait_for_bid(
            delay_event, swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, wait_for=60
        )

        self._assert_nodes_idle(0, 1)

    def test_04_pivx_btc(self):
        """Swap PIVX for BTC: node 0 offers, node 1 bids."""
        logging.info("---------- Test PIVX to BTC")
        swap_clients = self.swap_clients

        offer_id = swap_clients[0].postOffer(
            Coins.PIVX,
            Coins.BTC,
            10 * COIN,
            0.1 * COIN,
            10 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_TIME,
        )

        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id)
        swap_clients[0].acceptBid(bid_id)

        wait_for_in_progress(delay_event, swap_clients[1], bid_id, sent=True)

        wait_for_bid(
            delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=60
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=60,
        )

        self._assert_nodes_idle(0, 1)

    def test_05_refund(self):
        """Bidder abandons the bid; the offerer's side must still reach SWAP_COMPLETED."""
        # Seller submits initiate txn, buyer doesn't respond
        logging.info("---------- Test refund, PIVX to BTC")
        swap_clients = self.swap_clients

        offer_id = swap_clients[0].postOffer(
            Coins.PIVX,
            Coins.BTC,
            10 * COIN,
            0.1 * COIN,
            10 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_BLOCKS,
            10,
        )

        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id)
        # Abandon on the bidder before the offerer accepts.
        swap_clients[1].abandonBid(bid_id)
        swap_clients[0].acceptBid(bid_id)

        wait_for_bid(
            delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=60
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.BID_ABANDONED,
            sent=True,
            wait_for=60,
        )

        self._assert_nodes_idle(0, 1)

    def test_06_self_bid(self):
        """Node 0 bids on its own PIVX to BTC offer."""
        logging.info("---------- Test same client, PIVX to BTC")
        swap_clients = self.swap_clients

        js_0_before = read_json_api(TEST_HTTP_PORT + 0)

        offer_id = swap_clients[0].postOffer(
            Coins.PIVX,
            Coins.BTC,
            10 * COIN,
            10 * COIN,
            10 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_TIME,
        )

        wait_for_offer(delay_event, swap_clients[0], offer_id)
        offer = swap_clients[0].getOffer(offer_id)
        bid_id = swap_clients[0].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id)
        swap_clients[0].acceptBid(bid_id)

        wait_for_bid_tx_state(
            delay_event,
            swap_clients[0],
            bid_id,
            TxStates.TX_REDEEMED,
            TxStates.TX_REDEEMED,
            wait_for=60,
        )
        wait_for_bid(
            delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=60
        )

        js_0 = read_json_api(TEST_HTTP_PORT + 0)
        assert js_0["num_swapping"] == 0 and js_0["num_watched_outputs"] == 0
        # The same client is both sender and receiver, so both counters grow.
        assert (
            js_0["num_recv_bids"] == js_0_before["num_recv_bids"] + 1
            and js_0["num_sent_bids"] == js_0_before["num_sent_bids"] + 1
        )

    def test_07_error(self):
        """Force a bid into BID_ERROR by overriding fee rates above the bid value."""
        logging.info("---------- Test error, PIVX to BTC, set fee above bid value")
        swap_clients = self.swap_clients

        offer_id = swap_clients[0].postOffer(
            Coins.PIVX,
            Coins.BTC,
            0.01 * COIN,
            1.0 * COIN,
            0.01 * COIN,
            SwapTypes.SELLER_FIRST,
            TxLockTypes.ABS_LOCK_TIME,
        )

        wait_for_offer(delay_event, swap_clients[0], offer_id)
        offer = swap_clients[0].getOffer(offer_id)
        bid_id = swap_clients[0].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id)
        swap_clients[0].acceptBid(bid_id)

        btc_settings = swap_clients[0].getChainClientSettings(Coins.BTC)
        pivx_settings = swap_clients[0].getChainClientSettings(Coins.PIVX)
        try:
            btc_settings["override_feerate"] = 10.0
            pivx_settings["override_feerate"] = 100.0
            wait_for_bid(
                delay_event, swap_clients[0], bid_id, BidStates.BID_ERROR, wait_for=60
            )
            swap_clients[0].abandonBid(bid_id)
        finally:
            # pop() instead of del so cleanup can never raise KeyError and
            # hide the original test failure.
            btc_settings.pop("override_feerate", None)
            pivx_settings.pop("override_feerate", None)

    def test_08_wallet(self):
        """Exercise the withdraw and createutxo wallet endpoints of node 0."""
        logging.info("---------- Test {} wallet".format(self.test_coin_from.name))

        logging.info("Test withdrawal")
        addr = pivxRpc('getnewaddress "Withdrawal test"')
        wallets = read_json_api(TEST_HTTP_PORT + 0, "wallets")
        assert float(wallets[self.test_coin_from.name]["balance"]) > 100

        post_json = {
            "value": 100,
            "address": addr,
            "subfee": False,
        }
        json_rv = read_json_api(
            TEST_HTTP_PORT + 0,
            "wallets/{}/withdraw".format(self.test_coin_from.name.lower()),
            post_json,
        )
        assert len(json_rv["txid"]) == 64

        logging.info("Test createutxo")
        post_json = {
            "value": 10,
        }
        json_rv = read_json_api(
            TEST_HTTP_PORT + 0,
            "wallets/{}/createutxo".format(self.test_coin_from.name.lower()),
            post_json,
        )
        assert len(json_rv["txid"]) == 64

    def test_09_v3_tx(self):
        """Create a shielded (version 3) PIVX tx and check it can be read back from its block."""
        logging.info("---------- Test PIVX v3 txns")

        generate_addr = pivxRpc('getnewaddress "generate test"')
        pivx_addr = pivxRpc('getnewaddress "Sapling test"')
        pivx_sapling_addr = pivxRpc('getnewshieldaddress "shield addr"')

        pivxRpc(f'sendtoaddress "{pivx_addr}" 6.0')
        pivxRpc(f'generatetoaddress 1 "{generate_addr}"')

        txid = pivxRpc(
            'shieldsendmany "{}" "[{{\\"address\\": \\"{}\\", \\"amount\\": 1}}]"'.format(
                pivx_addr, pivx_sapling_addr
            )
        )
        rtx = pivxRpc(f'getrawtransaction "{txid}" true')
        assert rtx["version"] == 3

        # Poll for confirmation; the background miner normally includes the
        # tx, with one explicit extra block mined as a nudge after 5 polls.
        block_hash = None
        for i in range(15):
            rtx = pivxRpc(f'getrawtransaction "{txid}" true')
            if "blockhash" in rtx:
                block_hash = rtx["blockhash"]
                logging.info(f"Shielded tx confirmed in block {block_hash} after {i}s")
                break
            if i == 5:
                pivxRpc(f'generatetoaddress 1 "{generate_addr}"')
            delay_event.wait(1)
        assert block_hash is not None, "Shielded tx was not confirmed"

        ci = self.swap_clients[0].ci(Coins.PIVX)
        block = ci.getBlockWithTxns(block_hash)

        assert any(txid == tx["txid"] for tx in block["tx"])

    def ensure_balance(self, coin_type, node_id, amount):
        """Top up ``node_id``'s ``coin_type`` wallet from node 0 if it holds less than ``amount``.

        When a top-up is needed, ``amount`` is withdrawn from node 0 to the
        target wallet's deposit address and the call waits for the balance.
        """
        tla = coin_type.name
        js_w = read_json_api(TEST_HTTP_PORT + node_id, "wallets")
        if float(js_w[tla]["balance"]) < amount:
            post_json = {
                "value": amount,
                "address": js_w[tla]["deposit_address"],
                "subfee": False,
            }
            json_rv = read_json_api(
                TEST_HTTP_PORT + 0,
                "wallets/{}/withdraw".format(tla.lower()),
                post_json,
            )
            assert len(json_rv["txid"]) == 64
            wait_for_balance(
                delay_event,
                "http://127.0.0.1:{}/json/wallets/{}".format(
                    TEST_HTTP_PORT + node_id, tla.lower()
                ),
                "balance",
                amount,
            )

    def test_10_prefunded_itx(self):
        """Post an offer with a prefunded initiate tx and verify its inputs are the ones spent."""
        logging.info("---------- Test prefunded itx offer")

        swap_clients = self.swap_clients
        coin_from = Coins.PIVX
        coin_to = Coins.BTC
        swap_type = SwapTypes.SELLER_FIRST
        ci_from = swap_clients[2].ci(coin_from)
        ci_to = swap_clients[1].ci(coin_to)
        tla_from = coin_from.name
        node2_port = TEST_HTTP_PORT + 2

        # Prepare balance
        self.ensure_balance(coin_from, 2, 10.0)
        self.ensure_balance(coin_to, 1, 100.0)

        # Node 2 sends 10 coins to itself with the fee subtracted.
        post_json = {
            "value": 10.0,
            "address": read_json_api(
                node2_port, "wallets/{}/nextdepositaddr".format(tla_from.lower())
            ),
            "subfee": True,
        }
        json_rv = read_json_api(
            node2_port, "wallets/{}/withdraw".format(tla_from.lower()), post_json
        )
        # Check the withdrawal succeeded before waiting on its effect.
        assert len(json_rv["txid"]) == 64
        wait_for_balance(
            delay_event,
            "http://127.0.0.1:{}/json/wallets/{}".format(node2_port, tla_from.lower()),
            "balance",
            9.0,
        )

        # Create prefunded ITX
        pi = swap_clients[2].pi(SwapTypes.XMR_SWAP)
        js_w2 = read_json_api(node2_port, "wallets")
        swap_value = 10.0
        if float(js_w2[tla_from]["balance"]) < swap_value:
            swap_value = js_w2[tla_from]["balance"]
        swap_value = ci_from.make_int(swap_value)
        assert swap_value > ci_from.make_int(9)

        addr_to = pi.getMockAddrTo(ci_from)
        funded_tx = ci_from.createRawFundedTransaction(
            addr_to, swap_value, True, lock_unspents=True
        )
        itx = bytes.fromhex(funded_tx)

        # The fee was subtracted from the output, so offer that reduced value.
        itx_decoded = ci_from.describeTx(itx.hex())
        n = pi.findMockVout(ci_from, itx_decoded)
        value_after_subfee = ci_from.make_int(itx_decoded["vout"][n]["value"])
        assert value_after_subfee < swap_value
        swap_value = value_after_subfee
        wait_for_unspent(delay_event, ci_from, swap_value)

        extra_options = {"prefunded_itx": itx}
        rate_swap = ci_to.make_int(random.uniform(0.2, 10.0), r=1)
        offer_id = swap_clients[2].postOffer(
            coin_from,
            coin_to,
            swap_value,
            rate_swap,
            swap_value,
            swap_type,
            TxLockTypes.ABS_LOCK_TIME,
            extra_options=extra_options,
        )

        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[2], bid_id, BidStates.BID_RECEIVED)
        swap_clients[2].acceptBid(bid_id)

        wait_for_bid(
            delay_event, swap_clients[2], bid_id, BidStates.SWAP_COMPLETED, wait_for=120
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=120,
        )

        # Verify expected inputs were used
        bid, offer = swap_clients[2].getBidAndOffer(bid_id)
        assert bid.initiate_tx
        wtx = ci_from.rpc(
            "gettransaction",
            [
                bid.initiate_tx.txid.hex(),
            ],
        )
        itx_after = ci_from.describeTx(wtx["hex"])
        assert len(itx_after["vin"]) == len(itx_decoded["vin"])
        for i, txin in enumerate(itx_decoded["vin"]):
            assert txin["txid"] == itx_after["vin"][i]["txid"]
            assert txin["vout"] == itx_after["vin"][i]["vout"]

    def test_11_xmrswap_to(self):
        """Run a BTC to PIVX swap using the XMR_SWAP swap type."""
        logging.info("---------- Test xmr swap protocol to")

        swap_clients = self.swap_clients
        coin_from = Coins.BTC
        coin_to = Coins.PIVX
        swap_type = SwapTypes.XMR_SWAP
        ci_from = swap_clients[0].ci(coin_from)
        ci_to = swap_clients[1].ci(coin_to)

        swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
        rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
        offer_id = swap_clients[0].postOffer(
            coin_from, coin_to, swap_value, rate_swap, swap_value, swap_type
        )

        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED)
        swap_clients[0].acceptBid(bid_id)

        wait_for_bid(
            delay_event, swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, wait_for=120
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=120,
        )

    def test_12_xmrswap_to_recover_b_lock_tx(self):
        """BTC to PIVX XMR_SWAP with an invalid coin B lock; both sides must end refunded."""
        coin_from = Coins.BTC
        coin_to = Coins.PIVX
        logging.info(
            "---------- Test {} to {} follower recovers coin b lock tx".format(
                coin_from.name, coin_to.name
            )
        )

        swap_clients = self.swap_clients
        ci_from = swap_clients[0].ci(coin_from)
        ci_to = swap_clients[1].ci(coin_to)

        amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
        rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
        offer_id = swap_clients[0].postOffer(
            coin_from,
            coin_to,
            amt_swap,
            rate_swap,
            amt_swap,
            SwapTypes.XMR_SWAP,
            lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
            lock_value=32,
        )
        wait_for_offer(delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)

        bid_id = swap_clients[1].postXmrBid(offer_id, offer.amount_from)
        wait_for_bid(delay_event, swap_clients[0], bid_id, BidStates.BID_RECEIVED)

        bid, xmr_swap = swap_clients[0].getXmrBid(bid_id)
        # NOTE(review): presumably makes node 1 publish an invalid coin B lock
        # tx so the swap fails and is refunded; confirm against DebugTypes.
        swap_clients[1].setBidDebugInd(bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK)
        swap_clients[0].acceptXmrBid(bid_id)

        wait_for_bid(
            delay_event,
            swap_clients[0],
            bid_id,
            BidStates.XMR_SWAP_FAILED_REFUNDED,
            wait_for=180,
        )
        wait_for_bid(
            delay_event,
            swap_clients[1],
            bid_id,
            BidStates.XMR_SWAP_FAILED_REFUNDED,
            sent=True,
        )
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()