basicswap-prepare can enable and disable tor config.

This commit is contained in:
tecnovert
2022-03-24 00:00:35 +02:00
parent cf797afae9
commit d1e015962c
25 changed files with 890 additions and 467 deletions

187
basicswap/util/__init__.py Normal file
View File

@@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018-2022 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import json
import time
import decimal
COIN = 100000000
decimal_ctx = decimal.Context()
decimal_ctx.prec = 20
class TemporaryError(ValueError):
pass
def ensure(v, err_string):
if not v:
raise ValueError(err_string)
def toBool(s) -> bool:
return s.lower() in ['1', 'true']
def jsonDecimal(obj):
if isinstance(obj, decimal.Decimal):
return str(obj)
raise TypeError
def dumpj(jin, indent=4):
return json.dumps(jin, indent=indent, default=jsonDecimal)
def dumpje(jin):
return json.dumps(jin, default=jsonDecimal).replace('"', '\\"')
def SerialiseNum(n):
if n == 0:
return bytes((0x00,))
if n > 0 and n <= 16:
return bytes((0x50 + n,))
rv = bytearray()
neg = n < 0
absvalue = -n if neg else n
while(absvalue):
rv.append(absvalue & 0xff)
absvalue >>= 8
if rv[-1] & 0x80:
rv.append(0x80 if neg else 0)
elif neg:
rv[-1] |= 0x80
return bytes((len(rv),)) + rv
def DeserialiseNum(b, o=0) -> int:
if b[o] == 0:
return 0
if b[o] > 0x50 and b[o] <= 0x50 + 16:
return b[o] - 0x50
v = 0
nb = b[o]
o += 1
for i in range(0, nb):
v |= b[o + i] << (8 * i)
# If the input vector's most significant byte is 0x80, remove it from the result's msb and return a negative.
if b[o + nb - 1] & 0x80:
return -(v & ~(0x80 << (8 * (nb - 1))))
return v
def float_to_str(f):
# stackoverflow.com/questions/38847690
d1 = decimal_ctx.create_decimal(repr(f))
return format(d1, 'f')
def make_int(v, scale=8, r=0): # r = 0, no rounding, fail, r > 0 round up, r < 0 floor
if type(v) == float:
v = float_to_str(v)
elif type(v) == int:
return v * 10 ** scale
sign = 1
if v[0] == '-':
v = v[1:]
sign = -1
ep = 10 ** scale
have_dp = False
rv = 0
for c in v:
if c == '.':
rv *= ep
have_dp = True
continue
if not c.isdigit():
raise ValueError('Invalid char: ' + c)
if have_dp:
ep //= 10
if ep <= 0:
if r == 0:
raise ValueError('Mantissa too long')
if r > 0:
# Round up
if int(c) > 4:
rv += 1
break
rv += ep * int(c)
else:
rv = rv * 10 + int(c)
if not have_dp:
rv *= ep
return rv * sign
def validate_amount(amount, scale=8) -> bool:
str_amount = float_to_str(amount) if type(amount) == float else str(amount)
has_decimal = False
for c in str_amount:
if c == '.' and not has_decimal:
has_decimal = True
continue
if not c.isdigit():
raise ValueError('Invalid amount')
ar = str_amount.split('.')
if len(ar) > 1 and len(ar[1]) > scale:
raise ValueError('Too many decimal places in amount {}'.format(str_amount))
return True
def format_amount(i, display_scale, scale=None):
if not isinstance(i, int):
raise ValueError('Amount must be an integer.') # Raise error instead of converting as amounts should always be integers
if scale is None:
scale = display_scale
ep = 10 ** scale
n = abs(i)
quotient = n // ep
remainder = n % ep
if display_scale != scale:
remainder %= (10 ** display_scale)
rv = '{}.{:0>{scale}}'.format(quotient, remainder, scale=display_scale)
if i < 0:
rv = '-' + rv
return rv
def format_timestamp(value, with_seconds=False):
str_format = '%Y-%m-%d %H:%M'
if with_seconds:
str_format += ':%S'
str_format += ' %Z'
return time.strftime(str_format, time.localtime(value))
def b2i(b) -> int:
# bytes32ToInt
return int.from_bytes(b, byteorder='big')
def i2b(i: int) -> bytes:
# intToBytes32
return i.to_bytes(32, byteorder='big')
def b2h(b: bytes) -> str:
return b.hex()
def h2b(h: str) -> bytes:
if h.startswith('0x'):
h = h[2:]
return bytes.fromhex(h)
def i2h(x):
return b2h(i2b(x))

128
basicswap/util/address.py Normal file
View File

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
from basicswap.contrib.segwit_addr import bech32_decode, convertbits, bech32_encode
__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def b58decode(v, length=None):
long_value = 0
for (i, c) in enumerate(v[::-1]):
ofs = __b58chars.find(c)
if ofs < 0:
return None
long_value += ofs * (58**i)
result = bytes()
while long_value >= 256:
div, mod = divmod(long_value, 256)
result = bytes((mod,)) + result
long_value = div
result = bytes((long_value,)) + result
nPad = 0
for c in v:
if c == __b58chars[0]:
nPad += 1
else:
break
pad = bytes((0,)) * nPad
result = pad + result
if length is not None and len(result) != length:
return None
return result
def b58encode(v):
long_value = 0
for (i, c) in enumerate(v[::-1]):
long_value += (256**i) * c
result = ''
while long_value >= 58:
div, mod = divmod(long_value, 58)
result = __b58chars[mod] + result
long_value = div
result = __b58chars[long_value] + result
# leading 0-bytes in the input become leading-1s
nPad = 0
for c in v:
if c == 0:
nPad += 1
else:
break
return (__b58chars[0] * nPad) + result
def encodeStealthAddress(prefix_byte, scan_pubkey, spend_pubkey):
data = bytes((0x00,))
data += scan_pubkey
data += bytes((0x01,))
data += spend_pubkey
data += bytes((0x00,)) # number_signatures - unused
data += bytes((0x00,)) # num prefix bits
b = bytes((prefix_byte,)) + data
b += hashlib.sha256(hashlib.sha256(b).digest()).digest()[:4]
return b58encode(b)
def decodeWif(encoded_key):
key = b58decode(encoded_key)[1:-4]
if len(key) == 33:
return key[:-1]
return key
def toWIF(prefix_byte, b, compressed=True):
b = bytes((prefix_byte,)) + b
if compressed:
b += bytes((0x01,))
b += hashlib.sha256(hashlib.sha256(b).digest()).digest()[:4]
return b58encode(b)
def getKeyID(bytes):
data = hashlib.sha256(bytes).digest()
return hashlib.new('ripemd160', data).digest()
def bech32Decode(hrp, addr):
hrpgot, data = bech32_decode(addr)
if hrpgot != hrp:
return None
decoded = convertbits(data, 5, 8, False)
if decoded is None or len(decoded) < 2 or len(decoded) > 40:
return None
return bytes(decoded)
def bech32Encode(hrp, data):
ret = bech32_encode(hrp, convertbits(data, 8, 5))
if bech32Decode(hrp, ret) is None:
return None
return ret
def decodeAddress(address_str):
b58_addr = b58decode(address_str)
if b58_addr is not None:
address = b58_addr[:-4]
checksum = b58_addr[-4:]
assert(hashlib.sha256(hashlib.sha256(address).digest()).digest()[:4] == checksum), 'Checksum mismatch'
return b58_addr[:-4]
return None
def encodeAddress(address):
checksum = hashlib.sha256(hashlib.sha256(address).digest()).digest()
return b58encode(address + checksum[0:4])
def pubkeyToAddress(prefix, pubkey):
return encodeAddress(bytes((prefix,)) + getKeyID(pubkey))

191
basicswap/util/ecc.py Normal file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import hashlib
import secrets
from basicswap.contrib.ellipticcurve import CurveFp, Point, INFINITY, jacobi_symbol
from . import i2b
class ECCParameters():
def __init__(self, p, a, b, Gx, Gy, o):
self.p = p
self.a = a
self.b = b
self.Gx = Gx
self.Gy = Gy
self.o = o
ep = ECCParameters(
p=0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffefffffc2f,
a=0x0,
b=0x7,
Gx=0x79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798,
Gy=0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8,
o=0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141)
curve_secp256k1 = CurveFp(ep.p, ep.a, ep.b)
G = Point(curve_secp256k1, ep.Gx, ep.Gy, ep.o)
SECP256K1_ORDER_HALF = ep.o // 2
def ToDER(P) -> bytes:
return bytes((4, )) + int(P.x()).to_bytes(32, byteorder='big') + int(P.y()).to_bytes(32, byteorder='big')
def getSecretBytes() -> bytes:
i = 1 + secrets.randbelow(ep.o - 1)
return i2b(i)
def getSecretInt() -> int:
return 1 + secrets.randbelow(ep.o - 1)
def getInsecureBytes() -> bytes:
while True:
s = os.urandom(32)
s_test = int.from_bytes(s, byteorder='big')
if s_test > 1 and s_test < ep.o:
return s
def getInsecureInt() -> int:
while True:
s = os.urandom(32)
s_test = int.from_bytes(s, byteorder='big')
if s_test > 1 and s_test < ep.o:
return s_test
def powMod(x, y, z) -> int:
# Calculate (x ** y) % z efficiently.
number = 1
while y:
if y & 1:
number = number * x % z
y >>= 1 # y //= 2
x = x * x % z
return number
def ExpandPoint(xb, sign):
x = int.from_bytes(xb, byteorder='big')
a = (powMod(x, 3, ep.p) + 7) % ep.p
y = powMod(a, (ep.p + 1) // 4, ep.p)
if sign:
y = ep.p - y
return Point(curve_secp256k1, x, y, ep.o)
def CPKToPoint(cpk):
y_parity = cpk[0] - 2
x = int.from_bytes(cpk[1:], byteorder='big')
a = (powMod(x, 3, ep.p) + 7) % ep.p
y = powMod(a, (ep.p + 1) // 4, ep.p)
if y % 2 != y_parity:
y = ep.p - y
return Point(curve_secp256k1, x, y, ep.o)
def pointToCPK2(point, ind=0x09):
# The function is_square(x), where x is an integer, returns whether or not x is a quadratic residue modulo p. Since p is prime, it is equivalent to the Legendre symbol (x / p) = x(p-1)/2 mod p being equal to 1[8].
ind = bytes((ind ^ (1 if jacobi_symbol(point.y(), ep.p) == 1 else 0),))
return ind + point.x().to_bytes(32, byteorder='big')
def pointToCPK(point):
y = point.y().to_bytes(32, byteorder='big')
ind = bytes((0x03,)) if y[31] % 2 else bytes((0x02,))
cpk = ind + point.x().to_bytes(32, byteorder='big')
return cpk
def secretToCPK(secret):
secretInt = secret if isinstance(secret, int) \
else int.from_bytes(secret, byteorder='big')
R = G * secretInt
Y = R.y().to_bytes(32, byteorder='big')
ind = bytes((0x03,)) if Y[31] % 2 else bytes((0x02,))
pubkey = ind + R.x().to_bytes(32, byteorder='big')
return pubkey
def getKeypair():
secretBytes = getSecretBytes()
return secretBytes, secretToCPK(secretBytes)
def hashToCurve(pubkey):
xBytes = hashlib.sha256(pubkey).digest()
x = int.from_bytes(xBytes, byteorder='big')
for k in range(0, 100):
# get matching y element for point
y_parity = 0 # always pick 0,
a = (powMod(x, 3, ep.p) + 7) % ep.p
y = powMod(a, (ep.p + 1) // 4, ep.p)
# print("before parity %x" % (y))
if y % 2 != y_parity:
y = ep.p - y
# If x is always mod P, can R ever not be on the curve?
try:
R = Point(curve_secp256k1, x, y, ep.o)
except Exception:
x = (x + 1) % ep.p # % P?
continue
if R == INFINITY or R * ep.o != INFINITY: # is R * O != INFINITY check necessary? Validation of Elliptic Curve Public Keys says no if cofactor = 1
x = (x + 1) % ep.p # % P?
continue
return R
raise ValueError('hashToCurve failed for 100 tries')
def hash256(inb):
return hashlib.sha256(inb).digest()
def testEccUtils():
print('testEccUtils()')
G_enc = ToDER(G)
assert(G_enc.hex() == '0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8')
G_enc = pointToCPK(G)
assert(G_enc.hex() == '0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798')
G_dec = CPKToPoint(G_enc)
assert(G_dec == G)
G_enc = pointToCPK2(G)
assert(G_enc.hex() == '0879be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798')
H = hashToCurve(ToDER(G))
assert(pointToCPK(H).hex() == '0250929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0')
print('Passed.')
if __name__ == "__main__":
testEccUtils()

31
basicswap/util/rfc2440.py Normal file
View File

@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import hashlib
import secrets
def rfc2440_hash_password(password, salt=None):
# Match tor --hash-password
# secret_to_key_rfc2440
EXPBIAS = 6
c = 96
count = (16 + (c & 15)) << ((c >> 4) + EXPBIAS)
if salt is None:
salt = secrets.token_bytes(8)
assert(len(salt) == 8)
hashbytes = salt + password.encode('utf-8')
len_hashbytes = len(hashbytes)
h = hashlib.sha1()
while count > 0:
if count >= len_hashbytes:
h.update(hashbytes)
count -= len_hashbytes
continue
h.update(hashbytes[:count])
break
rv = '16:' + salt.hex() + '60' + h.hexdigest()
return rv.upper()

71
basicswap/util/script.py Normal file
View File

@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import struct
import hashlib
from basicswap.script import OpCodes
def decodeScriptNum(script_bytes, o):
v = 0
num_len = script_bytes[o]
if num_len >= OpCodes.OP_1 and num_len <= OpCodes.OP_16:
return((num_len - OpCodes.OP_1) + 1, 1)
if num_len > 4:
raise ValueError('Bad scriptnum length') # Max 4 bytes
if num_len + o >= len(script_bytes):
raise ValueError('Bad script length')
o += 1
for i in range(num_len):
b = script_bytes[o + i]
# Negative flag set in last byte, if num is positive and > 0x80 an extra 0x00 byte will be appended
if i == num_len - 1 and b & 0x80:
b &= (~(0x80) & 0xFF)
v += int(b) << 8 * i
v *= -1
else:
v += int(b) << 8 * i
return(v, 1 + num_len)
def getP2SHScriptForHash(p2sh):
return bytes((OpCodes.OP_HASH160, 0x14)) \
+ p2sh \
+ bytes((OpCodes.OP_EQUAL,))
def getP2WSH(script):
return bytes((OpCodes.OP_0, 0x20)) + hashlib.sha256(script).digest()
def SerialiseNumCompact(v):
if v < 253:
return bytes((v,))
if v <= 0xffff: # USHRT_MAX
return struct.pack("<BH", 253, v)
if v <= 0xffffffff: # UINT_MAX
return struct.pack("<BI", 254, v)
if v <= 0xffffffffffffffff: # UINT_MAX
return struct.pack("<BQ", 255, v)
raise ValueError('Value too large')
def getCompactSizeLen(v):
# Compact Size
if v < 253:
return 1
if v <= 0xffff: # USHRT_MAX
return 3
if v <= 0xffffffff: # UINT_MAX
return 5
if v <= 0xffffffffffffffff: # UINT_MAX
return 9
raise ValueError('Value too large')
def getWitnessElementLen(v):
return getCompactSizeLen(v) + v