pythontwistedtwisted.internet

return control to the transport


I'm trying to simulate a situation where data is received by a server periodically. In my set up, I run one process that sets up the server and another process that sets up a bunch of clients (suffices to think of a single client). I have set up some of the code by putting together bits and pieces mostly from here. The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This. Also, each message is pretty large and I tried sending the same message multiple times so the buffer would get cleared.

I suspect what I am seeing is a problem with returning the control to the transport but I can't figure out how to do away with it.

Any help with this or any other issues that jump up to you is much appreciated.

Server:

from twisted.internet import reactor, protocol

import time
import serverSideAnalysis
import pdb
#import bson, json, msgpack
import _pickle as pickle  # I expect the users to authenticate and not 
                          # do anything malicious. 


PORT = 9000
NUM = 1
local_scratch="/local/scratch"


class Hub(protocol.Protocol):
  def __init__(self,factory, clients, nclients):
    self.clients = clients 
    self.nclients = nclients
    self.factory = factory
    self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self, 
          local_scratch)

  def connectionMade(self):
    print("connected to user" , (self))
    if len(self.clients) < self.nclients:
      self.factory.clients.append(self)
    else:
      self.factory.clients[self.nclients] = self
    if len(self.clients) == NUM:
      val = input("Looks like everyone is here, shall we start? (Y/N)")
      while (val.upper() != "Y"):
        time.sleep(20)
        val = input("Looks like everyone is here, shall we start??? (Y/N)")
      message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
      self.message(message) # This reaches the client as I had expected

  def message(self, command):
    for c in self.factory.clients:
      c.transport.write(command)

  def connectionLost(self, reason):
    self.factory.clients.remove(self)
    self.nclients -= 1

  def dataReceived(self, data):
    if len(self.clients) == NUM:
      self.dispatcher.dispatch(data)

class PauseTransport(protocol.Protocol):
  def makeConnection(self, transport):
    transport.pauseProducing()

class HubFactory(protocol.Factory):
  def __init__(self, num):
    self.clients = set([])
    self.nclients = 0 
    self.totConnections = num

  def buildProtocol(self, addr):
    print(self.nclients)
    if self.nclients < self.totConnections:
      self.nclients += 1
      return Hub(self, self.clients, self.nclients)
    protocol = PauseTransport()
    protocol.factory = self
    return protocol

factory = HubFactory(NUM)
reactor.listenTCP(PORT, factory)
factory.clients = []
reactor.run()

Client:

from twisted.internet import reactor, protocol
import time
import clientSideAnalysis
import sys


HOST = 'localhost'
PORT = 9000
local_scratch="/local/scratch"

class MyClient(protocol.Protocol):

  def connectionMade(self):
    print("connected!")
    self.factory.clients.append(self)
    print ("clients are ", self.factory.clients)

    self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

  def clientConnectionLost(self, reason):
    #TODO send warning
    self.factory.clients.remove(self)

  def dataReceived(self, data): #This is the problematic part I think
    self.cdispatcher.dispatch(data)
    print("1 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    print("2 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    time.sleep(10)


  def message(self, data):
    self.transport.write(data)

class MyClientFactory(protocol.ClientFactory):
  protocol = MyClient

if __name__=="__main__":
  analysis_file_name = sys.argv[1]

  factory = MyClientFactory()
  reactor.connectTCP(HOST, PORT, factory)
  factory.clients = []
  reactor.run()

The last bit of relevant information about what the dispatchers do.

In both cases, they load the message that has arrived (a dictionary) and do a few computations based on the content. Every once in a while, they use the message method to communicate with thier current values.

Finally, I'm using python 3.6. and twisted 18.9.0


Solution

  • The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:

    def dataReceived(self, data):
        self.cdispatcher.dispatch(data)
        print("1 sent")
    

    If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).