Files
basicswap/basicswap/protocols/xmr_swap_1.py
tecnovert e1bca8e384 backports
2025-12-25 13:18:48 +02:00

243 lines
8.6 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2020-2024 tecnovert
# Copyright (c) 2024-2025 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import traceback
from basicswap.util import (
ensure,
)
from basicswap.interface.base import Curves
from basicswap.chainparams import (
Coins,
)
from basicswap.basicswap_util import (
EventLogTypes,
KeyTypes,
SwapTypes,
TxTypes,
)
from . import ProtocolInterface
from basicswap.contrib.test_framework.script import CScript, CScriptOp, OP_CHECKMULTISIG
def addLockRefundSigs(self, xmr_swap, ci):
    """Attach both lock-refund signatures to the chain A lock refund tx.

    Builds the witness stack from the leader and follower refund signatures
    plus the lock tx script, hands it to ``ci.setTxSignature`` and stores the
    resulting signed transaction back on ``xmr_swap.a_lock_refund_tx``.

    ``ensure`` is used to fail if ``setTxSignature`` returns a falsy value.
    """
    self.log.debug("Setting lock refund tx sigs")

    # Every coin except DCR gets a leading empty stack element.
    # NOTE(review): presumably the dummy element consumed by CHECKMULTISIG - confirm.
    needs_leading_empty = ci.coin_type() not in (Coins.DCR,)
    witness_stack = [b""] if needs_leading_empty else []
    witness_stack.extend(
        [
            xmr_swap.al_lock_refund_tx_sig,
            xmr_swap.af_lock_refund_tx_sig,
            xmr_swap.a_lock_tx_script,
        ]
    )

    refund_tx_signed = ci.setTxSignature(xmr_swap.a_lock_refund_tx, witness_stack)
    ensure(refund_tx_signed, "setTxSignature failed")
    xmr_swap.a_lock_refund_tx = refund_tx_signed
def recoverNoScriptTxnWithKey(self, bid_id: bytes, encoded_key, cursor=None):
    """Spend the chain B lock output of a bid using a manually supplied key half.

    The provided key half is combined with the locally derived key half; the
    sum must produce the expected spend pubkey (``xmr_swap.pkbs``) before the
    lock tx is spent to a local address.

    Args:
        self: Object providing the database, logging and wallet helpers
            called below.
        bid_id: Id of the bid to recover.
        encoded_key: The other party's key half, in the encoding accepted by
            the follower coin interface's ``decodeKey``.
        cursor: Optional existing DB cursor. When ``None`` a cursor is opened
            here and closed again before returning.

    Returns:
        The txid returned by the follower interface's ``spendBLockTx``.

    Raises:
        ValueError: If the provided key cannot be decoded, equals the local
            key half, or the summed key does not match the expected pubkey.
        Exception: Any other failure (including failed ``ensure`` checks) is
            logged with its traceback and re-raised unchanged.
    """
    self.log.info(f"Manually recovering {self.log.id(bid_id)}")
    # Manually recover txn if other key is known
    use_cursor = None
    # Tracks whether openDB succeeded so the finally clause never touches an
    # unopened cursor (which would mask the original exception).
    db_opened: bool = False
    try:
        use_cursor = self.openDB(cursor)
        db_opened = True
        bid, xmr_swap = self.getXmrBidFromSession(use_cursor, bid_id)
        ensure(bid, "Bid not found: {}.".format(bid_id.hex()))
        ensure(xmr_swap, "Adaptor-sig swap not found: {}.".format(bid_id.hex()))
        offer, xmr_offer = self.getXmrOfferFromSession(use_cursor, bid.offer_id)
        ensure(offer, "Offer not found: {}.".format(bid.offer_id.hex()))
        ensure(xmr_offer, "Adaptor-sig offer not found: {}.".format(bid.offer_id.hex()))

        # The no-script coin is always the follower
        reverse_bid: bool = self.is_reverse_ads_bid(offer.coin_from, offer.coin_to)
        ci_from = self.ci(Coins(offer.coin_from))
        ci_to = self.ci(Coins(offer.coin_to))
        ci_follower = ci_from if reverse_bid else ci_to

        try:
            decoded_key_half = ci_follower.decodeKey(encoded_key)
        except Exception as e:
            # Single formatted message; passing two args would render as a tuple.
            raise ValueError(f"Failed to decode provided key-half: {e}") from e

        was_sent: bool = bid.was_received if reverse_bid else bid.was_sent

        localkeyhalf = ci_follower.decodeKey(
            getChainBSplitKey(self, bid, xmr_swap, offer)
        )
        # The local half is kbsf when was_sent, so the provided half must be
        # the opposite one.
        if was_sent:
            kbsl = decoded_key_half
            kbsf = localkeyhalf
        else:
            kbsl = localkeyhalf
            kbsf = decoded_key_half

        ensure(ci_follower.verifyKey(kbsl), "Invalid kbsl")
        ensure(ci_follower.verifyKey(kbsf), "Invalid kbsf")
        if kbsl == kbsf:
            raise ValueError("Provided key matches local key")
        vkbs = ci_follower.sumKeys(kbsl, kbsf)

        ensure(ci_follower.verifyPubkey(xmr_swap.pkbs), "Invalid pkbs")  # Sanity check

        # Ensure summed key matches the expected pubkey
        summed_pkbs = ci_follower.getPubkey(vkbs)
        if summed_pkbs != xmr_swap.pkbs:
            err_msg: str = "Summed key does not match expected wallet spend pubkey"
            self.log.error(
                f"{err_msg}. Got: {summed_pkbs.hex()}, Expect: {xmr_swap.pkbs.hex()}"
            )
            raise ValueError(err_msg)

        # Pick the destination address according to the follower coin type.
        coin_to: int = ci_follower.interface_type()
        base_coin_to: int = ci_follower.coin_type()
        if coin_to in (Coins.XMR, Coins.WOW):
            address_to = self.getCachedMainWalletAddress(ci_follower, use_cursor)
        elif coin_to in (Coins.PART_BLIND, Coins.PART_ANON):
            address_to = self.getCachedStealthAddressForCoin(base_coin_to, use_cursor)
        else:
            address_to = self.getReceiveAddressFromPool(
                base_coin_to, bid_id, TxTypes.XMR_SWAP_B_LOCK_SPEND, use_cursor
            )
        amount = bid.amount_to
        lock_tx_vout = bid.getLockTXBVout()
        txid = ci_follower.spendBLockTx(
            xmr_swap.b_lock_tx_id,
            address_to,
            xmr_swap.vkbv,
            vkbs,
            amount,
            xmr_offer.b_fee_rate,
            bid.chain_b_height_start,
            spend_actual_balance=True,
            lock_tx_vout=lock_tx_vout,
        )
        self.log.debug(
            f"Submitted lock B spend txn {self.log.id(txid)} to {ci_follower.coin_name()} chain for bid {self.log.id(bid_id)}."
        )
        self.logBidEvent(
            bid.bid_id,
            EventLogTypes.LOCK_TX_B_SPEND_TX_PUBLISHED,
            txid.hex(),
            use_cursor,
        )
        self.commitDB()

        return txid
    except Exception:
        self.log.error(traceback.format_exc())
        # Bare raise keeps the original exception and traceback intact.
        raise
    finally:
        # Only close a cursor this function opened itself.
        if cursor is None and db_opened:
            self.closeDB(use_cursor, commit=False)
def getChainBSplitKey(swap_client, bid, xmr_swap, offer):
    """Return the local chain B key half for a bid, encoded by the follower interface.

    The key is derived with ``swap_client.getPathKey`` from the leader and
    follower interface types, the bid creation time and the swap's contract
    count. ``KeyTypes.KBSF`` is used when this node's role flag (see below)
    is set, ``KeyTypes.KBSL`` otherwise.
    """
    is_reversed: bool = offer.bid_reversed

    # For reversed bids the leader/follower coins are swapped.
    leader_coin = offer.coin_to if is_reversed else offer.coin_from
    ci_leader = swap_client.ci(leader_coin)
    follower_coin = offer.coin_from if is_reversed else offer.coin_to
    ci_follower = swap_client.ci(follower_coin)

    uses_ed25519: bool = ci_follower.curve_type() == Curves.ed25519

    # Reversed bids flip which side counts as having "sent" the bid.
    was_sent: bool = bid.was_received if is_reversed else bid.was_sent
    if was_sent:
        key_type = KeyTypes.KBSF
    else:
        key_type = KeyTypes.KBSL

    local_key = swap_client.getPathKey(
        ci_leader.interface_type(),
        ci_follower.interface_type(),
        bid.created_at,
        xmr_swap.contract_count,
        key_type,
        uses_ed25519,
    )
    return ci_follower.encodeKey(local_key)
def getChainBRemoteSplitKey(swap_client, bid, xmr_swap, offer):
    """Recover the remote chain B key half from a published chain A spend, if any.

    When ``bid.was_sent`` is set, the key is recovered from the lock refund
    spend tx using the follower's encrypted signature; otherwise it is
    recovered from the lock spend tx using the leader's encrypted signature.

    Returns:
        The recovered key encoded by the follower interface, or ``None`` when
        the relevant transaction is not set on ``xmr_swap``.
    """
    is_reversed: bool = offer.bid_reversed

    # For reversed bids the leader/follower coins are swapped.
    leader_coin = offer.coin_to if is_reversed else offer.coin_from
    ci_leader = swap_client.ci(leader_coin)
    follower_coin = offer.coin_from if is_reversed else offer.coin_to
    ci_follower = swap_client.ci(follower_coin)

    # NOTE(review): unlike getChainBSplitKey, bid.was_sent is used here without
    # adjusting for reversed bids - confirm this is intended.
    if bid.was_sent:
        refund_spend_tx = xmr_swap.a_lock_refund_spend_tx
        if not refund_spend_tx:
            return None
        follower_sig = ci_leader.extractFollowerSig(refund_spend_tx)
        kbsl = ci_leader.recoverEncKey(
            xmr_swap.af_lock_refund_spend_tx_esig,
            follower_sig,
            xmr_swap.pkasl,
        )
        return ci_follower.encodeKey(kbsl)

    spend_tx = xmr_swap.a_lock_spend_tx
    if not spend_tx:
        return None
    leader_sig = ci_leader.extractLeaderSig(spend_tx)
    kbsf = ci_leader.recoverEncKey(
        xmr_swap.al_lock_spend_tx_esig, leader_sig, xmr_swap.pkasf
    )
    return ci_follower.encodeKey(kbsf)
def setDLEAG(xmr_swap, ci_to, kbsf: bytes) -> None:
    """Populate ``xmr_swap.kbsf_dleag`` and ``xmr_swap.pkasf`` for the follower key.

    For ed25519 coins the proof comes from ``ci_to.proveDLEAG`` and ``pkasf``
    is taken from its first 33 bytes. For secp256k1 coins a recoverable
    signature over a fixed message is stored instead, the recovered pubkey is
    checked against ``xmr_swap.pkbsf`` and ``pkasf`` is set to ``pkbsf``.

    Raises:
        ValueError: If the curve type is neither ed25519 nor secp256k1.
    """
    if ci_to.curve_type() == Curves.ed25519:
        xmr_swap.kbsf_dleag = ci_to.proveDLEAG(kbsf)
        xmr_swap.pkasf = xmr_swap.kbsf_dleag[0:33]
        return

    if ci_to.curve_type() != Curves.secp256k1:
        raise ValueError("Unknown curve")

    proof_message = "proof kbsf owned for swap"
    xmr_swap.kbsf_dleag = ci_to.signRecoverable(kbsf, proof_message)
    # Verify the fresh signature recovers the expected pubkey before using it.
    recovered_pubkey: bytes = ci_to.verifySigAndRecover(
        xmr_swap.kbsf_dleag, proof_message
    )
    ensure(recovered_pubkey == xmr_swap.pkbsf, "kbsf recovered pubkey mismatch")
    xmr_swap.pkasf = xmr_swap.pkbsf
class XmrSwapInterface(ProtocolInterface):
    """Protocol interface for swaps of type ``SwapTypes.XMR_SWAP``."""

    swap_type = SwapTypes.XMR_SWAP

    def genScriptLockTxScript(self, ci, Kal: bytes, Kaf: bytes, **kwargs) -> CScript:
        """Return the lock tx script for the two 33-byte pubkeys ``Kal`` and ``Kaf``.

        Defers to ``ci.genScriptLockTxScript`` when the coin interface provides
        one; otherwise builds a 2-of-2 CHECKMULTISIG script.
        """
        # fallthrough to ci if genScriptLockTxScript is implemented there
        ci_impl = getattr(ci, "genScriptLockTxScript", None)
        if callable(ci_impl):
            return ci_impl(ci, Kal, Kaf, **kwargs)

        assert len(Kal) == 33
        assert len(Kaf) == 33

        script_items = [2, Kal, Kaf, 2, CScriptOp(OP_CHECKMULTISIG)]
        return CScript(script_items)

    def getFundedInitiateTxTemplate(self, ci, amount: int, sub_fee: bool) -> bytes:
        """Create a funded raw tx paying ``amount`` to the mock address, as bytes.

        Unspent outputs are not locked (``lock_unspents=False``).
        """
        funded_tx_hex = ci.createRawFundedTransaction(
            self.getMockAddrTo(ci), amount, sub_fee, lock_unspents=False
        )
        return bytes.fromhex(funded_tx_hex)

    def promoteMockTx(self, ci, mock_tx: bytes, script: bytearray) -> bytearray:
        """Replace the single mocked output script in ``mock_tx`` with the real one.

        The real output script is ``ci.getScriptDest(script)``. The tx locktime
        is reset to 0 and the serialized tx is returned.

        Raises:
            ValueError: If no output, or more than one output, carries the
                mock script.
        """
        mock_txo_script = self.getMockScriptScriptPubkey(ci)
        real_txo_script = ci.getScriptDest(script)

        ctx = ci.loadTx(mock_tx)
        replaced: int = 0
        for txo in ctx.vout:
            if txo.scriptPubKey != mock_txo_script:
                continue
            txo.scriptPubKey = real_txo_script
            replaced += 1

        if replaced == 0:
            raise ValueError("Mocked output not found")
        if replaced > 1:
            raise ValueError("Too many mocked outputs found")

        ctx.nLockTime = 0
        return ctx.serialize()