mirror of
https://github.com/basicswap/basicswap.git
synced 2026-02-28 00:25:11 +01:00
795 lines
27 KiB
Python
795 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2019-2024 tecnovert
|
|
# Copyright (c) 2024-2026 The Basicswap developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import hashlib
|
|
import logging
|
|
import random
|
|
import secrets
|
|
import threading
|
|
import unittest
|
|
|
|
from coincurve.ed25519 import ed25519_get_pubkey
|
|
from coincurve.ecdsaotves import (
|
|
ecdsaotves_enc_sign,
|
|
ecdsaotves_enc_verify,
|
|
ecdsaotves_dec_sig,
|
|
ecdsaotves_rec_enc_key,
|
|
)
|
|
from coincurve.keys import PrivateKey
|
|
|
|
from basicswap.contrib.mnemonic import Mnemonic
|
|
from basicswap.db import create_db_, DBMethods, KnownIdentity
|
|
from basicswap.util import h2b
|
|
from basicswap.util.address import decodeAddress
|
|
from basicswap.util.crypto import ripemd160, hash160, blake256
|
|
from basicswap.util.extkey import ExtKeyPair
|
|
from basicswap.util.integer import encode_varint, decode_varint
|
|
from basicswap.util.network import is_private_ip_address
|
|
from basicswap.util.rfc2440 import rfc2440_hash_password
|
|
from basicswap.util_xmr import encode_address as xmr_encode_address
|
|
from basicswap.interface.btc import BTCInterface
|
|
from basicswap.interface.xmr import XMRInterface
|
|
from tests.basicswap.mnemonics import mnemonics
|
|
from tests.basicswap.util import REQUIRED_SETTINGS
|
|
|
|
from basicswap.basicswap_util import TxLockTypes
|
|
from basicswap.util import (
|
|
make_int,
|
|
SerialiseNum,
|
|
format_amount,
|
|
DeserialiseNum,
|
|
validate_amount,
|
|
)
|
|
from basicswap.messages_npb import (
|
|
BidMessage,
|
|
)
|
|
from basicswap.contrib.test_framework.script import (
|
|
hash160 as hash160_btc,
|
|
SegwitV0SignatureHash,
|
|
SIGHASH_ALL,
|
|
)
|
|
from basicswap.contrib.test_framework.messages import (
|
|
COutPoint,
|
|
CTransaction,
|
|
CTxIn,
|
|
CTxOut,
|
|
uint256_from_str,
|
|
)
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
class Test(unittest.TestCase):
|
|
|
|
@staticmethod
|
|
def ci_btc():
|
|
btc_coin_settings = {"rpcport": 0, "rpcauth": "none"}
|
|
btc_coin_settings.update(REQUIRED_SETTINGS)
|
|
return BTCInterface(btc_coin_settings, "regtest")
|
|
|
|
@staticmethod
|
|
def ci_xmr():
|
|
xmr_coin_settings = {"rpcport": 0, "walletrpcport": 0, "walletrpcauth": "none"}
|
|
xmr_coin_settings.update(REQUIRED_SETTINGS)
|
|
return XMRInterface(xmr_coin_settings, "regtest")
|
|
|
|
def test_serialise_num(self):
|
|
def test_case(v, nb=None):
|
|
b = SerialiseNum(v)
|
|
if nb is not None:
|
|
assert len(b) == nb
|
|
assert v == DeserialiseNum(b)
|
|
|
|
test_case(0, 1)
|
|
test_case(1, 1)
|
|
test_case(16, 1)
|
|
|
|
test_case(-1, 2)
|
|
test_case(17, 2)
|
|
|
|
test_case(500)
|
|
test_case(-500)
|
|
test_case(4194642)
|
|
|
|
def test_sequence(self):
|
|
ci = self.ci_btc()
|
|
|
|
time_val = 48 * 60 * 60
|
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val)
|
|
decoded = ci.decodeSequence(encoded)
|
|
assert encoded == 4194642
|
|
assert decoded >= time_val
|
|
assert decoded <= time_val + 512
|
|
|
|
time_val = 24 * 60
|
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, time_val)
|
|
decoded = ci.decodeSequence(encoded)
|
|
assert decoded >= time_val
|
|
assert decoded <= time_val + 512
|
|
|
|
blocks_val = 123
|
|
encoded = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_BLOCKS, blocks_val)
|
|
decoded = ci.decodeSequence(encoded)
|
|
assert decoded == blocks_val
|
|
|
|
def test_make_int(self):
|
|
def test_case(vs, vf, expect_int):
|
|
i = make_int(vs)
|
|
assert i == expect_int and isinstance(i, int)
|
|
i = make_int(vf)
|
|
assert i == expect_int and isinstance(i, int)
|
|
vs_out = format_amount(i, 8)
|
|
# Strip
|
|
for i in range(7):
|
|
if vs_out[-1] == "0":
|
|
vs_out = vs_out[:-1]
|
|
if "." in vs:
|
|
assert vs_out == vs
|
|
else:
|
|
assert vs_out[:-2] == vs
|
|
|
|
test_case("0", 0, 0)
|
|
test_case("1", 1, 100000000)
|
|
test_case("10", 10, 1000000000)
|
|
test_case("0.00899999", 0.00899999, 899999)
|
|
test_case("899999.0", 899999.0, 89999900000000)
|
|
test_case("899999.00899999", 899999.00899999, 89999900899999)
|
|
test_case("0.0", 0.0, 0)
|
|
test_case("1.0", 1.0, 100000000)
|
|
test_case("1.1", 1.1, 110000000)
|
|
test_case("1.2", 1.2, 120000000)
|
|
test_case("0.00899991", 0.00899991, 899991)
|
|
test_case("0.0089999", 0.0089999, 899990)
|
|
test_case("0.0089991", 0.0089991, 899910)
|
|
test_case("0.123", 0.123, 12300000)
|
|
test_case("123000.000123", 123000.000123, 12300000012300)
|
|
|
|
try:
|
|
make_int("0.123456789")
|
|
assert False
|
|
except Exception as e:
|
|
assert str(e) == "Mantissa too long"
|
|
validate_amount("0.12345678")
|
|
|
|
# floor
|
|
assert make_int("0.123456789", r=-1) == 12345678
|
|
# Round up
|
|
assert make_int("0.123456789", r=1) == 12345679
|
|
|
|
def test_make_int12(self):
|
|
def test_case(vs, vf, expect_int):
|
|
i = make_int(vs, 12)
|
|
assert i == expect_int and isinstance(i, int)
|
|
i = make_int(vf, 12)
|
|
assert i == expect_int and isinstance(i, int)
|
|
vs_out = format_amount(i, 12)
|
|
# Strip
|
|
for i in range(7):
|
|
if vs_out[-1] == "0":
|
|
vs_out = vs_out[:-1]
|
|
if "." in vs:
|
|
assert vs_out == vs
|
|
else:
|
|
assert vs_out[:-2] == vs
|
|
|
|
test_case("0.123456789", 0.123456789, 123456789000)
|
|
test_case("0.123456789123", 0.123456789123, 123456789123)
|
|
try:
|
|
make_int("0.1234567891234", 12)
|
|
assert False
|
|
except Exception as e:
|
|
assert str(e) == "Mantissa too long"
|
|
validate_amount("0.123456789123", 12)
|
|
try:
|
|
validate_amount("0.1234567891234", 12)
|
|
assert False
|
|
except Exception as e:
|
|
assert "Too many decimal places" in str(e)
|
|
try:
|
|
validate_amount(0.1234567891234, 12)
|
|
assert False
|
|
except Exception as e:
|
|
assert "Too many decimal places" in str(e)
|
|
|
|
def test_ed25519(self):
|
|
privkey = bytes.fromhex(
|
|
"0b4c6e34c21b910f92c7985a8093de526f5f8677a112a8c672d1098139b70e0f"
|
|
)
|
|
pubkey = ed25519_get_pubkey(privkey)
|
|
assert pubkey == bytes.fromhex(
|
|
"5c26c518fb698e91a5858c33e9075488c55c235f391162fe9e6cbd4f694f80aa"
|
|
)
|
|
|
|
def test_key_summing(self):
|
|
ci_btc = self.ci_btc()
|
|
ci_xmr = self.ci_xmr()
|
|
keys = [
|
|
bytes.fromhex(
|
|
"e6b8e7c2ca3a88fe4f28591aa0f91fec340179346559e4ec430c2531aecc19aa"
|
|
),
|
|
bytes.fromhex(
|
|
"b725b6359bd2b510d9d5a7bba7bdee17abbf113253f6338ea50a8f0cf45fd0d0"
|
|
),
|
|
]
|
|
sum_secp256k1: bytes = ci_btc.sumKeys(keys[0], keys[1])
|
|
assert (
|
|
sum_secp256k1.hex()
|
|
== "9dde9df8660d3e0f28fe00d648b70e052511ad800a07783f284455b1d2f5a939"
|
|
)
|
|
|
|
sum_ed25519: bytes = ci_xmr.sumKeys(keys[0], keys[1])
|
|
assert (
|
|
sum_ed25519.hex()
|
|
== "0dde9df8660d3e0f28fe00d648b70e0323e9c192fe9b94f1cf7138515e877725"
|
|
)
|
|
|
|
sum_secp256k1 = ci_btc.sumPubkeys(
|
|
ci_btc.getPubkey(keys[0]), ci_btc.getPubkey(keys[1])
|
|
)
|
|
assert (
|
|
sum_secp256k1.hex()
|
|
== "028c30392e35620af0787b363a03cf9a695336759664436e1f609481c869541a5c"
|
|
)
|
|
|
|
sum_ed25519 = ci_xmr.sumPubkeys(
|
|
ci_xmr.getPubkey(keys[0]), ci_xmr.getPubkey(keys[1])
|
|
)
|
|
assert (
|
|
sum_ed25519.hex()
|
|
== "4b2dd2dc9acc9be7efed4fdbfb96f0002aeb9e4c8638c5b24562a7158b283626"
|
|
)
|
|
|
|
def test_ecdsa_otves(self):
|
|
ci = self.ci_btc()
|
|
vk_sign = ci.getNewRandomKey()
|
|
vk_encrypt = ci.getNewRandomKey()
|
|
|
|
pk_sign = ci.getPubkey(vk_sign)
|
|
pk_encrypt = ci.getPubkey(vk_encrypt)
|
|
sign_hash = secrets.token_bytes(32)
|
|
|
|
cipher_text = ecdsaotves_enc_sign(vk_sign, pk_encrypt, sign_hash)
|
|
assert ecdsaotves_enc_verify(pk_sign, pk_encrypt, sign_hash, cipher_text)
|
|
|
|
sig = ecdsaotves_dec_sig(vk_encrypt, cipher_text)
|
|
assert ci.verifySig(pk_sign, sign_hash, sig)
|
|
|
|
recovered_key = ecdsaotves_rec_enc_key(pk_encrypt, cipher_text, sig)
|
|
assert vk_encrypt == recovered_key
|
|
|
|
def test_sign(self):
|
|
ci = self.ci_btc()
|
|
|
|
vk = ci.getNewRandomKey()
|
|
pk = ci.getPubkey(vk)
|
|
|
|
message = "test signing message"
|
|
message_hash = hashlib.sha256(bytes(message, "utf-8")).digest()
|
|
eck = PrivateKey(vk)
|
|
sig = eck.sign(message.encode("utf-8"))
|
|
|
|
ci.verifySig(pk, message_hash, sig)
|
|
|
|
def test_sign_compact(self):
|
|
ci = self.ci_btc()
|
|
|
|
vk = ci.getNewRandomKey()
|
|
pk = ci.getPubkey(vk)
|
|
sig = ci.signCompact(vk, "test signing message")
|
|
assert len(sig) == 64
|
|
ci.verifyCompactSig(pk, "test signing message", sig)
|
|
|
|
# Nonce is set deterministically (using default libsecp256k1 method rfc6979)
|
|
sig2 = ci.signCompact(vk, "test signing message")
|
|
assert sig == sig2
|
|
|
|
def test_sign_recoverable(self):
|
|
ci = self.ci_btc()
|
|
|
|
vk = ci.getNewRandomKey()
|
|
pk = ci.getPubkey(vk)
|
|
sig = ci.signRecoverable(vk, "test signing message")
|
|
assert len(sig) == 65
|
|
pk_rec = ci.verifySigAndRecover(sig, "test signing message")
|
|
assert pk == pk_rec
|
|
|
|
# Nonce is set deterministically (using default libsecp256k1 method rfc6979)
|
|
sig2 = ci.signRecoverable(vk, "test signing message")
|
|
assert sig == sig2
|
|
|
|
def test_pubkey_to_address(self):
|
|
ci = self.ci_btc()
|
|
pk = h2b("02c26a344e7d21bcc6f291532679559f2fd234c881271ff98714855edc753763a6")
|
|
addr = ci.pubkey_to_address(pk)
|
|
assert addr == "mj6SdSxmWRmdDqR5R3FfZmRiLmQfQAsLE8"
|
|
|
|
def test_dleag(self):
|
|
ci = self.ci_xmr()
|
|
|
|
key = ci.getNewRandomKey()
|
|
proof = ci.proveDLEAG(key)
|
|
assert ci.verifyDLEAG(proof)
|
|
|
|
def test_rate(self):
|
|
scale_from = 8
|
|
scale_to = 12
|
|
amount_from = make_int(100, scale_from)
|
|
rate = make_int(0.1, scale_to)
|
|
|
|
amount_to = int((amount_from * rate) // (10**scale_from))
|
|
assert "100.00000000" == format_amount(amount_from, scale_from)
|
|
assert "10.000000000000" == format_amount(amount_to, scale_to)
|
|
|
|
rate_check = make_int((amount_to / amount_from), scale_from)
|
|
assert rate == rate_check
|
|
|
|
scale_from = 12
|
|
scale_to = 8
|
|
amount_from = make_int(1, scale_from)
|
|
rate = make_int(12, scale_to)
|
|
|
|
amount_to = int((amount_from * rate) // (10**scale_from))
|
|
assert "1.000000000000" == format_amount(amount_from, scale_from)
|
|
assert "12.00000000" == format_amount(amount_to, scale_to)
|
|
|
|
rate_check = make_int((amount_to / amount_from), scale_from)
|
|
assert rate == rate_check
|
|
|
|
def test_rate_tolerance_precision(self):
|
|
scale = 8
|
|
amount_from = make_int("0.001", scale)
|
|
offer_rate = make_int("0.354185354480", scale, r=1)
|
|
amount_to = int((amount_from * offer_rate) // (10**scale))
|
|
bid_rate = make_int(amount_to / amount_from, r=1)
|
|
|
|
rate_tolerance = max(1, offer_rate // 10000)
|
|
rate_diff = abs(bid_rate - offer_rate)
|
|
assert (
|
|
rate_diff <= rate_tolerance
|
|
), f"Rate difference {rate_diff} exceeds tolerance {rate_tolerance}"
|
|
|
|
test_cases = [
|
|
("0.001", "0.123456789"),
|
|
("0.5", "1.23456789"),
|
|
("0.00001", "999.99999999"),
|
|
]
|
|
|
|
for amount_str, rate_str in test_cases:
|
|
amount_from = make_int(amount_str, scale)
|
|
offer_rate = make_int(rate_str, scale, r=1)
|
|
amount_to = int((amount_from * offer_rate) // (10**scale))
|
|
bid_rate = make_int(amount_to / amount_from, r=1)
|
|
|
|
rate_tolerance = max(1, offer_rate // 10000)
|
|
rate_diff = abs(bid_rate - offer_rate)
|
|
|
|
assert rate_diff <= rate_tolerance, (
|
|
f"Rate difference {rate_diff} exceeds tolerance {rate_tolerance} "
|
|
f"for amount {amount_str} at rate {rate_str}"
|
|
)
|
|
|
|
large_offer_rate = make_int("1.0", scale)
|
|
large_tolerance = max(1, large_offer_rate // 10000)
|
|
bad_bid_rate = large_offer_rate + large_tolerance + 1
|
|
rate_diff = abs(bad_bid_rate - large_offer_rate)
|
|
assert (
|
|
rate_diff > large_tolerance
|
|
), "Test setup error: difference should exceed tolerance"
|
|
|
|
def test_rate_tolerance_helper_functions(self):
|
|
class MockBasicSwap:
|
|
def calculateRateTolerance(self, offer_rate: int) -> int:
|
|
return max(1, offer_rate // 10000)
|
|
|
|
def ratesMatch(self, rate1: int, rate2: int, offer_rate: int) -> bool:
|
|
tolerance = self.calculateRateTolerance(offer_rate)
|
|
return abs(rate1 - rate2) <= tolerance
|
|
|
|
mock_swap = MockBasicSwap()
|
|
|
|
assert mock_swap.calculateRateTolerance(100000000) == 10000
|
|
assert mock_swap.calculateRateTolerance(1000000) == 100
|
|
assert mock_swap.calculateRateTolerance(100) == 1
|
|
assert mock_swap.calculateRateTolerance(50) == 1
|
|
|
|
offer_rate = 100000000
|
|
tolerance = 10000
|
|
|
|
assert mock_swap.ratesMatch(offer_rate, offer_rate, offer_rate)
|
|
assert mock_swap.ratesMatch(offer_rate, offer_rate + tolerance, offer_rate)
|
|
assert mock_swap.ratesMatch(offer_rate, offer_rate - tolerance, offer_rate)
|
|
assert mock_swap.ratesMatch(offer_rate + tolerance // 2, offer_rate, offer_rate)
|
|
|
|
assert not mock_swap.ratesMatch(
|
|
offer_rate, offer_rate + tolerance + 1, offer_rate
|
|
)
|
|
assert not mock_swap.ratesMatch(
|
|
offer_rate, offer_rate - tolerance - 1, offer_rate
|
|
)
|
|
|
|
small_rate = 1000
|
|
assert mock_swap.ratesMatch(small_rate, small_rate + 1, small_rate)
|
|
assert not mock_swap.ratesMatch(small_rate, small_rate + 2, small_rate)
|
|
|
|
scale_from = 8
|
|
scale_to = 8
|
|
amount_from = make_int(0.073, scale_from)
|
|
amount_to = make_int(10, scale_to)
|
|
rate = make_int(amount_to / amount_from, scale_to, r=1)
|
|
amount_to_recreate = int((amount_from * rate) // (10**scale_from))
|
|
assert "10.00000000" == format_amount(amount_to_recreate, scale_to)
|
|
|
|
scale_from = 8
|
|
scale_to = 12
|
|
amount_from = make_int(10.0, scale_from)
|
|
amount_to = make_int(0.06935, scale_to)
|
|
rate = make_int(amount_to / amount_from, scale_from, r=1)
|
|
amount_to_recreate = int((amount_from * rate) // (10**scale_from))
|
|
assert "0.069350000000" == format_amount(amount_to_recreate, scale_to)
|
|
|
|
scale_from = 12
|
|
scale_to = 8
|
|
amount_from = make_int(0.06935, scale_from)
|
|
amount_to = make_int(10.0, scale_to)
|
|
rate = make_int(amount_to / amount_from, scale_from, r=1)
|
|
amount_to_recreate = int((amount_from * rate) // (10**scale_from))
|
|
assert "10.00000000" == format_amount(amount_to_recreate, scale_to)
|
|
|
|
ci_xmr = self.ci_xmr()
|
|
ci_btc = self.ci_btc()
|
|
|
|
for i in range(10000):
|
|
test_pairs = random.randint(0, 3)
|
|
if test_pairs == 0:
|
|
ci_from = ci_btc
|
|
ci_to = ci_xmr
|
|
elif test_pairs == 1:
|
|
ci_from = ci_xmr
|
|
ci_to = ci_btc
|
|
elif test_pairs == 2:
|
|
ci_from = ci_xmr
|
|
ci_to = ci_xmr
|
|
else:
|
|
ci_from = ci_btc
|
|
ci_to = ci_btc
|
|
|
|
test_range = random.randint(0, 5)
|
|
if test_range == 0:
|
|
amount_from = random.randint(10000, 1 * ci_from.COIN())
|
|
elif test_range == 1:
|
|
amount_from = random.randint(10000, 1000 * ci_from.COIN())
|
|
elif test_range == 2:
|
|
amount_from = random.randint(10000, 2100 * ci_from.COIN())
|
|
elif test_range == 3:
|
|
amount_from = random.randint(10000, 210000 * ci_from.COIN())
|
|
elif test_range == 4:
|
|
amount_from = random.randint(10000, 21000000 * ci_from.COIN())
|
|
else:
|
|
amount_from = random.randint(10000, 2100000000 * ci_from.COIN())
|
|
|
|
test_range = random.randint(0, 5)
|
|
if test_range == 0:
|
|
amount_to = random.randint(10000, 1 * ci_to.COIN())
|
|
elif test_range == 1:
|
|
amount_to = random.randint(10000, 1000 * ci_to.COIN())
|
|
elif test_range == 2:
|
|
amount_to = random.randint(10000, 2100 * ci_to.COIN())
|
|
elif test_range == 3:
|
|
amount_to = random.randint(10000, 210000 * ci_to.COIN())
|
|
elif test_range == 4:
|
|
amount_to = random.randint(10000, 21000000 * ci_to.COIN())
|
|
else:
|
|
amount_to = random.randint(10000, 2100000000 * ci_to.COIN())
|
|
|
|
offer_rate = ci_from.make_int(amount_to / amount_from, r=1)
|
|
amount_to_from_rate: int = int(
|
|
(int(amount_from) * offer_rate) // (10**scale_from)
|
|
)
|
|
|
|
scale_from = 24
|
|
offer_rate = make_int(amount_to, scale_from) // amount_from
|
|
amount_to_from_rate: int = int(
|
|
(int(amount_from) * offer_rate) // (10**scale_from)
|
|
)
|
|
|
|
if abs(amount_to - amount_to_from_rate) == 1:
|
|
offer_rate += 1
|
|
|
|
offer_rate_human_read: int = int(
|
|
offer_rate // (10 ** (scale_from - ci_from.exp()))
|
|
)
|
|
amount_to_from_rate: int = int(
|
|
(int(amount_from) * offer_rate) // (10**scale_from)
|
|
)
|
|
|
|
if amount_to != amount_to_from_rate:
|
|
print("from exp, amount", ci_from.exp(), amount_from)
|
|
print("to exp, amount", ci_to.exp(), amount_to)
|
|
print("offer_rate_human_read", offer_rate_human_read)
|
|
print("amount_to_from_rate", amount_to_from_rate)
|
|
raise ValueError("Bad amount_to")
|
|
|
|
scale_to = 24
|
|
reversed_rate = make_int(amount_from, scale_to) // amount_to
|
|
|
|
amount_from_from_rate: int = int(
|
|
(int(amount_to) * reversed_rate) // (10**scale_to)
|
|
)
|
|
if abs(amount_from - amount_from_from_rate) == 1:
|
|
reversed_rate += 1
|
|
|
|
amount_from_from_rate: int = int(
|
|
(int(amount_to) * reversed_rate) // (10**scale_to)
|
|
)
|
|
|
|
if amount_from != amount_from_from_rate:
|
|
print("from exp, amount", ci_from.exp(), amount_from)
|
|
print("to exp, amount", ci_to.exp(), amount_to)
|
|
print("amount_from_from_rate", amount_from_from_rate)
|
|
raise ValueError("Bad amount_from")
|
|
|
|
def test_rfc2440(self):
|
|
password = "test"
|
|
salt = bytes.fromhex("B7A94A7E4988630E")
|
|
password_hash = rfc2440_hash_password(password, salt=salt)
|
|
|
|
assert (
|
|
password_hash
|
|
== "16:B7A94A7E4988630E6095334BA67F06FBA509B2A7136A04C9C1B430F539"
|
|
)
|
|
|
|
def test_ripemd160(self):
|
|
input_data = b"hash this"
|
|
assert ripemd160(input_data).hex() == "d5443a154f167e2c1332f6de72cfb4c6ab9c8c17"
|
|
|
|
def test_hash160(self):
|
|
# hash160 is RIPEMD(SHA256(data))
|
|
input_data = b"hash this"
|
|
assert hash160(input_data).hex() == "072985b3583a4a71f548494a5e1d5f6b00d0fe13"
|
|
assert (
|
|
hash160_btc(input_data).hex() == "072985b3583a4a71f548494a5e1d5f6b00d0fe13"
|
|
)
|
|
|
|
def test_protobuf(self):
|
|
msg_buf = BidMessage()
|
|
msg_buf.protocol_version = 2
|
|
msg_buf.time_valid = 1024
|
|
serialised_msg = msg_buf.to_bytes()
|
|
|
|
msg_buf_2 = BidMessage()
|
|
msg_buf_2.from_bytes(serialised_msg)
|
|
assert msg_buf_2.protocol_version == 2
|
|
assert msg_buf_2.time_valid == 1024
|
|
assert msg_buf_2.amount == 0
|
|
assert msg_buf_2.pkhash_buyer is not None
|
|
assert len(msg_buf_2.pkhash_buyer) == 0
|
|
|
|
# Decode only the first field
|
|
msg_buf_3 = BidMessage()
|
|
msg_buf_3.from_bytes(serialised_msg[:2])
|
|
assert msg_buf_3.protocol_version == 2
|
|
assert msg_buf_3.time_valid == 0
|
|
|
|
try:
|
|
_ = BidMessage(doesnotexist=1)
|
|
except Exception as e:
|
|
assert "unexpected keyword argument" in str(e)
|
|
else:
|
|
raise ValueError("Should have errored.")
|
|
|
|
def test_is_private_ip_address(self):
|
|
test_addresses = [
|
|
("localhost", True),
|
|
("127.0.0.1", True),
|
|
("10.0.0.0", True),
|
|
("172.16.0.0", True),
|
|
("192.168.0.0", True),
|
|
("20.87.245.0", False),
|
|
("particl.io", False),
|
|
]
|
|
for addr, is_private in test_addresses:
|
|
assert is_private_ip_address(addr) is is_private
|
|
|
|
def test_varint(self):
|
|
test_vectors = [
|
|
(0, 1),
|
|
(1, 1),
|
|
(127, 1),
|
|
(128, 2),
|
|
(253, 2),
|
|
(8321, 2),
|
|
(16383, 2),
|
|
(16384, 3),
|
|
(2097151, 3),
|
|
(2097152, 4),
|
|
]
|
|
for i, expect_length in test_vectors:
|
|
b = encode_varint(i)
|
|
assert len(b) == expect_length
|
|
assert decode_varint(b) == (i, expect_length)
|
|
|
|
def test_base58(self):
|
|
k = bytes.fromhex(
|
|
"0b4c6e34c21b910f92c7985a8093de526f5f8677a112a8c672d1098139b70e0f"
|
|
)
|
|
K = ed25519_get_pubkey(k)
|
|
|
|
addr = xmr_encode_address(K, K)
|
|
assert addr.startswith("4")
|
|
|
|
addr = xmr_encode_address(K, K, 4146)
|
|
assert addr.startswith("Wo")
|
|
|
|
def test_blake256(self):
|
|
test_vectors = [
|
|
("716f6e863f744b9ac22c97ec7b76ea5f5908bc5b2f67c61510bfc4751384ea7a", b""),
|
|
(
|
|
"7576698ee9cad30173080678e5965916adbb11cb5245d386bf1ffda1cb26c9d7",
|
|
b"The quick brown fox jumps over the lazy dog",
|
|
),
|
|
]
|
|
for expect_hash, data in test_vectors:
|
|
assert blake256(data).hex() == expect_hash
|
|
|
|
def test_extkey(self):
|
|
test_key = "XPARHAr37YxmFP8wyjkaHAQWmp84GiyLikL7EL8j9BCx4LkB8Q1Bw5Kr8sA1GA3Ym53zNLcaxxFHr6u81JVTeCaD61c6fKS1YRAuti8Zu5SzJCjh"
|
|
test_key_c0 = "XPARHAt1XMcNYAwP5wEnQXknBAkGSzaetdZt2eoJZehdB4WXfV1xbSjpgHe44AivmumcSejW5KaYx6L5M6MyR1WyXrsWTwaiUEfHq2RrqCfXj3ZW"
|
|
test_key_c0_p = "PPARTKPL4rp5WLnrYP6jZfuRjx6jrmvbsz5QdHofPfFqJdm918mQwdPLq6Dd9TkdbQeKUqjbHWkyzWe7Pftd7itzm7ETEoUMq4cbG4fY9FKH1YSU"
|
|
test_key_c0h = "XPARHAt1XMcNgWbv48LwoQbjs1bC8kCXKomzvJLRT5xmbQ2GKf9e8Vfr1MMcfiWJC34RyDp5HvAfjeiNyLDfkFm1UrRCrPkVC9GGaAWa3nXMWew8"
|
|
|
|
ek_data = decodeAddress(test_key)[4:]
|
|
|
|
ek = ExtKeyPair()
|
|
ek.decode(ek_data)
|
|
assert ek.encode_v() == ek_data
|
|
|
|
m_0 = ek.derive(0)
|
|
|
|
ek_c0_data = decodeAddress(test_key_c0)[4:]
|
|
assert m_0.encode_v() == ek_c0_data
|
|
|
|
child_no: int = 0 | (1 << 31)
|
|
m_0h = ek.derive(child_no)
|
|
|
|
ek_c0h_data = decodeAddress(test_key_c0h)[4:]
|
|
assert m_0h.encode_v() == ek_c0h_data
|
|
|
|
ek.neuter()
|
|
assert ek.has_key() is False
|
|
m_0 = ek.derive(0)
|
|
|
|
ek_c0_p_data = decodeAddress(test_key_c0_p)[4:]
|
|
assert m_0.encode_p() == ek_c0_p_data
|
|
|
|
def test_mnemonic(self):
|
|
entropy0: bytes = Mnemonic("english").to_entropy(mnemonics[0])
|
|
assert entropy0.hex() == "0002207e9b744ea2d7ab41702f31f000"
|
|
mnemonic_recovered: str = Mnemonic("english").to_mnemonic(entropy0)
|
|
assert mnemonic_recovered == mnemonics[0]
|
|
|
|
def test_db(self):
|
|
db_test = DBMethods()
|
|
db_test.sqlite_file = ":memory:"
|
|
db_test.mxDB = threading.Lock()
|
|
cursor = db_test.openDB()
|
|
try:
|
|
create_db_(db_test._db_con, logger)
|
|
# Test upsert
|
|
ki = KnownIdentity()
|
|
ki.address = "test"
|
|
ki.label = "test"
|
|
db_test.add(ki, cursor)
|
|
ki.record_id = 1
|
|
ki.address = "test1"
|
|
ki.label = "test1"
|
|
ki.note = "note1"
|
|
try:
|
|
db_test.add(ki, cursor, upsert=False)
|
|
except Exception as e:
|
|
assert "UNIQUE constraint failed" in str(e)
|
|
else:
|
|
raise ValueError("Should have errored.")
|
|
db_test.add(ki, cursor, upsert=True)
|
|
|
|
# Test columns list
|
|
ki_test = db_test.queryOne(
|
|
KnownIdentity,
|
|
cursor,
|
|
{"address": "test1"},
|
|
columns_list=[
|
|
"label",
|
|
],
|
|
)
|
|
assert ki_test.label == "test1"
|
|
assert ki_test.address is None
|
|
|
|
# Test updating partial row
|
|
ki_test.label = "test2"
|
|
ki_test.record_id = 1
|
|
db_test.add(
|
|
ki_test,
|
|
cursor,
|
|
upsert=True,
|
|
columns_list=[
|
|
"record_id",
|
|
"label",
|
|
],
|
|
)
|
|
ki_test = db_test.queryOne(KnownIdentity, cursor, {"address": "test1"})
|
|
assert ki_test.record_id == 1
|
|
assert ki_test.address == "test1"
|
|
assert ki_test.label == "test2"
|
|
assert ki_test.note == "note1"
|
|
|
|
ki_test.note = "test2"
|
|
ki_test.label = "test3"
|
|
|
|
db_test.updateDB(
|
|
ki_test,
|
|
cursor,
|
|
["record_id"],
|
|
columns_list=[
|
|
"label",
|
|
],
|
|
)
|
|
ki_test = db_test.queryOne(KnownIdentity, cursor, {"address": "test1"})
|
|
assert ki_test.record_id == 1
|
|
assert ki_test.address == "test1"
|
|
assert ki_test.label == "test3"
|
|
assert ki_test.note == "note1"
|
|
|
|
# Test partially initialised object
|
|
ki_test_p = KnownIdentity(
|
|
_init_all_columns=False, record_id=1, label="test4"
|
|
)
|
|
db_test.add(ki_test_p, cursor, upsert=True)
|
|
ki_test = db_test.queryOne(KnownIdentity, cursor, {"address": "test1"})
|
|
assert ki_test.record_id == 1
|
|
assert ki_test.address == "test1"
|
|
assert ki_test.label == "test4"
|
|
assert ki_test.note == "note1"
|
|
|
|
finally:
|
|
db_test.closeDB(cursor)
|
|
|
|
def test_tx_hashes(self):
|
|
tx = CTransaction()
|
|
tx.nVersion = 2
|
|
tx.nLockTime = 0
|
|
tx.vout.append(CTxOut(1, bytes.fromhex("a15143aa086e05e3b5a73046")))
|
|
tx.vout.append(CTxOut(2, bytes.fromhex("eed7d63fd86225f7159ed7e5")))
|
|
tx.vin.append(
|
|
CTxIn(
|
|
COutPoint(
|
|
uint256_from_str(
|
|
bytes.fromhex(
|
|
"0101010101010101010101010101010101010010101010101010101010101010"
|
|
)
|
|
),
|
|
1,
|
|
),
|
|
bytes.fromhex("c2dca8ecbcf058b79f188692"),
|
|
)
|
|
)
|
|
assert (
|
|
tx.rehash()
|
|
== "28d0e9afad2740504eb9d0428352bc77a7b94eaafa364ef4cc07aeeff0c631a2"
|
|
)
|
|
sighash = SegwitV0SignatureHash(
|
|
bytes.fromhex("a15143aa086e05e3b5a73046"), tx, 0, SIGHASH_ALL, 3
|
|
)
|
|
assert (
|
|
sighash.hex()
|
|
== "252cd6e85b99e0fd554c44d5fe638923f7ef563048362406a665cf3400feb1bd"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|