akka.net  akka.net-persistence  akka.net-streams

How to merge events from 2 Event Sourcing systems


I need to merge events coming from two different event-sourcing systems handled by the Akka.NET Persistence module. The merge must sort events by timestamp, and I found the MergeSorted operator in Akka.Streams, which does exactly what I need (I tried it with two lists of numbers; for events I wrote a custom EventEnvelopComparer).

In my solution I have one actor system (readSystem1) that reads from db1 and a second actor system (readSystem2) that reads from db2. Each is created with the connection string of its own database (both are PostgreSQL).

The problem is: when I use the MergeSorted operator, I need to pass an ActorMaterializer instance. If the materializer is created in readSystem1, only the events from db1 are loaded (and merged with themselves); if it is created in readSystem2, only the events from db2 are loaded. I need to load both.

Here is an example of the code (writing timestamps to a file, just to test them):

var actorMaterializer1 = ActorMaterializer.Create(
    readSystem1,
    ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal1 = PersistenceQuery.Get(readSystem1)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source1 = readJournal1.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source1
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps1.txt")), actorMaterializer1);

// just creating the materializer changes the events loaded by the source!!!
var actorMaterializer2 = ActorMaterializer.Create(
    readSystem2,
    ActorMaterializerSettings.Create(readSystem2).WithDebugLogging(true));
var readJournal2 = PersistenceQuery.Get(readSystem2)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source2 = readJournal2.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source2
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps2.txt")), actorMaterializer2);

// RunWith receives actorMaterializer1, so only events coming from db1
// will be loaded and merged with themselves
var source = source1.MergeSorted(source2, new EventEnvelopComparer());
await source
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps.txt")), actorMaterializer1);

How can I accomplish this? Is it possible to read two different event-sourcing tables from the same actor system, whether they live in the same database or in different ones? Is there something about ActorMaterializer that can solve my problem? Or is my approach completely wrong?


Solution

  • To consume events from two different ActorSystems, I think you'd need to use StreamRefs. What you could do here instead is configure two read journal ids, each pointing to a different database. That way you can use a single ActorSystem and a single materializer.

    var source1 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-1")
        .CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
    
    var source2 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-2")
        .CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
    
    // RunForeach materializes the merged stream and returns a Task that completes when it ends
    await source1.MergeSorted(source2, new EventEnvelopComparer())
        .RunForeach(x => System.Console.WriteLine($"EVENT: {x.Timestamp}"), actorSystem.Materializer());
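
    The two read journal ids used above have to exist in the ActorSystem's HOCON configuration. The fragment below is only a sketch of what that might look like, assuming the Akka.Persistence.PostgreSql and Akka.Persistence.Query.Sql plugins. The journal names (postgresql-1, postgresql-2), the connection strings, and the schema and table names are placeholders rather than values from the question. Each read journal is tied to its own write journal through write-plugin, and the id passed to ReadJournalFor is the config path of the read journal block.

    ```hocon
    akka.persistence.journal {
      # Hypothetical journal for db1 (placeholder connection string)
      postgresql-1 {
        class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
        plugin-dispatcher = "akka.actor.default-dispatcher"
        connection-string = "<connection string for db1>"
        schema-name = public
        table-name = event_journal
        auto-initialize = off
      }
      # Hypothetical journal for db2 (placeholder connection string)
      postgresql-2 {
        class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
        plugin-dispatcher = "akka.actor.default-dispatcher"
        connection-string = "<connection string for db2>"
        schema-name = public
        table-name = event_journal
        auto-initialize = off
      }
    }

    # Read journal ids as used in ReadJournalFor<SqlReadJournal>("read-journal-1")
    read-journal-1 {
      class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
      write-plugin = "akka.persistence.journal.postgresql-1"
      refresh-interval = 3s
      max-buffer-size = 100
    }
    read-journal-2 {
      class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
      write-plugin = "akka.persistence.journal.postgresql-2"
      refresh-interval = 3s
      max-buffer-size = 100
    }
    ```

    With a configuration along these lines, both sources are materialized by the same ActorSystem, so a single materializer can run the merged stream.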