mirror of
https://github.com/basicswap/basicswap.git
synced 2025-11-06 02:38:11 +01:00
1561 lines
49 KiB
Python
1561 lines
49 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2024 tecnovert
|
|
# Copyright (c) 2024 The Basicswap developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import copy
|
|
import logging
|
|
import os
|
|
import random
|
|
|
|
import unittest
|
|
|
|
import basicswap.config as cfg
|
|
|
|
from basicswap.basicswap import (
|
|
BidStates,
|
|
Coins,
|
|
DebugTypes,
|
|
SwapTypes,
|
|
TxStates,
|
|
)
|
|
from basicswap.basicswap_util import TxLockTypes, TxTypes
|
|
from basicswap.util.crypto import hash160
|
|
from basicswap.interface.dcr.rpc import (
|
|
callrpc,
|
|
)
|
|
from basicswap.interface.dcr.messages import (
|
|
TxSerializeType,
|
|
)
|
|
from basicswap.interface.dcr.util import (
|
|
createDCRWallet,
|
|
)
|
|
from tests.basicswap.common import (
|
|
compare_bid_states,
|
|
compare_bid_states_unordered,
|
|
stopDaemons,
|
|
wait_for_bid,
|
|
wait_for_bid_tx_state,
|
|
wait_for_offer,
|
|
waitForRPC,
|
|
)
|
|
from tests.basicswap.util import (
|
|
read_json_api,
|
|
REQUIRED_SETTINGS,
|
|
)
|
|
from tests.basicswap.test_xmr import BaseTest, test_delay_event
|
|
from basicswap.interface.dcr import DCRInterface
|
|
from basicswap.interface.dcr.messages import CTransaction, CTxIn, COutPoint
|
|
from basicswap.interface.dcr.script import OP_CHECKSEQUENCEVERIFY, push_script_data
|
|
from basicswap.bin.run import startDaemon
|
|
|
|
# Root logger handle kept at module scope.
logger = logging.getLogger()

# Directory holding the Decred binaries. The DCR_BINDIR environment variable
# overrides the default "decred" folder under cfg.DEFAULT_TEST_BINDIR.
DCR_BINDIR = os.path.expanduser(
    os.environ.get(
        "DCR_BINDIR",
        os.path.join(cfg.DEFAULT_TEST_BINDIR, "decred"),
    )
)
# Executable names, overridable through the DCRD / DCR_WALLET environment variables.
DCRD = os.environ.get("DCRD", "dcrd" + cfg.bin_suffix)
DCR_WALLET = os.environ.get("DCR_WALLET", "dcrwallet" + cfg.bin_suffix)

# Base ports; the node id is added to these to get each node's own port.
DCR_BASE_PORT = 44932
DCR_BASE_RPC_PORT = 45932
DCR_BASE_WALLET_RPC_PORT = 45952
|
|
|
|
|
|
def make_rpc_func(node_id, base_rpc_port):
    """Return an RPC helper bound to one test node.

    Args:
        node_id: Index of the node; added to ``base_rpc_port`` and used to
            build the ``test<N>:test_pass<N>`` credentials.
        base_rpc_port: Port of node 0 for the targeted service.

    Returns:
        A function ``rpc_func(method, params=None)`` that forwards to
        ``callrpc`` with the node's port and credentials.
    """
    auth = "test{0}:test_pass{0}".format(node_id)
    rpc_port = base_rpc_port + node_id

    def rpc_func(method, params=None):
        return callrpc(rpc_port, auth, method, params)

    return rpc_func
|
|
|
|
|
|
def wait_for_dcr_height(http_port, num_blocks=3):
    """Wait until the DCR wallet behind ``http_port`` reports enough blocks.

    Polls the ``wallets/dcr`` JSON API endpoint up to 60 times, waiting on
    ``test_delay_event`` for 1 between attempts.

    Args:
        http_port: Port of the swap client's JSON API.
        num_blocks: Minimum value of the wallet's ``blocks`` field to wait for.

    Raises:
        ValueError: If ``test_delay_event`` is set (test stopped) or the
            height is not reached within the allowed attempts.
    """
    logging.info("Waiting for DCR chain height %d", num_blocks)
    for _ in range(60):
        if test_delay_event.is_set():
            raise ValueError("Test stopped.")
        try:
            decred_blocks = read_json_api(http_port, "wallets/dcr")["blocks"]
            logging.info("decred_blocks %s", decred_blocks)
            if decred_blocks >= num_blocks:
                return
        except Exception as e:
            # Best effort: a failed read is logged and retried on the next pass.
            logging.warning("Error reading wallets %s", e)

        test_delay_event.wait(1)
    raise ValueError(
        f"wait_for_dcr_height failed http_port: {http_port}, num_blocks: {num_blocks}"
    )
|
|
|
|
|
|
def run_test_success_path(self, coin_from: Coins, coin_to: Coins):
    """Run a SELLER_FIRST swap from ``coin_from`` to ``coin_to`` to completion.

    Node 0 posts the offer and node 1 bids. After both sides reach
    SWAP_COMPLETED the lock tx spends are looked up in the receiving wallets
    and the recorded bid states are compared with the expected sequences.
    """
    logging.info(f"---------- Test {coin_from.name} to {coin_to.name}")

    node_from = 0
    node_to = 1
    offerer = self.swap_clients[node_from]
    bidder = self.swap_clients[node_to]
    ci_from = offerer.ci(coin_from)
    ci_to = bidder.ci(coin_to)

    self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    amt_swap = ci_from.make_int(random.uniform(0.1, 5.0), r=1)
    rate_swap = ci_to.make_int(random.uniform(0.2, 10.0), r=1)

    offer_id = offerer.postOffer(
        coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.SELLER_FIRST
    )
    wait_for_offer(test_delay_event, bidder, offer_id)

    offer = bidder.getOffer(offer_id)
    bid_id = bidder.postBid(offer_id, offer.amount_from)

    wait_for_bid(test_delay_event, offerer, bid_id)
    offerer.acceptBid(bid_id)

    wait_for_bid(
        test_delay_event, offerer, bid_id, BidStates.SWAP_COMPLETED, wait_for=120
    )
    wait_for_bid(
        test_delay_event,
        bidder,
        bid_id,
        BidStates.SWAP_COMPLETED,
        sent=True,
        wait_for=30,
    )

    # Verify lock tx spends are found in the expected wallets
    bid, offer = offerer.getBidAndOffer(bid_id)

    # The bidder's coin_from wallet should hold the spend of the initiate tx.
    itx_spend = bid.initiate_tx.spend_txid.hex()
    node_to_ci_from = bidder.ci(coin_from)
    wtx = node_to_ci_from.rpc_wallet("gettransaction", [itx_spend])
    itx_received = node_to_ci_from.make_int(wtx["details"][0]["amount"])
    assert amt_swap - itx_received < self.max_fee

    # The offerer's coin_to wallet should hold the spend of the participate tx.
    node_from_ci_to = offerer.ci(coin_to)
    ptx_spend = bid.participate_tx.spend_txid.hex()
    wtx = node_from_ci_to.rpc_wallet("gettransaction", [ptx_spend])
    ptx_received = node_from_ci_to.make_int(wtx["details"][0]["amount"])
    assert bid.amount_to - ptx_received < self.max_fee

    # Neither node should have anything left in progress.
    summaries = [read_json_api(1800 + node) for node in (node_from, node_to)]
    for summary in summaries:
        assert summary["num_swapping"] == 0 and summary["num_watched_outputs"] == 0

    states_path = f"bids/{bid_id.hex()}/states"
    offerer_states = read_json_api(1800 + node_from, states_path)
    bidder_states = read_json_api(1800 + node_to, states_path)

    expect_states = copy.deepcopy(self.states_offerer_sh[0])
    # Will miss PTX Sent event as PTX is found by searching the chain.
    if coin_to == Coins.DCR:
        expect_states[5] = "PTX In Chain"
    assert compare_bid_states(offerer_states, expect_states) is True
    assert compare_bid_states(bidder_states, self.states_bidder_sh[0]) is True
|
|
|
|
|
|
def run_test_bad_ptx(self, coin_from: Coins, coin_to: Coins):
    """Check that both lock txns are refunded when the bidder sends a bad PTX.

    Invalid PTX sent, swap should stall and ITx and PTx should be reclaimed
    by senders. Node 0 posts an auto-accepting SELLER_FIRST offer with a
    10 block sequence lock; node 1 bids with MAKE_INVALID_PTX set.
    """
    logging.info(f"---------- Test bad ptx {coin_from.name} to {coin_to.name}")

    node_from = 0
    node_to = 1
    offerer = self.swap_clients[node_from]
    bidder = self.swap_clients[node_to]
    ci_from = offerer.ci(coin_from)
    ci_to = bidder.ci(coin_to)

    self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    amt_swap = ci_from.make_int(random.uniform(1.1, 10.0), r=1)
    rate_swap = ci_to.make_int(random.uniform(0.1, 2.0), r=1)

    offer_id = offerer.postOffer(
        coin_from,
        coin_to,
        amt_swap,
        rate_swap,
        amt_swap,
        SwapTypes.SELLER_FIRST,
        TxLockTypes.SEQUENCE_LOCK_BLOCKS,
        10,
        auto_accept_bids=True,
    )

    wait_for_offer(test_delay_event, bidder, offer_id)
    offer = bidder.getOffer(offer_id)
    bid_id = bidder.postBid(offer_id, offer.amount_from)
    bidder.setBidDebugInd(bid_id, DebugTypes.MAKE_INVALID_PTX)

    wait_for_bid(
        test_delay_event, offerer, bid_id, BidStates.SWAP_COMPLETED, wait_for=120
    )
    wait_for_bid(
        test_delay_event,
        bidder,
        bid_id,
        BidStates.SWAP_COMPLETED,
        sent=True,
        wait_for=30,
    )

    js_0_bid = read_json_api(1800 + node_from, "bids/{}".format(bid_id.hex()))
    js_1_bid = read_json_api(1800 + node_to, "bids/{}".format(bid_id.hex()))
    assert js_0_bid["itx_state"] == "Refunded"
    assert js_1_bid["ptx_state"] == "Refunded"

    # Verify lock tx spends are found in the expected wallets
    bid, offer = offerer.getBidAndOffer(bid_id)
    itx_spend = bid.initiate_tx.spend_txid.hex()
    node_from_ci_from = offerer.ci(coin_from)
    wtx = node_from_ci_from.rpc_wallet("gettransaction", [itx_spend])
    assert (
        amt_swap - node_from_ci_from.make_int(wtx["details"][0]["amount"])
        < self.max_fee
    )

    node_to_ci_to = bidder.ci(coin_to)
    bid, offer = bidder.getBidAndOffer(bid_id)
    ptx_spend = bid.participate_tx.spend_txid.hex()
    wtx = node_to_ci_to.rpc_wallet("gettransaction", [ptx_spend])
    assert (
        bid.amount_to - node_to_ci_to.make_int(wtx["details"][0]["amount"])
        < self.max_fee
    )

    path = f"bids/{bid_id.hex()}/states"
    offerer_states = read_json_api(1800 + node_from, path)
    bidder_states = read_json_api(1800 + node_to, path)

    # NOTE(review): an early return for every coin_to other than XMR/WOW used
    # to sit here, which left all the checks below unreachable. It was removed
    # so the state and leftover-swap checks actually run.
    # Hard to get the timing right, so compare the states without ordering.
    assert (
        compare_bid_states_unordered(offerer_states, self.states_offerer_sh[1]) is True
    )
    assert compare_bid_states_unordered(bidder_states, self.states_bidder_sh[1]) is True

    js_0 = read_json_api(1800 + node_from)
    js_1 = read_json_api(1800 + node_to)
    assert js_0["num_swapping"] == 0 and js_0["num_watched_outputs"] == 0
    assert js_1["num_swapping"] == 0 and js_1["num_watched_outputs"] == 0
|
|
|
|
|
|
def run_test_itx_refund(self, coin_from: Coins, coin_to: Coins):
    """Exercise the path where the ITX is refunded while the PTX is redeemed.

    Offerer claims PTX and refunds ITX after lock expires.
    Bidder loses PTX value without gaining ITX value.
    """
    logging.info(f"---------- Test itx refund {coin_from.name} to {coin_to.name}")

    node_from = 0
    node_to = 1
    offerer = self.swap_clients[node_from]
    bidder = self.swap_clients[node_to]
    ci_from = offerer.ci(coin_from)
    ci_to = bidder.ci(coin_to)

    self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    swap_value = ci_from.make_int(random.uniform(2.0, 20.0), r=1)
    rate_swap = ci_to.make_int(0.5, r=1)
    offer_id = offerer.postOffer(
        coin_from,
        coin_to,
        swap_value,
        rate_swap,
        swap_value,
        SwapTypes.SELLER_FIRST,
        TxLockTypes.SEQUENCE_LOCK_BLOCKS,
        12,
    )

    wait_for_offer(test_delay_event, bidder, offer_id)
    offer = bidder.getOffer(offer_id)
    bid_id = bidder.postBid(offer_id, offer.amount_from)
    bidder.setBidDebugInd(bid_id, DebugTypes.DONT_SPEND_ITX)

    wait_for_bid(test_delay_event, offerer, bid_id)

    # For testing: Block refunding the ITX until PTX has been redeemed, else ITX refund can become spendable before PTX confirms
    offerer.setBidDebugInd(bid_id, DebugTypes.SKIP_LOCK_TX_REFUND)
    offerer.acceptBid(bid_id)
    wait_for_bid_tx_state(
        test_delay_event,
        offerer,
        bid_id,
        TxStates.TX_CONFIRMED,
        TxStates.TX_REDEEMED,
        wait_for=120,
    )
    offerer.setBidDebugInd(bid_id, DebugTypes.NONE)

    wait_for_bid_tx_state(
        test_delay_event,
        offerer,
        bid_id,
        TxStates.TX_REFUNDED,
        TxStates.TX_REDEEMED,
        wait_for=90,
    )

    wait_for_bid(
        test_delay_event, offerer, bid_id, BidStates.SWAP_COMPLETED, wait_for=60
    )

    # Verify lock tx spends are found in the expected wallets
    bid, offer = offerer.getBidAndOffer(bid_id)

    # Both spends are looked up in the offerer's wallets.
    itx_spend = bid.initiate_tx.spend_txid.hex()
    node_from_ci_from = offerer.ci(coin_from)
    wtx = node_from_ci_from.rpc_wallet("gettransaction", [itx_spend])
    itx_returned = node_from_ci_from.make_int(wtx["details"][0]["amount"])
    assert swap_value - itx_returned < self.max_fee

    node_from_ci_to = offerer.ci(coin_to)
    ptx_spend = bid.participate_tx.spend_txid.hex()
    wtx = node_from_ci_to.rpc_wallet("gettransaction", [ptx_spend])
    ptx_received = node_from_ci_to.make_int(wtx["details"][0]["amount"])
    assert bid.amount_to - ptx_received < self.max_fee
|
|
|
|
|
|
def run_test_ads_success_path(self, coin_from: Coins, coin_to: Coins):
    """Run an XMR_SWAP type swap from ``coin_from`` to ``coin_to`` to completion.

    Node 0 is the offerer and node 1 the bidder. For reverse bids
    (``coin_from`` is one of the offerer's scriptless coins) the wallet and
    state checks at the end are not implemented yet and are skipped.
    """
    logging.info(f"---------- Test ADS swap {coin_from.name} to {coin_to.name}")

    # Offerer sends the offer
    # Bidder sends the bid
    id_offerer: int = 0
    id_bidder: int = 1

    offerer = self.swap_clients[id_offerer]
    bidder = self.swap_clients[id_bidder]
    reverse_bid: bool = coin_from in offerer.scriptless_coins
    ci_from = offerer.ci(coin_from)
    ci_to = bidder.ci(coin_to)

    self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    # Leader sends the initial (chain a) lock tx.
    # Follower sends the participate (chain b) lock tx.
    if reverse_bid:
        id_leader, id_follower = id_bidder, id_offerer
    else:
        id_leader, id_follower = id_offerer, id_bidder
    logging.info(
        f"Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}"
    )

    amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
    rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
    offer_id = offerer.postOffer(
        coin_from, coin_to, amt_swap, rate_swap, amt_swap, SwapTypes.XMR_SWAP
    )

    wait_for_offer(test_delay_event, bidder, offer_id)
    bid_id = bidder.postXmrBid(offer_id, amt_swap)
    wait_for_bid(test_delay_event, offerer, bid_id, BidStates.BID_RECEIVED)

    offerer.acceptBid(bid_id)

    wait_for_bid(
        test_delay_event, offerer, bid_id, BidStates.SWAP_COMPLETED, wait_for=180
    )
    wait_for_bid(
        test_delay_event,
        bidder,
        bid_id,
        BidStates.SWAP_COMPLETED,
        sent=True,
        wait_for=30,
    )

    if reverse_bid:
        return  # TODO

    # Verify lock tx spends are found in the expected wallets
    bid, xmr_swap = offerer.getXmrBid(bid_id)
    wallet_checks_skipped = (Coins.XMR, Coins.WOW)

    node_from_ci_to = offerer.ci(coin_to)
    if node_from_ci_to.coin_type() not in wallet_checks_skipped:
        wtx = node_from_ci_to.rpc_wallet(
            "gettransaction", [bid.xmr_b_lock_tx.spend_txid.hex()]
        )
        received_b = node_from_ci_to.make_int(wtx["details"][0]["amount"])
        assert bid.amount_to - received_b < self.max_fee

    node_to_ci_from = bidder.ci(coin_from)
    if node_to_ci_from.coin_type() not in wallet_checks_skipped:
        wtx = node_to_ci_from.rpc_wallet(
            "gettransaction", [xmr_swap.a_lock_spend_tx_id.hex()]
        )
        received_a = node_to_ci_from.make_int(wtx["details"][0]["amount"])
        assert bid.amount - received_a < self.max_fee

    states_path = f"bids/{bid_id.hex()}/states"
    offerer_states = read_json_api(1800 + id_offerer, states_path)
    bidder_states = read_json_api(1800 + id_bidder, states_path)

    assert compare_bid_states(offerer_states, self.states_offerer[0]) is True
    assert compare_bid_states(bidder_states, self.states_bidder[0]) is True
|
|
|
|
|
|
def run_test_ads_both_refund(
    self, coin_from: Coins, coin_to: Coins, lock_value: int = 32
) -> None:
    """Run an XMR_SWAP type swap where both lock txns end up refunded.

    The follower is told to create an invalid coin B lock, after which both
    sides are expected to reach XMR_SWAP_FAILED_REFUNDED. For reverse bids the
    wallet and state checks at the end are skipped.
    """
    logging.info(
        "---------- Test {} to {} both lock txns refunded".format(
            coin_from.name, coin_to.name
        )
    )

    id_offerer: int = 0
    id_bidder: int = 1

    swap_clients = self.swap_clients
    offerer = swap_clients[id_offerer]
    bidder = swap_clients[id_bidder]
    reverse_bid: bool = coin_from in offerer.scriptless_coins
    ci_from = offerer.ci(coin_from)
    ci_to = offerer.ci(coin_to)

    self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    if reverse_bid:
        id_leader, id_follower = id_bidder, id_offerer
    else:
        id_leader, id_follower = id_offerer, id_bidder
    logging.info(
        f"Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}"
    )

    amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
    rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)

    # The debug case is only registered on both nodes while the offer is
    # posted and received; the finally clause removes it again.
    debug_case = (None, DebugTypes.OFFER_LOCK_2_VALUE_INC)
    try:
        offerer._debug_cases.append(debug_case)
        bidder._debug_cases.append(debug_case)
        offer_id = offerer.postOffer(
            coin_from,
            coin_to,
            amt_swap,
            rate_swap,
            amt_swap,
            SwapTypes.XMR_SWAP,
            lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
            lock_value=lock_value,
        )
        wait_for_offer(test_delay_event, bidder, offer_id)
        offer = bidder.getOffer(offer_id)
    finally:
        offerer._debug_cases.remove(debug_case)
        bidder._debug_cases.remove(debug_case)

    bid_id = bidder.postXmrBid(offer_id, offer.amount_from)
    wait_for_bid(test_delay_event, offerer, bid_id, BidStates.BID_RECEIVED)

    swap_clients[id_follower].setBidDebugInd(
        bid_id, DebugTypes.CREATE_INVALID_COIN_B_LOCK
    )
    offerer.acceptBid(bid_id)

    # The bidder is the side that sent the bid, and it leads only on reverse bids.
    leader_sent_bid: bool = bool(reverse_bid)
    wait_for_bid(
        test_delay_event,
        swap_clients[id_leader],
        bid_id,
        BidStates.XMR_SWAP_FAILED_REFUNDED,
        sent=leader_sent_bid,
        wait_for=(self.extra_wait_time + 180),
    )
    wait_for_bid(
        test_delay_event,
        swap_clients[id_follower],
        bid_id,
        BidStates.XMR_SWAP_FAILED_REFUNDED,
        sent=(not leader_sent_bid),
        wait_for=(self.extra_wait_time + 40),
    )

    if reverse_bid:
        return  # TODO

    # Verify lock tx spends are found in the expected wallets
    bid, xmr_swap = offerer.getXmrBid(bid_id)
    lock_refund_spend_txid = bid.txns[TxTypes.XMR_SWAP_A_LOCK_REFUND_SPEND].txid

    # From here on the bidder's copy of the bid is used.
    bid, xmr_swap = bidder.getXmrBid(bid_id)
    wallet_checks_skipped = (Coins.XMR, Coins.WOW)

    node_from_ci_from = offerer.ci(coin_from)
    if node_from_ci_from.coin_type() not in wallet_checks_skipped:
        wtx = node_from_ci_from.rpc_wallet(
            "gettransaction", [lock_refund_spend_txid.hex()]
        )
        refunded_a = node_from_ci_from.make_int(wtx["details"][0]["amount"])
        assert bid.amount - refunded_a < self.max_fee

    node_to_ci_to = bidder.ci(coin_to)
    if node_to_ci_to.coin_type() not in wallet_checks_skipped:
        wtx = node_to_ci_to.rpc_wallet(
            "gettransaction", [bid.xmr_b_lock_tx.spend_txid.hex()]
        )
        refunded_b = node_to_ci_to.make_int(wtx["details"][0]["amount"])
        assert bid.amount_to - refunded_b < self.max_fee

    states_path = f"bids/{bid_id.hex()}/states"
    offerer_states = read_json_api(1800 + id_offerer, states_path)
    bidder_states = read_json_api(1800 + id_bidder, states_path)

    assert compare_bid_states(offerer_states, self.states_offerer[1]) is True
    assert compare_bid_states(bidder_states, self.states_bidder[1]) is True
|
|
|
|
|
|
def run_test_ads_swipe_refund(
    self, coin_from: Coins, coin_to: Coins, lock_value: int = 32
) -> None:
    """Run an XMR_SWAP type swap where the coin A lock refund tx is swiped.

    The follower stops after the coin A lock and the leader is told not to
    spend the coin A lock refund. The leader is then expected to reach
    BID_STALLED_FOR_TEST and the follower XMR_SWAP_FAILED_SWIPED.
    """
    logging.info(
        "---------- Test {} to {} coin a lock refund tx swiped".format(
            coin_from.name, coin_to.name
        )
    )

    id_offerer: int = 0
    id_bidder: int = 1

    swap_clients = self.swap_clients
    offerer = swap_clients[id_offerer]
    bidder = swap_clients[id_bidder]
    reverse_bid: bool = coin_from in offerer.scriptless_coins
    ci_from = offerer.ci(coin_from)
    ci_to = offerer.ci(coin_to)

    if reverse_bid:
        id_leader, id_follower = id_bidder, id_offerer
    else:
        id_leader, id_follower = id_offerer, id_bidder
    logging.info(
        f"Offerer, bidder, leader, follower: {id_offerer}, {id_bidder}, {id_leader}, {id_follower}"
    )

    # NOTE(review): the original indentation was lost, so it is unclear whether
    # the coin_from top-up also belongs inside this ``if``. It is kept
    # unconditional here (the superset of both readings) - confirm upstream.
    if reverse_bid:
        self.prepare_balance(coin_to, 100.0, 1801, 1800)
    self.prepare_balance(coin_from, 100.0, 1800, 1801)

    amt_swap = ci_from.make_int(random.uniform(0.1, 2.0), r=1)
    rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
    offer_id = offerer.postOffer(
        coin_from,
        coin_to,
        amt_swap,
        rate_swap,
        amt_swap,
        SwapTypes.XMR_SWAP,
        lock_type=TxLockTypes.SEQUENCE_LOCK_BLOCKS,
        lock_value=lock_value,
    )
    wait_for_offer(test_delay_event, bidder, offer_id)
    offer = bidder.getOffer(offer_id)

    bid_id = bidder.postXmrBid(offer_id, offer.amount_from)
    wait_for_bid(test_delay_event, offerer, bid_id, BidStates.BID_RECEIVED)

    swap_clients[id_follower].setBidDebugInd(
        bid_id, DebugTypes.BID_STOP_AFTER_COIN_A_LOCK
    )
    swap_clients[id_leader].setBidDebugInd(
        bid_id, DebugTypes.BID_DONT_SPEND_COIN_A_LOCK_REFUND
    )

    offerer.acceptBid(bid_id)

    # The bidder is the side that sent the bid, and it leads only on reverse bids.
    leader_sent_bid: bool = bool(reverse_bid)
    wait_for_bid(
        test_delay_event,
        swap_clients[id_leader],
        bid_id,
        BidStates.BID_STALLED_FOR_TEST,
        wait_for=(self.extra_wait_time + 180),
        sent=leader_sent_bid,
    )
    wait_for_bid(
        test_delay_event,
        swap_clients[id_follower],
        bid_id,
        BidStates.XMR_SWAP_FAILED_SWIPED,
        wait_for=(self.extra_wait_time + 80),
        sent=(not leader_sent_bid),
    )
|
|
|
|
|
|
def prepareDCDDataDir(datadir, node_id, conf_file, dir_prefix, num_nodes=3):
    """Create the simnet data directory and config files for one Decred node.

    Writes the daemon config (``conf_file``) and ``dcrwallet.conf`` into
    ``<datadir>/<dir_prefix><node_id>``. If the daemon config file already
    exists nothing is written.

    Args:
        datadir: Parent directory for all node directories.
        node_id: Index of the node; offsets the ports and names the credentials.
        conf_file: File name of the daemon config inside the node directory.
        dir_prefix: Prefix of the node directory name.
        num_nodes: Number of nodes; every other node is added as a peer.
    """
    node_dir = os.path.join(datadir, dir_prefix + str(node_id))
    # exist_ok avoids the separate exists() check and its race window.
    os.makedirs(node_dir, exist_ok=True)
    cfg_file_path = os.path.join(node_dir, conf_file)
    if os.path.exists(cfg_file_path):
        return

    daemon_config = [
        "simnet=1\n",
        "debuglevel=debug\n",
        f"listen=127.0.0.1:{DCR_BASE_PORT + node_id}\n",
        f"rpclisten=127.0.0.1:{DCR_BASE_RPC_PORT + node_id}\n",
        f"rpcuser=test{node_id}\n",
        f"rpcpass=test_pass{node_id}\n",
        "notls=1\n",
        "noseeders=1\n",
        "nodnsseed=1\n",
        "nodiscoverip=1\n",
        "miningaddr=SsYbXyjkKAEXXcGdFgr4u4bo4L8RkCxwQpH\n",
    ]
    # Peer with every other node of the test network.
    daemon_config.extend(
        "addpeer=127.0.0.1:{}\n".format(DCR_BASE_PORT + i)
        for i in range(num_nodes)
        if i != node_id
    )
    with open(cfg_file_path, "w") as fp:
        fp.writelines(daemon_config)

    wallet_config = [
        "simnet=1\n",
        "debuglevel=debug\n",
        f"rpclisten=127.0.0.1:{DCR_BASE_WALLET_RPC_PORT + node_id}\n",
        f"rpcconnect=127.0.0.1:{DCR_BASE_RPC_PORT + node_id}\n",
        f"username=test{node_id}\n",
        f"password=test_pass{node_id}\n",
        "noservertls=1\n",
        "noclienttls=1\n",
        "enablevoting=1\n",
    ]
    wallet_cfg_file_path = os.path.join(node_dir, "dcrwallet.conf")
    with open(wallet_cfg_file_path, "w") as fp:
        fp.writelines(wallet_config)
|
|
|
|
|
|
class Test(BaseTest):
|
|
__test__ = True
|
|
test_coin = Coins.DCR
|
|
dcr_daemons = []
|
|
start_ltc_nodes = False
|
|
start_xmr_nodes = True
|
|
dcr_mining_addr = "SsYbXyjkKAEXXcGdFgr4u4bo4L8RkCxwQpH"
|
|
extra_wait_time = 0
|
|
max_fee: int = 10000
|
|
|
|
hex_seeds = [
|
|
"e8574b2a94404ee62d8acc0258cab4c0defcfab8a5dfc2f4954c1f9d7e09d72a",
|
|
"10689fc6378e5f318b663560012673441dcdd8d796134e6021a4248cc6342cc6",
|
|
"efc96ffe4fee469407826841d9700ef0a0735b0aa5ec5e7a4aa9bc1afd9a9a30", # Won't match main seed, as it's set randomly
|
|
]
|
|
|
|
@classmethod
def prepareExtraCoins(cls):
    """Look up the ticket account and, on a fresh instance, mine initial blocks.

    On a fresh (non-restored) instance the first new wallet address of node 0
    must equal ``cls.dcr_mining_addr`` and 110 blocks are generated. In both
    cases ``cls.dcr_ticket_account`` is set to the account of the mining
    address.
    """
    ci0 = cls.swap_clients[0].ci(cls.test_coin)
    fresh_instance = not cls.restore_instance

    if fresh_instance:
        # Issue the RPC outside the assert so it still runs under ``python -O``.
        first_address = ci0.rpc_wallet("getnewaddress")
        assert first_address == cls.dcr_mining_addr

    cls.dcr_ticket_account = ci0.rpc_wallet("getaccount", [cls.dcr_mining_addr])

    if fresh_instance:
        ci0.rpc("generate", [110])
|
|
|
|
@classmethod
def tearDownClass(cls):
    """Run the base class teardown, then stop the Decred daemons."""
    logging.info("Finalising Decred Test")
    super(Test, cls).tearDownClass()

    # Stop the processes started by this class and empty the list in place.
    stopDaemons(cls.dcr_daemons)
    del cls.dcr_daemons[:]
|
|
|
|
@classmethod
def coins_loop(cls):
    """Buy tickets on node 0 and generate a block once five purchases succeed.

    Runs the base class ``coins_loop`` first. Up to 30 ``purchaseticket``
    attempts are made; errors are logged unless they mention "double spend".
    A block is only generated when five purchases went through.
    """
    super(Test, cls).coins_loop()
    ci0 = cls.swap_clients[0].ci(cls.test_coin)

    tickets_bought: int = 0
    for _attempt in range(30):
        try:
            ci0.rpc_wallet("purchaseticket", [cls.dcr_ticket_account, 0.1, 0])
            tickets_bought += 1
            if tickets_bought >= 5:
                break
            test_delay_event.wait(0.1)
        except Exception as e:
            # "double spend" errors are ignored; anything else is logged.
            if "double spend" not in str(e):
                logging.warning("coins_loop purchaseticket {}".format(e))
            test_delay_event.wait(0.5)

    if tickets_bought < 5:
        return
    try:
        ci0.rpc("generate", [1])
    except Exception as e:
        logging.warning("coins_loop generate {}".format(e))
|
|
|
|
@classmethod
def prepareExtraDataDir(cls, i):
    """Start dcrd and dcrwallet for node ``i`` and wait for their RPC servers.

    The config files are written first unless an instance is being restored.
    dcrd is started, then the wallet is created from ``cls.hex_seeds[i]`` and
    dcrwallet is started. Both processes are appended to ``cls.dcr_daemons``
    in that order.
    """
    if not cls.restore_instance:
        prepareDCDDataDir(cfg.TEST_DATADIRS, i, "dcrd.conf", "dcr_")

    appdata = os.path.join(cfg.TEST_DATADIRS, "dcr_" + str(i))
    extra_opts = [f'--appdata="{appdata}"']

    def launch(binary, stdout_filename):
        # Start one binary from DCR_BINDIR with the current options and record it.
        daemon = startDaemon(
            appdata,
            DCR_BINDIR,
            binary,
            opts=extra_opts,
            extra_config={
                "add_datadir": False,
                "stdout_to_file": True,
                "stdout_filename": stdout_filename,
            },
        )
        cls.dcr_daemons.append(daemon)
        logging.info("Started %s %d", binary, daemon.handle.pid)

    launch(DCRD, "dcrd_stdout.log")
    waitForRPC(
        make_rpc_func(i, base_rpc_port=DCR_BASE_RPC_PORT),
        test_delay_event,
        rpc_command="getnetworkinfo",
        max_tries=12,
    )

    # The wallet passphrase option is used for both wallet creation and startup.
    extra_opts.append("--pass=test_pass")
    create_args = [os.path.join(DCR_BINDIR, DCR_WALLET), "--create"] + extra_opts
    createDCRWallet(create_args, cls.hex_seeds[i], logging, test_delay_event)

    test_delay_event.wait(1.0)

    launch(DCR_WALLET, "dcrwallet_stdout.log")
    waitForRPC(
        make_rpc_func(i, base_rpc_port=DCR_BASE_WALLET_RPC_PORT),
        test_delay_event,
        rpc_command="getinfo",
        max_tries=12,
    )
|
|
|
|
@classmethod
def addPIDInfo(cls, sc, i):
    """Register the PID of node ``i``'s dcrd process with swap client ``sc``.

    prepareExtraDataDir appends two entries to ``cls.dcr_daemons`` per call
    (dcrd first, then dcrwallet), so the dcrd entry of node ``i`` is at index
    ``2 * i`` rather than ``i``.
    NOTE(review): assumes prepareExtraDataDir ran once per node in node order
    before this is called - confirm against the base class.
    """
    dcrd_daemon = cls.dcr_daemons[2 * i]
    sc.setDaemonPID(Coins.DCR, dcrd_daemon.handle.pid)
|
|
|
|
@classmethod
def addCoinSettings(cls, settings, datadir, node_id):
    """Add the ``decred`` chain client entry for ``node_id`` to ``settings``."""
    node_suffix = str(node_id)
    decred_settings = {
        "connection_type": "rpc",
        "manage_daemon": False,
        "rpcport": DCR_BASE_RPC_PORT + node_id,
        "walletrpcport": DCR_BASE_WALLET_RPC_PORT + node_id,
        "rpcuser": "test" + node_suffix,
        "rpcpassword": "test_pass" + node_suffix,
        "datadir": os.path.join(datadir, "dcr_" + node_suffix),
        "bindir": DCR_BINDIR,
        "wallet_pwd": "test_pass",
        "use_csv": True,
        "use_segwit": True,
        "blocks_confirmed": 1,
    }
    settings["chainclients"]["decred"] = decred_settings
|
|
|
|
def test_0001_decred_address(self):
    """Check address encoding round-trips and the wallets' master pubkeys."""
    logging.info("---------- Test {}".format(self.test_coin.name))

    coin_settings = {"rpcport": 0, "rpcauth": "none"}
    coin_settings.update(REQUIRED_SETTINGS)

    ci = DCRInterface(coin_settings, "mainnet")

    # A fresh key must encode to a "Ds" address that decodes back to its pkh.
    privkey = ci.getNewRandomKey()
    pubkey = ci.getPubkey(privkey)

    pkh = ci.pkh(pubkey)
    address = ci.pkh_to_address(pkh)
    assert address.startswith("Ds")

    decoded = ci.decode_address(address)
    assert decoded[2:] == pkh

    expected_masterpubkey_node0 = "spubVV1z2AFYjVZvzM45FSaWMPRqyUoUwyW78wfANdjdNG6JGCXrr8AbRvUgYb3Lm1iun9CgHew1KswdePryNLKEnBSQ82AjNpYdQgzXPUme9c6"
    for i, sc in enumerate(self.swap_clients):
        loop_ci = sc.ci(self.test_coin)
        root_key = sc.getWalletKey(Coins.DCR, 1)
        masterpubkey = loop_ci.rpc_wallet("getmasterpubkey")
        masterpubkey_data = loop_ci.decode_address(masterpubkey)[4:]

        seed_hash = loop_ci.getSeedHash(root_key)
        if i == 0:
            assert masterpubkey == expected_masterpubkey_node0
        # Only the first two nodes are checked against the seed hash.
        if i < 2:
            assert seed_hash == hash160(masterpubkey_data)
|
|
|
|
def test_001_segwit(self):
    """Fund, sign and send a raw transaction and check its txid stays stable.

    The txid reported for the funded transaction must match the txid after
    signing, the txid returned by ``sendrawtransaction`` and the hash computed
    by the interface's own transaction parser. Re-serialising the parsed
    transaction must reproduce the signed hex.
    """
    logging.info("---------- Test {} segwit".format(self.test_coin.name))

    swap_clients = self.swap_clients
    ci0 = swap_clients[0].ci(self.test_coin)
    assert ci0.using_segwit() is True

    addr_out = ci0.getNewAddress()
    addr_info = ci0.rpc_wallet("validateaddress", [addr_out])
    assert addr_info["isvalid"] is True
    assert addr_info["ismine"] is True

    rtx = ci0.rpc_wallet("createrawtransaction", [[], {addr_out: 2.0}])

    account_from = ci0.rpc_wallet("getaccount", [self.dcr_mining_addr])
    frtx = ci0.rpc_wallet("fundrawtransaction", [rtx, account_from])

    f_decoded = ci0.rpc_wallet("decoderawtransaction", [frtx["hex"]])
    assert f_decoded["version"] == 1

    sfrtx = ci0.rpc_wallet("signrawtransaction", [frtx["hex"]])
    s_decoded = ci0.rpc_wallet("decoderawtransaction", [sfrtx["hex"]])
    sent_txid = ci0.rpc_wallet("sendrawtransaction", [sfrtx["hex"]])

    assert f_decoded["txid"] == sent_txid
    # This check used to be written twice in a row; once is enough.
    assert f_decoded["txid"] == s_decoded["txid"]

    ctx = ci0.loadTx(bytes.fromhex(sfrtx["hex"]))
    ser_out = ctx.serialize()
    assert ser_out.hex() == sfrtx["hex"]
    assert f_decoded["txid"] == ctx.TxHash().hex()
|
|
|
|
def test_003_signature_hash(self):
    """Sign a funded transaction manually and compare with the wallet's result.

    Test that signing a transaction manually produces the same result when
    signed with the wallet.
    """
    logging.info("---------- Test {} signature_hash".format(self.test_coin.name))

    ci0 = self.swap_clients[0].ci(self.test_coin)

    utxos = ci0.rpc_wallet("listunspent")
    addr_out = ci0.rpc_wallet("getnewaddress")
    rtx = ci0.rpc_wallet("createrawtransaction", [[], {addr_out: 2.0}])

    account_from = ci0.rpc_wallet("getaccount", [self.dcr_mining_addr])
    frtx = ci0.rpc_wallet("fundrawtransaction", [rtx, account_from])
    sfrtx = ci0.rpc_wallet("signrawtransaction", [frtx["hex"]])

    ctx = ci0.loadTx(bytes.fromhex(frtx["hex"]))

    # Find the wallet utxo that the funded transaction spends as input 0.
    prevout_txid = ctx.vin[0].prevout.get_hash().hex()
    prevout_n = ctx.vin[0].prevout.n
    prevout = next(
        (
            utxo
            for utxo in utxos
            if prevout_txid == utxo["txid"] and prevout_n == utxo["vout"]
        ),
        None,
    )
    assert prevout is not None

    tx_bytes_no_witness: bytes = ctx.serialize(TxSerializeType.NoWitness)

    priv_key_wif = ci0.rpc_wallet("dumpprivkey", [prevout["address"]])
    _sig_type, key_bytes = ci0.decodeKey(priv_key_wif)

    addr_info = ci0.rpc_wallet("validateaddress", [prevout["address"]])
    pk_hex: str = addr_info["pubkey"]

    prevout_script = bytes.fromhex(prevout["scriptPubKey"])
    prevout_value = ci0.make_int(prevout["amount"])
    sig0_py = ci0.signTx(
        key_bytes, tx_bytes_no_witness, 0, prevout_script, prevout_value
    )
    tx_bytes_signed = ci0.setTxSignature(
        tx_bytes_no_witness, [sig0_py, bytes.fromhex(pk_hex)]
    )

    # Set prevout value
    ctx = ci0.loadTx(tx_bytes_signed)
    assert ctx.vout[0].version == 0
    ctx.vin[0].value_in = prevout_value
    tx_bytes_signed = ctx.serialize()
    assert tx_bytes_signed.hex() == sfrtx["hex"]

    sent_txid = ci0.rpc_wallet("sendrawtransaction", [tx_bytes_signed.hex()])
    assert len(sent_txid) == 64
|
|
|
|
def test_004_csv(self):
    # Exercise an output locked by the script `<3> OP_CHECKSEQUENCEVERIFY`:
    #   1. Build the script, and check the wallet decodes its destination
    #      script to the same address ci0.encodeScriptDest produces.
    #   2. Fund, sign and broadcast a transaction paying 1.1 coin to it.
    #   3. Build a spend with input sequence 3; the first broadcast must be
    #      rejected with "transaction sequence locks on inputs not met".
    #   4. Re-broadcast the same spend (up to 20 attempts) until it is
    #      accepted.
    logging.info("---------- Test {} csv".format(self.test_coin.name))
    swap_clients = self.swap_clients
    ci0 = swap_clients[0].ci(self.test_coin)

    script = bytearray()
    push_script_data(script, bytes((3,)))
    script += bytes((OP_CHECKSEQUENCEVERIFY,))

    script_dest = ci0.getScriptDest(script)
    script_info = ci0.rpc_wallet("decodescript", [script_dest.hex()])
    script_addr = ci0.encodeScriptDest(script_dest)
    assert script_info["addresses"][0] == script_addr

    prevout_amount: int = ci0.make_int(1.1)
    tx = CTransaction()
    tx.version = ci0.txVersion()
    tx.vout.append(ci0.txoType()(prevout_amount, script_dest))
    tx_hex = tx.serialize().hex()
    tx_decoded = ci0.rpc_wallet("decoderawtransaction", [tx_hex])

    # Locate the script output.  Each enumerated output is inspected; the
    # previous code always looked at vout[0] regardless of the loop index.
    # NOTE(review): the index is taken from the unfunded tx and reused for
    # the funded one - assumes fundrawtransaction keeps the output position.
    utxo_pos = None
    for i, txo in enumerate(tx_decoded["vout"]):
        script_address = txo["scriptPubKey"]["addresses"][0]
        addr_info = ci0.rpc_wallet("validateaddress", [script_address])
        if addr_info["isscript"] is True:
            utxo_pos = i
            break
    assert utxo_pos is not None

    # Try each wallet account until one can fund the transaction.
    frtx = None
    accounts = ci0.rpc_wallet("listaccounts")
    for account_from in accounts:
        try:
            frtx = ci0.rpc_wallet("fundrawtransaction", [tx_hex, account_from])
            break
        except Exception as e:
            logging.warning("fundrawtransaction failed {}".format(e))
    # Fail with a clear message instead of a NameError on the next line.
    assert frtx is not None, "fundrawtransaction failed for every account"

    sfrtx = ci0.rpc_wallet("signrawtransaction", [frtx["hex"]])
    sent_txid = ci0.rpc_wallet("sendrawtransaction", [sfrtx["hex"]])

    tx_spend = CTransaction()
    tx_spend.version = ci0.txVersion()

    tx_spend.vin.append(CTxIn(COutPoint(int(sent_txid, 16), utxo_pos), sequence=3))
    tx_spend.vin[0].value_in = prevout_amount
    # The signature script is only a push of the redeem script itself.
    signature_script = bytearray()
    push_script_data(signature_script, script)
    tx_spend.vin[0].signature_script = signature_script

    addr_out = ci0.getNewAddress()
    pkh = ci0.decode_address(addr_out)[2:]

    tx_spend.vout.append(ci0.txoType()())
    tx_spend.vout[0].value = ci0.make_int(1.09)
    tx_spend.vout[0].script_pubkey = ci0.getPubkeyHashDest(pkh)

    tx_spend_hex = tx_spend.serialize().hex()

    # The first attempt is expected to be rejected by the sequence lock.
    try:
        sent_spend_txid = ci0.rpc_wallet("sendrawtransaction", [tx_spend_hex])
        logging.info(
            "Sent tx spending csv output, txid: {}".format(sent_spend_txid)
        )
    except Exception as e:
        assert "transaction sequence locks on inputs not met" in str(e)
    else:
        # Explicit raise: `assert False` would be stripped under `python -O`.
        raise AssertionError("Should fail")

    # Keep retrying; the failure log records the chain height at each try.
    sent_spend_txid = None
    for _ in range(20):
        try:
            sent_spend_txid = ci0.rpc_wallet("sendrawtransaction", [tx_spend_hex])
            break
        except Exception as e:
            logging.info(
                "sendrawtransaction failed {}, height {}".format(
                    e, ci0.getChainHeight()
                )
            )
        test_delay_event.wait(1)

    assert sent_spend_txid is not None
|
|
|
|
def test_005_watchonly(self):
    # Check what the second client's wallet can learn about an address that
    # belongs to the first client's wallet:
    #   * importscript of the pay-to-pubkey-hash script does not make the
    #     address "ismine",
    #   * importpubkey is expected to be refused by a non watching-only
    #     wallet,
    #   * a payment to the address can still be found by scanning new
    #     blocks through the node RPC and comparing output scripts.
    logging.info("---------- Test {} watchonly".format(self.test_coin.name))

    swap_clients = self.swap_clients
    ci0 = swap_clients[0].ci(self.test_coin)
    ci1 = swap_clients[1].ci(self.test_coin)

    addr = ci0.getNewAddress()
    pkh = ci0.decode_address(addr)[2:]
    addr_info = ci0.rpc_wallet("validateaddress", [addr])

    addr_script = ci0.getPubkeyHashDest(pkh).hex()
    script_info = ci0.rpc_wallet("decodescript", [addr_script])
    assert addr in script_info["addresses"]

    # Importscript doesn't import an address
    ci1.rpc_wallet("importscript", [addr_script])
    addr_info1 = ci1.rpc_wallet("validateaddress", [addr])
    assert addr_info1.get("ismine", False) is False

    # Would need to run a second wallet daemon?
    try:
        ci1.rpc_wallet("importpubkey", [addr_info["pubkey"]])
    except Exception as e:
        assert "public keys may only be imported by watching-only wallets" in str(e)
    else:
        logging.info("Expected importpubkey to fail on non watching-only wallet")

    chain_height_last = ci1.getChainHeight()
    ci0.rpc_wallet("sendtoaddress", [addr, 1])

    def find_paying_txid(block):
        # Return the txid of the first tx in `block` with an output whose
        # script hex equals addr_script, or None if there is none.
        for tx in block["rawtx"]:
            for txo in tx["vout"]:
                if addr_script == txo["scriptPubKey"]["hex"]:
                    return tx["txid"]
        return None

    found_txid = None
    for _ in range(20):
        chain_height_now = ci1.getChainHeight()
        while found_txid is None and chain_height_last <= chain_height_now:
            next_height = chain_height_last + 1
            try:
                check_hash = ci1.rpc("getblockhash", [next_height])
            except Exception as e:
                # Most likely the next block does not exist yet; back off
                # and re-read the chain height on the next outer iteration.
                logging.warning(
                    "getblockhash {} failed {}".format(next_height, e)
                )
                test_delay_event.wait(1)
                break

            # The hash fetched above is reused; the block hash was
            # previously requested a second time for the same height.
            chain_height_last = next_height
            block_tx = ci1.rpc("getblock", [check_hash, True, True])
            found_txid = find_paying_txid(block_tx)
            if found_txid is not None:
                logging.info("found_txid {}".format(found_txid))

        if found_txid is not None:
            break
        test_delay_event.wait(1)

    assert found_txid is not None
|
|
|
|
def test_008_gettxout(self):
    # Follow one wallet output through its life as seen by the node's
    # `gettxout` RPC: created, confirmed, spent in the mempool (still
    # reported when the mempool is excluded) and finally reported as None.
    # Ends with a getProofOfFunds / verifyProofOfFunds round trip.
    logging.info("---------- Test {} gettxout".format(self.test_coin.name))

    ci0 = self.swap_clients[0].ci(self.test_coin)

    addr = ci0.getNewAddress()

    test_amount: float = 1.0
    txid = ci0.withdrawCoin(test_amount, addr)
    assert len(txid) == 64

    # Poll the wallet until the new output for `addr` is listed.
    unspents = None
    for _ in range(30):
        listed = ci0.rpc_wallet("listunspent", [0, 999999999, [addr]])
        unspents = [] if listed is None else listed
        if len(unspents) > 0:
            break
        test_delay_event.wait(1)
    assert len(unspents) == 1
    utxo = unspents[0]

    def query_txout(with_mempool):
        # gettxout for the tracked utxo; the last argument is the
        # include-mempool flag.
        return ci0.rpc(
            "gettxout", [utxo["txid"], utxo["vout"], utxo["tree"], with_mempool]
        )

    include_mempool: bool = False
    # Result is not inspected here; it is overwritten below.
    txout = query_txout(include_mempool)

    # Lock utxo so it's not spent for tickets, while waiting for depth
    ci0.rpc_wallet("lockunspent", [False, [utxo]])

    # Wait until the output has at least one confirmation.
    for attempt in range(20):
        logging.info("Waiting for txout depth, iter {}".format(attempt))
        txout = query_txout(True)
        if txout["confirmations"] > 0:
            break
        test_delay_event.wait(1)
    else:
        raise ValueError("prevout not confirmed")

    assert txout["confirmations"] > 0
    assert addr in txout["scriptPubKey"]["addresses"]

    # Spend the output to a fresh address, leaving 0.0001 as the fee.
    addr_out = ci0.getNewAddress()
    spend_outputs = {addr_out: test_amount - 0.0001}
    rtx = ci0.rpc_wallet("createrawtransaction", [[utxo], spend_outputs])
    stx = ci0.rpc_wallet("signrawtransaction", [rtx])

    chain_height_before_send = ci0.getChainHeight()
    ci0.rpc_wallet("sendrawtransaction", [stx["hex"]])

    # NOTE: UTXO is still found when spent in the mempool (tested in loop, not delay from wallet to core)
    txout = query_txout(include_mempool)
    assert addr in txout["scriptPubKey"]["addresses"]

    # Poll until the node no longer reports the output.
    for _ in range(20):
        txout = query_txout(include_mempool)
        if txout is None:
            logging.info(
                "txout spent, height before spent {}, height spent {}".format(
                    chain_height_before_send, ci0.getChainHeight()
                )
            )
            break
        test_delay_event.wait(1)
    assert txout is None

    logging.info("Testing getProofOfFunds")
    require_amount: int = ci0.make_int(1)
    funds_proof = ci0.getProofOfFunds(require_amount, "test".encode("utf-8"))

    logging.info("Testing verifyProofOfFunds")
    proof_0, proof_1, proof_2 = funds_proof[0], funds_proof[1], funds_proof[2]
    amount_proved = ci0.verifyProofOfFunds(
        proof_0, proof_1, proof_2, "test".encode("utf-8")
    )
    assert amount_proved >= require_amount
|
|
|
|
def test_009_wallet_encryption(self):
    # Drive the JSON API through a set-password / unlock / lock /
    # change-password cycle and check the "encrypted" and "locked" flags
    # reported for the part, dcr and xmr wallets at each step.
    logging.info("---------- Test {} wallet encryption".format(self.test_coin.name))

    # First argument of read_json_api - presumably the HTTP port of the
    # first client; confirm against tests.basicswap.util.
    api_port = 1800
    first_password = "notapassword123"
    second_password = "notapassword456"

    def wallet_info(coin_name):
        return read_json_api(api_port, f"wallets/{coin_name}")

    # Initial state: only dcr reports as encrypted, nothing is locked.
    for coin in ("part", "dcr", "xmr"):
        jsw = wallet_info(coin)
        assert jsw["encrypted"] is (coin == "dcr")
        assert jsw["locked"] is False

    read_json_api(
        api_port, "setpassword", {"oldpassword": "", "newpassword": first_password}
    )

    # Entire system is locked with Particl wallet
    jsw = wallet_info("dcr")
    assert "Coin must be unlocked" in jsw["error"]

    read_json_api(api_port, "unlock", {"coin": "part", "password": first_password})

    # Only part was unlocked; the other wallets stay encrypted and locked.
    for coin in ("dcr", "xmr"):
        jsw = wallet_info(coin)
        assert jsw["encrypted"] is True
        assert jsw["locked"] is True

    read_json_api(api_port, "lock", {"coin": "part"})
    jsw = wallet_info("part")
    assert "Coin must be unlocked" in jsw["error"]

    read_json_api(
        api_port,
        "setpassword",
        {"oldpassword": first_password, "newpassword": second_password},
    )
    read_json_api(api_port, "unlock", {"password": second_password})

    # After unlocking without naming a coin, every wallet is usable.
    for coin in ("part", "dcr", "xmr"):
        jsw = wallet_info(coin)
        assert jsw["encrypted"] is True
        assert jsw["locked"] is False
|
|
|
|
def test_010_txn_size(self):
    # Compare real transaction sizes with the interface's estimates:
    #   * chain A: the script lock-spend tx must be no larger than the size
    #     reported in fee_info (less than 4 bytes below it), and within 10
    #     bytes of xmr_swap_a_lock_spend_tx_vsize().
    #   * chain B (no-script): the fee paid by the lock-spend tx must equal
    #     fee_rate * xmr_swap_b_lock_spend_tx_vsize() / 1000, and the actual
    #     size must be within 10 bytes of that estimate.
    logging.info("---------- Test {} txn size".format(self.test_coin.name))

    swap_clients = self.swap_clients
    ci = swap_clients[0].ci(self.test_coin)
    pi = swap_clients[0].pi(SwapTypes.XMR_SWAP)

    amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)

    # Record unspents before createSCLockTx as the used ones will be locked
    unspents_before = ci.rpc_wallet("listunspent")

    # fee_rate is in sats/kvB
    fee_rate: int = 10000

    key_a = ci.getNewRandomKey()
    key_b = ci.getNewRandomKey()

    pub_a = ci.getPubkey(key_a)
    pub_b = ci.getPubkey(key_b)
    lock_tx_script = pi.genScriptLockTxScript(ci, pub_a, pub_b)

    # Build, fund and sign the script lock tx in three steps.
    lock_tx = ci.createSCLockTx(amount, lock_tx_script)
    lock_tx = ci.fundSCLockTx(lock_tx, fee_rate)
    lock_tx = ci.signTxWithWallet(lock_tx)

    unspents_after = ci.rpc_wallet("listunspent")
    assert len(unspents_before) > len(unspents_after)

    tx_decoded = ci.rpc("decoderawtransaction", [lock_tx.hex()])
    txid = tx_decoded["txid"]

    # NOTE(review): the output and input totals below are computed but never
    # asserted on in this test.
    out_value: int = 0
    out_value += sum(
        ci.make_int(txo["value"]) for txo in tx_decoded["vout"] if "value" in txo
    )
    in_value: int = 0
    for txi in tx_decoded["vin"]:
        # First recorded unspent that matches this input's outpoint, if any.
        spent = next(
            (
                utxo
                for utxo in unspents_before
                if "vout" in utxo
                and utxo["txid"] == txi["txid"]
                and utxo["vout"] == txi["vout"]
            ),
            None,
        )
        if spent is not None:
            in_value += ci.make_int(spent["amount"])

    ci.rpc("sendrawtransaction", [lock_tx.hex()])

    addr_out = ci.getNewAddress(True)
    pkh_out = ci.decodeAddress(addr_out)
    fee_info = {}
    lock_spend_tx = ci.createSCLockSpendTx(
        lock_tx, lock_tx_script, pkh_out, fee_rate, fee_info=fee_info
    )
    size_estimated: int = fee_info["size"]

    tx_decoded = ci.rpc("decoderawtransaction", [lock_spend_tx.hex()])
    txid = tx_decoded["txid"]

    # Sign with both keys, then attach signatures and script to the tx.
    sig_a = ci.signTx(key_a, lock_spend_tx, 0, lock_tx_script, amount)
    sig_b = ci.signTx(key_b, lock_spend_tx, 0, lock_tx_script, amount)
    witness_stack = [sig_a, sig_b, lock_tx_script]
    lock_spend_tx = ci.setTxSignature(lock_spend_tx, witness_stack)

    size_actual: int = len(lock_spend_tx)

    assert 0 <= size_estimated - size_actual < 4
    # The txid decoded before signing must match the broadcast txid.
    assert ci.rpc("sendrawtransaction", [lock_spend_tx.hex()]) == txid

    expect_size: int = ci.xmr_swap_a_lock_spend_tx_vsize()
    assert expect_size >= size_actual
    assert expect_size - size_actual < 10

    # Test chain b (no-script) lock tx size
    key_v = ci.getNewRandomKey()
    key_s = ci.getNewRandomKey()
    pub_s = ci.getPubkey(key_s)
    lock_tx_b_txid = ci.publishBLockTx(key_v, pub_s, amount, fee_rate)
    test_delay_event.wait(1)

    addr_out = ci.getNewAddress(True)
    lock_tx_b_spend_txid = ci.spendBLockTx(
        lock_tx_b_txid, addr_out, key_v, key_s, amount, fee_rate, 0
    )
    test_delay_event.wait(1)
    lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
    if lock_tx_b_spend is None:
        # One immediate retry if the wallet has not returned the tx yet.
        lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)

    tx_obj = ci.loadTx(lock_tx_b_spend)
    tx_out_value: int = tx_obj.vout[0].value
    fee_paid = amount - tx_out_value

    actual_size = len(lock_tx_b_spend)
    expect_size = ci.xmr_swap_b_lock_spend_tx_vsize()
    fee_expect = round(fee_rate * expect_size / 1000)
    assert fee_expect == fee_paid
    assert expect_size >= actual_size
    assert expect_size - actual_size < 10
|
|
|
|
def test_02_part_coin(self):
    # Delegates to run_test_success_path with PART first and the coin under
    # test second.
    coin_pair = (Coins.PART, self.test_coin)
    run_test_success_path(self, *coin_pair)
|
|
|
|
def test_03_coin_part(self):
    # Delegates to run_test_success_path with the coin under test first and
    # PART second.
    coin_pair = (self.test_coin, Coins.PART)
    run_test_success_path(self, *coin_pair)
|
|
|
|
def test_04_part_coin_bad_ptx(self):
    # Delegates to run_test_bad_ptx with PART first and the coin under test
    # second.
    coin_pair = (Coins.PART, self.test_coin)
    run_test_bad_ptx(self, *coin_pair)
|
|
|
|
def test_05_coin_part_bad_ptx(self):
    # Delegates to run_test_bad_ptx with the coin under test first and PART
    # second.
    coin_pair = (self.test_coin, Coins.PART)
    run_test_bad_ptx(self, *coin_pair)
|
|
|
|
def test_06_part_coin_itx_refund(self):
    # Delegates to run_test_itx_refund with PART first and the coin under
    # test second.
    coin_pair = (Coins.PART, self.test_coin)
    run_test_itx_refund(self, *coin_pair)
|
|
|
|
def test_07_coin_part_itx_refund(self):
    # Delegates to run_test_itx_refund with the coin under test first and
    # PART second.
    coin_pair = (self.test_coin, Coins.PART)
    run_test_itx_refund(self, *coin_pair)
|
|
|
|
def test_08_ads_coin_xmr(self):
    # Delegates to run_test_ads_success_path with the coin under test first
    # and XMR second.
    coin_pair = (self.test_coin, Coins.XMR)
    run_test_ads_success_path(self, *coin_pair)
|
|
|
|
def test_09_ads_xmr_coin(self):
    # Reverse bid
    # Delegates to run_test_ads_success_path with XMR first and the coin
    # under test second.
    coin_pair = (Coins.XMR, self.test_coin)
    run_test_ads_success_path(self, *coin_pair)
|
|
|
|
def test_10_ads_part_coin(self):
    # Delegates to run_test_ads_success_path with PART first and the coin
    # under test second.
    coin_pair = (Coins.PART, self.test_coin)
    run_test_ads_success_path(self, *coin_pair)
|
|
|
|
def test_11_ads_coin_xmr_both_refund(self):
    # Delegates to run_test_ads_both_refund with the coin under test first,
    # XMR second and lock_value=20.
    coin_pair = (self.test_coin, Coins.XMR)
    run_test_ads_both_refund(self, *coin_pair, lock_value=20)
|
|
|
|
def test_12_ads_xmr_coin_both_refund(self):
    # Reverse bid
    # Delegates to run_test_ads_both_refund with XMR first, the coin under
    # test second and lock_value=20.
    coin_pair = (Coins.XMR, self.test_coin)
    run_test_ads_both_refund(self, *coin_pair, lock_value=20)
|
|
|
|
def test_13_ads_part_coin_both_refund(self):
    # Delegates to run_test_ads_both_refund with PART first, the coin under
    # test second and lock_value=20.
    coin_pair = (Coins.PART, self.test_coin)
    run_test_ads_both_refund(self, *coin_pair, lock_value=20)
|
|
|
|
def test_14_ads_coin_xmr_swipe_refund(self):
    # Delegates to run_test_ads_swipe_refund with the coin under test first,
    # XMR second and lock_value=20.
    coin_pair = (self.test_coin, Coins.XMR)
    run_test_ads_swipe_refund(self, *coin_pair, lock_value=20)
|
|
|
|
def test_15_ads_xmr_coin_swipe_refund(self):
    # Reverse bid
    # Delegates to run_test_ads_swipe_refund with XMR first, the coin under
    # test second and lock_value=20.
    coin_pair = (Coins.XMR, self.test_coin)
    run_test_ads_swipe_refund(self, *coin_pair, lock_value=20)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this module's tests when the file is executed directly.
    unittest.main()
|