I'm interested in how to apply CQRS and Event Sourcing with Akka.net. I already noticed Akka.Persistence, which delivers the ES part.

As far as I understand, the command handlers and the AggregateRoot can be represented by a ReceivePersistentActor in a single class. I have a base class for that (not complete here):

public abstract class AggregateRoot<TState> : ReceivePersistentActor
{
    private readonly Func<TState, bool> shouldSafeSnapShot;

    protected AggregateRoot(TState state, Func<TState, bool> shouldSafeSnapShot)
    {
        if (state == null) throw new ArgumentNullException(nameof(state));
        if (shouldSafeSnapShot == null) throw new ArgumentNullException(nameof(shouldSafeSnapShot));

        var path = this.Self.Path;
        this.PersistenceId = $"{path.Parent.Name}/{path.Name}";
        this.State = state;
        this.shouldSafeSnapShot = shouldSafeSnapShot;
    }

    public sealed override string PersistenceId { get; }

    protected TState State { get; }

    protected void Emit<TEvent>(TEvent e, Action<TEvent> apply = null)
    {
        this.Persist(e,
            @event =>
                {
                    // update state
                    apply?.Invoke(@event);

                    // save a snapshot if required
                    this.SaveSnapshotIfRequired();

                    // publish to event bus?
                    Context.System.EventStream.Publish(e);
                });
    }

    private void SaveSnapshotIfRequired()
    {
        var state = this.State;

        if (this.shouldSafeSnapShot(state))
        {
            this.SaveSnapshot(state);
        }
    }
}

Now I want to send the event over the EventBus, and Akka seems to bring something within its package that might fit the way this is done in other CQRS frameworks (the EventBus abstractions in NCqrs, Inceptum) or in the simple sample by Gregory Young here.

// publish to event bus?
Context.System.EventStream.Publish(e);

However, it seems weird to me that the AggregateRoot domain object actor knows the implementation of the event bus. I could hand that responsibility to a child actor, or inject something like an IAbstractEventBus interface to switch implementations, but I don't think it should be the AggregateRoot's responsibility to publish the event after persisting it and to inform all subscribers, right!? Yet the ES part seems to be coupled to the actor.

What is the typical way to do that with Akka.net and Akka.Persistence? Any design ideas how to split this up?

I stumbled upon the IEventAdapter interface of Akka.net, but I'm not sure whether it leads me down the right path (or to the dark side)...

public class SomeEventAdapter : IEventAdapter
{
    public object ToJournal(object evt)
    {
        return evt; // should I add the event emitter here?
        // but is this already stored now? hmmm
    }
}
Beachwalker

2 Answers

In an ideal world, your command handler could take care of tracking the events to push to the event bus and persisting them to some form of storage once the command has been fully handled without any issues. This ensures you don't prematurely push an event before the aggregate method has finished raising all the events it should.

In Akka.net, you could change the Tell to an Ask when passing the command to the aggregate (assuming you keep your handlers separate from your aggregates for the sake of purity) and have the aggregate return the events it wants raised and persisted to the handler, so the handler can then send and persist them (probably indirectly, maybe via some form of U.O.W.). That will, however, mean that NO other command the handler is configured to handle can be processed until your aggregate has finished, even though other aggregates are probably serviced by the same handler. Waiting on an Ask holds the handler up, after all; Tell doesn't.
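
To make that split concrete, here is a minimal sketch in plain C# (9 or later), deliberately without Akka.net so it stays self-contained. Every name in it (`IEventStore`, `IEventBus`, `GreeterAggregate`, `GreetHandler` and the in-memory fakes) is a hypothetical illustration, not an Akka.net or cqrs.net API. The aggregate only decides which events a command produces; the handler persists them first and publishes them afterwards. With actors, the call to `aggregate.Handle` would be the Ask round trip.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstractions for illustration; not part of Akka.net.
public interface IEventStore { void Append(string aggregateId, IReadOnlyList<object> events); }
public interface IEventBus { void Publish(object @event); }

public sealed record Greet(string AggregateId, string Name);
public sealed record Greeted(string AggregateId, string Name);

// The aggregate decides which events a command produces and rebuilds
// its state from events. It neither stores nor publishes anything.
public sealed class GreeterAggregate
{
    private readonly HashSet<string> greeted = new();

    // Hydration: apply a past or freshly raised event to the state.
    public void Apply(object @event)
    {
        if (@event is Greeted g) greeted.Add(g.Name);
    }

    // Returns the events to raise; an empty list means "nothing to do".
    public IReadOnlyList<object> Handle(Greet command)
    {
        if (greeted.Contains(command.Name)) return Array.Empty<object>();
        return new object[] { new Greeted(command.AggregateId, command.Name) };
    }
}

// The handler owns persistence and publication.
public sealed class GreetHandler
{
    private readonly IEventStore store;
    private readonly IEventBus bus;

    public GreetHandler(IEventStore store, IEventBus bus)
    {
        this.store = store ?? throw new ArgumentNullException(nameof(store));
        this.bus = bus ?? throw new ArgumentNullException(nameof(bus));
    }

    public IReadOnlyList<object> Handle(GreeterAggregate aggregate, Greet command)
    {
        var events = aggregate.Handle(command);
        if (events.Count == 0) return events;

        store.Append(command.AggregateId, events); // persist first
        foreach (var e in events)
        {
            aggregate.Apply(e); // then update the aggregate state
            bus.Publish(e);     // then inform subscribers
        }
        return events;
    }
}

// In-memory fakes so the sketch can be exercised without a database.
public sealed class InMemoryEventStore : IEventStore
{
    public List<object> Stored { get; } = new();
    public void Append(string aggregateId, IReadOnlyList<object> events) => Stored.AddRange(events);
}

public sealed class InMemoryEventBus : IEventBus
{
    public List<object> Published { get; } = new();
    public void Publish(object @event) => Published.Add(@event);
}
```

Handling the same `Greet` twice through `GreetHandler` yields one `Greeted` event the first time and none the second, and nothing reaches the bus that was not appended to the store beforehand.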

In some other systems the persistence layer itself can act as an event bus. Greg Young's EventStore is one of these (you take the concern totally out of your application).
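
The idea of a store that doubles as the bus can be sketched with a small in-memory stand-in. This is only an illustration under my own assumptions: `SubscribableEventStore` is a hypothetical class, not the EventStore client API and not part of Akka.net. The point is that writers only append, and delivery to subscribers is derived from the write.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store that also delivers events to subscribers.
public sealed class SubscribableEventStore
{
    private readonly List<object> log = new();
    private readonly List<Action<object>> subscribers = new();

    // A new subscriber first receives the existing history (catch-up),
    // then every event appended afterwards.
    public void Subscribe(Action<object> onEvent)
    {
        if (onEvent == null) throw new ArgumentNullException(nameof(onEvent));
        foreach (var e in log) onEvent(e);
        subscribers.Add(onEvent);
    }

    // Appending is the only thing a writer does; publication follows from it.
    public void Append(object @event)
    {
        log.Add(@event);
        foreach (var subscriber in subscribers) subscriber(@event);
    }
}
```

With this shape the aggregate never calls a publish method at all: a read-model projection subscribes to the store and sees the events in the order they were appended.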

This is the challenge of idealistic theory vs real-world implementation and framework limitations.

In an idealistic world an aggregate root does NOT concern itself with storing the events it raises... its purpose (some would say) is to hydrate itself from a provided collection of events, raise events, and have other components manage the broadcast, persistence and querying (for hydration) of those events. I know Akka.net provides a nicely rolled-in DB access/persistence layer to its actors... which is nice... but it does stray from a pure SOLID CQRS implementation. As such you can start to muddy the waters across your implementation, which is where you are.

This is far from ideal, but it is one option:

    protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command)
    {
        UnitOfWork.Add(this);
        try
        {
            action(command);

            UnitOfWork.Commit();

            Sender.Tell(true, Self);
        }
        catch(Exception exception)
        {
            Logger.LogError("Executing an Akka.net request failed.", exception: exception);
            Sender.Tell(false, Self);
            throw;
        }
    }

which can then be wired up via

    Receive<SayHelloWorldCommand>(command => Execute(SayHello, command));

This, taken from cqrs.net, uses a Unit Of Work (U.O.W.) within the aggregate rather than passing the events back to the handler. It's a compromise they've made to at least keep the concerns above out of the aggregate itself and to keep to some form of SOLID implementation.

cdmdotnet

@Beachwalker I have an implementation using the CQRS approach at https://github.com/thangchung/magazine-website-akka. I don't know whether it can help you, but I hope you will find something useful in it. I am continuing to add more functions and features to it.

thangchung