refactor: reduce wallet_manager imports

This commit is contained in:
tecnovert
2026-05-04 19:39:03 +02:00
parent 57a1a6505e
commit c8e7c02fe2
+9 -47
View File
@@ -4,10 +4,12 @@
# Distributed under the MIT software license, see the accompanying # Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php. # file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib import json
import sqlite3
import threading import threading
import time import time
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from coincurve import PrivateKey, PublicKey
from .chainparams import Coins from .chainparams import Coins
from .contrib.test_framework import segwit_addr from .contrib.test_framework import segwit_addr
@@ -19,7 +21,7 @@ from .db_wallet import (
WalletTxCache, WalletTxCache,
WalletWatchOnly, WalletWatchOnly,
) )
from .util.crypto import hash160 from .util.crypto import hash160, sha256
from .util.extkey import ExtKeyPair from .util.extkey import ExtKeyPair
@@ -112,13 +114,11 @@ class WalletManager:
def _deriveAddress( def _deriveAddress(
self, coin_type: Coins, index: int, internal: bool = False self, coin_type: Coins, index: int, internal: bool = False
) -> Tuple[str, str, bytes]: ) -> Tuple[str, str, bytes]:
from coincurve import PublicKey
key = self._deriveKey(coin_type, index, internal) key = self._deriveKey(coin_type, index, internal)
pubkey = PublicKey.from_secret(key).format() pubkey = PublicKey.from_secret(key).format()
pkh = hash160(pubkey) pkh = hash160(pubkey)
address = segwit_addr.encode(self._getHRP(coin_type), 0, pkh) address = segwit_addr.encode(self._getHRP(coin_type), 0, pkh)
scripthash = hashlib.sha256(bytes([0x00, 0x14]) + pkh).digest()[::-1].hex() scripthash = sha256(bytes([0x00, 0x14]) + pkh)[::-1].hex()
return address, scripthash, pubkey return address, scripthash, pubkey
def _syncStateIndices(self, coin_type: Coins, cursor) -> None: def _syncStateIndices(self, coin_type: Coins, cursor) -> None:
@@ -275,8 +275,6 @@ class WalletManager:
def getAddress( def getAddress(
self, coin_type: Coins, index: int, internal: bool = False self, coin_type: Coins, index: int, internal: bool = False
) -> Optional[str]: ) -> Optional[str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -395,8 +393,6 @@ class WalletManager:
include_watch_only: bool = True, include_watch_only: bool = True,
funded_only: bool = False, funded_only: bool = False,
) -> List[str]: ) -> List[str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -426,8 +422,6 @@ class WalletManager:
return [] return []
def getFundedAddresses(self, coin_type: Coins) -> Dict[str, str]: def getFundedAddresses(self, coin_type: Coins) -> Dict[str, str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -452,8 +446,6 @@ class WalletManager:
return {} return {}
def getExistingInternalAddress(self, coin_type: Coins) -> Optional[str]: def getExistingInternalAddress(self, coin_type: Coins) -> Optional[str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -517,8 +509,6 @@ class WalletManager:
self._swap_client.closeDB(cursor, commit=False) self._swap_client.closeDB(cursor, commit=False)
def getAddressInfo(self, coin_type: Coins, address: str) -> Optional[dict]: def getAddressInfo(self, coin_type: Coins, address: str) -> Optional[dict]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -559,8 +549,6 @@ class WalletManager:
return None return None
def getCachedTotalBalance(self, coin_type: Coins) -> int: def getCachedTotalBalance(self, coin_type: Coins) -> int:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -700,8 +688,6 @@ class WalletManager:
if not self.isInitialized(coin_type): if not self.isInitialized(coin_type):
return None return None
import sqlite3
now = int(time.time()) now = int(time.time())
min_cache_time = now - max_cache_age min_cache_time = now - max_cache_age
@@ -744,8 +730,6 @@ class WalletManager:
return None return None
def hasCachedBalances(self, coin_type: Coins, max_cache_age: int = 120) -> bool: def hasCachedBalances(self, coin_type: Coins, max_cache_age: int = 120) -> bool:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -762,8 +746,6 @@ class WalletManager:
def getPrivateKey(self, coin_type: Coins, address: str) -> Optional[bytes]: def getPrivateKey(self, coin_type: Coins, address: str) -> Optional[bytes]:
if not self.isInitialized(coin_type): if not self.isInitialized(coin_type):
return None return None
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -788,8 +770,6 @@ class WalletManager:
return None return None
def getSignableAddresses(self, coin_type: Coins) -> Dict[str, str]: def getSignableAddresses(self, coin_type: Coins) -> Dict[str, str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -861,10 +841,8 @@ class WalletManager:
label: str = "", label: str = "",
source: str = "import", source: str = "import",
) -> bool: ) -> bool:
from coincurve import PublicKey as CCPublicKey
try: try:
pubkey = CCPublicKey.from_secret(private_key).format() pubkey = PublicKey.from_secret(private_key).format()
if ( if (
segwit_addr.encode(self._getHRP(coin_type), 0, hash160(pubkey)) segwit_addr.encode(self._getHRP(coin_type), 0, hash160(pubkey))
!= address != address
@@ -1003,7 +981,7 @@ class WalletManager:
def _b58decode_check(self, s: str) -> bytes: def _b58decode_check(self, s: str) -> bytes:
data = self._b58decode(s) data = self._b58decode(s)
payload, checksum = data[:-4], data[-4:] payload, checksum = data[:-4], data[-4:]
expected = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] expected = sha256(sha256(payload))[:4]
if checksum != expected: if checksum != expected:
raise ValueError("Invalid base58 checksum") raise ValueError("Invalid base58 checksum")
return payload return payload
@@ -1025,7 +1003,7 @@ class WalletManager:
master_key = self._master_keys.get(coin_type) master_key = self._master_keys.get(coin_type)
if master_key is None: if master_key is None:
raise ValueError(f"Wallet not initialized for {coin_type}") raise ValueError(f"Wallet not initialized for {coin_type}")
return hashlib.sha256(master_key + b"_import_key").digest() return sha256(master_key + b"_import_key")
def _encryptPrivateKey(self, private_key: bytes, coin_type: Coins) -> bytes: def _encryptPrivateKey(self, private_key: bytes, coin_type: Coins) -> bytes:
return bytes(a ^ b for a, b in zip(private_key, self._getXorKey(coin_type))) return bytes(a ^ b for a, b in zip(private_key, self._getXorKey(coin_type)))
@@ -1037,7 +1015,7 @@ class WalletManager:
_, data = segwit_addr.decode(self._getHRP(coin_type), address) _, data = segwit_addr.decode(self._getHRP(coin_type), address)
if data is None: if data is None:
return "" return ""
return hashlib.sha256(bytes([0x00, 0x14]) + bytes(data)).digest()[::-1].hex() return sha256(bytes([0x00, 0x14]) + bytes(data))[::-1].hex()
def needsMigration(self, coin_type: Coins) -> bool: def needsMigration(self, coin_type: Coins) -> bool:
cursor = self._swap_client.openDB() cursor = self._swap_client.openDB()
@@ -1193,8 +1171,6 @@ class WalletManager:
self._swap_client.closeDB(cursor, commit=False) self._swap_client.closeDB(cursor, commit=False)
def getSeedID(self, coin_type: Coins) -> Optional[str]: def getSeedID(self, coin_type: Coins) -> Optional[str]:
from basicswap.contrib.test_framework.script import hash160
master_key = self._master_keys.get(coin_type) master_key = self._master_keys.get(coin_type)
if master_key is None: if master_key is None:
return None return None
@@ -1204,16 +1180,12 @@ class WalletManager:
return hash160(ek.encode_p()).hex() return hash160(ek.encode_p()).hex()
def signMessage(self, coin_type: Coins, address: str, message: str) -> bytes: def signMessage(self, coin_type: Coins, address: str, message: str) -> bytes:
from coincurve import PrivateKey
key = self.getPrivateKey(coin_type, address) key = self.getPrivateKey(coin_type, address)
if key is None: if key is None:
raise ValueError(f"Cannot sign: no key for address {address}") raise ValueError(f"Cannot sign: no key for address {address}")
return PrivateKey(key).sign(message.encode("utf-8")) return PrivateKey(key).sign(message.encode("utf-8"))
def signHash(self, coin_type: Coins, address: str, msg_hash: bytes) -> bytes: def signHash(self, coin_type: Coins, address: str, msg_hash: bytes) -> bytes:
from coincurve import PrivateKey
key = self.getPrivateKey(coin_type, address) key = self.getPrivateKey(coin_type, address)
if key is None: if key is None:
raise ValueError(f"Cannot sign: no key for address {address}") raise ValueError(f"Cannot sign: no key for address {address}")
@@ -1222,8 +1194,6 @@ class WalletManager:
def getKeyForAddress( def getKeyForAddress(
self, coin_type: Coins, address: str self, coin_type: Coins, address: str
) -> Optional[Tuple[bytes, bytes]]: ) -> Optional[Tuple[bytes, bytes]]:
from coincurve import PublicKey
key = self.getPrivateKey(coin_type, address) key = self.getPrivateKey(coin_type, address)
if key is None: if key is None:
return None return None
@@ -1232,8 +1202,6 @@ class WalletManager:
def findAddressByScripthash( def findAddressByScripthash(
self, coin_type: Coins, scripthash: str self, coin_type: Coins, scripthash: str
) -> Optional[str]: ) -> Optional[str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -1256,8 +1224,6 @@ class WalletManager:
return None return None
def getAllScripthashes(self, coin_type: Coins) -> List[str]: def getAllScripthashes(self, coin_type: Coins) -> List[str]:
import sqlite3
try: try:
conn = sqlite3.connect(self._swap_client.sqlite_file) conn = sqlite3.connect(self._swap_client.sqlite_file)
cursor = conn.cursor() cursor = conn.cursor()
@@ -1895,8 +1861,6 @@ class WalletManager:
for _ in existing: for _ in existing:
return False return False
import json
pending = WalletPendingTx() pending = WalletPendingTx()
pending.coin_type = int(coin_type) pending.coin_type = int(coin_type)
pending.txid = txid pending.txid = txid
@@ -1945,8 +1909,6 @@ class WalletManager:
) -> List[dict]: ) -> List[dict]:
cursor = self._swap_client.openDB() cursor = self._swap_client.openDB()
try: try:
import json
results = self._swap_client.query( results = self._swap_client.query(
WalletPendingTx, WalletPendingTx,
cursor, cursor,