I have a select.poll()
object for incoming messages of a socket, and a queue.Queue()
object that contains potential outgoing messages.
While both objects separately support waiting for a timeout without spending CPU cycles, I need to wait on both at the same time and resume the thread once either of them triggers. Is there a mechanism for that?
I have tried waiting on both after another in a loop with very short timeout, but this is basically busy-waiting and ended up costing too much CPU. I have also tried increasing the timeout, which frees up CPU as expected but increases latency too much.
It looks like select.poll()
can wait on any UNIX file descriptor, so perhaps another way of asking my question is whether there is a queue.Queue()
or threading.Event()
alternative that exposes a file descriptor that can be polled?
Current
import socket
import select
import queue
import threading
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('localhost', 4200))
incoming = select.poll()
incoming.register(socket, select.POLLIN)
outgoing = queue.Queue()
while True:
if incoming.poll(timeout=0.001):
read(socket)
try:
msg = outgoing.get(block=False, timeout=0.001)
send(socket, msg)
except queue.Empty:
pass
Desired
import socket
import queue
import threading
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('localhost', 4200))
outgoing = queue.Queue()
poller = HYBRID_POLLER() # TODO
poller.register(socket)
poller.register(outgoing)
while True:
available = poller.poll(timeout=1)
if socket in available:
read(socket)
if outgoing in available:
msg = outgoing.get(block=False, timeout=0.001)
send(socket, msg)
See chapter 12.13 in the Python Cookbook, 3rd Edition, which describes how to make a queue.Queue
instance "pollable" (a socket.socket
instance is already pollable):
import queue
import socket
import os
class PollableQueue(queue.Queue):
def __init__(self):
super().__init__()
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
super().put(item)
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return super().get()
Then you can pass to select.select
a list containing your queue and socket instances:
import select
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('localhost', 4200))
outgoing = PollableQueue()
...
while True:
available, _, _ = select.select([outgoing, socket], [], [], timeout=1.0)
if socket in available:
read(socket)
if outgoing in available:
msg = outgoing.get()
send(socket, msg)
Is socket
the best choice to name your socket instance since it is a possible conflict with the module named socket
?