Revert active hdchains.

This commit is contained in:
tecnovert
2025-04-12 00:36:10 +02:00
parent 3e98f174cd
commit 6bc654f57e
6 changed files with 829 additions and 35 deletions

4
.gitignore vendored
View File

@@ -8,6 +8,8 @@ __pycache__
/*.eggs
.tox
.eggs
.ruff_cache
.pytest_cache
*~
# geckodriver.log
@@ -15,4 +17,4 @@ __pycache__
docker/.env
# vscode dev container settings
compose-dev.yaml
compose-dev.yaml

View File

@@ -3115,8 +3115,6 @@ def main():
for c in with_coins:
prepareDataDir(c, settings, chain, particl_wallet_mnemonic, extra_opts)
save_config(config_path, settings)
if particl_wallet_mnemonic == "none":
save_config(config_path, settings)
logger.info("Done.")

View File

@@ -10,9 +10,13 @@ import base64
import hashlib
import json
import logging
import mmap
import os
import shutil
import sqlite3
import traceback
from io import BytesIO
from basicswap.basicswap_util import (
@@ -413,7 +417,18 @@ class BTCInterface(Secp256k1Interface):
raise ValueError("Failed to import descriptors.")
else:
key_wif = self.encodeKey(key_bytes)
self.rpc_wallet("sethdseed", [True, key_wif])
try:
self.rpc_wallet("sethdseed", [True, key_wif])
except Exception as e:
self._log.debug(f"sethdseed failed: {e}")
"""
# TODO: Find derived key counts
if "Already have this key" in str(e):
key_id: bytes = self.getSeedHash(key_bytes)
self.setActiveKeyChain(key_id)
else:
"""
raise (e)
def getWalletInfo(self):
rv = self.rpc_wallet("getwalletinfo")
@@ -1984,12 +1999,95 @@ class BTCInterface(Secp256k1Interface):
def encryptWallet(self, password: str, check_seed: bool = True):
# Watchonly wallets are not encrypted
# Workaround for https://github.com/bitcoin/bitcoin/issues/26607
seed_id_before: str = self.getWalletSeedID()
chain_client_settings = self._sc.getChainClientSettings(
self.coin_type()
) # basicswap.json
if (
chain_client_settings.get("manage_daemon", False)
and check_seed is True
and seed_id_before != "Not found"
):
self.rpc("unloadwallet", [self._rpc_wallet])
# Store active keys
seedid_bytes: bytes = bytes.fromhex(seed_id_before)[::-1]
orig_active_descriptors = []
orig_hdchain_bytes = None
walletpath = None
max_hdchain_key_count: int = 4000000 # Arbitrary
datadir = chain_client_settings["datadir"]
if self._network != "mainnet":
datadir = os.path.join(datadir, self._network)
try_wallet_path = os.path.join(datadir, self._rpc_wallet)
if os.path.exists(try_wallet_path):
walletpath = try_wallet_path
else:
try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet)
if os.path.exists(try_wallet_path):
walletpath = try_wallet_path
walletfilepath = walletpath
if os.path.isdir(walletpath):
walletfilepath = os.path.join(walletpath, "wallet.dat")
if walletpath is None:
self._log.warning(f"Unable to find {self.ticker()} wallet path.")
else:
if self._use_descriptors:
orig_active_descriptors = []
with sqlite3.connect(walletfilepath) as conn:
c = conn.cursor()
rows = c.execute(
"SELECT * FROM main WHERE key in (:kext, :kint)",
{
"kext": bytes.fromhex(
"1161637469766565787465726e616c73706b02"
),
"kint": bytes.fromhex(
"11616374697665696e7465726e616c73706b02"
),
},
)
for row in rows:
k, v = row
orig_active_descriptors.append({"k": k, "v": v})
else:
with open(walletfilepath, "rb") as fp:
with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm:
pos = mm.find(seedid_bytes)
while pos != -1:
mm.seek(pos - 8)
hdchain_bytes = mm.read(12 + 20)
version = int.from_bytes(hdchain_bytes[:4], "little")
if version == 2:
external_counter = int.from_bytes(
hdchain_bytes[4:8], "little"
)
internal_counter = int.from_bytes(
hdchain_bytes[-4:], "little"
)
if (
external_counter > 0
and external_counter <= max_hdchain_key_count
and internal_counter > 0
and internal_counter <= max_hdchain_key_count
):
orig_hdchain_bytes = hdchain_bytes
self._log.debug(
f"Found hdchain for: {seed_id_before} external_counter: {external_counter}, internal_counter: {internal_counter}."
)
break
pos = mm.find(seedid_bytes, pos + 1)
self.rpc("loadwallet", [self._rpc_wallet])
self.rpc_wallet("encryptwallet", [password])
if check_seed is False or seed_id_before == "Not found":
if check_seed is False or seed_id_before == "Not found" or walletpath is None:
return
seed_id_after: str = self.getWalletSeedID()
@@ -2000,42 +2098,104 @@ class BTCInterface(Secp256k1Interface):
f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}."
)
self.setWalletSeedWarning(True)
# Workaround for https://github.com/bitcoin/bitcoin/issues/26607
chain_client_settings = self._sc.getChainClientSettings(
self.coin_type()
) # basicswap.json
if chain_client_settings.get("manage_daemon", False) is False:
self._log.warning(
f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed."
f"{self.ticker()} manage_daemon is false. Can't attempt to fix."
)
return
if self._use_descriptors:
if len(orig_active_descriptors) < 2:
self._log.error(
"Could not find original active descriptors for wallet."
)
return
self._log.info("Attempting to revert to last descriptors.")
else:
if orig_hdchain_bytes is None:
self._log.error("Could not find hdchain for wallet.")
return
self._log.info("Attempting to revert to last hdchain.")
try:
self.rpc_wallet("unloadwallet", [self._rpc_wallet])
datadir = chain_client_settings["datadir"]
if self._network != "mainnet":
datadir = os.path.join(datadir, self._network)
# Make a copy of the encrypted wallet before modifying it
bkp_path = walletpath + ".bkp"
for i in range(100):
if not os.path.exists(bkp_path):
break
bkp_path = walletpath + f".bkp{i}"
try_wallet_path = os.path.join(datadir, self._rpc_wallet)
if os.path.exists(try_wallet_path):
new_wallet_path = os.path.join(datadir, self._rpc_wallet + ".old")
os.rename(try_wallet_path, new_wallet_path)
if os.path.exists(bkp_path):
self._log.error("Could not find backup path for wallet.")
return
self.rpc("unloadwallet", [self._rpc_wallet])
if os.path.isfile(walletpath):
shutil.copy(walletpath, bkp_path)
else:
try_wallet_path = os.path.join(datadir, "wallets", self._rpc_wallet)
if os.path.exists(try_wallet_path):
new_wallet_path = os.path.join(
datadir, "wallets", self._rpc_wallet + ".old"
)
os.rename(try_wallet_path, new_wallet_path)
else:
raise ValueError("Can't find old wallet path.")
shutil.copytree(walletpath, bkp_path)
self.createWallet(self._rpc_wallet, password=password)
self._sc.ci(Coins.PART).unlockWallet(password, check_seed=False)
self.unlockWallet(password, check_seed=False)
restore_time = chain_client_settings.get("restore_time", 0)
self._sc.initialiseWallet(self.coin_type(), True, restore_time=restore_time)
self._sc.checkWalletSeed(self.coin_type())
hdchain_replaced: bool = False
if self._use_descriptors:
with sqlite3.connect(walletfilepath) as conn:
c = conn.cursor()
c.executemany(
"UPDATE main SET value = :v WHERE key = :k",
orig_active_descriptors,
)
conn.commit()
else:
seedid_after_bytes: bytes = bytes.fromhex(seed_id_after)[::-1]
with open(walletfilepath, "r+b") as fp:
with mmap.mmap(fp.fileno(), 0) as mm:
pos = mm.find(seedid_after_bytes)
while pos != -1:
mm.seek(pos - 8)
hdchain_bytes = mm.read(12 + 20)
version = int.from_bytes(hdchain_bytes[:4], "little")
if version == 2:
external_counter = int.from_bytes(
hdchain_bytes[4:8], "little"
)
internal_counter = int.from_bytes(
hdchain_bytes[-4:], "little"
)
if (
external_counter > 0
and external_counter <= max_hdchain_key_count
and internal_counter > 0
and internal_counter <= max_hdchain_key_count
):
self._log.debug(
f"Replacing hdchain for: {seed_id_after} external_counter: {external_counter}, internal_counter: {internal_counter}."
)
offset: int = pos - 8
mm.seek(offset)
mm.write(orig_hdchain_bytes)
self._log.debug(
f"hdchain replaced at offset: {offset}."
)
hdchain_replaced = True
# Can appear multiple times in file, replace all.
pos = mm.find(seedid_after_bytes, pos + 1)
if hdchain_replaced is False:
self._log.error("Could not find new hdchain in wallet.")
self.rpc("loadwallet", [self._rpc_wallet])
if hdchain_replaced:
self.unlockWallet(password, check_seed=False)
seed_id_after_restore: str = self.getWalletSeedID()
if seed_id_after_restore == seed_id_before:
self._log.debug("Running newkeypool.")
self.rpc_wallet("newkeypool")
else:
self._log.warning(
f"Expected seed id not found: {seed_id_before}, have {seed_id_after_restore}."
)
self.lockWallet()
except Exception as e:
self._log.error(f"{self.ticker()} recreating wallet failed: {e}.")

View File

@@ -58,6 +58,25 @@ class FIROInterface(BTCInterface):
def checkWallets(self) -> int:
return 1
def encryptWallet(self, password: str, check_seed: bool = True):
# Watchonly wallets are not encrypted
# Firo shuts down after encryptwallet
seed_id_before: str = self.getWalletSeedID() if check_seed else "Not found"
self.rpc_wallet("encryptwallet", [password])
if check_seed is False or seed_id_before == "Not found":
return
seed_id_after: str = self.getWalletSeedID()
if seed_id_before == seed_id_after:
return
self._log.warning(f"{self.ticker()} wallet seed changed after encryption.")
self._log.debug(
f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}."
)
self.setWalletSeedWarning(True)
def getNewAddress(self, use_segwit, label="swap_receive"):
return self.rpc("getnewaddress", [label])
# addr_plain = self.rpc('getnewaddress', [label])

View File

@@ -59,7 +59,7 @@ class PIVXInterface(BTCInterface):
if chain_client_settings.get("manage_daemon", False) is False:
self._log.warning(
f"{self.ticker()} manage_daemon is false. Please manually stop BSX, remove the wallet, start BSX, and reseed."
f"{self.ticker()} manage_daemon is false. Can't attempt to fix."
)
return

View File

@@ -7,25 +7,42 @@
import json
import logging
import mmap
import multiprocessing
import os
import shlex
import shutil
import sqlite3
import subprocess
import sys
import threading
import unittest
from unittest.mock import patch
from basicswap.util.ecc import (
i2b,
getSecretInt,
)
from basicswap.rpc import escape_rpcauth, make_rpc_func
from basicswap.interface.dcr.rpc import make_rpc_func as make_dcr_rpc_func
from tests.basicswap.util import (
read_json_api,
waitForServer,
)
from basicswap.contrib.rpcauth import generate_salt, password_to_hmac
from basicswap.util.address import (
b58encode,
toWIF,
)
from basicswap.util.crypto import (
sha256,
)
from basicswap.util.extkey import ExtKeyPair
from basicswap.contrib.test_framework.descriptors import descsum_create
bin_path = os.path.expanduser(os.getenv("TEST_BIN_PATH", ""))
test_base_path = os.path.expanduser(os.getenv("TEST_PATH", "~/test_basicswap"))
test_base_path = os.path.expanduser(os.getenv("TEST_PATH", "/tmp/test_basicswap"))
delay_event = threading.Event()
logger = logging.getLogger()
@@ -75,6 +92,21 @@ def callcoincli(binpath, datadir, params, wallet=None, timeout=None):
return out[0].decode("utf-8").strip()
def encode_secret_extkey(prefix, ek_data: bytes) -> str:
assert len(ek_data) == 74
data: bytes = prefix.to_bytes(4, "big") + ek_data
checksum = sha256(sha256(data))
return b58encode(data + checksum[0:4])
test_seed: bytes = bytes.fromhex(
"8e54a313e6df8918df6d758fafdbf127a115175fdd2238d0e908dd8093c9ac3b"
)
test_seedid = "3da5c0af91879e8ce97d9a843874601c08688078"
wif_prefix: int = 239
test_wif: str = toWIF(wif_prefix, test_seed)
class Test(unittest.TestCase):
test_coins = [
@@ -594,6 +626,7 @@ class Test(unittest.TestCase):
assert "success" in rv
for coin in self.test_coins:
logging.info(f"Coin: {coin}")
if coin == "particl":
continue
if coin == "firo":
@@ -658,6 +691,588 @@ class Test(unittest.TestCase):
process.join()
assert process.exitcode == 0
def write_btc_conf(self, datadir):
conf_path = os.path.join(datadir, "bitcoin.conf")
with open(conf_path, "w") as fp:
fp.write("regtest=1\n")
fp.write("[regtest]\n")
fp.write("printtoconsole=0\n")
fp.write("server=1\n")
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("bind=127.0.0.1\n")
fp.write("debug=1\n")
fp.write("debugexclude=libevent\n")
fp.write("deprecatedrpc=create_bdb\n")
fp.write("rpcport=12223\n")
salt = generate_salt(16)
fp.write(
"rpcauth={}:{}${}\n".format(
"test",
salt,
password_to_hmac(salt, "test_pass"),
)
)
def test_btc_wallets_with_module(self):
import berkeleydb
if not os.path.exists(bin_path):
raise ValueError("TEST_BIN_PATH not set.")
test_path = os.path.join(test_base_path, "test_btc_wallets_with_module")
if os.path.exists(test_path):
shutil.rmtree(test_path)
os.makedirs(test_path)
self.write_btc_conf(test_path)
daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind")
args = [
daemon_path,
"-datadir=" + test_path,
]
bitcoind_process = subprocess.Popen(args)
try:
rpc = make_rpc_func(12223, "test:test_pass")
for i in range(20):
try:
rv = rpc("listwallets")
break
except Exception as e: # noqa: F841
delay_event.wait(1.0)
# wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors
rpc(
"createwallet",
["bdb_wallet", False, True, "", False, False],
)
rpc("sethdseed", [True, test_wif], wallet_override="bdb_wallet")
rv = rpc("getwalletinfo", wallet_override="bdb_wallet")
assert rv["hdseedid"] == test_seedid
new_addr = rpc("getnewaddress", wallet_override="bdb_wallet")
logging.info(f"getnewaddress {new_addr}")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="bdb_wallet",
)
logging.info(f"getaddressinfo before encrypt {rv}")
assert rv["hdmasterfingerprint"] == "a55b7ea9"
rpc(
"unloadwallet",
[
"bdb_wallet",
],
)
walletdir = os.path.join(test_path, "regtest", "wallets", "bdb_wallet")
walletpath = os.path.join(walletdir, "wallet.dat")
db = berkeleydb.db.DB()
db.open(
walletpath,
"main",
berkeleydb.db.DB_BTREE,
berkeleydb.db.DB_THREAD | berkeleydb.db.DB_CREATE,
)
prev_hdchain_key = None
prev_hdchain_value = None
for k, v in db.items():
if b"hdchain" in k:
prev_hdchain_key = k
prev_hdchain_value = v
db.close()
rpc(
"loadwallet",
[
"bdb_wallet",
],
)
rv = rpc(
"encryptwallet",
[
"test.123",
],
wallet_override="bdb_wallet",
)
logging.info(f"encryptwallet {rv}")
rv = rpc("getwalletinfo", wallet_override="bdb_wallet")
logging.info(f"getwalletinfo {rv}")
assert rv["hdseedid"] != test_seedid
rpc(
"unloadwallet",
[
"bdb_wallet",
],
)
bkp_path = os.path.join(walletdir, "wallet.dat" + ".bkp")
for i in range(1000):
if os.path.exists(bkp_path):
bkp_path = os.path.join(walletdir, "wallet.dat" + f".bkp{i}")
assert os.path.exists(bkp_path) is False
if os.path.isfile(walletpath):
shutil.copy(walletpath, bkp_path)
else:
shutil.copytree(walletpath, bkp_path)
# Replace hdchain with previous value
db = berkeleydb.db.DB()
db.open(
walletpath,
"main",
berkeleydb.db.DB_BTREE,
berkeleydb.db.DB_THREAD | berkeleydb.db.DB_CREATE,
)
db[prev_hdchain_key] = prev_hdchain_value
db.close()
rpc(
"loadwallet",
[
"bdb_wallet",
],
)
rv = rpc("getwalletinfo", wallet_override="bdb_wallet")
logging.info(f"getwalletinfo {rv}")
assert rv["hdseedid"] == test_seedid
# Picks from wrong keypool
new_addr = rpc("getnewaddress", wallet_override="bdb_wallet")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="bdb_wallet",
)
assert rv["hdmasterfingerprint"] != "a55b7ea9"
rpc("walletpassphrase", ["test.123", 1000], wallet_override="bdb_wallet")
rpc("newkeypool", wallet_override="bdb_wallet")
new_addr = rpc("getnewaddress", wallet_override="bdb_wallet")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="bdb_wallet",
)
assert rv["hdmasterfingerprint"] == "a55b7ea9"
finally:
rpc("stop")
bitcoind_process.wait(timeout=30)
def test_btc_wallets_without_module(self):
if not os.path.exists(bin_path):
raise ValueError("TEST_BIN_PATH not set.")
test_path = os.path.join(test_base_path, "test_btc_wallets_without_module")
if os.path.exists(test_path):
shutil.rmtree(test_path)
os.makedirs(test_path)
self.write_btc_conf(test_path)
daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind")
args = [
daemon_path,
"-datadir=" + test_path,
]
bitcoind_process = subprocess.Popen(args)
try:
rpc = make_rpc_func(12223, "test:test_pass")
for i in range(20):
try:
rv = rpc("listwallets")
break
except Exception as e: # noqa: F841
delay_event.wait(1.0)
rv = rpc(
"createwallet",
["bdb_wallet2", False, True, "", False, False],
)
logging.info(f"createwallet {rv}")
rpc("sethdseed", [True, test_wif], wallet_override="bdb_wallet2")
new_addr = rpc("getnewaddress", wallet_override="bdb_wallet2")
logging.info(f"getnewaddress {new_addr}")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="bdb_wallet2",
)
logging.info(f"getaddressinfo before encrypt {rv}")
assert rv["hdmasterfingerprint"] == "a55b7ea9"
rv = rpc("getwalletinfo", wallet_override="bdb_wallet2")
logging.info(f"getwalletinfo {rv}")
assert rv["hdseedid"] == test_seedid
seedid_bytes = bytes.fromhex(rv["hdseedid"])[::-1]
# keypoolsize and keypoolsize_hd_internal are how many pre-generated keys remain unused.
orig_hdchain_bytes_predicted = (
int(2).to_bytes(4, "little")
+ int(1000).to_bytes(4, "little")
+ seedid_bytes
+ int(1000).to_bytes(4, "little")
)
logging.info(
f"orig_hdchain_bytes_predicted {orig_hdchain_bytes_predicted.hex()}"
)
rv = rpc(
"unloadwallet",
[
"bdb_wallet2",
],
)
logging.info(f"Looking for hdchain for {seedid_bytes.hex()}")
walletdir = os.path.join(test_path, "regtest", "wallets", "bdb_wallet2")
walletpath = os.path.join(walletdir, "wallet.dat")
found_hdchain = False
max_key_count = 4000000 # arbitrary
with open(walletpath, "rb") as fp:
with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm:
pos = mm.find(seedid_bytes)
while pos != -1:
mm.seek(pos - 8)
hdchain_bytes = mm.read(12 + 20)
version = int.from_bytes(hdchain_bytes[:4], "little")
if version == 2:
external_counter = int.from_bytes(
hdchain_bytes[4:8], "little"
)
internal_counter = int.from_bytes(
hdchain_bytes[-4:], "little"
)
if (
external_counter > 0
and external_counter <= max_key_count
and internal_counter > 0
and internal_counter <= max_key_count
):
orig_hdchain_bytes = hdchain_bytes
found_hdchain = True
break
pos = mm.find(seedid_bytes, pos + 1)
logging.info(f"orig_hdchain_bytes {orig_hdchain_bytes.hex()}")
assert found_hdchain
rpc(
"loadwallet",
[
"bdb_wallet2",
],
)
rv = rpc(
"encryptwallet",
[
"test.123",
],
wallet_override="bdb_wallet2",
)
logging.info(f"encryptwallet {rv}")
rv = rpc("getwalletinfo", wallet_override="bdb_wallet2")
logging.info(f"getwalletinfo {rv}")
assert rv["hdseedid"] != test_seedid
new_hdchain_bytes = (
int(2).to_bytes(4, "little")
+ int(rv["keypoolsize"]).to_bytes(4, "little")
+ bytes.fromhex(rv["hdseedid"])[::-1]
+ int(rv["keypoolsize_hd_internal"]).to_bytes(4, "little")
)
rpc(
"unloadwallet",
[
"bdb_wallet2",
],
)
with open(walletpath, "r+b") as fp:
with mmap.mmap(fp.fileno(), 0) as mm:
offset = mm.find(new_hdchain_bytes)
if offset != -1:
mm.seek(offset)
mm.write(orig_hdchain_bytes)
print(f"Replaced at offset: {offset}")
else:
print("Byte sequence not found.")
rpc(
"loadwallet",
[
"bdb_wallet2",
],
)
rv = rpc("getwalletinfo", wallet_override="bdb_wallet2")
logging.info(f"getwalletinfo {rv}")
assert rv["hdseedid"] == test_seedid
rpc("walletpassphrase", ["test.123", 1000], wallet_override="bdb_wallet2")
rpc("newkeypool", wallet_override="bdb_wallet2")
new_addr = rpc("getnewaddress", wallet_override="bdb_wallet2")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="bdb_wallet2",
)
assert rv["hdmasterfingerprint"] == "a55b7ea9"
finally:
rpc("stop")
bitcoind_process.wait(timeout=30)
def test_btc_wallets_descriptors(self):
if not os.path.exists(bin_path):
raise ValueError("TEST_BIN_PATH not set.")
test_path = os.path.join(test_base_path, "test_btc_wallets_descriptors")
if os.path.exists(test_path):
shutil.rmtree(test_path)
os.makedirs(test_path)
self.write_btc_conf(test_path)
daemon_path = os.path.join(bin_path, "bitcoin", "bitcoind")
args = [
daemon_path,
"-datadir=" + test_path,
]
bitcoind_process = subprocess.Popen(args)
try:
rpc = make_rpc_func(12223, "test:test_pass")
for i in range(20):
try:
rv = rpc("listwallets")
break
except Exception as e: # noqa: F841
delay_event.wait(1.0)
rpc(
"createwallet",
["descr_wallet", False, True, "", False, True],
)
ek = ExtKeyPair()
ek.set_seed(test_seed)
ek_encoded: str = encode_secret_extkey(0x04358394, ek.encode_v())
desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)")
desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)")
rv = rpc(
"importdescriptors",
[
[
{
"desc": desc_external,
"timestamp": "now",
"active": True,
"range": [0, 10],
"next_index": 0,
},
{
"desc": desc_internal,
"timestamp": "now",
"active": True,
"internal": True,
},
],
],
wallet_override="descr_wallet",
)
logging.info(f"importdescriptors {rv}")
addr = rpc(
"getnewaddress", ["test descriptors"], wallet_override="descr_wallet"
)
addr_info = rpc(
"getaddressinfo",
[
addr,
],
wallet_override="descr_wallet",
)
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
descriptors_before = rpc(
"listdescriptors", [], wallet_override="descr_wallet"
)
logging.info(f"descriptors_before {descriptors_before}")
rv = rpc("getwalletinfo", wallet_override="descr_wallet")
logging.info(f"getwalletinfo {rv}")
new_addr = rpc("getnewaddress", wallet_override="descr_wallet")
rv = rpc(
"getaddressinfo",
[
new_addr,
],
wallet_override="descr_wallet",
)
assert rv["hdmasterfingerprint"] == "a55b7ea9"
rpc(
"unloadwallet",
[
"descr_wallet",
],
)
walletdir = os.path.join(test_path, "regtest", "wallets", "descr_wallet")
walletpath = os.path.join(walletdir, "wallet.dat")
orig_active_descriptors = []
with sqlite3.connect(walletpath) as conn:
c = conn.cursor()
rows = c.execute(
"SELECT * FROM main WHERE key in (:kext, :kint)",
{
"kext": bytes.fromhex("1161637469766565787465726e616c73706b02"),
"kint": bytes.fromhex("11616374697665696e7465726e616c73706b02"),
},
)
for row in rows:
k, v = row
orig_active_descriptors.append({"k": k, "v": v})
assert len(orig_active_descriptors) == 2
rpc(
"loadwallet",
[
"descr_wallet",
],
)
rv = rpc(
"encryptwallet",
[
"test.123",
],
wallet_override="descr_wallet",
)
logging.info(f"encryptwallet {rv}")
rv = rpc("listdescriptors", [], wallet_override="descr_wallet")
logging.info(f"listdescriptors {rv}")
# The descriptors don't seem to be replaced
addr = rpc(
"getnewaddress", ["test descriptors"], wallet_override="descr_wallet"
)
addr_info = rpc(
"getaddressinfo",
[
addr,
],
wallet_override="descr_wallet",
)
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
# Simulate, in case it changes
rpc("walletpassphrase", ["test.123", 1000], wallet_override="descr_wallet")
ek = ExtKeyPair()
ek.set_seed(i2b(getSecretInt()))
ek_encoded: str = encode_secret_extkey(0x04358394, ek.encode_v())
desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)")
desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)")
rv = rpc(
"importdescriptors",
[
[
{
"desc": desc_external,
"timestamp": "now",
"active": True,
"range": [0, 10],
"next_index": 0,
},
{
"desc": desc_internal,
"timestamp": "now",
"active": True,
"internal": True,
},
],
],
wallet_override="descr_wallet",
)
logging.info(f"importdescriptors {rv}")
addr = rpc(
"getnewaddress", ["test descriptors"], wallet_override="descr_wallet"
)
addr_info = rpc(
"getaddressinfo",
[
addr,
],
wallet_override="descr_wallet",
)
assert addr_info["hdmasterfingerprint"] != "a55b7ea9"
rv = rpc("getwalletinfo", [], wallet_override="descr_wallet")
logging.info(f"getwalletinfo {rv}")
rpc(
"unloadwallet",
[
"descr_wallet",
],
)
bkp_path = os.path.join(walletdir, "wallet.dat" + ".bkp")
for i in range(1000):
if os.path.exists(bkp_path):
bkp_path = os.path.join(walletdir, "wallet.dat" + f".bkp{i}")
assert os.path.exists(bkp_path) is False
if os.path.isfile(walletpath):
shutil.copy(walletpath, bkp_path)
else:
shutil.copytree(walletpath, bkp_path)
with sqlite3.connect(walletpath) as conn:
c = conn.cursor()
c.executemany(
"UPDATE main SET value = :v WHERE key = :k", orig_active_descriptors
)
conn.commit()
rpc(
"loadwallet",
[
"descr_wallet",
],
)
addr = rpc("getnewaddress", wallet_override="descr_wallet")
addr_info = rpc(
"getaddressinfo",
[
addr,
],
wallet_override="descr_wallet",
)
logging.info(f"getaddressinfo {addr_info}")
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
finally:
rpc("stop")
bitcoind_process.wait(timeout=30)
if __name__ == "__main__":
unittest.main()