I'm currently trying to send a telemetry message via the AMQP adapter to the Hono sandbox. Although I took over parts of the code sample from the Hono north bridge example (which should work for the south bridge as well), I seem to be struggling a bit with SASL.
Here is my code
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
# Placeholder tenant identifier; used in the connection user name and the telemetry address.
tenantId = 'xxxx'
# Placeholder device identifier; used in the connection user name and the message address.
deviceId = 'yyyyy'
# Placeholder device password passed to connect().
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
    """Proton handler that sends a JSON telemetry message for ``deviceId``.

    Connects to ``server`` with the SASL PLAIN mechanism using the
    module-level device credentials, creates a sender on ``address`` and,
    once the link is sendable, sends a message and closes the sender.
    """

    def __init__(self, server, address):
        """Store the connection URL and the sender's target address.

        Args:
            server: URL passed to ``container.connect``.
            address: Target address of the sender; also the prefix of the
                message address.
        """
        super(AmqpMessageSender, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        """Open the connection and create the sender on ``self.address``."""
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):
        """Send the telemetry message, then close the sender."""
        msg = Message(
            address=f'{self.address}/{deviceId}',
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )
        # Send the message built above. This class never assigns
        # ``self.msg``, so sending ``self.msg`` raised AttributeError.
        event.sender.send(msg)
        event.sender.close()

    def on_connection_error(self, event):
        """Report a connection-level error."""
        print("Connection Error")

    def on_link_error(self, event):
        """Report a link-level error."""
        print("Link Error")

    def on_transport_error(self, event):
        """Report a transport-level error."""
        print("Transport Error")
# Build the handler for this tenant's telemetry address, then run it in a container.
telemetry_sender = AmqpMessageSender('amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')
Container(telemetry_sender).run()
If I run the code, I get a transport error with the context condition
'Expected SASL protocol header: no protocol header found (connection aborted)'
I also tried port 5672, which got me a link error, and port 15672 (which is actually the north bridge port), which - to my surprise - didn't cause a SASL error but got me the expected "not authorized" error (as the device is not allowed to connect via the north bridge).
======= update=======
Once more, thank you for your time.
Regarding a): since comments are rather limited here, here is the code once again, as an answer to the question. The code I use to simulate the device is as follows:
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
# Placeholder tenant identifier; second half of the connection user name.
tenantId = 'xxx'
# Placeholder device identifier; first half of the connection user name.
deviceId = 'yyy'
# Placeholder device password passed to connect().
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
    """Proton handler that sends a telemetry message over a sender with no target.

    Connects to ``server`` with the SASL PLAIN mechanism using the
    module-level device credentials and prints a trace line at each step.
    """

    def __init__(self, server):
        """Remember the URL that ``on_start`` connects to."""
        super().__init__()
        self.server = server

    def on_start(self, event):
        """Open the connection and create a sender with ``target=None``."""
        print("In start")
        container = event.container
        connection = container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword,
        )
        print("connection established")
        container.create_sender(context=connection, target=None)
        print("sender created")

    def on_sendable(self, event):
        """Send the telemetry message, then close the sender and the connection."""
        print("In Msg send")
        telemetry = Message(
            address='telemetry',
            properties={
                'to': 'telemetry',
                'content-type': 'application/json',
            },
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"},
        )
        event.sender.send(telemetry)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        """Report a connection-level error."""
        print("Connection Error")

    def on_link_error(self, event):
        """Report a link-level error."""
        print("Link Error")

    def on_transport_error(self, event):
        """Report a transport-level error."""
        print("Transport Error")
# Build the handler for port 5672, then run it in a container.
device_sender = AmqpMessageSender('amqp://hono.eclipseprojects.io:5672')
Container(device_sender).run()
To simulate a server I do not use the Java client, but use the sample code from the Python quick start example as well. I also have a client class that does the HTTP call as in the Python quick start example, and the server class reacts to that and prints the message - so the server implementation outlined below should be OK, as far as I understand:
from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
# Host that the receiver connects to; inserted into the AMQP URI.
amqpNetworkIp = "hono.eclipseprojects.io"
# Placeholder tenant identifier; used to build the telemetry address.
tenantId = 'xxx'
class AmqpReceiver(MessagingHandler):
    """Proton handler that receives on ``address`` and prints each message body."""

    def __init__(self, server, address, name):
        """Store the connection URL, the source address and a label for output.

        Args:
            server: URL passed to ``container.connect``.
            address: Address the receiver is created on.
            name: Label printed before every received message.
        """
        super().__init__()
        self.server, self.address, self._name = server, address, name

    def on_start(self, event):
        """Connect with the consumer credentials and create the receiver."""
        container = event.container
        connection = container.connect(
            self.server,
            user="consumer@HONO",
            password="verysecret",
        )
        container.create_receiver(connection, self.address)

    def on_message(self, event):
        """Print this receiver's label, a header line and the message body."""
        print(self._name)
        print("Got a message:")
        received = event.message
        print(received.body)

    def on_connection_error(self, event):
        """Report a connection-level error."""
        print("Connection Error")

    def on_link_error(self, event):
        """Report a link-level error."""
        print("Link Error")
class CentralServer:
    """Runs an ``AmqpReceiver`` for the tenant's telemetry address on a background thread."""

    def __init__(self):
        """Create a server that is not listening yet."""
        # Both are set by listen_telemetry(). Starting them as None makes
        # stop() safe to call on a server that was never started.
        self.container = None
        self.thread = None

    def listen_telemetry(self, name):
        """Start receiving telemetry in a daemon thread.

        Args:
            name: Label handed to the ``AmqpReceiver``, which prints it
                before each received message.
        """
        uri = f'amqp://{amqpNetworkIp}:15672'
        address = f'telemetry/{tenantId}'
        self.container = Container(AmqpReceiver(uri, address, name))
        print("Starting (northbound) AMQP Connection...")
        self.thread = threading.Thread(target=self.container.run, daemon=True)
        self.thread.start()
        # Presumably gives the receiver time to connect before returning.
        time.sleep(2)

    def stop(self):
        """Stop the container and wait up to five seconds for the thread.

        Does nothing beyond printing when ``listen_telemetry`` was never called.
        """
        print("Stopping (northbound) AMQP Connection...")
        if self.container is not None:
            self.container.stop()
        if self.thread is not None:
            self.thread.join(timeout=5)
# NOTE(review): the receiver runs on a daemon thread, so it ends when the
# main thread exits - confirm something else keeps the process alive.
central_server = CentralServer()
central_server.listen_telemetry('cs1')
After another day of trying I couldn't find what I am doing wrong; I really hope you can see what I am missing :)
br Armin
The AMQP protocol adapter requires devices to send messages via an anonymous terminus.
In your code, this means that the `on_start` method needs to be changed to contain `event.container.create_sender(context=conn, target=None)`.
In any case, the non-TLS port of the AMQP adapter is 5672, so you should use `amqp://hono.eclipseprojects.io:5672` as the server address. The second parameter to the constructor (`telemetry`) is irrelevant and can be removed.
Also make sure that you have a consumer running for your tenant. Otherwise, the sender will not get any credits for actually sending messages ...
Edited Oct. 21st, 2021
This code works for me ...
class AmqpMessageSender(MessagingHandler):
    """Proton handler that sends a JSON string as telemetry over a sender with no target.

    Connects to ``server`` with the SASL PLAIN mechanism using the
    module-level device credentials and prints a trace line at each step.
    """

    def __init__(self, server):
        """Remember the URL that ``on_start`` connects to."""
        super().__init__()
        self.server = server

    def on_start(self, event):
        """Open the connection and create a sender with ``target=None``."""
        print("In start")
        link_owner = event.container.connect(
            url=self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword,
        )
        print("connection established")
        event.container.create_sender(context=link_owner, target=None)
        print("sender created")

    def on_sendable(self, event):
        """Send the telemetry message, then close the sender and the connection."""
        print("In Msg send")
        # The body is a JSON document given as a string, not a dict.
        payload = '{"temp": 5, "transport": "amqp"}'
        outgoing = Message(
            address='telemetry',
            content_type='application/json',
            body=payload,
        )
        event.sender.send(outgoing)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        """Report a connection-level error."""
        print("Connection Error")

    def on_link_error(self, event):
        """Report a link-level error."""
        print("Link Error")

    def on_transport_error(self, event):
        """Report a transport-level error and print the event."""
        print("Transport Error")
        print(event)
# Build the handler for port 5672, then run it in a container.
amqp_sender = AmqpMessageSender('amqp://hono.eclipseprojects.io:5672')
Container(amqp_sender).run()