diff --git a/basicswap/__init__.py b/basicswap/__init__.py index 73f5c3b..b9c4099 100644 --- a/basicswap/__init__.py +++ b/basicswap/__init__.py @@ -1,3 +1,3 @@ name = "basicswap" -__version__ = "0.0.2" +__version__ = "0.0.3" diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index 9d9eaa5..18e6a48 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -359,7 +359,7 @@ class BasicSwap(): self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg') for c in Coins: - self.coin_clients[c] = self.setCoinConnectParams(c) + self.setCoinConnectParams(c) def prepareLogging(self): self.log = logging.getLogger(self.log_name) @@ -385,6 +385,7 @@ class BasicSwap(): return {} def setCoinConnectParams(self, coin): + # Set anything that does not require the daemon to be running chain_client_settings = self.getChainClientSettings(coin) bindir = os.path.expanduser(chain_client_settings.get('bindir', '')) @@ -399,22 +400,6 @@ class BasicSwap(): elif 'rpcpassword' in chain_client_settings: rpcauth = chain_client_settings['rpcuser'] + ':' + chain_client_settings['rpcpassword'] self.log.debug('Read %s rpc credentials from json settings', coin) - if rpcauth is None: - if self.chain == 'mainnet': - testnet_name = '' - else: - testnet_name = chainparams[coin][self.chain].get('name', self.chain) - authcookiepath = os.path.join(datadir, testnet_name, '.cookie') - self.log.debug('Reading %s rpc credentials from auth cookie %s', coin, authcookiepath) - # Wait for daemon to start - for i in range(10): - if not os.path.exists(authcookiepath): - time.sleep(0.5) - try: - with open(authcookiepath, 'rb') as fp: - rpcauth = fp.read().decode('utf-8') - except Exception: - self.log.warning('Unable to read authcookie for %s, %s', str(coin), authcookiepath) session = scoped_session(self.session_factory) try: @@ -424,8 +409,9 @@ class BasicSwap(): session.close() session.remove() - return { + self.coin_clients[coin] = { 'coin': coin, + 'name': chainparams[coin]['name'], 'connection_type': connection_type, 'bindir': bindir, 'datadir': datadir, @@ -437,8 +423,44 @@ class BasicSwap(): 'last_height_checked': last_height_checked, 'use_segwit': chain_client_settings.get('use_segwit', False), 'use_csv': chain_client_settings.get('use_csv', True), + 'pid': None, } + def setDaemonPID(self, name, pid): + for c, v in self.coin_clients.items(): + if v['name'] == name: + v['pid'] = pid + + def getChainDatadirPath(self, coin): + datadir = self.coin_clients[coin]['datadir'] + testnet_name = '' if self.chain == 'mainnet' else chainparams[coin][self.chain].get('name', self.chain) + return os.path.join(datadir, testnet_name) + + def setCoinRunParams(self, coin): + cc = self.coin_clients[coin] + if cc['connection_type'] == 'rpc' and cc['rpcauth'] is None: + chain_client_settings = self.getChainClientSettings(coin) + authcookiepath = os.path.join(self.getChainDatadirPath(coin), '.cookie') + pidfilepath = os.path.join(self.getChainDatadirPath(coin), cc['name'] + '.pid') + self.log.debug('Reading %s rpc credentials from auth cookie %s', coin, authcookiepath) + # Wait for daemon to start + # Test pids to ensure authcookie is read for the correct process + for i in range(20): + try: + with open(pidfilepath, 'rb') as fp: + datadir_pid = int(fp.read().decode('utf-8')) + assert(datadir_pid == cc['pid']) + assert(os.path.exists(authcookiepath)) + except Exception: + time.sleep(0.5) + try: + assert(datadir_pid == cc['pid']) + with open(authcookiepath, 'rb') as fp: + cc['rpcauth'] = fp.read().decode('utf-8') + except Exception: + self.log.error('Unable to read authcookie for %s, %s, datadir pid %d, daemon pid %s', str(coin), authcookiepath, datadir_pid, cc['pid']) + raise ValueError('Error, terminating') + def start(self): self.log.info('Starting BasicSwap %s\n\n', __version__) self.log.info('sqlalchemy version %s', sa.__version__) @@ -446,6 +468,7 @@ class BasicSwap(): self.upgradeDatabase(self.db_version) for c in Coins: + self.setCoinRunParams(c) if self.coin_clients[c]['connection_type'] == 'rpc': self.waitForDaemonRPC(c) core_version = self.callcoinrpc(c, 'getnetworkinfo')['version'] @@ -458,6 +481,38 @@ class BasicSwap(): self.initialise() + def stopDaemon(self, coin): + num_tries = 10 + authcookiepath = os.path.join(self.getChainDatadirPath(coin), '.cookie') + stopping = False + try: + for i in range(num_tries): + rv = self.callcoincli(coin, 'stop', timeout=10) + self.log.debug('Trying to stop %s', str(coin)) + stopping = True + time.sleep(i + 1) + except Exception as ex: + if 'Could not connect' in str(ex): + if stopping: + for i in range(30): + # The lock file doesn't get deleted + # Using .cookie is a temporary workaround, will only work if rpc password is unset. + # TODO: Query lock on .lock properly + if os.path.exists(authcookiepath): + self.log.debug('Waiting on .cookie file %s', str(coin)) + time.sleep(i + 1) + time.sleep(4) # Extra time to settle + return + self.log.error('stopDaemon %s', str(ex)) + traceback.print_exc() + raise ValueError('Could not stop {}'.format(str(coin))) + + def stopDaemons(self): + for c in Coins: + chain_client_settings = self.getChainClientSettings(c) + if self.coin_clients[c]['connection_type'] == 'rpc' and chain_client_settings['manage_daemon'] is True: + self.stopDaemon(c) + def stopRunning(self, with_code=0): self.fail_code = with_code self.is_running = False @@ -2391,14 +2446,14 @@ class BasicSwap(): raise ValueError('TX error ' + str(out[1])) return out[0].decode('utf-8').strip() - def callcoincli(self, coin_type, params, wallet=None): + def callcoincli(self, coin_type, params, wallet=None, timeout=None): bindir = self.coin_clients[coin_type]['bindir'] datadir = self.coin_clients[coin_type]['datadir'] command_cli = os.path.join(bindir, chainparams[coin_type]['name'] + '-cli' + ('.exe' if os.name == 'nt' else '')) chainname = '' if self.chain == 'mainnet' else (' -' + self.chain) args = command_cli + chainname + ' ' + '-datadir=' + datadir + ' ' + params p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - out = p.communicate() + out = p.communicate(timeout=timeout) if len(out[1]) > 0: raise ValueError('CLI error ' + str(out[1])) return out[0].decode('utf-8').strip() diff --git a/bin/basicswap_run.py b/bin/basicswap_run.py index fedd42d..61198a4 100644 --- a/bin/basicswap_run.py +++ b/bin/basicswap_run.py @@ -20,7 +20,7 @@ import subprocess import logging from basicswap import __version__ -from basicswap.basicswap import BasicSwap +from basicswap.basicswap import BasicSwap, Coins from basicswap.http_server import HttpThread @@ -58,8 +58,11 @@ def runClient(fp, data_dir, chain, test_mode): with open(settings_path) as fs: settings = json.load(fs) + swap_client = BasicSwap(fp, data_dir, settings, chain) + daemons = [] pids = [] + threads = [] if os.path.exists(pids_path): with open(pids_path) as fd: @@ -67,43 +70,44 @@ def runClient(fp, data_dir, chain, test_mode): # TODO: try close logger.warning('Found pid for daemon {} '.format(ln.strip())) - for c, v in settings['chainclients'].items(): - if v['manage_daemon'] is True: - logger.info('Starting {} daemon'.format(c.capitalize())) - - filename = c + 'd' + ('.exe' if os.name == 'nt' else '') - daemons.append(startDaemon(v['datadir'], v['bindir'], filename)) - pid = daemons[-1].pid - pids.append((c, pid)) - logger.info('Started {} {}'.format(filename, pid)) - - if len(pids) > 0: - with open(pids_path, 'w') as fd: - for p in pids: - fd.write('{}:{}\n'.format(*p)) - - swap_client = BasicSwap(fp, data_dir, settings, chain) - - if not test_mode: - # signal only works in main thread - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - swap_client.start() - - threads = [] - if 'htmlhost' in settings: - swap_client.log.info('Starting server at %s:%d.' % (settings['htmlhost'], settings['htmlport'])) - allow_cors = settings['allowcors'] if 'allowcors' in settings else ALLOW_CORS - tS1 = HttpThread(fp, settings['htmlhost'], settings['htmlport'], allow_cors, swap_client) - threads.append(tS1) - tS1.start() + # Ensure daemons are stopped + swap_client.stopDaemons() try: + # Try start daemons + for c, v in settings['chainclients'].items(): + if v['manage_daemon'] is True: + logger.info('Starting {} daemon'.format(c.capitalize())) + + filename = c + 'd' + ('.exe' if os.name == 'nt' else '') + daemons.append(startDaemon(v['datadir'], v['bindir'], filename)) + pid = daemons[-1].pid + pids.append((c, pid)) + swap_client.setDaemonPID(c, pid) + logger.info('Started {} {}'.format(filename, pid)) + if len(pids) > 0: + with open(pids_path, 'w') as fd: + for p in pids: + fd.write('{}:{}\n'.format(*p)) + + if not test_mode: + # Signal only works in main thread + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + swap_client.start() + + if 'htmlhost' in settings: + swap_client.log.info('Starting server at %s:%d.' % (settings['htmlhost'], settings['htmlport'])) + allow_cors = settings['allowcors'] if 'allowcors' in settings else ALLOW_CORS + tS1 = HttpThread(fp, settings['htmlhost'], settings['htmlport'], allow_cors, swap_client) + threads.append(tS1) + tS1.start() + logger.info('Exit with Ctrl + c.') while swap_client.is_running: time.sleep(0.5) swap_client.update() - except Exception: + except Exception as ex: traceback.print_exc() swap_client.log.info('Stopping threads.')