Refresh BCH keypool.

This commit is contained in:
tecnovert
2025-04-12 13:07:11 +02:00
parent 6bc654f57e
commit 5a4b1c737c
3 changed files with 121 additions and 15 deletions

View File

@@ -108,7 +108,28 @@ class BCHInterface(BTCInterface):
def createWallet(self, wallet_name: str, password: str = ""):
self.rpc("createwallet", [wallet_name, False])
self.rpc_wallet("encryptwallet", [password])
if password != "":
self.rpc(
"encryptwallet",
[
password,
],
override_wallet=wallet_name,
)
def newKeypool(self) -> None:
self._log.debug("Refreshing keypool.")
# Use up current keypool
wi = self.rpc_wallet("getwalletinfo")
keypool_size: int = wi["keypoolsize"]
for i in range(keypool_size):
_ = self.rpc_wallet("getnewaddress")
keypoolsize_hd_internal: int = wi["keypoolsize_hd_internal"]
for i in range(keypoolsize_hd_internal):
_ = self.rpc_wallet("getrawchangeaddress")
self.rpc_wallet("keypoolrefill")
# returns pkh
def decodeAddress(self, address: str) -> bytes:

View File

@@ -1994,13 +1994,31 @@ class BTCInterface(Secp256k1Interface):
locked = encrypted and wallet_info["unlocked_until"] <= 0
return encrypted, locked
def createWallet(self, wallet_name: str, password: str = ""):
self.rpc("createwallet", [wallet_name, False, True, password, False, False])
def createWallet(self, wallet_name: str, password: str = "") -> None:
self.rpc(
"createwallet",
[wallet_name, False, True, password, False, self._use_descriptors],
)
def setActiveWallet(self, wallet_name: str) -> None:
# For debugging
self.rpc_wallet = make_rpc_func(
self._rpcport, self._rpcauth, host=self._rpc_host, wallet=wallet_name
)
self._rpc_wallet = wallet_name
def newKeypool(self) -> None:
self._log.debug("Running newkeypool.")
self.rpc_wallet("newkeypool")
def encryptWallet(self, password: str, check_seed: bool = True):
# Watchonly wallets are not encrypted
# Workaround for https://github.com/bitcoin/bitcoin/issues/26607
seed_id_before: str = self.getWalletSeedID()
orig_active_descriptors = []
orig_hdchain_bytes = None
walletpath = None
max_hdchain_key_count: int = 4000000 # Arbitrary
chain_client_settings = self._sc.getChainClientSettings(
self.coin_type()
@@ -2010,14 +2028,8 @@ class BTCInterface(Secp256k1Interface):
and check_seed is True
and seed_id_before != "Not found"
):
self.rpc("unloadwallet", [self._rpc_wallet])
# Store active keys
seedid_bytes: bytes = bytes.fromhex(seed_id_before)[::-1]
orig_active_descriptors = []
orig_hdchain_bytes = None
walletpath = None
max_hdchain_key_count: int = 4000000 # Arbitrary
self.rpc("unloadwallet", [self._rpc_wallet])
datadir = chain_client_settings["datadir"]
if self._network != "mainnet":
@@ -2056,6 +2068,7 @@ class BTCInterface(Secp256k1Interface):
k, v = row
orig_active_descriptors.append({"k": k, "v": v})
else:
seedid_bytes: bytes = bytes.fromhex(seed_id_before)[::-1]
with open(walletfilepath, "rb") as fp:
with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mm:
pos = mm.find(seedid_bytes)
@@ -2188,8 +2201,7 @@ class BTCInterface(Secp256k1Interface):
self.unlockWallet(password, check_seed=False)
seed_id_after_restore: str = self.getWalletSeedID()
if seed_id_after_restore == seed_id_before:
self._log.debug("Running newkeypool.")
self.rpc_wallet("newkeypool")
self.newKeypool()
else:
self._log.warning(
f"Expected seed id not found: {seed_id_before}, have {seed_id_after_restore}."
@@ -2201,8 +2213,6 @@ class BTCInterface(Secp256k1Interface):
self._log.error(f"{self.ticker()} recreating wallet failed: {e}.")
if self._sc.debug:
self._log.error(traceback.format_exc())
finally:
self._sc.ci(Coins.PART).lockWallet()
def changeWalletPassword(
self, old_password: str, new_password: str, check_seed_if_encrypt: bool = True
@@ -2230,7 +2240,14 @@ class BTCInterface(Secp256k1Interface):
# wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors
self.rpc(
"createwallet",
[self._rpc_wallet, False, True, password, False, False],
[
self._rpc_wallet,
False,
True,
password,
False,
self._use_descriptors,
],
)
# Max timeout value, ~3 years

View File

@@ -1740,6 +1740,74 @@ class BasicSwapTest(TestFunctions):
self.callnoderpc("unloadwallet", [new_wallet_name])
self.callnoderpc("unloadwallet", [new_watch_wallet_name])
def test_014_encrypt_existing_wallet(self):
logging.info(
f"---------- Test {self.test_coin_from.name} encrypt existing wallet"
)
ci = self.swap_clients[0].ci(self.test_coin_from)
wallet_name = "encrypt_existing_wallet"
ci.createWallet(wallet_name)
ci.setActiveWallet(wallet_name)
chain_client_settings = self.swap_clients[0].getChainClientSettings(
self.test_coin_from
)
try:
chain_client_settings["manage_daemon"] = True
ci.initialiseWallet(ci.getNewRandomKey())
original_seed_id = ci.getWalletSeedID()
addr1 = ci.getNewAddress(True)
addr1_info = ci.rpc_wallet(
"getaddressinfo",
[
addr1,
],
)
addr_int1 = ci.rpc_wallet("getrawchangeaddress")
addr_int1_info = ci.rpc_wallet(
"getaddressinfo",
[
addr_int1,
],
)
ci.encryptWallet("test.123")
after_seed_id = ci.getWalletSeedID()
addr2 = ci.getNewAddress(True)
addr2_info = ci.rpc_wallet(
"getaddressinfo",
[
addr2,
],
)
addr_int2 = ci.rpc_wallet("getrawchangeaddress")
addr_int2_info = ci.rpc_wallet(
"getaddressinfo",
[
addr_int2,
],
)
key_id_field: str = (
"hdmasterkeyid"
if "hdmasterkeyid" in addr1_info
else "hdmasterfingerprint"
)
assert addr1_info[key_id_field] == addr2_info[key_id_field]
assert addr_int1_info[key_id_field] == addr_int2_info[key_id_field]
assert addr1_info[key_id_field] == addr_int1_info[key_id_field]
assert original_seed_id == after_seed_id
finally:
ci.setActiveWallet("wallet.dat")
chain_client_settings["manage_daemon"] = False
def test_01_0_lock_bad_prevouts(self):
logging.info(
"---------- Test {} lock_bad_prevouts".format(self.test_coin_from.name)