I am constantly getting this heartbeat timeout issue, and the receiver sits idle then. The skeleton of my code is like:
def connect_and_subscribe(conn):
the_id = 1111
user = 'my_user'
password = 'my_password'
destination = 'my_destination'
subscription_name = 'my_subscriber'
conn.connect(login=user,passcode=password, wait=True, wait_time=120, headers = {'client-id': 'xxxx'})
# activemq.subscriptionName ensures durable connection
conn.subscribe(destination=destination, id=the_id, ack='auto',
persistent=True,
headers = {"activemq.subscriptionName":subscription_name, "activemq.persistent":"true"})
class MyListener(stomp.ConnectionListener):
def __init__(self, conn, queue):
self.conn = conn
self.count = 0
self.errors = 0
self.stop = False
self.queue = queue
def on_error(self, message):
print('Received an error %s' % message)
self.stop = True
def on_message(self, message):
if message == "SHUTDOWN":
diff = time.time() - self.start
print("Received %s in %f seconds" % (self.count, diff))
print("Receiver shutdown")
self.stop = True
else:
item = (self.count, message)
self.queue.put(item)
self.count += 1
def on_disconnect(self):
print_log('Disconnected, going to restart ...')
connect_and_subscribe(self.conn)
self.stop = False
# producer task
def producer(queue):
host = "my_host"
port = my_port
destination = "my_destination"
heartbeats = 4000
subscription_name = "my_subscriber"
conn = stomp.Connection12(host_and_ports = [(host, port)], timeout=120, heartbeats=(heartbeats, heartbeats))
listener = MyListener(conn, queue)
conn.set_listener('', listener)
print('Listen to ' + host + ':' + str(port) + ' at destination:' + destination)
print ('Subscription name:' + subscription_name + ' with heartbeats:' + str(heartbeats))
connect_and_subscribe(conn)
while not listener.stop:
time.sleep(30)
if check_stop(): # a signal will be given if I want to stop the process
print('Listener is stopped')
listener.stop = True
print('Producer is stopped')
print('Producer has received ' + str(listener.count) + ' messages in total')
queue.put(None)
conn.disconnect()
print('Producer: Done')
I got this error after the a long while no messages are sent from the destination:
heartbeat timeout: diff_receive=6.01600000000326, time=372544.906, lastrec=372538.89
So how can I prevent this timeout? Should I simply increase the heartbeat?
Secondly, how can I capture this heartbeat timeout? Then I can re-connect and re-subscribe.
Third, why is the on_disconnect
function is not triggered when the timeout happens?
I let the Producer to check the connectivity frequently. If the connection is lost, then re-connect. This solves my problem.
while not listener.stop:
time.sleep(30)
if check_stop(): # a signal will be given if I want to stop the process
print('Listener is stopped')
listener.stop = True
else:
if not conn.is_connected():
print('Producer is disconnected and found by is_connected')
connect_and_subscribe(conn)
print('Producer reconnects')