I'm trying to aggregate messages in a list in order to use AmqpOutboundEndpoint.multiSend
option. I followed this solution but I use Kotlin DSL instead of XML. Here is the code sample:
@Configuration
@EnableIntegration
class SampleConfiguration {
@Bean
fun sampleFlow(amqpTemplate: AmqpTemplate): StandardIntegrationFlow {
return IntegrationFlow
.from("inputChannel")
.aggregate(Consumer<AggregatorSpec> {
it.releaseExpression("size() == 100")
.groupTimeout(1000)
.sendPartialResultOnExpiry(true)
.correlationExpression("T(Thread).currentThread().id")
.poller { p: PollerFactory -> p.fixedRate(1000).maxMessagesPerPoll(100) }
})
.handle(Amqp.outboundAdapter(amqpTemplate).exchangeName("sampleExchange").multiSend(true))
.get()
}
}
I'm getting a compilation error:
Overload resolution ambiguity:
public open fun aggregate(aggregator: Consumer<AggregatorSpec!>?): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder
public open fun aggregate(aggregatorProcessor: Any): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder
I couldn't find any way to overcome this problem. How can I configure an Aggregator using Kotlin DSL? I couldn't find any examples of a working code on the Internet.
Spring Integration version: 6.2.1
Kotlin version: 1.9.22
What you have so far is not a Kotlin DSL, but rather a Java API used in Kotlin code. There is indeed could be a problem with compatibility between those two languages, especially when we deal with lambdas.
And that's why we developed a dedicated Kotlin DSL for Spring Integration a while ago: https://docs.spring.io/spring-integration/reference/kotlin-dsl.html
So, something like this must work for you:
@Bean
fun sampleFlow(amqpTemplate: AmqpTemplate) =
integrationFlow("inputChannel") {
aggregate {
releaseExpression("size() == 100")
groupTimeout(1000)
sendPartialResultOnExpiry(true)
correlationExpression("T(Thread).currentThread().id")
poller { it.fixedRate(1000).maxMessagesPerPoll(100) }
}
handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("sampleExchange")
.multiSend(true))
}