python, udp, python-asyncio, broadcast

How to broadcast using asyncio's datagram endpoint?


I have tried to build on asyncio's UDP echo client example to create a broadcaster (for Wake-on-LAN, but I cut out some details below to keep the code short). The code below, however, always fails to send. I have tried other, non-broadcast IPs; this makes no difference. Setting allow_broadcast to False does let the code complete. How can I make it work when broadcasting? I am on Windows 11, Python 3.10. NB: I have commented out two lines to make sure the socket isn't closed too early (otherwise I get other errors; those are a later worry).

import asyncio
from typing import Optional

# IPv4 limited-broadcast address; default destination of send_magic_packet().
BROADCAST_IP = "255.255.255.255"
# Default destination UDP port (presumably the usual Wake-on-LAN port - confirm).
DEFAULT_PORT = 9


class _WOLProtocol:
    """Datagram protocol that sends every message once the endpoint is ready.

    ``done`` is a future created on the running event loop. In this version
    it is only ever resolved by ``error_received`` (with an exception);
    nothing sets a successful result.
    """

    def __init__(self, *messages):
        self.packets = messages
        # Requires a running event loop at construction time.
        self.done = asyncio.get_running_loop().create_future()
        self.transport = None  # never reassigned in this version

    def connection_made(self, transport):
        """Send each message, encoded to bytes, through ``transport``."""
        for p in self.packets:
            # No destination address is passed, so this relies on the
            # transport already having a remote peer.
            transport.sendto(p.encode())

        #transport.close()

    def error_received(self, exc):
        """Fail ``done`` with ``exc``."""
        # NOTE(review): not guarded by ``self.done.done()``; calling this a
        # second time raises InvalidStateError, which is what the traceback
        # quoted after this listing shows.
        self.done.set_exception(exc)

    def connection_lost(self, exc):
        """Print a marker when the transport closes; ``done`` is left untouched."""
        print('closing')
        #self.done.set_result(None)


async def send_magic_packet(
    *macs: str,
    ip_address: str = BROADCAST_IP,
    port: int = DEFAULT_PORT,
    interface: Optional[str] = None
) -> None:
    """Open a broadcast-enabled datagram endpoint and send each of ``macs``.

    Args:
        *macs: Strings handed to ``_WOLProtocol``, which sends each one
            encoded to bytes.
        ip_address: Remote host passed to the endpoint as ``remote_addr``.
        port: Remote port passed to the endpoint as ``remote_addr``.
        interface: Optional local address to bind to (with port 0); no
            local address is given when this is falsy.

    Raises:
        Exception: Whatever exception ``protocol.done`` is failed with.
    """
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: _WOLProtocol(*macs),
        remote_addr=(ip_address, port),
        allow_broadcast = True,
        local_addr=(interface, 0) if interface else None
        )

    try:
        # In this version ``done`` is only resolved by error_received(), so
        # this await can only finish by raising.
        await protocol.done
    finally:
        # Always release the socket, even when the await raises.
        transport.close()

if __name__ == "__main__":
    # Run the sender once with the placeholder payload 'test'.
    asyncio.run(send_magic_packet('test'))

Error I get:

Exception in callback _ProactorDatagramTransport._loop_reading()
handle: <Handle _ProactorDatagramTransport._loop_reading()>
Traceback (most recent call last):
  File "C:\Program Files\Python310\lib\asyncio\proactor_events.py", line 570, in _loop_reading
    self._read_fut = self._loop._proactor.recv(self._sock,
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 458, in recv
    ov.WSARecv(conn.fileno(), nbytes, flags)
OSError: [WinError 10022] An invalid argument was supplied

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Program Files\Python310\lib\asyncio\proactor_events.py", line 576, in _loop_reading
    self._protocol.error_received(exc)
  File ".....\code.py", line 23, in error_received
    self.done.set_exception(exc)
asyncio.exceptions.InvalidStateError: invalid state
closing
Traceback (most recent call last):
  File ".....\code.py", line 50, in <module>
    asyncio.run(send_magic_packet('test'))
  File "C:\Program Files\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python310\lib\asyncio\base_events.py", line 649, in run_until_complete
    return future.result()
  File ".....\code.py", line 45, in send_magic_packet
    await protocol.done
  File "C:\Program Files\Python310\lib\asyncio\proactor_events.py", line 530, in _loop_writing
    self._write_fut = self._loop._proactor.send(self._sock,
  File "C:\Program Files\Python310\lib\asyncio\windows_events.py", line 541, in send
    ov.WSASend(conn.fileno(), buf, flags)
OSError: [WinError 10057] A request to send or receive data was disallowed because the socket is not connected and (when sending on a datagram socket using a sendto call) no address was supplied

For reference, the corresponding sync function body for send_magic_packet() would be the below, which works fine:

# credit: https://github.com/remcohaszing/pywakeonlan/blob/main/wakeonlan/__init__.py
import socket
# Blocking UDP socket; closed automatically when the with-block exits.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    if interface is not None:
        # Bind to the chosen local address; port 0 leaves the port choice to the OS.
        sock.bind((interface, 0))
    # Enable SO_BROADCAST on the socket before connecting and sending.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    # connect() records the destination, so send() below needs no address.
    sock.connect((ip_address, port))
    for packet in macs:
        sock.send(packet.encode())

Solution

  • Ok, found it. It was a cascade of problems.

    1. If you set allow_broadcast=True, then the socket is never connected to the remote end point. Hence the error. So transport.sendto(p.encode()) should be transport.sendto(p.encode(), remote_addr)
    2. That however doesn't work if you provide a remote_addr in the call to create_datagram_endpoint, the remote_addr in sendto() will be ignored.
    3. Furthermore, you need to make sure that the local address is resolved to an IPv4 address, or the Windows API will throw an invalid pointer error. The local address may well resolve to an IPv6 address when not provided.
    4. Lastly, I needed a working way to wait for all data to be sent. That required launching an async task from the synchronous callbacks of my protocol implementation.

    So, the code that sends successfully is below. I am not so sure that my implementation of waiting for all data to be sent before returning is elegant, but it works. If someone knows a better way, please do let me know!

    import asyncio
    import socket
    from typing import Optional
    
    # IPv4 limited-broadcast address; default destination of send_magic_packet().
    BROADCAST_IP = "255.255.255.255"
    # Default destination UDP port (presumably the usual Wake-on-LAN port - confirm).
    DEFAULT_PORT = 9
    
    
    class _WOLProtocol(asyncio.DatagramProtocol):
        """Datagram protocol that sends a fixed batch of messages on connect.

        Every message is sent to ``remote_addr`` as soon as the endpoint is
        ready. ``done`` resolves with ``None`` once the transport reports an
        empty write buffer (or closes cleanly), and fails with the first
        error the transport reports.

        Args:
            remote_addr: ``(host, port)`` destination passed to every
                ``sendto`` call.
            *messages: Strings to send; each is encoded to bytes and sent as
                one datagram.
        """

        def __init__(self, remote_addr, *messages):
            self.remote_addr = remote_addr
            self.packets = messages
            # Requires a running event loop at construction time.
            self.done = asyncio.get_running_loop().create_future()
            self._waiter = None  # task polling the write buffer; set on connect
            self.transport = None

        async def wait_until_sent(self):
            """Resolve ``done`` once the transport's write buffer is empty."""
            while True:
                # Yield first so the loop gets a chance to start writing.
                await asyncio.sleep(0)
                if not self.transport.get_write_buffer_size():
                    break

            if not self.done.done():
                self.done.set_result(None)

        def connection_made(self, transport):
            """Queue every message for ``remote_addr`` and start the waiter."""
            self.transport = transport
            for packet in self.packets:
                # Pass the destination explicitly: an unconnected datagram
                # socket has no default peer. Using the stored address (not
                # module constants) honours the caller's host and port.
                transport.sendto(packet.encode(), self.remote_addr)

            self._waiter = asyncio.create_task(self.wait_until_sent())

        def error_received(self, exc):
            """Fail ``done`` with ``exc`` unless it is already resolved."""
            if not self.done.done():
                self.done.set_exception(exc)

        def connection_lost(self, exc):
            """Stop polling and resolve ``done`` if it is still pending.

            ``done`` fails with ``exc`` when the transport was lost due to
            an error and resolves with ``None`` on a clean close.
            """
            # The transport is gone, so polling its buffer is pointless.
            if self._waiter is not None:
                self._waiter.cancel()

            if self.done.done():
                return
            if exc is not None:
                self.done.set_exception(exc)
            else:
                self.done.set_result(None)
    
    
    async def send_magic_packet(
        *macs: str,
        ip_address: str = BROADCAST_IP,
        port: int = DEFAULT_PORT,
        interface: Optional[str] = None
    ) -> None:
        """Send each of ``macs`` through a broadcast-enabled IPv4 UDP endpoint.

        Args:
            *macs: Strings handed to ``_WOLProtocol`` for sending.
            ip_address: Host part of the remote address given to the protocol.
            port: Port part of the remote address given to the protocol.
            interface: Optional local address to bind to (with port 0); no
                local address is given when this is falsy.

        Raises:
            Exception: Whatever exception ``protocol.done`` is failed with.
        """
        destination = (ip_address, port)

        def build_protocol():
            # Called by the event loop when it sets up the endpoint.
            return _WOLProtocol(destination, *macs)

        bind_to = (interface, 0) if interface else None

        # No remote_addr is passed here; the destination goes to the protocol.
        endpoint = asyncio.get_running_loop().create_datagram_endpoint(
            build_protocol,
            local_addr=bind_to,
            family=socket.AF_INET,
            proto=socket.IPPROTO_UDP,
            allow_broadcast=True,
        )
        transport, protocol = await endpoint

        try:
            await protocol.done
        finally:
            # Always release the socket, even when the await raises.
            transport.close()
    
    if __name__ == "__main__":
        # Run the sender once with two placeholder payloads.
        asyncio.run(send_magic_packet('test','test2'))