Tags: python, python-3.x, python-asyncio, streamwriter

Why should asyncio.StreamWriter.drain be explicitly called?


From the docs:

write(data)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

coroutine drain()

Wait until it is appropriate to resume writing to the stream. Example:

writer.write(data)
await writer.drain()

From what I understand, (1) you need to call drain every time write is called, and (2) if not, I guess write will block the loop thread.

Then why is write not a coroutine that calls drain automatically? Why would one want to call write without draining? I can think of two cases:

  1. You want to write and close immediately.
  2. You have to buffer some data until the message is complete.

The first one is a special case; I think it could have a different API. Buffering should be handled inside the write function, and the application should not have to care.


Let me put the question differently: what is the drawback of doing the following? Does the Python 3.8 version effectively do this?

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

Note: the drain() documentation explicitly states the following:

When there is nothing to wait for, the drain() returns immediately.


Reading the answer and links again, I think the functions work like this. (Note: check the accepted answer for a more accurate version.)

def write(data):
    remaining = socket.try_write(data)  # hypothetical non-blocking write
    if remaining:
        # The buffer keeps growing if the other side is slow
        # and we have a lot of data.
        _pendingbuffer.append(remaining)

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

So when to use what:

  1. When the data is not continuous, like responding to an HTTP request: we just need to send some data, we don't care when it arrives, and memory is not a concern - just use write.
  2. Same as above, but memory is a concern - use awrite.
  3. When streaming data to a large number of clients (e.g. a live stream or a huge file): if the data is duplicated in each connection's buffer, it will definitely overflow RAM. In this case, write a loop that takes a chunk of data each iteration and calls awrite (see the sketch after this list). For a huge file, loop.sendfile is better if available.
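
A minimal sketch of such a chunked loop, reusing the awrite helper from above (the path parameter and the 64 KiB chunk size are arbitrary choices, and the blocking file read is simplified for illustration):

async def stream_file(writer, path, chunk_size=64 * 1024):
    # Send the file one chunk at a time; draining after every chunk keeps the
    # per-connection buffer bounded instead of duplicating the file in RAM.
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            await awrite(writer, chunk)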

Solution

  • From what I understand, (1) you need to call drain every time write is called, and (2) if not, I guess write will block the loop thread

    Neither is correct, but the confusion is quite understandable. The way write() works is as follows:

      • A call to write() just stashes the data in an internal buffer, leaving it to the event loop to actually write it out later, without further intervention by the program. The data keeps getting written out in the background as fast as the other side can receive it, even if drain() is never awaited.
      • write() is not a coroutine: it returns immediately and never blocks, neither the event loop nor the calling coroutine.

    The second property is convenient at first glance - you can call write() wherever you need to, even from a non-async function - but it's also a major flaw of write(). Stream writing as implemented in asyncio is completely decoupled from the OS accepting the data, so if you write data faster than your network peer can read it, the internal buffer will keep growing and you'll have a memory leak on your hands. drain() fixes that problem: awaiting it pauses the coroutine if the write buffer has grown too large, and resumes it once the os.write() calls performed in the background succeed and the buffer shrinks.

    You don't need to await drain() after every write, but you do need to await it often enough to prevent the buffer from growing without bound. A drain is typically inserted between iterations of a loop that invokes write(). For example:

    while True:
        response = await peer1.readline()
        peer2.write(b'<response>')
        peer2.write(response)
        peer2.write(b'</response>')
        await peer2.drain()
    

    drain() returns immediately if the amount of pending unwritten data is small. If the data exceeds a high threshold, drain() will suspend the calling coroutine until the amount of pending unwritten data drops beneath a low threshold. The pause will cause the coroutine to stop reading from peer1, which will in turn cause peer1 to slow down the rate at which it sends us data. This kind of feedback is referred to as back-pressure.
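
    The high and low thresholds mentioned above are the transport's write-buffer "water marks", and they can be tuned if the defaults don't fit your workload. A minimal sketch, assuming writer is an asyncio.StreamWriter (the sizes are arbitrary):

    # drain() suspends above 256 KiB of pending data and resumes below 64 KiB.
    writer.transport.set_write_buffer_limits(high=256 * 1024, low=64 * 1024)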

    Buffering should be handled inside the write function, and the application should not have to care.

    That is pretty much how write() works now - it does handle buffering and it lets the application not care, for better or worse. Also see this answer for additional info.


    Addressing the edited part of the question:

    Reading the answer and links again, I think the functions work like this.

    write() is still a bit smarter than that. It doesn't try to write just once; it actually arranges for the data to keep being written out until there is no data left to write. This happens even if you never await drain() - the only thing the application must do is let the event loop run long enough to write everything out.
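
    A minimal self-contained sketch demonstrating this (the port number and payload size are arbitrary): the client never awaits drain(), yet the whole megabyte arrives, because the event loop flushes the buffer in the background while the client merely sleeps.

    import asyncio

    async def handle(reader, writer):
        data = await reader.readexactly(1024 * 1024)
        print(f'server received {len(data)} bytes')
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(handle, '127.0.0.1', 8888)
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
        writer.write(b'x' * 1024 * 1024)  # buffered; no drain() anywhere
        await asyncio.sleep(1)            # just let the event loop run
        writer.close()
        await writer.wait_closed()
        server.close()
        await server.wait_closed()

    asyncio.run(main())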

    A more correct pseudocode of write() and drain() might look like this:

    import asyncio, errno, os

    class ToyWriter:
        def __init__(self, loop, fd):
            self._loop = loop
            self._fd = fd              # a non-blocking file descriptor
            self._buf = bytearray()
            self._empty = asyncio.Event()
            self._empty.set()          # the buffer starts out empty

        def write(self, data):
            self._buf.extend(data)
            # Instruct the event loop to write out the data as soon as the
            # file descriptor is writable, regardless of calls to drain().
            self._loop.add_writer(self._fd, self._do_write)
            self._empty.clear()

        def _do_write(self):
            # Invoked by the event loop when the fd is writable; write as
            # much as possible without blocking.
            while self._buf:
                try:
                    nwritten = os.write(self._fd, self._buf)
                except OSError as e:
                    if e.errno == errno.EWOULDBLOCK:
                        # Give up for now. We'll be invoked anew as soon as
                        # self._fd becomes writable again.
                        return
                    raise
                del self._buf[:nwritten]
            self._empty.set()
            self._loop.remove_writer(self._fd)

        async def drain(self):
            # Only suspend when more than 64 KiB is pending.
            if len(self._buf) > 64 * 1024:
                await self._empty.wait()

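    A minimal usage sketch of this toy class over a non-blocking pipe (the chunk count and sizes are arbitrary; the reader callback just discards data so the pipe keeps emptying):

    async def demo():
        rfd, wfd = os.pipe()
        os.set_blocking(rfd, False)
        os.set_blocking(wfd, False)    # _do_write must never block
        loop = asyncio.get_running_loop()
        loop.add_reader(rfd, lambda: os.read(rfd, 65536))
        writer = ToyWriter(loop, wfd)
        for _ in range(1000):
            writer.write(b'x' * 4096)  # ~4 MiB total, far beyond the pipe buffer
            await writer.drain()       # suspends whenever > 64 KiB is pending
        loop.remove_writer(wfd)        # in case the toy writer is still registered
        loop.remove_reader(rfd)
        os.close(wfd)
        os.close(rfd)

    asyncio.run(demo())
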
    The actual implementation is more complicated because:

      • it is layered on top of asyncio's transport/protocol machinery, which has its own flow control, rather than directly on top of os.write();
      • drain() doesn't wait until the buffer is completely empty, only until it drops below a low watermark;
      • exceptions raised while the data is being written out in the background cannot propagate out of write(), which has already returned - they are stored and re-raised by the next drain().

    The last point is another good reason to call drain() - to actually notice that the peer is gone by the fact that writing to it is failing.
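
    For example, a hedged sketch (the address is arbitrary): writing to a peer that has disconnected does not raise by itself; the error only surfaces on a later drain().

    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    # ... the peer disconnects at some point ...
    writer.write(b'data')     # never raises, even if the peer is gone
    try:
        await writer.drain()  # a lost connection surfaces here
    except ConnectionResetError:
        writer.close()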