I'm trying to simulate a situation where a server receives data periodically. In my setup, I run one process that starts the server and another process that starts a bunch of clients (it suffices to think of a single client). I put the code together from bits and pieces, mostly from here. The server and clients communicate by sending messages with transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients then report back to the server as they make progress. What confuses me is that the server only receives these progress messages at the very end, when the client is done. This could be a buffer-flushing problem, and I tried (unsuccessfully) various suggested fixes for that. Also, each message is pretty large, and I tried sending the same message multiple times so that the buffer would get cleared.
I suspect the problem is that control is not being returned to the transport, but I can't figure out how to fix that.
Any help with this, or with any other issues that jump out at you, is much appreciated.
Server:
from twisted.internet import reactor, protocol
import time
import serverSideAnalysis
import pdb
#import bson, json, msgpack
import _pickle as pickle # I expect the users to authenticate and not
                         # do anything malicious.

PORT = 9000
NUM = 1
local_scratch="/local/scratch"

class Hub(protocol.Protocol):
    def __init__(self,factory, clients, nclients):
        self.clients = clients
        self.nclients = nclients
        self.factory = factory
        self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self,
                                                          local_scratch)

    def connectionMade(self):
        print("connected to user" , (self))
        if len(self.clients) < self.nclients:
            self.factory.clients.append(self)
        else:
            self.factory.clients[self.nclients] = self
        if len(self.clients) == NUM:
            val = input("Looks like everyone is here, shall we start? (Y/N)")
            while (val.upper() != "Y"):
                time.sleep(20)
                val = input("Looks like everyone is here, shall we start??? (Y/N)")
            message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
            self.message(message) # This reaches the client as I had expected

    def message(self, command):
        for c in self.factory.clients:
            c.transport.write(command)

    def connectionLost(self, reason):
        self.factory.clients.remove(self)
        self.nclients -= 1

    def dataReceived(self, data):
        if len(self.clients) == NUM:
            self.dispatcher.dispatch(data)

class PauseTransport(protocol.Protocol):
    def makeConnection(self, transport):
        transport.pauseProducing()

class HubFactory(protocol.Factory):
    def __init__(self, num):
        self.clients = set([])
        self.nclients = 0
        self.totConnections = num

    def buildProtocol(self, addr):
        print(self.nclients)
        if self.nclients < self.totConnections:
            self.nclients += 1
            return Hub(self, self.clients, self.nclients)
        protocol = PauseTransport()
        protocol.factory = self
        return protocol

factory = HubFactory(NUM)
reactor.listenTCP(PORT, factory)
factory.clients = []
reactor.run()
Client:
from twisted.internet import reactor, protocol
import time
import clientSideAnalysis
import sys

HOST = 'localhost'
PORT = 9000
local_scratch="/local/scratch"

class MyClient(protocol.Protocol):
    def connectionMade(self):
        print("connected!")
        self.factory.clients.append(self)
        print ("clients are ", self.factory.clients)
        self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

    def clientConnectionLost(self, reason):
        #TODO send warning
        self.factory.clients.remove(self)

    def dataReceived(self, data): #This is the problematic part I think
        self.cdispatcher.dispatch(data)
        print("1 sent")
        time.sleep(10)
        self.cdispatcher.dispatch(data)
        print("2 sent")
        time.sleep(10)
        self.cdispatcher.dispatch(data)
        time.sleep(10)

    def message(self, data):
        self.transport.write(data)

class MyClientFactory(protocol.ClientFactory):
    protocol = MyClient

if __name__=="__main__":
    analysis_file_name = sys.argv[1]
    factory = MyClientFactory()
    reactor.connectTCP(HOST, PORT, factory)
    factory.clients = []
    reactor.run()
The last bit of relevant information is what the dispatchers do.
In both cases, they load the message that has arrived (a dictionary) and do a few computations based on its content. Every once in a while, they use the message method to communicate their current values.
Finally, I'm using Python 3.6 and Twisted 18.9.0.
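The way to return control to the reactor from a Protocol.dataReceived method is to return from that method. While dataReceived is blocked in time.sleep, the reactor cannot run, so the bytes queued by transport.write are not actually sent until the method finally returns. For example: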
def dataReceived(self, data):
    self.cdispatcher.dispatch(data)
    print("1 sent")
If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).
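As a rough illustration of the reactor.callLater option, here is a minimal sketch of sending several progress messages ten seconds apart without ever sleeping. The names StagedSender and ManualClock are hypothetical and not part of Twisted. StagedSender only assumes that its clock argument has a callLater(delay, func, *args) method, which the real reactor provides, and that write is a callable such as self.transport.write. ManualClock is a small stand-in so the sketch can be tried without a running reactor.

```python
class StagedSender:
    """Send queued messages one at a time, `interval` seconds apart.

    Assumptions of this sketch: `clock` is any object with a
    callLater(delay, func, *args) method (in a real client, the Twisted
    reactor), and `write` is a callable such as self.transport.write.
    """

    def __init__(self, clock, write, interval=10.0):
        self.clock = clock
        self.write = write
        self.interval = interval
        self._pending = []

    def start(self, messages):
        # Only kick off sending if no send is already scheduled.
        idle = not self._pending
        self._pending.extend(messages)
        if idle:
            self._send_next()

    def _send_next(self):
        if not self._pending:
            return
        self.write(self._pending.pop(0))
        if self._pending:
            # Schedule the next send and return right away, so the event
            # loop regains control and can flush what was just written.
            self.clock.callLater(self.interval, self._send_next)


class ManualClock:
    """Hypothetical stand-in for the reactor, advanced by hand."""

    def __init__(self):
        self.now = 0.0
        self.calls = []  # list of (due_time, func, args)

    def callLater(self, delay, func, *args):
        self.calls.append((self.now + delay, func, args))

    def advance(self, seconds):
        # Move time forward and run every call that has become due.
        self.now += seconds
        due = [c for c in self.calls if c[0] <= self.now]
        self.calls = [c for c in self.calls if c[0] > self.now]
        for _, func, args in sorted(due, key=lambda c: c[0]):
            func(*args)


clock = ManualClock()
sent = []  # stands in for the transport's outgoing data
sender = StagedSender(clock, sent.append, interval=10.0)
sender.start([b"progress 1", b"progress 2", b"progress 3"])
clock.advance(10)  # second message goes out
clock.advance(10)  # third message goes out
```

In the client from the question, dataReceived could create such a helper with reactor and self.transport.write, call start, and return immediately; the time.sleep calls would then no longer be needed.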