javarabbitmqamqpopennms

OpenNMS v18 AMQP Message Sending Issue


I cannot get OpenNMS to send messages to an AMQP end point. I have never had this working and this is the first time i have used OpenNMS and AMQP so it could be down my inexperience.

I have configured RabbitMQ 3.5.7 and tested it as per this question . It works fine when using an external QPID 0.32 client and also works fine when using either python or perl. Definition of works fine is that communications are established and the message payload is transmitted into an exchange and then delivered into a backend queue. Message can then be viewed within the RabbitMQ Admin GUI.

In OpenNMS i have been following the instructs here Started with the EventForwarder and then tried the AlarmNorthbounder both yield a NullPointerException.

I set up Karaf as follows setting the properties using these statements:-

opennms> config:edit org.opennms.features.amqp.alarmnorthbounder
opennms> propset connectionUrl amqp://simon:simon@/test?brokerlist=\'localhost:5672\'
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }"
opennms> propset processorName default-alarm-northbounder-processor
opennms> config:update
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)'
----------------------------------------------------------------
Pid:            org.opennms.features.amqp.alarmnorthbounder
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0
Properties:
   connectionUrl = amqp://simon:simon@/test?brokerlist='localhost:5672'
   destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }
   felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg
   processorName = default-alarm-northbounder-processor
   service.pid = org.opennms.features.amqp.alarmnorthbounder

I get the following error message in the logs

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[forwardAlarm      ] [forwardAlarm      ] [seda://forwardAlarm                                                           ] [         8]
[forwardAlarm      ] [convertBodyTo3    ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm]                  ] [         0]
[forwardAlarm      ] [log3              ] [log                                                                           ] [         1]
[forwardAlarm      ] [bean3             ] [bean[ref:dynamicallyTrackedProcessor]                                         ] [         0]
[forwardAlarm      ] [to3               ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }                 ] [         7]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-ubuntu-1604-35241-1465752556843-2-60
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0}
        BodyType            String
        Body                NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException
        at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]

My expectation is that it should be able to deliver the message to the queue using the same libraries as i am using externally.

Putting DEBUG on for org.apache.qpid i get:-

2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254...
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]}
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 INFO  org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1

It closes the session before writing any information.

When i do essentially the same thing from external qpid client - executed as follows:-

#!/bin/bash

rm log.out
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7.
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \
    org.apache.qpid.example.ListSender

I get this:-

142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Are we connected:true
142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Connected with ProtocolHandler Version:0-91
146  [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0  - Write channel open frame for channel id 1
162  [main] DEBUG org.apache.qpid.client.AMQSession  - Created session:org.apache.qpid.client.AMQSession_0_8@6bdf28bb
164  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelOpenOkBody]
165  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [BasicQosOkBodyImpl: ]
173  [main] DEBUG org.apache.qpid.client.AMQDestination  - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR
177  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 0...
178  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
180  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_8@1936f0f5 using publish mode : ASYNC_PUBLISH_ALL
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content body frames to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content header frame to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 90...
191  [main] DEBUG org.apache.qpid.framing.FieldTable  - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]}
192  [main] DEBUG org.apache.qpid.client.AMQSession  - Closing session: org.apache.qpid.client.AMQSession_0_8@6bdf28bb
192  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession  - closeSession called on protocol session for session 1
194  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelCloseOkBody]
194  [IoReceiver - localhost/127.0.0.1:5672] INFO  org.apache.qpid.client.handler.ChannelCloseOkMethodHandler  - Received channel-close-ok for channel-id 1
195  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ConnectionCloseOkBody]
196  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - Session closed called by client

ListSender.java as follows:-

package org.apache.qpid.example;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ListMessage;


public class ListSender {

    public static void main(String[] args) throws Exception
    {
        Connection connection =
            new AMQConnection("amqp://simon:simon@localhost/test?brokerlist='tcp://localhost:5672'");
                                                  AMQShortString a1 = new AMQShortString("");
                                                   AMQShortString a2 = new AMQShortString("");
        AMQShortString[] bindvars = new AMQShortString[]{a1,a2};
        boolean is_durable = true;
/*      
        Destination queue = new AMQAnyDestination( new AMQShortString("onms2"), 
                                                   new AMQShortString("direct"),
                                                   new AMQShortString("Simon"),
                                                   true,        
                                                   true,        
                                                   new AMQShortString(""),
                                                   false,       
                                                   bindvars);
        */

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //Destination queue = new AMQAnyDestination("onms3/Simon");
        Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }");
        //Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D");
        //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}");
        //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}");
        MessageProducer producer = session.createProducer(queue);

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage();
        m.setIntProperty("Id", 987654321);
        m.setStringProperty("name", "WidgetSimon");
        m.setDoubleProperty("price", 0.99);

        List<String> colors = new ArrayList<String>();
        colors.add("red");
        colors.add("green");
        colors.add("white");
        m.add(colors);

        Map<String,Double> dimensions = new HashMap<String,Double>();
        dimensions.put("length",10.2);
        dimensions.put("width",5.1);
        dimensions.put("depth",2.0);
        m.add(dimensions);
        List<List<Integer>> parts = new ArrayList<List<Integer>>();
        parts.add(Arrays.asList(new Integer[] {1,2,5}));
        parts.add(Arrays.asList(new Integer[] {8,2,5}));
        m.add(parts);

        Map<String,Object> specs = new HashMap<String,Object>();
        specs.put("colours", colors);
        specs.put("dimensions", dimensions);
        specs.put("parts", parts);
        m.add(specs);

        producer.send((Message)m);
        System.out.println("Sent: " + m);
        connection.close();
    }

}

My assumption was that this was a connectivity issue to the AMQP server of some description. Having faced a number of issues whilst troubleshooting this if i faced the issue from the external jar it was clear from the logs what the issue was. Is this an issue with OpenNMS? Does anyone have this working successfully? Any ideas?

Cheers

Simon


Solution

  • Following guidance from the OpenNMS list I decided to install the QPID Broker v6.0.3 in place of RabbitMQ and messages are flowing no problems now.