I'm trying to run a Symfony service as a RabbitMQ queue consumer that writes to a database and discards duplicate messages. I've read various posts (like this one) that pointed me towards $this->doctrine->resetManager(), but so far I haven't got it to work. My command extends ContainerAwareCommand and calls the service like this:
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->output = $output;
        $rssvc = new ResultsStorageService($this->container, $this->logger);
        $rssvc->storeResultsQueueListener();
    }
The code below shows the ResultsStorageService constructor and the methods it calls, which set up the entity manager and reset it where necessary (in theory). The first time round it works perfectly: the UniqueConstraintViolationException is caught ("UniqueConstraintViolationException Exception found" is printed) and $this->doctrine->resetManager() is called straight afterwards. But the second time round and thereafter it throws "The EntityManager is closed".
    public function __construct($container, $logger)
    {
        $this->container = $container;
        $this->logger = $logger;
        $this->mq_host = $this->container->getParameter('api.mq.host');
        $this->mq_port = $this->container->getParameter('api.mq.port');
        $this->mq_score_queue = $this->container->getParameter('score_queue');
        $this->doctrine = $this->container->get("doctrine");
        $this->em = $this->container->get("doctrine.orm.entity_manager");
    }
    public function storeResultsQueueListener()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->queue_declare($this->mq_score_queue, false, true, false, false);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($this->mq_score_queue, '', false, false, false, false, array($this, 'processMessage'));
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
    public function processMessage(AMQPMessage $msg)
    {
        echo " [x] Received ", $msg->body, "\n";
        $applyscore_msg = \GuzzleHttp\json_decode($msg->body, true);
        $user_id = $applyscore_msg['user_id'];
        $steam_id = $applyscore_msg['steam_id'];
        $new_match = $applyscore_msg['result'];
        $api = $this->container->get("some.api");
        $result = MYAPI::mapRawToNormalisedResults($new_match, $steam_id);
        try {
            $em = $this->container->get("doctrine.orm.entity_manager");
            if (!$em->isOpen()) {
                $this->container->get("doctrine")->resetManager();
                $em = $this->container->get("doctrine.orm.entity_manager");
            }
            $crp = new Results();
            $crp = ResultsMapper::mapResults($api->getResultKeys(), $result, $crp);
            $crp->setGame($this->game);
            $crp->setUser($this->em->getReference('UserBundle:User', $user_id));
            $em->persist($crp);
            $em->flush($crp);
        } catch (UniqueConstraintViolationException $e) {
            echo " UniqueConstraintViolationException Exception found, ", "\n", $e->getCode(), "\n", $e->getMessage(), "\n", $e->getTraceAsString(), "\n";
            $this->doctrine->resetManager();
            echo " [x] Duplicate found, skipping", "\n";
        } catch (PDOException $e) {
            echo " PDO Exception found, ", "\n", $e->getCode(), "\n", $e->getMessage(), "\n", $e->getTraceAsString(), "\n";
            $this->doctrine->resetManager();
            echo " [x] Duplicate found, skipping", "\n";
        } catch (\Exception $e) {
            echo " Exception found, ", "\n", $e->getCode(), "\n", $e->getMessage(), "\n", $e->getTraceAsString(), "\n";
            $this->doctrine->resetManager();
        }
        echo " [x] Done", "\n";
    }
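For illustration, here is a minimal, self-contained sketch of one thing that may be relevant to the code above: a reference captured once (as $this->em is in the constructor) keeps pointing at the old object after a reset, while a re-fetched one (as $em is in processMessage) does not. FakeManager and FakeRegistry are hypothetical stand-ins, not Doctrine classes, and the reset behaviour they model (drop the instance, build a new one on the next request) is an assumption rather than a description of what Doctrine actually does in every version.

```php
<?php
// Hypothetical stand-ins, NOT Doctrine classes: they only mimic an
// open/closed flag and a registry that can discard its instance.
final class FakeManager
{
    private bool $open = true;

    public function close(): void
    {
        $this->open = false;
    }

    public function isOpen(): bool
    {
        return $this->open;
    }
}

final class FakeRegistry
{
    private ?FakeManager $manager = null;

    // Lazily creates the manager, similar to fetching a service from a container.
    public function getManager(): FakeManager
    {
        return $this->manager ??= new FakeManager();
    }

    // Assumed reset behaviour: drop the instance so the next call builds a new one.
    public function resetManager(): void
    {
        $this->manager = null;
    }
}

$registry = new FakeRegistry();
$cached = $registry->getManager();  // captured once, like $this->em in the constructor

$cached->close();                   // simulate a failed flush closing the manager
$registry->resetManager();

$fresh = $registry->getManager();   // re-fetched after the reset, like $em

var_dump($fresh->isOpen());   // bool(true)
var_dump($cached->isOpen());  // bool(false)
var_dump($fresh === $cached); // bool(false)
```

If the real registry behaves like this stand-in, anything still going through the constructor-captured reference would be talking to the closed manager, not the fresh one.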
Does anyone have any ideas on how to get a working EntityManager back after an exception in the context of a RabbitMQ consumer?
Thanks in advance.