refactor: Rename EventQueue table to Action

This commit is contained in:
tecnovert
2022-06-06 23:03:31 +02:00
parent d47a69c7cb
commit d909115ea4
8 changed files with 172 additions and 86 deletions

View File

@@ -77,7 +77,7 @@ from .messages_pb2 import (
)
from .db import (
CURRENT_DB_VERSION,
TableTypes,
Concepts,
Base,
DBKVInt,
DBKVString,
@@ -87,7 +87,7 @@ from .db import (
PooledAddress,
SentOffer,
SmsgAddress,
EventQueue,
Action,
EventLog,
XmrOffer,
XmrSwap,
@@ -117,7 +117,7 @@ from .basicswap_util import (
BidStates,
TxStates,
TxTypes,
EventTypes,
ActionTypes,
EventLogTypes,
XmrSplitMsgTypes,
DebugTypes,
@@ -205,14 +205,14 @@ class BasicSwap(BaseApp):
self.check_progress_seconds = self.settings.get('check_progress_seconds', 60)
self.check_watched_seconds = self.settings.get('check_watched_seconds', 60)
self.check_expired_seconds = self.settings.get('check_expired_seconds', 60 * 5)
self.check_events_seconds = self.settings.get('check_events_seconds', 10)
self.check_actions_seconds = self.settings.get('check_actions_seconds', 10)
self.check_xmr_swaps_seconds = self.settings.get('check_xmr_swaps_seconds', 20)
self.startup_tries = self.settings.get('startup_tries', 21) # Seconds waited for will be (x(1 + x+1) / 2
self.debug_ui = self.settings.get('debug_ui', False)
self._last_checked_progress = 0
self._last_checked_watched = 0
self._last_checked_expired = 0
self._last_checked_events = 0
self._last_checked_actions = 0
self._last_checked_xmr_swaps = 0
self._possibly_revoked_offers = collections.deque([], maxlen=48) # TODO: improve
self._updating_wallets_info = {}
@@ -793,9 +793,9 @@ class BasicSwap(BaseApp):
# Remove any delayed events
if self.debug:
use_session.execute('UPDATE eventqueue SET active_ind = 2 WHERE linked_id = x\'{}\' '.format(bid.bid_id.hex()))
use_session.execute('UPDATE actions SET active_ind = 2 WHERE linked_id = x\'{}\' '.format(bid.bid_id.hex()))
else:
use_session.execute('DELETE FROM eventqueue WHERE linked_id = x\'{}\' '.format(bid.bid_id.hex()))
use_session.execute('DELETE FROM actions WHERE linked_id = x\'{}\' '.format(bid.bid_id.hex()))
# Unlock locked inputs (TODO)
if offer.swap_type == SwapTypes.XMR_SWAP:
@@ -1090,7 +1090,7 @@ class BasicSwap(BaseApp):
if automation_id != -1:
auto_link = AutomationLink(
active_ind=1,
linked_type=TableTypes.OFFER,
linked_type=Concepts.OFFER,
linked_id=offer_id,
strategy_id=automation_id,
created_at=offer_created_at,
@@ -1549,36 +1549,35 @@ class BasicSwap(BaseApp):
session.remove()
self.mxDB.release()
def createEventInSession(self, delay, event_type, linked_id, session):
self.log.debug('createEvent %d %s', event_type, linked_id.hex())
def createActionInSession(self, delay, action_type, linked_id, session):
self.log.debug('createAction %d %s', action_type, linked_id.hex())
now = int(time.time())
event = EventQueue(
action = Action(
active_ind=1,
created_at=now,
trigger_at=now + delay,
event_type=event_type,
action_type=action_type,
linked_id=linked_id)
session.add(event)
session.add(action)
def createEvent(self, delay, event_type, linked_id):
# self.log.debug('createEvent %d %s', event_type, linked_id.hex())
def createAction(self, delay, action_type, linked_id):
# self.log.debug('createAction %d %s', action_type, linked_id.hex())
self.mxDB.acquire()
try:
session = scoped_session(self.session_factory)
self.createEventInSession(delay, event_type, linked_id, session)
self.createActionInSession(delay, action_type, linked_id, session)
session.commit()
finally:
session.close()
session.remove()
self.mxDB.release()
def logBidEvent(self, bid_id, event_type, event_msg, session):
self.log.debug('logBidEvent %s %s', bid_id.hex(), event_type)
def logEvent(self, linked_type, linked_id, event_type, event_msg, session):
entry = EventLog(
active_ind=1,
created_at=int(time.time()),
linked_type=TableTypes.BID,
linked_id=bid_id,
linked_type=linked_type,
linked_id=linked_id,
event_type=int(event_type),
event_msg=event_msg)
@@ -1595,10 +1594,27 @@ class BasicSwap(BaseApp):
session.remove()
self.mxDB.release()
def logBidEvent(self, bid_id, event_type, event_msg, session):
self.log.debug('logBidEvent %s %s', bid_id.hex(), event_type)
self.logEvent(Concepts.BID, bid_id, event_type, event_msg, session)
def countBidEvents(self, bid, event_type, session):
q = session.execute('SELECT COUNT(*) FROM eventlog WHERE linked_type = {} AND linked_id = x\'{}\' AND event_type = {}'.format(int(TableTypes.BID), bid.bid_id.hex(), int(event_type))).first()
q = session.execute('SELECT COUNT(*) FROM eventlog WHERE linked_type = {} AND linked_id = x\'{}\' AND event_type = {}'.format(int(Concepts.BID), bid.bid_id.hex(), int(event_type))).first()
return q[0]
def getEvents(self, linked_type, linked_id):
events = []
self.mxDB.acquire()
try:
session = scoped_session(self.session_factory)
for entry in session.query(EventLog).filter(sa.and_(EventLog.linked_type == linked_type, EventLog.linked_id == linked_id)):
events.append(entry)
return events
finally:
session.close()
session.remove()
self.mxDB.release()
def postBid(self, offer_id, amount, addr_send_from=None, extra_options={}):
# Bid to send bid.amount * bid.rate of coin_to in exchange for bid.amount of coin_from
self.log.debug('postBid %s', offer_id.hex())
@@ -1838,13 +1854,13 @@ class BasicSwap(BaseApp):
def list_bid_events(self, bid_id, session):
query_str = 'SELECT created_at, event_type, event_msg FROM eventlog ' + \
'WHERE active_ind = 1 AND linked_type = {} AND linked_id = x\'{}\' '.format(TableTypes.BID, bid_id.hex())
'WHERE active_ind = 1 AND linked_type = {} AND linked_id = x\'{}\' '.format(Concepts.BID, bid_id.hex())
q = session.execute(query_str)
events = []
for row in q:
events.append({'at': row[0], 'desc': describeEventEntry(row[1], row[2])})
query_str = 'SELECT created_at, trigger_at FROM eventqueue ' + \
query_str = 'SELECT created_at, trigger_at FROM actions ' + \
'WHERE active_ind = 1 AND linked_id = x\'{}\' '.format(bid_id.hex())
q = session.execute(query_str)
for row in q:
@@ -1962,21 +1978,21 @@ class BasicSwap(BaseApp):
ensure(xmr_offer, 'XMR offer not found: {}.'.format(offer_id.hex()))
ensure(offer.expire_at > int(time.time()), 'Offer has expired')
valid_for_seconds = extra_options.get('valid_for_seconds', 60 * 10)
self.validateBidValidTime(offer.swap_type, offer.coin_from, offer.coin_to, valid_for_seconds)
coin_from = Coins(offer.coin_from)
coin_to = Coins(offer.coin_to)
ci_from = self.ci(coin_from)
ci_to = self.ci(coin_to)
valid_for_seconds = extra_options.get('valid_for_seconds', 60 * 10)
bid_rate = extra_options.get('bid_rate', offer.rate)
self.validateBidAmount(offer, amount, bid_rate)
amount_to = int((int(amount) * bid_rate) // ci_from.COIN())
if not (self.debug and extra_options.get('debug_skip_validation', False)):
self.validateBidValidTime(offer.swap_type, offer.coin_from, offer.coin_to, valid_for_seconds)
self.validateBidAmount(offer, amount, bid_rate)
self.checkSynced(coin_from, coin_to)
amount_to = int((int(amount) * bid_rate) // ci_from.COIN())
balance_to = ci_to.getSpendableBalance()
ensure(balance_to > amount_to, '{} spendable balance is too low: {}'.format(ci_to.coin_name(), ci_to.format_amount(balance_to)))
@@ -2954,7 +2970,7 @@ class BasicSwap(BaseApp):
if bid.was_sent:
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Sending xmr swap chain B lock tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.SEND_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.SEND_XMR_SWAP_LOCK_TX_B, bid_id, session)
# bid.setState(BidStates.SWAP_DELAYING)
if bid_changed:
@@ -3001,7 +3017,7 @@ class BasicSwap(BaseApp):
if bid.was_received:
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Releasing xmr script coin lock tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.SEND_XMR_LOCK_RELEASE, bid_id, session)
self.createActionInSession(delay, ActionTypes.SEND_XMR_LOCK_RELEASE, bid_id, session)
if bid_changed:
self.saveBidInSession(bid_id, bid, session, xmr_swap)
@@ -3018,11 +3034,11 @@ class BasicSwap(BaseApp):
except Exception as e:
self.log.debug('getrawtransaction lock spend tx failed: %s', str(e))
elif state == BidStates.XMR_SWAP_SCRIPT_TX_REDEEMED:
if bid.was_received and self.countQueuedEvents(session, bid_id, EventTypes.REDEEM_XMR_SWAP_LOCK_TX_B) < 1:
if bid.was_received and self.countQueuedActions(session, bid_id, ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_B) < 1:
bid.setState(BidStates.SWAP_DELAYING)
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Redeeming coin b lock tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.REDEEM_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.saveBidInSession(bid_id, bid, session, xmr_swap)
session.commit()
elif state == BidStates.XMR_SWAP_NOSCRIPT_TX_REDEEMED:
@@ -3411,7 +3427,7 @@ class BasicSwap(BaseApp):
if bid.xmr_b_lock_tx is not None:
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Recovering xmr swap chain B lock tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.RECOVER_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.RECOVER_XMR_SWAP_LOCK_TX_B, bid_id, session)
else:
bid.setState(BidStates.XMR_SWAP_FAILED_REFUNDED)
@@ -3524,49 +3540,49 @@ class BasicSwap(BaseApp):
finally:
self.mxDB.release()
def countQueuedEvents(self, session, bid_id, event_type):
q = session.query(EventQueue).filter(sa.and_(EventQueue.active_ind == 1, EventQueue.linked_id == bid_id, EventQueue.event_type == event_type))
def countQueuedActions(self, session, bid_id, action_type):
q = session.query(Action).filter(sa.and_(Action.active_ind == 1, Action.linked_id == bid_id, Action.action_type == action_type))
return q.count()
def checkEvents(self):
def checkQueuedActions(self):
self.mxDB.acquire()
now = int(time.time())
session = None
try:
session = scoped_session(self.session_factory)
q = session.query(EventQueue).filter(sa.and_(EventQueue.active_ind == 1, EventQueue.trigger_at <= now))
q = session.query(Action).filter(sa.and_(Action.active_ind == 1, Action.trigger_at <= now))
for row in q:
try:
if row.event_type == EventTypes.ACCEPT_BID:
if row.action_type == ActionTypes.ACCEPT_BID:
self.acceptBid(row.linked_id)
elif row.event_type == EventTypes.ACCEPT_XMR_BID:
elif row.action_type == ActionTypes.ACCEPT_XMR_BID:
self.acceptXmrBid(row.linked_id)
elif row.event_type == EventTypes.SIGN_XMR_SWAP_LOCK_TX_A:
elif row.action_type == ActionTypes.SIGN_XMR_SWAP_LOCK_TX_A:
self.sendXmrBidTxnSigsFtoL(row.linked_id, session)
elif row.event_type == EventTypes.SEND_XMR_SWAP_LOCK_TX_A:
elif row.action_type == ActionTypes.SEND_XMR_SWAP_LOCK_TX_A:
self.sendXmrBidCoinALockTx(row.linked_id, session)
elif row.event_type == EventTypes.SEND_XMR_SWAP_LOCK_TX_B:
elif row.action_type == ActionTypes.SEND_XMR_SWAP_LOCK_TX_B:
self.sendXmrBidCoinBLockTx(row.linked_id, session)
elif row.event_type == EventTypes.SEND_XMR_LOCK_RELEASE:
elif row.action_type == ActionTypes.SEND_XMR_LOCK_RELEASE:
self.sendXmrBidLockRelease(row.linked_id, session)
elif row.event_type == EventTypes.REDEEM_XMR_SWAP_LOCK_TX_A:
elif row.action_type == ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_A:
self.redeemXmrBidCoinALockTx(row.linked_id, session)
elif row.event_type == EventTypes.REDEEM_XMR_SWAP_LOCK_TX_B:
elif row.action_type == ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_B:
self.redeemXmrBidCoinBLockTx(row.linked_id, session)
elif row.event_type == EventTypes.RECOVER_XMR_SWAP_LOCK_TX_B:
elif row.action_type == ActionTypes.RECOVER_XMR_SWAP_LOCK_TX_B:
self.recoverXmrBidCoinBLockTx(row.linked_id, session)
else:
self.log.warning('Unknown event type: %d', row.event_type)
except Exception as ex:
if self.debug:
self.log.error(traceback.format_exc())
self.log.error('checkEvents failed: {}'.format(str(ex)))
self.log.error('checkQueuedActions failed: {}'.format(str(ex)))
if self.debug:
session.execute('UPDATE eventqueue SET active_ind = 2 WHERE trigger_at <= {}'.format(now))
session.execute('UPDATE actions SET active_ind = 2 WHERE trigger_at <= {}'.format(now))
else:
session.execute('DELETE FROM eventqueue WHERE trigger_at <= {}'.format(now))
session.execute('DELETE FROM actions WHERE trigger_at <= {}'.format(now))
session.commit()
finally:
@@ -3778,7 +3794,7 @@ class BasicSwap(BaseApp):
self.mxDB.acquire()
use_session = scoped_session(self.session_factory)
link = use_session.query(AutomationLink).filter_by(active_ind=1, linked_type=TableTypes.OFFER, linked_id=offer.offer_id).first()
link = use_session.query(AutomationLink).filter_by(active_ind=1, linked_type=Concepts.OFFER, linked_id=offer.offer_id).first()
if not link:
return False
@@ -3787,11 +3803,20 @@ class BasicSwap(BaseApp):
self.log.debug('Evaluating against strategy {}'.format(strategy.record_id))
if opts.get('full_amount_only', False) is True:
if not offer.amount_negotiable:
if bid.amount != offer.amount_from:
self.log.info('Not auto accepting bid %s, want exact amount match', bid.bid_id.hex())
return False
if bid.amount < offer.min_bid_amount:
self.log.info('Not auto accepting bid %s, bid amount below minimum', bid.bid_id.hex())
return False
if opts.get('exact_rate_only', False) is True:
if bid.rate != offer.rate:
self.log.info('Not auto accepting bid %s, want exact rate match', bid.bid_id.hex())
return False
max_bids = opts.get('max_bids', 1)
# Auto accept bid if set and no other non-abandoned bid for this order exists
if self.countAcceptedBids(offer.offer_id) >= max_bids:
@@ -3909,7 +3934,7 @@ class BasicSwap(BaseApp):
if self.shouldAutoAcceptBid(offer, bid):
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Auto accepting bid %s in %d seconds', bid_id.hex(), delay)
self.createEvent(delay, EventTypes.ACCEPT_BID, bid_id)
self.createAction(delay, ActionTypes.ACCEPT_BID, bid_id)
def processBidAccept(self, msg):
self.log.debug('Processing bid accepted msg %s', msg['msgid'])
@@ -4029,7 +4054,7 @@ class BasicSwap(BaseApp):
if self.shouldAutoAcceptBid(offer, bid, session):
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Auto accepting xmr bid %s in %d seconds', bid.bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.ACCEPT_XMR_BID, bid.bid_id, session)
self.createActionInSession(delay, ActionTypes.ACCEPT_XMR_BID, bid.bid_id, session)
bid.setState(BidStates.SWAP_DELAYING)
self.saveBidInSession(bid.bid_id, bid, session, xmr_swap)
@@ -4087,7 +4112,7 @@ class BasicSwap(BaseApp):
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Responding to xmr bid accept %s in %d seconds', bid.bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.SIGN_XMR_SWAP_LOCK_TX_A, bid.bid_id, session)
self.createActionInSession(delay, ActionTypes.SIGN_XMR_SWAP_LOCK_TX_A, bid.bid_id, session)
def processXmrBid(self, msg):
# MSG1L
@@ -4437,7 +4462,7 @@ class BasicSwap(BaseApp):
if num_retries < 5 and (ci_to.is_transient_error(ex) or self.is_transient_error(ex)):
delay = random.randrange(self.min_delay_retry, self.max_delay_retry)
self.log.info('Retrying sending xmr swap chain B lock tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.SEND_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.SEND_XMR_SWAP_LOCK_TX_B, bid_id, session)
else:
self.setBidError(bid_id, bid, 'publishBLockTx failed: ' + str(ex), save_bid=False)
self.saveBidInSession(bid_id, bid, session, xmr_swap, save_in_progress=offer)
@@ -4585,7 +4610,7 @@ class BasicSwap(BaseApp):
if num_retries < 100 and (ci_to.is_transient_error(ex) or self.is_transient_error(ex)):
delay = random.randrange(self.min_delay_retry, self.max_delay_retry)
self.log.info('Retrying sending xmr swap chain B spend tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.REDEEM_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_B, bid_id, session)
else:
self.setBidError(bid_id, bid, 'spendBLockTx failed: ' + str(ex), save_bid=False)
self.saveBidInSession(bid_id, bid, session, xmr_swap, save_in_progress=offer)
@@ -4644,7 +4669,7 @@ class BasicSwap(BaseApp):
if num_retries < 100 and (ci_to.is_transient_error(ex) or self.is_transient_error(ex)):
delay = random.randrange(self.min_delay_retry, self.max_delay_retry)
self.log.info('Retrying sending xmr swap chain B refund tx for bid %s in %d seconds', bid_id.hex(), delay)
self.createEventInSession(delay, EventTypes.RECOVER_XMR_SWAP_LOCK_TX_B, bid_id, session)
self.createActionInSession(delay, ActionTypes.RECOVER_XMR_SWAP_LOCK_TX_B, bid_id, session)
else:
self.setBidError(bid_id, bid, 'spendBLockTx for refund failed: ' + str(ex), save_bid=False)
self.saveBidInSession(bid_id, bid, session, xmr_swap, save_in_progress=offer)
@@ -4710,7 +4735,7 @@ class BasicSwap(BaseApp):
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Sending coin A lock tx for xmr bid %s in %d seconds', bid_id.hex(), delay)
self.createEvent(delay, EventTypes.SEND_XMR_SWAP_LOCK_TX_A, bid_id)
self.createAction(delay, ActionTypes.SEND_XMR_SWAP_LOCK_TX_A, bid_id)
bid.setState(BidStates.SWAP_DELAYING)
self.saveBid(bid_id, bid, xmr_swap=xmr_swap)
@@ -4829,7 +4854,7 @@ class BasicSwap(BaseApp):
delay = random.randrange(self.min_delay_event, self.max_delay_event)
self.log.info('Redeeming coin A lock tx for xmr bid %s in %d seconds', bid_id.hex(), delay)
self.createEvent(delay, EventTypes.REDEEM_XMR_SWAP_LOCK_TX_A, bid_id)
self.createAction(delay, ActionTypes.REDEEM_XMR_SWAP_LOCK_TX_A, bid_id)
bid.setState(BidStates.XMR_SWAP_LOCK_RELEASED)
self.saveBid(bid_id, bid, xmr_swap=xmr_swap)
@@ -4866,6 +4891,12 @@ class BasicSwap(BaseApp):
self.log.error('processMsg %s', str(ex))
if self.debug:
self.log.error(traceback.format_exc())
self.logEvent(Concepts.NETWORK_MESSAGE,
bytes.fromhex(msg['msgid']),
EventLogTypes.ERROR,
str(ex),
None)
finally:
self.mxDB.release()
@@ -4940,9 +4971,9 @@ class BasicSwap(BaseApp):
self.expireMessages()
self._last_checked_expired = now
if now - self._last_checked_events >= self.check_events_seconds:
self.checkEvents()
self._last_checked_events = now
if now - self._last_checked_actions >= self.check_actions_seconds:
self.checkQueuedActions()
self._last_checked_actions = now
if now - self._last_checked_xmr_swaps >= self.check_xmr_swaps_seconds:
self.checkXmrSwaps()