I have a Spring Boot app that needs to integrate with a few Event Hubs, both to consume and to produce messages.
Following the Microsoft documentation, I chose the "Use Spring Messaging Azure Event Hubs" option.
I have added all the necessary properties to my application.properties file:
spring.cloud.azure.eventhubs.namespace=myHubUrl
spring.cloud.azure.eventhubs.event-hub-name=myHubName
spring.cloud.azure.eventhubs.connection-string=myHubConnectionString
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=nameOfTheCheckpointAccount
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=checkpointContainer
spring.cloud.azure.eventhubs.processor.checkpoint-store.connection-string=checkpointConnectionString
spring.cloud.azure.eventhubs.processor.consumer-group=myAppConsumerGroup
My Consumer Class:
@Component
public class EventHubConsumer {
    @EventHubsListener(destination = "${spring.cloud.azure.eventhubs.event-hub-name}",
            group = "${spring.cloud.azure.eventhubs.processor.consumer-group}")
    public void handleMessageFromEventHub(Object message) {
        LOGGER.info("Received message.");
    }
}
And my producer class:
@Component
public class EventHubProducer {
    private final EventHubsTemplate eventHubsTemplate;
    private final String eventHubName;

    public EventHubProducer(EventHubsTemplate eventHubsTemplate,
            @Value("${spring.cloud.azure.eventhubs.event-hub-name}") String eventHubName) {
        this.eventHubsTemplate = eventHubsTemplate;
        this.eventHubName = eventHubName;
    }

    public void writeEvent(Object object) {
        LOGGER.info("Sending message to Hub...");
        eventHubsTemplate.send(eventHubName, MessageBuilder.withPayload(object).build());
    }
}
Now, all of this works fine: I can read from and write to the Event Hub whose connection string and namespace I've provided.
The problem comes when I try to add another Event Hub to the mix, which might have its own namespace and/or connection string. These credentials are processed in the background by the Azure library, and I haven't found a good way to provide a second set in my application.properties so that it works the same way as the setup above. I'm also not sure whether I should provide a second blob storage for the additional Event Hub.
I'm sure there must be a way, and it should be straightforward, but I just can't make it work. Any ideas?
Big thanks!
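I was able to send messages to and receive messages from two Azure Event Hubs with the Spring Boot application below. It builds the Azure SDK clients directly, one per Event Hub, instead of relying on the Spring Messaging auto-configuration.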
EventHubProducerService.java :
package com.example.EventHubs.service;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventDataBatch;
import org.springframework.stereotype.Service;

@Service
public class EventHubProducerService {
    private static final String EVENT_HUB_CONNECTION_STR_1 = "<Hub1_ConneString>";
    private static final String EVENT_HUB_NAME_1 = "<hub1Name>";
    private static final String EVENT_HUB_CONNECTION_STR_2 = "<Hub2_ConneString>";
    private static final String EVENT_HUB_NAME_2 = "<hub2Name>";

    public void sendMessages() {
        // One producer client per Event Hub; both are closed automatically.
        try (EventHubProducerClient producerClient1 = new EventHubClientBuilder()
                .connectionString(EVENT_HUB_CONNECTION_STR_1, EVENT_HUB_NAME_1)
                .buildProducerClient();
             EventHubProducerClient producerClient2 = new EventHubClientBuilder()
                .connectionString(EVENT_HUB_CONNECTION_STR_2, EVENT_HUB_NAME_2)
                .buildProducerClient()) {
            EventDataBatch eventDataBatch1 = producerClient1.createBatch();
            eventDataBatch1.tryAdd(new EventData("First event for Event Hub 1"));
            producerClient1.send(eventDataBatch1);

            EventDataBatch eventDataBatch2 = producerClient2.createBatch();
            eventDataBatch2.tryAdd(new EventData("First event for Event Hub 2"));
            producerClient2.send(eventDataBatch2);

            System.out.println("Messages sent successfully to both Event Hubs.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
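The service above hard-codes the settings for both hubs as constants. As a minimal sketch of one alternative, the settings for each hub could be kept under a logical key and looked up when a client is built. The `HubRegistry` and `HubSettings` names are hypothetical and not part of the Azure SDK or Spring Cloud Azure; how the values get loaded (properties, environment variables, a vault) is left open here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of the Azure SDK: keeps the settings for each
// Event Hub under a logical key so services can look them up by name.
public final class HubRegistry {

    // Immutable holder for one hub's connection details.
    public static final class HubSettings {
        private final String connectionString;
        private final String hubName;
        private final String consumerGroup;

        public HubSettings(String connectionString, String hubName, String consumerGroup) {
            this.connectionString = Objects.requireNonNull(connectionString);
            this.hubName = Objects.requireNonNull(hubName);
            this.consumerGroup = Objects.requireNonNull(consumerGroup);
        }

        public String connectionString() { return connectionString; }
        public String hubName() { return hubName; }
        public String consumerGroup() { return consumerGroup; }
    }

    private final Map<String, HubSettings> hubs = new LinkedHashMap<>();

    // Registers settings under a key; rejects duplicate keys.
    public HubRegistry register(String key, HubSettings settings) {
        Objects.requireNonNull(key);
        Objects.requireNonNull(settings);
        if (hubs.putIfAbsent(key, settings) != null) {
            throw new IllegalArgumentException("Hub already registered: " + key);
        }
        return this;
    }

    // Returns the settings for a key; fails loudly for unknown keys.
    public HubSettings get(String key) {
        HubSettings settings = hubs.get(key);
        if (settings == null) {
            throw new IllegalArgumentException("Unknown hub: " + key);
        }
        return settings;
    }
}
```

A service could then call `registry.get("hub1")` and pass `connectionString()` and `hubName()` to `EventHubClientBuilder.connectionString(...)`, the same call used in the code above, so adding a third hub becomes one more `register(...)` call rather than another pair of constants.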
EventHubReceiverService.java :
package com.example.EventHubs.service;

import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.core.util.BinaryData;
import org.springframework.stereotype.Service;
import java.util.UUID;

@Service
public class EventHubReceiverService {
    private static final String EVENT_HUB_CONNECTION_STR_1 = "<Hub1_ConneString>";
    private static final String EVENT_HUB_NAME_1 = "<hub1Name>";
    private static final String CONSUMER_GROUP_1 = "$Default";
    private static final String EVENT_HUB_CONNECTION_STR_2 = "<Hub2_ConneString>";
    private static final String EVENT_HUB_NAME_2 = "<hub2Name>";
    private static final String CONSUMER_GROUP_2 = "$Default";
    private static final String BLOB_STORAGE_CONNECTION_STRING = "<storageConneString>";
    private static final String BLOB_CONTAINER_NAME = "<containerName>";

    // A single container is shared by both hubs; blobs are separated by a name prefix.
    private final BlobContainerClient blobContainerClient;

    public EventHubReceiverService() {
        this.blobContainerClient = new BlobContainerClientBuilder()
                .connectionString(BLOB_STORAGE_CONNECTION_STRING)
                .containerName(BLOB_CONTAINER_NAME)
                .buildClient();
    }

    public void receiveMessages() {
        // One async consumer client per Event Hub.
        EventHubConsumerAsyncClient consumerClient1 = new EventHubClientBuilder()
                .connectionString(EVENT_HUB_CONNECTION_STR_1, EVENT_HUB_NAME_1)
                .consumerGroup(CONSUMER_GROUP_1)
                .buildAsyncConsumerClient();
        EventHubConsumerAsyncClient consumerClient2 = new EventHubClientBuilder()
                .connectionString(EVENT_HUB_CONNECTION_STR_2, EVENT_HUB_NAME_2)
                .consumerGroup(CONSUMER_GROUP_2)
                .buildAsyncConsumerClient();

        // Subscribe to all partitions of the first hub.
        consumerClient1.receive()
                .subscribe(event -> {
                    String message = event.getData().getBodyAsString();
                    System.out.printf("Received from Event Hub 1: %s%n", message);
                    storeMessageInBlobStorage("EventHub1", message);
                }, error -> {
                    System.err.println("Error receiving from Event Hub 1: " + error);
                });

        // Subscribe to all partitions of the second hub.
        consumerClient2.receive()
                .subscribe(event -> {
                    String message = event.getData().getBodyAsString();
                    System.out.printf("Received from Event Hub 2: %s%n", message);
                    storeMessageInBlobStorage("EventHub2", message);
                }, error -> {
                    System.err.println("Error receiving from Event Hub 2: " + error);
                });
    }

    // Writes each message to its own blob under a per-hub prefix, e.g. EventHub1/<uuid>.txt.
    private void storeMessageInBlobStorage(String source, String message) {
        String blobName = source + "/" + UUID.randomUUID() + ".txt";
        blobContainerClient.getBlobClient(blobName)
                .upload(BinaryData.fromString(message), true);
        System.out.printf("Stored message in Blob Storage: %s%n", blobName);
    }
}
EventHubController.java :
package com.example.EventHubs.controller;

import com.example.EventHubs.service.EventHubProducerService;
import com.example.EventHubs.service.EventHubReceiverService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventHubController {
    private final EventHubProducerService producerService;
    private final EventHubReceiverService receiverService;

    public EventHubController(EventHubProducerService producerService,
                              EventHubReceiverService receiverService) {
        this.producerService = producerService;
        this.receiverService = receiverService;
    }

    @GetMapping("/send")
    public String sendMessages() {
        producerService.sendMessages();
        return "Messages sent!";
    }

    @GetMapping("/receive")
    public String receiveMessages() {
        receiverService.receiveMessages();
        return "Receiving messages. Check console output.";
    }
}
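One thing to watch with this controller: `receiveMessages()` builds two new consumer clients and subscriptions on every call, so each additional GET /receive adds another pair of subscribers and events would be handled more than once. A possible guard, sketched here with a hypothetical `StartOnce` helper (not part of any Azure or Spring library), runs the start logic at most once:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs a start action at most once, even under concurrent calls.
public final class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable startAction;

    public StartOnce(Runnable startAction) {
        this.startAction = startAction;
    }

    // Returns true if this call ran the action, false if it had already been started.
    public boolean start() {
        // compareAndSet lets exactly one caller flip the flag from false to true.
        if (started.compareAndSet(false, true)) {
            startAction.run();
            return true;
        }
        return false;
    }
}
```

Under this assumption, the current body of `receiveMessages()` would become the `startAction`, and the endpoint would call `start()` and report whether the subscriptions were newly created or were already running.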
pom.xml :
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.20.1</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.13</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-storage-blob</artifactId>
        <version>12.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Output :
I sent the messages to both Event Hubs by opening the URL below in the browser.
http://localhost:8080/send
I received the messages from both Event Hubs by opening the URL below in the browser.
http://localhost:8080/receive