network: Use Simplex direct chats.

This commit is contained in:
tecnovert
2025-05-25 13:11:59 +02:00
parent b57ff3497a
commit f3adc17bb8
16 changed files with 2123 additions and 464 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -36,6 +36,11 @@ class KeyTypes(IntEnum):
KAF = 6
class MessageNetworks(IntEnum):
SMSG = auto()
SIMPLEX = auto()
class MessageTypes(IntEnum):
OFFER = auto()
BID = auto()
@@ -53,6 +58,8 @@ class MessageTypes(IntEnum):
ADS_BID_LF = auto()
ADS_BID_ACCEPT_FL = auto()
CONNECT_REQ = auto()
class AddressTypes(IntEnum):
OFFER = auto()
@@ -111,6 +118,7 @@ class BidStates(IntEnum):
BID_EXPIRED = 31
BID_AACCEPT_DELAY = 32
BID_AACCEPT_FAIL = 33
CONNECT_REQ_SENT = 34
class TxStates(IntEnum):
@@ -228,6 +236,10 @@ class NotificationTypes(IntEnum):
BID_ACCEPTED = auto()
class ConnectionRequestTypes(IntEnum):
BID = 1
class AutomationOverrideOptions(IntEnum):
DEFAULT = 0
ALWAYS_ACCEPT = 1
@@ -339,6 +351,8 @@ def strBidState(state):
return "Auto accept delay"
if state == BidStates.BID_AACCEPT_FAIL:
return "Auto accept failed"
if state == BidStates.CONNECT_REQ_SENT:
return "Connect request sent"
return "Unknown" + " " + str(state)

View File

@@ -13,7 +13,7 @@ from enum import IntEnum, auto
from typing import Optional
CURRENT_DB_VERSION = 28
CURRENT_DB_VERSION = 29
CURRENT_DB_DATA_VERSION = 6
@@ -219,6 +219,7 @@ class Bid(Table):
bid_addr = Column("string")
pk_bid_addr = Column("blob")
proof_address = Column("string")
proof_signature = Column("blob")
proof_utxos = Column("blob")
# Address to spend lock tx to - address from wallet if empty TODO
withdraw_to_addr = Column("string")
@@ -658,6 +659,41 @@ class CoinRates(Table):
last_updated = Column("integer")
class MessageNetworks(Table):
__tablename__ = "message_networks"
record_id = Column("integer", primary_key=True, autoincrement=True)
active_ind = Column("integer")
name = Column("string")
created_at = Column("integer")
class DirectMessageRoute(Table):
__tablename__ = "direct_message_routes"
record_id = Column("integer", primary_key=True, autoincrement=True)
active_ind = Column("integer")
network_id = Column("integer")
linked_type = Column("integer")
linked_id = Column("blob")
smsg_addr_local = Column("string")
smsg_addr_remote = Column("string")
# smsg_addr_id_local = Column("integer") # SmsgAddress
# smsg_addr_id_remote = Column("integer") # KnownIdentity
route_data = Column("blob")
created_at = Column("integer")
class DirectMessageRouteLink(Table):
__tablename__ = "direct_message_route_links"
record_id = Column("integer", primary_key=True, autoincrement=True)
active_ind = Column("integer")
direct_message_route_id = Column("integer")
linked_type = Column("integer")
linked_id = Column("blob")
created_at = Column("integer")
def create_db_(con, log) -> None:
c = con.cursor()
@@ -915,6 +951,7 @@ class DBMethods:
query += f"{key}=:{key}"
cursor.execute(query, values)
return cursor.lastrowid
def query(
self,

View File

@@ -76,6 +76,10 @@ def remove_expired_data(self, time_offset: int = 0):
"DELETE FROM message_links WHERE linked_type = :type_ind AND linked_id = :linked_id",
{"type_ind": int(Concepts.BID), "linked_id": bid_row[0]},
)
cursor.execute(
"DELETE FROM direct_message_route_links WHERE linked_type = :type_ind AND linked_id = :linked_id",
{"type_ind": int(Concepts.BID), "linked_id": bid_row[0]},
)
cursor.execute(
"DELETE FROM eventlog WHERE eventlog.linked_type = :type_ind AND eventlog.linked_id = :offer_id",

View File

@@ -858,7 +858,7 @@ def js_automationstrategies(self, url_split, post_string: str, is_json: bool) ->
"label": strat_data.label,
"type_ind": strat_data.type_ind,
"only_known_identities": strat_data.only_known_identities,
"data": json.loads(strat_data.data.decode("utf-8")),
"data": json.loads(strat_data.data.decode("UTF-8")),
"note": "" if strat_data.note is None else strat_data.note,
}
return bytes(json.dumps(rv), "UTF-8")
@@ -992,7 +992,7 @@ def js_unlock(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
post_data = getFormData(post_string, is_json)
password = get_data_entry(post_data, "password")
password: str = get_data_entry(post_data, "password")
if have_data_entry(post_data, "coin"):
coin = getCoinType(str(get_data_entry(post_data, "coin")))
@@ -1167,6 +1167,49 @@ def js_coinprices(self, url_split, post_string, is_json) -> bytes:
)
def js_messageroutes(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
post_data = {} if post_string == "" else getFormData(post_string, is_json)
filters = {
"page_no": 1,
"limit": PAGE_LIMIT,
"sort_by": "created_at",
"sort_dir": "desc",
}
if have_data_entry(post_data, "sort_by"):
sort_by = get_data_entry(post_data, "sort_by")
ensure(
sort_by
in [
"created_at",
],
"Invalid sort by",
)
filters["sort_by"] = sort_by
if have_data_entry(post_data, "sort_dir"):
sort_dir = get_data_entry(post_data, "sort_dir")
ensure(sort_dir in ["asc", "desc"], "Invalid sort dir")
filters["sort_dir"] = sort_dir
if have_data_entry(post_data, "offset"):
filters["offset"] = int(get_data_entry(post_data, "offset"))
if have_data_entry(post_data, "limit"):
filters["limit"] = int(get_data_entry(post_data, "limit"))
ensure(filters["limit"] > 0, "Invalid limit")
if have_data_entry(post_data, "address_from"):
filters["address_from"] = get_data_entry(post_data, "address_from")
if have_data_entry(post_data, "address_to"):
filters["address_to"] = get_data_entry(post_data, "address_to")
action = get_data_entry_or(post_data, "action", None)
message_routes = swap_client.listMessageRoutes(filters, action)
return bytes(json.dumps(message_routes), "UTF-8")
endpoints = {
"coins": js_coins,
"wallets": js_wallets,
@@ -1194,6 +1237,7 @@ endpoints = {
"readurl": js_readurl,
"active": js_active,
"coinprices": js_coinprices,
"messageroutes": js_messageroutes,
}

View File

@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2024 tecnovert
# Copyright (c) 2025 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -23,6 +24,13 @@ protobuf ParseFromString would reset the whole object, from_bytes won't.
from basicswap.util.integer import encode_varint, decode_varint
NPBW_INT = 0
NPBW_BYTES = 2
NPBF_STR = 1
NPBF_BOOL = 2
class NonProtobufClass:
def __init__(self, init_all: bool = True, **kwargs):
for key, value in kwargs.items():
@@ -34,7 +42,7 @@ class NonProtobufClass:
found_field = True
break
if found_field is False:
raise ValueError(f"got an unexpected keyword argument '{key}'")
raise ValueError(f"Got an unexpected keyword argument '{key}'")
if init_all:
self.init_fields()
@@ -117,151 +125,160 @@ class NonProtobufClass:
class OfferMessage(NonProtobufClass):
_map = {
1: ("protocol_version", 0, 0),
2: ("coin_from", 0, 0),
3: ("coin_to", 0, 0),
4: ("amount_from", 0, 0),
5: ("amount_to", 0, 0),
6: ("min_bid_amount", 0, 0),
7: ("time_valid", 0, 0),
8: ("lock_type", 0, 0),
9: ("lock_value", 0, 0),
10: ("swap_type", 0, 0),
11: ("proof_address", 2, 1),
12: ("proof_signature", 2, 1),
13: ("pkhash_seller", 2, 0),
14: ("secret_hash", 2, 0),
15: ("fee_rate_from", 0, 0),
16: ("fee_rate_to", 0, 0),
17: ("amount_negotiable", 0, 2),
18: ("rate_negotiable", 0, 2),
19: ("proof_utxos", 2, 0),
1: ("protocol_version", NPBW_INT, 0),
2: ("coin_from", NPBW_INT, 0),
3: ("coin_to", NPBW_INT, 0),
4: ("amount_from", NPBW_INT, 0),
5: ("amount_to", NPBW_INT, 0),
6: ("min_bid_amount", NPBW_INT, 0),
7: ("time_valid", NPBW_INT, 0),
8: ("lock_type", NPBW_INT, 0),
9: ("lock_value", NPBW_INT, 0),
10: ("swap_type", NPBW_INT, 0),
11: ("proof_address", NPBW_BYTES, NPBF_STR),
12: ("proof_signature", NPBW_BYTES, NPBF_STR),
13: ("pkhash_seller", NPBW_BYTES, 0),
14: ("secret_hash", NPBW_BYTES, 0),
15: ("fee_rate_from", NPBW_INT, 0),
16: ("fee_rate_to", NPBW_INT, 0),
17: ("amount_negotiable", NPBW_INT, NPBF_BOOL),
18: ("rate_negotiable", NPBW_INT, NPBF_BOOL),
19: ("proof_utxos", NPBW_BYTES, 0),
20: ("auto_accept_type", 0, 0),
}
class BidMessage(NonProtobufClass):
_map = {
1: ("protocol_version", 0, 0),
2: ("offer_msg_id", 2, 0),
3: ("time_valid", 0, 0),
4: ("amount", 0, 0),
5: ("amount_to", 0, 0),
6: ("pkhash_buyer", 2, 0),
7: ("proof_address", 2, 1),
8: ("proof_signature", 2, 1),
9: ("proof_utxos", 2, 0),
10: ("pkhash_buyer_to", 2, 0),
1: ("protocol_version", NPBW_INT, 0),
2: ("offer_msg_id", NPBW_BYTES, 0),
3: ("time_valid", NPBW_INT, 0),
4: ("amount", NPBW_INT, 0),
5: ("amount_to", NPBW_INT, 0),
6: ("pkhash_buyer", NPBW_BYTES, 0),
7: ("proof_address", NPBW_BYTES, NPBF_STR),
8: ("proof_signature", NPBW_BYTES, NPBF_STR),
9: ("proof_utxos", NPBW_BYTES, 0),
10: ("pkhash_buyer_to", NPBW_BYTES, 0),
}
class BidAcceptMessage(NonProtobufClass):
# Step 3, seller -> buyer
_map = {
1: ("bid_msg_id", 2, 0),
2: ("initiate_txid", 2, 0),
3: ("contract_script", 2, 0),
4: ("pkhash_seller", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("initiate_txid", NPBW_BYTES, 0),
3: ("contract_script", NPBW_BYTES, 0),
4: ("pkhash_seller", NPBW_BYTES, 0),
}
class OfferRevokeMessage(NonProtobufClass):
_map = {
1: ("offer_msg_id", 2, 0),
2: ("signature", 2, 0),
1: ("offer_msg_id", NPBW_BYTES, 0),
2: ("signature", NPBW_BYTES, 0),
}
class BidRejectMessage(NonProtobufClass):
_map = {
1: ("bid_msg_id", 2, 0),
2: ("reject_code", 0, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("reject_code", NPBW_INT, 0),
}
class XmrBidMessage(NonProtobufClass):
# MSG1L, F -> L
_map = {
1: ("protocol_version", 0, 0),
2: ("offer_msg_id", 2, 0),
3: ("time_valid", 0, 0),
4: ("amount", 0, 0),
5: ("amount_to", 0, 0),
6: ("pkaf", 2, 0),
7: ("kbvf", 2, 0),
8: ("kbsf_dleag", 2, 0),
9: ("dest_af", 2, 0),
1: ("protocol_version", NPBW_INT, 0),
2: ("offer_msg_id", NPBW_BYTES, 0),
3: ("time_valid", NPBW_INT, 0),
4: ("amount", NPBW_INT, 0),
5: ("amount_to", NPBW_INT, 0),
6: ("pkaf", NPBW_BYTES, 0),
7: ("kbvf", NPBW_BYTES, 0),
8: ("kbsf_dleag", NPBW_BYTES, 0),
9: ("dest_af", NPBW_BYTES, 0),
}
class XmrSplitMessage(NonProtobufClass):
_map = {
1: ("msg_id", 2, 0),
2: ("msg_type", 0, 0),
3: ("sequence", 0, 0),
4: ("dleag", 2, 0),
1: ("msg_id", NPBW_BYTES, 0),
2: ("msg_type", NPBW_INT, 0),
3: ("sequence", NPBW_INT, 0),
4: ("dleag", NPBW_BYTES, 0),
}
class XmrBidAcceptMessage(NonProtobufClass):
_map = {
1: ("bid_msg_id", 2, 0),
2: ("pkal", 2, 0),
3: ("kbvl", 2, 0),
4: ("kbsl_dleag", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("pkal", NPBW_BYTES, 0),
3: ("kbvl", NPBW_BYTES, 0),
4: ("kbsl_dleag", NPBW_BYTES, 0),
# MSG2F
5: ("a_lock_tx", 2, 0),
6: ("a_lock_tx_script", 2, 0),
7: ("a_lock_refund_tx", 2, 0),
8: ("a_lock_refund_tx_script", 2, 0),
9: ("a_lock_refund_spend_tx", 2, 0),
10: ("al_lock_refund_tx_sig", 2, 0),
5: ("a_lock_tx", NPBW_BYTES, 0),
6: ("a_lock_tx_script", NPBW_BYTES, 0),
7: ("a_lock_refund_tx", NPBW_BYTES, 0),
8: ("a_lock_refund_tx_script", NPBW_BYTES, 0),
9: ("a_lock_refund_spend_tx", NPBW_BYTES, 0),
10: ("al_lock_refund_tx_sig", NPBW_BYTES, 0),
}
class XmrBidLockTxSigsMessage(NonProtobufClass):
# MSG3L
_map = {
1: ("bid_msg_id", 2, 0),
2: ("af_lock_refund_spend_tx_esig", 2, 0),
3: ("af_lock_refund_tx_sig", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("af_lock_refund_spend_tx_esig", NPBW_BYTES, 0),
3: ("af_lock_refund_tx_sig", NPBW_BYTES, 0),
}
class XmrBidLockSpendTxMessage(NonProtobufClass):
# MSG4F
_map = {
1: ("bid_msg_id", 2, 0),
2: ("a_lock_spend_tx", 2, 0),
3: ("kal_sig", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("a_lock_spend_tx", NPBW_BYTES, 0),
3: ("kal_sig", NPBW_BYTES, 0),
}
class XmrBidLockReleaseMessage(NonProtobufClass):
# MSG5F
_map = {
1: ("bid_msg_id", 2, 0),
2: ("al_lock_spend_tx_esig", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("al_lock_spend_tx_esig", NPBW_BYTES, 0),
}
class ADSBidIntentMessage(NonProtobufClass):
# L -> F Sent from bidder, construct a reverse bid
_map = {
1: ("protocol_version", 0, 0),
2: ("offer_msg_id", 2, 0),
3: ("time_valid", 0, 0),
4: ("amount_from", 0, 0),
5: ("amount_to", 0, 0),
1: ("protocol_version", NPBW_INT, 0),
2: ("offer_msg_id", NPBW_BYTES, 0),
3: ("time_valid", NPBW_INT, 0),
4: ("amount_from", NPBW_INT, 0),
5: ("amount_to", NPBW_INT, 0),
}
class ADSBidIntentAcceptMessage(NonProtobufClass):
# F -> L Sent from offerer, construct a reverse bid
_map = {
1: ("bid_msg_id", 2, 0),
2: ("pkaf", 2, 0),
3: ("kbvf", 2, 0),
4: ("kbsf_dleag", 2, 0),
5: ("dest_af", 2, 0),
1: ("bid_msg_id", NPBW_BYTES, 0),
2: ("pkaf", NPBW_BYTES, 0),
3: ("kbvf", NPBW_BYTES, 0),
4: ("kbsf_dleag", NPBW_BYTES, 0),
5: ("dest_af", NPBW_BYTES, 0),
}
class ConnectReqMessage(NonProtobufClass):
_map = {
1: ("network_type", NPBW_INT, 0),
2: ("network_data", NPBW_BYTES, 0),
3: ("request_type", NPBW_INT, 0),
4: ("request_data", NPBW_BYTES, 0),
}

View File

@@ -8,6 +8,7 @@
import base64
import json
import threading
import traceback
import websocket
@@ -25,9 +26,6 @@ from basicswap.util.address import (
b58decode,
decodeWif,
)
from basicswap.basicswap_util import (
BidStates,
)
def encode_base64(data: bytes) -> str:
@@ -52,6 +50,20 @@ class WebSocketThread(threading.Thread):
self.recv_queue = Queue()
self.cmd_recv_queue = Queue()
self.delayed_events_queue = Queue()
self.ignore_events: bool = False
self.num_messages_received: int = 0
def disable_debug_mode(self):
self.ignore_events = False
for i in range(100):
try:
message = self.delayed_events_queue.get(block=False)
except Empty:
break
self.recv_queue.put(message)
def on_message(self, ws, message):
if self.logger:
@@ -62,6 +74,7 @@ class WebSocketThread(threading.Thread):
if message.startswith('{"corrId"'):
self.cmd_recv_queue.put(message)
else:
self.num_messages_received += 1
self.recv_queue.put(message)
def queue_get(self):
@@ -106,6 +119,18 @@ class WebSocketThread(threading.Thread):
self.ws.send(cmd)
return self.corrId
def wait_for_command_response(self, cmd_id):
cmd_id = str(cmd_id)
for i in range(100):
message = self.cmd_queue_get()
if message is not None:
data = json.loads(message)
if "corrId" in data:
if data["corrId"] == cmd_id:
return data
self.delay_event.wait(0.5)
raise ValueError(f"waitForResponse timed-out waiting for id: {cmd_id}")
def run(self):
self.ws = websocket.WebSocketApp(
self.url,
@@ -130,7 +155,6 @@ def waitForResponse(ws_thread, sent_id, delay_event):
message = ws_thread.cmd_queue_get()
if message is not None:
data = json.loads(message)
# print(f"json: {json.dumps(data, indent=4)}")
if "corrId" in data:
if data["corrId"] == sent_id:
return data
@@ -174,10 +198,17 @@ def getPrivkeyForAddress(self, addr) -> bytes:
raise ValueError("key not found")
def sendSimplexMsg(
self, network, addr_from: str, addr_to: str, payload: bytes, msg_valid: int, cursor
def encryptMsg(
self,
addr_from: str,
addr_to: str,
payload: bytes,
msg_valid: int,
cursor,
timestamp=None,
deterministic=False,
) -> bytes:
self.log.debug("sendSimplexMsg")
self.log.debug("encryptMsg")
try:
rv = self.callrpc(
@@ -210,12 +241,38 @@ def sendSimplexMsg(
privkey_from = getPrivkeyForAddress(self, addr_from)
payload += bytes((0,)) # Include null byte to match smsg
smsg_msg: bytes = smsgEncrypt(privkey_from, pubkey_to, payload)
smsg_msg: bytes = smsgEncrypt(
privkey_from, pubkey_to, payload, timestamp, deterministic
)
return smsg_msg
def sendSimplexMsg(
self,
network,
addr_from: str,
addr_to: str,
payload: bytes,
msg_valid: int,
cursor,
timestamp: int = None,
deterministic: bool = False,
to_user_name: str = None,
) -> bytes:
self.log.debug("sendSimplexMsg")
smsg_msg: bytes = encryptMsg(
self, addr_from, addr_to, payload, msg_valid, cursor, timestamp, deterministic
)
smsg_id = smsgGetID(smsg_msg)
ws_thread = network["ws_thread"]
sent_id = ws_thread.send_command("#bsx " + encode_base64(smsg_msg))
if to_user_name is not None:
to = "@" + to_user_name + " "
else:
to = "#bsx "
sent_id = ws_thread.send_command(to + encode_base64(smsg_msg))
response = waitForResponse(ws_thread, sent_id, self.delay_event)
if response["resp"]["type"] != "newChatItems":
json_str = json.dumps(response, indent=4)
@@ -243,8 +300,10 @@ def decryptSimplexMsg(self, msg_data):
# Try with all active bid/offer addresses
query: str = """SELECT DISTINCT address FROM (
SELECT bid_addr AS address FROM bids WHERE active_ind = 1
AND (in_progress = 1 OR (state > :bid_received AND state < :bid_completed) OR (state IN (:bid_received, :bid_sent) AND expire_at > :now))
SELECT b.bid_addr AS address FROM bids b
JOIN bidstates s ON b.state = s.state_id
WHERE b.active_ind = 1
AND (s.in_progress OR (s.swap_ended = 0 AND b.expire_at > :now))
UNION
SELECT addr_from AS address FROM offers WHERE active_ind = 1 AND expire_at > :now
)"""
@@ -253,15 +312,7 @@ def decryptSimplexMsg(self, msg_data):
try:
cursor = self.openDB()
addr_rows = cursor.execute(
query,
{
"bid_received": int(BidStates.BID_RECEIVED),
"bid_completed": int(BidStates.SWAP_COMPLETED),
"bid_sent": int(BidStates.BID_SENT),
"now": now,
},
).fetchall()
addr_rows = cursor.execute(query, {"now": now}).fetchall()
finally:
self.closeDB(cursor, commit=False)
@@ -283,42 +334,97 @@ def decryptSimplexMsg(self, msg_data):
return decrypted
def parseSimplexMsg(self, chat_item):
item_status = chat_item["chatItem"]["meta"]["itemStatus"]
dir_type = item_status["type"]
if dir_type not in ("sndRcvd", "rcvNew"):
return None
snd_progress = item_status.get("sndProgress", None)
if snd_progress and snd_progress != "complete":
item_id = chat_item["chatItem"]["meta"]["itemId"]
self.log.debug(f"simplex chat item {item_id} {snd_progress}")
return None
conn_id = None
msg_dir: str = "recv" if dir_type == "rcvNew" else "sent"
chat_type: str = chat_item["chatInfo"]["type"]
if chat_type == "group":
chat_name = chat_item["chatInfo"]["groupInfo"]["localDisplayName"]
conn_id = chat_item["chatInfo"]["groupInfo"]["groupId"]
self.num_group_simplex_messages_received += 1
elif chat_type == "direct":
chat_name = chat_item["chatInfo"]["contact"]["localDisplayName"]
conn_id = chat_item["chatInfo"]["contact"]["activeConn"]["connId"]
self.num_direct_simplex_messages_received += 1
else:
return None
msg_content = chat_item["chatItem"]["content"]["msgContent"]["text"]
try:
msg_data: bytes = decode_base64(msg_content)
decrypted_msg = decryptSimplexMsg(self, msg_data)
if decrypted_msg is None:
return None
decrypted_msg["chat_type"] = chat_type
decrypted_msg["chat_name"] = chat_name
decrypted_msg["conn_id"] = conn_id
decrypted_msg["msg_dir"] = msg_dir
return decrypted_msg
except Exception as e: # noqa: F841
# self.log.debug(f"decryptSimplexMsg error: {e}")
self.log.debug(f"decryptSimplexMsg error: {e}")
pass
return None
def processEvent(self, ws_thread, msg_type: str, data) -> bool:
if ws_thread.ignore_events:
if msg_type not in ("contactConnected", "contactDeletedByContact"):
return False
ws_thread.delayed_events_queue.put(json.dumps(data))
return True
if msg_type == "contactConnected":
self.processContactConnected(data)
elif msg_type == "contactDeletedByContact":
self.processContactDisconnected(data)
else:
return False
return True
def readSimplexMsgs(self, network):
ws_thread = network["ws_thread"]
for i in range(100):
message = ws_thread.queue_get()
if message is None:
break
if self.delay_event.is_set():
break
data = json.loads(message)
# self.log.debug(f"message 1: {json.dumps(data, indent=4)}")
# self.log.debug(f"Message: {json.dumps(data, indent=4)}")
try:
if data["resp"]["type"] in ("chatItemsStatusesUpdated", "newChatItems"):
msg_type: str = data["resp"]["type"]
if msg_type in ("chatItemsStatusesUpdated", "newChatItems"):
for chat_item in data["resp"]["chatItems"]:
item_status = chat_item["chatItem"]["meta"]["itemStatus"]
if item_status["type"] in ("sndRcvd", "rcvNew"):
snd_progress = item_status.get("sndProgress", None)
if snd_progress:
if snd_progress != "complete":
item_id = chat_item["chatItem"]["meta"]["itemId"]
self.log.debug(
f"simplex chat item {item_id} {snd_progress}"
)
continue
try:
msg_data: bytes = decode_base64(
chat_item["chatItem"]["content"]["msgContent"]["text"]
)
decrypted_msg = decryptSimplexMsg(self, msg_data)
if decrypted_msg is None:
continue
self.processMsg(decrypted_msg)
except Exception as e: # noqa: F841
# self.log.debug(f"decryptSimplexMsg error: {e}")
pass
decrypted_msg = parseSimplexMsg(self, chat_item)
if decrypted_msg is None:
continue
self.processMsg(decrypted_msg)
elif msg_type == "chatError":
# self.log.debug(f"chatError Message: {json.dumps(data, indent=4)}")
pass
elif processEvent(self, ws_thread, msg_type, data):
pass
else:
self.log.debug(f"Unknown msg_type: {msg_type}")
# self.log.debug(f"Message: {json.dumps(data, indent=4)}")
except Exception as e:
self.log.debug(f"readSimplexMsgs error: {e}")
if self.debug:
self.log.error(traceback.format_exc())
self.delay_event.wait(0.05)
@@ -348,3 +454,37 @@ def initialiseSimplexNetwork(self, network_config) -> None:
}
self.active_networks.append(network)
def closeSimplexChat(self, net_i, connId) -> bool:
cmd_id = net_i.send_command("/chats")
response = net_i.wait_for_command_response(cmd_id)
remote_name = None
for chat in response["resp"]["chats"]:
if (
"chatInfo" not in chat
or "type" not in chat["chatInfo"]
or chat["chatInfo"]["type"] != "direct"
):
continue
try:
if chat["chatInfo"]["contact"]["activeConn"]["connId"] == connId:
remote_name = chat["chatInfo"]["contact"]["localDisplayName"]
break
except Exception as e:
self.log.debug(f"Error parsing chat: {e}")
if remote_name is None:
self.log.warning(
f"Unable to find remote name for simplex direct chat, ID: {connId}"
)
return False
self.log.debug(f"Deleting simplex chat @{remote_name}, connID {connId}")
cmd_id = net_i.send_command(f"/delete @{remote_name}")
cmd_response = net_i.wait_for_command_response(cmd_id)
if cmd_response["resp"]["type"] != "contactDeleted":
self.log.warning(f"Failed to delete simplex chat, ID: {connId}")
self.log.debug("cmd_response: {}".format(json.dumps(cmd_response, indent=4)))
return False
return True

View File

@@ -76,7 +76,6 @@ def startSimplexClient(
os.makedirs(data_path)
db_path = os.path.join(data_path, "simplex_client_data")
args = [bin_path, "-d", db_path, "-s", server_address, "-p", str(websocket_port)]
if not os.path.exists(db_path):

93
basicswap/ui/app.py Normal file
View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2025 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import json
from basicswap.db import getOrderByStr
class UIApp:
def listMessageRoutes(self, filters={}, action=None):
cursor = self.openDB()
try:
rv = []
query_data: dict = {}
filter_query_str: str = ""
address_from: str = filters.get("address_from", None)
if address_from is not None:
filter_query_str += " AND smsg_addr_local = :address_from "
query_data["address_from"] = address_from
address_to: str = filters.get("address_to", None)
if address_from is not None:
filter_query_str += " AND smsg_addr_remote = :address_to "
query_data["address_to"] = address_to
if action is None:
pass
elif action == "clear":
self.log.info("Clearing message routes")
query_str: str = (
"SELECT record_id, network_id, route_data"
+ " FROM direct_message_routes "
+ " WHERE active_ind = 1 "
)
query_str += filter_query_str
rows = cursor.execute(query_str, query_data).fetchall()
for row in rows:
record_id, network_id, route_data = row
route_data = json.loads(route_data.decode("UTF-8"))
self.closeMessageRoute(record_id, network_id, route_data, cursor)
else:
raise ValueError("Unknown action")
query_str: str = (
"SELECT record_id, network_id, linked_type, linked_id, "
+ " smsg_addr_local, smsg_addr_remote, route_data, created_at"
+ " FROM direct_message_routes "
+ " WHERE active_ind = 1 "
)
query_str += filter_query_str
query_str += getOrderByStr(filters)
limit = filters.get("limit", None)
if limit is not None:
query_str += " LIMIT :limit"
query_data["limit"] = limit
offset = filters.get("offset", None)
if offset is not None:
query_str += " OFFSET :offset"
query_data["offset"] = offset
q = cursor.execute(query_str, query_data)
rv = []
for row in q:
(
record_id,
network_id,
linked_type,
linked_id,
smsg_addr_local,
smsg_addr_remote,
route_data,
created_at,
) = row
rv.append(
{
"record_id": record_id,
"network_id": network_id,
"smsg_addr_local": smsg_addr_local,
"smsg_addr_remote": smsg_addr_remote,
"route_data": json.loads(route_data.decode("UTF-8")),
}
)
return rv
finally:
self.closeDB(cursor, commit=False)

View File

@@ -83,19 +83,35 @@ def smsgGetID(smsg_message: bytes) -> bytes:
return smsg_timestamp.to_bytes(8, byteorder="big") + ripemd160(smsg_message[8:])
def smsgEncrypt(privkey_from: bytes, pubkey_to: bytes, payload: bytes) -> bytes:
def smsgEncrypt(
privkey_from: bytes,
pubkey_to: bytes,
payload: bytes,
smsg_timestamp: int = None,
deterministic: bool = False,
) -> bytes:
# assert len(payload) < 128 # Requires lz4 if payload > 128 bytes
# TODO: Add lz4 to match core smsg
smsg_timestamp = int(time.time())
r = getSecretInt().to_bytes(32, byteorder="big")
if deterministic:
assert smsg_timestamp is not None
h = hashlib.sha256(b"smsg")
h.update(privkey_from)
h.update(pubkey_to)
h.update(payload)
h.update(smsg_timestamp.to_bytes(8, byteorder="big"))
r = h.digest()
smsg_iv: bytes = hashlib.sha256(b"smsg_iv" + r).digest()[:16]
else:
r = getSecretInt().to_bytes(32, byteorder="big")
smsg_iv: bytes = secrets.token_bytes(16)
if smsg_timestamp is None:
smsg_timestamp = int(time.time())
R = PublicKey.from_secret(r).format()
p = PrivateKey(r).ecdh(pubkey_to)
H = hashlib.sha512(p).digest()
key_e: bytes = H[:32]
key_m: bytes = H[32:]
smsg_iv: bytes = secrets.token_bytes(16)
payload_hash: bytes = sha256(sha256(payload))
signature: bytes = PrivateKey(privkey_from).sign_recoverable(
payload_hash, hasher=None

View File

@@ -142,7 +142,7 @@ Observe progress with
## Start a subset of the configured coins using docker
docker compose run --service-ports swapclient basicswap-run -datadir=/coindata -withcoins=monero
docker compose run --rm --service-ports swapclient basicswap-run -datadir=/coindata -withcoins=monero

View File

@@ -184,7 +184,7 @@ def wait_for_bid(
swap_client.log.debug(
f"TEST: wait_for_bid {bid_id.hex()}: Bid not found."
)
raise ValueError("wait_for_bid timed out.")
raise ValueError(f"wait_for_bid timed out {bid_id.hex()}.")
def wait_for_bid_tx_state(
@@ -331,7 +331,7 @@ def wait_for_balance(
delay_event.wait(delay_time)
i += 1
if i > iterations:
raise ValueError("Expect {} {}".format(balance_key, expect_amount))
raise ValueError(f"Expect {balance_key} {expect_amount}")
def wait_for_unspent(
@@ -347,11 +347,11 @@ def wait_for_unspent(
delay_event.wait(delay_time)
i += 1
if i > iterations:
raise ValueError("wait_for_unspent {}".format(expect_amount))
raise ValueError(f"wait_for_unspent {expect_amount}")
def delay_for(delay_event, delay_for=60):
logging.info("Delaying for {} seconds.".format(delay_for))
logging.info(f"Delaying for {delay_for} seconds.")
delay_event.wait(delay_for)
@@ -375,9 +375,7 @@ def waitForRPC(rpc_func, delay_event, rpc_command="getwalletinfo", max_tries=7):
except Exception as ex:
if i < max_tries:
logging.warning(
"Can't connect to RPC: %s. Retrying in %d second/s.",
str(ex),
(i + 1),
f"Can't connect to RPC: {ex}. Retrying in {i + 1} second/s."
)
delay_event.wait(i + 1)
raise ValueError("waitForRPC failed")

View File

@@ -89,15 +89,19 @@ DOGECOIN_RPC_PORT_BASE = int(os.getenv("DOGECOIN_RPC_PORT_BASE", DOGE_BASE_RPC_P
EXTRA_CONFIG_JSON = json.loads(os.getenv("EXTRA_CONFIG_JSON", "{}"))
def waitForBidState(delay_event, port, bid_id, state_str, wait_for=60):
def waitForBidState(delay_event, port, bid_id, wait_for_state, wait_for=60):
for i in range(wait_for):
if delay_event.is_set():
raise ValueError("Test stopped.")
bid = json.loads(
urlopen("http://127.0.0.1:12700/json/bids/{}".format(bid_id)).read()
)
if bid["bid_state"] == state_str:
return
if isinstance(wait_for_state, (list, tuple)):
if bid["bid_state"] in wait_for_state:
return
else:
if bid["bid_state"] == wait_for_state:
return
delay_event.wait(1)
raise ValueError("waitForBidState failed")

View File

@@ -17,8 +17,7 @@ docker run \
Fingerprint: Q8SNxc2SRcKyXlhJM8KFUgPNW4KXPGRm4eSLtT_oh-I=
export SIMPLEX_SERVER_ADDRESS=smp://Q8SNxc2SRcKyXlhJM8KFUgPNW4KXPGRm4eSLtT_oh-I=:password@127.0.0.1:5223,443
export SIMPLEX_SERVER_ADDRESS=smp://Q8SNxc2SRcKyXlhJM8KFUgPNW4KXPGRm4eSLtT_oh-I=:password@127.0.0.1:5223
https://github.com/simplex-chat/simplex-chat/issues/4127
json: {"corrId":"3","cmd":"/_send #1 text test123"}
@@ -53,10 +52,14 @@ from tests.basicswap.common import (
wait_for_bid,
wait_for_offer,
)
from tests.basicswap.util import read_json_api
from tests.basicswap.test_xmr import BaseTest, test_delay_event, RESET_TEST
SIMPLEX_SERVER_ADDRESS = os.getenv("SIMPLEX_SERVER_ADDRESS")
SIMPLEX_SERVER_FINGERPRINT = os.getenv("SIMPLEX_SERVER_FINGERPRINT", "")
SIMPLEX_SERVER_ADDRESS = os.getenv(
"SIMPLEX_SERVER_ADDRESS",
f"smp://{SIMPLEX_SERVER_FINGERPRINT}:password@127.0.0.1:5223",
)
SIMPLEX_CLIENT_PATH = os.path.expanduser(os.getenv("SIMPLEX_CLIENT_PATH"))
TEST_DIR = cfg.TEST_DATADIRS
@@ -67,6 +70,32 @@ if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout))
def parse_message(msg_data):
if msg_data["resp"]["type"] not in ("chatItemsStatusesUpdated", "newChatItems"):
return None
for chat_item in msg_data["resp"]["chatItems"]:
chat_type: str = chat_item["chatInfo"]["type"]
if chat_type == "group":
chat_name = chat_item["chatInfo"]["groupInfo"]["localDisplayName"]
elif chat_type == "direct":
chat_name = chat_item["chatInfo"]["contact"]["localDisplayName"]
else:
return None
dir_type = chat_item["chatItem"]["meta"]["itemStatus"]["type"]
msg_dir = "recv" if dir_type == "rcvNew" else "sent"
if dir_type in ("sndRcvd", "rcvNew"):
msg_content = chat_item["chatItem"]["content"]["msgContent"]["text"]
return {
"text": msg_content,
"chat_type": chat_type,
"chat_name": chat_name,
"msg_dir": msg_dir,
}
return None
class TestSimplex(unittest.TestCase):
daemons = []
remove_testdir: bool = False
@@ -79,10 +108,10 @@ class TestSimplex(unittest.TestCase):
if os.path.isdir(TEST_DIR):
if RESET_TEST:
logging.info("Removing " + TEST_DIR)
logger.info("Removing " + TEST_DIR)
shutil.rmtree(TEST_DIR)
else:
logging.info("Restoring instance from " + TEST_DIR)
logger.info("Restoring instance from " + TEST_DIR)
if not os.path.exists(TEST_DIR):
os.makedirs(TEST_DIR)
@@ -134,8 +163,8 @@ class TestSimplex(unittest.TestCase):
ws_thread.send_command("/set reactions #bsx off")
ws_thread.send_command("/set reports #bsx off")
ws_thread.send_command("/set disappear #bsx on week")
sent_id = ws_thread.send_command("/create link #bsx")
sent_id = ws_thread.send_command("/create link #bsx")
connReqContact = None
connReqMsgData = waitForResponse(ws_thread, sent_id, test_delay_event)
connReqContact = connReqMsgData["resp"]["connReqContact"]
@@ -151,65 +180,199 @@ class TestSimplex(unittest.TestCase):
response = waitForResponse(ws_thread2, sent_id, test_delay_event)
assert len(response["resp"]["groups"]) == 1
ws_thread.send_command("#bsx test msg 1")
sent_id = ws_thread2.send_command("/connect")
response = waitForResponse(ws_thread2, sent_id, test_delay_event)
with open(os.path.join(client2_dir, "chat_inv.txt"), "w") as fp:
fp.write(json.dumps(response, indent=4))
connReqInvitation = response["resp"]["connReqInvitation"]
logger.info(f"direct_link: {connReqInvitation}")
pccConnId_2_sent = response["resp"]["connection"]["pccConnId"]
print(f"pccConnId_2_sent: {pccConnId_2_sent}")
sent_id = ws_thread.send_command(f"/connect {connReqInvitation}")
response = waitForResponse(ws_thread, sent_id, test_delay_event)
with open(os.path.join(client1_dir, "chat_inv_accept.txt"), "w") as fp:
fp.write(json.dumps(response, indent=4))
pccConnId_1_accepted = response["resp"]["connection"]["pccConnId"]
print(f"pccConnId_1_accepted: {pccConnId_1_accepted}")
sent_id = ws_thread.send_command("/chats")
response = waitForResponse(ws_thread, sent_id, test_delay_event)
with open(os.path.join(client1_dir, "chats.txt"), "w") as fp:
fp.write(json.dumps(response, indent=4))
direct_local_name_1 = None
for chat in response["resp"]["chats"]:
print(f"chat: {chat}")
if (
chat["chatInfo"]["contact"]["activeConn"]["connId"]
== pccConnId_1_accepted
):
direct_local_name_1 = chat["chatInfo"]["contact"][
"localDisplayName"
]
break
print(f"direct_local_name_1: {direct_local_name_1}")
sent_id = ws_thread2.send_command("/chats")
response = waitForResponse(ws_thread2, sent_id, test_delay_event)
with open(os.path.join(client2_dir, "chats.txt"), "w") as fp:
fp.write(json.dumps(response, indent=4))
direct_local_name_2 = None
for chat in response["resp"]["chats"]:
print(f"chat: {chat}")
if (
chat["chatInfo"]["contact"]["activeConn"]["connId"]
== pccConnId_2_sent
):
direct_local_name_2 = chat["chatInfo"]["contact"][
"localDisplayName"
]
break
print(f"direct_local_name_2: {direct_local_name_2}")
# localDisplayName in chats doesn't match the contactConnected message.
assert direct_local_name_1 == "user_1"
assert direct_local_name_2 == "user_1"
sent_id = ws_thread.send_command("#bsx test msg 1")
response = waitForResponse(ws_thread, sent_id, test_delay_event)
assert response["resp"]["type"] == "newChatItems"
sent_id = ws_thread.send_command("@user_1 test msg 2")
response = waitForResponse(ws_thread, sent_id, test_delay_event)
assert response["resp"]["type"] == "newChatItems"
msg_counter1: int = 0
msg_counter2: int = 0
found = [dict(), dict()]
found_connected = [dict(), dict()]
found_1 = False
found_2 = False
for i in range(100):
message = ws_thread.queue_get()
if message is not None:
if test_delay_event.is_set():
break
for k in range(100):
message = ws_thread.queue_get()
if message is None or test_delay_event.is_set():
break
msg_counter1 += 1
data = json.loads(message)
# print(f"message 1: {json.dumps(data, indent=4)}")
try:
if data["resp"]["type"] in (
"chatItemsStatusesUpdated",
"newChatItems",
):
for chat_item in data["resp"]["chatItems"]:
# print(f"chat_item 1: {json.dumps(chat_item, indent=4)}")
if chat_item["chatItem"]["meta"]["itemStatus"][
"type"
] in ("sndRcvd", "rcvNew"):
if (
chat_item["chatItem"]["content"]["msgContent"][
"text"
]
== "test msg 1"
):
found_1 = True
msg_type = data["resp"]["type"]
except Exception as e:
print(f"msg_type error: {e}")
msg_type = "None"
with open(
os.path.join(
client1_dir, f"recv_{msg_counter1}_{msg_type}.txt"
),
"w",
) as fp:
fp.write(json.dumps(data, indent=4))
if msg_type == "contactConnected":
found_connected[0][msg_counter1] = data
continue
try:
simplex_msg = parse_message(data)
if simplex_msg:
simplex_msg["msg_id"] = msg_counter1
found[0][msg_counter1] = simplex_msg
except Exception as e:
print(f"error 1: {e}")
message = ws_thread2.queue_get()
if message is not None:
for k in range(100):
message = ws_thread2.queue_get()
if message is None or test_delay_event.is_set():
break
msg_counter2 += 1
data = json.loads(message)
# print(f"message 2: {json.dumps(data, indent=4)}")
try:
if data["resp"]["type"] in (
"chatItemsStatusesUpdated",
"newChatItems",
):
for chat_item in data["resp"]["chatItems"]:
# print(f"chat_item 1: {json.dumps(chat_item, indent=4)}")
if chat_item["chatItem"]["meta"]["itemStatus"][
"type"
] in ("sndRcvd", "rcvNew"):
if (
chat_item["chatItem"]["content"]["msgContent"][
"text"
]
== "test msg 1"
):
found_2 = True
msg_type = data["resp"]["type"]
except Exception as e:
print(f"msg_type error: {e}")
msg_type = "None"
with open(
os.path.join(
client2_dir, f"recv_{msg_counter2}_{msg_type}.txt"
),
"w",
) as fp:
fp.write(json.dumps(data, indent=4))
if msg_type == "contactConnected":
found_connected[1][msg_counter2] = data
continue
try:
simplex_msg = parse_message(data)
if simplex_msg:
simplex_msg["msg_id"] = msg_counter2
found[1][msg_counter2] = simplex_msg
except Exception as e:
print(f"error 2: {e}")
if found_1 and found_2:
if (
len(found[0]) >= 2
and len(found[1]) >= 2
and len(found_connected[0]) >= 1
and len(found_connected[1]) >= 1
):
break
test_delay_event.wait(0.5)
assert found_1 is True
assert found_2 is True
assert len(found_connected[0]) == 1
node1_connect = list(found_connected[0].values())[0]
assert (
node1_connect["resp"]["contact"]["activeConn"]["connId"]
== pccConnId_1_accepted
)
assert node1_connect["resp"]["contact"]["localDisplayName"] == "user_2"
assert len(found_connected[1]) == 1
node2_connect = list(found_connected[1].values())[0]
assert (
node2_connect["resp"]["contact"]["activeConn"]["connId"]
== pccConnId_2_sent
)
assert node2_connect["resp"]["contact"]["localDisplayName"] == "user_2"
node1_msg1 = [m for m in found[0].values() if m["text"] == "test msg 1"]
assert len(node1_msg1) == 1
node1_msg1 = node1_msg1[0]
assert node1_msg1["chat_type"] == "group"
assert node1_msg1["chat_name"] == "bsx"
assert node1_msg1["msg_dir"] == "sent"
node1_msg2 = [m for m in found[0].values() if m["text"] == "test msg 2"]
assert len(node1_msg2) == 1
node1_msg2 = node1_msg2[0]
assert node1_msg2["chat_type"] == "direct"
assert node1_msg2["chat_name"] == "user_1"
assert node1_msg2["msg_dir"] == "sent"
node2_msg1 = [m for m in found[1].values() if m["text"] == "test msg 1"]
assert len(node2_msg1) == 1
node2_msg1 = node2_msg1[0]
assert node2_msg1["chat_type"] == "group"
assert node2_msg1["chat_name"] == "bsx"
assert node2_msg1["msg_dir"] == "recv"
node2_msg2 = [m for m in found[1].values() if m["text"] == "test msg 2"]
assert len(node2_msg2) == 1
node2_msg2 = node2_msg2[0]
assert node2_msg2["chat_type"] == "direct"
assert node2_msg2["chat_name"] == "user_1"
assert node2_msg2["msg_dir"] == "recv"
sent_id = ws_thread.send_command("/delete @user_1")
response = waitForResponse(ws_thread, sent_id, test_delay_event)
assert response["resp"]["type"] == "contactDeleted"
sent_id = ws_thread2.send_command("/delete @user_1")
response = waitForResponse(ws_thread2, sent_id, test_delay_event)
assert response["resp"]["type"] == "contactDeleted"
sent_id = ws_thread2.send_command("/chats")
response = waitForResponse(ws_thread2, sent_id, test_delay_event)
with open(os.path.join(client2_dir, "chats_after_delete.txt"), "w") as fp:
fp.write(json.dumps(response, indent=4))
assert len(response["resp"]["chats"]) == 4
finally:
for t in threads:
@@ -277,7 +440,7 @@ class Test(BaseTest):
@classmethod
def tearDownClass(cls):
logging.info("Finalising Test")
logger.info("Finalising Test")
super(Test, cls).tearDownClass()
stopDaemons(cls.daemons)
@@ -295,17 +458,24 @@ class Test(BaseTest):
]
def test_01_swap(self):
logging.info("---------- Test xmr swap")
logger.info("---------- Test adaptor sig swap")
swap_clients = self.swap_clients
for sc in swap_clients:
sc.dleag_split_size_init = 9000
sc.dleag_split_size = 11000
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_messages = False
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
coin_from = Coins.BTC
coin_to = self.coin_to
@@ -340,3 +510,518 @@ class Test(BaseTest):
sent=True,
wait_for=320,
)
for i in range(3):
assert (
num_direct_messages_received_before[i]
== swap_clients[i].num_direct_simplex_messages_received
)
def test_01_swap_reverse(self):
logger.info("---------- Test adaptor sig swap reverse")
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_messages = False
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
coin_from = self.coin_to
coin_to = Coins.BTC
ci_from = swap_clients[1].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[1].postOffer(
coin_from, coin_to, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP
)
wait_for_offer(test_delay_event, swap_clients[0], offer_id)
offer = swap_clients[0].getOffer(offer_id)
bid_id = swap_clients[0].postBid(offer_id, offer.amount_from)
wait_for_bid(test_delay_event, swap_clients[1], bid_id, BidStates.BID_RECEIVED)
swap_clients[1].acceptBid(bid_id)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
for i in range(3):
assert (
num_direct_messages_received_before[i]
== swap_clients[i].num_direct_simplex_messages_received
)
def test_02_direct(self):
logger.info("---------- Test adaptor sig swap with direct messages")
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_message_routes = True
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
num_group_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
num_group_messages_received_before[i] = swap_clients[
i
].num_group_simplex_messages_received
coin_from = Coins.BTC
coin_to = self.coin_to
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[1].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[0].postOffer(
coin_from, coin_to, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.BID_RECEIVED,
wait_for=60,
)
swap_clients[0].acceptBid(bid_id)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
for i in range(3):
swap_clients[
i
].num_group_simplex_messages_received == num_group_messages_received_before[
i
] + 2
swap_clients[
2
].num_direct_simplex_messages_received == num_direct_messages_received_before[2]
def test_02_direct_reverse(self):
logger.info(
"---------- Test test_02_direct_reverse adaptor sig swap with direct messages"
)
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_message_routes = True
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
num_group_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
num_group_messages_received_before[i] = swap_clients[
i
].num_group_simplex_messages_received
coin_from = self.coin_to
coin_to = Coins.BTC
ci_from = swap_clients[1].ci(coin_from)
ci_to = swap_clients[0].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[1].postOffer(
coin_from, coin_to, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP
)
wait_for_offer(test_delay_event, swap_clients[0], offer_id)
offer = swap_clients[0].getOffer(offer_id)
bid_id = swap_clients[0].postBid(offer_id, offer.amount_from)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.BID_RECEIVED,
wait_for=60,
)
swap_clients[1].acceptBid(bid_id)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
for i in range(3):
swap_clients[
i
].num_group_simplex_messages_received == num_group_messages_received_before[
i
] + 2
swap_clients[
2
].num_direct_simplex_messages_received == num_direct_messages_received_before[2]
def test_03_hltc(self):
logger.info("---------- Test secret hash swap")
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_message_routes = False
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
num_group_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
num_group_messages_received_before[i] = swap_clients[
i
].num_group_simplex_messages_received
coin_from = Coins.PART
coin_to = Coins.BTC
self.prepare_balance(coin_to, 200.0, 1801, 1800)
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[1].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[0].postOffer(
coin_from,
coin_to,
swap_value,
rate_swap,
swap_value,
SwapTypes.SELLER_FIRST,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.BID_RECEIVED,
wait_for=90,
)
swap_clients[0].acceptBid(bid_id)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
for i in range(3):
assert (
num_direct_messages_received_before[i]
== swap_clients[i].num_direct_simplex_messages_received
)
def test_03_direct_hltc(self):
logger.info("---------- Test secret hash swap with direct messages")
for i in range(3):
message_routes = read_json_api(
1800 + i, "messageroutes", {"action": "clear"}
)
assert len(message_routes) == 0
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_message_routes = True
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
num_group_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
num_group_messages_received_before[i] = swap_clients[
i
].num_group_simplex_messages_received
coin_from = Coins.PART
coin_to = Coins.BTC
self.prepare_balance(coin_to, 200.0, 1801, 1800)
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[1].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[0].postOffer(
coin_from,
coin_to,
swap_value,
rate_swap,
swap_value,
SwapTypes.SELLER_FIRST,
)
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.BID_RECEIVED,
wait_for=90,
)
swap_clients[0].acceptBid(bid_id)
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_id,
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_id,
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
message_routes = read_json_api(1800, "messageroutes")
assert len(message_routes) == 1
for i in range(3):
swap_clients[
i
].num_group_simplex_messages_received == num_group_messages_received_before[
i
] + 2
swap_clients[
2
].num_direct_simplex_messages_received == num_direct_messages_received_before[2]
def test_04_multiple(self):
logger.info("---------- Test multiple swaps with direct messages")
for i in range(3):
message_routes = read_json_api(
1800 + i, "messageroutes", {"action": "clear"}
)
assert len(message_routes) == 0
swap_clients = self.swap_clients
for sc in swap_clients:
sc._dleag_split_size_init = 9000
sc._dleag_split_size = 11000
sc._use_direct_message_routes = True
assert len(swap_clients[0].active_networks) == 1
assert swap_clients[0].active_networks[0]["type"] == "simplex"
num_direct_messages_received_before = [0] * 3
num_group_messages_received_before = [0] * 3
for i in range(3):
num_direct_messages_received_before[i] = swap_clients[
i
].num_direct_simplex_messages_received
num_group_messages_received_before[i] = swap_clients[
i
].num_group_simplex_messages_received
coin_from = Coins.BTC
coin_to = self.coin_to
ci_from = swap_clients[0].ci(coin_from)
ci_to = swap_clients[1].ci(coin_to)
swap_value = ci_from.make_int(random.uniform(0.2, 20.0), r=1)
rate_swap = ci_to.make_int(random.uniform(0.2, 20.0), r=1)
offer_id = swap_clients[0].postOffer(
coin_from, coin_to, swap_value, rate_swap, swap_value, SwapTypes.XMR_SWAP
)
swap_clients[1].active_networks[0]["ws_thread"].ignore_events = True
wait_for_offer(test_delay_event, swap_clients[1], offer_id)
offer = swap_clients[1].getOffer(offer_id)
addr1_bids = swap_clients[1].getReceiveAddressForCoin(Coins.PART)
bid_ids = []
for i in range(2):
bid_ids.append(
swap_clients[1].postBid(
offer_id, offer.amount_from, addr_send_from=addr1_bids
)
)
swap_clients[1].active_networks[0]["ws_thread"].disable_debug_mode()
bid_ids.append(swap_clients[1].postBid(offer_id, offer.amount_from))
for i in range(len(bid_ids)):
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_ids[i],
BidStates.BID_RECEIVED,
wait_for=60,
)
swap_clients[0].acceptBid(bid_ids[i])
logger.info("Message routes with active bids shouldn't expire")
swap_clients[0].mock_time_offset = (
swap_clients[0]._expire_message_routes_after + 1
)
swap_clients[0].expireMessageRoutes()
swap_clients[0].mock_time_offset = 0
message_routes_0 = read_json_api(1800, "messageroutes")
assert len(message_routes_0) == 2
for i in range(len(bid_ids)):
wait_for_bid(
test_delay_event,
swap_clients[0],
bid_ids[i],
BidStates.SWAP_COMPLETED,
wait_for=320,
)
wait_for_bid(
test_delay_event,
swap_clients[1],
bid_ids[i],
BidStates.SWAP_COMPLETED,
sent=True,
wait_for=320,
)
for i in range(3):
swap_clients[
i
].num_group_simplex_messages_received == num_group_messages_received_before[
i
] + 2
swap_clients[
2
].num_direct_simplex_messages_received == num_direct_messages_received_before[2]
message_routes_0 = read_json_api(1800, "messageroutes")
assert len(message_routes_0) == 2
message_routes_1 = read_json_api(1801, "messageroutes")
assert len(message_routes_1) == 2
logger.info("Test closing routes")
read_json_api(1800, "messageroutes", {"action": "clear"})
def waitForNumMessageRoutes(
port: int = 1800, num_routes: int = 0, num_tries: int = 40
):
logger.info(
f"Waiting for {num_routes} message route{'s' if num_routes != 1 else ''}, port: {port}."
)
for i in range(num_tries):
test_delay_event.wait(1)
if test_delay_event.is_set():
raise ValueError("Test stopped.")
message_routes = read_json_api(port, "messageroutes")
if len(message_routes) == num_routes:
return True
raise ValueError("waitForNumMessageRoutes timed out.")
waitForNumMessageRoutes(1800, 0)
waitForNumMessageRoutes(1801, 0)

View File

@@ -87,6 +87,9 @@ TEST_COINS_LIST = os.getenv("TEST_COINS_LIST", "bitcoin,monero")
NUM_NODES = int(os.getenv("NUM_NODES", 3))
EXTRA_CONFIG_JSON = json.loads(os.getenv("EXTRA_CONFIG_JSON", "{}"))
SIMPLEX_SERVER_ADDRESS = os.getenv("SIMPLEX_SERVER_ADDRESS", "")
SIMPLEX_CLIENT_PATH = os.path.expanduser(os.getenv("SIMPLEX_CLIENT_PATH", ""))
logger = logging.getLogger()
logger.level = logging.DEBUG
if not len(logger.handlers):

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2022 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Copyright (c) 2024-2025 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
@@ -17,12 +17,9 @@ python tests/basicswap/test_xmr_bids_offline.py
"""
import sys
import json
import logging
import unittest
import multiprocessing
from urllib import parse
from urllib.request import urlopen
from tests.basicswap.util import (
read_json_api,
@@ -64,21 +61,11 @@ class Test(XmrTestBase):
"lockhrs": 24,
"automation_strat_id": 1,
}
rv = json.loads(
urlopen(
"http://127.0.0.1:12700/json/offers/new",
data=parse.urlencode(offer_data).encode(),
).read()
)
rv = read_json_api(12700, "offers/new", offer_data)
offer0_id = rv["offer_id"]
offer_data["amt_from"] = "2"
rv = json.loads(
urlopen(
"http://127.0.0.1:12700/json/offers/new",
data=parse.urlencode(offer_data).encode(),
).read()
)
rv = read_json_api(12700, "offers/new", offer_data)
offer1_id = rv["offer_id"]
summary = read_json_api(12700)
@@ -92,52 +79,26 @@ class Test(XmrTestBase):
c0.terminate()
c0.join()
offers = json.loads(
urlopen("http://127.0.0.1:12701/json/offers/{}".format(offer0_id)).read()
)
offers = read_json_api(12701, f"offers/{offer0_id}")
assert len(offers) == 1
offer0 = offers[0]
post_data = {"coin_from": "PART"}
test_post_offers = json.loads(
urlopen(
"http://127.0.0.1:12701/json/offers",
data=parse.urlencode(post_data).encode(),
).read()
)
test_post_offers = read_json_api(12701, "offers", post_data)
assert len(test_post_offers) == 2
post_data["coin_from"] = "2"
test_post_offers = json.loads(
urlopen(
"http://127.0.0.1:12701/json/offers",
data=parse.urlencode(post_data).encode(),
).read()
)
test_post_offers = read_json_api(12701, "offers", post_data)
assert len(test_post_offers) == 0
bid_data = {"offer_id": offer0_id, "amount_from": offer0["amount_from"]}
bid0_id = read_json_api(12701, "bids/new", bid_data)["bid_id"]
bid0_id = json.loads(
urlopen(
"http://127.0.0.1:12701/json/bids/new",
data=parse.urlencode(bid_data).encode(),
).read()
)["bid_id"]
offers = json.loads(
urlopen("http://127.0.0.1:12701/json/offers/{}".format(offer1_id)).read()
)
offers = read_json_api(12701, f"offers/{offer1_id}")
assert len(offers) == 1
offer1 = offers[0]
bid_data = {"offer_id": offer1_id, "amount_from": offer1["amount_from"]}
bid1_id = json.loads(
urlopen(
"http://127.0.0.1:12701/json/bids/new",
data=parse.urlencode(bid_data).encode(),
).read()
)["bid_id"]
bid1_id = read_json_api(12701, "bids/new", bid_data)["bid_id"]
logger.info("Delaying for 5 seconds.")
self.delay_event.wait(5)
@@ -149,26 +110,17 @@ class Test(XmrTestBase):
waitForServer(self.delay_event, 12700)
waitForNumBids(self.delay_event, 12700, 2)
waitForBidState(self.delay_event, 12700, bid0_id, "Received")
waitForBidState(self.delay_event, 12700, bid1_id, "Received")
waitForBidState(self.delay_event, 12700, bid0_id, ("Received", "Delaying"))
waitForBidState(self.delay_event, 12700, bid1_id, ("Received", "Delaying"))
# Manually accept on top of auto-accept for extra chaos
data = parse.urlencode({"accept": True}).encode()
try:
rv = json.loads(
urlopen(
"http://127.0.0.1:12700/json/bids/{}".format(bid0_id), data=data
).read()
)
rv = read_json_api(12700, f"bids/{bid0_id}", {"accept": True})
assert rv["bid_state"] == "Accepted"
except Exception as e:
print("Accept bid failed", str(e), rv)
try:
rv = json.loads(
urlopen(
"http://127.0.0.1:12700/json/bids/{}".format(bid1_id), data=data
).read()
)
rv = read_json_api(12700, f"bids/{bid1_id}", {"accept": True})
assert rv["bid_state"] == "Accepted"
except Exception as e:
print("Accept bid failed", str(e), rv)
@@ -179,8 +131,8 @@ class Test(XmrTestBase):
raise ValueError("Test stopped.")
self.delay_event.wait(4)
rv0 = read_json_api(12700, "bids/{}".format(bid0_id))
rv1 = read_json_api(12700, "bids/{}".format(bid1_id))
rv0 = read_json_api(12700, f"bids/{bid0_id}")
rv1 = read_json_api(12700, f"bids/{bid1_id}")
if rv0["bid_state"] == "Completed" and rv1["bid_state"] == "Completed":
break
assert rv0["bid_state"] == "Completed"