python thrift assertion thrift-protocol

Why does Thrift raise AssertionError when called from multiple threads?


I'm building a kind of simulator that uses the Thrift protocol. When I run multiple threads, one per virtual equipment, each sending messages, the program breaks shortly after the messages start being received. I suspect a buffer is being overloaded or something similar, but I'm not sure, so I'm asking for help here.

Here are the main pieces of my code.

A class for threading:

import threading
from time import sleep


class ThreadManager(threading.Thread):
    def __init__(self, name, obj, client, layout):
        threading.Thread.__init__(self)
        self.name = name
        self.obj = obj
        self.client = client  # the same client object is passed to every thread
        self.layout = layout

    def run(self):
        print("Starting " + self.name)
        while True:
            sleep(2)
            self.obj.auto_gen_msg(self.client, layout=self.layout)

The method for generating messages:

def auto_gen_msg(self, client, layout='', min_delay=15, max_delay=30):
    if not layout:
        msg = self.gen_message(self.draw_random_model())
    else:
        msg = self.gen_message(layout)
    wait = randint(min_delay, max_delay)
    sleep(wait)

    print(self.eqp_type, " delivered a message ...")
    # msg[0] is the name of the client method, msg[1] its argument list
    getattr(client, msg[0])(*msg[1])

The main:

def start(layout, equipment, number):
    try:
        host = 'localhost'
        transport = TSocket.TSocket(host, port=9090)
        transport = TTransport.TBufferedTransport(transport)
        protocol = TCompactProtocol.TCompactProtocol(transport)
        client = SuiteService.Client(protocol)
        transport.open()

        equips = [Equipment(equipment) for i in range(number)]
        # Every thread receives the same client (and so the same transport).
        threads = [ThreadManager(e.eqp_type, e, client, layout) for e in equips]

        for t in threads:
            t.start()
            sleep(2)

        # Block until the worker threads exit instead of spinning in a busy loop.
        for t in threads:
            t.join()

        transport.close()

    except Thrift.TException as tx:
        print("%s " % (tx.message))

The error haunting me:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/thread_manager.py", line 39, in run
    self.obj.auto_gen_msg(self.client, layout=self.layout)
  File "/Users/lem4fia/Documents/sg/loki/loki-thrift/loki_thrift/loki_thrift/lib/virtual.py", line 281, in auto_gen_msg
    getattr(client, msg[0])(*msg[1])
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4895, in v1
    self.send_v1(ir, ts, ch, o1, o2, o3, o4, o5, o6, o7)
  File "/Users/lem4fia/Documents/sg/loki/thrift-server/thrift_server/suite/SuiteService.py", line 4899, in send_v1
    self._oprot.writeMessageBegin('v1', TMessageType.CALL, self._seqid)
  File "/Users/lem4fia/Documents/sg/loki/lokiv/lib/python3.6/site-packages/thrift-0.11.0-py3.6-macosx-10.6-intel.egg/thrift/protocol/TCompactProtocol.py", line 156, in writeMessageBegin
    assert self.state == CLEAR
AssertionError

Curiously, it doesn't fail when I instantiate 2 virtual equipments in threads, but 10 (sometimes fewer) are enough to raise this error.

Can someone please shed some light on this? :)


Solution

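  • The problem is that you have to use a different Transport object for each thread. A Thrift client, and the protocol and transport beneath it, are not thread-safe. When two threads call the shared client at the same time, one of them reaches writeMessageBegin while the other is still in the middle of writing its own message, so the protocol's state is not CLEAR and the assertion fails. That also explains why the error shows up more readily with 10 threads than with 2.

    Reference: http://grokbase.com/t/thrift/user/134s16ks4m/single-connection-and-multiple-threads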
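
A minimal sketch of the per-thread connection idea, assuming the same host, port and compact protocol as in the question. `open_client` and `run_equipment` are hypothetical helper names, not part of Thrift. The generated service module (SuiteService in the question) is passed in as a parameter because its import path is project-specific, and `rounds` is added here only so that the loop can terminate.

```python
import threading


def open_client(service_module, host="localhost", port=9090):
    """Open a dedicated connection and return (client, transport).

    `service_module` is the Thrift-generated module, e.g. SuiteService.
    """
    # Imported lazily so this sketch can be loaded without Thrift installed.
    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TCompactProtocol

    transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
    protocol = TCompactProtocol.TCompactProtocol(transport)
    client = service_module.Client(protocol)
    transport.open()
    return client, transport


def run_equipment(equipment, connect, layout="", rounds=None):
    """Thread body: create a private client, then send messages with it.

    `connect` is a zero-argument callable returning (client, transport).
    `rounds=None` loops forever, like the original ThreadManager.run.
    """
    client, transport = connect()  # created inside the thread, never shared
    try:
        sent = 0
        while rounds is None or sent < rounds:
            equipment.auto_gen_msg(client, layout=layout)
            sent += 1
    finally:
        transport.close()
```

In the question's `start`, each thread would then be created with something like `threading.Thread(target=run_equipment, args=(equip, lambda: open_client(SuiteService)))` instead of receiving the shared `client`. If a single connection really has to be shared, wrapping every client call in one `threading.Lock` should also avoid the interleaved writes, at the cost of serializing all requests.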