I am working on a project that uses the ccxt async library, which requires all resources used by a certain class to be released with an explicit call to the class's .close() coroutine. I want to exit the program with Ctrl+C and await the close coroutine in the exception handler. However, it is never awaited.
The application consists of the modules harvesters, strategies, traders, broker, and main (plus config and such). The broker initiates the strategies specified for an exchange and executes them. The strategy initiates the associated harvester which collects the necessary data. It also analyses the data and spawns a trader when there is a profitable opportunity. The main module creates a broker for each exchange and runs it. I have tried to catch the exception at each of these levels, but the close routine is never awaited. I'd prefer to catch it in the main module in order to close all exchange instances.
Harvester

    async def harvest(self):
        if not self.routes:
            self.routes = await self.get_routes()
        for route in self.routes:
            self.logger.info("Harvesting route {}".format(route))
            await asyncio.sleep(self.exchange.rateLimit / 1000)
            yield await self.harvest_route(route)
Strategy

    async def execute(self):
        async for route_dct in self.harvester.harvest():
            self.logger.debug("Route dictionary: {}".format(route_dct))
            await self.try_route(route_dct)
Broker

    async def run(self):
        for strategy in self.strategies:
            self.strategies[strategy] = getattr(
                strategies, strategy)(self.share, self.exchange, self.currency)
        while True:
            try:
                await self.execute_strategies()
            except KeyboardInterrupt:
                await safe_exit(self.exchange)
Main

    async def main():
        await load_exchanges()
        await load_markets()
        brokers = [Broker(
            share,
            exchanges[id]["api"],
            currency,
            exchanges[id]["strategies"]
        ) for id in exchanges]
        futures = [broker.run() for broker in brokers]
        for future in asyncio.as_completed(futures):
            executed = await future
        return executed

    if __name__ == "__main__":
        status = asyncio.run(main())
        sys.exit(status)
I had expected the close() coroutine to be awaited, but I still get an error from the library that I must explicitly call it. Where do I catch the exception so that all exchange instances are closed properly?
Somewhere in your code there should be an entry point where the event loop is started. Usually it is one of the following calls:

    loop.run_until_complete(main())
    loop.run_forever()
    asyncio.run(main())

When Ctrl+C is pressed, KeyboardInterrupt can be caught at this line. If you need to execute some finalizing coroutine at that point, you can run the event loop again.
This little example shows the idea:

    import asyncio


    async def main():
        print('Started, press Ctrl+C')
        await asyncio.sleep(10)


    async def close():
        print('Finalizing...')
        await asyncio.sleep(1)


    if __name__ == '__main__':
        # Create the loop explicitly; get_event_loop() is deprecated here.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(main())
        except KeyboardInterrupt:
            # Ctrl+C surfaces here; run the loop again to await the cleanup.
            loop.run_until_complete(close())
        finally:
            print('Program finished')
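
Applied to the question's case of several exchanges, the same idea could look roughly like the sketch below. It is only an illustration under assumptions: FakeExchange, run_brokers, shutdown and run are hypothetical names standing in for the real ccxt exchange objects and the question's main(), and the only thing assumed about an exchange is that it has an awaitable close(). Before closing, the sketch also cancels whatever tasks were still pending when the interrupt arrived, so the loop can be closed cleanly.

```python
import asyncio


class FakeExchange:
    """Hypothetical stand-in for a ccxt async exchange object."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    async def close(self):
        await asyncio.sleep(0)  # pretend to release connections
        self.closed = True


async def run_brokers(exchanges):
    # Placeholder for the question's main(): runs until interrupted.
    while True:
        await asyncio.sleep(0.1)


async def shutdown(exchanges):
    # Cancel whatever was still running when Ctrl+C arrived.
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Close every exchange; one failing close() does not skip the rest.
    await asyncio.gather(*(ex.close() for ex in exchanges),
                         return_exceptions=True)


def run(entry, exchanges):
    """Run entry(exchanges) and always close the exchanges afterwards."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(entry(exchanges))
    except KeyboardInterrupt:
        print('Interrupted, shutting down...')
    finally:
        # The loop is stopped but not closed, so it can be run again.
        loop.run_until_complete(shutdown(exchanges))
        loop.close()
```

With this in place, the entry point would be something like run(run_brokers, exchanges) under the usual if __name__ == '__main__': guard. The cleanup sits in finally rather than in the except branch so that the exchanges are also closed when the program ends for any other reason.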