From 3faf947588051f1f47fa36fa3f5daab5084b4a30 Mon Sep 17 00:00:00 2001 From: tecnovert Date: Sun, 25 May 2025 23:27:05 +0200 Subject: [PATCH] Sync DB schema to table definitions. --- basicswap/base.py | 1 + basicswap/basicswap.py | 5 +- basicswap/bin/run.py | 38 ++-- basicswap/db.py | 122 ++++++++---- basicswap/db_upgrades.py | 417 ++++++++++++--------------------------- 5 files changed, 240 insertions(+), 343 deletions(-) diff --git a/basicswap/base.py b/basicswap/base.py index 8b6cabf..a834aee 100644 --- a/basicswap/base.py +++ b/basicswap/base.py @@ -71,6 +71,7 @@ class BaseApp(DBMethods): self.default_socket = socket.socket self.default_socket_timeout = socket.getdefaulttimeout() self.default_socket_getaddrinfo = socket.getaddrinfo + self._force_db_upgrade = False def __del__(self): if self.fp: diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index a7f0eb5..6e3e7d2 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -498,6 +498,7 @@ class BasicSwap(BaseApp, UIApp): self.with_coins_override = extra_opts.get("with_coins", set()) self.without_coins_override = extra_opts.get("without_coins", set()) + self._force_db_upgrade = extra_opts.get("force_db_upgrade", False) for c in Coins: if c in chainparams: @@ -10198,7 +10199,9 @@ class BasicSwap(BaseApp, UIApp): def processContactConnected(self, event_data) -> None: connId = event_data["resp"]["contact"]["activeConn"]["connId"] localDisplayName = event_data["resp"]["contact"]["localDisplayName"] - self.log.debug(f"Processing Contact Connected event, ID: {connId}, contact name: {localDisplayName}.") + self.log.debug( + f"Processing Contact Connected event, ID: {connId}, contact name: {localDisplayName}." + ) try: cursor = self.openDB() diff --git a/basicswap/bin/run.py b/basicswap/bin/run.py index 2835ff0..bffb522 100755 --- a/basicswap/bin/run.py +++ b/basicswap/bin/run.py @@ -31,8 +31,6 @@ if not len(initial_logger.handlers): logger = initial_logger swap_client = None -with_coins = set() -without_coins = set() class Daemon: @@ -285,7 +283,11 @@ def getCoreBinArgs(coin_id: int, coin_settings, prepare=False, use_tor_proxy=Fal def runClient( - data_dir: str, chain: str, start_only_coins: bool, log_prefix: str = "BasicSwap" + data_dir: str, + chain: str, + start_only_coins: bool, + log_prefix: str = "BasicSwap", + extra_opts=dict(), ) -> int: global swap_client, logger daemons = [] @@ -311,13 +313,6 @@ def runClient( with open(settings_path) as fs: settings = json.load(fs) - extra_opts = dict() - if len(with_coins) > 0: - with_coins.add("particl") - extra_opts["with_coins"] = with_coins - if len(without_coins) > 0: - extra_opts["without_coins"] = without_coins - swap_client = BasicSwap( data_dir, settings, chain, log_name=log_prefix, extra_opts=extra_opts ) @@ -334,12 +329,16 @@ def runClient( # Settings may have been modified settings = swap_client.settings + try: # Try start daemons for c, v in settings["chainclients"].items(): if len(start_only_coins) > 0 and c not in start_only_coins: continue - if (len(with_coins) > 0 and c not in with_coins) or c in without_coins: + if ( + len(swap_client.with_coins_override) > 0 + and c not in swap_client.with_coins_override + ) or c in swap_client.without_coins_override: if v.get("manage_daemon", False) or v.get( "manage_wallet_daemon", False ): @@ -623,6 +622,9 @@ def printHelp(): "--startonlycoin Only start the provides coin daemon/s, use this if a chain requires extra processing." ) print("--logprefix Specify log prefix.") + print( + "--forcedbupgrade Recheck database against schema regardless of version." + ) def main(): @@ -630,6 +632,9 @@ def main(): chain = "mainnet" start_only_coins = set() log_prefix: str = "BasicSwap" + options = dict() + with_coins = set() + without_coins = set() for v in sys.argv[1:]: if len(v) < 2 or v[0] != "-": @@ -665,6 +670,9 @@ def main(): ensure_coin_valid(coin) without_coins.add(coin) continue + if name == "forcedbupgrade": + options["force_db_upgrade"] = True + continue if len(s) == 2: if name == "datadir": data_dir = os.path.expanduser(s[1]) @@ -693,8 +701,14 @@ def main(): if not os.path.exists(data_dir): os.makedirs(data_dir) + if len(with_coins) > 0: + with_coins.add("particl") + options["with_coins"] = with_coins + if len(without_coins) > 0: + options["without_coins"] = without_coins + logger.info(os.path.basename(sys.argv[0]) + ", version: " + __version__ + "\n\n") - fail_code = runClient(data_dir, chain, start_only_coins, log_prefix) + fail_code = runClient(data_dir, chain, start_only_coins, log_prefix, options) print("Done.") return fail_code diff --git a/basicswap/db.py b/basicswap/db.py index d334259..f421e40 100644 --- a/basicswap/db.py +++ b/basicswap/db.py @@ -694,10 +694,9 @@ class DirectMessageRouteLink(Table): created_at = Column("integer") -def create_db_(con, log) -> None: - c = con.cursor() - +def extract_schema() -> dict: g = globals().copy() + tables = {} for name, obj in g.items(): if not inspect.isclass(obj): continue @@ -707,15 +706,13 @@ def create_db_(con, log) -> None: continue table_name: str = obj.__tablename__ - query: str = f"CREATE TABLE {table_name} (" - + table = {} + columns = {} primary_key = None constraints = [] indices = [] - num_columns: int = 0 for m in inspect.getmembers(obj): m_name, m_obj = m - if hasattr(m_obj, "__sqlite3_primary_key__"): primary_key = m_obj continue @@ -726,46 +723,103 @@ def create_db_(con, log) -> None: indices.append(m_obj) continue if hasattr(m_obj, "__sqlite3_column__"): - if num_columns > 0: - query += "," - col_type: str = m_obj.column_type.upper() if col_type == "BOOL": col_type = "INTEGER" - query += f" {m_name} {col_type} " - - if m_obj.primary_key: - query += "PRIMARY KEY ASC " - if m_obj.unique: - query += "UNIQUE " - num_columns += 1 - + columns[m_name] = { + "type": col_type, + "primary_key": m_obj.primary_key, + "unique": m_obj.unique, + } + table["columns"] = columns if primary_key is not None: - query += f", PRIMARY KEY ({primary_key.column_1}" + table["primary_key"] = {"column_1": primary_key.column_1} if primary_key.column_2: - query += f", {primary_key.column_2}" + table["primary_key"]["column_2"] = primary_key.column_2 if primary_key.column_3: - query += f", {primary_key.column_3}" - query += ") " + table["primary_key"]["column_3"] = primary_key.column_3 for constraint in constraints: - query += f", UNIQUE ({constraint.column_1}" + if "constraints" not in table: + table["constraints"] = [] + table_constraint = {"column_1": constraint.column_1} if constraint.column_2: - query += f", {constraint.column_2}" + table_constraint["column_2"] = constraint.column_2 if constraint.column_3: - query += f", {constraint.column_3}" - query += ") " + table_constraint["column_3"] = constraint.column_3 + table["constraints"].append(table_constraint) + for i in indices: + if "indices" not in table: + table["indices"] = [] + table_index = {"index_name": i.name, "column_1": i.column_1} + if i.column_2 is not None: + table_index["column_2"] = i.column_2 + if i.column_3 is not None: + table_index["column_3"] = i.column_3 + table["indices"].append(table_index) + + tables[table_name] = table + return tables + + +def create_table(c, table_name, table) -> None: + query: str = f"CREATE TABLE {table_name} (" + + for i, (colname, column) in enumerate(table["columns"].items()): + col_type = column["type"] + query += ("," if i > 0 else "") + f" {colname} {col_type} " + if column["primary_key"]: + query += "PRIMARY KEY ASC " + if column["unique"]: + query += "UNIQUE " + + if "primary_key" in table: + column_1 = table["primary_key"]["column_1"] + column_2 = table["primary_key"].get("column_2", None) + column_3 = table["primary_key"].get("column_3", None) + query += f", PRIMARY KEY ({column_1}" + if column_2: + query += f", {column_2}" + if column_3: + query += f", {column_3}" + query += ") " + + constraints = table.get("constraints", []) + for constraint in constraints: + column_1 = constraint["column_1"] + column_2 = constraint.get("column_2", None) + column_3 = constraint.get("column_3", None) + query += f", UNIQUE ({column_1}" + if column_2: + query += f", {column_2}" + if column_3: + query += f", {column_3}" + query += ") " + + query += ")" + c.execute(query) + + indices = table.get("indices", []) + for index in indices: + index_name = index["index_name"] + column_1 = index["column_1"] + column_2 = index.get("column_2", None) + column_3 = index.get("column_3", None) + query: str = f"CREATE INDEX {index_name} ON {table_name} ({column_1}" + if column_2: + query += f", {column_2}" + if column_3: + query += f", {column_3}" query += ")" c.execute(query) - for i in indices: - query: str = f"CREATE INDEX {i.name} ON {table_name} ({i.column_1}" - if i.column_2 is not None: - query += f", {i.column_2}" - if i.column_3 is not None: - query += f", {i.column_3}" - query += ")" - c.execute(query) + + +def create_db_(con, log) -> None: + db_schema = extract_schema() + c = con.cursor() + for table_name, table in db_schema.items(): + create_table(c, table_name, table) def create_db(db_path: str, log) -> None: diff --git a/basicswap/db_upgrades.py b/basicswap/db_upgrades.py index 2ed8357..4385780 100644 --- a/basicswap/db_upgrades.py +++ b/basicswap/db_upgrades.py @@ -12,8 +12,10 @@ from .db import ( AutomationStrategy, BidState, Concepts, + create_table, CURRENT_DB_DATA_VERSION, CURRENT_DB_VERSION, + extract_schema, ) from .basicswap_util import ( @@ -49,10 +51,9 @@ def upgradeDatabaseData(self, data_version): return self.log.info( - "Upgrading database records from version %d to %d.", - data_version, - CURRENT_DB_DATA_VERSION, + f"Upgrading database records from version {data_version} to {CURRENT_DB_DATA_VERSION}." ) + cursor = self.openDB() try: now = int(time.time()) @@ -138,313 +139,137 @@ def upgradeDatabaseData(self, data_version): self.db_data_version = CURRENT_DB_DATA_VERSION self.setIntKV("db_data_version", self.db_data_version, cursor) self.commitDB() - self.log.info( - "Upgraded database records to version {}".format(self.db_data_version) - ) + self.log.info(f"Upgraded database records to version {self.db_data_version}") finally: self.closeDB(cursor, commit=False) def upgradeDatabase(self, db_version): - if db_version >= CURRENT_DB_VERSION: + if self._force_db_upgrade is False and db_version >= CURRENT_DB_VERSION: return self.log.info( f"Upgrading database from version {db_version} to {CURRENT_DB_VERSION}." ) - while True: - try: - cursor = self.openDB() + # db_version, tablename, oldcolumnname, newcolumnname + rename_columns = [ + (13, "actions", "event_id", "action_id"), + (13, "actions", "event_type", "action_type"), + (13, "actions", "event_data", "action_data"), + ( + 14, + "xmr_swaps", + "coin_a_lock_refund_spend_tx_msg_id", + "coin_a_lock_spend_tx_msg_id", + ), + ] - current_version = db_version - if current_version == 6: - cursor.execute("ALTER TABLE bids ADD COLUMN security_token BLOB") - cursor.execute("ALTER TABLE offers ADD COLUMN security_token BLOB") - db_version += 1 - elif current_version == 7: - cursor.execute("ALTER TABLE transactions ADD COLUMN block_hash BLOB") + expect_schema = extract_schema() + have_tables = {} + try: + cursor = self.openDB() + + for rename_column in rename_columns: + dbv, table_name, colname_from, colname_to = rename_column + if db_version < dbv: cursor.execute( - "ALTER TABLE transactions ADD COLUMN block_height INTEGER" - ) - cursor.execute("ALTER TABLE transactions ADD COLUMN block_time INTEGER") - db_version += 1 - elif current_version == 8: - cursor.execute( - """ - CREATE TABLE wallets ( - record_id INTEGER NOT NULL, - coin_id INTEGER, - wallet_name VARCHAR, - wallet_data VARCHAR, - balance_type INTEGER, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - db_version += 1 - elif current_version == 9: - cursor.execute("ALTER TABLE wallets ADD COLUMN wallet_data VARCHAR") - db_version += 1 - elif current_version == 10: - cursor.execute( - "ALTER TABLE smsgaddresses ADD COLUMN active_ind INTEGER" - ) - cursor.execute( - "ALTER TABLE smsgaddresses ADD COLUMN created_at INTEGER" - ) - cursor.execute("ALTER TABLE smsgaddresses ADD COLUMN note VARCHAR") - cursor.execute("ALTER TABLE smsgaddresses ADD COLUMN pubkey VARCHAR") - cursor.execute( - "UPDATE smsgaddresses SET active_ind = 1, created_at = 1" + f"ALTER TABLE {table_name} RENAME COLUMN {colname_from} TO {colname_to}" ) - cursor.execute("ALTER TABLE offers ADD COLUMN addr_to VARCHAR") - cursor.execute(f'UPDATE offers SET addr_to = "{self.network_addr}"') - db_version += 1 - elif current_version == 11: - cursor.execute( - "ALTER TABLE bids ADD COLUMN chain_a_height_start INTEGER" - ) - cursor.execute( - "ALTER TABLE bids ADD COLUMN chain_b_height_start INTEGER" - ) - cursor.execute("ALTER TABLE bids ADD COLUMN protocol_version INTEGER") - cursor.execute("ALTER TABLE offers ADD COLUMN protocol_version INTEGER") - cursor.execute("ALTER TABLE transactions ADD COLUMN tx_data BLOB") - db_version += 1 - elif current_version == 12: - cursor.execute( - """ - CREATE TABLE knownidentities ( - record_id INTEGER NOT NULL, - address VARCHAR, - label VARCHAR, - publickey BLOB, - num_sent_bids_successful INTEGER, - num_recv_bids_successful INTEGER, - num_sent_bids_rejected INTEGER, - num_recv_bids_rejected INTEGER, - num_sent_bids_failed INTEGER, - num_recv_bids_failed INTEGER, - note VARCHAR, - updated_at BIGINT, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - cursor.execute("ALTER TABLE bids ADD COLUMN reject_code INTEGER") - cursor.execute("ALTER TABLE bids ADD COLUMN rate INTEGER") - cursor.execute( - "ALTER TABLE offers ADD COLUMN amount_negotiable INTEGER" - ) - cursor.execute("ALTER TABLE offers ADD COLUMN rate_negotiable INTEGER") - db_version += 1 - elif current_version == 13: - db_version += 1 - cursor.execute( - """ - CREATE TABLE automationstrategies ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - label VARCHAR, - type_ind INTEGER, - only_known_identities INTEGER, - num_concurrent INTEGER, - data BLOB, - - note VARCHAR, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - - cursor.execute( - """ - CREATE TABLE automationlinks ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - - linked_type INTEGER, - linked_id BLOB, - strategy_id INTEGER, - - data BLOB, - repeat_limit INTEGER, - repeat_count INTEGER, - - note VARCHAR, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - - cursor.execute( - """ - CREATE TABLE history ( - record_id INTEGER NOT NULL, - concept_type INTEGER, - concept_id INTEGER, - changed_data BLOB, - - note VARCHAR, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - - cursor.execute( - """ - CREATE TABLE bidstates ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - state_id INTEGER, - label VARCHAR, - in_progress INTEGER, - - note VARCHAR, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - - cursor.execute("ALTER TABLE wallets ADD COLUMN active_ind INTEGER") - cursor.execute( - "ALTER TABLE knownidentities ADD COLUMN active_ind INTEGER" - ) - cursor.execute("ALTER TABLE eventqueue RENAME TO actions") - cursor.execute( - "ALTER TABLE actions RENAME COLUMN event_id TO action_id" - ) - cursor.execute( - "ALTER TABLE actions RENAME COLUMN event_type TO action_type" - ) - cursor.execute( - "ALTER TABLE actions RENAME COLUMN event_data TO action_data" - ) - elif current_version == 14: - db_version += 1 - cursor.execute( - "ALTER TABLE xmr_swaps ADD COLUMN coin_a_lock_release_msg_id BLOB" - ) - cursor.execute( - "ALTER TABLE xmr_swaps RENAME COLUMN coin_a_lock_refund_spend_tx_msg_id TO coin_a_lock_spend_tx_msg_id" - ) - elif current_version == 15: - db_version += 1 - cursor.execute( - """ - CREATE TABLE notifications ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - event_type INTEGER, - event_data BLOB, - created_at BIGINT, - PRIMARY KEY (record_id))""" - ) - elif current_version == 16: - db_version += 1 - cursor.execute( - """ - CREATE TABLE prefunded_transactions ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - created_at BIGINT, - linked_type INTEGER, - linked_id BLOB, - tx_type INTEGER, - tx_data BLOB, - used_by BLOB, - PRIMARY KEY (record_id))""" - ) - elif current_version == 17: - db_version += 1 - cursor.execute( - "ALTER TABLE knownidentities ADD COLUMN automation_override INTEGER" - ) - cursor.execute( - "ALTER TABLE knownidentities ADD COLUMN visibility_override INTEGER" - ) - cursor.execute("ALTER TABLE knownidentities ADD COLUMN data BLOB") - cursor.execute("UPDATE knownidentities SET active_ind = 1") - elif current_version == 18: - db_version += 1 - cursor.execute("ALTER TABLE xmr_split_data ADD COLUMN addr_from STRING") - cursor.execute("ALTER TABLE xmr_split_data ADD COLUMN addr_to STRING") - elif current_version == 19: - db_version += 1 - cursor.execute("ALTER TABLE bidstates ADD COLUMN in_error INTEGER") - cursor.execute("ALTER TABLE bidstates ADD COLUMN swap_failed INTEGER") - cursor.execute("ALTER TABLE bidstates ADD COLUMN swap_ended INTEGER") - elif current_version == 20: - db_version += 1 - cursor.execute( - """ - CREATE TABLE message_links ( - record_id INTEGER NOT NULL, - active_ind INTEGER, - created_at BIGINT, - - linked_type INTEGER, - linked_id BLOB, - - msg_type INTEGER, - msg_sequence INTEGER, - msg_id BLOB, - PRIMARY KEY (record_id))""" - ) - cursor.execute("ALTER TABLE offers ADD COLUMN bid_reversed INTEGER") - elif current_version == 21: - db_version += 1 - cursor.execute("ALTER TABLE offers ADD COLUMN proof_utxos BLOB") - cursor.execute("ALTER TABLE bids ADD COLUMN proof_utxos BLOB") - elif current_version == 22: - db_version += 1 - cursor.execute("ALTER TABLE offers ADD COLUMN amount_to INTEGER") - elif current_version == 23: - db_version += 1 - cursor.execute( - """ - CREATE TABLE checkedblocks ( - record_id INTEGER NOT NULL, - created_at BIGINT, - coin_type INTEGER, - block_height INTEGER, - block_hash BLOB, - block_time INTEGER, - PRIMARY KEY (record_id))""" - ) - cursor.execute("ALTER TABLE bids ADD COLUMN pkhash_buyer_to BLOB") - elif current_version == 24: - db_version += 1 - cursor.execute("ALTER TABLE bidstates ADD COLUMN can_accept INTEGER") - elif current_version == 25: - db_version += 1 - cursor.execute( - """ - CREATE TABLE coinrates ( - record_id INTEGER NOT NULL, - currency_from INTEGER, - currency_to INTEGER, - rate VARCHAR, - source VARCHAR, - last_updated INTEGER, - PRIMARY KEY (record_id))""" - ) - elif current_version == 26: - db_version += 1 - cursor.execute("ALTER TABLE offers ADD COLUMN auto_accept_type INTEGER") - elif current_version == 27: - db_version += 1 - cursor.execute("ALTER TABLE offers ADD COLUMN pk_from BLOB") - cursor.execute("ALTER TABLE bids ADD COLUMN pk_bid_addr BLOB") - - if current_version != db_version: - self.db_version = db_version - self.setIntKV("db_version", db_version, cursor) - self.commitDB() - self.log.info("Upgraded database to version {}".format(self.db_version)) + query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;" + tables = cursor.execute(query).fetchall() + for table in tables: + table_name = table[0] + if table_name in ("sqlite_sequence",): continue - except Exception as e: - self.log.error("Upgrade failed {}".format(e)) - self.rollbackDB() - finally: - self.closeDB(cursor, commit=False) - break - if db_version != CURRENT_DB_VERSION: - raise ValueError("Unable to upgrade database.") + have_table = {} + have_columns = {} + query = "SELECT * FROM PRAGMA_TABLE_INFO(:table_name) ORDER BY cid DESC;" + columns = cursor.execute(query, {"table_name": table_name}).fetchall() + for column in columns: + cid, name, data_type, notnull, default_value, primary_key = column + have_columns[name] = {"type": data_type, "primary_key": primary_key} + + have_table["columns"] = have_columns + + cursor.execute(f"PRAGMA INDEX_LIST('{table_name}');") + indices = cursor.fetchall() + for index in indices: + seq, index_name, unique, origin, partial = index + + if origin == "pk": # Created by a PRIMARY KEY constraint + continue + + cursor.execute(f"PRAGMA INDEX_INFO('{index_name}');") + index_info = cursor.fetchall() + + add_index = {"index_name": index_name} + for index_columns in index_info: + seqno, cid, name = index_columns + if origin == "u": # Created by a UNIQUE constraint + have_columns[name]["unique"] = 1 + else: + if "column_1" not in add_index: + add_index["column_1"] = name + elif "column_2" not in add_index: + add_index["column_2"] = name + elif "column_3" not in add_index: + add_index["column_3"] = name + else: + raise RuntimeError("Add more index columns.") + if origin == "c": + if "indices" not in table: + have_table["indices"] = [] + have_table["indices"].append(add_index) + + have_tables[table_name] = have_table + + for table_name, table in expect_schema.items(): + if table_name not in have_tables: + self.log.info(f"Creating table {table_name}.") + create_table(cursor, table_name, table) + continue + + have_table = have_tables[table_name] + have_columns = have_table["columns"] + for colname, column in table["columns"].items(): + if colname not in have_columns: + col_type = column["type"] + self.log.info(f"Adding column {colname} to table {table_name}.") + cursor.execute( + f"ALTER TABLE {table_name} ADD COLUMN {colname} {col_type}" + ) + indices = table.get("indices", []) + have_indices = have_table.get("indices", []) + for index in indices: + index_name = index["index_name"] + if not any( + have_idx.get("index_name") == index_name + for have_idx in have_indices + ): + self.log.info(f"Adding index {index_name} to table {table_name}.") + column_1 = index["column_1"] + column_2 = index.get("column_2", None) + column_3 = index.get("column_3", None) + query: str = ( + f"CREATE INDEX {index_name} ON {table_name} ({column_1}" + ) + if column_2: + query += f", {column_2}" + if column_3: + query += f", {column_3}" + query += ")" + cursor.execute(query) + + if CURRENT_DB_VERSION != db_version: + self.db_version = CURRENT_DB_VERSION + self.setIntKV("db_version", CURRENT_DB_VERSION, cursor) + self.log.info(f"Upgraded database to version {self.db_version}") + self.commitDB() + except Exception as e: + self.log.error(f"Upgrade failed {e}") + self.rollbackDB() + finally: + self.closeDB(cursor, commit=False)