Building on my two previous questions:
I have the following setup:
A Spring Boot backend that runs Axon without issues, publishes events to the Kafka source, and so on
A plain Java application that uses the AxonConnector interface to listen to events and send commands to the backend
This is the implementation of the AxonConnector using Kafka:
@Log4j2
public class KafkaPostgresAxonConnector implements AxonConnector {

    private final String kafkaBootstrapServers;
    private final String postgresUrl;
    private final String postgresUser;
    private final String postgresPassword;
    private final String componentName;
    private final String groupId;
    private final Configurer configurer;
    private final JacksonSerializer serializer;
    private KafkaProducer<String, String> producer;
    private Configuration axonConfig;

    public KafkaPostgresAxonConnector(String kafkaBootstrapServers, String postgresUrl,
            String postgresUser, String postgresPassword, String componentName) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.postgresUrl = postgresUrl;
        this.postgresUser = postgresUser;
        this.postgresPassword = postgresPassword;
        this.componentName = componentName;
        this.groupId = componentName + UUID.randomUUID();
        serializer = JacksonSerializer.builder()
                .defaultTyping()
                .lenientDeserialization()
                .build();
        configurer = DefaultConfigurer.defaultConfiguration()
                .configureSerializer(c -> serializer)
                .configureEventSerializer(c -> serializer)
                .configureMessageSerializer(c -> serializer);
    }

    @Override
    public void start() {
        DataSource dataSource = dataSource();
        initializePostgres(dataSource);
        initializeKafkaConsumer();
        initializeKafkaProducer();
        log.info("Starting AxonFramework with component: {}", componentName);
        axonConfig = configurer.buildConfiguration();
        axonConfig.start();
    }

    @Override
    public void registerEventHandler(Object eventHandler) {
        configurer.registerEventHandler(conf -> eventHandler);
    }

    @Override
    public <T> void registerAggregate(Class<T> aggregateType) {
        configurer.configureAggregate(aggregateType);
    }

    @Override
    public UUID sendCommand(Object command) {
        if (axonConfig == null) {
            throw new IllegalStateException("Axon Framework not initialized");
        }
        CommandGateway commandGateway = axonConfig.commandGateway();
        return commandGateway.sendAndWait(command);
    }

    @Override
    public <R> R sendQuery(Object query, ResponseType<R> responseType) {
        if (axonConfig == null) {
            log.warn("Cannot send query - Axon Framework not initialized");
            return null;
        }
        QueryGateway queryGateway = axonConfig.queryGateway();
        return queryGateway.query(query, responseType).join();
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
        if (axonConfig != null) {
            axonConfig.shutdown();
        }
    }

    /**
     * KAFKA
     */
    private KafkaMessageConverter<String, byte[]> messageConverter() {
        return DefaultKafkaMessageConverter.builder()
                .serializer(serializer)
                .build();
    }

    /**
     * KAFKA PRODUCER
     */
    private void initializeKafkaProducer() {
        ProducerFactory<String, byte[]> producerFactory = producerFactory();
        KafkaMessageConverter<String, byte[]> messageConverter = messageConverter();
        KafkaPublisher<String, byte[]> kafkaPublisher = kafkaPublisher(
                producerFactory,
                messageConverter
        );
        KafkaEventPublisher<String, byte[]> kafkaEventPublisher = kafkaEventPublisher(kafkaPublisher);
        registerPublisherToEventProcessor(configurer.eventProcessing(), kafkaEventPublisher);
    }

    public void registerPublisherToEventProcessor(
            EventProcessingConfigurer eventProcessingConfigurer,
            KafkaEventPublisher<String, byte[]> kafkaEventPublisher
    ) {
        String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
        eventProcessingConfigurer
                .registerEventHandler(configuration -> kafkaEventPublisher)
                .registerSubscribingEventProcessor(processingGroup)
                .usingSubscribingEventProcessors();
    }

    private KafkaPublisher<String, byte[]> kafkaPublisher(
            ProducerFactory<String, byte[]> producerFactory,
            KafkaMessageConverter<String, byte[]> kafkaMessageConverter
    ) {
        return KafkaPublisher.<String, byte[]>builder()
                .producerFactory(producerFactory)
                .messageConverter(kafkaMessageConverter)
                .serializer(serializer)
                .build();
    }

    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(
            KafkaPublisher<String, byte[]> kafkaPublisher
    ) {
        return KafkaEventPublisher.<String, byte[]>builder()
                .kafkaPublisher(kafkaPublisher)
                .build();
    }

    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                ByteArraySerializer.class);
        return DefaultProducerFactory.<String, byte[]>builder()
                .configuration(producerProps)
                .confirmationMode(ConfirmationMode.NONE)
                .build();
    }

    /**
     * KAFKA CONSUMER
     */
    private void initializeKafkaConsumer() {
        ConsumerFactory<String, byte[]> consumerFactory = consumerFactory();
        Fetcher<String, byte[], EventMessage<?>> fetcher = fetcher();
        KafkaMessageConverter<String, byte[]> messageConverter = messageConverter();
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = kafkaMessageSourceConfigurer(
                configurer);
        SubscribableKafkaMessageSource<String, byte[]> messageSource = subscribableKafkaMessageSource(
                consumerFactory,
                fetcher,
                messageConverter,
                kafkaMessageSourceConfigurer
        );
        configureSubscribableKafkaSource(configurer.eventProcessing(), messageSource);
    }

    private Fetcher<String, byte[], EventMessage<?>> fetcher() {
        return AsyncFetcher.<String, byte[], EventMessage<?>>builder()
                .pollTimeout(1000)
                .executorService(Executors.newSingleThreadExecutor())
                .build();
    }

    private ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultConsumerFactory<>(consumerProps);
    }

    private KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    private SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], EventMessage<?>> fetcher,
            KafkaMessageConverter<String, byte[]> messageConverter,
            KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer
    ) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource =
                SubscribableKafkaMessageSource.<String, byte[]>builder()
                        .autoStart()
                        .groupId(groupId)
                        .consumerFactory(consumerFactory)
                        .fetcher(fetcher)
                        .messageConverter(messageConverter)
                        .consumerCount(1)
                        .serializer(serializer)
                        .build();
        kafkaMessageSourceConfigurer.configureSubscribableSource(
                configuration -> subscribableKafkaMessageSource
        );
        return subscribableKafkaMessageSource;
    }

    private void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
            SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
        eventProcessingConfigurer.configureDefaultSubscribableMessageSource(
                configuration -> subscribableKafkaMessageSource
        );
        eventProcessingConfigurer.usingSubscribingEventProcessors();
    }

    /**
     * POSTGRES
     */
    private void initializePostgres(DataSource dataSource) {
        ConnectionProvider connectionProvider = new DataSourceConnectionProvider(dataSource);
        EventSchema schema = EventSchema.builder()
                .eventTable("domain_event_entry")
                .snapshotTable("snapshot_event_entry")
                .build();
        // Initialize Postgres connection
        EventStorageEngine eventStorageEngine = JdbcEventStorageEngine.builder()
                .transactionManager(transactionManager())
                .connectionProvider(connectionProvider)
                .snapshotFilter(SnapshotFilter.allowAll())
                .eventSerializer(serializer)
                .snapshotSerializer(serializer)
                .schema(schema)
                .build();
        configurer.configureEmbeddedEventStore(c -> eventStorageEngine);
    }

    private DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(postgresUrl);
        config.setUsername(postgresUser);
        config.setPassword(postgresPassword);
        config.setDriverClassName("org.postgresql.Driver");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        return new HikariDataSource(config);
    }

    private TransactionManager transactionManager() {
        return NoTransactionManager.INSTANCE;
    }
}
To manually test the AxonConnector, I created this app:
@Log4j2
public class ConnectorTest {

    public static void main(String[] args) {
        AxonConnector connector = AxonConnector.kafka(
                "localhost:29092,localhost:39092,localhost:49092",
                "jdbc:postgresql://localhost:5432/database",
                "postgres-user",
                "postgres-password",
                "Tester-1"
        );
        connector.start();
        connector.registerEventHandler(new Handler());
        UUID id =
                connector.sendCommand(new CreateModuleCommand("TestModule", 10, 1, 5, true, true));
        log.info(connector.sendQuery(new FindModuleByIdQuery(id), ResponseTypes.optionalInstanceOf(ModuleDto.class)));
        // Add Shutdown Hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Shutting down Axon Connector...");
            try {
                connector.close();
            } catch (Exception e) {
                log.error("Error while shutting down Axon Connector", e);
            }
        }));
    }

    @Log4j2
    public static class Handler {

        @EventHandler
        public void on(ModuleCreatedEvent event) {
            System.out.println("Module Created: " + event.getModule().getId());
        }
    }
}
The containers are running as follows:
But running the main function throws the following exception:
Exception in thread "main" org.axonframework.commandhandling.NoHandlerForCommandException: No handler was subscribed for command [de.cronixzero.test.messages.commands.modules.CreateModuleCommand].
at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:167)
at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
at org.axonframework.tracing.Span.run(Span.java:101)
at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
at org.axonframework.commandhandling.gateway.DefaultCommandGateway.sendAndWait(DefaultCommandGateway.java:100)
at de.cronixzero.test.messages.connector.kafka.KafkaPostgresAxonConnector.sendCommand(KafkaPostgresAxonConnector.java:136)
at de.cronixzero.test.messages.testing.ConnectorTest.main(ConnectorTest.java:30)
ConnectorTest.java:30:
UUID id = connector.sendCommand(new CreateModuleCommand("TestModule", 10, 1, 5, true, true));
KafkaPostgresAxonConnector.java:136:
CommandGateway commandGateway = axonConfig.commandGateway();
return commandGateway.sendAndWait(command);
What am I doing wrong here?
Running the Java debugger shows that the @EventHandler-annotated method is never reached, even though the application seems to be connected to the Kafka source and is even registered as a consumer.
Why is this?
I also just noticed that once I start the test application, the message count in the Kafka topic climbs very quickly (the same messages seem to be resent constantly).
They are identical in content and easily reach 200,000 repetitions within a couple of minutes.
For the distribution of command messages in Axon Framework, you need to use a distributed version of the CommandBus. There are roughly three options for this:
1. Axon Server
2. The DistributedCommandBus with Spring Cloud Service Discovery
3. The DistributedCommandBus with JGroups
Using Axon Server is arguably the easiest way to set this up seamlessly. It also gives you a dedicated Event Store solution, distributed event messaging, and distributed query messaging, alongside the other features Axon Server provides to simplify things. Running a single Axon Server instance is very straightforward. You could also follow the setup process on AxonIQ Console to simplify it further. Note that Axon Server is not necessarily a paid tool.
If you feel like doing more work, you can try out the Spring Cloud or JGroups extensions from Axon Framework, in combination with a DistributedCommandBus. For the former, you will need to select a preferred Spring Cloud Service Discovery solution, like Eureka, Consul, Kubernetes, or ZooKeeper. For JGroups you would, well, set up JGroups as the service discovery.
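
To make the cause of the NoHandlerForCommandException more concrete: the stack trace shows the command going through a SimpleCommandBus, which only dispatches to handlers subscribed in the same JVM. The sketch below is a deliberately simplified toy model of that behaviour, not Axon's actual implementation. `LocalBus` and `CreateModule` are hypothetical names invented for illustration. It shows that a handler subscribed on the backend's bus is invisible to a separate bus instance in the connector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LocalCommandBusSketch {

    /** Hypothetical, simplified bus: it only knows handlers subscribed on this instance. */
    public static class LocalBus {
        private final Map<Class<?>, Function<Object, Object>> handlers = new HashMap<>();

        public void subscribe(Class<?> commandType, Function<Object, Object> handler) {
            handlers.put(commandType, handler);
        }

        public Object dispatch(Object command) {
            Function<Object, Object> handler = handlers.get(command.getClass());
            if (handler == null) {
                // Same situation as in the question: nothing was subscribed locally.
                throw new IllegalStateException(
                        "No handler was subscribed for command [" + command.getClass().getName() + "]");
            }
            return handler.apply(command);
        }
    }

    /** Hypothetical command type, standing in for CreateModuleCommand. */
    public static class CreateModule {
        public final String name;

        public CreateModule(String name) {
            this.name = name;
        }
    }

    public static void main(String[] args) {
        LocalBus backendBus = new LocalBus();   // stands in for the bus inside the backend process
        LocalBus connectorBus = new LocalBus(); // stands in for the bus inside the connector process

        // Only the backend subscribes a handler for the command.
        backendBus.subscribe(CreateModule.class, c -> "created " + ((CreateModule) c).name);

        // Dispatching on the backend's own bus finds the handler.
        System.out.println(backendBus.dispatch(new CreateModule("TestModule")));

        // Dispatching on the connector's bus fails: that instance has no handler.
        try {
            connectorBus.dispatch(new CreateModule("TestModule"));
        } catch (IllegalStateException e) {
            System.out.println("connector: " + e.getMessage());
        }
    }
}
```

A distributed command bus closes this gap by routing the command over the network to the node that did subscribe a handler. Kafka in the setup above only carries events, so it does not help with commands.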