Sync DB schema to table definitions.

This commit is contained in:
tecnovert
2025-05-25 23:27:05 +02:00
parent f3adc17bb8
commit 3faf947588
5 changed files with 240 additions and 343 deletions

View File

@@ -71,6 +71,7 @@ class BaseApp(DBMethods):
self.default_socket = socket.socket
self.default_socket_timeout = socket.getdefaulttimeout()
self.default_socket_getaddrinfo = socket.getaddrinfo
self._force_db_upgrade = False
def __del__(self):
if self.fp:

View File

@@ -498,6 +498,7 @@ class BasicSwap(BaseApp, UIApp):
self.with_coins_override = extra_opts.get("with_coins", set())
self.without_coins_override = extra_opts.get("without_coins", set())
self._force_db_upgrade = extra_opts.get("force_db_upgrade", False)
for c in Coins:
if c in chainparams:
@@ -10198,7 +10199,9 @@ class BasicSwap(BaseApp, UIApp):
def processContactConnected(self, event_data) -> None:
connId = event_data["resp"]["contact"]["activeConn"]["connId"]
localDisplayName = event_data["resp"]["contact"]["localDisplayName"]
self.log.debug(f"Processing Contact Connected event, ID: {connId}, contact name: {localDisplayName}.")
self.log.debug(
f"Processing Contact Connected event, ID: {connId}, contact name: {localDisplayName}."
)
try:
cursor = self.openDB()

View File

@@ -31,8 +31,6 @@ if not len(initial_logger.handlers):
logger = initial_logger
swap_client = None
with_coins = set()
without_coins = set()
class Daemon:
@@ -285,7 +283,11 @@ def getCoreBinArgs(coin_id: int, coin_settings, prepare=False, use_tor_proxy=Fal
def runClient(
data_dir: str, chain: str, start_only_coins: bool, log_prefix: str = "BasicSwap"
data_dir: str,
chain: str,
start_only_coins: bool,
log_prefix: str = "BasicSwap",
extra_opts=dict(),
) -> int:
global swap_client, logger
daemons = []
@@ -311,13 +313,6 @@ def runClient(
with open(settings_path) as fs:
settings = json.load(fs)
extra_opts = dict()
if len(with_coins) > 0:
with_coins.add("particl")
extra_opts["with_coins"] = with_coins
if len(without_coins) > 0:
extra_opts["without_coins"] = without_coins
swap_client = BasicSwap(
data_dir, settings, chain, log_name=log_prefix, extra_opts=extra_opts
)
@@ -334,12 +329,16 @@ def runClient(
# Settings may have been modified
settings = swap_client.settings
try:
# Try start daemons
for c, v in settings["chainclients"].items():
if len(start_only_coins) > 0 and c not in start_only_coins:
continue
if (len(with_coins) > 0 and c not in with_coins) or c in without_coins:
if (
len(swap_client.with_coins_override) > 0
and c not in swap_client.with_coins_override
) or c in swap_client.without_coins_override:
if v.get("manage_daemon", False) or v.get(
"manage_wallet_daemon", False
):
@@ -623,6 +622,9 @@ def printHelp():
"--startonlycoin Only start the provides coin daemon/s, use this if a chain requires extra processing."
)
print("--logprefix Specify log prefix.")
print(
"--forcedbupgrade Recheck database against schema regardless of version."
)
def main():
@@ -630,6 +632,9 @@ def main():
chain = "mainnet"
start_only_coins = set()
log_prefix: str = "BasicSwap"
options = dict()
with_coins = set()
without_coins = set()
for v in sys.argv[1:]:
if len(v) < 2 or v[0] != "-":
@@ -665,6 +670,9 @@ def main():
ensure_coin_valid(coin)
without_coins.add(coin)
continue
if name == "forcedbupgrade":
options["force_db_upgrade"] = True
continue
if len(s) == 2:
if name == "datadir":
data_dir = os.path.expanduser(s[1])
@@ -693,8 +701,14 @@ def main():
if not os.path.exists(data_dir):
os.makedirs(data_dir)
if len(with_coins) > 0:
with_coins.add("particl")
options["with_coins"] = with_coins
if len(without_coins) > 0:
options["without_coins"] = without_coins
logger.info(os.path.basename(sys.argv[0]) + ", version: " + __version__ + "\n\n")
fail_code = runClient(data_dir, chain, start_only_coins, log_prefix)
fail_code = runClient(data_dir, chain, start_only_coins, log_prefix, options)
print("Done.")
return fail_code

View File

@@ -694,10 +694,9 @@ class DirectMessageRouteLink(Table):
created_at = Column("integer")
def create_db_(con, log) -> None:
c = con.cursor()
def extract_schema() -> dict:
g = globals().copy()
tables = {}
for name, obj in g.items():
if not inspect.isclass(obj):
continue
@@ -707,15 +706,13 @@ def create_db_(con, log) -> None:
continue
table_name: str = obj.__tablename__
query: str = f"CREATE TABLE {table_name} ("
table = {}
columns = {}
primary_key = None
constraints = []
indices = []
num_columns: int = 0
for m in inspect.getmembers(obj):
m_name, m_obj = m
if hasattr(m_obj, "__sqlite3_primary_key__"):
primary_key = m_obj
continue
@@ -726,48 +723,105 @@ def create_db_(con, log) -> None:
indices.append(m_obj)
continue
if hasattr(m_obj, "__sqlite3_column__"):
if num_columns > 0:
query += ","
col_type: str = m_obj.column_type.upper()
if col_type == "BOOL":
col_type = "INTEGER"
query += f" {m_name} {col_type} "
if m_obj.primary_key:
query += "PRIMARY KEY ASC "
if m_obj.unique:
query += "UNIQUE "
num_columns += 1
columns[m_name] = {
"type": col_type,
"primary_key": m_obj.primary_key,
"unique": m_obj.unique,
}
table["columns"] = columns
if primary_key is not None:
query += f", PRIMARY KEY ({primary_key.column_1}"
table["primary_key"] = {"column_1": primary_key.column_1}
if primary_key.column_2:
query += f", {primary_key.column_2}"
table["primary_key"]["column_2"] = primary_key.column_2
if primary_key.column_3:
query += f", {primary_key.column_3}"
query += ") "
table["primary_key"]["column_3"] = primary_key.column_3
for constraint in constraints:
query += f", UNIQUE ({constraint.column_1}"
if "constraints" not in table:
table["constraints"] = []
table_constraint = {"column_1": constraint.column_1}
if constraint.column_2:
query += f", {constraint.column_2}"
table_constraint["column_2"] = constraint.column_2
if constraint.column_3:
query += f", {constraint.column_3}"
table_constraint["column_3"] = constraint.column_3
table["constraints"].append(table_constraint)
for i in indices:
if "indices" not in table:
table["indices"] = []
table_index = {"index_name": i.name, "column_1": i.column_1}
if i.column_2 is not None:
table_index["column_2"] = i.column_2
if i.column_3 is not None:
table_index["column_3"] = i.column_3
table["indices"].append(table_index)
tables[table_name] = table
return tables
def create_table(c, table_name, table) -> None:
query: str = f"CREATE TABLE {table_name} ("
for i, (colname, column) in enumerate(table["columns"].items()):
col_type = column["type"]
query += ("," if i > 0 else "") + f" {colname} {col_type} "
if column["primary_key"]:
query += "PRIMARY KEY ASC "
if column["unique"]:
query += "UNIQUE "
if "primary_key" in table:
column_1 = table["primary_key"]["column_1"]
column_2 = table["primary_key"].get("column_2", None)
column_3 = table["primary_key"].get("column_3", None)
query += f", PRIMARY KEY ({column_1}"
if column_2:
query += f", {column_2}"
if column_3:
query += f", {column_3}"
query += ") "
constraints = table.get("constraints", [])
for constraint in constraints:
column_1 = constraint["column_1"]
column_2 = constraint.get("column_2", None)
column_3 = constraint.get("column_3", None)
query += f", UNIQUE ({column_1}"
if column_2:
query += f", {column_2}"
if column_3:
query += f", {column_3}"
query += ") "
query += ")"
c.execute(query)
for i in indices:
query: str = f"CREATE INDEX {i.name} ON {table_name} ({i.column_1}"
if i.column_2 is not None:
query += f", {i.column_2}"
if i.column_3 is not None:
query += f", {i.column_3}"
indices = table.get("indices", [])
for index in indices:
index_name = index["index_name"]
column_1 = index["column_1"]
column_2 = index.get("column_2", None)
column_3 = index.get("column_3", None)
query: str = f"CREATE INDEX {index_name} ON {table_name} ({column_1}"
if column_2:
query += f", {column_2}"
if column_3:
query += f", {column_3}"
query += ")"
c.execute(query)
def create_db_(con, log) -> None:
db_schema = extract_schema()
c = con.cursor()
for table_name, table in db_schema.items():
create_table(c, table_name, table)
def create_db(db_path: str, log) -> None:
con = None
try:

View File

@@ -12,8 +12,10 @@ from .db import (
AutomationStrategy,
BidState,
Concepts,
create_table,
CURRENT_DB_DATA_VERSION,
CURRENT_DB_VERSION,
extract_schema,
)
from .basicswap_util import (
@@ -49,10 +51,9 @@ def upgradeDatabaseData(self, data_version):
return
self.log.info(
"Upgrading database records from version %d to %d.",
data_version,
CURRENT_DB_DATA_VERSION,
f"Upgrading database records from version {data_version} to {CURRENT_DB_DATA_VERSION}."
)
cursor = self.openDB()
try:
now = int(time.time())
@@ -138,313 +139,137 @@ def upgradeDatabaseData(self, data_version):
self.db_data_version = CURRENT_DB_DATA_VERSION
self.setIntKV("db_data_version", self.db_data_version, cursor)
self.commitDB()
self.log.info(
"Upgraded database records to version {}".format(self.db_data_version)
)
self.log.info(f"Upgraded database records to version {self.db_data_version}")
finally:
self.closeDB(cursor, commit=False)
def upgradeDatabase(self, db_version):
if db_version >= CURRENT_DB_VERSION:
if self._force_db_upgrade is False and db_version >= CURRENT_DB_VERSION:
return
self.log.info(
f"Upgrading database from version {db_version} to {CURRENT_DB_VERSION}."
)
while True:
# db_version, tablename, oldcolumnname, newcolumnname
rename_columns = [
(13, "actions", "event_id", "action_id"),
(13, "actions", "event_type", "action_type"),
(13, "actions", "event_data", "action_data"),
(
14,
"xmr_swaps",
"coin_a_lock_refund_spend_tx_msg_id",
"coin_a_lock_spend_tx_msg_id",
),
]
expect_schema = extract_schema()
have_tables = {}
try:
cursor = self.openDB()
current_version = db_version
if current_version == 6:
cursor.execute("ALTER TABLE bids ADD COLUMN security_token BLOB")
cursor.execute("ALTER TABLE offers ADD COLUMN security_token BLOB")
db_version += 1
elif current_version == 7:
cursor.execute("ALTER TABLE transactions ADD COLUMN block_hash BLOB")
for rename_column in rename_columns:
dbv, table_name, colname_from, colname_to = rename_column
if db_version < dbv:
cursor.execute(
"ALTER TABLE transactions ADD COLUMN block_height INTEGER"
)
cursor.execute("ALTER TABLE transactions ADD COLUMN block_time INTEGER")
db_version += 1
elif current_version == 8:
cursor.execute(
"""
CREATE TABLE wallets (
record_id INTEGER NOT NULL,
coin_id INTEGER,
wallet_name VARCHAR,
wallet_data VARCHAR,
balance_type INTEGER,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
db_version += 1
elif current_version == 9:
cursor.execute("ALTER TABLE wallets ADD COLUMN wallet_data VARCHAR")
db_version += 1
elif current_version == 10:
cursor.execute(
"ALTER TABLE smsgaddresses ADD COLUMN active_ind INTEGER"
)
cursor.execute(
"ALTER TABLE smsgaddresses ADD COLUMN created_at INTEGER"
)
cursor.execute("ALTER TABLE smsgaddresses ADD COLUMN note VARCHAR")
cursor.execute("ALTER TABLE smsgaddresses ADD COLUMN pubkey VARCHAR")
cursor.execute(
"UPDATE smsgaddresses SET active_ind = 1, created_at = 1"
f"ALTER TABLE {table_name} RENAME COLUMN {colname_from} TO {colname_to}"
)
cursor.execute("ALTER TABLE offers ADD COLUMN addr_to VARCHAR")
cursor.execute(f'UPDATE offers SET addr_to = "{self.network_addr}"')
db_version += 1
elif current_version == 11:
cursor.execute(
"ALTER TABLE bids ADD COLUMN chain_a_height_start INTEGER"
)
cursor.execute(
"ALTER TABLE bids ADD COLUMN chain_b_height_start INTEGER"
)
cursor.execute("ALTER TABLE bids ADD COLUMN protocol_version INTEGER")
cursor.execute("ALTER TABLE offers ADD COLUMN protocol_version INTEGER")
cursor.execute("ALTER TABLE transactions ADD COLUMN tx_data BLOB")
db_version += 1
elif current_version == 12:
cursor.execute(
"""
CREATE TABLE knownidentities (
record_id INTEGER NOT NULL,
address VARCHAR,
label VARCHAR,
publickey BLOB,
num_sent_bids_successful INTEGER,
num_recv_bids_successful INTEGER,
num_sent_bids_rejected INTEGER,
num_recv_bids_rejected INTEGER,
num_sent_bids_failed INTEGER,
num_recv_bids_failed INTEGER,
note VARCHAR,
updated_at BIGINT,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
cursor.execute("ALTER TABLE bids ADD COLUMN reject_code INTEGER")
cursor.execute("ALTER TABLE bids ADD COLUMN rate INTEGER")
cursor.execute(
"ALTER TABLE offers ADD COLUMN amount_negotiable INTEGER"
)
cursor.execute("ALTER TABLE offers ADD COLUMN rate_negotiable INTEGER")
db_version += 1
elif current_version == 13:
db_version += 1
cursor.execute(
"""
CREATE TABLE automationstrategies (
record_id INTEGER NOT NULL,
active_ind INTEGER,
label VARCHAR,
type_ind INTEGER,
only_known_identities INTEGER,
num_concurrent INTEGER,
data BLOB,
note VARCHAR,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
cursor.execute(
"""
CREATE TABLE automationlinks (
record_id INTEGER NOT NULL,
active_ind INTEGER,
linked_type INTEGER,
linked_id BLOB,
strategy_id INTEGER,
data BLOB,
repeat_limit INTEGER,
repeat_count INTEGER,
note VARCHAR,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
cursor.execute(
"""
CREATE TABLE history (
record_id INTEGER NOT NULL,
concept_type INTEGER,
concept_id INTEGER,
changed_data BLOB,
note VARCHAR,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
cursor.execute(
"""
CREATE TABLE bidstates (
record_id INTEGER NOT NULL,
active_ind INTEGER,
state_id INTEGER,
label VARCHAR,
in_progress INTEGER,
note VARCHAR,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
cursor.execute("ALTER TABLE wallets ADD COLUMN active_ind INTEGER")
cursor.execute(
"ALTER TABLE knownidentities ADD COLUMN active_ind INTEGER"
)
cursor.execute("ALTER TABLE eventqueue RENAME TO actions")
cursor.execute(
"ALTER TABLE actions RENAME COLUMN event_id TO action_id"
)
cursor.execute(
"ALTER TABLE actions RENAME COLUMN event_type TO action_type"
)
cursor.execute(
"ALTER TABLE actions RENAME COLUMN event_data TO action_data"
)
elif current_version == 14:
db_version += 1
cursor.execute(
"ALTER TABLE xmr_swaps ADD COLUMN coin_a_lock_release_msg_id BLOB"
)
cursor.execute(
"ALTER TABLE xmr_swaps RENAME COLUMN coin_a_lock_refund_spend_tx_msg_id TO coin_a_lock_spend_tx_msg_id"
)
elif current_version == 15:
db_version += 1
cursor.execute(
"""
CREATE TABLE notifications (
record_id INTEGER NOT NULL,
active_ind INTEGER,
event_type INTEGER,
event_data BLOB,
created_at BIGINT,
PRIMARY KEY (record_id))"""
)
elif current_version == 16:
db_version += 1
cursor.execute(
"""
CREATE TABLE prefunded_transactions (
record_id INTEGER NOT NULL,
active_ind INTEGER,
created_at BIGINT,
linked_type INTEGER,
linked_id BLOB,
tx_type INTEGER,
tx_data BLOB,
used_by BLOB,
PRIMARY KEY (record_id))"""
)
elif current_version == 17:
db_version += 1
cursor.execute(
"ALTER TABLE knownidentities ADD COLUMN automation_override INTEGER"
)
cursor.execute(
"ALTER TABLE knownidentities ADD COLUMN visibility_override INTEGER"
)
cursor.execute("ALTER TABLE knownidentities ADD COLUMN data BLOB")
cursor.execute("UPDATE knownidentities SET active_ind = 1")
elif current_version == 18:
db_version += 1
cursor.execute("ALTER TABLE xmr_split_data ADD COLUMN addr_from STRING")
cursor.execute("ALTER TABLE xmr_split_data ADD COLUMN addr_to STRING")
elif current_version == 19:
db_version += 1
cursor.execute("ALTER TABLE bidstates ADD COLUMN in_error INTEGER")
cursor.execute("ALTER TABLE bidstates ADD COLUMN swap_failed INTEGER")
cursor.execute("ALTER TABLE bidstates ADD COLUMN swap_ended INTEGER")
elif current_version == 20:
db_version += 1
cursor.execute(
"""
CREATE TABLE message_links (
record_id INTEGER NOT NULL,
active_ind INTEGER,
created_at BIGINT,
linked_type INTEGER,
linked_id BLOB,
msg_type INTEGER,
msg_sequence INTEGER,
msg_id BLOB,
PRIMARY KEY (record_id))"""
)
cursor.execute("ALTER TABLE offers ADD COLUMN bid_reversed INTEGER")
elif current_version == 21:
db_version += 1
cursor.execute("ALTER TABLE offers ADD COLUMN proof_utxos BLOB")
cursor.execute("ALTER TABLE bids ADD COLUMN proof_utxos BLOB")
elif current_version == 22:
db_version += 1
cursor.execute("ALTER TABLE offers ADD COLUMN amount_to INTEGER")
elif current_version == 23:
db_version += 1
cursor.execute(
"""
CREATE TABLE checkedblocks (
record_id INTEGER NOT NULL,
created_at BIGINT,
coin_type INTEGER,
block_height INTEGER,
block_hash BLOB,
block_time INTEGER,
PRIMARY KEY (record_id))"""
)
cursor.execute("ALTER TABLE bids ADD COLUMN pkhash_buyer_to BLOB")
elif current_version == 24:
db_version += 1
cursor.execute("ALTER TABLE bidstates ADD COLUMN can_accept INTEGER")
elif current_version == 25:
db_version += 1
cursor.execute(
"""
CREATE TABLE coinrates (
record_id INTEGER NOT NULL,
currency_from INTEGER,
currency_to INTEGER,
rate VARCHAR,
source VARCHAR,
last_updated INTEGER,
PRIMARY KEY (record_id))"""
)
elif current_version == 26:
db_version += 1
cursor.execute("ALTER TABLE offers ADD COLUMN auto_accept_type INTEGER")
elif current_version == 27:
db_version += 1
cursor.execute("ALTER TABLE offers ADD COLUMN pk_from BLOB")
cursor.execute("ALTER TABLE bids ADD COLUMN pk_bid_addr BLOB")
if current_version != db_version:
self.db_version = db_version
self.setIntKV("db_version", db_version, cursor)
self.commitDB()
self.log.info("Upgraded database to version {}".format(self.db_version))
query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"
tables = cursor.execute(query).fetchall()
for table in tables:
table_name = table[0]
if table_name in ("sqlite_sequence",):
continue
have_table = {}
have_columns = {}
query = "SELECT * FROM PRAGMA_TABLE_INFO(:table_name) ORDER BY cid DESC;"
columns = cursor.execute(query, {"table_name": table_name}).fetchall()
for column in columns:
cid, name, data_type, notnull, default_value, primary_key = column
have_columns[name] = {"type": data_type, "primary_key": primary_key}
have_table["columns"] = have_columns
cursor.execute(f"PRAGMA INDEX_LIST('{table_name}');")
indices = cursor.fetchall()
for index in indices:
seq, index_name, unique, origin, partial = index
if origin == "pk": # Created by a PRIMARY KEY constraint
continue
cursor.execute(f"PRAGMA INDEX_INFO('{index_name}');")
index_info = cursor.fetchall()
add_index = {"index_name": index_name}
for index_columns in index_info:
seqno, cid, name = index_columns
if origin == "u": # Created by a UNIQUE constraint
have_columns[name]["unique"] = 1
else:
if "column_1" not in add_index:
add_index["column_1"] = name
elif "column_2" not in add_index:
add_index["column_2"] = name
elif "column_3" not in add_index:
add_index["column_3"] = name
else:
raise RuntimeError("Add more index columns.")
if origin == "c":
if "indices" not in table:
have_table["indices"] = []
have_table["indices"].append(add_index)
have_tables[table_name] = have_table
for table_name, table in expect_schema.items():
if table_name not in have_tables:
self.log.info(f"Creating table {table_name}.")
create_table(cursor, table_name, table)
continue
have_table = have_tables[table_name]
have_columns = have_table["columns"]
for colname, column in table["columns"].items():
if colname not in have_columns:
col_type = column["type"]
self.log.info(f"Adding column {colname} to table {table_name}.")
cursor.execute(
f"ALTER TABLE {table_name} ADD COLUMN {colname} {col_type}"
)
indices = table.get("indices", [])
have_indices = have_table.get("indices", [])
for index in indices:
index_name = index["index_name"]
if not any(
have_idx.get("index_name") == index_name
for have_idx in have_indices
):
self.log.info(f"Adding index {index_name} to table {table_name}.")
column_1 = index["column_1"]
column_2 = index.get("column_2", None)
column_3 = index.get("column_3", None)
query: str = (
f"CREATE INDEX {index_name} ON {table_name} ({column_1}"
)
if column_2:
query += f", {column_2}"
if column_3:
query += f", {column_3}"
query += ")"
cursor.execute(query)
if CURRENT_DB_VERSION != db_version:
self.db_version = CURRENT_DB_VERSION
self.setIntKV("db_version", CURRENT_DB_VERSION, cursor)
self.log.info(f"Upgraded database to version {self.db_version}")
self.commitDB()
except Exception as e:
self.log.error("Upgrade failed {}".format(e))
self.log.error(f"Upgrade failed {e}")
self.rollbackDB()
finally:
self.closeDB(cursor, commit=False)
break
if db_version != CURRENT_DB_VERSION:
raise ValueError("Unable to upgrade database.")