Merge pull request #467 from tecnovert/refactor

refactor: simplify getAddressInfo
This commit is contained in:
tecnovert
2026-05-08 18:26:13 +00:00
committed by GitHub
2 changed files with 20 additions and 64 deletions
+11 -17
View File
@@ -939,32 +939,26 @@ class BTCInterface(Secp256k1Interface):
if wm:
info = wm.getAddressInfo(self.coin_type(), address)
if info:
if or_watch_only:
return True
if or_watch_only is False and info["is_watch_only"] is True:
return False
return True
return False
try:
addr_info = self.rpc_wallet("getaddressinfo", [address])
if not or_watch_only:
if addr_info["ismine"]:
return True
else:
if self._use_descriptors:
addr_info = self.rpc_wallet_watch("getaddressinfo", [address])
if addr_info["ismine"] or addr_info["iswatchonly"]:
if addr_info["ismine"]:
return True
if or_watch_only is False:
return False
if addr_info["iswatchonly"]:
return True
if self._use_descriptors:
wo_addr_info = self.rpc_wallet_watch("getaddressinfo", [address])
if wo_addr_info["iswatchonly"]:
return True
except Exception as e:
self._log.debug(f"isAddressMine RPC check failed: {e}")
wm = self.getWalletManager()
if wm:
info = wm.getAddressInfo(self.coin_type(), address)
if info:
if or_watch_only:
return True
return True
return False
def checkAddressMine(self, address: str) -> None:
+9 -47
View File
@@ -4,10 +4,12 @@
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
import json
import sqlite3
import threading
import time
from typing import Dict, List, Optional, Tuple
from coincurve import PrivateKey, PublicKey
from .chainparams import Coins
from .contrib.test_framework import segwit_addr
@@ -19,7 +21,7 @@ from .db_wallet import (
WalletTxCache,
WalletWatchOnly,
)
from .util.crypto import hash160
from .util.crypto import hash160, sha256
from .util.extkey import ExtKeyPair
@@ -112,13 +114,11 @@ class WalletManager:
def _deriveAddress(
self, coin_type: Coins, index: int, internal: bool = False
) -> Tuple[str, str, bytes]:
from coincurve import PublicKey
key = self._deriveKey(coin_type, index, internal)
pubkey = PublicKey.from_secret(key).format()
pkh = hash160(pubkey)
address = segwit_addr.encode(self._getHRP(coin_type), 0, pkh)
scripthash = hashlib.sha256(bytes([0x00, 0x14]) + pkh).digest()[::-1].hex()
scripthash = sha256(bytes([0x00, 0x14]) + pkh)[::-1].hex()
return address, scripthash, pubkey
def _syncStateIndices(self, coin_type: Coins, cursor) -> None:
@@ -275,8 +275,6 @@ class WalletManager:
def getAddress(
self, coin_type: Coins, index: int, internal: bool = False
) -> Optional[str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -395,8 +393,6 @@ class WalletManager:
include_watch_only: bool = True,
funded_only: bool = False,
) -> List[str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -426,8 +422,6 @@ class WalletManager:
return []
def getFundedAddresses(self, coin_type: Coins) -> Dict[str, str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -452,8 +446,6 @@ class WalletManager:
return {}
def getExistingInternalAddress(self, coin_type: Coins) -> Optional[str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -517,8 +509,6 @@ class WalletManager:
self._swap_client.closeDB(cursor, commit=False)
def getAddressInfo(self, coin_type: Coins, address: str) -> Optional[dict]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -559,8 +549,6 @@ class WalletManager:
return None
def getCachedTotalBalance(self, coin_type: Coins) -> int:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -700,8 +688,6 @@ class WalletManager:
if not self.isInitialized(coin_type):
return None
import sqlite3
now = int(time.time())
min_cache_time = now - max_cache_age
@@ -744,8 +730,6 @@ class WalletManager:
return None
def hasCachedBalances(self, coin_type: Coins, max_cache_age: int = 120) -> bool:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -762,8 +746,6 @@ class WalletManager:
def getPrivateKey(self, coin_type: Coins, address: str) -> Optional[bytes]:
if not self.isInitialized(coin_type):
return None
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -788,8 +770,6 @@ class WalletManager:
return None
def getSignableAddresses(self, coin_type: Coins) -> Dict[str, str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -861,10 +841,8 @@ class WalletManager:
label: str = "",
source: str = "import",
) -> bool:
from coincurve import PublicKey as CCPublicKey
try:
pubkey = CCPublicKey.from_secret(private_key).format()
pubkey = PublicKey.from_secret(private_key).format()
if (
segwit_addr.encode(self._getHRP(coin_type), 0, hash160(pubkey))
!= address
@@ -1003,7 +981,7 @@ class WalletManager:
def _b58decode_check(self, s: str) -> bytes:
data = self._b58decode(s)
payload, checksum = data[:-4], data[-4:]
expected = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
expected = sha256(sha256(payload))[:4]
if checksum != expected:
raise ValueError("Invalid base58 checksum")
return payload
@@ -1025,7 +1003,7 @@ class WalletManager:
master_key = self._master_keys.get(coin_type)
if master_key is None:
raise ValueError(f"Wallet not initialized for {coin_type}")
return hashlib.sha256(master_key + b"_import_key").digest()
return sha256(master_key + b"_import_key")
def _encryptPrivateKey(self, private_key: bytes, coin_type: Coins) -> bytes:
return bytes(a ^ b for a, b in zip(private_key, self._getXorKey(coin_type)))
@@ -1037,7 +1015,7 @@ class WalletManager:
_, data = segwit_addr.decode(self._getHRP(coin_type), address)
if data is None:
return ""
return hashlib.sha256(bytes([0x00, 0x14]) + bytes(data)).digest()[::-1].hex()
return sha256(bytes([0x00, 0x14]) + bytes(data))[::-1].hex()
def needsMigration(self, coin_type: Coins) -> bool:
cursor = self._swap_client.openDB()
@@ -1193,8 +1171,6 @@ class WalletManager:
self._swap_client.closeDB(cursor, commit=False)
def getSeedID(self, coin_type: Coins) -> Optional[str]:
from basicswap.contrib.test_framework.script import hash160
master_key = self._master_keys.get(coin_type)
if master_key is None:
return None
@@ -1204,16 +1180,12 @@ class WalletManager:
return hash160(ek.encode_p()).hex()
def signMessage(self, coin_type: Coins, address: str, message: str) -> bytes:
from coincurve import PrivateKey
key = self.getPrivateKey(coin_type, address)
if key is None:
raise ValueError(f"Cannot sign: no key for address {address}")
return PrivateKey(key).sign(message.encode("utf-8"))
def signHash(self, coin_type: Coins, address: str, msg_hash: bytes) -> bytes:
from coincurve import PrivateKey
key = self.getPrivateKey(coin_type, address)
if key is None:
raise ValueError(f"Cannot sign: no key for address {address}")
@@ -1222,8 +1194,6 @@ class WalletManager:
def getKeyForAddress(
self, coin_type: Coins, address: str
) -> Optional[Tuple[bytes, bytes]]:
from coincurve import PublicKey
key = self.getPrivateKey(coin_type, address)
if key is None:
return None
@@ -1232,8 +1202,6 @@ class WalletManager:
def findAddressByScripthash(
self, coin_type: Coins, scripthash: str
) -> Optional[str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -1256,8 +1224,6 @@ class WalletManager:
return None
def getAllScripthashes(self, coin_type: Coins) -> List[str]:
import sqlite3
try:
conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor()
@@ -1895,8 +1861,6 @@ class WalletManager:
for _ in existing:
return False
import json
pending = WalletPendingTx()
pending.coin_type = int(coin_type)
pending.txid = txid
@@ -1945,8 +1909,6 @@ class WalletManager:
) -> List[dict]:
cursor = self._swap_client.openDB()
try:
import json
results = self._swap_client.query(
WalletPendingTx,
cursor,