
I use Symfony2 with the RabbitMqBundle to run a worker that sends documents to ElasticSearch. Indexing documents one by one is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes the documents to ES in groups of a thousand. The code looks (a bit simplified) as follows:

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // When the buffer reaches this number of documents, its content
    // is sent to ElasticSearch for indexing.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}

This all works quite nicely, but there is a slight problem. The queue gets filled with messages at an unpredictable rate: sometimes 100000 in 5 minutes, sometimes not one for hours. When there are, for instance, 82671 documents queued, the last 671 documents don't get indexed until another 329 documents arrive, which might take hours. I would like to be able to do the following:

Warning: Sci-Fi code! This obviously won't work:

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // When the buffer reaches this number of documents, its content
    // is sent to ElasticSearch for indexing.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly Sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no more messages to be sent, stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
          ? $this->flushTimer->reset()
          : $this->flushTimer->start();
    }
}

Now, I know about the limitations of PHP not being event driven. But this is 2015 and there are solutions like ReactPHP, so this should be possible, right? For ØMQ there is this function. What would be a solution that works for RabbitMQ, or independently of any message queue extension?

Solutions that I'm skeptical about:

  1. There is crysalead/code. It simulates a timer using declare(ticks = 1);. I'm not sure whether this is a performant and solid approach. Any ideas?
  2. I could run a cronjob that publishes a 'FLUSH' message to the same queue every 5 minutes and then explicitly flush the buffer when receiving this message, but that would be cheating.
Sander Toonen
  • Not completely what you are looking for but might be a good in between solution. Store the time when you last ran the `flush` command and when you add documents also check the time. If it's been more than 5 minutes flush anyway. 2nd best option is the cronjob IMHO – Ken Foncé Dec 16 '15 at 14:27
  • The point is, when you don't receive any messages for a long period, you can't check the time and therefore the buffer won't get flushed. A cronjob runs PHP in a different process and can therefore not access the buffer. – Sander Toonen Dec 16 '15 at 16:34
  • So you're running that code in a long-running PHP process? In that case you could probably use signals (just like your option number 1 does). Have a look [here](http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal) and [here](http://www.hackingwithphp.com/16/1/2/timing-your-signals). These signals are non-blocking. I haven't used them myself yet, but they might be just the thing you need for your use case. – Ken Foncé Dec 16 '15 at 17:31
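
The timestamp check suggested in the first comment can be sketched roughly as follows. This is only an illustration, not RabbitMqBundle or Elastica API: `TimedBuffer`, its `$sink` callback and the injectable `$clock` are hypothetical names. As the follow-up comment points out, the check only runs when a message arrives, so an idle queue still leaves documents sitting in the buffer.

```php
<?php
// Hypothetical sketch: flush on size OR when the last flush is too old.
// The age check only happens inside add(), i.e. when a message arrives.
final class TimedBuffer
{
    private $items = [];
    private $lastFlush;
    private $sink;
    private $maxSize;
    private $maxAge;
    private $clock;

    public function __construct(callable $sink, int $maxSize = 1000, int $maxAge = 300, ?callable $clock = null)
    {
        $this->sink = $sink;            // e.g. a wrapper around addDocuments()
        $this->maxSize = $maxSize;      // flush when this many items are buffered
        $this->maxAge = $maxAge;        // flush when the last flush is this many seconds old
        $this->clock = $clock ?: 'time';
        $this->lastFlush = ($this->clock)();
    }

    public function add($item): void
    {
        $this->items[] = $item;
        $tooOld = ($this->clock)() - $this->lastFlush >= $this->maxAge;
        if (count($this->items) >= $this->maxSize || $tooOld) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ($this->items) {
            ($this->sink)($this->items);
        }
        $this->items = [];
        $this->lastFlush = ($this->clock)();
    }
}
```

Passing the clock in as a callable is a design choice for this sketch only; it makes the age check easy to exercise without waiting five minutes.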

1 Answer

As I mentioned in my comment, you could use signals. PHP allows you to register handlers for the signals your script receives (e.g. SIGINT, SIGTERM). Note that SIGKILL and SIGSTOP cannot be caught.

For your use case you can use the SIGALRM signal. This signal is delivered to your script after a time that you set has expired. The advantage of these signals is that they are non-blocking: the normal operation of your script keeps running while the timer counts down.

The adjusted solution (ticks are deprecated since PHP 5.3):

// Handler for SIGALRM
function signal_handler($signal) {
    // You would flush here
    print "Caught SIGALRM\n";
    // Re-arm the timer, otherwise the alarm fires only once
    pcntl_alarm(300);
}

// Register the handler for the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// Deliver SIGALRM after 300 seconds
pcntl_alarm(300);

// Main loop: dispatch pending signals on every iteration
$running = true; // placeholder for your own loop condition
while ($running) {
    pcntl_signal_dispatch();
    // Execute your code here
}

Note: a script can only have one pending SIGALRM at a time. Calling pcntl_alarm again replaces the pending alarm (without firing the signal) and restarts the timer with the newly set value.
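
On PHP 7.1 and later, `pcntl_async_signals(true)` lets handlers run without `declare(ticks)` and without calling `pcntl_signal_dispatch()` in a loop. The sketch below combines that with the buffer idea: every message restarts a 300-second alarm, and the handler flushes. It assumes the pcntl extension is loaded; the `$flush` and `$onMessage` closures and the `$flushed` array are hypothetical stand-ins for the real consumer and for `addDocuments()`. Whether the consumer's blocking socket read resumes cleanly after being interrupted by the signal depends on the php-amqplib version and would need to be verified.

```php
<?php
// Sketch only; requires the pcntl extension and PHP >= 7.1.
pcntl_async_signals(true);

$buffer = [];
$flushed = []; // stand-in for what would be sent to ElasticSearch

// Hypothetical replacement for SearchIndexator::flush().
$flush = function () use (&$buffer, &$flushed) {
    if ($buffer) {
        $flushed[] = $buffer;
        $buffer = [];
    }
    // Nothing is pending any more, so cancel any scheduled alarm.
    pcntl_alarm(0);
};

// Flush when the idle timeout expires.
pcntl_signal(SIGALRM, function () use ($flush) {
    $flush();
});

// Hypothetical replacement for SearchIndexator::onMessage().
$onMessage = function ($document) use (&$buffer, $flush) {
    $buffer[] = $document;
    if (count($buffer) >= 1000) {
        $flush();
    } else {
        // (Re)start the idle timer; this replaces any pending alarm.
        pcntl_alarm(300);
    }
};
```

Because `pcntl_alarm` replaces the pending alarm on every call, the timeout measures inactivity since the last message rather than time since the last flush.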

Ken Foncé
  • Yes, that does the same as the ``crysalead/code`` project I mentioned in my question. But that uses "declare ticks" and I doubt whether interrupting the PHP execution after each statement is a performant solution. Do you have any experience with that? – Sander Toonen Dec 16 '15 at 20:23
  • Also, the link you've provided links to a page that explains that using ticks is deprecated. Most mentions of ticks state that using ticks is in most cases an anti-pattern. I'm therefore interested whether there is an alternative. – Sander Toonen Dec 16 '15 at 21:07
  • You're right, I overlooked the part stating that ticks were deprecated. I did some digging and found a non-deprecated alternative. By using `pcntl_signal_dispatch()` you can decide yourself when to check for pending signals, instead of running the handler on every tick. The adjusted solution will be more performant too. Hope this helps. – Ken Foncé Dec 17 '15 at 08:35
  • Thank you, but I think you don't understand the actual problem I have. I can't poll anything; there is no loop that I can use to check things once in a while. The RabbitMQBundle calls a function whenever a message is received. If I entered a loop, that would be blocking and ``php-amqplib`` would stop reading from the socket. – Sander Toonen Dec 17 '15 at 08:58
  • I thought you had access to the long-running process. In that case I don't think what you want is achievable in PHP. The flow you're looking for is achievable in lower-level languages, which is why ØMQ has the kind of method you're looking for. As you said yourself, the cronjob that publishes a message to your queue every 5 minutes is the best option. Migrating to ØMQ for your messaging/queues is another option, but I realise it is not what you're looking for. – Ken Foncé Dec 17 '15 at 09:49
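
The cronjob approach discussed above could look roughly like this on the consumer side. It is a minimal sketch under stated assumptions: the `__FLUSH__` marker, the `type` field and the `SentinelAwareIndexer` class are hypothetical, and the `$sink` callback stands in for the `addDocuments()` call from the question. Publishing the marker message from cron is not shown.

```php
<?php
// Hypothetical sketch: a marker message in the same queue forces a flush.
final class SentinelAwareIndexer
{
    const FLUSH_MARKER = '__FLUSH__'; // assumed marker value, not a RabbitMQ feature
    const MAX_BUFFER_SIZE = 1000;

    private $buffer = [];
    private $sink;

    public function __construct(callable $sink)
    {
        $this->sink = $sink; // e.g. a wrapper around addDocuments()
    }

    public function onMessage(array $message): void
    {
        // A marker message carries no document; it only triggers a flush.
        if (isset($message['type']) && $message['type'] === self::FLUSH_MARKER) {
            $this->flush();
            return;
        }

        $this->buffer[] = $message;
        if (count($this->buffer) >= self::MAX_BUFFER_SIZE) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ($this->buffer) {
            ($this->sink)($this->buffer);
        }
        $this->buffer = [];
    }
}
```

Since the marker travels through the same queue, it is handled by the worker process that owns the buffer, which sidesteps the problem that a cronjob runs in a separate process.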