mirror of
https://github.com/basicswap/basicswap.git
synced 2025-12-04 06:27:08 +01:00
Add workaround for btc seed changing after encrypting wallet.
Loses wallet history, rescanblockchain won't work on pruned chains.
This commit is contained in:
@@ -1358,7 +1358,9 @@ class BasicSwap(BaseApp):
|
||||
legacy_root_hash = ci.getSeedHash(root_key, 20)
|
||||
self.setStringKV(key_str, legacy_root_hash.hex(), cursor)
|
||||
|
||||
def initialiseWallet(self, interface_type, raise_errors: bool = False) -> None:
|
||||
def initialiseWallet(
|
||||
self, interface_type, raise_errors: bool = False, restore_time: int = -1
|
||||
) -> None:
|
||||
if interface_type == Coins.PART:
|
||||
return
|
||||
ci = self.ci(interface_type)
|
||||
@@ -1377,7 +1379,7 @@ class BasicSwap(BaseApp):
|
||||
|
||||
root_key = self.getWalletKey(interface_type, 1)
|
||||
try:
|
||||
ci.initialiseWallet(root_key)
|
||||
ci.initialiseWallet(root_key, restore_time)
|
||||
except Exception as e:
|
||||
# < 0.21: sethdseed cannot set a new HD seed while still in Initial Block Download.
|
||||
self.log.error(f"initialiseWallet failed: {e}")
|
||||
|
||||
@@ -182,6 +182,7 @@ BSX_UPDATE_UNMANAGED = toBool(
|
||||
UI_HTML_PORT = int(os.getenv("UI_HTML_PORT", 12700))
|
||||
UI_WS_PORT = int(os.getenv("UI_WS_PORT", 11700))
|
||||
COINS_RPCBIND_IP = os.getenv("COINS_RPCBIND_IP", "127.0.0.1")
|
||||
DEFAULT_RESTORE_TIME = int(os.getenv("DEFAULT_RESTORE_TIME", 1577833261)) # 2020
|
||||
|
||||
PART_ZMQ_PORT = int(os.getenv("PART_ZMQ_PORT", 20792))
|
||||
PART_RPC_HOST = os.getenv("PART_RPC_HOST", "127.0.0.1")
|
||||
@@ -1707,6 +1708,11 @@ def printHelp():
|
||||
DEFAULT_WOW_RESTORE_HEIGHT
|
||||
)
|
||||
)
|
||||
print(
|
||||
"--walletrestoretime=n Time to restore wallets from, default:{}, -1 for now.".format(
|
||||
DEFAULT_RESTORE_TIME
|
||||
)
|
||||
)
|
||||
print(
|
||||
"--trustremotenode Set trusted-daemon for XMR, defaults to auto: true when daemon rpchost value is a private ip address else false"
|
||||
)
|
||||
@@ -1808,12 +1814,18 @@ def test_particl_encryption(data_dir, settings, chain, use_tor_proxy):
|
||||
|
||||
def encrypt_wallet(swap_client, coin_type) -> None:
|
||||
ci = swap_client.ci(coin_type)
|
||||
ci.changeWalletPassword("", WALLET_ENCRYPTION_PWD)
|
||||
ci.changeWalletPassword("", WALLET_ENCRYPTION_PWD, check_seed_if_encrypt=False)
|
||||
ci.unlockWallet(WALLET_ENCRYPTION_PWD)
|
||||
|
||||
|
||||
def initialise_wallets(
|
||||
particl_wallet_mnemonic, with_coins, data_dir, settings, chain, use_tor_proxy
|
||||
particl_wallet_mnemonic,
|
||||
with_coins,
|
||||
data_dir,
|
||||
settings,
|
||||
chain,
|
||||
use_tor_proxy,
|
||||
extra_opts={},
|
||||
):
|
||||
swap_client = None
|
||||
daemons = []
|
||||
@@ -1922,7 +1934,7 @@ def initialise_wallets(
|
||||
if WALLET_ENCRYPTION_PWD == ""
|
||||
else WALLET_ENCRYPTION_PWD
|
||||
)
|
||||
extra_opts = [
|
||||
extra_args = [
|
||||
'--appdata="{}"'.format(coin_settings["datadir"]),
|
||||
"--pass={}".format(dcr_password),
|
||||
]
|
||||
@@ -1931,7 +1943,7 @@ def initialise_wallets(
|
||||
args = [
|
||||
os.path.join(coin_settings["bindir"], filename),
|
||||
"--create",
|
||||
] + extra_opts
|
||||
] + extra_args
|
||||
hex_seed = swap_client.getWalletKey(Coins.DCR, 1).hex()
|
||||
createDCRWallet(args, hex_seed, logger, threading.Event())
|
||||
continue
|
||||
@@ -2028,6 +2040,7 @@ def initialise_wallets(
|
||||
)
|
||||
|
||||
for coin_name in with_coins:
|
||||
coin_settings = settings["chainclients"][coin_name]
|
||||
c = swap_client.getCoinIdFromName(coin_name)
|
||||
if c in (Coins.PART,):
|
||||
continue
|
||||
@@ -2035,14 +2048,29 @@ def initialise_wallets(
|
||||
# initialiseWallet only sets main_wallet_seedid_
|
||||
swap_client.waitForDaemonRPC(c)
|
||||
try:
|
||||
swap_client.initialiseWallet(c, raise_errors=True)
|
||||
default_restore_time = (
|
||||
-1 if generated_mnemonic else DEFAULT_RESTORE_TIME
|
||||
) # Set to -1 (now) if key is newly generated
|
||||
restore_time: int = extra_opts.get(
|
||||
"walletrestoretime", default_restore_time
|
||||
)
|
||||
|
||||
swap_client.initialiseWallet(
|
||||
c, raise_errors=True, restore_time=restore_time
|
||||
)
|
||||
if c not in (Coins.XMR, Coins.WOW):
|
||||
if restore_time == -1:
|
||||
restore_time = int(time.time())
|
||||
coin_settings["restore_time"] = restore_time
|
||||
except Exception as e:
|
||||
coins_failed_to_initialise.append((c, e))
|
||||
if WALLET_ENCRYPTION_PWD != "" and (
|
||||
c not in coins_to_create_wallets_for or c in (Coins.DASH,)
|
||||
): # TODO: Remove DASH workaround
|
||||
try:
|
||||
swap_client.ci(c).changeWalletPassword("", WALLET_ENCRYPTION_PWD)
|
||||
swap_client.ci(c).changeWalletPassword(
|
||||
"", WALLET_ENCRYPTION_PWD, check_seed_if_encrypt=False
|
||||
)
|
||||
except Exception as e: # noqa: F841
|
||||
logger.warning(f"changeWalletPassword failed for {coin_name}.")
|
||||
|
||||
@@ -2363,6 +2391,9 @@ def main():
|
||||
if name == "wowrestoreheight":
|
||||
wow_restore_height = int(s[1])
|
||||
continue
|
||||
if name == "walletrestoretime":
|
||||
extra_opts["walletrestoretime"] = int(s[1])
|
||||
continue
|
||||
if name == "keysdirpath":
|
||||
extra_opts["keysdirpath"] = os.path.expanduser(s[1].strip('"'))
|
||||
continue
|
||||
@@ -2823,6 +2854,7 @@ def main():
|
||||
settings,
|
||||
chain,
|
||||
use_tor_proxy,
|
||||
extra_opts=extra_opts,
|
||||
)
|
||||
|
||||
print("Done.")
|
||||
@@ -2944,6 +2976,7 @@ def main():
|
||||
settings,
|
||||
chain,
|
||||
use_tor_proxy,
|
||||
extra_opts=extra_opts,
|
||||
)
|
||||
|
||||
save_config(config_path, settings)
|
||||
@@ -3085,12 +3118,20 @@ def main():
|
||||
save_config(config_path, settings)
|
||||
|
||||
if particl_wallet_mnemonic == "none":
|
||||
save_config(config_path, settings)
|
||||
logger.info("Done.")
|
||||
return 0
|
||||
|
||||
initialise_wallets(
|
||||
particl_wallet_mnemonic, with_coins, data_dir, settings, chain, use_tor_proxy
|
||||
particl_wallet_mnemonic,
|
||||
with_coins,
|
||||
data_dir,
|
||||
settings,
|
||||
chain,
|
||||
use_tor_proxy,
|
||||
extra_opts=extra_opts,
|
||||
)
|
||||
save_config(config_path, settings)
|
||||
print("Done.")
|
||||
|
||||
|
||||
|
||||
@@ -106,6 +106,10 @@ class BCHInterface(BTCInterface):
|
||||
) + self.make_int(u["amount"], r=1)
|
||||
return unspent_addr
|
||||
|
||||
def createWallet(self, wallet_name: str, password: str = ""):
|
||||
self.rpc("createwallet", [wallet_name, False])
|
||||
self.rpc_wallet("encryptwallet", [password])
|
||||
|
||||
# returns pkh
|
||||
def decodeAddress(self, address: str) -> bytes:
|
||||
return bytes(Address.from_string(address).payload)
|
||||
|
||||
@@ -10,6 +10,7 @@ import base64
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import traceback
|
||||
|
||||
from io import BytesIO
|
||||
@@ -377,7 +378,7 @@ class BTCInterface(Secp256k1Interface):
|
||||
last_block_header = prev_block_header
|
||||
raise ValueError(f"Block header not found at time: {time}")
|
||||
|
||||
def initialiseWallet(self, key_bytes: bytes) -> None:
|
||||
def initialiseWallet(self, key_bytes: bytes, restore_time: int = -1) -> None:
|
||||
assert len(key_bytes) == 32
|
||||
self._have_checked_seed = False
|
||||
if self._use_descriptors:
|
||||
@@ -387,6 +388,7 @@ class BTCInterface(Secp256k1Interface):
|
||||
ek_encoded: str = self.encode_secret_extkey(ek.encode_v())
|
||||
desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)")
|
||||
desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)")
|
||||
|
||||
rv = self.rpc_wallet(
|
||||
"importdescriptors",
|
||||
[
|
||||
@@ -394,7 +396,7 @@ class BTCInterface(Secp256k1Interface):
|
||||
{"desc": desc_external, "timestamp": "now", "active": True},
|
||||
{
|
||||
"desc": desc_internal,
|
||||
"timestamp": "now",
|
||||
"timestamp": "now" if restore_time == -1 else restore_time,
|
||||
"active": True,
|
||||
"internal": True,
|
||||
},
|
||||
@@ -455,10 +457,6 @@ class BTCInterface(Secp256k1Interface):
|
||||
self.close_rpc(rpc_conn)
|
||||
raise ValueError(f"{self.coin_name()} wallet restore height not found.")
|
||||
|
||||
def getWalletSeedID(self) -> str:
|
||||
wi = self.rpc_wallet("getwalletinfo")
|
||||
return "Not found" if "hdseedid" not in wi else wi["hdseedid"]
|
||||
|
||||
def getActiveDescriptor(self):
|
||||
descriptors = self.rpc_wallet("listdescriptors")["descriptors"]
|
||||
for descriptor in descriptors:
|
||||
@@ -470,21 +468,24 @@ class BTCInterface(Secp256k1Interface):
|
||||
return descriptor
|
||||
return None
|
||||
|
||||
def checkExpectedSeed(self, expect_seedid: str) -> bool:
|
||||
def getWalletSeedID(self) -> str:
|
||||
if self._use_descriptors:
|
||||
descriptor = self.getActiveDescriptor()
|
||||
if descriptor is None:
|
||||
self._log.debug("Could not find active descriptor.")
|
||||
return False
|
||||
|
||||
return "Not found"
|
||||
end = descriptor["desc"].find("/")
|
||||
if end < 10:
|
||||
return False
|
||||
return "Not found"
|
||||
extkey = descriptor["desc"][5:end]
|
||||
extkey_data = b58decode(extkey)[4:-4]
|
||||
extkey_data_hash: bytes = hash160(extkey_data)
|
||||
return True if extkey_data_hash.hex() == expect_seedid else False
|
||||
return extkey_data_hash.hex()
|
||||
|
||||
wi = self.rpc_wallet("getwalletinfo")
|
||||
return "Not found" if "hdseedid" not in wi else wi["hdseedid"]
|
||||
|
||||
def checkExpectedSeed(self, expect_seedid: str) -> bool:
|
||||
wallet_seed_id = self.getWalletSeedID()
|
||||
self._expect_seedid_hex = expect_seedid
|
||||
self._have_checked_seed = True
|
||||
@@ -1978,12 +1979,79 @@ class BTCInterface(Secp256k1Interface):
|
||||
locked = encrypted and wallet_info["unlocked_until"] <= 0
|
||||
return encrypted, locked
|
||||
|
||||
def changeWalletPassword(self, old_password: str, new_password: str):
|
||||
def createWallet(self, wallet_name: str, password: str = ""):
|
||||
self.rpc("createwallet", [wallet_name, False, True, password, False, False])
|
||||
|
||||
def encryptWallet(self, password: str, check_seed: bool = True):
|
||||
# Watchonly wallets are not encrypted
|
||||
|
||||
seed_id_before: str = self.getWalletSeedID()
|
||||
|
||||
self.rpc_wallet("encryptwallet", [password])
|
||||
|
||||
if check_seed is False or seed_id_before == "Not found":
|
||||
return
|
||||
seed_id_after: str = self.getWalletSeedID()
|
||||
|
||||
if seed_id_before == seed_id_after:
|
||||
return
|
||||
self._log.warning(f"{self.ticker()} wallet seed changed after encryption.")
|
||||
self._log.debug(
|
||||
f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}."
|
||||
)
|
||||
self.setWalletSeedWarning(True)
|
||||
# Workaround for https://github.com/bitcoin/bitcoin/issues/26607
|
||||
chain_client_settings = self._sc.getChainClientSettings(
|
||||
self.coin_type()
|
||||
) # basicswap.json
|
||||
|
||||
if chain_client_settings.get("manage_daemon", False) is False:
|
||||
self._log.warning(
|
||||
f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed."
|
||||
)
|
||||
return
|
||||
try:
|
||||
self.rpc_wallet("unloadwallet", [self._rpc_wallet])
|
||||
datadir = chain_client_settings["datadir"]
|
||||
if self._network != "mainnet":
|
||||
datadir = os.path.join(datadir, self._network)
|
||||
|
||||
try_wallet_path = os.path.join(datadir, self._rpc_wallet)
|
||||
if os.path.exists(try_wallet_path):
|
||||
new_wallet_path = os.path.join(datadir, self._rpc_wallet + ".old")
|
||||
os.rename(try_wallet_path, new_wallet_path)
|
||||
else:
|
||||
try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet)
|
||||
if os.path.exists(try_wallet_path):
|
||||
new_wallet_path = os.path.join(
|
||||
datadir, "wallets", self._rpc_wallet + ".old"
|
||||
)
|
||||
os.rename(try_wallet_path, new_wallet_path)
|
||||
else:
|
||||
raise ValueError("Can't find old wallet path.")
|
||||
|
||||
self.createWallet(self._rpc_wallet, password=password)
|
||||
self._sc.ci(Coins.PART).unlockWallet(password, check_seed=False)
|
||||
self.unlockWallet(password, check_seed=False)
|
||||
restore_time = chain_client_settings.get("restore_time", 0)
|
||||
self._sc.initialiseWallet(self.coin_type(), True, restore_time=restore_time)
|
||||
self._sc.checkWalletSeed(self.coin_type())
|
||||
|
||||
except Exception as e:
|
||||
self._log.error(f"{self.ticker()} recreating wallet failed: {e}.")
|
||||
if self._sc.debug:
|
||||
self._log.error(traceback.format_exc())
|
||||
finally:
|
||||
self._sc.ci(Coins.PART).lockWallet()
|
||||
|
||||
def changeWalletPassword(
|
||||
self, old_password: str, new_password: str, check_seed_if_encrypt: bool = True
|
||||
):
|
||||
self._log.info("changeWalletPassword - {}".format(self.ticker()))
|
||||
if old_password == "":
|
||||
if self.isWalletEncrypted():
|
||||
raise ValueError("Old password must be set")
|
||||
return self.rpc_wallet("encryptwallet", [new_password])
|
||||
return self.encryptWallet(new_password, check_seed=check_seed_if_encrypt)
|
||||
self.rpc_wallet("walletpassphrasechange", [old_password, new_password])
|
||||
|
||||
def unlockWallet(self, password: str, check_seed: bool = True) -> None:
|
||||
@@ -2001,9 +2069,9 @@ class BTCInterface(Secp256k1Interface):
|
||||
)
|
||||
# wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors
|
||||
self.rpc(
|
||||
"createwallet", [self._rpc_wallet, False, True, "", False, False]
|
||||
"createwallet",
|
||||
[self._rpc_wallet, False, True, password, False, False],
|
||||
)
|
||||
self.rpc_wallet("encryptwallet", [password])
|
||||
|
||||
# Max timeout value, ~3 years
|
||||
self.rpc_wallet("walletpassphrase", [password, 100000000])
|
||||
|
||||
@@ -47,7 +47,7 @@ class DASHInterface(BTCInterface):
|
||||
def entropyToMnemonic(self, key: bytes) -> None:
|
||||
return Mnemonic("english").to_mnemonic(key)
|
||||
|
||||
def initialiseWallet(self, key_bytes: bytes) -> None:
|
||||
def initialiseWallet(self, key_bytes: bytes, restore_time: int = -1) -> None:
|
||||
self._have_checked_seed = False
|
||||
if self._wallet_v20_compatible:
|
||||
self._log.warning("Generating wallet compatible with v20 seed.")
|
||||
@@ -66,7 +66,11 @@ class DASHInterface(BTCInterface):
|
||||
|
||||
def checkExpectedSeed(self, expect_seedid: str) -> bool:
|
||||
self._expect_seedid_hex = expect_seedid
|
||||
rv = self.rpc_wallet("dumphdinfo")
|
||||
try:
|
||||
rv = self.rpc_wallet("dumphdinfo")
|
||||
except Exception as e:
|
||||
self._log.debug(f"DASH dumphdinfo failed {e}.")
|
||||
return False
|
||||
if rv["mnemonic"] != "":
|
||||
entropy = Mnemonic("english").to_entropy(rv["mnemonic"].split(" "))
|
||||
entropy_hash = self.getAddressHashFromKey(entropy)[::-1].hex()
|
||||
@@ -120,3 +124,36 @@ class DASHInterface(BTCInterface):
|
||||
def lockWallet(self):
|
||||
super().lockWallet()
|
||||
self._wallet_passphrase = ""
|
||||
|
||||
def encryptWallet(
|
||||
self, old_password: str, new_password: str, check_seed: bool = True
|
||||
):
|
||||
if old_password != "":
|
||||
self.unlockWallet(old_password, check_seed=False)
|
||||
seed_id_before: str = self.getWalletSeedID()
|
||||
|
||||
self.rpc_wallet("encryptwallet", [new_password])
|
||||
|
||||
if check_seed is False or seed_id_before == "Not found":
|
||||
return
|
||||
self.unlockWallet(new_password, check_seed=False)
|
||||
seed_id_after: str = self.getWalletSeedID()
|
||||
|
||||
self.lockWallet()
|
||||
if seed_id_before == seed_id_after:
|
||||
return
|
||||
self._log.warning(f"{self.ticker()} wallet seed changed after encryption.")
|
||||
self._log.debug(
|
||||
f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}."
|
||||
)
|
||||
self.setWalletSeedWarning(True)
|
||||
|
||||
def changeWalletPassword(
|
||||
self, old_password: str, new_password: str, check_seed_if_encrypt: bool = True
|
||||
):
|
||||
self._log.info("changeWalletPassword - {}".format(self.ticker()))
|
||||
if old_password == "":
|
||||
if self.isWalletEncrypted():
|
||||
raise ValueError("Old password must be set")
|
||||
return self.encryptWallet(old_password, new_password, check_seed_if_encrypt)
|
||||
self.rpc_wallet("walletpassphrasechange", [old_password, new_password])
|
||||
|
||||
@@ -332,14 +332,14 @@ class DCRInterface(Secp256k1Interface):
|
||||
|
||||
def testDaemonRPC(self, with_wallet=True) -> None:
|
||||
if with_wallet:
|
||||
self.rpc_wallet("getinfo")
|
||||
self.rpc_wallet("walletislocked")
|
||||
else:
|
||||
self.rpc("getblockchaininfo")
|
||||
|
||||
def getChainHeight(self) -> int:
|
||||
return self.rpc("getblockcount")
|
||||
|
||||
def initialiseWallet(self, key: bytes) -> None:
|
||||
def initialiseWallet(self, key: bytes, restore_time: int = -1) -> None:
|
||||
# Load with --create
|
||||
pass
|
||||
|
||||
@@ -354,7 +354,9 @@ class DCRInterface(Secp256k1Interface):
|
||||
walletislocked = self.rpc_wallet("walletislocked")
|
||||
return True, walletislocked
|
||||
|
||||
def changeWalletPassword(self, old_password: str, new_password: str):
|
||||
def changeWalletPassword(
|
||||
self, old_password: str, new_password: str, check_seed_if_encrypt: bool = True
|
||||
):
|
||||
self._log.info("changeWalletPassword - {}".format(self.ticker()))
|
||||
if old_password == "":
|
||||
# Read initial pwd from settings
|
||||
|
||||
@@ -51,7 +51,7 @@ class FIROInterface(BTCInterface):
|
||||
def getExchangeName(self, exchange_name: str) -> str:
|
||||
return "zcoin"
|
||||
|
||||
def initialiseWallet(self, key):
|
||||
def initialiseWallet(self, key, restore_time: int = -1):
|
||||
# load with -hdseed= parameter
|
||||
pass
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ class NAVInterface(BTCInterface):
|
||||
# p2sh-p2wsh
|
||||
return True
|
||||
|
||||
def initialiseWallet(self, key):
|
||||
def initialiseWallet(self, key, restore_time: int = -1):
|
||||
# Load with -importmnemonic= parameter
|
||||
pass
|
||||
|
||||
|
||||
@@ -110,7 +110,7 @@ class PARTInterface(BTCInterface):
|
||||
)
|
||||
return index_info["spentindex"]
|
||||
|
||||
def initialiseWallet(self, key: bytes) -> None:
|
||||
def initialiseWallet(self, key: bytes, restore_time: int = -1) -> None:
|
||||
raise ValueError("TODO")
|
||||
|
||||
def withdrawCoin(self, value, addr_to, subfee):
|
||||
|
||||
@@ -34,6 +34,35 @@ class PIVXInterface(BTCInterface):
|
||||
self._rpcport, self._rpcauth, host=self._rpc_host
|
||||
)
|
||||
|
||||
def encryptWallet(self, password: str, check_seed: bool = True):
|
||||
# Watchonly wallets are not encrypted
|
||||
|
||||
seed_id_before: str = self.getWalletSeedID()
|
||||
|
||||
self.rpc_wallet("encryptwallet", [password])
|
||||
|
||||
if check_seed is False or seed_id_before == "Not found":
|
||||
return
|
||||
seed_id_after: str = self.getWalletSeedID()
|
||||
|
||||
if seed_id_before == seed_id_after:
|
||||
return
|
||||
self._log.warning(f"{self.ticker()} wallet seed changed after encryption.")
|
||||
self._log.debug(
|
||||
f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}."
|
||||
)
|
||||
self.setWalletSeedWarning(True)
|
||||
# Workaround for https://github.com/bitcoin/bitcoin/issues/26607
|
||||
chain_client_settings = self._sc.getChainClientSettings(
|
||||
self.coin_type()
|
||||
) # basicswap.json
|
||||
|
||||
if chain_client_settings.get("manage_daemon", False) is False:
|
||||
self._log.warning(
|
||||
f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed."
|
||||
)
|
||||
return
|
||||
|
||||
def signTxWithWallet(self, tx):
|
||||
rv = self.rpc("signrawtransaction", [tx.hex()])
|
||||
return bytes.fromhex(rv["hex"])
|
||||
|
||||
@@ -766,7 +766,9 @@ class XMRInterface(CoinInterface):
|
||||
balance_info = self.rpc_wallet("get_balance")
|
||||
return balance_info["unlocked_balance"]
|
||||
|
||||
def changeWalletPassword(self, old_password, new_password):
|
||||
def changeWalletPassword(
|
||||
self, old_password, new_password, check_seed_if_encrypt: bool = True
|
||||
):
|
||||
self._log.info("changeWalletPassword - {}".format(self.ticker()))
|
||||
orig_password = self._wallet_password
|
||||
if old_password != "":
|
||||
|
||||
@@ -850,7 +850,12 @@ def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
||||
swap_client.checkSystemStatus()
|
||||
post_data = getFormData(post_string, is_json)
|
||||
|
||||
coin = getCoinType(get_data_entry(post_data, "coin"))
|
||||
coin_in = get_data_entry(post_data, "coin")
|
||||
try:
|
||||
coin = getCoinIdFromName(coin_in)
|
||||
except Exception:
|
||||
coin = getCoinType(coin_in)
|
||||
|
||||
if coin in (Coins.PART, Coins.PART_ANON, Coins.PART_BLIND):
|
||||
raise ValueError("Particl wallet seed is set from the Basicswap mnemonic.")
|
||||
|
||||
@@ -878,12 +883,17 @@ def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
||||
expect_seedid = swap_client.getStringKV(
|
||||
"main_wallet_seedid_" + ci.coin_name().lower()
|
||||
)
|
||||
try:
|
||||
wallet_seed_id = ci.getWalletSeedID()
|
||||
except Exception as e:
|
||||
wallet_seed_id = f"Error: {e}"
|
||||
|
||||
rv.update(
|
||||
{
|
||||
"seed": seed_key.hex(),
|
||||
"seed_id": seed_id.hex(),
|
||||
"expected_seed_id": "Unset" if expect_seedid is None else expect_seedid,
|
||||
"current_seed_id": wallet_seed_id,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
663
tests/basicswap/extended/test_wallet_encryption.py
Normal file
663
tests/basicswap/extended/test_wallet_encryption.py
Normal file
@@ -0,0 +1,663 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (c) 2025 The Basicswap developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
from unittest.mock import patch
|
||||
from basicswap.rpc import escape_rpcauth, make_rpc_func
|
||||
from basicswap.interface.dcr.rpc import make_rpc_func as make_dcr_rpc_func
|
||||
from tests.basicswap.util import (
|
||||
read_json_api,
|
||||
waitForServer,
|
||||
)
|
||||
|
||||
bin_path = os.path.expanduser(os.getenv("TEST_BIN_PATH", ""))
|
||||
test_base_path = os.path.expanduser(os.getenv("TEST_PATH", "~/test_basicswap"))
|
||||
|
||||
delay_event = threading.Event()
|
||||
logger = logging.getLogger()
|
||||
logger.level = logging.DEBUG
|
||||
logger.addHandler(logging.StreamHandler(sys.stdout))
|
||||
|
||||
|
||||
def start_prepare(args, datadir=None, env_pairs=[]):
|
||||
for pair in env_pairs:
|
||||
os.environ[pair[0]] = pair[1]
|
||||
print(pair[0], os.environ[pair[0]])
|
||||
if datadir:
|
||||
sys.stdout = open(os.path.join(datadir, "prepare.stdout"), "w")
|
||||
sys.stderr = open(os.path.join(datadir, "prepare.stderr"), "w")
|
||||
import basicswap.bin.prepare as prepareSystemThread
|
||||
|
||||
with patch.object(sys, "argv", args):
|
||||
prepareSystemThread.main()
|
||||
del prepareSystemThread
|
||||
|
||||
|
||||
def start_run(args, datadir=None, env_pairs=[]):
|
||||
for pair in env_pairs:
|
||||
os.environ[pair[0]] = pair[1]
|
||||
print(pair[0], os.environ[pair[0]])
|
||||
if datadir:
|
||||
sys.stdout = open(os.path.join(datadir, "run.stdout"), "w")
|
||||
sys.stderr = open(os.path.join(datadir, "run.stderr"), "w")
|
||||
import basicswap.bin.run as runSystemThread
|
||||
|
||||
with patch.object(sys, "argv", args):
|
||||
runSystemThread.main()
|
||||
del runSystemThread
|
||||
|
||||
|
||||
def callcoincli(binpath, datadir, params, wallet=None, timeout=None):
|
||||
args = [binpath, "-regtest", "-datadir=" + datadir]
|
||||
if wallet:
|
||||
args.append("-rpcwallet=" + wallet)
|
||||
args += shlex.split(params)
|
||||
p = subprocess.Popen(
|
||||
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
out = p.communicate(timeout=timeout)
|
||||
if len(out[1]) > 0:
|
||||
raise ValueError("CLI error " + str(out[1]))
|
||||
return out[0].decode("utf-8").strip()
|
||||
|
||||
|
||||
class Test(unittest.TestCase):
|
||||
|
||||
test_coins = [
|
||||
"particl",
|
||||
"bitcoin",
|
||||
"litecoin",
|
||||
"decred",
|
||||
"namecoin",
|
||||
"monero",
|
||||
"wownero",
|
||||
"pivx",
|
||||
"dash",
|
||||
"firo",
|
||||
"bitcoincash",
|
||||
"dogecoin",
|
||||
]
|
||||
|
||||
def test_coins_list(self):
|
||||
test_path = os.path.join(test_base_path, "coins_list")
|
||||
if os.path.exists(test_path):
|
||||
shutil.rmtree(test_path)
|
||||
os.makedirs(test_path)
|
||||
testargs = (
|
||||
"basicswap-prepare",
|
||||
"-help",
|
||||
)
|
||||
process = multiprocessing.Process(
|
||||
target=start_prepare, args=(testargs, test_path)
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
|
||||
with open(os.path.join(test_path, "prepare.stdout"), "r") as fp:
|
||||
output = fp.read()
|
||||
|
||||
known_coins_line = None
|
||||
for line in output.split("\n"):
|
||||
if line.startswith("Known coins: "):
|
||||
known_coins_line = line[13:]
|
||||
assert known_coins_line
|
||||
known_coins = known_coins_line.split(", ")
|
||||
|
||||
for known_coin in known_coins:
|
||||
if known_coin not in self.test_coins:
|
||||
raise ValueError(f"Not testing: {known_coin}")
|
||||
for test_coin in self.test_coins:
|
||||
if test_coin not in known_coins:
|
||||
raise ValueError(f"Unknown coin: {test_coin}")
|
||||
|
||||
def test_with_encrypt(self):
|
||||
test_path = os.path.join(test_base_path, "with_encrypt")
|
||||
if os.path.exists(test_path):
|
||||
shutil.rmtree(test_path)
|
||||
os.makedirs(test_path)
|
||||
if bin_path != "":
|
||||
os.symlink(bin_path, os.path.join(test_path, "bin"))
|
||||
|
||||
env_vars = [
|
||||
("WALLET_ENCRYPTION_PWD", "test.123"),
|
||||
]
|
||||
testargs = [
|
||||
"basicswap-prepare",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
"-withcoin=" + ",".join(self.test_coins),
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_prepare, args=(testargs, test_path, env_vars)
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
with open(os.path.join(test_path, "prepare.stdout"), "r") as fp:
|
||||
output = fp.read()
|
||||
|
||||
note_lines = []
|
||||
warning_lines = []
|
||||
for line in output.split("\n"):
|
||||
print("line", line)
|
||||
if line.startswith("NOTE -"):
|
||||
note_lines.append(line)
|
||||
if line.startswith("WARNING -"):
|
||||
warning_lines.append(line)
|
||||
|
||||
assert len(warning_lines) == 1
|
||||
assert any(
|
||||
"WARNING - dcrwallet requires the password to be entered at the first startup when encrypted."
|
||||
in x
|
||||
for x in warning_lines
|
||||
)
|
||||
|
||||
assert len(note_lines) == 2
|
||||
assert any("Unable to initialise wallet for PIVX." in x for x in note_lines)
|
||||
assert any(
|
||||
"Unable to initialise wallet for Bitcoin Cash." in x for x in note_lines
|
||||
)
|
||||
|
||||
dcr_rpcport = None
|
||||
dcr_rpcuser = None
|
||||
dcr_rpcpass = None
|
||||
bch_rpcport = None
|
||||
pivx_rpcport = None
|
||||
# Make (regtest) ports unique
|
||||
settings_path = os.path.join(test_path, "basicswap.json")
|
||||
with open(settings_path) as fs:
|
||||
settings = json.load(fs)
|
||||
settings["chainclients"]["dogecoin"]["port"] = 12444
|
||||
dcr_rpcuser = settings["chainclients"]["decred"]["rpcuser"]
|
||||
dcr_rpcpass = settings["chainclients"]["decred"]["rpcpassword"]
|
||||
dcr_rpcport = settings["chainclients"]["decred"]["rpcport"]
|
||||
bch_rpcport = settings["chainclients"]["bitcoincash"]["rpcport"]
|
||||
pivx_rpcport = settings["chainclients"]["pivx"]["rpcport"]
|
||||
with open(settings_path, "w") as fp:
|
||||
json.dump(settings, fp, indent=4)
|
||||
|
||||
dcr_conf_path = os.path.join(test_path, "decred", "dcrd.conf")
|
||||
with open(dcr_conf_path, "a") as fp:
|
||||
fp.write("miningaddr=SsjkQJHak5pRVUdUzqyFHKnojCVRZMU24w6\n")
|
||||
|
||||
testargs = [
|
||||
"basicswap-run",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
"--startonlycoin=decred",
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_run, args=(testargs, test_path, env_vars)
|
||||
)
|
||||
process.start()
|
||||
try:
|
||||
auth = f"{dcr_rpcuser}:{dcr_rpcpass}"
|
||||
dcr_rpc = make_dcr_rpc_func(dcr_rpcport, auth)
|
||||
for i in range(10):
|
||||
try:
|
||||
rv = dcr_rpc("generate", [110])
|
||||
break
|
||||
except Exception as e: # noqa: F841
|
||||
delay_event.wait(1.0)
|
||||
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
testargs = ["basicswap-run", "-regtest=1", "-datadir=" + test_path]
|
||||
process = multiprocessing.Process(target=start_run, args=(testargs, test_path))
|
||||
process.start()
|
||||
try:
|
||||
waitForServer(delay_event, 12700, wait_for=40)
|
||||
logging.info("Unlocking")
|
||||
rv = read_json_api(12700, "unlock", {"password": "test.123"})
|
||||
assert "success" in rv
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
elif coin in ("bitcoincash", "pivx"):
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
# Reseed required
|
||||
assert rv["seed_id"] != rv["current_seed_id"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
authcookiepath = os.path.join(
|
||||
test_path, "bitcoincash", "regtest", ".cookie"
|
||||
)
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
bch_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
bch_rpc = make_rpc_func(bch_rpcport, bch_rpcauth)
|
||||
|
||||
bch_addr = bch_rpc("getnewaddress")
|
||||
rv = bch_rpc("generatetoaddress", [1, bch_addr])
|
||||
rv = read_json_api(12700, "wallets/bch/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
authcookiepath = os.path.join(test_path, "pivx", "regtest", ".cookie")
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
pivx_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
pivx_rpc = make_rpc_func(pivx_rpcport, pivx_rpcauth)
|
||||
|
||||
pivx_addr = pivx_rpc("getnewaddress")
|
||||
rv = pivx_rpc("generatetoaddress", [1, pivx_addr])
|
||||
rv = read_json_api(12700, "wallets/pivx/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
ltc_cli_path = os.path.join(test_path, "bin", "litecoin", "litecoin-cli")
|
||||
ltc_datadir = os.path.join(test_path, "litecoin")
|
||||
rv = json.loads(
|
||||
callcoincli(
|
||||
ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="wallet.dat"
|
||||
)
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
rv = json.loads(
|
||||
callcoincli(ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="mweb")
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
def test_with_encrypt_addcoin(self):
|
||||
test_path = os.path.join(test_base_path, "encrypt_addcoin")
|
||||
if os.path.exists(test_path):
|
||||
shutil.rmtree(test_path)
|
||||
os.makedirs(test_path)
|
||||
if bin_path != "":
|
||||
os.symlink(bin_path, os.path.join(test_path, "bin"))
|
||||
|
||||
env_vars = [
|
||||
("WALLET_ENCRYPTION_PWD", "test.123"),
|
||||
]
|
||||
testargs = [
|
||||
"basicswap-prepare",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_prepare, args=(testargs, test_path, env_vars)
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
testargs = [
|
||||
"basicswap-prepare",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
"-addcoin=" + coin,
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_prepare, args=(testargs, test_path, env_vars)
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
dcr_rpcport = None
|
||||
dcr_rpcuser = None
|
||||
dcr_rpcpass = None
|
||||
bch_rpcport = None
|
||||
pivx_rpcport = None
|
||||
# Make (regtest) ports unique
|
||||
settings_path = os.path.join(test_path, "basicswap.json")
|
||||
with open(settings_path) as fs:
|
||||
settings = json.load(fs)
|
||||
settings["chainclients"]["dogecoin"]["port"] = 12444
|
||||
dcr_rpcuser = settings["chainclients"]["decred"]["rpcuser"]
|
||||
dcr_rpcpass = settings["chainclients"]["decred"]["rpcpassword"]
|
||||
dcr_rpcport = settings["chainclients"]["decred"]["rpcport"]
|
||||
bch_rpcport = settings["chainclients"]["bitcoincash"]["rpcport"]
|
||||
pivx_rpcport = settings["chainclients"]["pivx"]["rpcport"]
|
||||
with open(settings_path, "w") as fp:
|
||||
json.dump(settings, fp, indent=4)
|
||||
|
||||
dcr_conf_path = os.path.join(test_path, "decred", "dcrd.conf")
|
||||
with open(dcr_conf_path, "a") as fp:
|
||||
fp.write("miningaddr=SsjkQJHak5pRVUdUzqyFHKnojCVRZMU24w6\n")
|
||||
|
||||
testargs = [
|
||||
"basicswap-run",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
"--startonlycoin=decred",
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_run, args=(testargs, test_path, env_vars)
|
||||
)
|
||||
process.start()
|
||||
try:
|
||||
auth = f"{dcr_rpcuser}:{dcr_rpcpass}"
|
||||
dcr_rpc = make_dcr_rpc_func(dcr_rpcport, auth)
|
||||
for i in range(10):
|
||||
try:
|
||||
rv = dcr_rpc("generate", [110])
|
||||
break
|
||||
except Exception as e: # noqa: F841
|
||||
delay_event.wait(1.0)
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
testargs = ["basicswap-run", "-regtest=1", "-datadir=" + test_path]
|
||||
process = multiprocessing.Process(target=start_run, args=(testargs, test_path))
|
||||
process.start()
|
||||
try:
|
||||
waitForServer(delay_event, 12700, wait_for=40)
|
||||
logging.info("Unlocking")
|
||||
rv = read_json_api(12700, "unlock", {"password": "test.123"})
|
||||
assert "success" in rv
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
elif coin in ("bitcoincash", "pivx"):
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
# Reseed required
|
||||
assert rv["seed_id"] != rv["current_seed_id"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
authcookiepath = os.path.join(
|
||||
test_path, "bitcoincash", "regtest", ".cookie"
|
||||
)
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
bch_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
bch_rpc = make_rpc_func(bch_rpcport, bch_rpcauth)
|
||||
|
||||
logging.info("Reseeding BCH")
|
||||
bch_addr = bch_rpc("getnewaddress")
|
||||
rv = bch_rpc("generatetoaddress", [1, bch_addr])
|
||||
rv = read_json_api(12700, "wallets/bch/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
logging.info("Reseeding PIVX")
|
||||
authcookiepath = os.path.join(test_path, "pivx", "regtest", ".cookie")
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
pivx_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
pivx_rpc = make_rpc_func(pivx_rpcport, pivx_rpcauth)
|
||||
|
||||
pivx_addr = pivx_rpc("getnewaddress")
|
||||
rv = pivx_rpc("generatetoaddress", [1, pivx_addr])
|
||||
rv = read_json_api(12700, "wallets/pivx/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
ltc_cli_path = os.path.join(test_path, "bin", "litecoin", "litecoin-cli")
|
||||
ltc_datadir = os.path.join(test_path, "litecoin")
|
||||
rv = json.loads(
|
||||
callcoincli(
|
||||
ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="wallet.dat"
|
||||
)
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
rv = json.loads(
|
||||
callcoincli(ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="mweb")
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
# Check that BSX starts up again
|
||||
testargs = ["basicswap-run", "-regtest=1", "-datadir=" + test_path]
|
||||
process = multiprocessing.Process(target=start_run, args=(testargs, test_path))
|
||||
process.start()
|
||||
try:
|
||||
waitForServer(delay_event, 12700, wait_for=40)
|
||||
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
def test_encrypt_after(self):
|
||||
test_path = os.path.join(test_base_path, "encrypt_after")
|
||||
if os.path.exists(test_path):
|
||||
shutil.rmtree(test_path)
|
||||
os.makedirs(test_path)
|
||||
if bin_path != "":
|
||||
os.symlink(bin_path, os.path.join(test_path, "bin"))
|
||||
|
||||
testargs = [
|
||||
"basicswap-prepare",
|
||||
"-regtest=1",
|
||||
"-datadir=" + test_path,
|
||||
"-withcoin=" + ",".join(self.test_coins),
|
||||
]
|
||||
process = multiprocessing.Process(
|
||||
target=start_prepare, args=(testargs, test_path)
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
dcr_rpcport = None
|
||||
dcr_rpcuser = None
|
||||
dcr_rpcpass = None
|
||||
bch_rpcport = None
|
||||
pivx_rpcport = None
|
||||
# Make (regtest) ports unique
|
||||
settings_path = os.path.join(test_path, "basicswap.json")
|
||||
with open(settings_path) as fs:
|
||||
settings = json.load(fs)
|
||||
settings["chainclients"]["dogecoin"]["port"] = 12444
|
||||
dcr_rpcuser = settings["chainclients"]["decred"]["rpcuser"]
|
||||
dcr_rpcpass = settings["chainclients"]["decred"]["rpcpassword"]
|
||||
dcr_rpcport = settings["chainclients"]["decred"]["rpcport"]
|
||||
bch_rpcport = settings["chainclients"]["bitcoincash"]["rpcport"]
|
||||
pivx_rpcport = settings["chainclients"]["pivx"]["rpcport"]
|
||||
|
||||
with open(settings_path, "w") as fp:
|
||||
json.dump(settings, fp, indent=4)
|
||||
|
||||
dcr_conf_path = os.path.join(test_path, "decred", "dcrd.conf")
|
||||
with open(dcr_conf_path, "a") as fp:
|
||||
fp.write("miningaddr=SsjkQJHak5pRVUdUzqyFHKnojCVRZMU24w6\n")
|
||||
|
||||
testargs = ["basicswap-run", "-regtest=1", "-datadir=" + test_path]
|
||||
process = multiprocessing.Process(target=start_run, args=(testargs, test_path))
|
||||
process.start()
|
||||
try:
|
||||
waitForServer(delay_event, 12700, wait_for=40)
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
elif coin in ("bitcoincash", "pivx"):
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
# Reseed required
|
||||
assert rv["seed_id"] != rv["current_seed_id"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
authcookiepath = os.path.join(
|
||||
test_path, "bitcoincash", "regtest", ".cookie"
|
||||
)
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
bch_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
bch_rpc = make_rpc_func(bch_rpcport, bch_rpcauth)
|
||||
|
||||
bch_addr = bch_rpc("getnewaddress")
|
||||
rv = bch_rpc("generatetoaddress", [1, bch_addr])
|
||||
rv = read_json_api(12700, "wallets/bch/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
authcookiepath = os.path.join(test_path, "pivx", "regtest", ".cookie")
|
||||
with open(authcookiepath, "rb") as fp:
|
||||
pivx_rpcauth = escape_rpcauth(fp.read().decode("utf-8"))
|
||||
|
||||
pivx_rpc = make_rpc_func(pivx_rpcport, pivx_rpcauth)
|
||||
|
||||
pivx_addr = pivx_rpc("getnewaddress")
|
||||
rv = pivx_rpc("generatetoaddress", [1, pivx_addr])
|
||||
rv = read_json_api(12700, "wallets/pivx/reseed")
|
||||
assert rv["reseeded"] is True
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
ltc_cli_path = os.path.join(test_path, "bin", "litecoin", "litecoin-cli")
|
||||
ltc_datadir = os.path.join(test_path, "litecoin")
|
||||
rv = json.loads(
|
||||
callcoincli(ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="mweb")
|
||||
)
|
||||
ltc_mweb_seed_before: str = rv["hdseedid"]
|
||||
assert "unlocked_until" not in rv
|
||||
|
||||
# Get Decred out of IBD, else first start after encryption will lock up with "Since this is your first time running we need to sync accounts."
|
||||
auth = f"{dcr_rpcuser}:{dcr_rpcpass}"
|
||||
dcr_rpc = make_dcr_rpc_func(dcr_rpcport, auth)
|
||||
for i in range(10):
|
||||
try:
|
||||
rv = dcr_rpc("generate", [110])
|
||||
break
|
||||
except Exception as e: # noqa: F841
|
||||
delay_event.wait(1.0)
|
||||
|
||||
logging.info("setpassword (encrypt wallets)")
|
||||
rv = read_json_api(
|
||||
12700, "setpassword", {"oldpassword": "", "newpassword": "test.123"}
|
||||
)
|
||||
assert "success" in rv
|
||||
|
||||
logging.info("Unlocking")
|
||||
rv = read_json_api(12700, "unlock", {"password": "test.123"})
|
||||
assert "success" in rv
|
||||
|
||||
for coin in self.test_coins:
|
||||
if coin == "particl":
|
||||
continue
|
||||
if coin == "firo":
|
||||
# firo core shuts down after encryptwallet
|
||||
continue
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": coin})
|
||||
if coin in ("monero", "wownero"):
|
||||
assert rv["address"] == rv["expected_address"]
|
||||
elif coin in ("pivx"):
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] != rv["current_seed_id"]
|
||||
else:
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
# pivx seed has changed
|
||||
rv = read_json_api(12700, "wallets/pivx")
|
||||
assert rv["expected_seed"] is False
|
||||
|
||||
logging.info("Try to reseed pivx (and fail).")
|
||||
rv = read_json_api(12700, "wallets/pivx/reseed")
|
||||
assert "Already have this key" in rv["error"]
|
||||
|
||||
logging.info("Check both LTC wallets are encrypted and mweb seeds match.")
|
||||
rv = json.loads(
|
||||
callcoincli(
|
||||
ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="wallet.dat"
|
||||
)
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
rv = json.loads(
|
||||
callcoincli(ltc_cli_path, ltc_datadir, "getwalletinfo", wallet="mweb")
|
||||
)
|
||||
assert "unlocked_until" in rv
|
||||
assert ltc_mweb_seed_before == rv["hdseedid"]
|
||||
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
logging.info("Starting BSX to check Firo")
|
||||
testargs = ["basicswap-run", "-regtest=1", "-datadir=" + test_path]
|
||||
process = multiprocessing.Process(target=start_run, args=(testargs, test_path))
|
||||
process.start()
|
||||
try:
|
||||
waitForServer(delay_event, 12700, wait_for=40)
|
||||
logging.info("Unlocking")
|
||||
rv = read_json_api(12700, "unlock", {"password": "test.123"})
|
||||
assert "success" in rv
|
||||
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": "firo"})
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
rv = read_json_api(12700, "getcoinseed", {"coin": "dcr"})
|
||||
assert rv["seed_id"] == rv["expected_seed_id"]
|
||||
assert rv["seed_id"] == rv["current_seed_id"]
|
||||
|
||||
finally:
|
||||
process.terminate()
|
||||
process.join()
|
||||
assert process.exitcode == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user