pythonfileiotemporary-files

How to use io to generate in memory data streams as file like objects?


I want to generate a in-memory (temp file) data stream in Python. One thread is filling the stream with data, and a other one consumes it.

After checking the io - Core tools for working with streams , it seems to me that the io module is the best choice for it.

So I made a simple example:

#!/usr/local/bin/python3
# encoding: utf-8

import io

if __name__ == '__main__':
    a = io.BytesIO()
    a.write("hello".encode())
    txt = a.read(100)
    txt = txt.decode("utf-8")
    print(txt) 

My example does not work. "hello" is not written to a and can not be read after. So where is my error? How do I have to alter my code to get a file like object in memory?


Solution

  • @dhilmathy and @ShadowRanger mentioned that io.BytesIO() do not have separate read and write pointer.

    I overcome this problem with creating a simple class that implements a read pointer and remembers the amount of bytes written. When the amount of read bytes is equal the amount of written bytes the file is shrink to save memory.

    My solution so far:

    #!/usr/local/bin/python3
    # encoding: utf-8
    
    import io
    
    class memoryStreamIO(io.BytesIO):
        """
        memoryStreamIO
    
        a in memory file like stream object 
        """
    
        def __init__(self):
            super().__init__()
            self._wIndex = 0
            self._rIndex = 0
            self._mutex = threading.Lock()
    
        def write(self, d : bytearray):
            self._mutex.acquire()
            r = super().write(d)
            self._wIndex += len(d)
            self._mutex.release()
            return r
    
        def read(self, n : int):
            self._mutex.acquire()
            super().seek(self._rIndex)
            r = super().read(n)
            self._rIndex += len(r)
            # now we are checking if we can
            if self._rIndex == self._wIndex:
                super().truncate(0)
                super().seek(0)
                self._rIndex = 0
                self._wIndex = 0
            self._mutex.release()
            return r
    
        def seek(self, n):
            self._mutex.acquire()
            self._rIndex = n
            r = super().seek(n)
            self._mutex.release()
            return r
    
    
    if __name__ == '__main__':
        a = streamIO()
    
        a.write("hello".encode())
        txt = (a.read(100)).decode()
        print(txt)
    
        a.write("abc".encode())
        txt = (a.read(100)).decode()
        print(txt)