Add non-segwit Firo support.

Rework tests to combine atomic and xmr test cases.
Modify btc interface to support P2WSH_nested_in_BIP16_P2SH
Add coin feature tests to test_btc_xmr.py
This commit is contained in:
tecnovert
2022-11-07 22:31:10 +02:00
parent c0c2c8b9bb
commit ca264db0d0
21 changed files with 1400 additions and 411 deletions

View File

@@ -320,7 +320,7 @@ class BTCInterface(CoinInterface):
def decodeAddress(self, address):
bech32_prefix = self.chainparams_network()['hrp']
if address.startswith(bech32_prefix + '1'):
if len(bech32_prefix) > 0 and address.startswith(bech32_prefix + '1'):
return bytes(segwit_addr.decode(bech32_prefix, address)[1])
return decodeAddress(address)[1:]
@@ -338,12 +338,22 @@ class BTCInterface(CoinInterface):
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()
return b58encode(data + checksum[0:4])
def sh_to_address(self, sh):
assert (len(sh) == 20)
prefix = self.chainparams_network()['script_address']
data = bytes((prefix,)) + sh
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()
return b58encode(data + checksum[0:4])
def encode_p2wsh(self, script):
bech32_prefix = self.chainparams_network()['hrp']
version = 0
program = script[2:] # strip version and length
return segwit_addr.encode(bech32_prefix, version, program)
def encodeScriptDest(self, script):
return self.encode_p2wsh(script)
def encode_p2sh(self, script):
return pubkeyToAddress(self.chainparams_network()['script_address'], script)
@@ -375,6 +385,12 @@ class BTCInterface(CoinInterface):
def encodePubkey(self, pk):
return pointToCPK(pk)
def encodeSegwitAddress(self, key_hash):
return segwit_addr.encode(self.chainparams_network()['hrp'], 0, key_hash)
def decodeSegwitAddress(self, addr):
return bytes(segwit_addr.decode(self.chainparams_network()['hrp'], addr)[1])
def decodePubkey(self, pke):
return CPKToPoint(pke)
@@ -415,7 +431,7 @@ class BTCInterface(CoinInterface):
return CScript([2, Kal_enc, Kaf_enc, 2, CScriptOp(OP_CHECKMULTISIG)])
def createScriptLockTx(self, value, Kal, Kaf, vkbv=None):
def createSCLockTx(self, value, Kal, Kaf, vkbv=None):
script = self.genScriptLockTxScript(Kal, Kaf)
tx = CTransaction()
tx.nVersion = self.txVersion()
@@ -423,7 +439,7 @@ class BTCInterface(CoinInterface):
return tx.serialize(), script
def fundScriptLockTx(self, tx_bytes, feerate, vkbv=None):
def fundSCLockTx(self, tx_bytes, feerate, vkbv=None):
return self.fundTx(tx_bytes, feerate)
def extractScriptLockRefundScriptValues(self, script_bytes):
@@ -470,11 +486,11 @@ class BTCInterface(CoinInterface):
Kaf_enc, CScriptOp(OP_CHECKSIG),
CScriptOp(OP_ENDIF)])
def createScriptLockRefundTx(self, tx_lock_bytes, script_lock, Kal, Kaf, lock1_value, csv_val, tx_fee_rate, vkbv=None):
def createSCLockRefundTx(self, tx_lock_bytes, script_lock, Kal, Kaf, lock1_value, csv_val, tx_fee_rate, vkbv=None):
tx_lock = CTransaction()
tx_lock = FromHex(tx_lock, tx_lock_bytes.hex())
output_script = CScript([OP_0, hashlib.sha256(script_lock).digest()])
output_script = self.getScriptDest(script_lock)
locked_n = findOutput(tx_lock, output_script)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx_lock.vout[locked_n].nValue
@@ -485,8 +501,10 @@ class BTCInterface(CoinInterface):
refund_script = self.genScriptLockRefundTxScript(Kal, Kaf, csv_val)
tx = CTransaction()
tx.nVersion = self.txVersion()
tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n), nSequence=lock1_value))
tx.vout.append(self.txoType()(locked_coin, CScript([OP_0, hashlib.sha256(refund_script).digest()])))
tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n),
nSequence=lock1_value,
scriptSig=self.getScriptScriptSig(script_lock)))
tx.vout.append(self.txoType()(locked_coin, self.getScriptDest(refund_script)))
dummy_witness_stack = self.getScriptLockTxDummyWitness(script_lock)
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
@@ -495,19 +513,19 @@ class BTCInterface(CoinInterface):
tx.vout[0].nValue = locked_coin - pay_fee
tx.rehash()
self._log.info('createScriptLockRefundTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
self._log.info('createSCLockRefundTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
i2h(tx.sha256), tx_fee_rate, vsize, pay_fee)
return tx.serialize(), refund_script, tx.vout[0].nValue
def createScriptLockRefundSpendTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv=None):
def createSCLockRefundSpendTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_refund_to, tx_fee_rate, vkbv=None):
# Returns the coinA locked coin to the leader
# The follower will sign the multisig path with a signature encumbered by the leader's coinB spend pubkey
# If the leader publishes the decrypted signature the leader's coinB spend privatekey will be revealed to the follower
tx_lock_refund = self.loadTx(tx_lock_refund_bytes)
output_script = CScript([OP_0, hashlib.sha256(script_lock_refund).digest()])
output_script = self.getScriptDest(script_lock_refund)
locked_n = findOutput(tx_lock_refund, output_script)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx_lock_refund.vout[locked_n].nValue
@@ -517,7 +535,9 @@ class BTCInterface(CoinInterface):
tx = CTransaction()
tx.nVersion = self.txVersion()
tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=0))
tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n),
nSequence=0,
scriptSig=self.getScriptScriptSig(script_lock_refund)))
tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_refund_to)))
@@ -528,18 +548,18 @@ class BTCInterface(CoinInterface):
tx.vout[0].nValue = locked_coin - pay_fee
tx.rehash()
self._log.info('createScriptLockRefundSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
self._log.info('createSCLockRefundSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
i2h(tx.sha256), tx_fee_rate, vsize, pay_fee)
return tx.serialize()
def createScriptLockRefundSpendToFTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_dest, tx_fee_rate, vkbv=None):
def createSCLockRefundSpendToFTx(self, tx_lock_refund_bytes, script_lock_refund, pkh_dest, tx_fee_rate, vkbv=None):
# lock refund swipe tx
# Sends the coinA locked coin to the follower
tx_lock_refund = self.loadTx(tx_lock_refund_bytes)
output_script = CScript([OP_0, hashlib.sha256(script_lock_refund).digest()])
output_script = self.getScriptDest(script_lock_refund)
locked_n = findOutput(tx_lock_refund, output_script)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx_lock_refund.vout[locked_n].nValue
@@ -551,7 +571,9 @@ class BTCInterface(CoinInterface):
tx = CTransaction()
tx.nVersion = self.txVersion()
tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n), nSequence=lock2_value))
tx.vin.append(CTxIn(COutPoint(tx_lock_refund_hash_int, locked_n),
nSequence=lock2_value,
scriptSig=self.getScriptScriptSig(script_lock_refund)))
tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest)))
@@ -562,14 +584,14 @@ class BTCInterface(CoinInterface):
tx.vout[0].nValue = locked_coin - pay_fee
tx.rehash()
self._log.info('createScriptLockRefundSpendToFTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
self._log.info('createSCLockRefundSpendToFTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
i2h(tx.sha256), tx_fee_rate, vsize, pay_fee)
return tx.serialize()
def createScriptLockSpendTx(self, tx_lock_bytes, script_lock, pkh_dest, tx_fee_rate, vkbv=None):
def createSCLockSpendTx(self, tx_lock_bytes, script_lock, pkh_dest, tx_fee_rate, vkbv=None):
tx_lock = self.loadTx(tx_lock_bytes)
output_script = CScript([OP_0, hashlib.sha256(script_lock).digest()])
output_script = self.getScriptDest(script_lock)
locked_n = findOutput(tx_lock, output_script)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx_lock.vout[locked_n].nValue
@@ -579,7 +601,8 @@ class BTCInterface(CoinInterface):
tx = CTransaction()
tx.nVersion = self.txVersion()
tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n)))
tx.vin.append(CTxIn(COutPoint(tx_lock_id_int, locked_n),
scriptSig=self.getScriptScriptSig(script_lock)))
tx.vout.append(self.txoType()(locked_coin, self.getScriptForPubkeyHash(pkh_dest)))
@@ -590,16 +613,16 @@ class BTCInterface(CoinInterface):
tx.vout[0].nValue = locked_coin - pay_fee
tx.rehash()
self._log.info('createScriptLockSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
self._log.info('createSCLockSpendTx %s:\n fee_rate, vsize, fee: %ld, %ld, %ld.',
i2h(tx.sha256), tx_fee_rate, vsize, pay_fee)
return tx.serialize()
def verifyLockTx(self, tx_bytes, script_out,
swap_value,
Kal, Kaf,
feerate,
check_lock_tx_inputs, vkbv=None):
def verifySCLockTx(self, tx_bytes, script_out,
swap_value,
Kal, Kaf,
feerate,
check_lock_tx_inputs, vkbv=None):
# Verify:
#
@@ -614,7 +637,7 @@ class BTCInterface(CoinInterface):
ensure(tx.nVersion == self.txVersion(), 'Bad version')
ensure(tx.nLockTime == 0, 'Bad nLockTime') # TODO match txns created by cores
script_pk = CScript([OP_0, hashlib.sha256(script_out).digest()])
script_pk = self.getScriptDest(script_out)
locked_n = findOutput(tx, script_pk)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx.vout[locked_n].nValue
@@ -663,9 +686,9 @@ class BTCInterface(CoinInterface):
return txid, locked_n
def verifyLockRefundTx(self, tx_bytes, lock_tx_bytes, script_out,
prevout_id, prevout_n, prevout_seq, prevout_script,
Kal, Kaf, csv_val_expect, swap_value, feerate, vkbv=None):
def verifySCLockRefundTx(self, tx_bytes, lock_tx_bytes, script_out,
prevout_id, prevout_n, prevout_seq, prevout_script,
Kal, Kaf, csv_val_expect, swap_value, feerate, vkbv=None):
# Verify:
# Must have only one input with correct prevout and sequence
# Must have only one output to the p2wsh of the lock refund script
@@ -680,12 +703,12 @@ class BTCInterface(CoinInterface):
ensure(len(tx.vin) == 1, 'tx doesn\'t have one input')
ensure(tx.vin[0].nSequence == prevout_seq, 'Bad input nSequence')
ensure(len(tx.vin[0].scriptSig) == 0, 'Input scriptsig not empty')
ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), 'Input scriptsig mismatch')
ensure(tx.vin[0].prevout.hash == b2i(prevout_id) and tx.vin[0].prevout.n == prevout_n, 'Input prevout mismatch')
ensure(len(tx.vout) == 1, 'tx doesn\'t have one output')
script_pk = CScript([OP_0, hashlib.sha256(script_out).digest()])
script_pk = self.getScriptDest(script_out)
locked_n = findOutput(tx, script_pk)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = tx.vout[locked_n].nValue
@@ -712,10 +735,10 @@ class BTCInterface(CoinInterface):
return txid, locked_coin, locked_n
def verifyLockRefundSpendTx(self, tx_bytes, lock_refund_tx_bytes,
lock_refund_tx_id, prevout_script,
Kal,
prevout_n, prevout_value, feerate, vkbv=None):
def verifySCLockRefundSpendTx(self, tx_bytes, lock_refund_tx_bytes,
lock_refund_tx_id, prevout_script,
Kal,
prevout_n, prevout_value, feerate, vkbv=None):
# Verify:
# Must have only one input with correct prevout (n is always 0) and sequence
# Must have only one output sending lock refund tx value - fee to leader's address, TODO: follower shouldn't need to verify destination addr
@@ -728,7 +751,7 @@ class BTCInterface(CoinInterface):
ensure(len(tx.vin) == 1, 'tx doesn\'t have one input')
ensure(tx.vin[0].nSequence == 0, 'Bad input nSequence')
ensure(len(tx.vin[0].scriptSig) == 0, 'Input scriptsig not empty')
ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(prevout_script), 'Input scriptsig mismatch')
ensure(tx.vin[0].prevout.hash == b2i(lock_refund_tx_id) and tx.vin[0].prevout.n == 0, 'Input prevout mismatch')
ensure(len(tx.vout) == 1, 'tx doesn\'t have one output')
@@ -756,9 +779,9 @@ class BTCInterface(CoinInterface):
return True
def verifyLockSpendTx(self, tx_bytes,
lock_tx_bytes, lock_tx_script,
a_pkhash_f, feerate, vkbv=None):
def verifySCLockSpendTx(self, tx_bytes,
lock_tx_bytes, lock_tx_script,
a_pkhash_f, feerate, vkbv=None):
# Verify:
# Must have only one input with correct prevout (n is always 0) and sequence
# Must have only one output with destination and amount
@@ -774,13 +797,13 @@ class BTCInterface(CoinInterface):
lock_tx = self.loadTx(lock_tx_bytes)
lock_tx_id = self.getTxid(lock_tx)
output_script = CScript([OP_0, hashlib.sha256(lock_tx_script).digest()])
output_script = self.getScriptDest(lock_tx_script)
locked_n = findOutput(lock_tx, output_script)
ensure(locked_n is not None, 'Output not found in tx')
locked_coin = lock_tx.vout[locked_n].nValue
ensure(tx.vin[0].nSequence == 0, 'Bad input nSequence')
ensure(len(tx.vin[0].scriptSig) == 0, 'Input scriptsig not empty')
ensure(tx.vin[0].scriptSig == self.getScriptScriptSig(lock_tx_script), 'Input scriptsig mismatch')
ensure(tx.vin[0].prevout.hash == b2i(lock_tx_id) and tx.vin[0].prevout.n == locked_n, 'Input prevout mismatch')
ensure(len(tx.vout) == 1, 'tx doesn\'t have one output')
@@ -891,7 +914,7 @@ class BTCInterface(CoinInterface):
def getTxOutputPos(self, tx, script):
if isinstance(tx, bytes):
tx = self.loadTx(tx)
script_pk = CScript([OP_0, hashlib.sha256(script).digest()])
script_pk = self.getScriptDest(script)
return findOutput(tx, script_pk)
def getPubkeyHash(self, K):
@@ -900,6 +923,9 @@ class BTCInterface(CoinInterface):
def getScriptDest(self, script):
return CScript([OP_0, hashlib.sha256(script).digest()])
def getScriptScriptSig(self, script):
return bytes()
def getPkDest(self, K):
return self.getScriptForPubkeyHash(self.getPubkeyHash(K))
@@ -1003,11 +1029,22 @@ class BTCInterface(CoinInterface):
def spendBLockTx(self, chain_b_lock_txid, address_to, kbv, kbs, cb_swap_value, b_fee, restore_height):
raise ValueError('TODO')
def importWatchOnlyAddress(self, address, label):
self.rpc_callback('importaddress', [address, label, False])
def isWatchOnlyAddress(self, address):
addr_info = self.rpc_callback('getaddressinfo', [address])
return addr_info['iswatchonly']
def getSCLockScriptAddress(self, lock_script):
lock_tx_dest = self.getScriptDest(lock_script)
return self.encodeScriptDest(lock_tx_dest)
def getLockTxHeight(self, txid, dest_address, bid_amount, rescan_from, find_index=False):
# Add watchonly address and rescan if required
addr_info = self.rpc_callback('getaddressinfo', [dest_address])
if not addr_info['iswatchonly']:
ro = self.rpc_callback('importaddress', [dest_address, 'bid', False])
if not self.isWatchOnlyAddress(dest_address):
self.importWatchOnlyAddress(dest_address, 'bid')
self._log.info('Imported watch-only addr: {}'.format(dest_address))
self._log.info('Rescanning {} chain from height: {}'.format(self.coin_name(), rescan_from))
self.rpc_callback('rescanblockchain', [rescan_from])
@@ -1082,7 +1119,7 @@ class BTCInterface(CoinInterface):
privkey = PrivateKey(k)
return privkey.sign_recoverable(message_hash, hasher=None)[:64]
def verifyCompact(self, K, message, sig):
def verifyCompactSig(self, K, message, sig):
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
pubkey = PublicKey(K)
rv = pubkey.verify_compact(sig, message_hash, hasher=None)
@@ -1090,7 +1127,7 @@ class BTCInterface(CoinInterface):
def verifyMessage(self, address, message, signature, message_magic=None) -> bool:
if message_magic is None:
message_magic = self.chainparams_network()['message_magic']
message_magic = self.chainparams()['message_magic']
message_bytes = SerialiseNumCompact(len(message_magic)) + bytes(message_magic, 'utf-8') + SerialiseNumCompact(len(message)) + bytes(message, 'utf-8')
message_hash = hashlib.sha256(hashlib.sha256(message_bytes).digest()).digest()
@@ -1185,9 +1222,61 @@ class BTCInterface(CoinInterface):
def getBlockWithTxns(self, block_hash):
return self.rpc_callback('getblock', [block_hash, 2])
def getUnspentsByAddr(self):
unspent_addr = dict()
unspent = self.rpc_callback('listunspent')
for u in unspent:
if u['spendable'] is not True:
continue
unspent_addr[u['address']] = unspent_addr.get(u['address'], 0) + self.make_int(u['amount'], r=1)
return unspent_addr
def getUTXOBalance(self, address):
num_blocks = self.rpc_callback('getblockcount')
sum_unspent = 0
self._log.debug('[rm] scantxoutset start') # scantxoutset is slow
ro = self.rpc_callback('scantxoutset', ['start', ['addr({})'.format(address)]]) # TODO: Use combo(address) where possible
self._log.debug('[rm] scantxoutset end')
for o in ro['unspents']:
sum_unspent += self.make_int(o['amount'])
return sum_unspent
def getProofOfFunds(self, amount_for, extra_commit_bytes):
# TODO: Lock unspent and use same output/s to fund bid
unspent_addr = self.getUnspentsByAddr()
sign_for_addr = None
for addr, value in unspent_addr.items():
if value >= amount_for:
sign_for_addr = addr
break
ensure(sign_for_addr is not None, 'Could not find address with enough funds for proof')
self._log.debug('sign_for_addr %s', sign_for_addr)
if self._use_segwit: # TODO: Use isSegwitAddress when scantxoutset can use combo
# 'Address does not refer to key' for non p2pkh
pkh = self.decodeAddress(sign_for_addr)
sign_for_addr = self.pkh_to_address(pkh)
self._log.debug('sign_for_addr converted %s', sign_for_addr)
signature = self.rpc_callback('signmessage', [sign_for_addr, sign_for_addr + '_swap_proof_' + extra_commit_bytes.hex()])
return (sign_for_addr, signature)
def verifyProofOfFunds(self, address, signature, extra_commit_bytes):
passed = self.verifyMessage(address, address + '_swap_proof_' + extra_commit_bytes.hex(), signature)
ensure(passed is True, 'Proof of funds signature invalid')
if self._use_segwit:
address = self.encodeSegwitAddress(decodeAddress(address)[1:])
return self.getUTXOBalance(address)
def testBTCInterface():
print('testBTCInterface')
print('TODO: testBTCInterface')
if __name__ == "__main__":