I have several tutorials working with Spring Boot and RPC through RabbitMQ. However, as soon as I attempt to add a Jackson JSON message converter, it all falls to pieces.
The remote invocation is successfully received by the server, so I feel pretty confident it's not the client configuration.
Exchange DATAFLOW_EXCHANGE
Routing Key dataflowRunner
Redelivered ○
Properties
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority: 0
delivery_mode: 2
headers:
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding: UTF-8
content_type: application/json
Payload
675 bytes
Encoding: string
{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
However, the following exception is output:
Caused by: org.springframework.messaging.converter.MessageConversionException:
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
So, on delivery, it KNOWS it should be a dw.dataflow.Dataflow object, it just can't find a converter. However, I have my converter defined EVERYWHERE.
Server Configuration
@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
ObjectMapper jacksonObjectMapper;
@Bean
public TopicExchange exchange() {
return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}
@Bean
public Queue queue() {
return new Queue("DATAFLOW_QUEUE", true);
}
@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
exporter.setAmqpTemplate(rabbitTemplate());
exporter.setMessageConverter(jackson2JsonMessageConverter());
exporter.setServiceInterface(DataflowRunner.class);
exporter.setService(dataflowRunner());
return exporter ;
}
@Bean
public DataflowRunner dataflowRunner() {
return new DataflowRunnerServerImpl();
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(jacksonObjectMapper);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter());
return template;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
return factory;
}
Here is the Service interface:
public interface DataflowRunner {
String run(Dataflow dataflow) throws Exception;
}
And concrete implementation:
public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
// SNIP
}
For grins and giggles, I also attempted to configure the server implementation class with the following annotations, but it has the same error:
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(key = "dataflowRunner",
value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
Client Configuration
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setAddresses(rabbitAddresses);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jackson2MessageConverter());
return template;
}
Does anything seem incorrectly configured? What am I missing? I have the converter set on the service exporter, and the listener container factory.
Any help and/or thoughts appreciated.
@RabbitListener
is not intended to be used with the service exporter - just a plain Java class.
For Spring Remoting over RPC, the service exporter is the MessageListener
for a SimpleMessageListenerContainer
.
With @RabbitListener
, there's a special listener adapter that wraps the pojo method.
So you seem to be mixing two different paradigms.
The ServiceExporter
(Spring remoting) is expected to be paired with a AmqpProxyFactoryBean
on the client side with the service exporter as the listener on the server side.
For simple POJO RPC (which is much newer than using Spring Remoting over RabbitMQ), use @RabbitListener
and RabbitTemplate.convertSendAndReceive()
on the client side. Get rid of the PFB and SE.
Can you explain what led you down this path, in case we need to add some clarification to the documentation.
EDIT
If you do want to use Spring Remoting (inject an interface on the client side and have it "magically" invoke a service on the server side), you need to get rid of all the container factory stuff and simply wire up a SimpleMessageListenerContainer
and inject the service exporter as the MessageListener
.
The reference manual has an XML example but you can wire up the SMLC as a @Bean
.
EDIT2
I have run some tests and Spring Remoting over AMQP doesn't work with JSON because the top level object is a RemoteInvocation
- while the message converter can re-create that object, it has no type information about the actual arguments so leaves it as a linked hash map.
For now, if you must use JSON, the template convertSendAndReceive
in conjunction with the @RabbitListener
is the way to go here. I will open a JIRA issue to see if we can address using Spring Remoting RPC with JSON, but it was really designed for Java Serialization.