Hi, I'm trying to set headers on RabbitMQ messages with the akka-stream-alpakka-amqp library
in Scala, but I can't get it to work, and reading the repository I can't find any reference or method for adding headers to messages.
I tried passing the headers in the declaration arguments, but that doesn't work either:
val exchangeWitHeaders =
  RabbitIntegrationConstants.queueDeclaration.withArguments(Map("x-match" -> "all", "h1" -> "header"))
val writeSettings = AmqpWriteSettings(RabbitIntegrationConstants.connectionProvider)
  .withRoutingKey(RabbitIntegrationConstants.queueName)
  .withDeclaration(exchangeWitHeaders)
val amqpSinkExchange = AmqpSink.simple(writeSettings)
val textToSend = Vector("test")
Source(textToSend).map(s => ByteString(s)).runWith(amqpSinkExchange)
First of all, you need to use AmqpSink.apply rather than AmqpSink.simple,
so that the sink accepts a WriteMessage
instead of a ByteString. You can then set the headers through the BasicProperties of each message:
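import akka.stream.alpakka.amqp.WriteMessage
import com.rabbitmq.client.AMQP.BasicProperties

val amqpSink = AmqpSink.apply(writeSettings)
Source(textToSend)
  .map { s =>
    // Message headers live in the AMQP basic properties, not in the declaration arguments.
    // Scala uses square brackets for type parameters, and AnyRef corresponds to Java's Object.
    val headers = new java.util.HashMap[String, AnyRef]()
    headers.put("header1", "value1")
    val basicProperties =
      new BasicProperties.Builder()
        .headers(headers)
        .build()
    // Wrap the payload in a WriteMessage so that the properties travel with it.
    WriteMessage(ByteString(s))
      .withProperties(basicProperties)
  }
  .runWith(amqpSink)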
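If you already keep your headers in a Scala Map, a small conversion helper can save you from building the java.util.HashMap by hand each time. The following is only a sketch: `AmqpHeaders` and `toAmqpHeaders` are hypothetical names that are not part of Alpakka or the RabbitMQ client, and it assumes all header values are plain strings.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical helper (not part of Alpakka or the RabbitMQ client).
object AmqpHeaders {
  // Copies a Scala Map into the java.util.Map[String, AnyRef] shape
  // that BasicProperties.Builder#headers expects.
  def toAmqpHeaders(headers: Map[String, String]): JMap[String, AnyRef] = {
    val javaHeaders = new JHashMap[String, AnyRef]()
    headers.foreach { case (key, value) => javaHeaders.put(key, value) }
    javaHeaders
  }
}
```

The result could then be passed as `.headers(AmqpHeaders.toAmqpHeaders(Map("header1" -> "value1")))` when building the BasicProperties.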