# -*- coding: utf-8 -*- # Copyright (c) 2022-2023 tecnovert # Copyright (c) 2024-2026 The Basicswap developers # Distributed under the MIT software license, see the accompanying # file LICENSE or http://www.opensource.org/licenses/mit-license.php. import hashlib import random from .btc import BTCInterface, find_vout_for_address_from_txobj from basicswap.util import ( i2b, ensure, ) from basicswap.rpc import make_rpc_func from basicswap.util.crypto import hash160 from basicswap.util.address import decodeAddress from basicswap.chainparams import Coins from basicswap.interface.contrib.firo_test_framework.script import ( CScript, OP_DUP, OP_EQUAL, OP_HASH160, OP_CHECKSIG, OP_EQUALVERIFY, ) from basicswap.interface.contrib.firo_test_framework.mininode import ( CBlock, FromHex, CTransaction, ) class FIROInterface(BTCInterface): @staticmethod def coin_type(): return Coins.FIRO def __init__(self, coin_settings, network, swap_client=None, **kwargs): super().__init__( coin_settings=coin_settings, network=network, swap_client=swap_client, **kwargs, ) # No multiwallet support self.rpc_wallet = make_rpc_func( self._rpcport, self._rpcauth, host=self._rpc_host ) self.rpc_wallet_watch = self.rpc_wallet if "wallet_name" in coin_settings: raise ValueError(f"Invalid setting for {self.coin_name()}: wallet_name") def getExchangeName(self, exchange_name: str) -> str: return "zcoin" def initialiseWallet(self, key, restore_time: int = -1): # load with -hdseed= parameter pass def checkWallets(self) -> int: return 1 def encryptWallet(self, password: str, check_seed: bool = True): # Watchonly wallets are not encrypted # Firo shuts down after encryptwallet seed_id_before: str = self.getWalletSeedID() if check_seed else "Not found" self.rpc_wallet("encryptwallet", [password], timeout=120) if check_seed is False or seed_id_before == "Not found": return seed_id_after: str = self.getWalletSeedID() if seed_id_before == seed_id_after: return self._log.warning(f"{self.ticker()} wallet seed changed after encryption.") self._log.debug( f"seed_id_before: {seed_id_before} seed_id_after: {seed_id_after}." ) self.setWalletSeedWarning(True) def getNewAddress(self, use_segwit, label="swap_receive"): return self.rpc("getnewaddress", [label]) # addr_plain = self.rpc('getnewaddress', [label]) # return self.rpc('addwitnessaddress', [addr_plain]) def decodeAddress(self, address): return decodeAddress(address)[1:] def encodeSegwitAddress(self, script): raise ValueError("TODO") def decodeSegwitAddress(self, addr): raise ValueError("TODO") def isWatchOnlyAddress(self, address): addr_info = self.rpc("validateaddress", [address]) return addr_info["iswatchonly"] def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool: addr_info = self.rpc("validateaddress", [address]) if not or_watch_only: return addr_info["ismine"] return addr_info["ismine"] or addr_info["iswatchonly"] def getNewSparkAddress(self) -> str: try: return self.rpc_wallet("getnewsparkaddress")[0] except Exception as e: self._log.error(f"getnewsparkaddress failed: {str(e)}") raise def getNewStealthAddress(self): """Get a new Spark address (alias for consistency with other coins).""" return self.getNewSparkAddress() def getWalletInfo(self): """Get wallet info including Spark balance.""" rv = super(FIROInterface, self).getWalletInfo() try: spark_balance_info = self.rpc("getsparkbalance") # getsparkbalance returns amounts in atomic units (satoshis) # Field names: availableBalance, unconfirmedBalance, fullBalance confirmed = spark_balance_info.get("availableBalance", 0) unconfirmed = spark_balance_info.get("unconfirmedBalance", 0) full_balance = spark_balance_info.get("fullBalance", 0) # Values are already in atomic units, keep as integers # basicswap.py will format them using format_amount rv["spark_balance"] = confirmed if confirmed else 0 rv["spark_unconfirmed"] = unconfirmed if unconfirmed else 0 immature = full_balance - confirmed - unconfirmed rv["spark_immature"] = immature if immature > 0 else 0 except Exception as e: self._log.warning(f"getsparkbalance failed: {str(e)}") rv["spark_balance"] = 0 rv["spark_unconfirmed"] = 0 rv["spark_immature"] = 0 return rv def createUTXO(self, value_sats: int): # Create a new address and send value_sats to it spendable_balance = self.getSpendableBalance() if spendable_balance < value_sats: raise ValueError("Balance too low") address = self.getNewAddress(self._use_segwit, "create_utxo") return ( self.withdrawCoin(self.format_amount(value_sats), "plain", address, False), address, ) def withdrawCoin(self, value, type_from: str, addr_to: str, subfee: bool) -> str: """Withdraw coins, supporting both transparent and Spark transactions. Args: value: Amount to withdraw type_from: "plain" for transparent, "spark" for Spark addr_to: Destination address subfee: Whether to subtract fee from amount """ type_to = "spark" if addr_to.startswith("sm1") else "plain" if "spark" in (type_from, type_to): # RPC format: spendspark {"address": {"amount": ..., "subtractfee": ..., "memo": ...}} # RPC wrapper will serialize this as: {"method": "spendspark", "params": [{...}], ...} try: if type_from == "spark": # Construct params: dict where address is the key, wrapped in array for RPC params = [ {"address": addr_to, "amount": value, "subtractfee": subfee} ] result = self.rpc_wallet("spendspark", params) else: # Use automintspark to perform a plain -> spark tx of full balance balance = self.rpc_wallet("getbalance") if str(balance) == str(value): result = self.rpc_wallet("automintspark") else: # subfee param is available on plain -> spark transactions mint_params = {"amount": value} if subfee: mint_params["subfee"] = True params = [{addr_to: mint_params}] result = self.rpc_wallet("mintspark", params) # spendspark returns a txid string directly, in a result dict, or as an array if isinstance(result, list) and len(result) > 0: return result[0] if isinstance(result, dict): return result.get("txid", result.get("tx", "")) return result except Exception as e: self._log.error(f"spark tx failed: {str(e)}") raise else: # Use standard sendtoaddress for transparent transactions params = [addr_to, value, "", "", subfee] return self.rpc_wallet("sendtoaddress", params) def getSCLockScriptAddress(self, lock_script: bytes) -> str: lock_tx_dest = self.getScriptDest(lock_script) address = self.encodeScriptDest(lock_tx_dest) if not self.isAddressMine(address, or_watch_only=True): # Expects P2WSH nested in BIP16_P2SH self.rpc("importaddress", [lock_tx_dest.hex(), "bid lock", False, True]) return address def getLockTxHeight( self, txid, dest_address, bid_amount, rescan_from, find_index: bool = False, vout: int = -1, ): # Add watchonly address and rescan if required if not self.isAddressMine(dest_address, or_watch_only=True): self.importWatchOnlyAddress(dest_address, "bid") self._log.info( "Imported watch-only addr: {}".format(self._log.addr(dest_address)) ) self._log.info( "Rescanning {} chain from height: {}".format( self.coin_name(), rescan_from ) ) self.rescanBlockchainForAddress(rescan_from, dest_address) return_txid = True if txid is None else False if txid is None: txns = self.rpc( "listunspent", [ 0, 9999999, [ dest_address, ], ], ) for tx in txns: if self.make_int(tx["amount"]) == bid_amount: txid = bytes.fromhex(tx["txid"]) break if txid is None: return None try: tx = self.rpc("gettransaction", [txid.hex()]) block_height = 0 if "blockhash" in tx: block_header = self.rpc("getblockheader", [tx["blockhash"]]) block_height = block_header["height"] rv = { "depth": 0 if "confirmations" not in tx else tx["confirmations"], "height": block_height, } except Exception as e: self._log.debug( "getLockTxHeight gettransaction failed: %s, %s", txid.hex(), str(e) ) return None if find_index: tx_obj = self.rpc("decoderawtransaction", [tx["hex"]]) rv["index"] = find_vout_for_address_from_txobj(tx_obj, dest_address) if rv["index"] is not None and rv["index"] >= 0: rv["value"] = self.make_int(tx_obj["vout"][rv["index"]]["value"]) if return_txid: rv["txid"] = txid.hex() return rv def createSCLockTx( self, value: int, script: bytearray, vkbv: bytes = None ) -> bytes: tx = CTransaction() tx.nVersion = self.txVersion() tx.vout.append(self.txoType()(value, self.getScriptDest(script))) return tx.serialize() def fundSCLockTx(self, tx_bytes, feerate, vkbv=None): return self.fundTx(tx_bytes, feerate) def signTxWithWallet(self, tx): rv = self.rpc("signrawtransaction", [tx.hex()]) return bytes.fromhex(rv["hex"]) def createRawFundedTransaction( self, addr_to: str, amount: int, sub_fee: bool = False, lock_unspents: bool = True, feerate: int = None, ) -> str: txn = self.rpc( "createrawtransaction", [[], {addr_to: self.format_amount(amount)}] ) if feerate: fee_rate = self.format_amount(feerate) fee_src = "specified" else: fee_rate, fee_src = self.get_fee_rate(self._conf_target) self._log.debug( f"Fee rate: {fee_rate}, source: {fee_src}, block target: {self._conf_target}" ) options = { "lockUnspents": lock_unspents, "feeRate": fee_rate, } if sub_fee: options["subtractFeeFromOutputs"] = [ 0, ] return self.rpc("fundrawtransaction", [txn, options])["hex"] def createRawSignedTransaction(self, addr_to, amount) -> str: txn_funded = self.createRawFundedTransaction(addr_to, amount) return self.rpc("signrawtransaction", [txn_funded])["hex"] def getScriptForPubkeyHash(self, pkh: bytes) -> bytearray: # Return P2PKH return CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG]) def getScriptDest(self, script: bytearray) -> bytearray: # P2SH script_hash = hash160(script) assert len(script_hash) == 20 return CScript([OP_HASH160, script_hash, OP_EQUAL]) def getSeedHash(self, seed: bytes) -> bytes: return hash160(seed)[::-1] def encodeScriptDest(self, script_dest: bytes) -> str: # Extract hash from script script_hash = script_dest[2:-1] return self.sh_to_address(script_hash) def getDestForScriptHash(self, script_hash): assert len(script_hash) == 20 return CScript([OP_HASH160, script_hash, OP_EQUAL]) def getWalletSeedID(self): return self.rpc("getwalletinfo")["hdmasterkeyid"] def getSpendableBalance(self) -> int: return self.make_int(self.rpc("getwalletinfo")["balance"]) def getBLockSpendTxFee(self, tx, fee_rate: int) -> int: add_bytes = 107 size = len(tx.serialize_with_witness()) + add_bytes pay_fee = round(fee_rate * size / 1000) self._log.info( f"BLockSpendTx fee_rate, size, fee: {fee_rate}, {size}, {pay_fee}." ) return pay_fee def signTxWithKey(self, tx: bytes, key: bytes, prev_amount=None) -> bytes: key_wif = self.encodeKey(key) rv = self.rpc( "signrawtransaction", [ tx.hex(), [], [ key_wif, ], ], ) return bytes.fromhex(rv["hex"]) def findTxnByHash(self, txid_hex: str): # Only works for wallet txns try: rv = self.rpc("gettransaction", [txid_hex]) except Exception as e: # noqa: F841 self._log.debug( "findTxnByHash getrawtransaction failed: {}".format(txid_hex) ) return None if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed: block_height = self.getBlockHeader(rv["blockhash"])["height"] return {"txid": txid_hex, "amount": 0, "height": block_height} return None def getProofOfFunds(self, amount_for, extra_commit_bytes): # TODO: Lock unspent and use same output/s to fund bid unspents_by_addr = dict() unspents = self.rpc("listunspent") for u in unspents: if u["spendable"] is not True: continue if u["address"] not in unspents_by_addr: unspents_by_addr[u["address"]] = {"total": 0, "utxos": []} utxo_amount: int = self.make_int(u["amount"], r=1) unspents_by_addr[u["address"]]["total"] += utxo_amount unspents_by_addr[u["address"]]["utxos"].append( (utxo_amount, u["txid"], u["vout"]) ) max_utxos: int = 4 viable_addrs = [] for addr, data in unspents_by_addr.items(): if data["total"] >= amount_for: # Sort from largest to smallest amount sorted_utxos = sorted(data["utxos"], key=lambda x: x[0]) # Max outputs required to reach amount_for utxos_req: int = 0 sum_value: int = 0 for utxo in sorted_utxos: sum_value += utxo[0] utxos_req += 1 if sum_value >= amount_for: break if utxos_req <= max_utxos: viable_addrs.append(addr) continue ensure( len(viable_addrs) > 0, "Could not find address with enough funds for proof" ) sign_for_addr: str = random.choice(viable_addrs) self._log.debug("sign_for_addr %s", sign_for_addr) prove_utxos = [] sorted_utxos = sorted( unspents_by_addr[sign_for_addr]["utxos"], key=lambda x: x[0] ) hasher = hashlib.sha256() sum_value: int = 0 for utxo in sorted_utxos: sum_value += utxo[0] outpoint = (bytes.fromhex(utxo[1]), utxo[2]) prove_utxos.append(outpoint) hasher.update(outpoint[0]) hasher.update(outpoint[1].to_bytes(2, "big")) if sum_value >= amount_for: break utxos_hash = hasher.digest() if ( self.using_segwit() ): # TODO: Use isSegwitAddress when scantxoutset can use combo # 'Address does not refer to key' for non p2pkh pkh = self.decodeAddress(sign_for_addr) sign_for_addr = self.pkh_to_address(pkh) self._log.debug("sign_for_addr converted %s", sign_for_addr) signature = self.rpc( "signmessage", [ sign_for_addr, sign_for_addr + "_swap_proof_" + utxos_hash.hex() + extra_commit_bytes.hex(), ], ) return (sign_for_addr, signature, prove_utxos) def verifyProofOfFunds(self, address, signature, utxos, extra_commit_bytes): hasher = hashlib.sha256() sum_value: int = 0 for outpoint in utxos: hasher.update(outpoint[0]) hasher.update(outpoint[1].to_bytes(2, "big")) utxos_hash = hasher.digest() passed = self.verifyMessage( address, address + "_swap_proof_" + utxos_hash.hex() + extra_commit_bytes.hex(), signature, ) ensure(passed is True, "Proof of funds signature invalid") if self.using_segwit(): address = self.encodeSegwitAddress(decodeAddress(address)[1:]) sum_value: int = 0 for outpoint in utxos: txout = self.rpc("gettxout", [outpoint[0].hex(), outpoint[1]]) sum_value += self.make_int(txout["value"]) return sum_value def rescanBlockchainForAddress(self, height_start: int, addr_find: str): # Very ugly workaround for missing `rescanblockchain` rpc command chain_blocks: int = self.getChainHeight() current_height: int = chain_blocks block_hash = self.rpc("getblockhash", [current_height]) script_hash: bytes = self.decodeAddress(addr_find) find_scriptPubKey = self.getDestForScriptHash(script_hash) while current_height > height_start: block_hash = self.rpc("getblockhash", [current_height]) block = self.rpc("getblock", [block_hash, False]) decoded_block = CBlock() decoded_block = FromHex(decoded_block, block) for tx in decoded_block.vtx: for txo in tx.vout: if txo.scriptPubKey == find_scriptPubKey: tx.rehash() txid = i2b(tx.sha256) self._log.info( "Found output to addr: {} in tx {} in block {}".format( addr_find, txid.hex(), block_hash ) ) self._log.info( "rescanblockchain hack invalidateblock {}".format( block_hash ) ) self.rpc("invalidateblock", [block_hash]) self.rpc("reconsiderblock", [block_hash]) return current_height -= 1 def getBlockWithTxns(self, block_hash: str): # TODO: Bypass decoderawtransaction and getblockheader block = self.rpc("getblock", [block_hash, False]) block_header = self.rpc("getblockheader", [block_hash]) decoded_block = CBlock() decoded_block = FromHex(decoded_block, block) tx_rv = [] for tx in decoded_block.vtx: tx_hex = tx.serialize_with_witness().hex() tx_dec = self.rpc("decoderawtransaction", [tx_hex]) if "hex" not in tx_dec: tx_dec["hex"] = tx_hex tx_rv.append(tx_dec) block_rv = { "hash": block_hash, "previousblockhash": block_header["previousblockhash"], "tx": tx_rv, "confirmations": block_header["confirmations"], "height": block_header["height"], "time": block_header["time"], "version": block_header["version"], "merkleroot": block_header["merkleroot"], } return block_rv