I have a bit of a complicated technology stack. I am leveraging Netflix DGS to provide a GraphQL service. Behind the scenes are a bunch of JMS components sending and receiving data from various services. I have everything working outside of a GraphQL subscription.
Specifically what I am trying to do is is create a GraphQL subscription for messages from an ActiveMQ topic.
So I have a SubscriptionDataFetcher as follows:
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {
private final Publisher<SurveyResult> surveyResultsReactiveSource;
@Autowired
public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
this.surveyResultsReactiveSource = surveyResultsReactiveSource;
}
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
return surveyResultsReactiveSource;
}
}
Inside my Spring configuration, I am using the following Spring Integration Flow:
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return Flux.from(
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher())
.map((message) -> converter.fromMessage(message, SurveyResult.class));
}
I will say a few things:
@JmsListener
that is receiving these messages off the topicWhen I connect the client to the subscription, I see the following logs:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler : Subscription started for 1
I suspect that the message listener container isn't activated when the web socket connection is established. Am I supposed to "activate" the channel adapter? What am I missing?
Tech Stack:
// spring boots - version 2.4.3
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-activemq"
implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation 'org.springframework.boot:spring-boot-starter-security'
// spring integration
implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'
// dgs
implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'
Update 1:
For what its worth, if I update the subscription to the following I get results on the client side.
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
// repository is a ReactiveMongoRepository
return repository.findAll();
}
Update 2:
This is the finalized bean in case it helps someone out based on accepted solution. I needed the listener on a topic, not a queue.
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
.jmsMessageConverter(converter))
.toReactivePublisher();
}
Your problem is here:
return Flux.from(
IntegrationFlows.from(
And the framework just doesn't see that inner IntegrationFlow
instance to parse and register properly.
To make it working you need to consider to declare that IntegrationFlow
as a top-level bean.
Something like this:
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.transform([transform to SurveyResult])
.toReactivePublisher();
}
Now the framework knows that this logical IntegrationFlow
container has to be parsed and all the beans have to be registered and started.
You probably need to rethink your SurveyResultMessageConverter
logic to a plain transform()
if you can't supply a Jms.messageDrivenChannelAdapter
with your converter.
Then in your SurveyResultsSubscriptionDataFetcher
you just need to have:
return surveyResultsReactiveSource.map(Message::getPayload);