Tags: python, timeout, stomp, heartbeat, stomp.py

How to capture heartbeat timeout using Python Stomp connection


I keep running into a heartbeat timeout, after which the receiver just sits idle. A skeleton of my code looks like this:

def connect_and_subscribe(conn):
    """Log in on *conn* and register the subscription used by the receiver.

    All connection parameters (credentials, client id, destination,
    subscription id and subscription name) are fixed inside this function.

    Args:
        conn: Connection object exposing ``connect`` and ``subscribe``.
    """
    durable_name = 'my_subscriber'

    connect_headers = {'client-id': 'xxxx'}
    conn.connect(
        login='my_user',
        passcode='my_password',
        wait=True,
        wait_time=120,
        headers=connect_headers,
    )

    # Supplying activemq.subscriptionName is what makes the subscription durable.
    subscribe_headers = {
        "activemq.subscriptionName": durable_name,
        "activemq.persistent": "true",
    }
    conn.subscribe(
        destination='my_destination',
        id=1111,
        ack='auto',
        persistent=True,
        headers=subscribe_headers,
    )

class MyListener(stomp.ConnectionListener):
    """Connection listener that queues received messages and tracks stop state.

    Each received message is put on ``queue`` as ``(sequence_number, message)``.
    A message whose body is ``"SHUTDOWN"``, or any error frame, sets ``stop``
    so that whoever polls the flag can wind down.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, conn, queue):
        """Store the connection and the output queue.

        Args:
            conn: Connection passed to connect_and_subscribe() on reconnect.
            queue: Object with a ``put`` method that receives the messages.
        """
        self.conn = conn
        self.count = 0          # number of messages queued so far
        self.errors = 0
        self.stop = False       # polled from outside; True means "wind down"
        self.queue = queue
        # on_message reports the elapsed time when "SHUTDOWN" arrives; without
        # this initialisation that report raised AttributeError.
        self.start = time.time()

    def on_error(self, message):
        """Report an error frame and request a stop."""
        print('Received an error %s' % message)
        self.stop = True

    def on_heartbeat_timeout(self):
        """Report a missed heartbeat.

        This is the stomp.py listener hook for heartbeat timeouts. It only
        logs; reconnection is left to on_disconnected() or to the caller.
        """
        print('Heartbeat timeout detected')

    def on_message(self, message):
        """Queue a received message, or stop when its body is "SHUTDOWN".

        Args:
            message: Either a plain string or a frame-like object carrying
                the payload in a ``body`` attribute.
        """
        # Compare against the payload when a frame object is delivered; a
        # frame object itself never equals the string "SHUTDOWN".
        body = getattr(message, 'body', message)
        if body == "SHUTDOWN":
            diff = time.time() - self.start
            print("Received %s in %f seconds" % (self.count, diff))
            print("Receiver shutdown")
            self.stop = True
        else:
            # The original object (not just the body) is queued, as before.
            item = (self.count, message)
            self.queue.put(item)
            self.count += 1

    def on_disconnected(self):
        """Reconnect after the connection is lost, unless a stop was requested.

        stomp.py invokes ``on_disconnected`` (not ``on_disconnect``) on its
        listeners, so the reconnect logic has to be reachable under this name.
        """
        if self.stop:
            # Deliberate shutdown (or stop after an error): do not reconnect.
            return
        self.on_disconnect()

    def on_disconnect(self):
        """Reconnect and re-subscribe, then clear the stop flag.

        Kept under its original name for existing callers; stomp.py itself
        reaches it through on_disconnected().
        """
        print_log('Disconnected, going to restart ...')
        connect_and_subscribe(self.conn)
        self.stop = False

# producer task
def producer(queue):
    """Receive broker messages into *queue* until a stop is requested.

    Creates the connection, attaches a MyListener that puts every received
    message on *queue*, connects and subscribes, then wakes up every 30
    seconds to see whether the listener's stop flag is set or check_stop()
    reports a stop signal.

    On the way out, including when an exception escapes the connect/poll
    phase, a ``None`` sentinel is put on *queue* so the consumer never waits
    forever, and the connection is closed if it is still up.

    Args:
        queue: Object with a ``put`` method; receives ``(count, message)``
            items from the listener and a final ``None`` sentinel.
    """
    host = "my_host"
    port = my_port  # placeholder kept from the original; must be defined by the surrounding code
    destination = "my_destination"
    heartbeats = 4000
    subscription_name = "my_subscriber"

    conn = stomp.Connection12(host_and_ports=[(host, port)], timeout=120,
                              heartbeats=(heartbeats, heartbeats))
    listener = MyListener(conn, queue)
    conn.set_listener('', listener)

    print('Listen to %s:%s at destination:%s' % (host, port, destination))
    print('Subscription name:%s with heartbeats:%s' % (subscription_name, heartbeats))

    try:
        connect_and_subscribe(conn)

        while not listener.stop:
            time.sleep(30)
            if check_stop():  # a signal will be given if I want to stop the process
                print('Listener is stopped')
                listener.stop = True
    finally:
        # Mark the shutdown as deliberate before touching the connection so
        # that listener callbacks do not treat it as a lost connection.
        listener.stop = True
        print('Producer is stopped')
        print('Producer has received ' + str(listener.count) + ' messages in total')
        # The sentinel must always be delivered, otherwise the consumer blocks.
        queue.put(None)
        # The connection may already be gone (e.g. after a heartbeat timeout);
        # only send a disconnect when there is something to disconnect.
        if conn.is_connected():
            conn.disconnect()

    print('Producer: Done')

I get this error after the destination has not sent any messages for a long while:

heartbeat timeout: diff_receive=6.01600000000326, time=372544.906, lastrec=372538.89

So how can I prevent this timeout? Should I simply increase the heartbeat?

Secondly, how can I capture this heartbeat timeout? Then I can re-connect and re-subscribe.

Third, why is the on_disconnect function not triggered when the timeout happens?


Solution

  • I let the producer check the connection periodically. If the connection has been lost, it reconnects and re-subscribes. This solved my problem.

    # Poll every 30 seconds: honour a stop signal first, otherwise make sure
    # the connection is still alive and re-establish it when it is not.
    while not listener.stop:
        time.sleep(30)

        if check_stop():  # a signal will be given if I want to stop the process
            print('Listener is stopped')
            listener.stop = True
            continue

        if conn.is_connected():
            continue

        print('Producer is disconnected and found by is_connected')
        connect_and_subscribe(conn)
        print('Producer reconnects')