I'm trying to think of how to rewrite some code asynchroniously. I have to download ~7500 datasets from an api and write them to .csv's. Here is a reproducible example (assuming you have a free api key for alpha vantage):
from alpha_vantage.timeseries import TimeSeries
import pandas as pd
import numpy as np
api_key = ""
def get_ts(symbol):
ts = TimeSeries(key=api_key, output_format='pandas')
data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
fname = "./data_dump/{}_data.csv".format(symbol)
data.to_csv(fname)
symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']
for s in symbols:
get_ts(s)
The people who made the alpha_vantage
API wrote an article on using it with asyncio here, but I'm not sure if I should make two functions for pulling the data and writing the csv like here.
I haven't used asyncio before, so any pointers would be greatly appreciated - just looking to make my download time take less than 3 hours if possible!
Edit: The other caveat is I'm helping a researcher with this so we are using Jupyter notebooks - see their caveat for asyncio here.
Without changing your function get_ts
, it might look like this:
import multiprocessing
# PROCESSES = multiprocessing.cpu_count()
PROCESSES = 4 # number of parallel process
CHUNKS = 6 # one process handle n symbols
# 7.5k symbols
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
"XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
"TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
"AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
"MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
"CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]
# create a list of n sublist
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]
def download_data(pool_id, symbols):
for symbol in symbols:
print("[{:02}]: {}".format(pool_id, symbol))
# do stuff here
# get_ts(symbol)
if __name__ == "__main__":
with multiprocessing.Pool(PROCESSES) as pool:
pool.starmap(download_data, enumerate(TICKERS, start=1))
Similar question here.
In this example, I split the list of tickers into sublists for each process retrieves data for multiple symbols and limits overhead due to create and destroy processes.