spring-bootazuremiddlewareazure-eventhub

Connect Spring Boot application to multiple Azure Event Hubs


So i have a SB app that is trying to integrate with a few event hubs to consume and also produce messages.

Reading the Microsoft documentation here i chose the Use Spring Messaging Azure Event Hubs choice.

I have added all the necessary properties to my application.properties file:

spring.cloud.azure.eventhubs.namespace=myHubUrl
spring.cloud.azure.eventhubs.event-hub-name=muHubName
spring.cloud.azure.eventhubs.connection-string=myHubConnectionString
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=nameOfTheCheckpointAccount
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=checkpointContainer
spring.cloud.azure.eventhubs.processor.checkpoint-store.connection-string=checkpointConnectionString
spring.cloud.azure.eventhubs.processor.consumer-group=myAppConsumerGroup

My Consumer Class:

@Component
public class EventHubConsumer {



    @EventHubsListener(destination = "${spring.cloud.azure.eventhubs.event-hub-name}", group = "${spring.cloud.azure.eventhubs.processor.consumer-group}")
    public void handleMessageFromEventHub(Object message) {
        LOGGER.info("Received message.");
    }
}

And my producer class:

@Component
public class EventHubProducer {  

    private final EventHubsTemplate eventHubsTemplate;
    private final String eventHubName;


    public EventHubProducer(EventHubsTemplate eventHubsTemplate,
                                  @Value("${spring.cloud.azure.eventhubs.event-hub-name}") String eventHubName) {
        this.eventHubsTemplate = eventHubsTemplate;
        this.eventHubName = eventHubName;
    }

    public void writeEvent(Object object){
        LOGGER.info("Sending message to Hub...");

        eventHubsTemplate.send(eventHubName, MessageBuilder.withPayload(object).build());
    }

Now, all of this works totally fine, I can read and write to the event hub which's connection string and namespace I've provided.

The problem comes when I am trying to add another Event Hub in the mix, which might have it's own namespace and/or connection string, and this credentials are processed in the background(the Azure library). I haven't found a good way to provide that in my application.properties so that it would work the same as the situation above. Also I'm not sure if i should provide a second blob storage for the additional event hub.

I'm sure there must be a way, and it should be straight forward, but I'm just unable to make it work. Any ideas?

Big thanks!


Solution

  • I successfully sent and received the messages to the Azure Event Hubs using the below Spring Boot application.

    EventHubProducerService.java :

    package com.example.EventHubs.service;
    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventDataBatch;
    import org.springframework.stereotype.Service;
    
    @Service
    public class EventHubProducerService {
    
        private static final String EVENT_HUB_CONNECTION_STR_1 = "<Hub1_ConneString>";
        private static final String EVENT_HUB_NAME_1 = "<hub1Name>";
        private static final String EVENT_HUB_CONNECTION_STR_2 = "<Hub2_ConneString>";
        private static final String EVENT_HUB_NAME_2 = "<hub2Name>";
    
        public void sendMessages() {
            try (EventHubProducerClient producerClient1 = new EventHubClientBuilder()
                    .connectionString(EVENT_HUB_CONNECTION_STR_1, EVENT_HUB_NAME_1)
                    .buildProducerClient();
                 EventHubProducerClient producerClient2 = new EventHubClientBuilder()
                         .connectionString(EVENT_HUB_CONNECTION_STR_2, EVENT_HUB_NAME_2)
                         .buildProducerClient()) {
                EventDataBatch eventDataBatch1 = producerClient1.createBatch();
                eventDataBatch1.tryAdd(new EventData("First event for Event Hub 1"));
                producerClient1.send(eventDataBatch1);
                EventDataBatch eventDataBatch2 = producerClient2.createBatch();
                eventDataBatch2.tryAdd(new EventData("First event for Event Hub 2"));
                producerClient2.send(eventDataBatch2);
                System.out.println("Messages sent successfully to both Event Hubs.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    EventHubReceiverService.java :

    package com.example.EventHubs.service;
    import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.storage.blob.BlobContainerClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import com.azure.core.util.BinaryData;
    import org.springframework.stereotype.Service;
    import java.util.UUID;
    
    @Service
    public class EventHubReceiverService {
    
        private static final String EVENT_HUB_CONNECTION_STR_1 = "<Hub1_ConneString>";
        private static final String EVENT_HUB_NAME_1 = "<hub1Name>";
        private static final String CONSUMER_GROUP_1 = "$Default";
        private static final String EVENT_HUB_CONNECTION_STR_2 = "<Hub2_ConneString>";
        private static final String EVENT_HUB_NAME_2 = "<hub2Name>";
        private static final String CONSUMER_GROUP_2 = "$Default";
        private static final String BLOB_STORAGE_CONNECTION_STRING = "<storageConneString>";
        private static final String BLOB_CONTAINER_NAME = "<containerName?";
        private BlobContainerClient blobContainerClient;
    
        public EventHubReceiverService() {
            this.blobContainerClient = new BlobContainerClientBuilder()
                    .connectionString(BLOB_STORAGE_CONNECTION_STRING)
                    .containerName(BLOB_CONTAINER_NAME)
                    .buildClient();
        }
        public void receiveMessages() {
            EventHubConsumerAsyncClient consumerClient1 = new EventHubClientBuilder()
                    .connectionString(EVENT_HUB_CONNECTION_STR_1, EVENT_HUB_NAME_1)
                    .consumerGroup(CONSUMER_GROUP_1)
                    .buildAsyncConsumerClient();
            EventHubConsumerAsyncClient consumerClient2 = new EventHubClientBuilder()
                    .connectionString(EVENT_HUB_CONNECTION_STR_2, EVENT_HUB_NAME_2)
                    .consumerGroup(CONSUMER_GROUP_2)
                    .buildAsyncConsumerClient();
            consumerClient1.receive()
                    .subscribe(event -> {
                        String message = event.getData().getBodyAsString();
                        System.out.printf("Received from Event Hub 1: %s%n", message);
                        storeMessageInBlobStorage("EventHub1", message);
                    }, error -> {
                        System.err.println("Error receiving from Event Hub 1: " + error);
                    });
            consumerClient2.receive()
                    .subscribe(event -> {
                        String message = event.getData().getBodyAsString();
                        System.out.printf("Received from Event Hub 2: %s%n", message);
                        storeMessageInBlobStorage("EventHub2", message);
                    }, error -> {
                        System.err.println("Error receiving from Event Hub 2: " + error);
                    });
        }
        private void storeMessageInBlobStorage(String source, String message) {
            String blobName = source + "/" + UUID.randomUUID() + ".txt";
            blobContainerClient.getBlobClient(blobName)
                    .upload(BinaryData.fromString(message), true);
            System.out.printf("Stored message in Blob Storage: %s%n", blobName);
        }
    }
    

    EventHubController.java :

    package com.example.EventHubs.controller;
    import com.example.EventHubs.service.EventHubProducerService;
    import com.example.EventHubs.service.EventHubReceiverService;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class EventHubController {
    
        private final EventHubProducerService producerService;
        private final EventHubReceiverService receiverService;
        public EventHubController(EventHubProducerService producerService, EventHubReceiverService receiverService) {
            this.producerService = producerService;
            this.receiverService = receiverService;
        }
        @GetMapping("/send")
        public String sendMessages() {
            producerService.sendMessages();
            return "Messages sent!";
        }
        @GetMapping("/receive")
        public String receiveMessages() {
            receiverService.receiveMessages();
            return "Receiving messages. Check console output.";
        }
    }
    

    pom.xml :

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs</artifactId>
            <version>5.17.0</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
            <version>1.20.1</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.13</version>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-storage-blob</artifactId>
            <version>12.17.0</version> 
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

    Output :

    I have sent the messages to Azure Event Hubs using the below URL in the browser.

    http://localhost:8080/send
    

    enter image description here

    enter image description here

    I have received the messages from Azure Event Hubs using the below URL in the browser.

    http://localhost:8080/receive
    

    enter image description here

    enter image description here