database, time-series, questdb

Efficient way to upload a pandas dataframe into QuestDB


What is the most efficient (fastest) way to upload a pandas dataframe into the database? I am following the example provided in https://questdb.io/docs/clients/ingest-python#basic-insert

from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
    sender.dataframe(df, table_name='trades', at=TimestampNanos.now())

It takes about 45 seconds to upload a dataframe with ~5M rows and 50 columns, at roughly 350 bytes per row. I am using the default parameters referenced in the example, and I was expecting much faster load performance.


Solution

  • After speaking to a core engineer at QuestDB, I have some insights and managed to ingest the data in just 8 seconds.

    It turns out that even though the pandas streaming is zero-copy, due to ILP protocol limitations the data has to be serialized into UTF-8 text for transfer, so more bytes go over the wire than the in-memory size of the dataframe.

    In the future QuestDB will have its own binary protocol, but at the moment ILP is the fastest option, so we need a workaround to ingest data faster. QuestDB ingests data faster when multiple connections send data in parallel, so one straightforward option is to divide the dataframe into chunks and stream those chunks over multiple connections.

    We can do something like this:

    1. Split frame into batches
    2. Define a sending function
    3. Use a thread pool to send batches in parallel
    import numpy as np
    from collections import deque
    from concurrent.futures import ThreadPoolExecutor

    from questdb.ingress import Sender


    # Split the dataframe into row batches. Note that df.size counts cells
    # (rows * columns), so use len(df) to work out the number of batches.
    batches = deque()
    for chunk in np.array_split(df, max(1, len(df) // batch_size)):
        batches.append(chunk)


    def send_batch(conf_str, table_name, batches, timestamp_name):
        # One sender (i.e. one connection) per worker thread; flush explicitly
        # after each batch instead of relying on auto-flush.
        with Sender.from_conf(conf_str, auto_flush=False, init_buf_size=100000000) as qdb_sender:
            while True:
                try:
                    # deque.pop() is thread-safe, so the workers can share the queue.
                    chunk = batches.pop()
                except IndexError:
                    break
                qdb_sender.dataframe(chunk, table_name=table_name, at=timestamp_name)
                qdb_sender.flush()


    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = []
        for _ in range(parallel):
            futures.append(executor.submit(send_batch, conf_str, table_name, batches, timestamp_name))
        for future in futures:
            future.result()
    
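    The snippet above assumes a few names (conf_str, table_name, timestamp_name, batch_size, parallel) that would be defined earlier; purely for illustration, they might look like this (the numbers are made-up starting points, not recommendations):

    conf_str = 'http::addr=localhost:9000;'
    table_name = 'trades'
    timestamp_name = 'timestamp'  # designated timestamp column in df, passed to at=
    batch_size = 100_000          # rows per chunk
    parallel = 8                  # number of concurrent connections / worker threads
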

    We could also rely on auto-flush rather than flushing explicitly after every batch (see the sketch below), and we could use multiprocessing rather than a thread pool, but that's basically the idea.
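
    For completeness, here is a minimal sketch of that auto-flush variant, assuming the client's auto_flush_rows and auto_flush_interval configuration settings; the thresholds are illustrative and would need tuning:

    from questdb.ingress import Sender

    def send_batch_autoflush(conf_str, table_name, batches, timestamp_name):
        # Let the client flush on its own once enough rows (or time) accumulate,
        # instead of calling flush() after every batch; closing the sender
        # flushes whatever is left in the buffer.
        auto_conf = conf_str + 'auto_flush_rows=100000;auto_flush_interval=5000;'
        with Sender.from_conf(auto_conf) as qdb_sender:
            while True:
                try:
                    chunk = batches.pop()
                except IndexError:
                    break
                qdb_sender.dataframe(chunk, table_name=table_name, at=timestamp_name)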

    Note: I was told another avenue would be working directly with QuestDB's Parquet capabilities if the data were already in Parquet format, but in my case that was not an option, as the data comes from an external source and converting it to Parquet would be extra overhead.