Persistent notifications.

This commit is contained in:
tecnovert
2022-10-13 22:21:43 +02:00
parent 7e4dd125e1
commit eb9eb49bd9
8 changed files with 145 additions and 71 deletions

View File

@@ -96,6 +96,7 @@ from .db import (
XmrSwap,
XmrSplitData,
Wallets,
Notification,
KnownIdentity,
AutomationLink,
AutomationStrategy,
@@ -235,6 +236,12 @@ class BasicSwap(BaseApp):
self._updating_wallets_info = {}
self._last_updated_wallets_info = 0
self._notifications_enabled = self.settings.get('notifications_enabled', True)
self._disabled_notification_types = self.settings.get('disabled_notification_types', [])
self._keep_notifications = self.settings.get('keep_notifications', 50)
self._show_notifications = self.settings.get('show_notifications', 10)
self._notifications_cache = {}
# TODO: Adjust ranges
self.min_delay_event = self.settings.get('min_delay_event', 10)
self.max_delay_event = self.settings.get('max_delay_event', 60)
@@ -304,6 +311,7 @@ class BasicSwap(BaseApp):
value=self._contract_count
))
session.commit()
session.close()
session.remove()
@@ -364,6 +372,19 @@ class BasicSwap(BaseApp):
close_all_sessions()
self.engine.dispose()
def openSession(self, session=None):
if session:
return session
self.mxDB.acquire()
return scoped_session(self.session_factory)
def closeSession(self, use_session, commit=True):
if commit:
use_session.commit()
use_session.close()
use_session.remove()
self.mxDB.release()
def setCoinConnectParams(self, coin):
# Set anything that does not require the daemon to be running
chain_client_settings = self.getChainClientSettings(coin)
@@ -831,13 +852,8 @@ class BasicSwap(BaseApp):
if ptx_state is not None and ptx_state != TxStates.TX_REFUNDED:
self.returnAddressToPool(bid.bid_id, TxTypes.PTX_REFUND)
use_session = None
try:
if session:
use_session = session
else:
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
use_session = self.openSession(session)
# Remove any delayed events
if self.debug:
@@ -864,10 +880,7 @@ class BasicSwap(BaseApp):
finally:
if session is None:
use_session.commit()
use_session.close()
use_session.remove()
self.mxDB.release()
self.closeSession(use_session)
def loadFromDB(self):
self.log.info('Loading data from db')
@@ -890,6 +903,7 @@ class BasicSwap(BaseApp):
self.log.error('Further error deactivating: %s', str(ex))
if self.debug:
self.log.error(traceback.format_exc())
self.buildNotificationsCache(session)
finally:
session.close()
session.remove()
@@ -957,25 +971,67 @@ class BasicSwap(BaseApp):
if coin_from == Coins.PIVX and swap_type == SwapTypes.XMR_SWAP:
raise ValueError('TODO: PIVX -> XMR')
def notify(self, event_type, event_data):
def notify(self, event_type, event_data, session=None):
show_event = event_type not in self._disabled_notification_types
if event_type == NT.OFFER_RECEIVED:
self.log.debug('Received new offer %s', event_data['offer_id'])
if self.ws_server:
if self.ws_server and show_event:
event_data['event'] = 'new_offer'
self.ws_server.send_message_to_all(json.dumps(event_data))
elif event_type == NT.BID_RECEIVED:
self.log.info('Received valid bid %s for %s offer %s', event_data['bid_id'], event_data['type'], event_data['offer_id'])
if self.ws_server:
if self.ws_server and show_event:
event_data['event'] = 'new_bid'
self.ws_server.send_message_to_all(json.dumps(event_data))
elif event_type == NT.BID_ACCEPTED:
self.log.info('Received valid bid accept for %s', event_data['bid_id'])
if self.ws_server:
if self.ws_server and show_event:
event_data['event'] = 'bid_accepted'
self.ws_server.send_message_to_all(json.dumps(event_data))
else:
self.log.warning(f'Unknown notification {event_type}')
try:
now = int(time.time())
use_session = self.openSession(session)
use_session.add(Notification(
active_ind=1,
created_at=now,
event_type=int(event_type),
event_data=bytes(json.dumps(event_data), 'UTF-8'),
))
use_session.execute(f'DELETE FROM notifications WHERE record_id NOT IN (SELECT record_id FROM notifications WHERE active_ind=1 ORDER BY created_at ASC LIMIT {self._keep_notifications})')
if show_event:
self._notifications_cache[now] = (event_type, event_data)
while len(self._notifications_cache) > self._show_notifications:
# dicts preserve insertion order in Python 3.7+
self._notifications_cache.pop(next(iter(self._notifications_cache)))
finally:
if session is None:
self.closeSession(use_session)
def buildNotificationsCache(self, session):
q = session.execute(f'SELECT created_at, event_type, event_data FROM notifications WHERE active_ind=1 ORDER BY created_at ASC LIMIT {self._show_notifications}')
for entry in q:
self._notifications_cache[entry[0]] = (entry[1], json.loads(entry[2].decode('UTF-8')))
def getNotifications(self):
rv = []
for k, v in self._notifications_cache.items():
rv.append((k, int(v[0]), json.dumps(v[1])))
return rv
def vacuumDB(self):
try:
session = self.openSession()
return session.execute('VACUUM')
finally:
self.closeSession(session)
def validateOfferAmounts(self, coin_from, coin_to, amount, rate, min_bid_amount):
ci_from = self.ci(coin_from)
ci_to = self.ci(coin_to)
@@ -1846,30 +1902,19 @@ class BasicSwap(BaseApp):
self.mxDB.release()
def getBid(self, bid_id, session=None):
use_session = None
try:
if session:
use_session = session
else:
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
use_session = self.openSession(session)
bid = use_session.query(Bid).filter_by(bid_id=bid_id).first()
if bid:
self.loadBidTxns(bid, use_session)
return bid
finally:
if session is None:
use_session.close()
use_session.remove()
self.mxDB.release()
self.closeSession(use_session, commit=False)
def getBidAndOffer(self, bid_id, session=None):
try:
if session:
use_session = session
else:
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
use_session = self.openSession(session)
bid = use_session.query(Bid).filter_by(bid_id=bid_id).first()
offer = None
if bid:
@@ -1878,9 +1923,7 @@ class BasicSwap(BaseApp):
return bid, offer
finally:
if session is None:
use_session.close()
use_session.remove()
self.mxDB.release()
self.closeSession(use_session, commit=False)
def getXmrBidAndOffer(self, bid_id, list_events=True):
self.mxDB.acquire()
@@ -3798,7 +3841,7 @@ class BasicSwap(BaseApp):
session.add(xmr_offer)
self.notify(NT.OFFER_RECEIVED, {'offer_id': offer_id.hex()})
self.notify(NT.OFFER_RECEIVED, {'offer_id': offer_id.hex()}, session)
else:
existing_offer.setState(OfferStates.OFFER_RECEIVED)
session.add(existing_offer)
@@ -3868,13 +3911,8 @@ class BasicSwap(BaseApp):
return bids, total_value
def shouldAutoAcceptBid(self, offer, bid, session=None):
use_session = None
try:
if session:
use_session = session
else:
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
use_session = self.openSession(session)
link = use_session.query(AutomationLink).filter_by(active_ind=1, linked_type=Concepts.OFFER, linked_id=offer.offer_id).first()
if not link:
@@ -3943,10 +3981,7 @@ class BasicSwap(BaseApp):
return False
finally:
if session is None:
use_session.commit()
use_session.close()
use_session.remove()
self.mxDB.release()
self.closeSession(use_session)
def processBid(self, msg):
self.log.debug('Processing bid msg %s', msg['msgid'])
@@ -4155,7 +4190,7 @@ class BasicSwap(BaseApp):
ensure(ci_to.verifyKey(xmr_swap.vkbvf), 'Invalid key, vkbvf')
ensure(ci_from.verifyPubkey(xmr_swap.pkaf), 'Invalid pubkey, pkaf')
self.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': bid.bid_id.hex(), 'offer_id': bid.offer_id.hex()})
self.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': bid.bid_id.hex(), 'offer_id': bid.offer_id.hex()}, session)
bid.setState(BidStates.BID_RECEIVED)
@@ -4217,7 +4252,7 @@ class BasicSwap(BaseApp):
bid.setState(BidStates.BID_ACCEPTED) # XMR
self.saveBidInSession(bid.bid_id, bid, session, xmr_swap)
self.notify(NT.BID_ACCEPTED, {'bid_id': bid.bid_id.hex()})
self.notify(NT.BID_ACCEPTED, {'bid_id': bid.bid_id.hex()}, session)
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Responding to xmr bid accept %s in %d seconds', bid.bid_id.hex(), delay)
@@ -5734,13 +5769,8 @@ class BasicSwap(BaseApp):
def newSMSGAddress(self, use_type=AddressTypes.RECV_OFFER, addressnote=None, session=None):
now = int(time.time())
use_session = None
try:
if session:
use_session = session
else:
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
use_session = self.openSession(session)
v = use_session.query(DBKVString).filter_by(key='smsg_chain_id').first()
if not v:
@@ -5780,10 +5810,7 @@ class BasicSwap(BaseApp):
return new_addr, addr_info['pubkey']
finally:
if session is None:
use_session.commit()
use_session.close()
use_session.remove()
self.mxDB.release()
self.closeSession(use_session)
def addSMSGAddress(self, pubkey_hex, addressnote=None):
self.mxDB.acquire()