java, apache-kafka, axon

Using AxonFramework without Spring and with Kafka + PostgreSQL


I'm trying to access the events of my Spring instance that publishes Axon events to Kafka and stores them in PostgreSQL. To accomplish this, we created the following interface:

public interface AxonConnector extends AutoCloseable {

  /**
   * Starts the Axon connector and establishes a connection to the Axon server.
   */
  void start();

  /**
   * Registers an event handler with the Axon configuration.
   *
   * @param eventHandler the event handler to register
   */
  void registerEventHandler(Object eventHandler);

  /**
   * Registers an aggregate with the Axon server.
   *
   * @param aggregate the class of the aggregate to register
   * @param <T>           the type of the aggregate
   */
  <T> void registerAggregate(Class<T> aggregate);

  /**
   * Sends a command to the Axon server.
   *
   * @param command the command to send
   */
  void sendCommand(Object command);

  /**
   * Sends a query to the Axon server and returns the result.
   *
   * @param query the query to send
   * @param <R>   the type of the expected result
   * @return the result of the query
   */
  <R> R sendQuery(Object query);
}

Implementing this using the AxonServer works flawlessly:

// Configure connection to Axon Server
AxonServerConfiguration serverConfig = AxonServerConfiguration.builder()
    .servers(host + ":" + port)
    .componentName(componentName)
    .build();

AxonServerConnectionManager connectionManager = AxonServerConnectionManager.builder()
    .axonServerConfiguration(serverConfig)
    .build();

// Create event store
AxonServerEventStore eventStore = AxonServerEventStore.builder()
    .configuration(serverConfig)
    .platformConnectionManager(connectionManager)
    .snapshotFilter(SnapshotFilter.allowAll())
    .build();

// Build Axon configuration
configurer = DefaultConfigurer.defaultConfiguration();
configurer.configureEventStore(config -> eventStore)
    .configureSerializer(config -> JacksonSerializer.defaultSerializer())
    .configureEventSerializer(config -> JacksonSerializer.defaultSerializer())
    .configureMessageSerializer(config -> JacksonSerializer.defaultSerializer());

But when it comes to doing the same without Spring dependencies, using Kafka and PostgreSQL instead, the documentation thins out considerably, and I can't find a point where I should even begin.

So far, I've been able to create the EventStore with PostgreSQL (at least I hope it's correct, since I haven't been able to actually test it):

...

HikariConfig config = new HikariConfig();
config.setJdbcUrl(postgresUrl);
config.setUsername(postgresUser);
config.setPassword(postgresPassword);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");

DataSource dataSource = new HikariDataSource(config);

ConnectionProvider connectionProvider = new UnitOfWorkAwareConnectionProviderWrapper(
    new DataSourceConnectionProvider(dataSource)
);
EventStorageEngine eventStorageEngine = JdbcEventStorageEngine.builder()
    .connectionProvider(connectionProvider)
    .snapshotFilter(SnapshotFilter.allowAll())
    .eventSerializer(serializer)
    .snapshotSerializer(serializer)
    .build();
EmbeddedEventStore eventStore = EmbeddedEventStore.builder()
    .storageEngine(eventStorageEngine)
    .build();
Configurer configurer = DefaultConfigurer.defaultConfiguration()
    .configureEventStore(c -> eventStore)
    .configureSerializer(c -> serializer)
    .configureEventSerializer(c -> serializer)
    .configureMessageSerializer(c -> serializer);

axonConfig = configurer.start();
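One thing worth double-checking before testing: unlike a JPA setup with schema generation, the `JdbcEventStorageEngine` does not create its tables on its own. A minimal sketch, assuming Axon 4's JDBC support and the `PostgresEventTableFactory` it ships with (check the exact factory name against your Axon version):

```java
// Sketch: create the domain_event_entry and snapshot_event_entry tables on
// first run, using the PostgreSQL-specific table factory bundled with Axon.
// Guard this so it only runs when the tables don't exist yet.
eventStorageEngine.createSchema(PostgresEventTableFactory.INSTANCE);
```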

Then I tried to create a KafkaProducer and a KafkaConsumer:

private void initializeKafka() {
  Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class.getName());
  producer = new KafkaProducer<>(producerProps);

  Properties consumerProps = new Properties();
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "axon-connector-group");
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class.getName());
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class.getName());
  consumer = new KafkaConsumer<>(consumerProps);
  consumer.subscribe(Collections.singletonList("axon-commands"));
}

But after that, I just can't figure out how to further configure Axon to use Kafka.

I'd gladly appreciate any help here.


Solution

  • First and foremost: awesome to hear you got your interface working flawlessly with Axon Server.

    Frankly, it does make me wonder why you are dropping this ease of support entirely for two different pieces of infrastructure that are more involved to set up, as your question attests.

    Nonetheless, I do have a hint when it comes to configuring Axon's Kafka Extension. I would take a look at the Spring Boot autoconfiguration class that's contained in the project. For your convenience, you can find it here. As you can spot in that class, there's quite a lot to configuring the extension correctly. This, combined with the sheer number of Axon+Spring users versus the lower number of Kafka users in general, is why the documentation is lacking in that area. If you feel strongly about having that documentation, you are free to draft an issue for the extension. It's open-source software, based on contributions, after all.

    Now, back to the autoconfiguration class.
    As you can see, it generates numerous beans that are used to get Axon's Kafka event processing going:

    1. A KafkaMessageConverter, to be able to switch from Axon Framework's event format to Kafka's records.

    2. A ProducerFactory, which constructs the Kafka Producers for the extension's KafkaPublisher.

    3. A KafkaPublisher, receiving events from a small Axon-specific wrapper called the KafkaEventPublisher and migrating them to a Producer.

    4. The KafkaEventPublisher, implementing Axon's EventMessageHandler interface, ensuring it can act as a "regular" event handler in one of Axon's Event Processors.

    5. The ConsumerFactory, mirroring the ProducerFactory, but now for Consumers.

    6. A KafkaFetcher, which is the process that'll fetch events from a Kafka Consumer for the Axon Framework layer.

    7. A StreamableKafkaMessageSource, forming the previously mentioned "Axon Framework layer." The StreamableMessageSource, of which this is an implementation, is the component Axon's Streaming Event Processors use to get events from.

    Component one is needed for both the producing and consuming side. Components 2, 3, and 4 are needed to read events from an Axon Framework event storage solution and put them onto Kafka. Components 5, 6, and 7 are in turn required to start consuming events from Kafka back into an Axon Framework environment.
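    To make the producing side concrete, here is a sketch of wiring components 1 through 4 by hand, mirroring what the autoconfiguration class does. It assumes the axon-kafka extension 4.x is on the classpath; builder method names may differ slightly between versions, and "axon-events" is a hypothetical topic name (`serializer`, `kafkaBootstrapServers`, and `configurer` are the ones from your snippets):

    ```java
    // Component 1: converts Axon event messages to/from Kafka records
    KafkaMessageConverter<String, byte[]> converter = DefaultKafkaMessageConverter.builder()
        .serializer(serializer)
        .build();

    // Component 2: factory for the Kafka Producers the publisher will use
    ProducerFactory<String, byte[]> producerFactory = DefaultProducerFactory.<String, byte[]>builder()
        .configuration(Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))
        .build();

    // Component 3: pushes converted events onto the configured topic
    KafkaPublisher<String, byte[]> kafkaPublisher = KafkaPublisher.<String, byte[]>builder()
        .producerFactory(producerFactory)
        .messageConverter(converter)
        .topic("axon-events") // newer versions use topicResolver(...) instead
        .build();

    // Component 4: the EventMessageHandler wrapper around the publisher
    KafkaEventPublisher<String, byte[]> kafkaEventPublisher =
        KafkaEventPublisher.<String, byte[]>builder()
            .kafkaPublisher(kafkaPublisher)
            .build();

    // Register it as a "regular" event handler in its own processing group,
    // exactly as the autoconfiguration does:
    configurer.eventProcessing(ep -> {
      ep.registerEventHandler(c -> kafkaEventPublisher);
      ep.assignHandlerInstancesMatching(
          KafkaEventPublisher.DEFAULT_PROCESSING_GROUP,
          handler -> handler instanceof KafkaEventPublisher);
      ep.registerSubscribingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP);
    });

    kafkaPublisher.start(); // call shutDown() when closing your connector
    ```

    Note that with this in place you would no longer construct a raw KafkaProducer yourself, as in your initializeKafka() method; the ProducerFactory takes over that responsibility.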

    The last pointer to finalize event consumption with Axon's Kafka extension is wiring the StreamableKafkaMessageSource to a Streaming Event Processor. This can be achieved by registering this message source as the default message source for your Streaming Event Processor, for example. This part definitely has more explanation in AxonIQ's library, which can be found here.
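    The consuming side (components 5 through 7, plus that final wiring step) can be sketched along the same lines, under the same assumptions about extension version and the hypothetical "axon-events" topic:

    ```java
    // Component 5: factory for the Kafka Consumers the fetcher will use
    ConsumerFactory<String, byte[]> consumerFactory = new DefaultConsumerFactory<>(Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers,
        ConsumerConfig.GROUP_ID_CONFIG, "axon-connector-group"));

    // Component 6: the fetcher polling records from Kafka for Axon
    Fetcher<String, byte[], KafkaEventMessage> fetcher =
        AsyncFetcher.<String, byte[], KafkaEventMessage>builder().build();

    // Component 7: the StreamableMessageSource Axon's Streaming Event
    // Processors read from
    StreamableKafkaMessageSource<String, byte[]> messageSource =
        StreamableKafkaMessageSource.<String, byte[]>builder()
            .topics(List.of("axon-events"))
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .messageConverter(DefaultKafkaMessageConverter.builder()
                                  .serializer(serializer)
                                  .build())
            .build();

    // Wire the source to a Streaming (here: tracking) Event Processor:
    configurer.eventProcessing(ep ->
        ep.registerTrackingEventProcessor("kafka-event-processor", c -> messageSource));
    ```

    One caveat: a tracking processor needs a TokenStore to remember its position. If you don't register one, Axon falls back to an in-memory token store that loses its position on restart; since you already have PostgreSQL, a JdbcTokenStore would be a natural fit.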

    I know this isn't the full solution you can copy-paste or anything, but I do hope it provides you with the guidance of what you need to do.