
I need to merge events coming from two different event-sourcing systems handled by the Akka.NET Persistence module. The merge must sort events by timestamp, and I found the MergeSorted operator in Akka.Streams, which does exactly what I need (I tried it with two lists of numbers; for events I wrote a custom EventEnvelopComparer).

In my solution I have one actor system (readSystem1) to read from db1 and a second actor system (readSystem2) to read from db2. Each is created with the connection string for its own database (both are PostgreSQL).

The problem is: to run the stream built with the MergeSorted operator, I need to pass an instance of ActorMaterializer. If the materializer is created in the readSystem1 actor system, only the events from db1 are loaded (and merged with themselves); if it is created in readSystem2, only the events from db2 are loaded. I need to load both.

Here is an example of the code (writing timestamps to a file, just to test them):

var actorMaterializer1 = ActorMaterializer.Create(
    readSystem1,
    ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal1 = PersistenceQuery.Get(readSystem1)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source1 = readJournal1.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source1
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps1.txt")), actorMaterializer1);

// just creating the materializer changes the events loaded by the source!!!
var actorMaterializer2 = ActorMaterializer.Create(
    readSystem2, 
    ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal2 = PersistenceQuery.Get(readSystem2)
    .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source2 = readJournal2.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source2
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps2.txt")), actorMaterializer2);

// RunWith receives actorMaterializer1, so only events coming from db1
// will be loaded and merged with themselves
var source = source1.MergeSorted(source2, new EventEnvelopComparer());
await source
    .Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
    .RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps.txt")), actorMaterializer1);

How can I accomplish this? Is it possible to read two different event-sourcing tables from the same actor system, whether they live in the same database or in different ones? Is there something about the ActorMaterializer that can solve my problem? Is my approach completely wrong?

Daniele Armanasco

2 Answers


To use events from two different ActorSystems I think you'd need to use StreamRefs. But what you could do here is configure two ReadJournalIds, each pointing to a different *.db file. That way you can use one ActorSystem and materializer.

var source1 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-1")
    .CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);

var source2 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-2")
    .CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);

// RunForeach returns a Task that completes when the merged stream finishes
await source1.MergeSorted(source2, new EventEnvelopComparer())
    .RunForeach(x => System.Console.WriteLine($"EVENT: {x.Timestamp}"), actorSystem.Materializer());
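
The two read journal ids used above have to exist in the HOCON configuration of that single ActorSystem. A minimal sketch of what this might look like for PostgreSQL follows. The journal paths `akka.persistence.journal.db1` and `akka.persistence.journal.db2` and the connection-string placeholders are assumptions for illustration only. Other settings (schema, table names, serialization) may need to be repeated under each journal if the plugin defaults do not carry over to custom paths.

```hocon
akka.persistence.journal {
  # Hypothetical journal ids, one per database
  db1 {
    class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
    plugin-dispatcher = "akka.actor.default-dispatcher"
    connection-string = "<connection string for db1>"
  }
  db2 {
    class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
    plugin-dispatcher = "akka.actor.default-dispatcher"
    connection-string = "<connection string for db2>"
  }
}

# Read journal ids matching the ReadJournalFor<SqlReadJournal>(...) calls;
# each one points at its own write journal through write-plugin
read-journal-1 {
  class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
  write-plugin = "akka.persistence.journal.db1"
  refresh-interval = 3s
  max-buffer-size = 100
}
read-journal-2 {
  class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
  write-plugin = "akka.persistence.journal.db2"
  refresh-interval = 3s
  max-buffer-size = 100
}
```

With a layout like this, both sources are materialized in the same ActorSystem, so one materializer can run the merged stream.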
Ismael

I think I see what's going on here....

Here's why your choice of Materializer is creating an issue for you: the Materializer compiles your Akka.Persistence.Query + Akka.Streams graphs into actors. When you use the Materializer from ActorSystem A, it materializes the actors into that ActorSystem and uses its Journal implementation for Akka.Persistence. That's why you only get events from one ActorSystem.

However, it looks like you're doing exactly what I would do - pre-materializing each source before merging them together... Would it be possible to create a reproduction of this on GitHub using a dummy SQLite database or some such? If so I'd be happy to debug it.

Aaronontheweb
  • Sure, I am stripping out all the unrelated code and then I will publish it. Thanks! – Daniele Armanasco Sep 26 '22 at 07:09
  • @Aaronontheweb I created a repo here: https://github.com/armdan72/akka-merge. It contains a couple of dbs with only one event each (to simplify things), but you can create more events by uncommenting code in the Console. The merge code shows the same unexpected behaviour. – Daniele Armanasco Sep 26 '22 at 17:40
  • Hello. Do you have any news about this? We need to implement this feature by the end of October ... Could you reproduce the problem from my repo? Thanks in advance. – Daniele Armanasco Oct 11 '22 at 06:27