Tags: json, python-3.x, out-of-memory, in-memory

Python: Reading and Writing HUGE JSON files


I am new to Python, so please excuse me if I am not asking the questions in a Pythonic way.

My requirements are as follows:

  1. I need to write Python code to implement this requirement.

  2. I will be reading 60 JSON files as input. Each file is approximately 150 GB.

  3. The sample structure for all 60 JSON files is shown below. Please note that each file contains only ONE JSON object, and the huge size of each file comes from the number and size of the elements in the "array_element" array inside that one object.

    { "string_1":"abc", "string_1":"abc", "string_1":"abc", "string_1":"abc", "string_1":"abc", "string_1":"abc", "array_element":[] }

  4. The transformation logic is simple: I need to merge the "array_element" arrays from all 60 files and write them into one HUGE JSON file, so the output file will be roughly 150 GB x 60 in size. (The intended output structure is sketched right after this list.)
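
The intended output structure would be something like the sample below (the top-level string fields here are placeholders; the point is the single merged "array_element" array):

    { "string_1":"abc", ..., "array_element":[ <all elements from file 1>, <all elements from file 2>, ..., <all elements from file 60> ] }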

Questions I am requesting your help with:

  1. For reading: I am planning to use the "ijson" module's ijson.items(file_object, "array_element.item"). Could you please tell me whether ijson.items will "yield" (that is, NOT load the entire file into memory) one item at a time from the "array_element" array in the JSON file? I don't think json.load is an option here, because we cannot hold such a huge dictionary in memory. (A small sketch of what I mean follows this list.)

  2. For writing: I am planning to read each item using ijson.items, "encode" it with json.dumps, and then write it to the file using file_object.write, NOT json.dump, since I cannot hold such a huge dictionary in memory for json.dump. Could you please let me know whether the f.flush() applied in the code shown below is needed? To my understanding, the internal buffer flushes automatically when it is full, and its size is constant, so it won't grow to the point of overloading memory. Please confirm. (See also the buffered-write variant after that snippet.)

  3. Is there a better approach than the ones mentioned above for incrementally reading and writing huge JSON files?
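
To make question 1 concrete, this is the kind of streaming read I have in mind (the file name is only a placeholder). As I understand the ijson prefix syntax, the ".item" suffix is what makes it yield the array's elements one by one, whereas the prefix "array_element" alone would return the whole array as a single list:

    import ijson

    # Stream one element at a time from the "array_element" array.
    with open("sample1.json", "rb") as f:
        for element in ijson.items(f, "array_element.item"):
            print(type(element))  # only this one element is held in memory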

Code snippet showing the reading and writing logic described above:

import json
import ijson

# Write the opening of the merged JSON object once, up front.
with open("output.json", "w") as out:
    out.write('{\n"array_element": [\n')

for input_file in input_files:
    with open(input_file, "rb") as f:
        # "array_element.item" yields the array's elements one at a time;
        # the prefix "array_element" alone would build the whole array in memory.
        objects = ijson.items(f, "array_element.item")
        for item in objects:
            with open("output.json", "a") as out:
                out.write(json.dumps(item, indent=2))
                out.write(",\n")
                out.flush()

# Replace the trailing ",\n" with the closing bracket and brace.
with open("output.json", "r+") as out:
    size = out.seek(0, 2)
    out.truncate(size - 2)
    out.seek(size - 2)
    out.write("\n]\n}")

Hope I have asked my questions clearly. Thanks in advance!!


Solution

  • The following program assumes that the input files have a format that is predictable enough to skip JSON parsing for the sake of performance.

    My assumptions, inferred from your description, are:

      • each input file is a single JSON object whose last member is the "array_element" array;
      • the text "array_element": [ appears within the first few hundred bytes of each file;
      • after the array's closing ], the file ends with nothing but the object's closing } and optional whitespace;
      • every "array_element" array is non-empty, so joining the file ranges with "," keeps the result valid.

    When all of these points are true, concatenating a predefined header fragment, the respective file ranges, and a footer fragment would produce one large, valid JSON file.

    import re
    import mmap

    # Locate the opening of the "array_element" array near the start of a file
    # and the array/object closing near its end.
    head_pattern = re.compile(br'"array_element"\s*:\s*\[\s*', re.S)
    tail_pattern = re.compile(br'\s*\]\s*\}\s*$', re.S)

    input_files = ['sample1.json', 'sample2.json']

    with open('result.json', "wb") as result:
        head_bytes = 500         # search window for the array opening
        tail_bytes = 50          # search window for the closing "]}"
        chunk_bytes = 16 * 1024  # copy size per write

        result.write(b'{"JSON": "fragment", "array_element": [\n')

        for file_no, input_file in enumerate(input_files):
            print(input_file)

            with open(input_file, "r+b") as f:
                mm = mmap.mmap(f.fileno(), 0)

                start = head_pattern.search(mm[:head_bytes])
                end = tail_pattern.search(mm[-tail_bytes:])

                if not (start and end):
                    print('unexpected file format')
                    break

                # Absolute offsets of the array contents within the file.
                start_pos = start.span()[1]
                end_pos = mm.size() - end.span()[1] + end.span()[0]

                # Separate the contents of consecutive files with a comma.
                if file_no > 0:
                    result.write(b',\n')

                # Copy the array contents to the result in fixed-size chunks.
                pos = start_pos
                mm.seek(pos)
                while True:
                    if pos + chunk_bytes >= end_pos:
                        result.write(mm.read(end_pos - pos))
                        break
                    else:
                        result.write(mm.read(chunk_bytes))
                        pos += chunk_bytes

        result.write(b']\n}')
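
    To sanity-check the merged result on small sample files, the output can be stream-parsed with the same ijson approach discussed in the question; this is only a quick verification sketch:

    import ijson

    # Count the merged elements without loading result.json into memory.
    with open('result.json', 'rb') as f:
        count = sum(1 for _ in ijson.items(f, 'array_element.item'))
    print(count, 'elements in result.json')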
    

    If the file format is 100% predictable, you can throw out the regular expressions and use mm[:head_bytes].index(b'...') etc. for the start/end position arithmetic, for example:
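
    This is only a sketch: it assumes every input file contains exactly these byte sequences, spacing included, so the literal markers below would have to be adjusted to the real files:

    import mmap

    head_marker = b'"array_element": ['   # assumed literal, spacing must match
    tail_marker = b']'

    with open('sample1.json', 'r+b') as f:
        mm = mmap.mmap(f.fileno(), 0)
        head_bytes, tail_bytes = 500, 50

        # First byte of the array contents.
        start_pos = mm[:head_bytes].index(head_marker) + len(head_marker)
        # Offset of the final ']', i.e. one past the last byte of the contents.
        end_pos = mm.size() - tail_bytes + mm[-tail_bytes:].rindex(tail_marker)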