Load in-progress bids only when unlocked.

This commit is contained in:
tecnovert
2023-02-16 22:57:55 +02:00
parent 3234e3fba3
commit 2922b171a6
11 changed files with 276 additions and 89 deletions

View File

@@ -216,6 +216,7 @@ class WatchedTransaction():
class BasicSwap(BaseApp):
ws_server = None
_read_zmq_queue: bool = True
protocolInterfaces = {
SwapTypes.SELLER_FIRST: atomic_swap_1.AtomicSwapInterface(),
SwapTypes.XMR_SWAP: xmr_swap_1.XmrSwapInterface(),
@@ -696,7 +697,41 @@ class BasicSwap(BaseApp):
self._network = bsn.Network(self.settings['p2p_host'], self.settings['p2p_port'], network_key, self)
self._network.startNetwork()
self.initialise()
self.log.debug('network_key %s\nnetwork_pubkey %s\nnetwork_addr %s',
self.network_key, self.network_pubkey, self.network_addr)
ro = self.callrpc('smsglocalkeys')
found = False
for k in ro['smsg_keys']:
if k['address'] == self.network_addr:
found = True
break
if not found:
self.log.info('Importing network key to SMSG')
self.callrpc('smsgimportprivkey', [self.network_key, 'basicswap offers'])
ro = self.callrpc('smsglocalkeys', ['anon', '-', self.network_addr])
ensure(ro['result'] == 'Success.', 'smsglocalkeys failed')
# TODO: Ensure smsg is enabled for the active wallet.
# Initialise locked state
_, _ = self.getLockedState()
# Re-load in-progress bids
self.loadFromDB()
# Scan inbox
# TODO: Redundant? small window for zmq messages to go unnoticed during startup?
# options = {'encoding': 'hex'}
options = {'encoding': 'none'}
ro = self.callrpc('smsginbox', ['unread', '', options])
nm = 0
for msg in ro['messages']:
# TODO: Remove workaround for smsginbox bug
get_msg = self.callrpc('smsg', [msg['msgid'], {'encoding': 'hex', 'setread': True}])
self.processMsg(get_msg)
nm += 1
self.log.info('Scanned %d unread messages.', nm)
def stopDaemon(self, coin):
if coin == Coins.XMR:
@@ -757,6 +792,11 @@ class BasicSwap(BaseApp):
if synced < 1.0:
raise ValueError('{} chain is still syncing, currently at {}.'.format(self.coin_clients[c]['name'], synced))
def isSystemUnlocked(self):
# TODO - Check all active coins
ci = self.ci(Coins.PART)
return not ci.isWalletLocked()
def checkSystemStatus(self):
ci = self.ci(Coins.PART)
if ci.isWalletLocked():
@@ -801,6 +841,7 @@ class BasicSwap(BaseApp):
self._is_encrypted, self._is_locked = self.ci(Coins.PART).isWalletEncryptedLocked()
def unlockWallets(self, password, coin=None):
self._read_zmq_queue = False
for c in self.activeCoins():
if coin and c != coin:
continue
@@ -808,13 +849,20 @@ class BasicSwap(BaseApp):
if c == Coins.PART:
self._is_locked = False
self.loadFromDB()
self._read_zmq_queue = True
def lockWallets(self, coin=None):
self._read_zmq_queue = False
self.swaps_in_progress.clear()
for c in self.activeCoins():
if coin and c != coin:
continue
self.ci(c).lockWallet()
if c == Coins.PART:
self._is_locked = True
self._read_zmq_queue = True
def initialiseWallet(self, coin_type, raise_errors=False):
if coin_type == Coins.PART:
@@ -929,7 +977,7 @@ class BasicSwap(BaseApp):
with self.mxDB:
try:
session = scoped_session(self.session_factory)
session.execute('DELETE FROM kv_string WHERE key = "{}" '.format(str_key))
session.execute('DELETE FROM kv_string WHERE key = :key', {'key': str_key})
session.commit()
finally:
session.close()
@@ -1037,7 +1085,10 @@ class BasicSwap(BaseApp):
if session is None:
self.closeSession(use_session)
def loadFromDB(self):
def loadFromDB(self) -> None:
if self.isSystemUnlocked() is False:
self.log.info('Not loading from db. System is locked.')
return
self.log.info('Loading data from db')
self.mxDB.acquire()
self.swaps_in_progress.clear()
@@ -1061,39 +1112,6 @@ class BasicSwap(BaseApp):
session.remove()
self.mxDB.release()
def initialise(self):
self.log.debug('network_key %s\nnetwork_pubkey %s\nnetwork_addr %s',
self.network_key, self.network_pubkey, self.network_addr)
ro = self.callrpc('smsglocalkeys')
found = False
for k in ro['smsg_keys']:
if k['address'] == self.network_addr:
found = True
break
if not found:
self.log.info('Importing network key to SMSG')
self.callrpc('smsgimportprivkey', [self.network_key, 'basicswap offers'])
ro = self.callrpc('smsglocalkeys', ['anon', '-', self.network_addr])
ensure(ro['result'] == 'Success.', 'smsglocalkeys failed')
# TODO: Ensure smsg is enabled for the active wallet.
self.loadFromDB()
# Scan inbox
# TODO: Redundant? small window for zmq messages to go unnoticed during startup?
# options = {'encoding': 'hex'}
options = {'encoding': 'none'}
ro = self.callrpc('smsginbox', ['unread', '', options])
nm = 0
for msg in ro['messages']:
# TODO: Remove workaround for smsginbox bug
get_msg = self.callrpc('smsg', [msg['msgid'], {'encoding': 'hex', 'setread': True}])
self.processMsg(get_msg)
nm += 1
self.log.info('Scanned %d unread messages.', nm)
def getActiveBidMsgValidTime(self):
return self.SMSG_SECONDS_IN_HOUR * 48
@@ -1882,7 +1900,7 @@ class BasicSwap(BaseApp):
try:
self._contract_count += 1
session = scoped_session(self.session_factory)
session.execute('UPDATE kv_int SET value = {} WHERE KEY="contract_count"'.format(self._contract_count))
session.execute('UPDATE kv_int SET value = :value WHERE KEY="contract_count"', {'value': self._contract_count})
session.commit()
finally:
session.close()
@@ -3870,7 +3888,11 @@ class BasicSwap(BaseApp):
c['last_height_checked'] = last_height_checked
self.setIntKV('last_height_checked_' + chainparams[coin_type]['name'], last_height_checked)
def expireMessages(self):
def expireMessages(self) -> None:
if self._is_locked is True:
self.log.debug('Not expiring messages while system locked')
return
self.mxDB.acquire()
rpc_conn = None
try:
@@ -3947,9 +3969,9 @@ class BasicSwap(BaseApp):
self.logException(f'checkQueuedActions failed: {ex}')
if self.debug:
session.execute('UPDATE actions SET active_ind = 2 WHERE trigger_at <= {}'.format(now))
session.execute('UPDATE actions SET active_ind = 2 WHERE trigger_at <= :now', {'now': now})
else:
session.execute('DELETE FROM actions WHERE trigger_at <= {}'.format(now))
session.execute('DELETE FROM actions WHERE trigger_at <= :now', {'now': now})
session.commit()
except Exception as ex:
@@ -5014,7 +5036,7 @@ class BasicSwap(BaseApp):
if coin_to == Coins.XMR:
address_to = self.getCachedMainWalletAddress(ci_to)
elif coin_to == Coins.PART_BLIND:
elif coin_to in (Coins.PART_BLIND, Coins.PART_ANON):
address_to = self.getCachedStealthAddressForCoin(coin_to)
else:
address_to = self.getReceiveAddressFromPool(coin_to, bid_id, TxTypes.XMR_SWAP_B_LOCK_SPEND)
@@ -5323,6 +5345,9 @@ class BasicSwap(BaseApp):
rv = None
if msg_type == MessageTypes.OFFER:
self.processOffer(msg)
elif msg_type == MessageTypes.OFFER_REVOKE:
self.processOfferRevoke(msg)
# TODO: When changing from wallet keys (encrypted/locked) handle swap messages while locked
elif msg_type == MessageTypes.BID:
self.processBid(msg)
elif msg_type == MessageTypes.BID_ACCEPT:
@@ -5339,8 +5364,6 @@ class BasicSwap(BaseApp):
self.processXmrSplitMessage(msg)
elif msg_type == MessageTypes.XMR_BID_LOCK_RELEASE_LF:
self.processXmrLockReleaseMessage(msg)
if msg_type == MessageTypes.OFFER_REVOKE:
self.processOfferRevoke(msg)
except InactiveCoin as ex:
self.log.info('Ignoring message involving inactive coin {}, type {}'.format(Coins(ex.coinid).name, MessageTypes(msg_type).name))
@@ -5381,10 +5404,10 @@ class BasicSwap(BaseApp):
def update(self):
try:
# while True:
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK)
if message == b'smsg':
self.processZmqSmsg()
if self._read_zmq_queue:
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK)
if message == b'smsg':
self.processZmqSmsg()
except zmq.Again as ex:
pass
except Exception as ex:
@@ -6178,6 +6201,7 @@ class BasicSwap(BaseApp):
addr_info = self.callrpc('getaddressinfo', [new_addr])
self.callrpc('smsgaddlocaladdress', [new_addr]) # Enable receiving smsgs
self.callrpc('smsglocalkeys', ['anon', '-', new_addr])
use_session.add(SmsgAddress(addr=new_addr, use_type=use_type, active_ind=1, created_at=now, note=addressnote, pubkey=addr_info['pubkey']))
return new_addr, addr_info['pubkey']
@@ -6193,6 +6217,7 @@ class BasicSwap(BaseApp):
ci = self.ci(Coins.PART)
add_addr = ci.pubkey_to_address(bytes.fromhex(pubkey_hex))
self.callrpc('smsgaddaddress', [add_addr, pubkey_hex])
self.callrpc('smsglocalkeys', ['anon', '-', add_addr])
session.add(SmsgAddress(addr=add_addr, use_type=AddressTypes.SEND_OFFER, active_ind=1, created_at=now, note=addressnote, pubkey=pubkey_hex))
session.commit()
@@ -6209,7 +6234,7 @@ class BasicSwap(BaseApp):
mode = '-' if active_ind == 0 else '+'
self.callrpc('smsglocalkeys', ['recv', mode, address])
session.execute('UPDATE smsgaddresses SET active_ind = {}, note = "{}" WHERE addr = "{}"'.format(active_ind, addressnote, address))
session.execute('UPDATE smsgaddresses SET active_ind = :active_ind, note = :note WHERE addr = :addr', {'active_ind': active_ind, 'note': addressnote, 'addr': address})
session.commit()
finally:
session.close()