python · pandas · python-asyncio · alpha-vantage

Python: use asyncio to hit an API and output CSVs


I'm trying to work out how to rewrite some code asynchronously. I have to download ~7500 datasets from an API and write each one to a .csv file. Here is a reproducible example (assuming you have a free API key for Alpha Vantage):

from alpha_vantage.timeseries import TimeSeries
import os

api_key = ""  # your Alpha Vantage API key

os.makedirs("./data_dump", exist_ok=True)  # output directory must exist before writing

def get_ts(symbol):
    # pull the full daily-adjusted history for one symbol and dump it to .csv
    ts = TimeSeries(key=api_key, output_format='pandas')
    data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
    fname = "./data_dump/{}_data.csv".format(symbol)
    data.to_csv(fname)

symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

for s in symbols:
    get_ts(s)

The people who made the alpha_vantage API wrote an article on using it with asyncio here, but I'm not sure whether I should make two separate functions, one to pull the data and one to write the .csv, like they do here.
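For reference, here is a rough, untested sketch of what I think that two-function split could look like with plain aiohttp against the REST endpoint, skipping the alpha_vantage wrapper entirely (the names fetch_ts, write_csv, and main are just mine; datatype=csv is a documented Alpha Vantage query parameter that returns CSV text directly, so there's no pandas round-trip):

import asyncio
import aiohttp

API_URL = "https://www.alphavantage.co/query"

async def fetch_ts(session, symbol):
    # function 1: pull the raw CSV text for one symbol
    params = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": symbol,
        "outputsize": "full",
        "datatype": "csv",  # the REST API can return CSV directly
        "apikey": api_key,
    }
    async with session.get(API_URL, params=params) as resp:
        return symbol, await resp.text()

def write_csv(symbol, text):
    # function 2: write the downloaded text to disk
    with open("./data_dump/{}_data.csv".format(symbol), "w") as f:
        f.write(text)

async def main(symbols):
    # fire off all requests concurrently over one shared session
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_ts(session, s) for s in symbols))
    for symbol, text in results:
        write_csv(symbol, text)

asyncio.run(main(symbols))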

I haven't used asyncio before, so any pointers would be greatly appreciated; I'm just looking to get the total download time under 3 hours if possible!

Edit: The other caveat is that I'm helping a researcher with this, so we're working in Jupyter notebooks; see the asyncio caveat for notebooks here.
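In case it's relevant to answers: as I understand that caveat, the notebook kernel already runs an event loop, so asyncio.run() raises a RuntimeError there; with IPython 7+ you can instead await a coroutine at the top level of a cell, e.g. using the main from my sketch above:

# inside a Jupyter cell: the kernel's event loop is already running,
# so use a top-level await instead of asyncio.run(main(symbols))
await main(symbols)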


Solution

  • Without changing your function get_ts, a multiprocessing version might look like this:

    import multiprocessing
    
    # PROCESSES = multiprocessing.cpu_count()
    PROCESSES = 4  # number of parallel processes
    CHUNKS = 6  # each task handles n symbols
    
    # sample stand-in for the ~7.5k symbols
    TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
               "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
               "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
               "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
               "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
               "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]
    
    # split TICKERS into sublists of CHUNKS symbols each
    TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]
    
    
    def download_data(pool_id, symbols):
        for symbol in symbols:
            print("[{:02}]: {}".format(pool_id, symbol))
            # do stuff here
            # get_ts(symbol)
    
    
    if __name__ == "__main__":
        with multiprocessing.Pool(PROCESSES) as pool:
            pool.starmap(download_data, enumerate(TICKERS, start=1))
    

    Similar question here.

    In this example, I split the list of tickers into sublists so that each process retrieves data for several symbols; this limits the overhead of creating and destroying processes.
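
    As an aside, Pool.map already supports this batching through its chunksize argument, so a sketch without the manual sublist step could look like this (here FLAT_TICKERS stands for the ticker list before the split above, and get_ts is the function from the question):

    import multiprocessing

    if __name__ == "__main__":
        with multiprocessing.Pool(PROCESSES) as pool:
            # chunksize plays the same role as the manual sublists:
            # each worker receives CHUNKS symbols per dispatch
            pool.map(get_ts, FLAT_TICKERS, chunksize=CHUNKS)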