scalarabbitmqakka-streamalpakka

How to set headers in RabbitMq with Scala


Hi I'm trying to set headers in the rabbit messages with the library akka-stream-alpakka-amqp in scala but I can't be able to make it work and reading the repository I can't see any reference or method to add headers in the messages.

I was trying to add the headers property in the arguments but i can't make it work


val exchangeWitHeaders =
        RabbitIntegrationConstants.queueDeclaration.withArguments(Map("x-match" -> "all", "h1" -> "header"))
val writeSettings = AmqpWriteSettings(RabbitIntegrationConstants.connectionProvider)
        .withRoutingKey(RabbitIntegrationConstants.queueName)
        .withDeclaration(exchangeWitHeaders)

val amqpSinkExchange = AmqpSink.simple(writeSettings)
val textToSend = Vector("test")

Source(textToSend).map(s => ByteString(s)).runWith(amqpSinkExchange)

Solution

  • First of all, you will need to make sure that you are not using a SimpleSink so that it can accept a WriteMessage instead of a ByteString

    val amqpSink = AmqpSink.apply(writeSettings)
    
    
    Source(textToSend)
      .map { s =>
        val headers = new java.util.HashMap<String, Object>()
        headers.put("header1", "value1")
    
        val basicProperties = 
          new BasicProperties.Builder()
            .headers(headers)
            .build()
    
        WriteMessage(ByteString(s))
          .withProperties(basicProperties)
      }
      .runWith(amqpSink)