Files
basicswap/basicswap/ui/page_wallet.py

593 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import traceback
from .util import (
get_data_entry,
have_data_entry,
checkAddressesOwned,
)
from basicswap.util import (
ensure,
format_timestamp,
)
from basicswap.chainparams import (
Coins,
getCoinIdFromTicker,
)
# TODO: Move to JS
# Donation addresses keyed by upper-case coin ticker. "LTC_MWEB" is a second
# Litecoin entry that page_wallet attaches as "mweb_address" for LTC wallets.
DONATION_ADDRESSES = dict(
    XMR="8BuQsYBNdfhfoWsvVR1unE7YuZEoTkC4hANaPm2fD6VR5VM2DzQoJhq2CHHXUN1UCWQfH3dctJgorSRxksVa5U4RNTJkcAc",
    LTC="ltc1qevlumv48nz2afl0re9ml4tdewc56svxq3egkyt",
    LTC_MWEB="ltcmweb1qqt9rwznnxzkghv4s5wgtwxs0m0ry6n3atp95f47slppapxljde3xyqmdlnrc8ag7y2k354jzdc4pc4ks0kr43jehr77lngdecgh6689nn5mgv5yn",
    BTC="bc1q72j07vkn059xnmsrkk8x9up9lgvd9h9xjf8cq8",
    PART="pw1qf59ef0zjdckldjs8smfhv4j04gsjv302w7pdpz",
)
def format_wallet_data(swap_client, ci, w):
    """Turn one cached wallet-info dict into the dict handed to the templates.

    Args:
        swap_client: Client object. Its ``coin_clients`` mapping is consulted
            for the coin's ``connection_type`` and it is forwarded to
            ``checkAddressesOwned``.
        ci: Coin interface of the wallet being formatted.
        w: Cached wallet info for that coin.

    Returns:
        A dict of display fields. Core fields missing from ``w`` are filled
        with ``"?"``; optional fields are only present when ``w`` has them.
    """
    coin_id = ci.coin_type()
    client_settings = swap_client.coin_clients.get(coin_id, {})
    # The per-coin client setting wins; the cached wallet info is the fallback.
    connection_type = client_settings.get(
        "connection_type", w.get("connection_type", "rpc")
    )

    def is_positive(field):
        # True when the field exists and parses to a value above zero.
        return field in w and float(w[field]) > 0.0

    wf = {
        "name": ci.coin_name(),
        "version": w.get("version", "?"),
        "ticker": ci.ticker_mainnet(),
        "cid": str(int(coin_id)),
    }
    for field in (
        "balance",
        "blocks",
        "synced",
        "expected_seed",
        "encrypted",
        "locked",
        "updating",
    ):
        wf[field] = w.get(field, "?")
    wf["havedata"] = True
    wf["connection_type"] = connection_type

    # Optional fields are copied through only when the cache supplies them.
    if "wallet_blocks" in w:
        wf["wallet_blocks"] = w["wallet_blocks"]
    if w.get("bootstrapping", False) is True:
        wf["bootstrapping"] = True
    for field in ("known_block_count", "locked_utxos"):
        if field in w:
            wf[field] = w[field]

    if "balance" in w and "unconfirmed" in w:
        wf["balance_all"] = float(w["balance"]) + float(w["unconfirmed"])
    if "lastupdated" in w:
        wf["lastupdated"] = format_timestamp(w["lastupdated"])

    # Incoming pending amount: positive unconfirmed plus positive immature.
    pending_total: int = 0
    for field in ("unconfirmed", "immature"):
        if is_positive(field):
            pending_total += ci.make_int(w[field])
    if pending_total > 0.0:
        wf["pending"] = ci.format_amount(pending_total)

    # A negative unconfirmed balance is reported separately as outgoing.
    if "unconfirmed" in w and float(w["unconfirmed"]) < 0.0:
        wf["pending_out"] = ci.format_amount(abs(ci.make_int(w["unconfirmed"])))

    # Coin-specific extra balances and addresses.
    if ci.coin_type() == Coins.PART:
        wf["stealth_address"] = w.get("stealth_address", "?")
        wf["blind_balance"] = w.get("blind_balance", "?")
        if is_positive("blind_unconfirmed"):
            wf["blind_unconfirmed"] = w["blind_unconfirmed"]
        wf["anon_balance"] = w.get("anon_balance", "?")
        if is_positive("anon_pending"):
            wf["anon_pending"] = w["anon_pending"]
    elif ci.coin_type() == Coins.LTC:
        for field in ("mweb_address", "mweb_balance", "mweb_pending"):
            wf[field] = w.get(field, "?")
    elif ci.coin_type() == Coins.FIRO:
        for field in ("spark_address", "spark_balance", "spark_pending"):
            wf[field] = w.get(field, "?")

    if hasattr(ci, "getScanStatus"):
        wf["scan_status"] = ci.getScanStatus()

    backend = getattr(ci, "_backend", None) if connection_type == "electrum" else None
    if backend:
        wf["electrum_server"] = backend.getServerHost()
        wf["electrum_version"] = backend.getServerVersion()

        # A failing status query is reported as an "error" state, not raised.
        try:
            conn_status = backend.getConnectionStatus()
            wf["electrum_connected"] = conn_status.get("connected", False)
            wf["electrum_failures"] = conn_status.get("failures", 0)
            wf["electrum_using_defaults"] = conn_status.get("using_defaults", True)
            wf["electrum_all_failed"] = conn_status.get("all_failed", False)
            wf["electrum_last_error"] = conn_status.get("last_error")
            wf["electrum_status"] = (
                "connected"
                if conn_status.get("connected")
                else "all_failed" if conn_status.get("all_failed") else "disconnected"
            )
        except Exception:
            wf["electrum_connected"] = False
            wf["electrum_status"] = "error"

        try:
            sync_status = backend.getSyncStatus()
            wf["electrum_synced"] = sync_status.get("synced", False)
            wf["electrum_height"] = sync_status.get("height", 0)
        except Exception:
            wf["electrum_synced"] = False
            wf["electrum_height"] = 0

    checkAddressesOwned(swap_client, ci, wf)
    return wf
def format_transactions(ci, transactions, coin_id):
    """Convert raw wallet transaction dicts into rows for the wallet page.

    Args:
        ci: Coin interface used for ``format_amount`` / ``make_int``.
        transactions: Iterable of transaction dicts.
        coin_id: Coin the transactions belong to; XMR and WOW entries are read
            with different keys than all other coins.

    Returns:
        A list of dicts with ``txid``, ``type``, ``amount``, ``confirmations``
        and ``timestamp``, plus ``height`` (XMR/WOW) or ``address`` (others).
    """
    if coin_id in (Coins.XMR, Coins.WOW):
        direction_labels = {"in": "Incoming", "out": "Outgoing"}

        def describe(entry):
            kind = entry.get("type", "")
            # Unknown types are shown capitalised rather than dropped.
            label = (
                direction_labels[kind]
                if kind in direction_labels
                else kind.capitalize()
            )
            return {
                "txid": entry.get("txid", ""),
                "type": label,
                # Here the amount is passed to format_amount without make_int.
                "amount": ci.format_amount(entry.get("amount", 0)),
                "confirmations": entry.get("confirmations", 0),
                "timestamp": format_timestamp(entry.get("timestamp", 0)),
                "height": entry.get("height", 0),
            }

        return [describe(entry) for entry in transactions]

    rows = []
    for entry in transactions:
        category = entry.get("category", "")
        raw_amount = entry.get("amount", 0)
        if category == "receive":
            label = "Incoming"
            shown_amount = raw_amount
        else:
            # Sends and every other category display the absolute amount.
            label = "Outgoing" if category == "send" else category.capitalize()
            shown_amount = abs(raw_amount)
        # "time" is preferred; "timereceived" is the fallback, then 0.
        when = entry.get("time", entry.get("timereceived", 0))
        rows.append(
            {
                "txid": entry.get("txid", ""),
                "type": label,
                "amount": ci.format_amount(ci.make_int(shown_amount)),
                "confirmations": entry.get("confirmations", 0),
                "timestamp": format_timestamp(when),
                "address": entry.get("address", ""),
            }
        )
    return rows
def page_wallets(self, url_split, post_string):
    """Render the wallets overview page.

    Refreshes the cached wallet info, formats one entry per coin (ordered by
    coin id) and renders ``wallets.html``. ``url_split`` and ``post_string``
    are not used by this handler.
    """
    server = self.server
    swap_client = server.swap_client
    swap_client.checkSystemStatus()
    summary = swap_client.getSummary()

    swap_client.updateWalletsInfo()
    wallets = swap_client.getCachedWalletsInfo()

    def to_row(coin, info):
        # Error and no-data entries get a reduced row instead of full details.
        if "error" in info:
            return {"cid": str(int(coin)), "error": info["error"]}
        if "no_data" in info:
            return {
                "name": info["name"],
                "havedata": False,
                "updating": info["updating"],
            }
        return format_wallet_data(swap_client, swap_client.ci(coin), info)

    wallets_formatted = [to_row(coin, wallets[coin]) for coin in sorted(wallets)]

    template = server.env.get_template("wallets.html")
    context = {
        "messages": [],
        "err_messages": [],
        "wallets": wallets_formatted,
        "summary": summary,
        "use_tor": getattr(swap_client, "use_tor_proxy", False),
    }
    return self.render_template(template, context)
def page_wallet(self, url_split, post_string):
    """Render the page for a single wallet and process any action posted to it.

    The wallet ticker is taken from ``url_split[2]``. A posted form may page
    through the transaction list, request a new address, force a refresh,
    reseed the wallet, import a WIF key, withdraw or estimate a fee, show UTXO
    groups or create a UTXO. Afterwards the cached wallet info is refreshed
    and ``wallet.html`` is rendered.

    Args:
        url_split: Split request path; element 2 is the wallet ticker.
        post_string: Raw POST body, handed to ``self.checkForm``.

    Returns:
        Whatever ``self.render_template`` returns for ``wallet.html``.

    Raises:
        Whatever ``ensure`` raises when no ticker is present in ``url_split``.
    """
    ensure(len(url_split) > 2, "Wallet not specified")
    wallet_ticker = url_split[2]
    server = self.server
    swap_client = server.swap_client
    swap_client.checkSystemStatus()
    summary = swap_client.getSummary()

    coin_id = getCoinIdFromTicker(wallet_ticker)
    # Suffix used by the per-coin form field names and page_data keys.
    cid = str(int(coin_id))

    page_data = {}
    messages = []
    err_messages = []
    show_utxo_groups: bool = False
    withdrawal_successful: bool = False
    force_refresh: bool = False

    tx_filters = {
        "page_no": 1,
        "limit": 30,
        "offset": 0,
    }

    form_data = self.checkForm(post_string, "wallet", err_messages)
    if form_data:

        def form_text(name):
            # Decode the first value posted under `name`; raises if absent.
            return form_data[bytes(name, "utf-8")][0].decode("utf-8")

        # Transaction list paging.
        page_step = 0
        if have_data_entry(form_data, "pageback"):
            page_step = -1
        elif have_data_entry(form_data, "pageforwards"):
            page_step = 1
        if page_step != 0:
            try:
                posted_page = int(form_data[b"pageno"][0])
            except (KeyError, IndexError, TypeError, ValueError):
                # A missing or malformed page number must not break the page.
                err_messages.append("Invalid page number")
            else:
                tx_filters["page_no"] = max(1, posted_page + page_step)
        # Derive the offset from the configured limit, not a second literal.
        tx_filters["offset"] = (tx_filters["page_no"] - 1) * tx_filters["limit"]

        estimate_fee: bool = have_data_entry(form_data, "estfee_" + cid)
        withdraw: bool = have_data_entry(form_data, "withdraw_" + cid)
        if have_data_entry(form_data, "newaddr_" + cid):
            swap_client.cacheNewAddressForCoin(coin_id)
        elif have_data_entry(form_data, "forcerefresh"):
            force_refresh = True
        elif have_data_entry(form_data, "newmwebaddr_" + cid):
            swap_client.cacheNewStealthAddressForCoin(coin_id)
        elif have_data_entry(form_data, "newsparkaddr_" + cid):
            swap_client.cacheNewStealthAddressForCoin(coin_id)
        elif have_data_entry(form_data, "reseed_" + cid):
            try:
                swap_client.reseedWallet(coin_id)
                messages.append("Reseed complete " + str(coin_id))
            except Exception as ex:
                err_messages.append("Reseed failed " + str(ex))
            swap_client.updateWalletsInfo(True, coin_id)
        elif have_data_entry(form_data, "importkey_" + cid):
            try:
                wif_key = form_text("wifkey_" + cid)
                if wif_key:
                    result = swap_client.importWIFKey(coin_id, wif_key)
                    if result.get("success"):
                        messages.append(
                            f"Imported key for address: {result['address']}"
                        )
                    else:
                        err_messages.append(f"Import failed: {result.get('error')}")
                else:
                    err_messages.append("Missing WIF key")
            except Exception as ex:
                err_messages.append(f"Import failed: {ex}")
            swap_client.updateWalletsInfo(True, coin_id)
        elif withdraw or estimate_fee:
            subfee = bool(have_data_entry(form_data, "subfee_" + cid))
            page_data["wd_subfee_" + cid] = subfee
            sweepall = bool(have_data_entry(form_data, "sweepall_" + cid))
            page_data["wd_sweepall_" + cid] = sweepall

            # Submitted values are echoed back through page_data so the form
            # can be re-populated after a failed attempt.
            value = None
            if not sweepall:
                try:
                    value = form_text("amt_" + cid)
                    page_data["wd_value_" + cid] = value
                except Exception:
                    err_messages.append("Missing value")
            try:
                address = form_text("to_" + cid)
                page_data["wd_address_" + cid] = address
            except Exception:
                err_messages.append("Missing address")

            if estimate_fee and withdraw:
                err_messages.append("Estimate fee and withdraw can't be used together.")
            if estimate_fee and coin_id not in (Coins.XMR, Coins.WOW):
                ci = swap_client.ci(coin_id)
                ticker: str = ci.ticker()
                err_messages.append(f"Estimate fee unavailable for {ticker}.")

            if coin_id == Coins.PART:
                try:
                    type_from = form_text("withdraw_type_from_" + cid)
                    type_to = form_text("withdraw_type_to_" + cid)
                    page_data["wd_type_from_" + cid] = type_from
                    page_data["wd_type_to_" + cid] = type_to
                except Exception:
                    err_messages.append("Missing type")
            elif coin_id in (Coins.LTC, Coins.FIRO):
                try:
                    type_from = form_text("withdraw_type_from_" + cid)
                    page_data["wd_type_from_" + cid] = type_from
                except Exception:
                    # In electrum mode a missing type falls back to "plain".
                    if (
                        swap_client.coin_clients[coin_id].get("connection_type")
                        == "electrum"
                    ):
                        type_from = "plain"
                        page_data["wd_type_from_" + cid] = type_from
                    else:
                        err_messages.append("Missing type")

            # Only act when every input above was accepted.
            if not err_messages:
                ci = swap_client.ci(coin_id)
                ticker = ci.ticker()
                # NOTE(review): the success messages embed "<br/>" together
                # with form-supplied values, so the template presumably renders
                # them unescaped - confirm escaping is handled there.
                try:
                    if coin_id == Coins.PART:
                        txid = swap_client.withdrawParticl(
                            type_from, type_to, value, address, subfee
                        )
                        messages.append(
                            "Withdrew {} {} ({} to {}) to address {}<br/>In txid: {}".format(
                                value, ticker, type_from, type_to, address, txid
                            )
                        )
                    elif coin_id in (Coins.LTC, Coins.FIRO):
                        txid = swap_client.withdrawCoinExtended(
                            coin_id, type_from, value, address, subfee
                        )
                        messages.append(
                            "Withdrew {} {} (from {}) to address {}<br/>In txid: {}".format(
                                value, ticker, type_from, address, txid
                            )
                        )
                    elif coin_id in (Coins.XMR, Coins.WOW):
                        if estimate_fee:
                            fee_estimate = ci.estimateFee(value, address, sweepall)
                            num_txns = fee_estimate["num_txns"]
                            suffix = "s" if num_txns > 1 else ""
                            sum_fees = ci.format_amount(fee_estimate["sum_fee"])
                            value_str = ci.format_amount(fee_estimate["sum_amount"])
                            messages.append(
                                f"Estimated fee for {value_str} {ticker} to address {address}: {sum_fees} in {num_txns} transaction{suffix}."
                            )
                            page_data["fee_estimate"] = fee_estimate
                        else:
                            txid = swap_client.withdrawCoin(
                                coin_id, value, address, sweepall
                            )
                            if sweepall:
                                messages.append(
                                    "Swept all {} to address {}<br/>In txid: {}".format(
                                        ticker, address, txid
                                    )
                                )
                            else:
                                messages.append(
                                    "Withdrew {} {} to address {}<br/>In txid: {}".format(
                                        value, ticker, address, txid
                                    )
                                )
                            messages.append(
                                "Note: The wallet balance can take a while to update."
                            )
                    else:
                        txid = swap_client.withdrawCoin(coin_id, value, address, subfee)
                        messages.append(
                            "Withdrew {} {} to address {}<br/>In txid: {}".format(
                                value, ticker, address, txid
                            )
                        )
                    if not estimate_fee:
                        withdrawal_successful = True
                except Exception as e:
                    if swap_client.debug is True:
                        swap_client.log.error(traceback.format_exc())
                    err_messages.append(str(e))
            if not estimate_fee:
                swap_client.updateWalletsInfo(True, only_coin=coin_id)
        elif have_data_entry(form_data, "showutxogroups"):
            show_utxo_groups = True
        elif have_data_entry(form_data, "create_utxo"):
            show_utxo_groups = True
            try:
                value = get_data_entry(form_data, "utxo_value")
                page_data["utxo_value"] = value

                ci = swap_client.ci(coin_id)

                value_sats = ci.make_int(value)

                txid, address = ci.createUTXO(value_sats)
                messages.append(
                    "Created new utxo of value {} and address {}<br/>In txid: {}".format(
                        value, address, txid
                    )
                )
            except Exception as e:
                err_messages.append(str(e))
                if swap_client.debug is True:
                    swap_client.log.error(traceback.format_exc())

    is_electrum_mode = (
        swap_client.coin_clients.get(coin_id, {}).get("connection_type") == "electrum"
    )
    # Electrum-mode wallets do not wait for the refresh to complete.
    swap_client.updateWalletsInfo(
        force_refresh, only_coin=coin_id, wait_for_complete=not is_electrum_mode
    )
    wallets = swap_client.getCachedWalletsInfo({"coin_id": coin_id})
    wallet_data = {}
    for k, w in wallets.items():
        if "error" in w:
            wallet_data = {"cid": str(int(k)), "error": w["error"]}
            continue

        if "no_data" in w:
            wallet_data = {
                "name": w["name"],
                "havedata": False,
                "updating": w["updating"],
            }
            continue

        ci = swap_client.ci(k)

        wallet_data = format_wallet_data(swap_client, ci, w)
        wallet_data["is_electrum_mode"] = (
            getattr(ci, "_connection_type", "rpc") == "electrum"
        )

        fee_rate, fee_src = swap_client.getFeeRateForCoin(k)
        est_fee = swap_client.estimateWithdrawFee(k, fee_rate)
        wallet_data["fee_rate"] = ci.format_amount(int(fee_rate * ci.COIN()))
        wallet_data["fee_rate_src"] = fee_src
        wallet_data["est_fee"] = (
            "Unknown" if est_fee is None else ci.format_amount(int(est_fee * ci.COIN()))
        )
        wallet_data["deposit_address"] = w.get("deposit_address", "Refresh necessary")

        if k in (Coins.XMR, Coins.WOW):
            wallet_data["main_address"] = w.get("main_address", "Refresh necessary")
        elif k == Coins.LTC:
            wallet_data["mweb_address"] = w.get("mweb_address", "Refresh necessary")
        elif k == Coins.FIRO:
            wallet_data["spark_address"] = w.get("spark_address", "Refresh necessary")

        if "wd_type_from_" + cid in page_data:
            wallet_data["wd_type_from"] = page_data["wd_type_from_" + cid]
        if "wd_type_to_" + cid in page_data:
            wallet_data["wd_type_to"] = page_data["wd_type_to_" + cid]

        if "utxo_value" in page_data:
            wallet_data["utxo_value"] = page_data["utxo_value"]

        # After a successful withdrawal the submitted values are not echoed
        # back into the form.
        if not withdrawal_successful:
            for field in ("wd_value", "wd_address", "wd_subfee", "wd_sweepall"):
                page_key = field + "_" + cid
                if page_key in page_data:
                    wallet_data[field] = page_data[page_key]

        if "fee_estimate" in page_data:
            estimate = page_data["fee_estimate"]
            wallet_data["est_fee"] = ci.format_amount(estimate["sum_fee"])
            sum_weight = estimate["sum_weight"]
            # Avoid ZeroDivisionError on a zero weight; in that case the
            # fee rate computed above is kept.
            if sum_weight:
                wallet_data["fee_rate"] = ci.format_amount(
                    estimate["sum_fee"] * 1000 // sum_weight
                )

        if show_utxo_groups:
            unspent_by_addr = ci.getUnspentsByAddr()
            # Largest total first.
            sorted_unspent_by_addr = sorted(
                unspent_by_addr.items(), key=lambda x: x[1], reverse=True
            )
            wallet_data["show_utxo_groups"] = True
            wallet_data["utxo_groups"] = "".join(
                f"{addr} {ci.format_amount(amount)}\n"
                for addr, amount in sorted_unspent_by_addr
            )

        checkAddressesOwned(swap_client, ci, wallet_data)

    donation_info = None
    ticker = wallet_data.get("ticker", "").upper()
    if ticker == "LTC":
        # Litecoin additionally advertises an MWEB donation address.
        donation_info = {
            "address": DONATION_ADDRESSES["LTC"],
            "coin_name": wallet_data.get("name", "Litecoin"),
            "mweb_address": DONATION_ADDRESSES["LTC_MWEB"],
        }
    elif ticker in DONATION_ADDRESSES:
        donation_info = {
            "address": DONATION_ADDRESSES[ticker],
            "coin_name": wallet_data.get("name", ticker),
        }

    transactions = []
    total_transactions = 0
    is_electrum_mode = False
    legacy_funds_info = None
    if wallet_data.get("havedata", False) and not wallet_data.get("error"):
        try:
            ci = swap_client.ci(coin_id)
            is_electrum_mode = getattr(ci, "_connection_type", "rpc") == "electrum"
            if not is_electrum_mode:
                count = tx_filters["limit"]
                skip = tx_filters["offset"]
                all_txs = ci.listWalletTransactions(count=10000, skip=0)
                # The list is reversed before the requested page is sliced out.
                all_txs = list(reversed(all_txs)) if all_txs else []
                total_transactions = len(all_txs)
                raw_txs = all_txs[skip : skip + count]
                transactions = format_transactions(ci, raw_txs, coin_id)
            elif coin_id in (Coins.BTC, Coins.LTC):
                legacy_funds_info = swap_client.getElectrumLegacyFundsInfo(coin_id)
        except Exception as e:
            # The transaction list is best effort; the page still renders.
            swap_client.log.warning(f"Failed to fetch transactions for {ticker}: {e}")

    template = server.env.get_template("wallet.html")
    return self.render_template(
        template,
        {
            "messages": messages,
            "err_messages": err_messages,
            "w": wallet_data,
            "summary": summary,
            "block_unknown_seeds": swap_client._restrict_unknown_seed_wallets,
            "donation_info": donation_info,
            "debug_ui": swap_client.debug_ui,
            "transactions": transactions,
            "tx_page_no": tx_filters["page_no"],
            "tx_total": total_transactions,
            "tx_limit": tx_filters["limit"],
            "is_electrum_mode": is_electrum_mode,
            "legacy_funds_info": legacy_funds_info,
            "use_tor": getattr(swap_client, "use_tor_proxy", False),
        },
    )