5

I'm using a worker with the Symfony 4 messenger component.

This worker:

  • receives a message (from RabbitMQ)
  • launches ffmpeg
  • processes a video
  • and saves something in a database.

To configure this worker in Symfony I've done this (the middleware is the important part):

# config/packages/framework.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection

                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping them open forever
                    - doctrine_close_connection

        transports:
            ffmpeg:
              dsn: '%env(CLOUDAMQP_URL)%'
              options:
                auto_setup: false
                exchange:
                    name: amq.topic
                    type: topic
                queues:
                  ffmpeg: ~

        routing:
            # Route your messages to the transports, for now all are AMQP messages
            'App\Api\Message\AMQPvideoFFMPEG': ffmpeg
        ## Handle multiple buses ? https://symfony.com/doc/current/messenger/multiple_buses.html
        ## When queries and command should be distinguished

Then, to understand what might cause this issue, I tried debugging the messenger to check that the middleware is correctly configured:

root@b9eec429cb54:/var/www/html# php bin/console debug:messenger

Messenger
=========

command_bus
-----------

 The following messages can be dispatched:

 ------------------------------------------------------ 
  App\Api\Message\AMQPvideoFFMPEG                       
      handled by App\Api\Message\Handler\FFMPEGHandler  
 ------------------------------------------------------ 

Everything seems OK, right?

So how is it possible to see this:

[2019-08-23 10:25:26] messenger.ERROR: Retrying App\Api\Message\AMQPvideoFFMPEG - retry #1. {"message":"[object] (App\Api\Message\AMQPvideoFFMPEG: {})","class":"App\Api\Message\AMQPvideoFFMPEG","retryCount":1,"error":"[object] (Doctrine\DBAL\Exception\ConnectionException(code: 0): An exception occurred in driver: SQLSTATE[HY000] [2002] Connection timed out at /var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/AbstractMySQLDriver.php:93, Doctrine\DBAL\Driver\PDOException(code: 2002): SQLSTATE[HY000] [2002] Connection timed out at /var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOConnection.php:31, PDOException(code: 2002): SQLSTATE[HY000] [2002] Connection timed out at /var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOConnection.php:27)"} []

I'm completely lost. Have I missed something?

This only happens occasionally; it works most of the time. I suppose the bug occurs when my worker has lost its connection to the database, especially when the ffmpeg processing lasts 7 minutes or more, but the ping and close-connection middleware should prevent that. So I don't understand what the problem is here.

Greco Jonathan
  • Did you check MySQL's logs? This middleware of yours isn't perfect. You may hit a lost-connection error if ffmpeg's conversion time is greater than MySQL's idle limit. Instead you should "disconnect & reconnect" in your code, right before any SQL insert attempt (e.g. before Doctrine's flush). I'd personally go for a Doctrine preFlush/postFlush event subscriber instead: just disconnect. Doctrine will reconnect during the flush. – Mike Doe Aug 23 '19 at 13:40
  • It is a good idea emix, but right now I don't know how to make this apply only to this handler – Greco Jonathan Aug 23 '19 at 13:48
  • It's going to be a service. Inject it into the handler and call a setter `setEnabled(true)`. By default make it "disabled" and don't disconnect. – Mike Doe Aug 23 '19 at 14:41
  • @emix can you write an answer with your solution? I would like to try it but don't know what to do. – Greco Jonathan Aug 27 '19 at 15:33
  • can you tell us something about the context? small cloud database, or full blown db server? I ask because I stumbled over a very limited amount of allowed database connections once in a cloud setting (~10). – Jakumi Aug 28 '19 at 15:42
  • @Jakumi It's a Docker cluster supervised by Kubernetes, with MySQL pods, all isolated behind an nginx reverse proxy. – Greco Jonathan Aug 29 '19 at 09:51

2 Answers

3

After reading the code of my middleware, especially this block:

https://github.com/symfony/symfony/blob/4.4/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php

class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{
    protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
    {
        $connection = $entityManager->getConnection();
        if (!$connection->ping()) {
            $connection->close();
            $connection->connect();
        }
        if (!$entityManager->isOpen()) {
            $this->managerRegistry->resetManager($this->entityManagerName);
        }
        return $stack->next()->handle($envelope, $stack);
    }
}

We can see that my handler is called right after the connection is opened. This behaviour is supposed to work, and I assume it does, but FFMPEG can work for a long time on the same RabbitMQ message. So the last step of my handler, which inserts something into the database, can fail with a "MySQL server has gone away" or "Connection timed out" error.

That's why I took the connection-handling part of the middleware (without the call to the next handler) and put it into a method, which I call just before any insert into my DB, like this:

public function __invoke(AMQPvideoFFMPEG $message)
{
    // reconnect if the connection was lost
    $this->processService->testConnection();
    $process = $this->processService->find($message->getProcess());
    $this->renderService->updateQueue($process->getQueue(), "processing");

    // some other stuff
}

where the testConnection() method is:

/**
 * Reconnect if the connection was aborted for some reason
 */
public function testConnection()
{
    $connection = $this->entityManager->getConnection();
    if (!$connection->ping()) {
        $connection->close();
        $connection->connect();
    }
}

But I ran into another issue after that:

Resetting a non-lazy manager service is not supported. Set the "doctrine.orm.default_entity_manager" service as lazy and require "symfony/proxy-manager-bridge" in your composer.json file instead.

After installing "symfony/proxy-manager-bridge", the error was gone.

So far I have not seen another connection timeout. Wait and see.
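The reasoning in this answer can be illustrated with a small, self-contained simulation. It does not use Doctrine at all: `FakeConnection` is a hypothetical stand-in, and the 60-second idle timeout is an assumed value chosen only for illustration. It shows that a ping made before the handler says nothing about the connection state seven minutes later, whereas a second check just before the write catches the problem.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a database connection whose server
// drops it after $idleTimeout seconds without activity.
final class FakeConnection
{
    private $idleTimeout;
    private $lastUsedAt;

    public function __construct(int $idleTimeout, int $now)
    {
        $this->idleTimeout = $idleTimeout;
        $this->lastUsedAt = $now;
    }

    // Returns true while the connection has not been idle for too long.
    public function ping(int $now): bool
    {
        $alive = ($now - $this->lastUsedAt) <= $this->idleTimeout;
        if ($alive) {
            $this->lastUsedAt = $now; // a successful ping counts as activity
        }

        return $alive;
    }

    // Simulates close() followed by connect().
    public function reconnect(int $now): void
    {
        $this->lastUsedAt = $now;
    }
}

$clock = 0;                                   // simulated time in seconds
$connection = new FakeConnection(60, $clock); // assumed 60 s idle timeout

// 1. The ping middleware runs just before the handler: connection is fine.
$beforeHandler = $connection->ping($clock);

// 2. The handler runs ffmpeg for 7 minutes without touching the database.
$clock += 7 * 60;

// 3. Checking again right before the write detects the dead connection.
$beforeWrite = $connection->ping($clock);
if (!$beforeWrite) {
    $connection->reconnect($clock);
}
$afterReconnect = $connection->ping($clock);

var_dump($beforeHandler, $beforeWrite, $afterReconnect);
// prints bool(true), bool(false), bool(true)
```

The point of the sketch is only the ordering: the check has to sit between the long-running job and the first query that follows it.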

Greco Jonathan
  • Why do you reset the manager? Make your own middleware: simply `disconnect()` if the `$envelope->getMessage() instanceof DisconnectsBeforeHandling`. Of course you have to create the interface, but it doesn't need to contain any methods. – Mike Doe Aug 29 '19 at 10:01
  • Why make my own middleware? This one was written by much more experienced developers and does not need to be replaced IMO. In fact, since the handler is called afterwards, it would not change anything at all about ffmpeg and possible disconnections from MySQL. – Greco Jonathan Aug 29 '19 at 10:08
  • They're just people, like you and me. They also make mistakes. They won't cover every edge case either. Your case is a perfect example: imagine a handler which does a lengthy job (say 5 minutes) and your mysql's timeout setting is set to 1 minute. The handler will fail, obviously. – Mike Doe Aug 29 '19 at 10:43
  • @emix true but for now I need to be pragmatic and just make this work. – Greco Jonathan Aug 29 '19 at 10:52
  • I told you how to approach this about an hour ago in another comment. And you did good, though you don't have to "connect" manually; Doctrine will do this for you. – Mike Doe Aug 29 '19 at 11:34
1

Simply disconnect before any insert operation:

public function handle(…)
{
    // your time-consuming business logic

    // disconnect if needed
    if (!$this->entityManager->getConnection()->ping()) {
        $this->entityManager->getConnection()->close();
    }

    // save your work
    $this->entityManager->flush();
}
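The subscriber idea raised in the comments under the question (a service that is disabled by default and switched on with `setEnabled(true)`) could look roughly like the sketch below. The class name and namespace are hypothetical, and it assumes Doctrine ORM 2.x with DBAL 2.x, where `Connection::ping()` is still available. It would also need to be registered as a Doctrine event subscriber, for example with the `doctrine.event_subscriber` tag in a Symfony 4 application.

```php
<?php
declare(strict_types=1);

namespace App\Doctrine; // hypothetical namespace

use Doctrine\Common\EventSubscriber;
use Doctrine\ORM\Event\PreFlushEventArgs;
use Doctrine\ORM\Events;

// Hypothetical subscriber: drops a dead connection right before a flush,
// so that Doctrine opens a fresh one when it runs the first query.
final class ReconnectBeforeFlushSubscriber implements EventSubscriber
{
    // Disabled by default; a long-running handler opts in explicitly.
    private $enabled = false;

    public function setEnabled(bool $enabled): void
    {
        $this->enabled = $enabled;
    }

    public function getSubscribedEvents(): array
    {
        return [Events::preFlush];
    }

    public function preFlush(PreFlushEventArgs $args): void
    {
        if (!$this->enabled) {
            return;
        }

        $connection = $args->getEntityManager()->getConnection();

        // ping() is a DBAL 2.x method; it returns false when the server
        // no longer answers on this connection.
        if (!$connection->ping()) {
            // No explicit connect(): the connection is reopened lazily.
            $connection->close();
        }
    }
}
```

A handler that runs a long job would have this service injected and call `setEnabled(true)` before its final `flush()`. Note that the subscriber only covers flushes; a `find()` issued after the long job and before the flush would still need its own check.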
Mike Doe
  • The middleware is not the real issue here: it closes the connection and opens it again just before my handler and FFMPEG start to work. The connection timeout occurs during the long ffmpeg processing, and by then the middleware will not be called again. Your suggestion of a subscriber seems like a better solution to me than this answer. The vendor middleware does its job anyway. – Greco Jonathan Aug 29 '19 at 10:13
  • Oh ok, like I said before, you have to disconnect before the flush - manually. Just inject the connection into your handler and `disconnect()` manually after you have done the conversion job. – Mike Doe Aug 29 '19 at 10:27