mirror of
https://github.com/basicswap/basicswap.git
synced 2025-11-05 18:38:09 +01:00
Change default key derivation paths.
To allow account keys to be imported into electrum.
Only applies when using descriptor wallets.
To match keys from legacy (sethdseed) wallets set the {COIN}_USE_LEGACY_KEY_PATHS environment variable before prepare.py.
This commit is contained in:
@@ -401,6 +401,12 @@ def getDescriptorWalletOption(coin_params):
|
|||||||
return toBool(os.getenv(ticker + "_USE_DESCRIPTORS", default_option))
|
return toBool(os.getenv(ticker + "_USE_DESCRIPTORS", default_option))
|
||||||
|
|
||||||
|
|
||||||
|
def getLegacyKeyPathOption(coin_params):
|
||||||
|
ticker: str = coin_params["ticker"]
|
||||||
|
default_option: bool = False
|
||||||
|
return toBool(os.getenv(ticker + "_USE_LEGACY_KEY_PATHS", default_option))
|
||||||
|
|
||||||
|
|
||||||
def getKnownVersion(coin_name: str) -> str:
|
def getKnownVersion(coin_name: str) -> str:
|
||||||
version, version_tag, _ = known_coins[coin_name]
|
version, version_tag, _ = known_coins[coin_name]
|
||||||
return version + version_tag
|
return version + version_tag
|
||||||
@@ -2792,6 +2798,8 @@ def main():
|
|||||||
coin_settings["watch_wallet_name"] = getWalletName(
|
coin_settings["watch_wallet_name"] = getWalletName(
|
||||||
coin_params, "bsx_watch", prefix_override=f"{ticker}_WATCH"
|
coin_params, "bsx_watch", prefix_override=f"{ticker}_WATCH"
|
||||||
)
|
)
|
||||||
|
if getLegacyKeyPathOption(coin_params) is True:
|
||||||
|
coin_settings["use_legacy_key_paths"] = True
|
||||||
|
|
||||||
if PART_RPC_USER != "":
|
if PART_RPC_USER != "":
|
||||||
chainclients["particl"]["rpcuser"] = PART_RPC_USER
|
chainclients["particl"]["rpcuser"] = PART_RPC_USER
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ import shutil
|
|||||||
import sqlite3
|
import sqlite3
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from typing import Dict, Optional
|
||||||
|
|
||||||
from basicswap.basicswap_util import (
|
from basicswap.basicswap_util import (
|
||||||
getVoutByAddress,
|
getVoutByAddress,
|
||||||
@@ -25,9 +25,9 @@ from basicswap.basicswap_util import (
|
|||||||
)
|
)
|
||||||
from basicswap.interface.base import Secp256k1Interface
|
from basicswap.interface.base import Secp256k1Interface
|
||||||
from basicswap.util import (
|
from basicswap.util import (
|
||||||
|
b2i,
|
||||||
ensure,
|
ensure,
|
||||||
i2b,
|
i2b,
|
||||||
b2i,
|
|
||||||
i2h,
|
i2h,
|
||||||
)
|
)
|
||||||
from basicswap.util.ecc import (
|
from basicswap.util.ecc import (
|
||||||
@@ -36,18 +36,18 @@ from basicswap.util.ecc import (
|
|||||||
)
|
)
|
||||||
from basicswap.util.extkey import ExtKeyPair
|
from basicswap.util.extkey import ExtKeyPair
|
||||||
from basicswap.util.script import (
|
from basicswap.util.script import (
|
||||||
|
SerialiseNumCompact,
|
||||||
decodeScriptNum,
|
decodeScriptNum,
|
||||||
getCompactSizeLen,
|
getCompactSizeLen,
|
||||||
SerialiseNumCompact,
|
|
||||||
getWitnessElementLen,
|
getWitnessElementLen,
|
||||||
)
|
)
|
||||||
from basicswap.util.address import (
|
from basicswap.util.address import (
|
||||||
toWIF,
|
|
||||||
b58encode,
|
|
||||||
b58decode,
|
b58decode,
|
||||||
decodeWif,
|
b58encode,
|
||||||
decodeAddress,
|
decodeAddress,
|
||||||
|
decodeWif,
|
||||||
pubkeyToAddress,
|
pubkeyToAddress,
|
||||||
|
toWIF,
|
||||||
)
|
)
|
||||||
from basicswap.util.crypto import (
|
from basicswap.util.crypto import (
|
||||||
hash160,
|
hash160,
|
||||||
@@ -294,6 +294,8 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
self._expect_seedid_hex = None
|
self._expect_seedid_hex = None
|
||||||
self._altruistic = coin_settings.get("altruistic", True)
|
self._altruistic = coin_settings.get("altruistic", True)
|
||||||
self._use_descriptors = coin_settings.get("use_descriptors", False)
|
self._use_descriptors = coin_settings.get("use_descriptors", False)
|
||||||
|
# Use hardened account indices to match existing wallet keys, only applies when use_descriptors is True
|
||||||
|
self._use_legacy_key_paths = coin_settings.get("use_legacy_key_paths", False)
|
||||||
|
|
||||||
def open_rpc(self, wallet=None):
|
def open_rpc(self, wallet=None):
|
||||||
return openrpc(self._rpcport, self._rpcauth, wallet=wallet, host=self._rpc_host)
|
return openrpc(self._rpcport, self._rpcauth, wallet=wallet, host=self._rpc_host)
|
||||||
@@ -397,6 +399,13 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
last_block_header = prev_block_header
|
last_block_header = prev_block_header
|
||||||
raise ValueError(f"Block header not found at time: {time}")
|
raise ValueError(f"Block header not found at time: {time}")
|
||||||
|
|
||||||
|
def getWalletAccountPath(self) -> str:
|
||||||
|
# Use a bip44 style path, however the seed (derived from the particl mnemonic keychain) can't be turned into a bip39 mnemonic without the matching entropy
|
||||||
|
purpose: int = 84 # native segwit
|
||||||
|
coin_type: int = self.chainparams_network()["bip44"]
|
||||||
|
account: int = 0
|
||||||
|
return f"{purpose}h/{coin_type}h/{account}h"
|
||||||
|
|
||||||
def initialiseWallet(self, key_bytes: bytes, restore_time: int = -1) -> None:
|
def initialiseWallet(self, key_bytes: bytes, restore_time: int = -1) -> None:
|
||||||
assert len(key_bytes) == 32
|
assert len(key_bytes) == 32
|
||||||
self._have_checked_seed = False
|
self._have_checked_seed = False
|
||||||
@@ -405,8 +414,15 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
ek = ExtKeyPair()
|
ek = ExtKeyPair()
|
||||||
ek.set_seed(key_bytes)
|
ek.set_seed(key_bytes)
|
||||||
ek_encoded: str = self.encode_secret_extkey(ek.encode_v())
|
ek_encoded: str = self.encode_secret_extkey(ek.encode_v())
|
||||||
|
if self._use_legacy_key_paths:
|
||||||
|
# Match keys from legacy wallets (created from sethdseed)
|
||||||
desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)")
|
desc_external = descsum_create(f"wpkh({ek_encoded}/0h/0h/*h)")
|
||||||
desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)")
|
desc_internal = descsum_create(f"wpkh({ek_encoded}/0h/1h/*h)")
|
||||||
|
else:
|
||||||
|
# Use a bip44 path so the seed can be exported as a mnemonic
|
||||||
|
path: str = self.getWalletAccountPath()
|
||||||
|
desc_external = descsum_create(f"wpkh({ek_encoded}/{path}/0/*)")
|
||||||
|
desc_internal = descsum_create(f"wpkh({ek_encoded}/{path}/1/*)")
|
||||||
|
|
||||||
rv = self.rpc_wallet(
|
rv = self.rpc_wallet(
|
||||||
"importdescriptors",
|
"importdescriptors",
|
||||||
@@ -445,6 +461,50 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
"""
|
"""
|
||||||
raise (e)
|
raise (e)
|
||||||
|
|
||||||
|
def canExportToElectrum(self) -> bool:
|
||||||
|
# keychains must be unhardened to export into electrum
|
||||||
|
return self._use_descriptors is True and self._use_legacy_key_paths is False
|
||||||
|
|
||||||
|
def getAccountKey(
|
||||||
|
self,
|
||||||
|
key_bytes: bytes,
|
||||||
|
extkey_prefix: Optional[int] = None,
|
||||||
|
coin_type_overide: Optional[int] = None,
|
||||||
|
) -> str:
|
||||||
|
# For electrum, must start with zprv to get P2WPKH, addresses
|
||||||
|
# extkey_prefix: 0x04b2430c
|
||||||
|
ek = ExtKeyPair()
|
||||||
|
ek.set_seed(key_bytes)
|
||||||
|
path: str = self.getWalletAccountPath()
|
||||||
|
account_ek = ek.derive_path(path)
|
||||||
|
return self.encode_secret_extkey(account_ek.encode_v(), extkey_prefix)
|
||||||
|
|
||||||
|
def getWalletKeyChains(
|
||||||
|
self, key_bytes: bytes, extkey_prefix: Optional[int] = None
|
||||||
|
) -> Dict[str, str]:
|
||||||
|
ek = ExtKeyPair()
|
||||||
|
ek.set_seed(key_bytes)
|
||||||
|
|
||||||
|
# extkey must contain keydata to derive hardened child keys
|
||||||
|
|
||||||
|
if self.canExportToElectrum():
|
||||||
|
path: str = self.getWalletAccountPath()
|
||||||
|
external_extkey = ek.derive_path(f"{path}/0")
|
||||||
|
internal_extkey = ek.derive_path(f"{path}/1")
|
||||||
|
else:
|
||||||
|
# Match keychain paths of legacy wallets
|
||||||
|
external_extkey = ek.derive_path("0h/0h")
|
||||||
|
internal_extkey = ek.derive_path("0h/1h")
|
||||||
|
|
||||||
|
def encode_extkey(extkey):
|
||||||
|
return self.encode_secret_extkey(extkey.encode_v(), extkey_prefix)
|
||||||
|
|
||||||
|
rv = {
|
||||||
|
"external": encode_extkey(external_extkey),
|
||||||
|
"internal": encode_extkey(internal_extkey),
|
||||||
|
}
|
||||||
|
return rv
|
||||||
|
|
||||||
def getWalletInfo(self):
|
def getWalletInfo(self):
|
||||||
rv = self.rpc_wallet("getwalletinfo")
|
rv = self.rpc_wallet("getwalletinfo")
|
||||||
rv["encrypted"] = "unlocked_until" in rv
|
rv["encrypted"] = "unlocked_until" in rv
|
||||||
@@ -504,10 +564,16 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
if descriptor is None:
|
if descriptor is None:
|
||||||
self._log.debug("Could not find active descriptor.")
|
self._log.debug("Could not find active descriptor.")
|
||||||
return "Not found"
|
return "Not found"
|
||||||
end = descriptor["desc"].find("/")
|
start = descriptor["desc"].find("]")
|
||||||
|
if start < 3:
|
||||||
|
return "Could not parse descriptor"
|
||||||
|
descriptor = descriptor["desc"][start + 1 :]
|
||||||
|
|
||||||
|
end = descriptor.find("/")
|
||||||
if end < 10:
|
if end < 10:
|
||||||
return "Not found"
|
return "Could not parse descriptor"
|
||||||
extkey = descriptor["desc"][5:end]
|
extkey = descriptor[:end]
|
||||||
|
|
||||||
extkey_data = b58decode(extkey)[4:-4]
|
extkey_data = b58decode(extkey)[4:-4]
|
||||||
extkey_data_hash: bytes = hash160(extkey_data)
|
extkey_data_hash: bytes = hash160(extkey_data)
|
||||||
return extkey_data_hash.hex()
|
return extkey_data_hash.hex()
|
||||||
@@ -611,8 +677,9 @@ class BTCInterface(Secp256k1Interface):
|
|||||||
pkh = hash160(pk)
|
pkh = hash160(pk)
|
||||||
return segwit_addr.encode(bech32_prefix, version, pkh)
|
return segwit_addr.encode(bech32_prefix, version, pkh)
|
||||||
|
|
||||||
def encode_secret_extkey(self, ek_data: bytes) -> str:
|
def encode_secret_extkey(self, ek_data: bytes, prefix=None) -> str:
|
||||||
assert len(ek_data) == 74
|
assert len(ek_data) == 74
|
||||||
|
if prefix is None:
|
||||||
prefix = self.chainparams_network()["ext_secret_key_prefix"]
|
prefix = self.chainparams_network()["ext_secret_key_prefix"]
|
||||||
data: bytes = prefix.to_bytes(4, "big") + ek_data
|
data: bytes = prefix.to_bytes(4, "big") + ek_data
|
||||||
checksum = sha256(sha256(data))
|
checksum = sha256(sha256(data))
|
||||||
|
|||||||
@@ -1054,6 +1054,15 @@ def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
|||||||
post_data = getFormData(post_string, is_json)
|
post_data = getFormData(post_string, is_json)
|
||||||
|
|
||||||
coin_in = get_data_entry(post_data, "coin")
|
coin_in = get_data_entry(post_data, "coin")
|
||||||
|
extkey_prefix = get_data_entry_or(
|
||||||
|
post_data, "extkey_prefix", 0x04B2430C
|
||||||
|
) # default, zprv for P2WPKH in electrum
|
||||||
|
if isinstance(extkey_prefix, str):
|
||||||
|
if extkey_prefix.isdigit():
|
||||||
|
extkey_prefix = int(extkey_prefix)
|
||||||
|
else:
|
||||||
|
extkey_prefix = int(extkey_prefix, 16) # Try hex
|
||||||
|
|
||||||
try:
|
try:
|
||||||
coin = getCoinIdFromName(coin_in)
|
coin = getCoinIdFromName(coin_in)
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -1090,7 +1099,6 @@ def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
|||||||
wallet_seed_id = ci.getWalletSeedID()
|
wallet_seed_id = ci.getWalletSeedID()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
wallet_seed_id = f"Error: {e}"
|
wallet_seed_id = f"Error: {e}"
|
||||||
|
|
||||||
rv.update(
|
rv.update(
|
||||||
{
|
{
|
||||||
"seed": seed_key.hex(),
|
"seed": seed_key.hex(),
|
||||||
@@ -1099,6 +1107,10 @@ def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
|||||||
"current_seed_id": wallet_seed_id,
|
"current_seed_id": wallet_seed_id,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
if hasattr(ci, "canExportToElectrum") and ci.canExportToElectrum():
|
||||||
|
rv.update(
|
||||||
|
{"account_key": ci.getAccountKey(seed_key, extkey_prefix)}
|
||||||
|
) # Master key can be imported into electrum (Must set prefix for P2WPKH)
|
||||||
|
|
||||||
return bytes(
|
return bytes(
|
||||||
json.dumps(rv),
|
json.dumps(rv),
|
||||||
|
|||||||
@@ -54,6 +54,8 @@ def ensure(v, err_string):
|
|||||||
def toBool(s) -> bool:
|
def toBool(s) -> bool:
|
||||||
if isinstance(s, bool):
|
if isinstance(s, bool):
|
||||||
return s
|
return s
|
||||||
|
if isinstance(s, int):
|
||||||
|
return False if s == 0 else True
|
||||||
return s.lower() in ["1", "true"]
|
return s.lower() in ["1", "true"]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# Copyright (c) 2024 tecnovert
|
# Copyright (c) 2024-2025 tecnovert
|
||||||
# Distributed under the MIT software license, see the accompanying
|
# Distributed under the MIT software license, see the accompanying
|
||||||
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
from .crypto import blake256, hash160, hmac_sha512, ripemd160
|
from .crypto import blake256, hash160, hmac_sha512, ripemd160
|
||||||
|
|
||||||
from coincurve.keys import PrivateKey, PublicKey
|
from coincurve.keys import PrivateKey, PublicKey
|
||||||
@@ -21,6 +22,10 @@ def hash160_dcr(data: bytes) -> bytes:
|
|||||||
return ripemd160(blake256(data))
|
return ripemd160(blake256(data))
|
||||||
|
|
||||||
|
|
||||||
|
def hardened(i: int) -> int:
|
||||||
|
return i | (1 << 31)
|
||||||
|
|
||||||
|
|
||||||
class ExtKeyPair:
|
class ExtKeyPair:
|
||||||
__slots__ = (
|
__slots__ = (
|
||||||
"_depth",
|
"_depth",
|
||||||
@@ -50,6 +55,11 @@ class ExtKeyPair:
|
|||||||
def has_key(self) -> bool:
|
def has_key(self) -> bool:
|
||||||
return False if self._key is None else True
|
return False if self._key is None else True
|
||||||
|
|
||||||
|
def get_pubkey(self) -> bytes:
|
||||||
|
return (
|
||||||
|
self._pubkey if self._pubkey else PublicKey.from_secret(self._key).format()
|
||||||
|
)
|
||||||
|
|
||||||
def neuter(self) -> None:
|
def neuter(self) -> None:
|
||||||
if self._key is None:
|
if self._key is None:
|
||||||
raise ValueError("Already neutered")
|
raise ValueError("Already neutered")
|
||||||
@@ -83,11 +93,7 @@ class ExtKeyPair:
|
|||||||
out._pubkey = K.format()
|
out._pubkey = K.format()
|
||||||
else:
|
else:
|
||||||
k = PrivateKey(self._key)
|
k = PrivateKey(self._key)
|
||||||
out._fingerprint = self.hash_func(
|
out._fingerprint = self.hash_func(self.get_pubkey())[:4]
|
||||||
self._pubkey
|
|
||||||
if self._pubkey
|
|
||||||
else PublicKey.from_secret(self._key).format()
|
|
||||||
)[:4]
|
|
||||||
new_hash = BIP32Hash(self._chaincode, child_no, 0, self._key)
|
new_hash = BIP32Hash(self._chaincode, child_no, 0, self._key)
|
||||||
out._chaincode = new_hash[32:]
|
out._chaincode = new_hash[32:]
|
||||||
k.add(new_hash[:32], update=True)
|
k.add(new_hash[:32], update=True)
|
||||||
@@ -97,6 +103,26 @@ class ExtKeyPair:
|
|||||||
out.hash_func = self.hash_func
|
out.hash_func = self.hash_func
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
def derive_path(self, path: str):
|
||||||
|
path_entries = path.split("/")
|
||||||
|
rv = deepcopy(self)
|
||||||
|
for i, level in enumerate(path_entries):
|
||||||
|
level = level.lower()
|
||||||
|
if i == 0 and level == "s":
|
||||||
|
continue
|
||||||
|
should_harden: bool = False
|
||||||
|
if len(level) > 1 and level.endswith("h") or level.endswith("'"):
|
||||||
|
level = level[:-1]
|
||||||
|
should_harden = True
|
||||||
|
if level.isdigit():
|
||||||
|
child_no: int = int(level)
|
||||||
|
if should_harden:
|
||||||
|
child_no = hardened(child_no)
|
||||||
|
rv = rv.derive(child_no)
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid path node")
|
||||||
|
return rv
|
||||||
|
|
||||||
def encode_v(self) -> bytes:
|
def encode_v(self) -> bytes:
|
||||||
return (
|
return (
|
||||||
self._depth.to_bytes(1, "big")
|
self._depth.to_bytes(1, "big")
|
||||||
@@ -108,17 +134,12 @@ class ExtKeyPair:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def encode_p(self) -> bytes:
|
def encode_p(self) -> bytes:
|
||||||
pubkey = (
|
|
||||||
PublicKey.from_secret(self._key).format()
|
|
||||||
if self._pubkey is None
|
|
||||||
else self._pubkey
|
|
||||||
)
|
|
||||||
return (
|
return (
|
||||||
self._depth.to_bytes(1, "big")
|
self._depth.to_bytes(1, "big")
|
||||||
+ self._fingerprint
|
+ self._fingerprint
|
||||||
+ self._child_no.to_bytes(4, "big")
|
+ self._child_no.to_bytes(4, "big")
|
||||||
+ self._chaincode
|
+ self._chaincode
|
||||||
+ pubkey
|
+ self.get_pubkey()
|
||||||
)
|
)
|
||||||
|
|
||||||
def decode(self, data: bytes) -> None:
|
def decode(self, data: bytes) -> None:
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ PIVX_BASE_ZMQ_PORT = 36892
|
|||||||
PREFIX_SECRET_KEY_REGTEST = 0x2E
|
PREFIX_SECRET_KEY_REGTEST = 0x2E
|
||||||
|
|
||||||
BTC_USE_DESCRIPTORS = toBool(os.getenv("BTC_USE_DESCRIPTORS", False))
|
BTC_USE_DESCRIPTORS = toBool(os.getenv("BTC_USE_DESCRIPTORS", False))
|
||||||
|
BTC_USE_LEGACY_KEY_PATHS = toBool(os.getenv("BTC_USE_LEGACY_KEY_PATHS", False))
|
||||||
|
|
||||||
|
|
||||||
def prepareDataDir(
|
def prepareDataDir(
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ from tests.basicswap.common import (
|
|||||||
LTC_BASE_RPC_PORT,
|
LTC_BASE_RPC_PORT,
|
||||||
PIVX_BASE_PORT,
|
PIVX_BASE_PORT,
|
||||||
BTC_USE_DESCRIPTORS,
|
BTC_USE_DESCRIPTORS,
|
||||||
|
BTC_USE_LEGACY_KEY_PATHS,
|
||||||
)
|
)
|
||||||
from tests.basicswap.extended.test_nmc import (
|
from tests.basicswap.extended.test_nmc import (
|
||||||
NMC_BASE_PORT,
|
NMC_BASE_PORT,
|
||||||
@@ -146,6 +147,7 @@ def run_prepare(
|
|||||||
os.environ["BTC_RPC_PORT"] = str(BITCOIN_RPC_PORT_BASE)
|
os.environ["BTC_RPC_PORT"] = str(BITCOIN_RPC_PORT_BASE)
|
||||||
os.environ["BTC_PORT"] = str(BITCOIN_PORT_BASE)
|
os.environ["BTC_PORT"] = str(BITCOIN_PORT_BASE)
|
||||||
os.environ["BTC_USE_DESCRIPTORS"] = str(BTC_USE_DESCRIPTORS)
|
os.environ["BTC_USE_DESCRIPTORS"] = str(BTC_USE_DESCRIPTORS)
|
||||||
|
os.environ["BTC_USE_LEGACY_KEY_PATHS"] = str(BTC_USE_LEGACY_KEY_PATHS)
|
||||||
os.environ["BTC_ONION_PORT"] = str(BITCOIN_TOR_PORT_BASE)
|
os.environ["BTC_ONION_PORT"] = str(BITCOIN_TOR_PORT_BASE)
|
||||||
os.environ["LTC_RPC_PORT"] = str(LITECOIN_RPC_PORT_BASE)
|
os.environ["LTC_RPC_PORT"] = str(LITECOIN_RPC_PORT_BASE)
|
||||||
os.environ["DCR_RPC_PORT"] = str(DECRED_RPC_PORT_BASE)
|
os.environ["DCR_RPC_PORT"] = str(DECRED_RPC_PORT_BASE)
|
||||||
|
|||||||
85
tests/basicswap/extended/test_key_paths.py
Normal file
85
tests/basicswap/extended/test_key_paths.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright (c) 2025 The Basicswap developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
"""
|
||||||
|
...
|
||||||
|
export BTC_USE_DESCRIPTORS=true
|
||||||
|
export BTC_USE_LEGACY_KEY_PATHS=false
|
||||||
|
export EXTRA_CONFIG_JSON="{\"btc0\":[\"txindex=1\",\"rpcworkqueue=1100\"]}"
|
||||||
|
python tests/basicswap/extended/test_xmr_persistent.py
|
||||||
|
|
||||||
|
|
||||||
|
Start electrumx and electrum daemon
|
||||||
|
|
||||||
|
python tests/basicswap/extended/test_key_paths.py
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from tests.basicswap.util import (
|
||||||
|
read_json_api,
|
||||||
|
waitForServer,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger()
|
||||||
|
logger.level = logging.DEBUG
|
||||||
|
if not len(logger.handlers):
|
||||||
|
logger.addHandler(logging.StreamHandler(sys.stdout))
|
||||||
|
|
||||||
|
|
||||||
|
PORT_OFS = int(os.getenv("PORT_OFS", 1))
|
||||||
|
UI_PORT = 12700 + PORT_OFS
|
||||||
|
|
||||||
|
ELECTRUM_PATH = os.getenv("ELECTRUM_PATH")
|
||||||
|
ELECTRUM_DATADIR = os.getenv("ELECTRUM_DATADIR")
|
||||||
|
|
||||||
|
|
||||||
|
def signal_handler(self, sig, frame):
|
||||||
|
os.write(sys.stdout.fileno(), f"Signal {sig} detected.\n".encode("utf-8"))
|
||||||
|
self.delay_event.set()
|
||||||
|
|
||||||
|
|
||||||
|
class Test(unittest.TestCase):
|
||||||
|
delay_event = threading.Event()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
super(Test, cls).setUpClass()
|
||||||
|
|
||||||
|
signal.signal(
|
||||||
|
signal.SIGINT, lambda signal, frame: signal_handler(cls, signal, frame)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_export(self):
|
||||||
|
|
||||||
|
waitForServer(self.delay_event, UI_PORT + 0)
|
||||||
|
waitForServer(self.delay_event, UI_PORT + 1)
|
||||||
|
|
||||||
|
coin_seed = read_json_api(UI_PORT, "getcoinseed", {"coin": "BTC"})
|
||||||
|
assert coin_seed["account_key"].startswith("zprv")
|
||||||
|
|
||||||
|
# override the prefix for testnet
|
||||||
|
coin_seed = read_json_api(
|
||||||
|
UI_PORT,
|
||||||
|
"getcoinseed",
|
||||||
|
{"coin": "BTC", "extkey_prefix": 0x045F18BC, "with_mnemonic": True},
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
coin_seed["account_key"]
|
||||||
|
== "vprv9K5NS8v2JWNxMeyKtfARGjUSW2zC6F6WbrJUo1HGyZ1NhRZpk6keXadq8XF25KgFMvT5AfXb6Ccn62c6wW2mbJTGfiDFPSE2oaQuvW6tSUX"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -124,7 +124,7 @@ def callbtcrpc(
|
|||||||
node_id,
|
node_id,
|
||||||
method,
|
method,
|
||||||
params=[],
|
params=[],
|
||||||
wallet=None,
|
wallet="wallet.dat",
|
||||||
base_rpc_port=BITCOIN_RPC_PORT_BASE + PORT_OFS,
|
base_rpc_port=BITCOIN_RPC_PORT_BASE + PORT_OFS,
|
||||||
):
|
):
|
||||||
auth = "test_btc_{0}:test_btc_pwd_{0}".format(node_id)
|
auth = "test_btc_{0}:test_btc_pwd_{0}".format(node_id)
|
||||||
@@ -315,9 +315,7 @@ def start_processes(self):
|
|||||||
]
|
]
|
||||||
< num_blocks
|
< num_blocks
|
||||||
):
|
):
|
||||||
logging.info(
|
logging.info(f"Mining {num_blocks} Monero blocks to {self.xmr_addr}.")
|
||||||
"Mining {} Monero blocks to {}.".format(num_blocks, self.xmr_addr)
|
|
||||||
)
|
|
||||||
callrpc_xmr(
|
callrpc_xmr(
|
||||||
XMR_BASE_RPC_PORT + 1,
|
XMR_BASE_RPC_PORT + 1,
|
||||||
"generateblocks",
|
"generateblocks",
|
||||||
@@ -336,7 +334,7 @@ def start_processes(self):
|
|||||||
if callbtcrpc(0, "getblockcount") < num_blocks:
|
if callbtcrpc(0, "getblockcount") < num_blocks:
|
||||||
logging.info(f"Mining {num_blocks} Bitcoin blocks to {self.btc_addr}")
|
logging.info(f"Mining {num_blocks} Bitcoin blocks to {self.btc_addr}")
|
||||||
callbtcrpc(0, "generatetoaddress", [num_blocks, self.btc_addr])
|
callbtcrpc(0, "generatetoaddress", [num_blocks, self.btc_addr])
|
||||||
logging.info("BTC blocks: %d", callbtcrpc(0, "getblockcount"))
|
logging.info("BTC blocks: {}".format(callbtcrpc(0, "getblockcount")))
|
||||||
|
|
||||||
if "litecoin" in TEST_COINS_LIST:
|
if "litecoin" in TEST_COINS_LIST:
|
||||||
self.ltc_addr = callltcrpc(
|
self.ltc_addr = callltcrpc(
|
||||||
@@ -345,7 +343,7 @@ def start_processes(self):
|
|||||||
num_blocks: int = 431
|
num_blocks: int = 431
|
||||||
have_blocks: int = callltcrpc(0, "getblockcount")
|
have_blocks: int = callltcrpc(0, "getblockcount")
|
||||||
if have_blocks < 500:
|
if have_blocks < 500:
|
||||||
logging.info("Mining %d Litecoin blocks to %s", num_blocks, self.ltc_addr)
|
logging.info(f"Mining {num_blocks} Litecoin blocks to {self.ltc_addr}")
|
||||||
callltcrpc(
|
callltcrpc(
|
||||||
0,
|
0,
|
||||||
"generatetoaddress",
|
"generatetoaddress",
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ from basicswap.db import (
|
|||||||
from basicswap.util import (
|
from basicswap.util import (
|
||||||
make_int,
|
make_int,
|
||||||
)
|
)
|
||||||
|
from basicswap.util.address import b58decode
|
||||||
from basicswap.util.address import (
|
from basicswap.util.address import (
|
||||||
decodeAddress,
|
decodeAddress,
|
||||||
)
|
)
|
||||||
@@ -773,6 +774,10 @@ class TestFunctions(BaseTest):
|
|||||||
class BasicSwapTest(TestFunctions):
|
class BasicSwapTest(TestFunctions):
|
||||||
|
|
||||||
test_fee_rate: int = 1000 # sats/kvB
|
test_fee_rate: int = 1000 # sats/kvB
|
||||||
|
expected_addresses = {
|
||||||
|
"external": "bcrt1qps7hnjd866e9ynxadgseprkc2l56m00dvwargr",
|
||||||
|
"internal": "bcrt1qdl9ryxkqjltv42lhfnqgdjf9tagxsjpp2xak9a",
|
||||||
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
@@ -1211,7 +1216,7 @@ class BasicSwapTest(TestFunctions):
|
|||||||
)
|
)
|
||||||
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
||||||
assert addr_info["hdkeypath"] == "m/0'/0'/0'"
|
assert addr_info["hdkeypath"] == "m/0'/0'/0'"
|
||||||
assert addr == "bcrt1qps7hnjd866e9ynxadgseprkc2l56m00dvwargr"
|
assert addr == self.expected_addresses["external"]
|
||||||
|
|
||||||
addr_change = self.callnoderpc("getrawchangeaddress", wallet=new_wallet_name)
|
addr_change = self.callnoderpc("getrawchangeaddress", wallet=new_wallet_name)
|
||||||
addr_info = self.callnoderpc(
|
addr_info = self.callnoderpc(
|
||||||
@@ -1223,9 +1228,20 @@ class BasicSwapTest(TestFunctions):
|
|||||||
)
|
)
|
||||||
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
||||||
assert addr_info["hdkeypath"] == "m/0'/1'/0'"
|
assert addr_info["hdkeypath"] == "m/0'/1'/0'"
|
||||||
assert addr_change == "bcrt1qdl9ryxkqjltv42lhfnqgdjf9tagxsjpp2xak9a"
|
assert addr_change == self.expected_addresses["internal"]
|
||||||
|
|
||||||
self.callnoderpc("unloadwallet", [new_wallet_name])
|
self.callnoderpc("unloadwallet", [new_wallet_name])
|
||||||
|
|
||||||
|
address_chains = ci.getWalletKeyChains(bytes.fromhex(test_seed))
|
||||||
|
for chain in ["external", "internal"]:
|
||||||
|
extkey_data = b58decode(address_chains[chain])[4:-4]
|
||||||
|
ek = ExtKeyPair()
|
||||||
|
ek.decode(extkey_data)
|
||||||
|
addr0h = ci.encodeSegwitAddress(
|
||||||
|
ci.getPubkeyHash(ek.derive_path("0h").get_pubkey())
|
||||||
|
)
|
||||||
|
assert addr0h == self.expected_addresses[chain]
|
||||||
|
|
||||||
self.swap_clients[0].initialiseWallet(Coins.BTC, raise_errors=True)
|
self.swap_clients[0].initialiseWallet(Coins.BTC, raise_errors=True)
|
||||||
assert self.swap_clients[0].checkWalletSeed(Coins.BTC) is True
|
assert self.swap_clients[0].checkWalletSeed(Coins.BTC) is True
|
||||||
for i in range(1500):
|
for i in range(1500):
|
||||||
@@ -1647,7 +1663,7 @@ class BasicSwapTest(TestFunctions):
|
|||||||
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
||||||
assert addr_info["hdkeypath"] == "m/0h/0h/0h"
|
assert addr_info["hdkeypath"] == "m/0h/0h/0h"
|
||||||
if self.test_coin_from == Coins.BTC:
|
if self.test_coin_from == Coins.BTC:
|
||||||
assert addr == "bcrt1qps7hnjd866e9ynxadgseprkc2l56m00dvwargr"
|
assert addr == self.expected_addresses["external"]
|
||||||
|
|
||||||
addr_change = self.callnoderpc("getrawchangeaddress", wallet=new_wallet_name)
|
addr_change = self.callnoderpc("getrawchangeaddress", wallet=new_wallet_name)
|
||||||
addr_info = self.callnoderpc(
|
addr_info = self.callnoderpc(
|
||||||
@@ -1660,7 +1676,17 @@ class BasicSwapTest(TestFunctions):
|
|||||||
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
assert addr_info["hdmasterfingerprint"] == "a55b7ea9"
|
||||||
assert addr_info["hdkeypath"] == "m/0h/1h/0h"
|
assert addr_info["hdkeypath"] == "m/0h/1h/0h"
|
||||||
if self.test_coin_from == Coins.BTC:
|
if self.test_coin_from == Coins.BTC:
|
||||||
assert addr_change == "bcrt1qdl9ryxkqjltv42lhfnqgdjf9tagxsjpp2xak9a"
|
assert addr_change == self.expected_addresses["internal"]
|
||||||
|
|
||||||
|
address_chains = ci.getWalletKeyChains(bytes.fromhex(test_seed))
|
||||||
|
for chain in ["external", "internal"]:
|
||||||
|
extkey_data = b58decode(address_chains[chain])[4:-4]
|
||||||
|
ek = ExtKeyPair()
|
||||||
|
ek.decode(extkey_data)
|
||||||
|
addr0h = ci.encodeSegwitAddress(
|
||||||
|
ci.getPubkeyHash(ek.derive_path("0h").get_pubkey())
|
||||||
|
)
|
||||||
|
assert addr0h == self.expected_addresses[chain]
|
||||||
|
|
||||||
desc_watch = descsum_create(f"addr({addr})")
|
desc_watch = descsum_create(f"addr({addr})")
|
||||||
self.callnoderpc(
|
self.callnoderpc(
|
||||||
|
|||||||
@@ -87,6 +87,7 @@ from tests.basicswap.common import (
|
|||||||
LTC_BASE_RPC_PORT,
|
LTC_BASE_RPC_PORT,
|
||||||
PREFIX_SECRET_KEY_REGTEST,
|
PREFIX_SECRET_KEY_REGTEST,
|
||||||
BTC_USE_DESCRIPTORS,
|
BTC_USE_DESCRIPTORS,
|
||||||
|
BTC_USE_LEGACY_KEY_PATHS,
|
||||||
)
|
)
|
||||||
from basicswap.db_util import (
|
from basicswap.db_util import (
|
||||||
remove_expired_data,
|
remove_expired_data,
|
||||||
@@ -177,6 +178,7 @@ def prepare_swapclient_dir(
|
|||||||
"bindir": cfg.BITCOIN_BINDIR,
|
"bindir": cfg.BITCOIN_BINDIR,
|
||||||
"use_segwit": True,
|
"use_segwit": True,
|
||||||
"use_descriptors": BTC_USE_DESCRIPTORS,
|
"use_descriptors": BTC_USE_DESCRIPTORS,
|
||||||
|
"use_legacy_key_paths": BTC_USE_LEGACY_KEY_PATHS,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"check_progress_seconds": 2,
|
"check_progress_seconds": 2,
|
||||||
|
|||||||
Reference in New Issue
Block a user