scripts: remove default values occluding error

fix test
This commit is contained in:
tecnovert
2025-10-02 15:54:58 +02:00
parent 390fb71aa7
commit 292a3713c0
3 changed files with 49 additions and 80 deletions

View File

@@ -11968,9 +11968,10 @@ class BasicSwap(BaseApp, BSXNetwork, UIApp):
strategy = self.queryOne(
AutomationStrategy, cursor, {"record_id": strategy_id}
)
if strategy is None:
raise ValueError("AutomationStrategy not found.")
if "data" in data:
strategy.data = json.dumps(data["data"]).encode("UTF-8")
self.log.debug("data {}".format(data["data"]))
if "note" in data:
strategy.note = data["note"]
if "label" in data:

View File

@@ -69,7 +69,7 @@ Create offers
"""
__version__ = "0.3"
__version__ = "0.4"
import argparse
import json
@@ -90,6 +90,7 @@ delay_event = threading.Event()
shutdown_in_progress = False
coins_map = {}
read_json_api = None
read_json_api_wallet = None
DEFAULT_CONFIG_FILE: str = "createoffers.json"
DEFAULT_STATE_FILE: str = "createoffers_state.json"
@@ -206,42 +207,6 @@ def make_json_api_func(host: str, port: int, auth_string: str = None):
return api_func
def read_json_api_wallet(path):
"""Read wallet data from API with error handling"""
try:
wallet_data = read_json_api(path)
# Check if wallet_data is a valid dictionary response
if not isinstance(wallet_data, dict):
# Return safe defaults if response is not a dictionary (e.g., error string)
return {
"balance": "0",
"unconfirmed": "0",
"anon_balance": "0",
"blind_balance": "0",
}
default_wallet = {
"balance": "0",
"unconfirmed": "0",
"anon_balance": "0",
"blind_balance": "0",
}
for key, default_value in default_wallet.items():
if key not in wallet_data:
wallet_data[key] = default_value
return wallet_data
except Exception:
return {
"balance": "0",
"unconfirmed": "0",
"anon_balance": "0",
"blind_balance": "0",
}
def signal_handler(sig, _) -> None:
global shutdown_in_progress
os.write(
@@ -706,30 +671,25 @@ def process_offers(args, config, script_state) -> None:
coin_from_data = coins_map[offer_template["coin_from"]]
coin_to_data = coins_map[offer_template["coin_to"]]
except KeyError as e:
print(f"Skipping {offer_template['name']} - coin not available")
if args.debug:
print(f"Coin not found in coins_map: {e}")
else:
print(f"Skipping {offer_template['name']} - coin not available")
print(f"Error: {e}")
continue
wallet_from = read_json_api_wallet(
"wallets/{}".format(coin_from_data["ticker"])
)
coin_ticker = coin_from_data["ticker"]
coin_from_data_name = offer_template["coin_from"]
try:
wallet_from = read_json_api_wallet(f"wallets/{coin_ticker}")
if coin_ticker == "PART":
if "variant" in coin_from_data:
coin_variant = coin_from_data["variant"]
if coin_variant == "Anon":
wallet_balance = float(wallet_from.get("anon_balance", 0))
wallet_balance = float(wallet_from["anon_balance"])
if args.debug:
print(f"Using anon balance: {wallet_balance}")
elif coin_variant == "Blind":
wallet_balance = float(wallet_from.get("blind_balance", 0))
wallet_balance = float(wallet_from["blind_balance"])
if args.debug:
print(f"Using blind balance: {wallet_balance}")
else:
@@ -737,19 +697,18 @@ def process_offers(args, config, script_state) -> None:
f"{coin_ticker} variant {coin_variant} not handled"
)
else:
wallet_balance = float(wallet_from.get("balance", 0))
wallet_balance = float(wallet_from["balance"])
if args.debug:
print(f"Using regular balance: {wallet_balance}")
else:
wallet_balance = float(wallet_from.get("balance", 0))
wallet_balance = float(wallet_from["balance"])
if args.debug:
print(f"Using balance for {coin_ticker}: {wallet_balance}")
except (KeyError, TypeError, ValueError) as e:
print(f"Skipping {offer_template['name']} - wallet balance unavailable")
if args.debug:
print(f"Error getting wallet balance for {coin_ticker}: {e}")
print(f"coin_ticker {coin_ticker}, error: {e}")
print(f"Wallet data: {wallet_from}")
else:
print(f"Skipping {offer_template['name']} - wallet balance unavailable")
continue
for offer in sent_offers:
@@ -1446,8 +1405,8 @@ def process_bids(args, config, script_state) -> None:
"wallets/{}".format(coin_from_data["ticker"])
)
wallet_balance = float(wallet_from.get("balance", 0)) + float(
wallet_from.get("unconfirmed", 0)
wallet_balance = float(wallet_from["balance"]) + float(
wallet_from["unconfirmed"]
)
# Get minimum amount from the offer
@@ -1599,7 +1558,7 @@ def process_bids(args, config, script_state) -> None:
print(
f"Not bidding on offer {offer_id}, offers_to_bid_on is known_only but no successful swaps with this identity."
)
continue
continue
successful_sent_bids = id_offer_from["num_sent_bids_successful"]
failed_sent_bids = id_offer_from["num_sent_bids_failed"]
@@ -1675,11 +1634,12 @@ def process_bids(args, config, script_state) -> None:
print(f"Reduced bid amount to {bid_amount}")
swap_amount_to = adjusted_bid_amount * offer_rate
if args.debug:
print(
f"Bid amount would exceed minimum coin to wallet total for offer {offer_id}"
)
continue
if total_balance_to - swap_amount_to < min_coin_to_balance:
if args.debug:
print(
f"Bid amount would exceed minimum coin to wallet total for offer {offer_id}"
)
continue
except (KeyError, TypeError, ValueError) as e:
if args.debug:
print(

View File

@@ -249,15 +249,26 @@ class Test(unittest.TestCase):
"--debug",
]
# Set defaults
post_json = {
"set_data": json.dumps({"exact_rate_only": True, "max_concurrent_bids": 5}),
}
for i in range(3):
waitForServer(cls.delay_event, UI_PORT + i)
for j in range(1, 3):
json_rv = read_json_api(
UI_PORT + i, f"automationstrategies/{j}", post_json
)
assert json_rv["data"]["max_concurrent_bids"] == 5
@classmethod
def tearDownClass(cls):
logging.info("Stopping test")
cls.thread_http.stop()
def test_enabled(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
for i in range(2):
waitForServer(self.delay_event, UI_PORT + i)
# Test no 'Processing...' messages are shown without config
node0_test_config = {}
@@ -318,9 +329,8 @@ class Test(unittest.TestCase):
assert count_lines_with(rv_stdout, "Processing 0 bid templates") == 1
def test_offers(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
for i in range(2):
waitForServer(self.delay_event, UI_PORT + i)
# Reset test
clear_offers(self.delay_event, 0)
@@ -410,6 +420,7 @@ class Test(unittest.TestCase):
"max_coin_from_balance": -1,
"min_coin_to_balance": -1,
"max_concurrent": 4,
"offers_to_bid_on": "all", # !?
},
{
"coin_from": "PART",
@@ -421,6 +432,7 @@ class Test(unittest.TestCase):
"min_swap_amount": 0.1,
"max_coin_from_balance": -1,
"min_coin_to_balance": -1,
"offers_to_bid_on": "all", # !?
},
],
}
@@ -553,7 +565,6 @@ class Test(unittest.TestCase):
result = subprocess.run(self.node1_args, stdout=subprocess.PIPE)
rv_stdout = result.stdout.decode().split("\n")
possible_bids = get_possible_bids(rv_stdout)
possible_bids = get_possible_bids(rv_stdout)
assert len(possible_bids) == 1
assert float(possible_bids[0]["amount_from"]) < 20.0
@@ -581,8 +592,8 @@ class Test(unittest.TestCase):
assert count_lines_with(rv_stdout, "too many failed bids") == 1
def test_offer_amount_step(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
for i in range(2):
waitForServer(self.delay_event, UI_PORT + i)
# Reset test
clear_offers(self.delay_event, 0)
@@ -645,8 +656,8 @@ class Test(unittest.TestCase):
assert len(get_created_offers(rv_stdout)) == 1
def test_error_messages(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
for i in range(2):
waitForServer(self.delay_event, UI_PORT + i)
# Reset test
clear_offers(self.delay_event, 0)
@@ -678,15 +689,14 @@ class Test(unittest.TestCase):
rv_stdout = result.stdout.decode().split("\n")
assert (
count_lines_with(
rv_stdout, "Error: Server failed to create offer: To amount above max"
rv_stdout, "Server failed to create offer: To amount above max"
)
== 1
)
def test_bid_tracking(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
for i in range(2):
waitForServer(self.delay_event, UI_PORT + i)
# Reset test
clear_offers(self.delay_event, 0)
@@ -870,10 +880,8 @@ class Test(unittest.TestCase):
assert bid["addr_from"] == addr_bid_from
def test_auto_accept(self):
waitForServer(self.delay_event, UI_PORT + 0)
waitForServer(self.delay_event, UI_PORT + 1)
waitForServer(self.delay_event, UI_PORT + 2)
for i in range(3):
waitForServer(self.delay_event, UI_PORT + i)
logging.info("Reset test")
clear_offers(self.delay_event, 0)