python, multiprocessing, python-multiprocessing, python-multithreading

Python Pool.apply_async() is returning None type objects


I have a 4GB+ file in which each line represents a very nested JSON string. An example of what the file looks like would be below:

{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
{"core":{"id":"1","field1":{"subfield1":1, "subfield2":{"subsubfield1": 1}},....,"field700":700}}
100,000+ lines like above

I need to flatten each of these JSON objects, keep only the ~100 fields I care about, and combine everything into a single dataframe that I can write out at the end.

My plan is to divide these 100K+ lines into multiple chunks, use multiprocessing to flatten the JSON objects in each chunk, and combine the results back into a dataframe. Because I am new to multiprocessing, I read a few posts, including this and this (and a few more). I tried to write my code based on the first post, like this:

import json
import multiprocessing
import time
from multiprocessing import Pool, Process, Queue

import pandas as pd


# list of ~100 columns I want to keep
COLS_TO_KEEP = {'id', 'field1', 'subfield1', 'subsubfield2', 'field8', ..., 'field680'}


def chunk_the_list(list_of_items, chunk_size):
    for i in range(0, len(list_of_items), chunk_size):
        yield list_of_items[i : i + chunk_size]


def do_work(list_of_dicts):
    results = []
    for d in list_of_dicts:
        results.append(flatten_dict(d))


def flatten_dict(d, parent_key="", sep="_"):
    # This function recursively dives into each JSON object and flattens them
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            if new_key in COLS_TO_KEEP:
                items.append((new_key, v))
    return dict(items)


def main():
    raw_data_file_path_name = 'path/to/file.txt' # file is 4GB+
    file = open(raw_data_file_path_name, encoding="utf-8").readlines()
    listify = [json.loads(line) for line in file]
    size_of_each_chunk = int(len(listify) / (multiprocessing.cpu_count() - 2))
    chunked_data = chunk_the_list(listify, size_of_each_chunk)

    p = Pool(processes=multiprocessing.cpu_count() - 2)
    results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]

    results = [item.get() for item in results] # This list has None type objects when I check using debugger
    results = sum(results, []) # because the above line returns None values, this step errors out 

    # I want to create a Pandas dataframe using the accumulated JSON/dictionary objects from results and write it out as a parquet or csv file at the end like this https://stackoverflow.com/a/20638258/1330974
    df = pd.DataFrame(results)
    df.to_parquet('df.parquet.gzip', compression='gzip')

if __name__ == "__main__":
    start_time = time.time()
    main()

Am I doing something wrong that causes these None objects to come back in the results? Thank you in advance for suggestions and answers!


Solution

  • You need to add a return statement to your do_work() function. Pool.apply_async() hands you back an AsyncResult, and AsyncResult.get() returns whatever the worker function returned; since do_work() has no return statement it implicitly returns None, which is why every item.get() in your results list is None.

    def do_work(list_of_dicts):
        results = []
        for d in list_of_dicts:
            results.append(flatten_dict(d))
        return results
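
  • With that return in place, each item.get() call gives you the list of
    flattened dicts produced by that worker, so the rest of main() can collect
    and combine them. Below is a minimal sketch of how that could look; it
    reuses chunk_the_list, do_work, and flatten_dict from the question (so
    treat the names and the file path as placeholders), runs the Pool as a
    context manager, and flattens the per-chunk lists with itertools.chain
    instead of sum(results, []), which re-copies the accumulated list on every
    addition:

    import itertools
    import json
    import multiprocessing
    from multiprocessing import Pool

    import pandas as pd


    def main():
        raw_data_file_path_name = 'path/to/file.txt'  # same 4GB+ file as above
        with open(raw_data_file_path_name, encoding="utf-8") as f:
            listify = [json.loads(line) for line in f]

        n_workers = max(1, multiprocessing.cpu_count() - 2)
        size_of_each_chunk = max(1, len(listify) // n_workers)
        chunked_data = chunk_the_list(listify, size_of_each_chunk)

        with Pool(processes=n_workers) as p:
            async_results = [p.apply_async(do_work, args=(chunk,)) for chunk in chunked_data]
            # .get() blocks until a worker finishes and hands back do_work's return value
            per_chunk_lists = [r.get() for r in async_results]
        # all results were collected inside the with-block above, so it is safe
        # for the pool to be torn down when the block exits

        # flatten the list of per-chunk lists into one flat list of dicts
        results = list(itertools.chain.from_iterable(per_chunk_lists))

        df = pd.DataFrame(results)
        df.to_parquet('df.parquet.gzip', compression='gzip')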