mirror of
https://github.com/basicswap/basicswap.git
synced 2026-04-09 02:47:22 +02:00
753 lines
26 KiB
Python
753 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2024-2026 The Basicswap developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, List, Optional
|
|
|
|
|
|
class WalletBackend(ABC):
    """Abstract interface every wallet data backend must implement.

    Concrete backends supply balance, UTXO, transaction, fee and
    connectivity queries; this base class only adds one convenience helper
    built on top of ``getBalance``.
    """

    @abstractmethod
    def getBalance(self, addresses: List[str]) -> Dict[str, int]:
        """Return a mapping of each address to its integer balance."""

    def findAddressWithBalance(
        self, addresses: List[str], min_balance: int
    ) -> Optional[tuple]:
        """Return the first ``(address, balance)`` holding at least min_balance.

        Balances are fetched with a single ``getBalance`` call and scanned in
        the order that mapping yields them.  Returns ``None`` when no address
        qualifies.
        """
        qualifying = (
            (address, amount)
            for address, amount in self.getBalance(addresses).items()
            if amount >= min_balance
        )
        return next(qualifying, None)

    @abstractmethod
    def getUnspentOutputs(
        self, addresses: List[str], min_confirmations: int = 0
    ) -> List[dict]:
        """Return the unspent outputs of addresses as a list of dicts."""

    @abstractmethod
    def broadcastTransaction(self, tx_hex: str) -> str:
        """Submit the hex-encoded transaction and return a string result."""

    @abstractmethod
    def getTransaction(self, txid: str) -> Optional[dict]:
        """Return the transaction as a dict, or ``None``."""

    @abstractmethod
    def getTransactionRaw(self, txid: str) -> Optional[str]:
        """Return the transaction as a string, or ``None``."""

    @abstractmethod
    def getBlockHeight(self) -> int:
        """Return the current block height as an integer."""

    @abstractmethod
    def estimateFee(self, blocks: int = 6) -> int:
        """Return an integer fee estimate for confirmation within blocks."""

    @abstractmethod
    def isConnected(self) -> bool:
        """Return whether the backend is currently usable."""

    @abstractmethod
    def getAddressHistory(self, address: str) -> List[dict]:
        """Return the history entries of address as a list of dicts."""
|
|
|
|
|
|
class FullNodeBackend(WalletBackend):
    """Wallet backend that answers every query through a node RPC callable.

    ``rpc_client`` is invoked as ``rpc_client(method)`` or
    ``rpc_client(method, params)``; ``log`` must provide ``warning``.
    """

    def __init__(self, rpc_client, coin_type, log):
        self._rpc = rpc_client
        self._coin_type = coin_type
        self._log = log

    @staticmethod
    def _to_base_units(amount) -> int:
        """Convert a coin-denominated amount to integer 1e-8 units.

        The product is rounded rather than truncated: ``0.29 * 1e8``
        evaluates to ``28999999.999999996`` in binary floating point, so
        ``int()`` alone would silently lose one unit.
        """
        return round(float(amount) * 1e8)

    def getBalance(self, addresses: List[str]) -> Dict[str, int]:
        """Sum the ``listunspent`` amounts per requested address.

        Every requested address is present in the result (0 when it has no
        outputs).  RPC failures are logged as warnings and whatever was
        accumulated so far is returned.
        """
        result = {addr: 0 for addr in addresses}

        try:
            utxos = self._rpc("listunspent", [0, 9999999, addresses])
            for utxo in utxos:
                addr = utxo.get("address")
                if addr in result:
                    result[addr] += self._to_base_units(utxo.get("amount", 0))
        except Exception as e:
            self._log.warning(f"FullNodeBackend.getBalance error: {e}")

        return result

    def getUnspentOutputs(
        self, addresses: List[str], min_confirmations: int = 0
    ) -> List[dict]:
        """Return ``listunspent`` entries normalised to the backend dict shape.

        Each entry has ``txid``, ``vout``, ``value`` (integer 1e-8 units),
        ``address``, ``confirmations`` and ``scriptPubKey``.  Returns an
        empty list, after logging a warning, when the RPC call fails.
        """
        try:
            utxos = self._rpc("listunspent", [min_confirmations, 9999999, addresses])
            return [
                {
                    "txid": utxo.get("txid"),
                    "vout": utxo.get("vout"),
                    "value": self._to_base_units(utxo.get("amount", 0)),
                    "address": utxo.get("address"),
                    "confirmations": utxo.get("confirmations", 0),
                    "scriptPubKey": utxo.get("scriptPubKey"),
                }
                for utxo in utxos
            ]
        except Exception as e:
            self._log.warning(f"FullNodeBackend.getUnspentOutputs error: {e}")
            return []

    def broadcastTransaction(self, tx_hex: str) -> str:
        """Send tx_hex with ``sendrawtransaction``; RPC errors propagate."""
        return self._rpc("sendrawtransaction", [tx_hex])

    def getTransaction(self, txid: str) -> Optional[dict]:
        """Return the verbose ``getrawtransaction`` result, or ``None`` on error."""
        try:
            return self._rpc("getrawtransaction", [txid, True])
        except Exception:
            return None

    def getTransactionRaw(self, txid: str) -> Optional[str]:
        """Return the non-verbose ``getrawtransaction`` result, or ``None`` on error."""
        try:
            return self._rpc("getrawtransaction", [txid, False])
        except Exception:
            return None

    def getBlockHeight(self) -> int:
        """Return the ``getblockcount`` result; RPC errors propagate."""
        return self._rpc("getblockcount")

    def estimateFee(self, blocks: int = 6) -> int:
        """Return the ``estimatesmartfee`` rate scaled by 1e8 / 1000, at least 1.

        Falls back to 1 when the node reports no ``feerate`` or the call
        fails.  The rate is first rounded to whole 1e-8 units and then
        floor-divided, so float noise cannot shave a unit off, and the
        result is clamped to the same minimum of 1 used by the fallback
        (the previous code could return 0 or a negative number).
        """
        try:
            result = self._rpc("estimatesmartfee", [blocks])
            if "feerate" in result:
                return max(1, round(float(result["feerate"]) * 1e8) // 1000)
            return 1
        except Exception:
            return 1

    def isConnected(self) -> bool:
        """Return ``True`` when ``getblockchaininfo`` succeeds."""
        try:
            self._rpc("getblockchaininfo")
            return True
        except Exception:
            return False

    def getAddressHistory(self, address: str) -> List[dict]:
        """Always return an empty list; this backend provides no history."""
        return []

    def importAddress(self, address: str, label: str = "", rescan: bool = False):
        """Call ``importaddress``, ignoring "already in wallet" errors.

        Any other RPC error is re-raised unchanged.
        """
        try:
            self._rpc("importaddress", [address, label, rescan])
        except Exception as e:
            if "already in wallet" not in str(e).lower():
                raise
|
|
|
|
|
|
class ElectrumBackend(WalletBackend):
    """Wallet backend that answers queries through an ``ElectrumServer``.

    Addresses are translated to scripthashes before being sent to the
    server.  Addresses starting with ``ltcmweb1`` are treated as unsupported
    and skipped by every query.
    """

    def __init__(
        self,
        coin_type,
        log,
        clearnet_servers=None,
        onion_servers=None,
        chain="mainnet",
        proxy_host=None,
        proxy_port=None,
    ):
        # basicswap modules are imported at construction time, not module load.
        from basicswap.interface.electrumx import ElectrumServer
        from basicswap.chainparams import Coins, chainparams

        self._coin_type = coin_type
        self._log = log
        self._subscribed_scripthashes = set()

        # Unknown coins fall back to BTC parameters, unknown chains to mainnet.
        coin_params = chainparams.get(coin_type, chainparams.get(Coins.BTC))
        self._network_params = coin_params.get(chain, coin_params.get("mainnet", {}))

        coin_name_map = {
            Coins.BTC: "bitcoin",
            Coins.LTC: "litecoin",
        }
        coin_name = coin_name_map.get(coin_type, "bitcoin")

        # Only used by getServerHost when the server reports no current host.
        self._host = "localhost"
        self._port = 50002
        self._use_ssl = True

        self._server = ElectrumServer(
            coin_name,
            clearnet_servers=clearnet_servers,
            onion_servers=onion_servers,
            log=log,
            proxy_host=proxy_host,
            proxy_port=proxy_port,
        )

        self._realtime_callback = None
        self._address_to_scripthash = {}

        # Last block height seen, and how long (seconds) it may be reused.
        self._cached_height = 0
        self._cached_height_time = 0
        self._height_cache_ttl = 5

        self._max_batch_size = 10
        self._background_mode = False

    def setBackgroundMode(self, enabled: bool):
        """Route ``_call`` / ``_call_batch`` through the server's background API."""
        self._background_mode = enabled

    def _call(self, method: str, params: Optional[list] = None, timeout: int = 10):
        """Issue one request, using ``call_background`` when enabled and available."""
        if self._background_mode and hasattr(self._server, "call_background"):
            return self._server.call_background(method, params, timeout)
        return self._server.call(method, params, timeout)

    def _call_batch(self, calls: list, timeout: int = 15):
        """Issue a batch, using ``call_batch_background`` when enabled and available."""
        if self._background_mode and hasattr(self._server, "call_batch_background"):
            return self._server.call_batch_background(calls, timeout)
        return self._server.call_batch(calls, timeout)

    def _split_batch_call(
        self, scripthashes: list, method: str, batch_size: Optional[int] = None
    ) -> list:
        """Call method once per scripthash, batched in chunks of batch_size.

        A chunk whose batch request raises is retried one scripthash at a
        time; an individual failure contributes ``None`` so that results
        stay positionally aligned with scripthashes.
        """
        if batch_size is None:
            batch_size = self._max_batch_size

        all_results = []
        for i in range(0, len(scripthashes), batch_size):
            chunk = scripthashes[i : i + batch_size]
            try:
                calls = [(method, [sh]) for sh in chunk]
                all_results.extend(self._call_batch(calls))
            except Exception as e:
                self._log.debug(f"Batch chunk failed ({len(chunk)} items): {e}")
                for sh in chunk:
                    try:
                        all_results.append(self._call(method, [sh]))
                    except Exception as e2:
                        self._log.debug(f"Individual call failed for {sh[:8]}...: {e2}")
                        all_results.append(None)
        return all_results

    def _isUnsupportedAddress(self, address: str) -> bool:
        """Return ``True`` for addresses with the ``ltcmweb1`` prefix."""
        return address.startswith("ltcmweb1")

    def _addressToScripthash(self, address: str) -> str:
        """Convert address to a scripthash using this backend's network params."""
        from basicswap.interface.electrumx import scripthash_from_address

        return scripthash_from_address(address, self._network_params)

    @staticmethod
    def _confirmations_for(current_height: int, utxo_height: int) -> int:
        """Return the confirmation count of an output mined at utxo_height.

        A height of 0 or below means unconfirmed.  A positive height means
        the output is in a block, so the count is never reported below 1
        even when the known tip height is 0 (lookup failed) or stale;
        otherwise such outputs would be filtered out as having negative
        confirmations.
        """
        if utxo_height <= 0:
            return 0
        return max(1, current_height - utxo_height + 1)

    @staticmethod
    def _format_history(history) -> List[dict]:
        """Map server history entries to ``{"txid", "height"}`` dicts."""
        if not history:
            return []
        return [
            {"txid": h.get("tx_hash"), "height": h.get("height", 0)} for h in history
        ]

    def _scripthashes_for(self, addresses: List[str], context: Optional[str]) -> dict:
        """Return ``{address: scripthash}`` for every convertible address.

        Conversion failures are skipped; they are logged at debug level
        under the given context label unless context is ``None``.
        """
        mapping = {}
        for addr in addresses:
            try:
                mapping[addr] = self._addressToScripthash(addr)
            except Exception as e:
                if context is not None:
                    self._log.debug(
                        f"{context}: scripthash error for {addr[:10]}...: {e}"
                    )
        return mapping

    def getBalance(self, addresses: List[str]) -> Dict[str, int]:
        """Return confirmed + unconfirmed balance per address.

        Every requested address is present in the result; unsupported
        addresses and those whose lookup fails stay at 0.
        """
        result = {addr: 0 for addr in addresses}

        addr_list = [addr for addr in addresses if not self._isUnsupportedAddress(addr)]
        if not addr_list:
            return result

        addr_to_scripthash = self._scripthashes_for(addr_list, "getBalance")
        if not addr_to_scripthash:
            return result

        batch_results = self._split_batch_call(
            list(addr_to_scripthash.values()), "blockchain.scripthash.get_balance"
        )

        # Results are positional, so pair them with the addresses directly
        # instead of indexing back through a reverse scripthash map.
        for addr, balance in zip(addr_to_scripthash, batch_results):
            if balance and isinstance(balance, dict):
                result[addr] = balance.get("confirmed", 0) + balance.get(
                    "unconfirmed", 0
                )

        return result

    def getDetailedBalance(self, addresses: List[str]) -> Dict[str, dict]:
        """Return ``{"confirmed", "unconfirmed"}`` per address.

        Addresses are queried in groups of 10 via the server's
        ``get_balance_batch``.  A failing group is retried once after
        disconnecting and a short pause, then queried address by address.
        """
        result = {addr: {"confirmed": 0, "unconfirmed": 0} for addr in addresses}

        addr_list = [addr for addr in addresses if not self._isUnsupportedAddress(addr)]
        if not addr_list:
            return result

        batch_size = 10
        for batch_start in range(0, len(addr_list), batch_size):
            batch = addr_list[batch_start : batch_start + batch_size]

            addr_to_scripthash = self._scripthashes_for(batch, "getDetailedBalance")
            if not addr_to_scripthash:
                continue

            scripthashes = list(addr_to_scripthash.values())
            batch_success = False

            for attempt in range(2):
                try:
                    batch_results = self._server.get_balance_batch(scripthashes)
                    for addr, balance in zip(addr_to_scripthash, batch_results):
                        if balance and isinstance(balance, dict):
                            result[addr] = {
                                "confirmed": balance.get("confirmed", 0),
                                "unconfirmed": balance.get("unconfirmed", 0),
                            }
                    batch_success = True
                    break
                except Exception as e:
                    if attempt == 0:
                        self._log.debug(
                            f"Batch detailed balance query failed, reconnecting: {e}"
                        )
                        # Best effort: a failed disconnect must not stop the retry.
                        try:
                            self._server.disconnect()
                        except Exception:
                            pass
                        time.sleep(0.5)
                    else:
                        self._log.debug(
                            f"Batch detailed balance query failed after retry, falling back: {e}"
                        )

            if not batch_success:
                for addr, scripthash in addr_to_scripthash.items():
                    try:
                        balance = self._call(
                            "blockchain.scripthash.get_balance", [scripthash]
                        )
                        if balance and isinstance(balance, dict):
                            result[addr] = {
                                "confirmed": balance.get("confirmed", 0),
                                "unconfirmed": balance.get("unconfirmed", 0),
                            }
                    except Exception as e:
                        self._log.debug(
                            f"ElectrumBackend.getDetailedBalance error for {addr[:10]}...: {e}"
                        )

        return result

    def findAddressWithBalance(
        self, addresses: List[str], min_balance: int
    ) -> Optional[tuple]:
        """Return the first ``(address, total)`` with total >= min_balance.

        Addresses are checked in groups of 50 so the search can stop at the
        first match.  A group whose batch query fails is logged and skipped.
        Returns ``None`` when nothing qualifies.
        """
        addr_list = [addr for addr in addresses if not self._isUnsupportedAddress(addr)]
        if not addr_list:
            return None

        batch_size = 50
        for batch_start in range(0, len(addr_list), batch_size):
            batch = addr_list[batch_start : batch_start + batch_size]

            addr_to_scripthash = self._scripthashes_for(batch, None)
            if not addr_to_scripthash:
                continue

            try:
                batch_results = self._server.get_balance_batch(
                    list(addr_to_scripthash.values())
                )
                for addr, balance in zip(addr_to_scripthash, batch_results):
                    if balance and isinstance(balance, dict):
                        total = balance.get("confirmed", 0) + balance.get(
                            "unconfirmed", 0
                        )
                        if total >= min_balance and addr:
                            return (addr, total)
            except Exception as e:
                self._log.debug(f"findAddressWithBalance batch error: {e}")

        return None

    def getUnspentOutputs(
        self, addresses: List[str], min_confirmations: int = 0
    ) -> List[dict]:
        """Return unspent outputs of addresses with enough confirmations.

        Each entry has ``txid``, ``vout``, ``value``, ``address`` and
        ``confirmations``.  Failures for one address are logged at debug
        level and do not affect the others.
        """
        result = []
        if not addresses:
            return result

        try:
            current_height = self.getBlockHeight()

            for addr in addresses:
                if self._isUnsupportedAddress(addr):
                    continue
                try:
                    scripthash = self._addressToScripthash(addr)
                    utxos = self._call(
                        "blockchain.scripthash.listunspent", [scripthash]
                    )
                    for utxo in utxos or []:
                        confirmations = self._confirmations_for(
                            current_height, utxo.get("height", 0)
                        )
                        if confirmations >= min_confirmations:
                            result.append(
                                {
                                    "txid": utxo.get("tx_hash"),
                                    "vout": utxo.get("tx_pos"),
                                    "value": utxo.get("value", 0),
                                    "address": addr,
                                    "confirmations": confirmations,
                                }
                            )
                except Exception as e:
                    self._log.debug(
                        f"ElectrumBackend.getUnspentOutputs error for {addr[:10]}...: {e}"
                    )
        except Exception as e:
            self._log.warning(f"ElectrumBackend.getUnspentOutputs error: {e}")

        return result

    def broadcastTransaction(self, tx_hex: str) -> str:
        """Broadcast tx_hex, retrying transient failures up to three times.

        Errors whose message matches a known rejection reason are re-raised
        immediately; other errors are retried with exponential backoff and
        re-raised after the last attempt.  Returns ``None`` if every attempt
        completed without an error but also without a result.
        """
        # Rejections that retrying cannot fix.
        permanent_errors = (
            "missing inputs",
            "bad-txns",
            "txn-mempool-conflict",
            "already in block chain",
            "transaction already exists",
            "insufficient fee",
            "dust",
        )
        max_retries = 3
        retry_delay = 0.5

        for attempt in range(max_retries):
            try:
                result = self._server.call("blockchain.transaction.broadcast", [tx_hex])
                if result:
                    return result
            except Exception as e:
                error_msg = str(e).lower()
                if any(pattern in error_msg for pattern in permanent_errors):
                    raise
                if attempt < max_retries - 1:
                    self._log.debug(
                        f"broadcastTransaction retry {attempt + 1}/{max_retries}: {e}"
                    )
                    time.sleep(retry_delay * (2**attempt))  # Exponential backoff
                    continue
                raise
        return None

    def getTransaction(self, txid: str) -> Optional[dict]:
        """Return the verbose transaction, or ``None`` on any error."""
        try:
            return self._call("blockchain.transaction.get", [txid, True])
        except Exception:
            return None

    def getTransactionRaw(self, txid: str) -> Optional[str]:
        """Return the non-verbose transaction, or ``None`` (with a warning) on error."""
        try:
            return self._call("blockchain.transaction.get", [txid, False])
        except Exception as e:
            self._log.warning(f"getTransactionRaw failed for {txid[:16]}...: {e}")
            return None

    def getTransactionBatch(self, txids: List[str]) -> Dict[str, Optional[dict]]:
        """Fetch verbose transactions in one batch, keyed by txid.

        Falls back to one ``getTransaction`` call per txid when the batch
        request raises.  Falsy responses are stored as ``None``.
        """
        result = {}
        if not txids:
            return result

        try:
            calls = [("blockchain.transaction.get", [txid, True]) for txid in txids]
            for txid, tx_info in zip(txids, self._call_batch(calls)):
                result[txid] = tx_info or None
        except Exception as e:
            self._log.debug(f"getTransactionBatch error: {e}")
            for txid in txids:
                result[txid] = self.getTransaction(txid)

        return result

    def getTransactionBatchRaw(self, txids: List[str]) -> Dict[str, Optional[str]]:
        """Fetch non-verbose transactions in one batch, keyed by txid.

        Falls back to one ``getTransactionRaw`` call per txid when the batch
        request raises.  Falsy responses are stored as ``None``.
        """
        result = {}
        if not txids:
            return result

        try:
            calls = [("blockchain.transaction.get", [txid, False]) for txid in txids]
            for txid, tx_hex in zip(txids, self._call_batch(calls)):
                result[txid] = tx_hex or None
        except Exception as e:
            self._log.debug(f"getTransactionBatchRaw error: {e}")
            for txid in txids:
                result[txid] = self.getTransactionRaw(txid)

        return result

    def getBlockHeight(self) -> int:
        """Return the best known block height, or 0 when none is known.

        Preference order: the server's subscribed height (when offered and
        positive), the cached height while younger than the cache TTL, then
        a fresh ``blockchain.headers.subscribe`` request.  On failure the
        last cached height is returned.
        """
        if hasattr(self._server, "get_subscribed_height"):
            subscribed_height = self._server.get_subscribed_height()
            if subscribed_height > 0:
                if subscribed_height > self._cached_height:
                    self._cached_height = subscribed_height
                    self._cached_height_time = time.time()
                return subscribed_height

        now = time.time()
        if (
            self._cached_height > 0
            and (now - self._cached_height_time) < self._height_cache_ttl
        ):
            return self._cached_height

        try:
            header = self._call("blockchain.headers.subscribe", [])
            if header:
                height = header.get("height", 0)
                if height > 0:
                    self._cached_height = height
                    self._cached_height_time = now
                    return height
            return self._cached_height if self._cached_height > 0 else 0
        except Exception:
            return self._cached_height if self._cached_height > 0 else 0

    def estimateFee(self, blocks: int = 6) -> int:
        """Return the ``blockchain.estimatefee`` rate scaled by 1e8 / 1000, at least 1.

        Falls back to 1 when the server reports no positive estimate or the
        call fails.  The rate is rounded to whole 1e-8 units before the
        floor division so float noise cannot shave a unit off, and the
        result is clamped to the fallback minimum of 1 (rates below one unit
        per byte previously produced 0).
        """
        try:
            fee = self._call("blockchain.estimatefee", [blocks])
            if fee and fee > 0:
                return max(1, round(float(fee) * 1e8) // 1000)
            return 1
        except Exception:
            return 1

    def isConnected(self) -> bool:
        """Return ``True`` when ``server.ping`` succeeds."""
        try:
            self._call("server.ping", [])
            return True
        except Exception:
            return False

    def getServerVersion(self) -> str:
        """Return the server's version string, or ``"electrum"`` if unknown.

        When no version is available yet, a ping is attempted before asking
        the server object again.
        """
        version = self._server.get_server_version()
        if not version:
            try:
                self._call("server.ping", [])
                version = self._server.get_server_version()
            except Exception:
                pass
        return version or "electrum"

    def getServerHost(self) -> str:
        """Return ``host:port`` of the current server, or the local defaults."""
        host, port = self._server.get_current_server()
        if host and port:
            return f"{host}:{port}"
        return f"{self._host}:{self._port}"

    def getConnectionStatus(self) -> dict:
        """Return the server's connection status plus ``server`` and ``version``.

        A default status dict is synthesised when the server object does not
        implement ``getConnectionStatus``.
        """
        if hasattr(self._server, "getConnectionStatus"):
            status = self._server.getConnectionStatus()
        else:
            status = {
                "connected": self.isConnected(),
                "failures": 0,
                "last_error": None,
                "all_failed": False,
                "using_defaults": True,
                "server_count": 1,
            }
        status["server"] = self.getServerHost()
        status["version"] = self.getServerVersion()
        return status

    def getAddressHistory(self, address: str) -> List[dict]:
        """Return ``{"txid", "height"}`` entries for address; ``[]`` on error."""
        if self._isUnsupportedAddress(address):
            return []
        try:
            scripthash = self._addressToScripthash(address)
            history = self._call("blockchain.scripthash.get_history", [scripthash])
            return self._format_history(history)
        except Exception:
            return []

    def getAddressHistoryBackground(self, address: str) -> List[dict]:
        """Like ``getAddressHistory`` but always prefers ``call_background``.

        Falls back to the server's regular ``call`` when it offers no
        ``call_background``, matching what ``_call`` does, rather than
        silently reporting an empty history.
        """
        if self._isUnsupportedAddress(address):
            return []
        try:
            scripthash = self._addressToScripthash(address)
            request = getattr(self._server, "call_background", None)
            if request is None:
                request = self._server.call
            history = request("blockchain.scripthash.get_history", [scripthash])
            return self._format_history(history)
        except Exception:
            return []

    def getBatchBalance(self, scripthashes: List[str]) -> Dict[str, int]:
        """Return confirmed + unconfirmed balance per scripthash.

        Non-dict responses are ignored (left at 0) so that one malformed
        entry cannot discard the rest of the batch.  A failing batch request
        is logged as a warning and leaves every balance at 0.
        """
        result = {sh: 0 for sh in scripthashes}

        try:
            calls = [("blockchain.scripthash.get_balance", [sh]) for sh in scripthashes]
            for sh, balance in zip(scripthashes, self._call_batch(calls)):
                if balance and isinstance(balance, dict):
                    result[sh] = balance.get("confirmed", 0) + balance.get(
                        "unconfirmed", 0
                    )
        except Exception as e:
            self._log.warning(f"ElectrumBackend.getBatchBalance error: {e}")

        return result

    def getBatchUnspent(
        self, scripthashes: List[str], min_confirmations: int = 0
    ) -> Dict[str, List[dict]]:
        """Return unspent outputs per scripthash, filtered by confirmations.

        Each entry has ``txid``, ``vout``, ``value`` and ``confirmations``.
        Every requested scripthash is present in the result.
        """
        result = {sh: [] for sh in scripthashes}

        try:
            current_height = self.getBlockHeight()

            calls = [("blockchain.scripthash.listunspent", [sh]) for sh in scripthashes]
            for sh, utxos in zip(scripthashes, self._call_batch(calls)):
                for utxo in utxos or []:
                    confirmations = self._confirmations_for(
                        current_height, utxo.get("height", 0)
                    )
                    if confirmations >= min_confirmations:
                        result[sh].append(
                            {
                                "txid": utxo.get("tx_hash"),
                                "vout": utxo.get("tx_pos"),
                                "value": utxo.get("value", 0),
                                "confirmations": confirmations,
                            }
                        )
        except Exception as e:
            self._log.warning(f"ElectrumBackend.getBatchUnspent error: {e}")

        return result

    def enableRealtimeNotifications(self, callback) -> None:
        """Store callback and turn on the server's realtime notifications."""
        self._realtime_callback = callback
        self._server.enable_realtime_notifications()
        self._log.info(f"Real-time notifications enabled for {self._coin_type}")

    def _create_scripthash_callback(self, scripthash):
        """Return a ``(scripthash, new_status)`` callable for the server.

        The returned callable forwards the scripthash it is invoked with,
        not the one passed here.
        """

        def callback(sh, new_status):
            self._handle_scripthash_notification(sh, new_status)

        return callback

    def _handle_scripthash_notification(self, scripthash, new_status):
        """Forward a scripthash status change to the realtime callback.

        The callback receives ``(coin_type, address, scripthash,
        "balance_change")``; address is ``None`` when the scripthash was not
        registered through ``subscribeAddressWithCallback``.  Exceptions
        raised by the callback are logged and suppressed.
        """
        if not self._realtime_callback:
            return

        address = next(
            (
                addr
                for addr, sh in self._address_to_scripthash.items()
                if sh == scripthash
            ),
            None,
        )

        try:
            self._realtime_callback(
                self._coin_type, address, scripthash, "balance_change"
            )
        except Exception as e:
            self._log.debug(f"Error in realtime callback: {e}")

    def subscribeAddressWithCallback(self, address: str) -> Optional[str]:
        """Subscribe to status changes of address and return the server's reply.

        Uses the server's callback subscription when a realtime callback is
        registered, otherwise a plain ``blockchain.scripthash.subscribe``
        request.  Returns ``None`` for unsupported addresses or on failure.
        """
        if self._isUnsupportedAddress(address):
            return None

        try:
            scripthash = self._addressToScripthash(address)
            self._address_to_scripthash[address] = scripthash

            if self._realtime_callback:
                status = self._server.subscribe_with_callback(
                    scripthash, self._create_scripthash_callback(scripthash)
                )
            else:
                status = self._call("blockchain.scripthash.subscribe", [scripthash])

            self._subscribed_scripthashes.add(scripthash)
            return status
        except Exception as e:
            self._log.debug(f"Failed to subscribe to {address}: {e}")
            return None

    def getServer(self):
        """Return the underlying server object."""
        return self._server
|