diff --git a/.gitignore b/.gitignore index 3a5b40b..e62caca 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ __pycache__ /*.eggs .tox .eggs +.ruff_cache +.pytest_cache *~ # geckodriver.log @@ -15,4 +17,4 @@ __pycache__ docker/.env # vscode dev container settings -compose-dev.yaml \ No newline at end of file +compose-dev.yaml diff --git a/basicswap/bin/prepare.py b/basicswap/bin/prepare.py index f3b9d29..0efd9cd 100755 --- a/basicswap/bin/prepare.py +++ b/basicswap/bin/prepare.py @@ -3115,8 +3115,6 @@ def main(): for c in with_coins: prepareDataDir(c, settings, chain, particl_wallet_mnemonic, extra_opts) - save_config(config_path, settings) - if particl_wallet_mnemonic == "none": save_config(config_path, settings) logger.info("Done.") diff --git a/basicswap/interface/btc.py b/basicswap/interface/btc.py index 8a657d2..8729b6a 100644 --- a/basicswap/interface/btc.py +++ b/basicswap/interface/btc.py @@ -10,9 +10,13 @@ import base64 import hashlib import json import logging +import mmap import os +import shutil +import sqlite3 import traceback + from io import BytesIO from basicswap.basicswap_util import ( @@ -413,7 +417,18 @@ class BTCInterface(Secp256k1Interface): raise ValueError("Failed to import descriptors.") else: key_wif = self.encodeKey(key_bytes) - self.rpc_wallet("sethdseed", [True, key_wif]) + try: + self.rpc_wallet("sethdseed", [True, key_wif]) + except Exception as e: + self._log.debug(f"sethdseed failed: {e}") + """ + # TODO: Find derived key counts + if "Already have this key" in str(e): + key_id: bytes = self.getSeedHash(key_bytes) + self.setActiveKeyChain(key_id) + else: + """ + raise (e) def getWalletInfo(self): rv = self.rpc_wallet("getwalletinfo") @@ -1984,12 +1999,95 @@ class BTCInterface(Secp256k1Interface): def encryptWallet(self, password: str, check_seed: bool = True): # Watchonly wallets are not encrypted - + # Workaround for https://github.com/bitcoin/bitcoin/issues/26607 seed_id_before: str = self.getWalletSeedID() + chain_client_settings = self._sc.getChainClientSettings( + self.coin_type() + ) # basicswap.json + if ( + chain_client_settings.get("manage_daemon", False) + and check_seed is True + and seed_id_before != "Not found" + ): + self.rpc("unloadwallet", [self._rpc_wallet]) + + # Store active keys + seedid_bytes: bytes = bytes.fromhex(seed_id_before)[::-1] + orig_active_descriptors = [] + orig_hdchain_bytes = None + walletpath = None + max_hdchain_key_count: int = 4000000 # Arbitrary + + datadir = chain_client_settings["datadir"] + if self._network != "mainnet": + datadir = os.path.join(datadir, self._network) + try_wallet_path = os.path.join(datadir, self._rpc_wallet) + if os.path.exists(try_wallet_path): + walletpath = try_wallet_path + else: + try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet) + if os.path.exists(try_wallet_path): + walletpath = try_wallet_path + + walletfilepath = walletpath + if os.path.isdir(walletpath): + walletfilepath = os.path.join(walletpath, "wallet.dat") + + if walletpath is None: + self._log.warning(f"Unable to find {self.ticker()} wallet path.") + else: + if self._use_descriptors: + orig_active_descriptors = [] + with sqlite3.connect(walletfilepath) as conn: + c = conn.cursor() + rows = c.execute( + "SELECT * FROM main WHERE key in (:kext, :kint)", + { + "kext": bytes.fromhex( + "1161637469766565787465726e616c73706b02" + ), + "kint": bytes.fromhex( + "11616374697665696e7465726e616c73706b02" + ), + }, + ) + for row in rows: + k, v = row + orig_active_descriptors.append({"k": k, "v": v}) + else: + with open(walletfilepath, "rb") as fp: + with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm: + pos = mm.find(seedid_bytes) + while pos != -1: + mm.seek(pos - 8) + hdchain_bytes = mm.read(12 + 20) + version = int.from_bytes(hdchain_bytes[:4], "little") + if version == 2: + external_counter = int.from_bytes( + hdchain_bytes[4:8], "little" + ) + internal_counter = int.from_bytes( + hdchain_bytes[-4:], "little" + ) + if ( + external_counter > 0 + and external_counter <= max_hdchain_key_count + and internal_counter > 0 + and internal_counter <= max_hdchain_key_count + ): + orig_hdchain_bytes = hdchain_bytes + self._log.debug( + f"Found hdchain for: {seed_id_before} external_counter: {external_counter}, internal_counter: {internal_counter}." + ) + break + pos = mm.find(seedid_bytes, pos + 1) + + self.rpc("loadwallet", [self._rpc_wallet]) + self.rpc_wallet("encryptwallet", [password]) - if check_seed is False or seed_id_before == "Not found": + if check_seed is False or seed_id_before == "Not found" or walletpath is None: return seed_id_after: str = self.getWalletSeedID() @@ -2000,42 +2098,104 @@ class BTCInterface(Secp256k1Interface): f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}." ) self.setWalletSeedWarning(True) - # Workaround for https://github.com/bitcoin/bitcoin/issues/26607 - chain_client_settings = self._sc.getChainClientSettings( - self.coin_type() - ) # basicswap.json if chain_client_settings.get("manage_daemon", False) is False: self._log.warning( - f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed." + f"{self.ticker()} manage_daemon is false. Can't attempt to fix." ) return + if self._use_descriptors: + if len(orig_active_descriptors) < 2: + self._log.error( + "Could not find original active descriptors for wallet." + ) + return + self._log.info("Attempting to revert to last descriptors.") + else: + if orig_hdchain_bytes is None: + self._log.error("Could not find hdchain for wallet.") + return + self._log.info("Attempting to revert to last hdchain.") try: - self.rpc_wallet("unloadwallet", [self._rpc_wallet]) - datadir = chain_client_settings["datadir"] - if self._network != "mainnet": - datadir = os.path.join(datadir, self._network) + # Make a copy of the encrypted wallet before modifying it + bkp_path = walletpath + ".bkp" + for i in range(100): + if not os.path.exists(bkp_path): + break + bkp_path = walletpath + f".bkp{i}" - try_wallet_path = os.path.join(datadir, self._rpc_wallet) - if os.path.exists(try_wallet_path): - new_wallet_path = os.path.join(datadir, self._rpc_wallet + ".old") - os.rename(try_wallet_path, new_wallet_path) + if os.path.exists(bkp_path): + self._log.error("Could not find backup path for wallet.") + return + + self.rpc("unloadwallet", [self._rpc_wallet]) + + if os.path.isfile(walletpath): + shutil.copy(walletpath, bkp_path) else: - try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet) - if os.path.exists(try_wallet_path): - new_wallet_path = os.path.join( - datadir, "wallets", self._rpc_wallet + ".old" - ) - os.rename(try_wallet_path, new_wallet_path) - else: - raise ValueError("Can't find old wallet path.") + shutil.copytree(walletpath, bkp_path) - self.createWallet(self._rpc_wallet, password=password) - self._sc.ci(Coins.PART).unlockWallet(password, check_seed=False) - self.unlockWallet(password, check_seed=False) - restore_time = chain_client_settings.get("restore_time", 0) - self._sc.initialiseWallet(self.coin_type(), True, restore_time=restore_time) - self._sc.checkWalletSeed(self.coin_type()) + hdchain_replaced: bool = False + if self._use_descriptors: + with sqlite3.connect(walletfilepath) as conn: + c = conn.cursor() + c.executemany( + "UPDATE main SET value = :v WHERE key = :k", + orig_active_descriptors, + ) + conn.commit() + else: + seedid_after_bytes: bytes = bytes.fromhex(seed_id_after)[::-1] + with open(walletfilepath, "r+b") as fp: + with mmap.mmap(fp.fileno(), 0) as mm: + pos = mm.find(seedid_after_bytes) + while pos != -1: + mm.seek(pos - 8) + hdchain_bytes = mm.read(12 + 20) + version = int.from_bytes(hdchain_bytes[:4], "little") + if version == 2: + external_counter = int.from_bytes( + hdchain_bytes[4:8], "little" + ) + internal_counter = int.from_bytes( + hdchain_bytes[-4:], "little" + ) + if ( + external_counter > 0 + and external_counter <= max_hdchain_key_count + and internal_counter > 0 + and internal_counter <= max_hdchain_key_count + ): + self._log.debug( + f"Replacing hdchain for: {seed_id_after} external_counter: {external_counter}, internal_counter: {internal_counter}." + ) + offset: int = pos - 8 + mm.seek(offset) + mm.write(orig_hdchain_bytes) + self._log.debug( + f"hdchain replaced at offset: {offset}." + ) + hdchain_replaced = True + # Can appear multiple times in file, replace all. + pos = mm.find(seedid_after_bytes, pos + 1) + + if hdchain_replaced is False: + self._log.error("Could not find new hdchain in wallet.") + + self.rpc("loadwallet", [self._rpc_wallet]) + + if hdchain_replaced: + self.unlockWallet(password, check_seed=False) + seed_id_after_restore: str = self.getWalletSeedID() + if seed_id_after_restore == seed_id_before: + self._log.debug("Running newkeypool.") + self.rpc_wallet("newkeypool") + else: + self._log.warning( + f"Expected seed id not found: {seed_id_before}, have {seed_id_after_restore}." + ) + + self.lockWallet() except Exception as e: self._log.error(f"{self.ticker()} recreating wallet failed: {e}.") diff --git a/basicswap/interface/firo.py b/basicswap/interface/firo.py index 884e2e4..bb56292 100644 --- a/basicswap/interface/firo.py +++ b/basicswap/interface/firo.py @@ -58,6 +58,25 @@ class FIROInterface(BTCInterface): def checkWallets(self) -> int: return 1 + def encryptWallet(self, password: str, check_seed: bool = True): + # Watchonly wallets are not encrypted + # Firo shuts down after encryptwallet + seed_id_before: str = self.getWalletSeedID() if check_seed else "Not found" + + self.rpc_wallet("encryptwallet", [password]) + + if check_seed is False or seed_id_before == "Not found": + return + seed_id_after: str = self.getWalletSeedID() + + if seed_id_before == seed_id_after: + return + self._log.warning(f"{self.ticker()} wallet seed changed after encryption.") + self._log.debug( + f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}." + ) + self.setWalletSeedWarning(True) + def getNewAddress(self, use_segwit, label="swap_receive"): return self.rpc("getnewaddress", [label]) # addr_plain = self.rpc('getnewaddress', [label]) diff --git a/basicswap/interface/pivx.py b/basicswap/interface/pivx.py index 1d58c7d..2c7017f 100644 --- a/basicswap/interface/pivx.py +++ b/basicswap/interface/pivx.py @@ -59,7 +59,7 @@ class PIVXInterface(BTCInterface): if chain_client_settings.get("manage_daemon", False) is False: self._log.warning( - f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed." + f"{self.ticker()} manage_daemon is false. Can't attempt to fix." ) return diff --git a/tests/basicswap/extended/test_wallet_encryption.py b/tests/basicswap/extended/test_wallet_encryption.py index d08b965..6f0c7dd 100644 --- a/tests/basicswap/extended/test_wallet_encryption.py +++ b/tests/basicswap/extended/test_wallet_encryption.py @@ -7,25 +7,42 @@ import json import logging +import mmap import multiprocessing import os import shlex import shutil +import sqlite3 import subprocess import sys import threading import unittest + from unittest.mock import patch +from basicswap.util.ecc import ( + i2b, + getSecretInt, +) from basicswap.rpc import escape_rpcauth, make_rpc_func from basicswap.interface.dcr.rpc import make_rpc_func as make_dcr_rpc_func from tests.basicswap.util import ( read_json_api, waitForServer, ) +from basicswap.contrib.rpcauth import generate_salt, password_to_hmac +from basicswap.util.address import ( + b58encode, + toWIF, +) +from basicswap.util.crypto import ( + sha256, +) +from basicswap.util.extkey import ExtKeyPair +from basicswap.contrib.test_framework.descriptors import descsum_create bin_path = os.path.expanduser(os.getenv("TEST_BIN_PATH", "")) -test_base_path = os.path.expanduser(os.getenv("TEST_PATH", "~/test_basicswap")) +test_base_path = os.path.expanduser(os.getenv("TEST_PATH", "/tmp/test_basicswap")) delay_event = threading.Event() logger = logging.getLogger() @@ -75,6 +92,21 @@ def callcoincli(binpath, datadir, params, wallet=None, timeout=None): return out[0].decode("utf-8").strip() +def encode_secret_extkey(prefix, ek_data: bytes) -> str: + assert len(ek_data) == 74 + data: bytes = prefix.to_bytes(4, "big") + ek_data + checksum = sha256(sha256(data)) + return b58encode(data + checksum[0:4]) + + +test_seed: bytes = bytes.fromhex( + "8e54a313e6df8918df6d758fafdbf127a115175fdd2238d0e908dd8093c9ac3b" +) +test_seedid = "3da5c0af91879e8ce97d9a843874601c08688078" +wif_prefix: int = 239 +test_wif: str = toWIF(wif_prefix, test_seed) + + class Test(unittest.TestCase): test_coins = [ @@ -594,6 +626,7 @@ class Test(unittest.TestCase): assert "success" in rv for coin in self.test_coins: + logging.info(f"Coin: {coin}") if coin == "particl": continue if coin == "firo": @@ -658,6 +691,588 @@ class Test(unittest.TestCase): process.join() assert process.exitcode == 0 + def write_btc_conf(self, datadir): + conf_path = os.path.join(datadir, "bitcoin.conf") + with open(conf_path, "w") as fp: + fp.write("regtest=1\n") + fp.write("[regtest]\n") + fp.write("printtoconsole=0\n") + fp.write("server=1\n") + fp.write("discover=0\n") + fp.write("listenonion=0\n") + fp.write("bind=127.0.0.1\n") + fp.write("debug=1\n") + fp.write("debugexclude=libevent\n") + fp.write("deprecatedrpc=create_bdb\n") + fp.write("rpcport=12223\n") + salt = generate_salt(16) + fp.write( + "rpcauth={}:{}${}\n".format( + "test", + salt, + password_to_hmac(salt, "test_pass"), + ) + ) + + def test_btc_wallets_with_module(self): + import berkeleydb + + if not os.path.exists(bin_path): + raise ValueError("TEST_BIN_PATH not set.") + test_path = os.path.join(test_base_path, "test_btc_wallets_with_module") + if os.path.exists(test_path): + shutil.rmtree(test_path) + os.makedirs(test_path) + self.write_btc_conf(test_path) + daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind") + args = [ + daemon_path, + "-datadir=" + test_path, + ] + bitcoind_process = subprocess.Popen(args) + try: + rpc = make_rpc_func(12223, "test:test_pass") + for i in range(20): + try: + rv = rpc("listwallets") + break + except Exception as e: # noqa: F841 + delay_event.wait(1.0) + + # wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors + rpc( + "createwallet", + ["bdb_wallet", False, True, "", False, False], + ) + + rpc("sethdseed", [True, test_wif], wallet_override="bdb_wallet") + rv = rpc("getwalletinfo", wallet_override="bdb_wallet") + assert rv["hdseedid"] == test_seedid + + new_addr = rpc("getnewaddress", wallet_override="bdb_wallet") + logging.info(f"getnewaddress {new_addr}") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="bdb_wallet", + ) + logging.info(f"getaddressinfo before encrypt {rv}") + assert rv["hdmasterfingerprint"] == "a55b7ea9" + + rpc( + "unloadwallet", + [ + "bdb_wallet", + ], + ) + + walletdir = os.path.join(test_path, "regtest", "wallets", "bdb_wallet") + walletpath = os.path.join(walletdir, "wallet.dat") + + db = berkeleydb.db.DB() + db.open( + walletpath, + "main", + berkeleydb.db.DB_BTREE, + berkeleydb.db.DB_THREAD | berkeleydb.db.DB_CREATE, + ) + prev_hdchain_key = None + prev_hdchain_value = None + for k, v in db.items(): + if b"hdchain" in k: + prev_hdchain_key = k + prev_hdchain_value = v + db.close() + + rpc( + "loadwallet", + [ + "bdb_wallet", + ], + ) + + rv = rpc( + "encryptwallet", + [ + "test.123", + ], + wallet_override="bdb_wallet", + ) + logging.info(f"encryptwallet {rv}") + rv = rpc("getwalletinfo", wallet_override="bdb_wallet") + logging.info(f"getwalletinfo {rv}") + assert rv["hdseedid"] != test_seedid + + rpc( + "unloadwallet", + [ + "bdb_wallet", + ], + ) + + bkp_path = os.path.join(walletdir, "wallet.dat" + ".bkp") + for i in range(1000): + if os.path.exists(bkp_path): + bkp_path = os.path.join(walletdir, "wallet.dat" + f".bkp{i}") + + assert os.path.exists(bkp_path) is False + if os.path.isfile(walletpath): + shutil.copy(walletpath, bkp_path) + else: + shutil.copytree(walletpath, bkp_path) + + # Replace hdchain with previous value + db = berkeleydb.db.DB() + db.open( + walletpath, + "main", + berkeleydb.db.DB_BTREE, + berkeleydb.db.DB_THREAD | berkeleydb.db.DB_CREATE, + ) + db[prev_hdchain_key] = prev_hdchain_value + db.close() + + rpc( + "loadwallet", + [ + "bdb_wallet", + ], + ) + + rv = rpc("getwalletinfo", wallet_override="bdb_wallet") + logging.info(f"getwalletinfo {rv}") + assert rv["hdseedid"] == test_seedid + + # Picks from wrong keypool + new_addr = rpc("getnewaddress", wallet_override="bdb_wallet") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="bdb_wallet", + ) + assert rv["hdmasterfingerprint"] != "a55b7ea9" + + rpc("walletpassphrase", ["test.123", 1000], wallet_override="bdb_wallet") + rpc("newkeypool", wallet_override="bdb_wallet") + + new_addr = rpc("getnewaddress", wallet_override="bdb_wallet") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="bdb_wallet", + ) + assert rv["hdmasterfingerprint"] == "a55b7ea9" + + finally: + rpc("stop") + bitcoind_process.wait(timeout=30) + + def test_btc_wallets_without_module(self): + if not os.path.exists(bin_path): + raise ValueError("TEST_BIN_PATH not set.") + test_path = os.path.join(test_base_path, "test_btc_wallets_without_module") + if os.path.exists(test_path): + shutil.rmtree(test_path) + os.makedirs(test_path) + self.write_btc_conf(test_path) + daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind") + args = [ + daemon_path, + "-datadir=" + test_path, + ] + bitcoind_process = subprocess.Popen(args) + try: + rpc = make_rpc_func(12223, "test:test_pass") + for i in range(20): + try: + rv = rpc("listwallets") + break + except Exception as e: # noqa: F841 + delay_event.wait(1.0) + + rv = rpc( + "createwallet", + ["bdb_wallet2", False, True, "", False, False], + ) + logging.info(f"createwallet {rv}") + + rpc("sethdseed", [True, test_wif], wallet_override="bdb_wallet2") + + new_addr = rpc("getnewaddress", wallet_override="bdb_wallet2") + logging.info(f"getnewaddress {new_addr}") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="bdb_wallet2", + ) + logging.info(f"getaddressinfo before encrypt {rv}") + assert rv["hdmasterfingerprint"] == "a55b7ea9" + + rv = rpc("getwalletinfo", wallet_override="bdb_wallet2") + logging.info(f"getwalletinfo {rv}") + assert rv["hdseedid"] == test_seedid + + seedid_bytes = bytes.fromhex(rv["hdseedid"])[::-1] + # keypoolsize and keypoolsize_hd_internal are how many pre-generated keys remain unused. + orig_hdchain_bytes_predicted = ( + int(2).to_bytes(4, "little") + + int(1000).to_bytes(4, "little") + + seedid_bytes + + int(1000).to_bytes(4, "little") + ) + logging.info( + f"orig_hdchain_bytes_predicted {orig_hdchain_bytes_predicted.hex()}" + ) + + rv = rpc( + "unloadwallet", + [ + "bdb_wallet2", + ], + ) + logging.info(f"Looking for hdchain for {seedid_bytes.hex()}") + walletdir = os.path.join(test_path, "regtest", "wallets", "bdb_wallet2") + walletpath = os.path.join(walletdir, "wallet.dat") + found_hdchain = False + max_key_count = 4000000 # arbitrary + with open(walletpath, "rb") as fp: + with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm: + pos = mm.find(seedid_bytes) + while pos != -1: + mm.seek(pos - 8) + hdchain_bytes = mm.read(12 + 20) + version = int.from_bytes(hdchain_bytes[:4], "little") + if version == 2: + external_counter = int.from_bytes( + hdchain_bytes[4:8], "little" + ) + internal_counter = int.from_bytes( + hdchain_bytes[-4:], "little" + ) + if ( + external_counter > 0 + and external_counter <= max_key_count + and internal_counter > 0 + and internal_counter <= max_key_count + ): + orig_hdchain_bytes = hdchain_bytes + found_hdchain = True + break + pos = mm.find(seedid_bytes, pos + 1) + logging.info(f"orig_hdchain_bytes {orig_hdchain_bytes.hex()}") + assert found_hdchain + + rpc( + "loadwallet", + [ + "bdb_wallet2", + ], + ) + + rv = rpc( + "encryptwallet", + [ + "test.123", + ], + wallet_override="bdb_wallet2", + ) + logging.info(f"encryptwallet {rv}") + rv = rpc("getwalletinfo", wallet_override="bdb_wallet2") + logging.info(f"getwalletinfo {rv}") + assert rv["hdseedid"] != test_seedid + + new_hdchain_bytes = ( + int(2).to_bytes(4, "little") + + int(rv["keypoolsize"]).to_bytes(4, "little") + + bytes.fromhex(rv["hdseedid"])[::-1] + + int(rv["keypoolsize_hd_internal"]).to_bytes(4, "little") + ) + + rpc( + "unloadwallet", + [ + "bdb_wallet2", + ], + ) + + with open(walletpath, "r+b") as fp: + with mmap.mmap(fp.fileno(), 0) as mm: + offset = mm.find(new_hdchain_bytes) + if offset != -1: + mm.seek(offset) + mm.write(orig_hdchain_bytes) + print(f"Replaced at offset: {offset}") + else: + print("Byte sequence not found.") + + rpc( + "loadwallet", + [ + "bdb_wallet2", + ], + ) + + rv = rpc("getwalletinfo", wallet_override="bdb_wallet2") + logging.info(f"getwalletinfo {rv}") + assert rv["hdseedid"] == test_seedid + + rpc("walletpassphrase", ["test.123", 1000], wallet_override="bdb_wallet2") + rpc("newkeypool", wallet_override="bdb_wallet2") + + new_addr = rpc("getnewaddress", wallet_override="bdb_wallet2") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="bdb_wallet2", + ) + assert rv["hdmasterfingerprint"] == "a55b7ea9" + finally: + rpc("stop") + bitcoind_process.wait(timeout=30) + + def test_btc_wallets_descriptors(self): + if not os.path.exists(bin_path): + raise ValueError("TEST_BIN_PATH not set.") + test_path = os.path.join(test_base_path, "test_btc_wallets_descriptors") + if os.path.exists(test_path): + shutil.rmtree(test_path) + os.makedirs(test_path) + self.write_btc_conf(test_path) + daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind") + args = [ + daemon_path, + "-datadir=" + test_path, + ] + bitcoind_process = subprocess.Popen(args) + try: + rpc = make_rpc_func(12223, "test:test_pass") + for i in range(20): + try: + rv = rpc("listwallets") + break + except Exception as e: # noqa: F841 + delay_event.wait(1.0) + + rpc( + "createwallet", + ["descr_wallet", False, True, "", False, True], + ) + + ek = ExtKeyPair() + ek.set_seed(test_seed) + ek_encoded: str = encode_secret_extkey(0x04358394, ek.encode_v()) + + desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)") + desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)") + rv = rpc( + "importdescriptors", + [ + [ + { + "desc": desc_external, + "timestamp": "now", + "active": True, + "range": [0, 10], + "next_index": 0, + }, + { + "desc": desc_internal, + "timestamp": "now", + "active": True, + "internal": True, + }, + ], + ], + wallet_override="descr_wallet", + ) + logging.info(f"importdescriptors {rv}") + addr = rpc( + "getnewaddress", ["test descriptors"], wallet_override="descr_wallet" + ) + addr_info = rpc( + "getaddressinfo", + [ + addr, + ], + wallet_override="descr_wallet", + ) + assert addr_info["hdmasterfingerprint"] == "a55b7ea9" + + descriptors_before = rpc( + "listdescriptors", [], wallet_override="descr_wallet" + ) + logging.info(f"descriptors_before {descriptors_before}") + + rv = rpc("getwalletinfo", wallet_override="descr_wallet") + logging.info(f"getwalletinfo {rv}") + + new_addr = rpc("getnewaddress", wallet_override="descr_wallet") + rv = rpc( + "getaddressinfo", + [ + new_addr, + ], + wallet_override="descr_wallet", + ) + assert rv["hdmasterfingerprint"] == "a55b7ea9" + + rpc( + "unloadwallet", + [ + "descr_wallet", + ], + ) + + walletdir = os.path.join(test_path, "regtest", "wallets", "descr_wallet") + walletpath = os.path.join(walletdir, "wallet.dat") + + orig_active_descriptors = [] + with sqlite3.connect(walletpath) as conn: + c = conn.cursor() + rows = c.execute( + "SELECT * FROM main WHERE key in (:kext, :kint)", + { + "kext": bytes.fromhex("1161637469766565787465726e616c73706b02"), + "kint": bytes.fromhex("11616374697665696e7465726e616c73706b02"), + }, + ) + for row in rows: + k, v = row + orig_active_descriptors.append({"k": k, "v": v}) + + assert len(orig_active_descriptors) == 2 + rpc( + "loadwallet", + [ + "descr_wallet", + ], + ) + + rv = rpc( + "encryptwallet", + [ + "test.123", + ], + wallet_override="descr_wallet", + ) + logging.info(f"encryptwallet {rv}") + + rv = rpc("listdescriptors", [], wallet_override="descr_wallet") + logging.info(f"listdescriptors {rv}") + + # The descriptors don't seem to be replaced + addr = rpc( + "getnewaddress", ["test descriptors"], wallet_override="descr_wallet" + ) + addr_info = rpc( + "getaddressinfo", + [ + addr, + ], + wallet_override="descr_wallet", + ) + assert addr_info["hdmasterfingerprint"] == "a55b7ea9" + + # Simulate, in case it changes + rpc("walletpassphrase", ["test.123", 1000], wallet_override="descr_wallet") + ek = ExtKeyPair() + ek.set_seed(i2b(getSecretInt())) + ek_encoded: str = encode_secret_extkey(0x04358394, ek.encode_v()) + + desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)") + desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)") + rv = rpc( + "importdescriptors", + [ + [ + { + "desc": desc_external, + "timestamp": "now", + "active": True, + "range": [0, 10], + "next_index": 0, + }, + { + "desc": desc_internal, + "timestamp": "now", + "active": True, + "internal": True, + }, + ], + ], + wallet_override="descr_wallet", + ) + logging.info(f"importdescriptors {rv}") + addr = rpc( + "getnewaddress", ["test descriptors"], wallet_override="descr_wallet" + ) + addr_info = rpc( + "getaddressinfo", + [ + addr, + ], + wallet_override="descr_wallet", + ) + assert addr_info["hdmasterfingerprint"] != "a55b7ea9" + + rv = rpc("getwalletinfo", [], wallet_override="descr_wallet") + logging.info(f"getwalletinfo {rv}") + + rpc( + "unloadwallet", + [ + "descr_wallet", + ], + ) + bkp_path = os.path.join(walletdir, "wallet.dat" + ".bkp") + for i in range(1000): + if os.path.exists(bkp_path): + bkp_path = os.path.join(walletdir, "wallet.dat" + f".bkp{i}") + + assert os.path.exists(bkp_path) is False + if os.path.isfile(walletpath): + shutil.copy(walletpath, bkp_path) + else: + shutil.copytree(walletpath, bkp_path) + + with sqlite3.connect(walletpath) as conn: + c = conn.cursor() + c.executemany( + "UPDATE main SET value = :v WHERE key = :k", orig_active_descriptors + ) + conn.commit() + + rpc( + "loadwallet", + [ + "descr_wallet", + ], + ) + addr = rpc("getnewaddress", wallet_override="descr_wallet") + addr_info = rpc( + "getaddressinfo", + [ + addr, + ], + wallet_override="descr_wallet", + ) + logging.info(f"getaddressinfo {addr_info}") + assert addr_info["hdmasterfingerprint"] == "a55b7ea9" + + finally: + rpc("stop") + bitcoind_process.wait(timeout=30) + if __name__ == "__main__": unittest.main()