3

I have had the command bus up and running for a while and have developed a lot of my application. Now there are some commands I wish to process asynchronously. I have added the BernardMessageProducer and that all seems to work. I would like to send only certain commands to the async bus and have the rest handled locally.

Is this possible, and if so, how can I do it? I have already written a command router, but it only runs after the command bus has handled the message.

Thank you

dbrumann

2 Answers

2

From the readme

If you want to set up a bus that handles all messages async you can do so by attaching a Prooph\ServiceBus\Plugin\MessageProducerPlugin initialized with your message producer of choice to a message bus.

Let's look at a simple example using the psb-zeromq-producer

//app bootstrap
$container = new Container;
$container['config'] = [
    'prooph' => [
        'zeromq_producer' => [
            'dsn' => 'tcp://127.0.0.1:5555', // ZMQ Server Address.
            'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
            'rpc' => false, // Use as Query Bus.
        ]
    ]
];

// The factory must be instantiated before it can be invoked with the container.
$factory = new \Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory();
$zmqProducer = $factory($container);

$commandBus = new \Prooph\ServiceBus\CommandBus();

$messageProducerForwarder = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zmqProducer);

$commandBus->utilize($messageProducerForwarder);

$echoText = new ExampleCommand('It works');
$commandBus->dispatch($echoText);

You can also route individual messages to a message producer by using a message router plugin.

Note: Prooph\ServiceBus\Plugin\Router\RegexRouter is a good choice if you want to handle all messages of a specific namespace async.
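To illustrate the idea of routing by namespace, here is a minimal, self-contained sketch. It is not the Prooph RegexRouter API: RegexMessageRouter, its methods, and the App\Async and App\Sync message names are all hypothetical, and the two closures merely stand in for a message producer and a local handler.

```php
<?php
declare(strict_types=1);

// Hypothetical router: sends a message name to the first target whose regex matches,
// otherwise to the fallback (the "local" handler in this sketch).
final class RegexMessageRouter
{
    /** @var array<string, callable> regex pattern => target */
    private array $routes = [];
    /** @var callable */
    private $fallback;

    public function __construct(callable $fallback)
    {
        $this->fallback = $fallback;
    }

    public function route(string $pattern, callable $target): void
    {
        $this->routes[$pattern] = $target;
    }

    public function dispatch(string $messageName): void
    {
        foreach ($this->routes as $pattern => $target) {
            if (preg_match($pattern, $messageName) === 1) {
                $target($messageName);
                return;
            }
        }
        ($this->fallback)($messageName);
    }
}

$sentAsync = [];
$handledLocally = [];

// Fallback: stand-in for synchronous, local handling.
$router = new RegexMessageRouter(function (string $name) use (&$handledLocally): void {
    $handledLocally[] = $name;
});

// Everything in the (assumed) App\Async namespace goes to the stand-in producer.
$router->route('/^App\\\\Async\\\\/', function (string $name) use (&$sentAsync): void {
    $sentAsync[] = $name;
});

$router->dispatch('App\\Async\\ResizeImage');
$router->dispatch('App\\Sync\\RenameAlbum');
```

In a real setup the stand-in closures would be replaced by the message producer and the regular command handlers.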

  • I have added code to redirect individual messages to the message producer by using a message router. This all works, but it then drops out and tries to use the default handler, even after `$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);`. How do I stop an ActionEvent from being processed? – Guy Radford Aug 10 '16 at 08:13
0

I'm not sure how to do this with prooph (especially since you didn't provide any code samples), but generally speaking: it is possible. A good approach can be found in Matthias Noback's MessageBus repository, in the docs for the Command Bus.

You can create a middleware that checks, e.g. for a marker interface (as in the example linked above):

public function handle($message, callable $next)
{
    if ($message instanceof IsHandledAsynchronously) {
        // handle the message asynchronously using a message queue
        $this->messageQueue->add($message);
    } else {
        // handle the message synchronously, i.e. right-away
        $next($message);
    }
}

Then it is just a matter of marking your command by letting it implement the right interface and, obviously, adding the middleware at the right place in the command bus.
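A self-contained version of the middleware above might look like the following sketch. The InMemoryQueue class and the two example commands are hypothetical stand-ins (a real setup would use an actual message queue), and the class name AsyncRoutingMiddleware is made up for illustration.

```php
<?php
declare(strict_types=1);

// Marker interface: commands implementing it are queued instead of handled right away.
interface IsHandledAsynchronously {}

// Hypothetical in-memory stand-in for a real message queue.
final class InMemoryQueue
{
    /** @var object[] */
    public array $messages = [];

    public function add(object $message): void
    {
        $this->messages[] = $message;
    }
}

final class AsyncRoutingMiddleware
{
    private InMemoryQueue $messageQueue;

    public function __construct(InMemoryQueue $messageQueue)
    {
        $this->messageQueue = $messageQueue;
    }

    public function handle(object $message, callable $next): void
    {
        if ($message instanceof IsHandledAsynchronously) {
            // Queue the message and skip the rest of the middleware chain.
            $this->messageQueue->add($message);
            return;
        }
        // Pass the message on to be handled synchronously.
        $next($message);
    }
}

// Hypothetical example commands.
final class SendNewsletter implements IsHandledAsynchronously {}
final class RenameUser {}

$queue = new InMemoryQueue();
$middleware = new AsyncRoutingMiddleware($queue);

// Stand-in for the next middleware or handler: records what was handled locally.
$handledLocally = [];
$next = function (object $message) use (&$handledLocally): void {
    $handledLocally[] = get_class($message);
};

$middleware->handle(new SendNewsletter(), $next);
$middleware->handle(new RenameUser(), $next);
```

Only RenameUser reaches $next here; SendNewsletter ends up in the queue because it carries the marker interface.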

If you have multiple command buses, as your question suggests, then you probably want some kind of CommandResolver that matches a command, e.g. by its class name, to the appropriate command bus. Again, see Matthias Noback's docs, especially the section "Defining the command handler map" in the same document, and DelegatesToMessageHandlerInterface.
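As a rough sketch of such a resolver, assuming each bus can be represented as a plain callable: the CommandBusResolver class and the example commands below are hypothetical and are not taken from SimpleBus or prooph.

```php
<?php
declare(strict_types=1);

// Hypothetical resolver: picks a bus by the command's class name,
// falling back to a default bus when no mapping exists.
final class CommandBusResolver
{
    /** @var array<string, callable> command class name => bus */
    private array $busesByCommand;
    /** @var callable */
    private $defaultBus;

    public function __construct(array $busesByCommand, callable $defaultBus)
    {
        $this->busesByCommand = $busesByCommand;
        $this->defaultBus = $defaultBus;
    }

    public function dispatch(object $command): void
    {
        $bus = $this->busesByCommand[get_class($command)] ?? $this->defaultBus;
        $bus($command);
    }
}

// Hypothetical example commands.
final class GenerateReport {}
final class ChangeEmail {}

$asyncLog = [];
$syncLog = [];

// Stand-ins for an async bus and a local (synchronous) bus.
$asyncBus = function (object $command) use (&$asyncLog): void {
    $asyncLog[] = get_class($command);
};
$syncBus = function (object $command) use (&$syncLog): void {
    $syncLog[] = get_class($command);
};

$resolver = new CommandBusResolver([GenerateReport::class => $asyncBus], $syncBus);

$resolver->dispatch(new GenerateReport());
$resolver->dispatch(new ChangeEmail());
```

An explicit map keeps the routing decision in one place, at the cost of having to register each async command by hand.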

dbrumann
  • The Prooph CQRS framework behaves a little differently from SimpleBus. I was wondering if I could route it earlier, before it went on the bus, but maybe routing it after could work. – Guy Radford Aug 07 '16 at 18:57