mirror of
https://github.com/basicswap/basicswap.git
synced 2025-11-05 18:38:09 +01:00
1361 lines
47 KiB
Python
1361 lines
47 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2020-2024 tecnovert
|
|
# Copyright (c) 2024 The Basicswap developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import hashlib
|
|
from enum import IntEnum
|
|
|
|
from basicswap.contrib.test_framework.messages import (
|
|
CTxOutPart,
|
|
)
|
|
from basicswap.contrib.test_framework.script import (
|
|
CScript,
|
|
OP_0,
|
|
OP_DUP,
|
|
OP_HASH160,
|
|
OP_EQUALVERIFY,
|
|
OP_CHECKSIG,
|
|
)
|
|
from basicswap.util import (
|
|
ensure,
|
|
TemporaryError,
|
|
)
|
|
from basicswap.util.script import (
|
|
getP2WSH,
|
|
getCompactSizeLen,
|
|
getWitnessElementLen,
|
|
)
|
|
from basicswap.util.address import (
|
|
encodeStealthAddress,
|
|
)
|
|
from basicswap.interface.btc import (
|
|
BTCInterface,
|
|
extractScriptLockScriptValues,
|
|
extractScriptLockRefundScriptValues,
|
|
)
|
|
|
|
from basicswap.chainparams import Coins, chainparams
|
|
|
|
|
|
class BalanceTypes(IntEnum):
|
|
PLAIN = 1
|
|
BLIND = 2
|
|
ANON = 3
|
|
|
|
|
|
class PARTInterface(BTCInterface):
|
|
@staticmethod
|
|
def coin_type():
|
|
# Returns the base coin type
|
|
# ANON and BLIND PART will return Coins.PART
|
|
return Coins.PART
|
|
|
|
@staticmethod
|
|
def balance_type():
|
|
return BalanceTypes.PLAIN
|
|
|
|
@staticmethod
|
|
def witnessScaleFactor() -> int:
|
|
return 2
|
|
|
|
@staticmethod
|
|
def txVersion() -> int:
|
|
return 0xA0
|
|
|
|
@staticmethod
|
|
def xmr_swap_a_lock_spend_tx_vsize() -> int:
|
|
return 200
|
|
|
|
@staticmethod
|
|
def xmr_swap_b_lock_spend_tx_vsize() -> int:
|
|
return 138
|
|
|
|
@staticmethod
|
|
def txoType():
|
|
return CTxOutPart
|
|
|
|
def __init__(self, coin_settings, network, swap_client=None):
|
|
super().__init__(coin_settings, network, swap_client)
|
|
self.setAnonTxRingSize(int(coin_settings.get("anon_tx_ring_size", 12)))
|
|
|
|
def use_tx_vsize(self) -> bool:
|
|
return True
|
|
|
|
def setAnonTxRingSize(self, value):
|
|
ensure(value >= 3 and value < 33, "Invalid anon_tx_ring_size value")
|
|
self._anon_tx_ring_size = value
|
|
|
|
def knownWalletSeed(self):
|
|
# TODO: Double check
|
|
return True
|
|
|
|
def getNewAddress(self, use_segwit, label="swap_receive") -> str:
|
|
return self.rpc_wallet("getnewaddress", [label])
|
|
|
|
def getNewStealthAddress(self, label="swap_stealth") -> str:
|
|
return self.rpc_wallet("getnewstealthaddress", [label])
|
|
|
|
def haveSpentIndex(self):
|
|
version = self.getDaemonVersion()
|
|
index_info = self.rpc(
|
|
"getinsightinfo" if int(str(version)[:2]) > 19 else "getindexinfo"
|
|
)
|
|
return index_info["spentindex"]
|
|
|
|
def initialiseWallet(self, key: bytes) -> None:
|
|
raise ValueError("TODO")
|
|
|
|
def withdrawCoin(self, value, addr_to, subfee):
|
|
params = [addr_to, value, "", "", subfee, "", True, self._conf_target]
|
|
return self.rpc_wallet("sendtoaddress", params)
|
|
|
|
def sendTypeTo(self, type_from, type_to, value, addr_to, subfee):
|
|
params = [
|
|
type_from,
|
|
type_to,
|
|
[
|
|
{"address": addr_to, "amount": value, "subfee": subfee},
|
|
],
|
|
"",
|
|
"",
|
|
self._anon_tx_ring_size,
|
|
1,
|
|
False,
|
|
{"conf_target": self._conf_target},
|
|
]
|
|
return self.rpc_wallet("sendtypeto", params)
|
|
|
|
def getScriptForPubkeyHash(self, pkh: bytes) -> CScript:
|
|
return CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG])
|
|
|
|
def formatStealthAddress(self, scan_pubkey, spend_pubkey) -> str:
|
|
prefix_byte = chainparams[self.coin_type()][self._network]["stealth_key_prefix"]
|
|
|
|
return encodeStealthAddress(prefix_byte, scan_pubkey, spend_pubkey)
|
|
|
|
def getWitnessStackSerialisedLength(self, witness_stack) -> int:
|
|
length: int = getCompactSizeLen(len(witness_stack))
|
|
for e in witness_stack:
|
|
length += getWitnessElementLen(len(e))
|
|
return length
|
|
|
|
def getWalletRestoreHeight(self) -> int:
|
|
start_time = self.rpc_wallet("getwalletinfo")["keypoololdest"]
|
|
|
|
blockchaininfo = self.getBlockchainInfo()
|
|
|
|
chain_synced = round(blockchaininfo["verificationprogress"], 3)
|
|
if chain_synced < 1.0:
|
|
raise ValueError("{} chain isn't synced.".format(self.coin_name()))
|
|
|
|
self._log.debug("Finding block at time: {}".format(start_time))
|
|
block_hash = self.rpc("getblockhashafter", [start_time])
|
|
block_header = self.rpc("getblockheader", [block_hash])
|
|
return block_header["height"]
|
|
|
|
def getHTLCSpendTxVSize(self, redeem: bool = True) -> int:
|
|
tx_vsize = (
|
|
5 # Add a few bytes, sequence in script takes variable amount of bytes
|
|
)
|
|
tx_vsize += 204 if redeem else 187
|
|
return tx_vsize
|
|
|
|
def getUnspentsByAddr(self):
|
|
unspent_addr = dict()
|
|
balance_type = self.balance_type()
|
|
if balance_type == BalanceTypes.PLAIN:
|
|
unspent = self.rpc_wallet("listunspent")
|
|
elif balance_type == BalanceTypes.BLIND:
|
|
unspent = self.rpc_wallet("listunspentblind")
|
|
else:
|
|
raise ValueError(
|
|
f"getUnspentsByAddr not implemented for {balance_type} type"
|
|
)
|
|
for u in unspent:
|
|
if u["spendable"] is not True:
|
|
continue
|
|
if "address" not in u:
|
|
continue
|
|
unspent_addr[u["address"]] = unspent_addr.get(
|
|
u["address"], 0
|
|
) + self.make_int(u["amount"], r=1)
|
|
return unspent_addr
|
|
|
|
|
|
class PARTInterfaceBlind(PARTInterface):
|
|
@staticmethod
|
|
def balance_type():
|
|
return BalanceTypes.BLIND
|
|
|
|
@staticmethod
|
|
def xmr_swap_a_lock_spend_tx_vsize() -> int:
|
|
return 1032
|
|
|
|
@staticmethod
|
|
def xmr_swap_b_lock_spend_tx_vsize() -> int:
|
|
return 980
|
|
|
|
def coin_name(self) -> str:
|
|
return super().coin_name() + " Blind"
|
|
|
|
def getScriptLockTxNonce(self, data):
|
|
return hashlib.sha256(data + bytes("locktx", "utf-8")).digest()
|
|
|
|
def getScriptLockRefundTxNonce(self, data):
|
|
return hashlib.sha256(data + bytes("lockrefundtx", "utf-8")).digest()
|
|
|
|
def findOutputByNonce(self, tx_obj, nonce):
|
|
blinded_info = None
|
|
output_n = None
|
|
for txo in tx_obj["vout"]:
|
|
if txo["type"] != "blind":
|
|
continue
|
|
try:
|
|
blinded_info = self.rpc(
|
|
"rewindrangeproof",
|
|
[txo["rangeproof"], txo["valueCommitment"], nonce.hex()],
|
|
)
|
|
output_n = txo["n"]
|
|
|
|
self.rpc(
|
|
"rewindrangeproof",
|
|
[txo["rangeproof"], txo["valueCommitment"], nonce.hex()],
|
|
)
|
|
break
|
|
except Exception as e:
|
|
self._log.debug("Searching for locked output: {}".format(str(e)))
|
|
continue
|
|
# Should not be possible for commitment not to match
|
|
v = self.rpc(
|
|
"verifycommitment",
|
|
[txo["valueCommitment"], blinded_info["blind"], blinded_info["amount"]],
|
|
)
|
|
ensure(v["result"] is True, "verifycommitment failed")
|
|
return output_n, blinded_info
|
|
|
|
def createSCLockTx(self, value: int, script: bytearray, vkbv: bytes) -> bytes:
|
|
|
|
# Nonce is derived from vkbv, ephemeral_key isn't used
|
|
ephemeral_key = self.getNewSecretKey()
|
|
ephemeral_pubkey = self.getPubkey(ephemeral_key)
|
|
assert len(ephemeral_pubkey) == 33
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
p2wsh_addr = self.encode_p2wsh(getP2WSH(script))
|
|
inputs = []
|
|
outputs = [
|
|
{
|
|
"type": "blind",
|
|
"amount": self.format_amount(value),
|
|
"address": p2wsh_addr,
|
|
"nonce": nonce.hex(),
|
|
"data": ephemeral_pubkey.hex(),
|
|
}
|
|
]
|
|
params = [inputs, outputs]
|
|
rv = self.rpc_wallet("createrawparttransaction", params)
|
|
|
|
tx_bytes = bytes.fromhex(rv["hex"])
|
|
return tx_bytes
|
|
|
|
def fundSCLockTx(self, tx_bytes: bytes, feerate: int, vkbv: bytes) -> bytes:
|
|
feerate_str = self.format_amount(feerate)
|
|
# TODO: unlock unspents if bid cancelled
|
|
|
|
tx_hex = tx_bytes.hex()
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
|
|
tx_obj = self.rpc("decoderawtransaction", [tx_hex])
|
|
|
|
assert len(tx_obj["vout"]) == 1
|
|
txo = tx_obj["vout"][0]
|
|
blinded_info = self.rpc(
|
|
"rewindrangeproof", [txo["rangeproof"], txo["valueCommitment"], nonce.hex()]
|
|
)
|
|
|
|
outputs_info = {
|
|
0: {
|
|
"value": blinded_info["amount"],
|
|
"blind": blinded_info["blind"],
|
|
"nonce": nonce.hex(),
|
|
}
|
|
}
|
|
|
|
options = {
|
|
"lockUnspents": True,
|
|
"feeRate": feerate_str,
|
|
}
|
|
rv = self.rpc(
|
|
"fundrawtransactionfrom", ["blind", tx_hex, {}, outputs_info, options]
|
|
)
|
|
return bytes.fromhex(rv["hex"])
|
|
|
|
def createSCLockRefundTx(
|
|
self,
|
|
tx_lock_bytes,
|
|
script_lock,
|
|
Kal,
|
|
Kaf,
|
|
lock1_value,
|
|
csv_val,
|
|
tx_fee_rate,
|
|
vkbv,
|
|
):
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [tx_lock_bytes.hex()])
|
|
assert self.getTxid(tx_lock_bytes).hex() == lock_tx_obj["txid"]
|
|
# Nonce is derived from vkbv, ephemeral_key isn't used
|
|
ephemeral_key = self.getNewSecretKey()
|
|
ephemeral_pubkey = self.getPubkey(ephemeral_key)
|
|
assert len(ephemeral_pubkey) == 33
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
output_nonce = self.getScriptLockRefundTxNonce(vkbv)
|
|
|
|
# Find the output of the lock tx to spend
|
|
spend_n, input_blinded_info = self.findOutputByNonce(lock_tx_obj, nonce)
|
|
ensure(spend_n is not None, "Output not found in tx")
|
|
|
|
locked_coin = input_blinded_info["amount"]
|
|
tx_lock_id = lock_tx_obj["txid"]
|
|
refund_script = self.genScriptLockRefundTxScript(Kal, Kaf, csv_val)
|
|
p2wsh_addr = self.encode_p2wsh(getP2WSH(refund_script))
|
|
|
|
inputs = [
|
|
{
|
|
"txid": tx_lock_id,
|
|
"vout": spend_n,
|
|
"sequence": lock1_value,
|
|
"blindingfactor": input_blinded_info["blind"],
|
|
}
|
|
]
|
|
outputs = [
|
|
{
|
|
"type": "blind",
|
|
"amount": locked_coin,
|
|
"address": p2wsh_addr,
|
|
"nonce": output_nonce.hex(),
|
|
"data": ephemeral_pubkey.hex(),
|
|
}
|
|
]
|
|
params = [inputs, outputs]
|
|
rv = self.rpc_wallet("createrawparttransaction", params)
|
|
lock_refund_tx_hex = rv["hex"]
|
|
|
|
# Set dummy witness data for fee estimation
|
|
dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock)
|
|
dummy_witness_stack = [x.hex() for x in dummy_witness_stack]
|
|
|
|
# Use a junk change pubkey to avoid adding unused keys to the wallet
|
|
zero_change_key = self.getNewSecretKey()
|
|
zero_change_pubkey = self.getPubkey(zero_change_key)
|
|
inputs_info = {
|
|
"0": {
|
|
"value": input_blinded_info["amount"],
|
|
"blind": input_blinded_info["blind"],
|
|
"witnessstack": dummy_witness_stack,
|
|
}
|
|
}
|
|
outputs_info = rv["amounts"]
|
|
options = {
|
|
"changepubkey": zero_change_pubkey.hex(),
|
|
"feeRate": self.format_amount(tx_fee_rate),
|
|
"subtractFeeFromOutputs": [
|
|
0,
|
|
],
|
|
}
|
|
rv = self.rpc_wallet(
|
|
"fundrawtransactionfrom",
|
|
["blind", lock_refund_tx_hex, inputs_info, outputs_info, options],
|
|
)
|
|
lock_refund_tx_hex = rv["hex"]
|
|
|
|
for vout, txo in rv["output_amounts"].items():
|
|
if txo["value"] > 0:
|
|
refunded_value = txo["value"]
|
|
|
|
return bytes.fromhex(lock_refund_tx_hex), refund_script, refunded_value
|
|
|
|
def createSCLockRefundSpendTx(
|
|
self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv
|
|
):
|
|
# Returns the coinA locked coin to the leader
|
|
# The follower will sign the multisig path with a signature encumbered by the leader's coinB spend pubkey
|
|
# If the leader publishes the decrypted signature the leader's coinB spend privatekey will be revealed to the follower
|
|
|
|
lock_refund_tx_obj = self.rpc(
|
|
"decoderawtransaction", [tx_lock_refund_bytes.hex()]
|
|
)
|
|
# Nonce is derived from vkbv
|
|
nonce = self.getScriptLockRefundTxNonce(vkbv)
|
|
|
|
# Find the output of the lock refund tx to spend
|
|
spend_n, input_blinded_info = self.findOutputByNonce(lock_refund_tx_obj, nonce)
|
|
ensure(spend_n is not None, "Output not found in tx")
|
|
|
|
tx_lock_refund_id = lock_refund_tx_obj["txid"]
|
|
addr_out = self.pkh_to_address(pkh_refund_to)
|
|
addr_info = self.rpc_wallet("getaddressinfo", [addr_out])
|
|
output_pubkey_hex = addr_info["pubkey"]
|
|
|
|
# Follower won't be able to decode output to check amount, shouldn't matter as fee is public and output is to leader, sum has to balance
|
|
|
|
inputs = [
|
|
{
|
|
"txid": tx_lock_refund_id,
|
|
"vout": spend_n,
|
|
"sequence": 0,
|
|
"blindingfactor": input_blinded_info["blind"],
|
|
}
|
|
]
|
|
outputs = [
|
|
{
|
|
"type": "blind",
|
|
"amount": input_blinded_info["amount"],
|
|
"address": addr_out,
|
|
"pubkey": output_pubkey_hex,
|
|
}
|
|
]
|
|
params = [inputs, outputs]
|
|
rv = self.rpc_wallet("createrawparttransaction", params)
|
|
lock_refund_spend_tx_hex = rv["hex"]
|
|
|
|
# Set dummy witness data for fee estimation
|
|
dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness(
|
|
script_lock_refund
|
|
)
|
|
dummy_witness_stack = [x.hex() for x in dummy_witness_stack]
|
|
|
|
# Use a junk change pubkey to avoid adding unused keys to the wallet
|
|
zero_change_key = self.getNewSecretKey()
|
|
zero_change_pubkey = self.getPubkey(zero_change_key)
|
|
inputs_info = {
|
|
"0": {
|
|
"value": input_blinded_info["amount"],
|
|
"blind": input_blinded_info["blind"],
|
|
"witnessstack": dummy_witness_stack,
|
|
}
|
|
}
|
|
outputs_info = rv["amounts"]
|
|
options = {
|
|
"changepubkey": zero_change_pubkey.hex(),
|
|
"feeRate": self.format_amount(tx_fee_rate),
|
|
"subtractFeeFromOutputs": [
|
|
0,
|
|
],
|
|
}
|
|
|
|
rv = self.rpc_wallet(
|
|
"fundrawtransactionfrom",
|
|
["blind", lock_refund_spend_tx_hex, inputs_info, outputs_info, options],
|
|
)
|
|
lock_refund_spend_tx_hex = rv["hex"]
|
|
|
|
return bytes.fromhex(lock_refund_spend_tx_hex)
|
|
|
|
def verifySCLockTx(
|
|
self,
|
|
tx_bytes,
|
|
script_out,
|
|
swap_value,
|
|
Kal,
|
|
Kaf,
|
|
feerate,
|
|
check_lock_tx_inputs,
|
|
vkbv,
|
|
):
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()])
|
|
lock_txid_hex = lock_tx_obj["txid"]
|
|
self._log.info("Verifying lock tx: {}.".format(lock_txid_hex))
|
|
|
|
ensure(lock_tx_obj["version"] == self.txVersion(), "Bad version")
|
|
ensure(lock_tx_obj["locktime"] == 0, "Bad nLockTime")
|
|
|
|
# Find the output of the lock tx to verify
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
lock_output_n, blinded_info = self.findOutputByNonce(lock_tx_obj, nonce)
|
|
ensure(lock_output_n is not None, "Output not found in tx")
|
|
|
|
# Check value
|
|
locked_txo_value = self.make_int(blinded_info["amount"])
|
|
ensure(locked_txo_value == swap_value, "Bad locked value")
|
|
|
|
# Check script
|
|
lock_txo_scriptpk = bytes.fromhex(
|
|
lock_tx_obj["vout"][lock_output_n]["scriptPubKey"]["hex"]
|
|
)
|
|
script_pk = CScript([OP_0, hashlib.sha256(script_out).digest()])
|
|
ensure(lock_txo_scriptpk == script_pk, "Bad output script")
|
|
A, B = extractScriptLockScriptValues(script_out)
|
|
ensure(A == Kal, "Bad script leader pubkey")
|
|
ensure(B == Kaf, "Bad script follower pubkey")
|
|
|
|
# TODO: Check that inputs are unspent, rangeproofs and commitments sum
|
|
# Verify fee rate
|
|
vsize = lock_tx_obj["vsize"]
|
|
fee_paid = self.make_int(lock_tx_obj["vout"][0]["ct_fee"])
|
|
|
|
fee_rate_paid = fee_paid * 1000 // vsize
|
|
|
|
self._log.info(
|
|
"tx amount, vsize, feerate: %ld, %ld, %ld",
|
|
locked_txo_value,
|
|
vsize,
|
|
fee_rate_paid,
|
|
)
|
|
|
|
if not self.compareFeeRates(fee_rate_paid, feerate):
|
|
self._log.warning(
|
|
"feerate paid doesn't match expected: %ld, %ld", fee_rate_paid, feerate
|
|
)
|
|
# TODO: Display warning to user
|
|
|
|
return bytes.fromhex(lock_txid_hex), lock_output_n
|
|
|
|
def verifySCLockRefundTx(
|
|
self,
|
|
tx_bytes,
|
|
lock_tx_bytes,
|
|
script_out,
|
|
prevout_id,
|
|
prevout_n,
|
|
prevout_seq,
|
|
prevout_script,
|
|
Kal,
|
|
Kaf,
|
|
csv_val_expect,
|
|
swap_value,
|
|
feerate,
|
|
vkbv,
|
|
):
|
|
lock_refund_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()])
|
|
lock_refund_txid_hex = lock_refund_tx_obj["txid"]
|
|
self._log.info("Verifying lock refund tx: {}.".format(lock_refund_txid_hex))
|
|
|
|
ensure(lock_refund_tx_obj["version"] == self.txVersion(), "Bad version")
|
|
ensure(lock_refund_tx_obj["locktime"] == 0, "Bad nLockTime")
|
|
ensure(len(lock_refund_tx_obj["vin"]) == 1, "tx doesn't have one input")
|
|
|
|
txin = lock_refund_tx_obj["vin"][0]
|
|
ensure(txin["sequence"] == prevout_seq, "Bad input nSequence")
|
|
ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty")
|
|
ensure(
|
|
txin["txid"] == prevout_id.hex() and txin["vout"] == prevout_n,
|
|
"Input prevout mismatch",
|
|
)
|
|
|
|
ensure(len(lock_refund_tx_obj["vout"]) == 3, "tx doesn't have three outputs")
|
|
|
|
# Find the output of the lock refund tx to verify
|
|
nonce = self.getScriptLockRefundTxNonce(vkbv)
|
|
lock_refund_output_n, blinded_info = self.findOutputByNonce(
|
|
lock_refund_tx_obj, nonce
|
|
)
|
|
ensure(lock_refund_output_n is not None, "Output not found in tx")
|
|
|
|
lock_refund_txo_value = self.make_int(blinded_info["amount"])
|
|
|
|
# Check script
|
|
lock_refund_txo_scriptpk = bytes.fromhex(
|
|
lock_refund_tx_obj["vout"][lock_refund_output_n]["scriptPubKey"]["hex"]
|
|
)
|
|
script_pk = CScript([OP_0, hashlib.sha256(script_out).digest()])
|
|
ensure(lock_refund_txo_scriptpk == script_pk, "Bad output script")
|
|
A, B, csv_val, C = extractScriptLockRefundScriptValues(script_out)
|
|
ensure(A == Kal, "Bad script pubkey")
|
|
ensure(B == Kaf, "Bad script pubkey")
|
|
ensure(csv_val == csv_val_expect, "Bad script csv value")
|
|
ensure(C == Kaf, "Bad script pubkey")
|
|
|
|
# Check rangeproofs and commitments sum
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [lock_tx_bytes.hex()])
|
|
prevout = lock_tx_obj["vout"][prevout_n]
|
|
prevtxns = [
|
|
{
|
|
"txid": prevout_id.hex(),
|
|
"vout": prevout_n,
|
|
"scriptPubKey": prevout["scriptPubKey"]["hex"],
|
|
"amount_commitment": prevout["valueCommitment"],
|
|
}
|
|
]
|
|
rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns])
|
|
ensure(rv["outputs_valid"] is True, "Invalid outputs")
|
|
ensure(rv["inputs_valid"] is True, "Invalid inputs")
|
|
|
|
# Check value
|
|
fee_paid = self.make_int(lock_refund_tx_obj["vout"][0]["ct_fee"])
|
|
ensure(swap_value - lock_refund_txo_value == fee_paid, "Bad output value")
|
|
|
|
# Check fee rate
|
|
dummy_witness_stack = self.getScriptLockTxDummyWitness(prevout_script)
|
|
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
|
|
vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes)
|
|
fee_rate_paid = fee_paid * 1000 // vsize
|
|
self._log.info("vsize, feerate: %ld, %ld", vsize, fee_rate_paid)
|
|
|
|
ensure(
|
|
self.compareFeeRates(fee_rate_paid, feerate),
|
|
"Bad fee rate, expected: {}".format(feerate),
|
|
)
|
|
|
|
return (
|
|
bytes.fromhex(lock_refund_txid_hex),
|
|
lock_refund_txo_value,
|
|
lock_refund_output_n,
|
|
)
|
|
|
|
def verifySCLockRefundSpendTx(
|
|
self,
|
|
tx_bytes,
|
|
lock_refund_tx_bytes,
|
|
lock_refund_tx_id,
|
|
prevout_script,
|
|
Kal,
|
|
prevout_n,
|
|
prevout_value,
|
|
feerate,
|
|
vkbv,
|
|
):
|
|
lock_refund_spend_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()])
|
|
lock_refund_spend_txid_hex = lock_refund_spend_tx_obj["txid"]
|
|
self._log.info(
|
|
"Verifying lock refund spend tx: {}.".format(lock_refund_spend_txid_hex)
|
|
)
|
|
|
|
ensure(lock_refund_spend_tx_obj["version"] == self.txVersion(), "Bad version")
|
|
ensure(lock_refund_spend_tx_obj["locktime"] == 0, "Bad nLockTime")
|
|
ensure(len(lock_refund_spend_tx_obj["vin"]) == 1, "tx doesn't have one input")
|
|
|
|
txin = lock_refund_spend_tx_obj["vin"][0]
|
|
ensure(txin["sequence"] == 0, "Bad input nSequence")
|
|
ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty")
|
|
ensure(
|
|
txin["txid"] == lock_refund_tx_id.hex() and txin["vout"] == prevout_n,
|
|
"Input prevout mismatch",
|
|
)
|
|
|
|
ensure(
|
|
len(lock_refund_spend_tx_obj["vout"]) == 3, "tx doesn't have three outputs"
|
|
)
|
|
|
|
# Leader picks output destinations
|
|
# Follower is not concerned with them as they pay to leader
|
|
|
|
# Check rangeproofs and commitments sum
|
|
lock_refund_tx_obj = self.rpc(
|
|
"decoderawtransaction", [lock_refund_tx_bytes.hex()]
|
|
)
|
|
prevout = lock_refund_tx_obj["vout"][prevout_n]
|
|
prevtxns = [
|
|
{
|
|
"txid": lock_refund_tx_id.hex(),
|
|
"vout": prevout_n,
|
|
"scriptPubKey": prevout["scriptPubKey"]["hex"],
|
|
"amount_commitment": prevout["valueCommitment"],
|
|
}
|
|
]
|
|
rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns])
|
|
ensure(rv["outputs_valid"] is True, "Invalid outputs")
|
|
ensure(rv["inputs_valid"] is True, "Invalid inputs")
|
|
|
|
# Check fee rate
|
|
dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness(
|
|
prevout_script
|
|
)
|
|
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
|
|
vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes)
|
|
fee_paid = self.make_int(lock_refund_spend_tx_obj["vout"][0]["ct_fee"])
|
|
fee_rate_paid = fee_paid * 1000 // vsize
|
|
ensure(
|
|
self.compareFeeRates(fee_rate_paid, feerate),
|
|
"Bad fee rate, expected: {}".format(feerate),
|
|
)
|
|
|
|
return True
|
|
|
|
def getLockTxSwapOutputValue(self, bid, xmr_swap):
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [xmr_swap.a_lock_tx.hex()])
|
|
nonce = self.getScriptLockTxNonce(xmr_swap.vkbv)
|
|
output_n, _ = self.findOutputByNonce(lock_tx_obj, nonce)
|
|
ensure(output_n is not None, "Output not found in tx")
|
|
return bytes.fromhex(lock_tx_obj["vout"][output_n]["valueCommitment"])
|
|
|
|
def getLockRefundTxSwapOutputValue(self, bid, xmr_swap):
|
|
lock_refund_tx_obj = self.rpc(
|
|
"decoderawtransaction", [xmr_swap.a_lock_refund_tx.hex()]
|
|
)
|
|
nonce = self.getScriptLockRefundTxNonce(xmr_swap.vkbv)
|
|
output_n, _ = self.findOutputByNonce(lock_refund_tx_obj, nonce)
|
|
ensure(output_n is not None, "Output not found in tx")
|
|
return bytes.fromhex(lock_refund_tx_obj["vout"][output_n]["valueCommitment"])
|
|
|
|
def getLockRefundTxSwapOutput(self, xmr_swap):
|
|
lock_refund_tx_obj = self.rpc(
|
|
"decoderawtransaction", [xmr_swap.a_lock_refund_tx.hex()]
|
|
)
|
|
nonce = self.getScriptLockRefundTxNonce(xmr_swap.vkbv)
|
|
output_n, _ = self.findOutputByNonce(lock_refund_tx_obj, nonce)
|
|
ensure(output_n is not None, "Output not found in tx")
|
|
return output_n
|
|
|
|
def createSCLockSpendTx(
|
|
self,
|
|
tx_lock_bytes: bytes,
|
|
script_lock: bytes,
|
|
pk_dest: bytes,
|
|
tx_fee_rate: int,
|
|
vkbv: bytes,
|
|
fee_info={},
|
|
) -> bytes:
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [tx_lock_bytes.hex()])
|
|
lock_txid_hex = lock_tx_obj["txid"]
|
|
|
|
ensure(lock_tx_obj["version"] == self.txVersion(), "Bad version")
|
|
ensure(lock_tx_obj["locktime"] == 0, "Bad nLockTime")
|
|
|
|
# Find the output of the lock tx to verify
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
spend_n, blinded_info = self.findOutputByNonce(lock_tx_obj, nonce)
|
|
ensure(spend_n is not None, "Output not found in tx")
|
|
|
|
addr_out = self.pubkey_to_address(pk_dest)
|
|
|
|
inputs = [
|
|
{
|
|
"txid": lock_txid_hex,
|
|
"vout": spend_n,
|
|
"sequence": 0,
|
|
"blindingfactor": blinded_info["blind"],
|
|
}
|
|
]
|
|
outputs = [
|
|
{
|
|
"type": "blind",
|
|
"amount": blinded_info["amount"],
|
|
"address": addr_out,
|
|
"pubkey": pk_dest.hex(),
|
|
}
|
|
]
|
|
params = [inputs, outputs]
|
|
rv = self.rpc_wallet("createrawparttransaction", params)
|
|
lock_spend_tx_hex = rv["hex"]
|
|
|
|
# Set dummy witness data for fee estimation
|
|
dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock)
|
|
|
|
# Use a junk change pubkey to avoid adding unused keys to the wallet
|
|
zero_change_key = self.getNewSecretKey()
|
|
zero_change_pubkey = self.getPubkey(zero_change_key)
|
|
inputs_info = {
|
|
"0": {
|
|
"value": blinded_info["amount"],
|
|
"blind": blinded_info["blind"],
|
|
"witnessstack": [x.hex() for x in dummy_witness_stack],
|
|
}
|
|
}
|
|
outputs_info = rv["amounts"]
|
|
options = {
|
|
"changepubkey": zero_change_pubkey.hex(),
|
|
"feeRate": self.format_amount(tx_fee_rate),
|
|
"subtractFeeFromOutputs": [
|
|
0,
|
|
],
|
|
}
|
|
|
|
rv = self.rpc_wallet(
|
|
"fundrawtransactionfrom",
|
|
["blind", lock_spend_tx_hex, inputs_info, outputs_info, options],
|
|
)
|
|
lock_spend_tx_hex = rv["hex"]
|
|
lock_spend_tx_obj = self.rpc("decoderawtransaction", [lock_spend_tx_hex])
|
|
pay_fee = self.make_int(lock_spend_tx_obj["vout"][0]["ct_fee"])
|
|
|
|
# lock_spend_tx_hex does not include the dummy witness stack
|
|
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
|
|
vsize = self.getTxVSize(
|
|
self.loadTx(bytes.fromhex(lock_spend_tx_hex)),
|
|
add_witness_bytes=witness_bytes,
|
|
)
|
|
actual_tx_fee_rate = pay_fee * 1000 // vsize
|
|
self._log.info(
|
|
"createSCLockSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.",
|
|
lock_spend_tx_obj["txid"],
|
|
actual_tx_fee_rate,
|
|
vsize,
|
|
pay_fee,
|
|
)
|
|
|
|
fee_info["vsize"] = vsize
|
|
fee_info["fee_paid"] = pay_fee
|
|
fee_info["rate_input"] = tx_fee_rate
|
|
fee_info["rate_actual"] = actual_tx_fee_rate
|
|
|
|
return bytes.fromhex(lock_spend_tx_hex)
|
|
|
|
def verifySCLockSpendTx(
|
|
self, tx_bytes, lock_tx_bytes, lock_tx_script, a_pk_f, feerate, vkbv
|
|
):
|
|
lock_spend_tx_obj = self.rpc("decoderawtransaction", [tx_bytes.hex()])
|
|
lock_spend_txid_hex = lock_spend_tx_obj["txid"]
|
|
self._log.info("Verifying lock spend tx: {}.".format(lock_spend_txid_hex))
|
|
|
|
ensure(lock_spend_tx_obj["version"] == self.txVersion(), "Bad version")
|
|
ensure(lock_spend_tx_obj["locktime"] == 0, "Bad nLockTime")
|
|
ensure(len(lock_spend_tx_obj["vin"]) == 1, "tx doesn't have one input")
|
|
|
|
lock_tx_obj = self.rpc("decoderawtransaction", [lock_tx_bytes.hex()])
|
|
lock_txid_hex = lock_tx_obj["txid"]
|
|
|
|
# Find the output of the lock tx to verify
|
|
nonce = self.getScriptLockTxNonce(vkbv)
|
|
spend_n, input_blinded_info = self.findOutputByNonce(lock_tx_obj, nonce)
|
|
ensure(spend_n is not None, "Output not found in tx")
|
|
|
|
txin = lock_spend_tx_obj["vin"][0]
|
|
ensure(txin["sequence"] == 0, "Bad input nSequence")
|
|
ensure(txin["scriptSig"]["hex"] == "", "Input scriptsig not empty")
|
|
ensure(
|
|
txin["txid"] == lock_txid_hex and txin["vout"] == spend_n,
|
|
"Input prevout mismatch",
|
|
)
|
|
|
|
ensure(len(lock_spend_tx_obj["vout"]) == 3, "tx doesn't have three outputs")
|
|
|
|
addr_out = self.pubkey_to_address(a_pk_f)
|
|
privkey = self.rpc_wallet("dumpprivkey", [addr_out])
|
|
|
|
# Find output:
|
|
output_blinded_info = None
|
|
output_n = None
|
|
for txo in lock_spend_tx_obj["vout"]:
|
|
if txo["type"] != "blind":
|
|
continue
|
|
try:
|
|
output_blinded_info = self.rpc(
|
|
"rewindrangeproof",
|
|
[
|
|
txo["rangeproof"],
|
|
txo["valueCommitment"],
|
|
privkey,
|
|
txo["data_hex"],
|
|
],
|
|
)
|
|
output_n = txo["n"]
|
|
break
|
|
except Exception as e:
|
|
self._log.debug("Searching for locked output: {}".format(str(e)))
|
|
pass
|
|
ensure(output_n is not None, "Output not found in tx")
|
|
|
|
# Commitment
|
|
v = self.rpc(
|
|
"verifycommitment",
|
|
[
|
|
lock_spend_tx_obj["vout"][output_n]["valueCommitment"],
|
|
output_blinded_info["blind"],
|
|
output_blinded_info["amount"],
|
|
],
|
|
)
|
|
ensure(v["result"] is True, "verifycommitment failed")
|
|
|
|
# Check rangeproofs and commitments sum
|
|
prevout = lock_tx_obj["vout"][spend_n]
|
|
prevtxns = [
|
|
{
|
|
"txid": lock_txid_hex,
|
|
"vout": spend_n,
|
|
"scriptPubKey": prevout["scriptPubKey"]["hex"],
|
|
"amount_commitment": prevout["valueCommitment"],
|
|
}
|
|
]
|
|
rv = self.rpc("verifyrawtransaction", [tx_bytes.hex(), prevtxns])
|
|
ensure(rv["outputs_valid"] is True, "Invalid outputs")
|
|
ensure(rv["inputs_valid"] is True, "Invalid inputs")
|
|
|
|
# Check amount
|
|
fee_paid = self.make_int(lock_spend_tx_obj["vout"][0]["ct_fee"])
|
|
amount_difference = self.make_int(input_blinded_info["amount"]) - self.make_int(
|
|
output_blinded_info["amount"]
|
|
)
|
|
ensure(fee_paid == amount_difference, "Invalid output amount")
|
|
|
|
# Check fee
|
|
dummy_witness_stack = self.getScriptLockTxDummyWitness(lock_tx_script)
|
|
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
|
|
|
|
vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes)
|
|
fee_rate_paid = fee_paid * 1000 // vsize
|
|
self._log.info("vsize, feerate: %ld, %ld", vsize, fee_rate_paid)
|
|
if not self.compareFeeRates(fee_rate_paid, feerate):
|
|
raise ValueError("Bad fee rate, expected: {}".format(feerate))
|
|
|
|
return True
|
|
|
|
def createSCLockRefundSpendToFTx(
|
|
self,
|
|
tx_lock_refund_bytes,
|
|
script_lock_refund,
|
|
pkh_dest,
|
|
tx_fee_rate,
|
|
vkbv,
|
|
kbsf=None,
|
|
):
|
|
# lock refund swipe tx
|
|
# Sends the coinA locked coin to the follower
|
|
lock_refund_tx_obj = self.rpc(
|
|
"decoderawtransaction", [tx_lock_refund_bytes.hex()]
|
|
)
|
|
nonce = self.getScriptLockRefundTxNonce(vkbv)
|
|
|
|
# Find the output of the lock refund tx to spend
|
|
spend_n, input_blinded_info = self.findOutputByNonce(lock_refund_tx_obj, nonce)
|
|
ensure(spend_n is not None, "Output not found in tx")
|
|
|
|
tx_lock_refund_id = lock_refund_tx_obj["txid"]
|
|
addr_out = self.pkh_to_address(pkh_dest)
|
|
addr_info = self.rpc_wallet("getaddressinfo", [addr_out])
|
|
output_pubkey_hex = addr_info["pubkey"]
|
|
|
|
A, B, lock2_value, C = extractScriptLockRefundScriptValues(script_lock_refund)
|
|
|
|
# Follower won't be able to decode output to check amount, shouldn't matter as fee is public and output is to leader, sum has to balance
|
|
|
|
inputs = [
|
|
{
|
|
"txid": tx_lock_refund_id,
|
|
"vout": spend_n,
|
|
"sequence": lock2_value,
|
|
"blindingfactor": input_blinded_info["blind"],
|
|
}
|
|
]
|
|
outputs = [
|
|
{
|
|
"type": "blind",
|
|
"amount": input_blinded_info["amount"],
|
|
"address": addr_out,
|
|
"pubkey": output_pubkey_hex,
|
|
}
|
|
]
|
|
params = [inputs, outputs]
|
|
rv = self.rpc_wallet("createrawparttransaction", params)
|
|
|
|
lock_refund_swipe_tx_hex = rv["hex"]
|
|
|
|
# Set dummy witness data for fee estimation
|
|
dummy_witness_stack = self.getScriptLockRefundSwipeTxDummyWitness(
|
|
script_lock_refund
|
|
)
|
|
dummy_witness_stack = [x.hex() for x in dummy_witness_stack]
|
|
|
|
# Use a junk change pubkey to avoid adding unused keys to the wallet
|
|
zero_change_key = self.getNewSecretKey()
|
|
zero_change_pubkey = self.getPubkey(zero_change_key)
|
|
inputs_info = {
|
|
"0": {
|
|
"value": input_blinded_info["amount"],
|
|
"blind": input_blinded_info["blind"],
|
|
"witnessstack": dummy_witness_stack,
|
|
}
|
|
}
|
|
outputs_info = rv["amounts"]
|
|
options = {
|
|
"changepubkey": zero_change_pubkey.hex(),
|
|
"feeRate": self.format_amount(tx_fee_rate),
|
|
"subtractFeeFromOutputs": [
|
|
0,
|
|
],
|
|
}
|
|
|
|
rv = self.rpc_wallet(
|
|
"fundrawtransactionfrom",
|
|
["blind", lock_refund_swipe_tx_hex, inputs_info, outputs_info, options],
|
|
)
|
|
lock_refund_swipe_tx_hex = rv["hex"]
|
|
|
|
return bytes.fromhex(lock_refund_swipe_tx_hex)
|
|
|
|
def getSpendableBalance(self) -> int:
|
|
return self.make_int(self.rpc_wallet("getbalances")["mine"]["blind_trusted"])
|
|
|
|
def publishBLockTx(
|
|
self,
|
|
vkbv: bytes,
|
|
Kbs: bytes,
|
|
output_amount: int,
|
|
feerate: int,
|
|
unlock_time: int = 0,
|
|
) -> bytes:
|
|
Kbv = self.getPubkey(vkbv)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
self._log.debug("sx_addr: {}".format(sx_addr))
|
|
|
|
# TODO: Fund from other balances
|
|
params = [
|
|
"blind",
|
|
"blind",
|
|
[
|
|
{"address": sx_addr, "amount": self.format_amount(output_amount)},
|
|
],
|
|
"",
|
|
"",
|
|
self._anon_tx_ring_size,
|
|
1,
|
|
False,
|
|
{"conf_target": self._conf_target, "blind_watchonly_visible": True},
|
|
]
|
|
|
|
txid = self.rpc_wallet("sendtypeto", params)
|
|
return bytes.fromhex(txid)
|
|
|
|
def findTxB(
|
|
self,
|
|
kbv,
|
|
Kbs,
|
|
cb_swap_value,
|
|
cb_block_confirmed,
|
|
restore_height: int,
|
|
bid_sender: bool,
|
|
):
|
|
Kbv = self.getPubkey(kbv)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
|
|
# Tx recipient must import the stealth address as watch only
|
|
if bid_sender:
|
|
cb_swap_value *= -1
|
|
else:
|
|
addr_info = self.rpc_wallet("getaddressinfo", [sx_addr])
|
|
if not addr_info["iswatchonly"]:
|
|
wif_scan_key = self.encodeKey(kbv)
|
|
self.rpc_wallet("importstealthaddress", [wif_scan_key, Kbs.hex()])
|
|
self._log.info("Imported watch-only sx_addr: {}".format(sx_addr))
|
|
self._log.info(
|
|
"Rescanning {} chain from height: {}".format(
|
|
self.coin_name(), restore_height
|
|
)
|
|
)
|
|
self.rpc_wallet("rescanblockchain", [restore_height])
|
|
|
|
params = [{"include_watchonly": True, "search": sx_addr}]
|
|
txns = self.rpc_wallet("filtertransactions", params)
|
|
|
|
if len(txns) == 1:
|
|
tx = txns[0]
|
|
assert (
|
|
tx["outputs"][0]["stealth_address"] == sx_addr
|
|
) # Should not be possible
|
|
ensure(tx["outputs"][0]["type"] == "blind", "Output is not anon")
|
|
|
|
if self.make_int(tx["outputs"][0]["amount"]) == cb_swap_value:
|
|
height = 0
|
|
if tx["confirmations"] > 0:
|
|
chain_height = self.rpc("getblockcount")
|
|
height = chain_height - (tx["confirmations"] - 1)
|
|
return {"txid": tx["txid"], "amount": cb_swap_value, "height": height}
|
|
else:
|
|
self._log.warning(
|
|
"Incorrect amount detected for coin b lock txn: {}".format(
|
|
tx["txid"]
|
|
)
|
|
)
|
|
return -1
|
|
return None
|
|
|
|
def spendBLockTx(
|
|
self,
|
|
chain_b_lock_txid: bytes,
|
|
address_to: str,
|
|
kbv: bytes,
|
|
kbs: bytes,
|
|
cb_swap_value: int,
|
|
b_fee: int,
|
|
restore_height: int,
|
|
spend_actual_balance: bool = False,
|
|
lock_tx_vout=None,
|
|
) -> bytes:
|
|
Kbv = self.getPubkey(kbv)
|
|
Kbs = self.getPubkey(kbs)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
addr_info = self.rpc_wallet("getaddressinfo", [sx_addr])
|
|
if not addr_info["ismine"]:
|
|
wif_scan_key = self.encodeKey(kbv)
|
|
wif_spend_key = self.encodeKey(kbs)
|
|
self.rpc_wallet("importstealthaddress", [wif_scan_key, wif_spend_key])
|
|
self._log.info("Imported spend key for sx_addr: {}".format(sx_addr))
|
|
self._log.info(
|
|
"Rescanning {} chain from height: {}".format(
|
|
self.coin_name(), restore_height
|
|
)
|
|
)
|
|
self.rpc_wallet("rescanblockchain", [restore_height])
|
|
|
|
# TODO: Remove workaround
|
|
# utxos = self.rpc_wallet('listunspentblind', [1, 9999999, [sx_addr]])
|
|
utxos = []
|
|
all_utxos = self.rpc_wallet("listunspentblind", [1, 9999999])
|
|
for utxo in all_utxos:
|
|
if utxo.get("stealth_address", "_") == sx_addr:
|
|
utxos.append(utxo)
|
|
if len(utxos) < 1:
|
|
raise TemporaryError("No spendable outputs")
|
|
elif len(utxos) > 1:
|
|
raise ValueError("Too many spendable outputs")
|
|
|
|
utxo = utxos[0]
|
|
utxo_sats = self.make_int(utxo["amount"])
|
|
|
|
if spend_actual_balance and utxo_sats != cb_swap_value:
|
|
self._log.warning(
|
|
"Spending actual balance {}, not swap value {}.".format(
|
|
utxo_sats, cb_swap_value
|
|
)
|
|
)
|
|
cb_swap_value = utxo_sats
|
|
|
|
inputs = [
|
|
{"tx": utxo["txid"], "n": utxo["vout"]},
|
|
]
|
|
params = [
|
|
"blind",
|
|
"blind",
|
|
[
|
|
{
|
|
"address": address_to,
|
|
"amount": self.format_amount(cb_swap_value),
|
|
"subfee": True,
|
|
},
|
|
],
|
|
"",
|
|
"",
|
|
self._anon_tx_ring_size,
|
|
1,
|
|
False,
|
|
{"conf_target": self._conf_target, "inputs": inputs, "show_fee": True},
|
|
]
|
|
rv = self.rpc_wallet("sendtypeto", params)
|
|
return bytes.fromhex(rv["txid"])
|
|
|
|
def findTxnByHash(self, txid_hex):
|
|
# txindex is enabled for Particl
|
|
|
|
try:
|
|
rv = self.rpc("getrawtransaction", [txid_hex, True])
|
|
except Exception as e: # noqa: F841
|
|
self._log.debug(
|
|
"findTxnByHash getrawtransaction failed: {}".format(txid_hex)
|
|
)
|
|
return None
|
|
|
|
if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed:
|
|
return {"txid": txid_hex, "amount": 0, "height": rv["height"]}
|
|
|
|
return None
|
|
|
|
def createRawFundedTransaction(
|
|
self,
|
|
addr_to: str,
|
|
amount: int,
|
|
sub_fee: bool = False,
|
|
lock_unspents: bool = True,
|
|
) -> str:
|
|
txn = self.rpc_wallet(
|
|
"createrawtransaction", [[], {addr_to: self.format_amount(amount)}]
|
|
)
|
|
|
|
options = {
|
|
"lockUnspents": lock_unspents,
|
|
"conf_target": self._conf_target,
|
|
}
|
|
if sub_fee:
|
|
options["subtractFeeFromOutputs"] = [
|
|
0,
|
|
]
|
|
return self.rpc_wallet("fundrawtransactionfrom", ["blind", txn, options])["hex"]
|
|
|
|
|
|
class PARTInterfaceAnon(PARTInterface):
|
|
@staticmethod
|
|
def balance_type():
|
|
return BalanceTypes.ANON
|
|
|
|
@staticmethod
|
|
def xmr_swap_a_lock_spend_tx_vsize() -> int:
|
|
raise ValueError("Not possible")
|
|
|
|
@staticmethod
|
|
def xmr_swap_b_lock_spend_tx_vsize() -> int:
|
|
# TODO: Estimate with ringsize
|
|
return 1153
|
|
|
|
@staticmethod
|
|
def depth_spendable() -> int:
|
|
return 12
|
|
|
|
def coin_name(self) -> str:
|
|
return super().coin_name() + " Anon"
|
|
|
|
def publishBLockTx(
|
|
self,
|
|
kbv: bytes,
|
|
Kbs: bytes,
|
|
output_amount: int,
|
|
feerate: int,
|
|
unlock_time: int = 0,
|
|
) -> bytes:
|
|
Kbv = self.getPubkey(kbv)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
|
|
# TODO: Fund from other balances
|
|
params = [
|
|
"anon",
|
|
"anon",
|
|
[
|
|
{"address": sx_addr, "amount": self.format_amount(output_amount)},
|
|
],
|
|
"",
|
|
"",
|
|
self._anon_tx_ring_size,
|
|
1,
|
|
False,
|
|
{"conf_target": self._conf_target, "blind_watchonly_visible": True},
|
|
]
|
|
|
|
txid = self.rpc_wallet("sendtypeto", params)
|
|
return bytes.fromhex(txid)
|
|
|
|
def findTxB(
|
|
self, kbv, Kbs, cb_swap_value, cb_block_confirmed, restore_height, bid_sender
|
|
):
|
|
Kbv = self.getPubkey(kbv)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
self._log.debug("sx_addr: {}".format(sx_addr))
|
|
|
|
# Tx recipient must import the stealth address as watch only
|
|
if bid_sender:
|
|
cb_swap_value *= -1
|
|
else:
|
|
addr_info = self.rpc_wallet("getaddressinfo", [sx_addr])
|
|
if not addr_info["iswatchonly"]:
|
|
wif_scan_key = self.encodeKey(kbv)
|
|
self.rpc_wallet("importstealthaddress", [wif_scan_key, Kbs.hex()])
|
|
self._log.info("Imported watch-only sx_addr: {}".format(sx_addr))
|
|
self._log.info(
|
|
"Rescanning {} chain from height: {}".format(
|
|
self.coin_name(), restore_height
|
|
)
|
|
)
|
|
self.rpc_wallet("rescanblockchain", [restore_height])
|
|
|
|
params = [{"include_watchonly": True, "search": sx_addr}]
|
|
txns = self.rpc_wallet("filtertransactions", params)
|
|
|
|
if len(txns) == 1:
|
|
tx = txns[0]
|
|
assert (
|
|
tx["outputs"][0]["stealth_address"] == sx_addr
|
|
) # Should not be possible
|
|
ensure(tx["outputs"][0]["type"] == "anon", "Output is not anon")
|
|
|
|
if self.make_int(tx["outputs"][0]["amount"]) == cb_swap_value:
|
|
height = 0
|
|
if tx["confirmations"] > 0:
|
|
chain_height = self.rpc("getblockcount")
|
|
height = chain_height - (tx["confirmations"] - 1)
|
|
return {"txid": tx["txid"], "amount": cb_swap_value, "height": height}
|
|
else:
|
|
self._log.warning(
|
|
"Incorrect amount detected for coin b lock txn: {}".format(
|
|
tx["txid"]
|
|
)
|
|
)
|
|
return -1
|
|
return None
|
|
|
|
def spendBLockTx(
|
|
self,
|
|
chain_b_lock_txid: bytes,
|
|
address_to: str,
|
|
kbv: bytes,
|
|
kbs: bytes,
|
|
cb_swap_value: int,
|
|
b_fee: int,
|
|
restore_height: int,
|
|
spend_actual_balance: bool = False,
|
|
lock_tx_vout=None,
|
|
) -> bytes:
|
|
Kbv = self.getPubkey(kbv)
|
|
Kbs = self.getPubkey(kbs)
|
|
sx_addr = self.formatStealthAddress(Kbv, Kbs)
|
|
addr_info = self.rpc_wallet("getaddressinfo", [sx_addr])
|
|
if not addr_info["ismine"]:
|
|
wif_scan_key = self.encodeKey(kbv)
|
|
wif_spend_key = self.encodeKey(kbs)
|
|
self.rpc_wallet("importstealthaddress", [wif_scan_key, wif_spend_key])
|
|
self._log.info("Imported spend key for sx_addr: {}".format(sx_addr))
|
|
self._log.info(
|
|
"Rescanning {} chain from height: {}".format(
|
|
self.coin_name(), restore_height
|
|
)
|
|
)
|
|
self.rpc_wallet("rescanblockchain", [restore_height])
|
|
|
|
autxos = self.rpc_wallet("listunspentanon", [1, 9999999, [sx_addr]])
|
|
|
|
if len(autxos) < 1:
|
|
raise TemporaryError("No spendable outputs")
|
|
elif len(autxos) > 1:
|
|
raise ValueError("Too many spendable outputs")
|
|
|
|
utxo = autxos[0]
|
|
utxo_sats = self.make_int(utxo["amount"])
|
|
|
|
if spend_actual_balance and utxo_sats != cb_swap_value:
|
|
self._log.warning(
|
|
"Spending actual balance {}, not swap value {}.".format(
|
|
utxo_sats, cb_swap_value
|
|
)
|
|
)
|
|
cb_swap_value = utxo_sats
|
|
|
|
inputs = [
|
|
{"tx": utxo["txid"], "n": utxo["vout"]},
|
|
]
|
|
params = [
|
|
"anon",
|
|
"anon",
|
|
[
|
|
{
|
|
"address": address_to,
|
|
"amount": self.format_amount(cb_swap_value),
|
|
"subfee": True,
|
|
},
|
|
],
|
|
"",
|
|
"",
|
|
self._anon_tx_ring_size,
|
|
1,
|
|
False,
|
|
{"conf_target": self._conf_target, "inputs": inputs, "show_fee": True},
|
|
]
|
|
rv = self.rpc_wallet("sendtypeto", params)
|
|
return bytes.fromhex(rv["txid"])
|
|
|
|
def findTxnByHash(self, txid_hex: str):
|
|
# txindex is enabled for Particl
|
|
|
|
try:
|
|
rv = self.rpc("getrawtransaction", [txid_hex, True])
|
|
except Exception as e: # noqa: F841
|
|
self._log.debug(
|
|
"findTxnByHash getrawtransaction failed: {}".format(txid_hex)
|
|
)
|
|
return None
|
|
|
|
if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed:
|
|
return {"txid": txid_hex, "amount": 0, "height": rv["height"]}
|
|
|
|
return None
|
|
|
|
def getSpendableBalance(self) -> int:
|
|
return self.make_int(self.rpc_wallet("getbalances")["mine"]["anon_trusted"])
|