
For a large project with many entities, I wrote a common save() method.

This method lives in an abstract service and is used throughout the project to save entity state.

AbstractService::save() looks like this:

public function save($entity)
{
    $transactionStarted = $this->beginTransaction();

    try
    {
        $action = $entity->getId() ? self::UPDATE : self::CREATION;

        $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);

        $this->getEntityManager()->persist($entity);
        $this->getEntityManager()->flush();

        $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);

        if ($transactionStarted)
        {
            $this->commitTransaction();
        }
    } catch (\Exception $e)
    {
        if ($transactionStarted)
        {
            $this->rollbackTransaction();
        }

        throw new Exception('Unable to save entity', $e);
    }

    return true;
}

public function beginTransaction()
{
    if (!$this->getEntityManager()->getConnection()->isTransactionActive())
    {
        $this->getEntityManager()->getConnection()->beginTransaction();

        return true;
    }

    return false;
}

public function commitTransaction()
{
    $this->getEntityManager()->getConnection()->commit();

    return $this;
}

public function rollbackTransaction()
{
    $this->getEntityManager()->getConnection()->rollBack();

    return $this;
}

In my case, when a new Member entity is inserted through the MemberService (which extends AbstractService), an email is sent (for example) via the save.post event. Other actions can also be performed there, such as calling another service whose own save() method runs too.

Example of the "child" MemberService::save() method:

MemberService

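public function save(Member $member)
{
    // Note: narrowing the parent's untyped $entity parameter to "Member $member" breaks
    // signature compatibility (a warning in PHP 7, a fatal error in PHP 8)

    // some stuff, e.g. set a property
    $member->setFirstName('John');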

    return parent::save($member);
}

Example of a listener triggered by the event:

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']);

public function onMembersCreation(EventInterface $event)
{
    // send an email

    // anything else ... update another entity ... (call AnotherService::save() too) 
}
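To make this pre/post flow concrete, here is a minimal, framework-free sketch. The `TinyEventManager`, `FakeStore` and `saveWithEvents()` names are hypothetical stand-ins for the Zend event manager, Doctrine's EntityManager and AbstractService::save(); the point it illustrates is that the action (creation vs. update) must be decided before persisting, because once the entity has been saved it always has an ID.

```php
<?php

// Hypothetical minimal event manager: listeners keyed by event name.
class TinyEventManager
{
    private $listeners = [];

    public function attach(string $event, callable $listener)
    {
        $this->listeners[$event][] = $listener;
    }

    public function trigger(string $event, $target, array $params = [])
    {
        foreach ($this->listeners[$event] ?? [] as $listener) {
            $listener($target, $params);
        }
    }
}

// Hypothetical entity with an auto-generated ID.
class Member
{
    public $id = null;
}

// Hypothetical store that assigns IDs on save, the way a flush would.
class FakeStore
{
    private $nextId = 1;

    public function save(Member $member)
    {
        if ($member->id === null) {
            $member->id = $this->nextId++;
        }
    }
}

function saveWithEvents(Member $member, FakeStore $store, TinyEventManager $events)
{
    // Decide the action BEFORE saving: afterwards the entity always has an ID
    $action = $member->id ? 'update' : 'creation';

    $events->trigger('save.pre', $member, ['action' => $action]);
    $store->save($member);
    $events->trigger('save.post', $member, ['action' => $action]);
}

$log = [];
$events = new TinyEventManager();
$events->attach('save.post', function (Member $m, array $p) use (&$log) {
    // e.g. send a welcome email only when $p['action'] === 'creation'
    $log[] = $p['action'] . ':' . $m->id;
});

$store = new FakeStore();
$member = new Member();
saveWithEvents($member, $store, $events); // first save: creation
saveWithEvents($member, $store, $events); // second save: update
```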

That works well for saving a single entity.

Now I want to bulk import a large number of members, with both creations and updates. To achieve that, I read the Doctrine documentation on bulk imports (batch processing).

How should I update my code to handle both "bulk saving" and "single saving" while keeping transactional safety and events?
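The batch-processing pattern described in the Doctrine documentation boils down to: persist each entity, call flush() and clear() every N entities, then flush once more for the remainder. Below is a sketch using a hypothetical `CountingEntityManager` that only counts calls, so it runs without Doctrine; `batchSave()` is likewise a hypothetical name. With a real EntityManager the loop has the same shape.

```php
<?php

// Hypothetical stand-in for Doctrine's EntityManager that only counts calls.
class CountingEntityManager
{
    public $persisted = 0;
    public $flushes = 0;
    public $clears = 0;

    public function persist($entity) { $this->persisted++; }
    public function flush() { $this->flushes++; }
    public function clear() { $this->clears++; }
}

function batchSave($entities, $em, int $batchSize)
{
    $i = 0;
    foreach ($entities as $entity) {
        $em->persist($entity);

        // Every $batchSize entities: write pending changes, then detach
        // everything so the identity map does not grow without bound
        if ((++$i % $batchSize) === 0) {
            $em->flush();
            $em->clear();
        }
    }

    // Flush the last, incomplete batch (e.g. the final 21 of 121 entities)
    $em->flush();
    $em->clear();
}

$em = new CountingEntityManager();
batchSave(range(1, 121), $em, 50);
// 121 entities, batch size 50: flushes after 50 and 100, plus one final flush
```

If the count is an exact multiple of the batch size, the final flush simply has nothing left to write.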

ceadreak
  • What is "a lot of members"? 1k? 1M? Your answer will determine which strategy you should adopt – JesusTheHun Apr 13 '16 at 11:26
  • Hi JesusTheHun, thank you for being the first to take an interest in my issue :) "a lot of members" means 4k to 10k – ceadreak Apr 13 '16 at 12:33
  • Is this a one-shot import or does it have to run often? The underlying question is: does performance matter? – JesusTheHun Apr 13 '16 at 12:43
  • This functionality will be used every day, so yes, performance is very important. Currently, a 500-record file takes ~4 minutes. – ceadreak Apr 13 '16 at 14:51

2 Answers


Basically, I suggest you implement the Doctrine\Common\Collections\Collection interface, perhaps by extending ArrayCollection, and add a save() method that does what the documentation describes.

<?php

use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\EntityManager;
use Zend\EventManager\EventManager;

// Naive version: one AbstractService::save() call (flush and transaction) per entity
class MyDirtyCollection extends ArrayCollection {

    protected $service;

    public function __construct(AbstractService $abstractService, array $elements = [])
    {
        parent::__construct($elements); // initialise the underlying ArrayCollection storage
        $this->service = $abstractService;
    }

    public function save()
    {
        foreach ($this as $entity) {
            $this->service->save($entity);
        }
    }
}

// Batched version: one transaction, flush() + clear() every $bulkSize entities
class MyCollection extends ArrayCollection {

    public $bulkSize = 500;

    protected $eventManager;
    protected $entityManager;

    public function __construct(EntityManager $entityManager, EventManager $eventManager, array $elements = [])
    {
        parent::__construct($elements);
        $this->entityManager = $entityManager;
        $this->eventManager = $eventManager;
    }

    public function getEventManager()
    {
        return $this->eventManager;
    }

    public function getEntityManager()
    {
        return $this->entityManager;
    }

    public function setBulkSize(int $bulkSize)
    {
        $this->bulkSize = $bulkSize;
    }

    public function save()
    {
        $connection = $this->getEntityManager()->getConnection();

        // Only start (and later commit) a transaction if none is already active
        $transactionStarted = false;
        if (!$connection->isTransactionActive()) {
            $connection->beginTransaction();
            $transactionStarted = true;
        }

        try {
            // Record each entity's action up front: after flush() every entity has an ID
            $actions = new \SplObjectStorage();

            foreach ($this as $entity) {
                $actions[$entity] = $entity->getId() ? AbstractService::UPDATE : AbstractService::CREATION;
                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $actions[$entity]]);
            }

            $i = 0;
            foreach ($this as $entity) {
                $i++;

                // Existing entities detached by a previous clear() may need merge() instead
                $this->getEntityManager()->persist($entity);

                if (($i % $this->bulkSize) === 0) {
                    $this->getEntityManager()->flush();
                    $this->getEntityManager()->clear();
                }
            }

            // Flush the last, incomplete batch
            $this->getEntityManager()->flush();
            $this->getEntityManager()->clear();

            foreach ($this as $entity) {
                $this->getEventManager()->trigger('save.post', $entity, ['action' => $actions[$entity]]);
            }

            if ($transactionStarted) {
                $connection->commit();
            }
        } catch (\Exception $e) {
            if ($transactionStarted) {
                $connection->rollBack();
            }

            throw $e;
        }
    }
}

Something like that ;) When you fetch your data, you hydrate your collection, process your entities, and finally call $collection->save();

EDIT: added an Insert class and a use case below:

The performance here will still be modest, but better than committing entity by entity. If you need high performance, though, consider using Doctrine DBAL instead of the ORM. Here is my DBAL class for bulk inserts:
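Because the collection's save() triggers save.post only after everything has been flushed, every entity has an ID by then, so getId() can no longer distinguish creations from updates. One way to keep that information, sketched here with a hypothetical `Entity` class and a simulated flush, is to record each entity's action up front in an `SplObjectStorage` keyed by the entity object.

```php
<?php

// Hypothetical entity: only an ID and its getter.
class Entity
{
    public $id;
    public function __construct($id = null) { $this->id = $id; }
    public function getId() { return $this->id; }
}

$entities = [new Entity(), new Entity(42), new Entity()];

// Phase 1 (before flush): remember the action for every entity
$actions = new SplObjectStorage();
foreach ($entities as $entity) {
    $actions[$entity] = $entity->getId() ? 'update' : 'creation';
}

// Phase 2: simulated flush, which assigns IDs to new entities
$nextId = 100;
foreach ($entities as $entity) {
    if ($entity->getId() === null) {
        $entity->id = $nextId++;
    }
}

// Phase 3 (save.post): listeners still get the original action
$post = [];
foreach ($entities as $entity) {
    $post[] = $actions[$entity] . ':' . $entity->getId();
}
```

The storage is keyed by object identity, so if merge() hands back a different managed instance, look the action up with the original object.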
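The core idea behind a DBAL bulk insert is to send many rows in a single multi-row `INSERT ... VALUES (...), (...)` statement instead of one statement per entity. The sketch below builds such a statement with positional `?` placeholders; `buildMultiRowInsert()` is a hypothetical helper, not part of DBAL. The resulting SQL and parameter list could then be passed to `Connection::executeUpdate($sql, $params)`. Placeholders leave value escaping to the driver, but table and column names are concatenated as-is, so they must come from trusted code.

```php
<?php

// Hypothetical helper: builds one multi-row INSERT with positional placeholders.
// $rows is a list of associative arrays sharing the same keys (column names).
function buildMultiRowInsert(string $table, array $rows): array
{
    if (count($rows) === 0) {
        throw new UnderflowException('No values ready for INSERT');
    }

    $columns = array_keys(reset($rows));
    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';

    $params = [];
    foreach ($rows as $row) {
        // Keep parameter order aligned with the column list of the first row
        foreach ($columns as $column) {
            $params[] = $row[$column];
        }
    }

    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES %s',
        $table,
        implode(', ', $columns),
        implode(', ', array_fill(0, count($rows), $rowPlaceholder))
    );

    return [$sql, $params];
}

list($sql, $params) = buildMultiRowInsert('member', [
    ['first_name' => 'John', 'last_name' => 'Doe'],
    ['first_name' => 'Jane', 'last_name' => 'Roe'],
]);
```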

<?php

namespace JTH\Doctrine\DBAL;

use Doctrine\DBAL\Query\QueryBuilder;
use Exception;
use InvalidArgumentException;
use Traversable;
use UnderflowException;

class Insert extends QueryBuilder
{
    const CALLBACK_FAILURE_SKIP = 0;
    const CALLBACK_FAILURE_BREAK = 1;

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK;

    public static $defaultBulkSize = 500;

    public $ignore = false;
    public $onDuplicate = null;

    public function values(array $values)
    {
        $this->resetQueryPart('values');
        $this->addValues($values);
    }

    public function addValues(array $values)
    {
        $this->add('values', $values, true);
    }

    public function setCallbackFailureStrategy($strategy)
    {
        if ($strategy == static::CALLBACK_FAILURE_BREAK) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK;
        } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP;
        } else {
            $class = self::class;
            throw new InvalidArgumentException(
                "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK"
            );
        }
    }

    public function getCallbackFailureStrategy()
    {
        return $this->callbackFailureStrategy;
    }

    public function execute()
    {
        return $this->getConnection()->executeUpdate(
            $this->getSQLForInsert(),
            $this->getParameters(),
            $this->getParameterTypes()
        );
    }

    /**
     * Converts this instance into an INSERT string in SQL.
     * @return string
     * @throws \Exception
     */
    private function getSQLForInsert()
    {
        $count = sizeof($this->getQueryPart('values'));

        if ($count == 0) {
            throw new UnderflowException("No values ready for INSERT");
        }

        $values = current($this->getQueryPart('values'));
        $ignore = $this->ignore ? 'IGNORE' : '' ;
        $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] .
            ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES ';

        foreach ($this->getQueryPart('values') as $values) {
            $sql .= '(' ;

            foreach ($values as $value) {
                if (is_array($value)) {
                    if (!empty($value['raw'])) {
                        $sql .= $value['value'] . ',';
                    } else {
                        $sql .= $this->expr()->literal($value['value'], $value['type']) . ',';
                    }
                } else {
                    $sql .= $this->expr()->literal($value) . ',';
                }
            }

            $sql = substr($sql, 0, -1);
            $sql .= '),';
        }

        $sql = substr($sql, 0, -1);

        if (!is_null($this->onDuplicate)) {
            $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' ';
        }

        return $sql;
    }

    /**
     * @param array|Traversable $loopable An array or object to loop over
     * @param callable $callable A callable that will be called before actually inserting the row.
     * Two parameters will be passed:
     * - the key of the current row
     * - the row values (array)
     * It must return an array of rows to insert
     * @param int|null $bulkSize How many rows will be inserted at once
     * @param bool $transactional Whether to wrap the whole bulk insert in one transaction
     * @throws \Doctrine\DBAL\ConnectionException
     * @throws \Exception
     */
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactional = true)
    {
        if (!is_array($loopable) && !($loopable instanceof Traversable)) {
            throw new InvalidArgumentException("\$loopable must be either an array or a traversable object");
        }

        $bulkSize = $bulkSize ?? static::$defaultBulkSize;

        $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid Monolog memory overload

        if ($transactional) {
            $this->getConnection()->beginTransaction();
        }

        try {
            $this->resetQueryPart('values');

            foreach ($loopable as $key => $values) {
                try {
                    $callbackedValues = $callable($key, $values);

                    if (!empty($callbackedValues)) {
                        foreach ($callbackedValues as $callbackedValuesRow) {
                            $this->addValues($callbackedValuesRow);
                        }
                    }
                } catch (Exception $e) {
                    /*
                     * If a callback exception must break the transaction, rethrow it up the call stack;
                     * otherwise, skip the row insertion
                     */
                    if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) {
                        throw $e;
                    }

                    continue;
                }

                // Execute a full batch as one multi-row INSERT
                if (count($this->getQueryPart('values')) >= $bulkSize) {
                    $this->execute();
                    $this->resetQueryPart('values');
                }
            }

            // Insert the remaining rows of the last, incomplete batch
            if (count($this->getQueryPart('values')) > 0) {
                $this->execute();
            }

            $this->resetQueryPart('values');

            if ($transactional) {
                $this->getConnection()->commit();
            }
        } catch (Exception $e) {
            // Undo the partial import before propagating the error
            if ($transactional) {
                $this->getConnection()->rollBack();
            }

            throw $e;
        }
    }

    /**
     * @return boolean
     */
    public function isIgnore()
    {
        return $this->ignore;
    }

    /**
     * @param boolean $ignore
     */
    public function setIgnore(bool $ignore)
    {
        $this->ignore = $ignore;
    }

    /**
     * @return null|string
     */
    public function getOnDuplicate()
    {
        return $this->onDuplicate;
    }

    /**
     * @param null|string $onDuplicate
     */
    public function setOnDuplicate($onDuplicate)
    {
        $this->onDuplicate = $onDuplicate;
        $this->ignore = false;
    }


}

Use case:

    // Requires: use Doctrine\DBAL\DBALException; and use JTH\Doctrine\DBAL\Insert;
    try {
        $insert = new Insert($this->getDoctrine()->getConnection('myDB'));
        $insert->insert('myTable');
        $insert->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()');
        $insert->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK);
        $insert->bulk($myArrayOfRows, function ($key, $row) {

            // Some pre-insert processing

            // Return a list of rows to insert (here, just the current row)
            return [$row];

        }, 500, true);

        $this->addFlash('success', 'Yay !');

    } catch (DBALException $e) {
        $this->addFlash('error', 'Damn, error : ' . $e->getMessage());
    }
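The control flow of bulk() (run the callback for each row, buffer the rows it returns, execute once the buffer reaches $bulkSize, then execute the remainder) can be checked in isolation. In this sketch, `bulkBuffer()` is a hypothetical re-statement of that logic without DBAL: the `$execute` callable stands in for the multi-row INSERT, and `$skipOnFailure = true` mimics `CALLBACK_FAILURE_SKIP`.

```php
<?php

// Hypothetical re-statement of the buffering logic used by Insert::bulk().
function bulkBuffer($rows, callable $callback, int $bulkSize, callable $execute, bool $skipOnFailure = false)
{
    $buffer = [];

    foreach ($rows as $key => $row) {
        try {
            // The callback may return zero, one or several rows to insert
            foreach ($callback($key, $row) as $outRow) {
                $buffer[] = $outRow;
            }
        } catch (Exception $e) {
            if (!$skipOnFailure) {
                throw $e; // like CALLBACK_FAILURE_BREAK: abort, the caller rolls back
            }
            continue;     // like CALLBACK_FAILURE_SKIP: ignore this row
        }

        if (count($buffer) >= $bulkSize) {
            $execute($buffer); // one multi-row INSERT per full buffer
            $buffer = [];
        }
    }

    if (count($buffer) > 0) {
        $execute($buffer); // the remainder
    }
}

$batches = [];
bulkBuffer(
    range(1, 1234),
    function ($key, $row) {
        // Pretend every 100th row is invalid (12 rows out of 1234)
        if ($row % 100 === 0) {
            throw new RuntimeException("invalid row $row");
        }
        return [['value' => $row]];
    },
    500,
    function (array $buffer) use (&$batches) { $batches[] = count($buffer); },
    true
);
```

With 1222 valid rows and a bulk size of 500, this runs three INSERTs of 500, 500 and 222 rows.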
JesusTheHun
  • Hmm, I'm not sure about that. I tried a very similar method with an array instead of a collection, but had problems with transactions... I'll test again with a collection to check whether the problem is the same – ceadreak Apr 13 '16 at 14:55
  • What was the problem with the transactions? Also, check my update – JesusTheHun Apr 13 '16 at 15:00
  • @JesusTheHun I'll test your solution this afternoon, thanks for your time – ceadreak Apr 14 '16 at 09:09
  • I tried an implementation of the ArrayCollection solution: I wrote a `saveBulk` method that is called when the `save()` parameter is an ArrayCollection. But now it tries to insert data into a completely different table (?), and that table's entity has no relation to the `Member` entity. Weird behavior... – ceadreak Apr 14 '16 at 15:32
  • See http://stackoverflow.com/questions/36337538/doctrine2-bulk-import-try-to-work-with-another-entity for more details about the entity relations. (I had the same problem two weeks ago and it is still unsolved!) – ceadreak Apr 14 '16 at 16:03
  • This seems unrelated to this issue; I would recommend turning Xdebug on and digging deeper to see what is actually happening. But from what I can see, it looks like you have configured cascade persistence, so Doctrine tries to fill Member::$client, which itself cascades to Groups... That may explain why you see another table in your INSERT statement error message. – JesusTheHun Apr 15 '16 at 09:19
  • @JesusTheHun: You're right! I removed the `cascade={"persist", "merge"}` constraint on `Client::$group`, but now Doctrine tries to persist another entity. I also removed a `ManyToOne` cascade constraint on the target property of that entity, and Doctrine tries to persist yet another entity... Do you think `cascade={"persist", "merge"}` is useful on bidirectional `ManyToOne` relations, or should it only be used on `OneToMany`? – ceadreak Apr 15 '16 at 17:08
  • Whether to use cascade really depends on how you use your entities. If you allow your collections to be modified from different places (functionally speaking), it is wise to disable cascade to keep fine-grained control over it. If you have a single form controlling an entity and its related collection, I would recommend using the cascade option. It's up to you :) – JesusTheHun Apr 16 '16 at 16:23
  • Hmm, and in this particular case, a `client` can be set in the single `group` form. If I understand you correctly, I should use the cascade option on the `Client::group` property in this case. But if I keep the cascade option on the `Client::group` property's ManyToOne relation, a bulk update/insert of `member` entities tries to insert into `group`. It's a little confusing for this use case, but +1 for your answer – ceadreak Apr 16 '16 at 19:59
  • If there is only one place where cascade is useful, it may be simpler to disable cascade and update manually. You can also disable cascade at runtime using the metadata setter; even if it's a bit hacky, it's a "legal hack" (I mean the Doctrine developers are aware of the possibilities this offers, so they haven't locked the setter). Once cascade is disabled, enjoy :D – JesusTheHun Apr 18 '16 at 10:20
  • Please check my answer and let me know if you have any recommendations. – ceadreak May 10 '16 at 12:50
In the end, I used Doctrine's merge() method and it seems to work well.

I wrote a separate AbstractService::saveBulk() method to save a large number of Member entities:

    /**
     * @param ArrayCollection $entities
     *
     * @return bool
     * @throws Exception
     */
    public function saveBulk(ArrayCollection $entities)
    {
        $batchSize = 100;
        $i         = 0;

        foreach ($entities as $entity)
        {
            $transactionStarted = $this->beginTransaction();

            try
            {
                $action = $entity->getId() ? self::UPDATE : self::CREATION;

                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);

                $entity = $this->getEntityManager()->merge($entity);

                $this->getEntityManager()->persist($entity);
                $this->getEntityManager()->flush();

                $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);

                // Increment the counter so that clear() only runs once every $batchSize entities
                if ((++$i % $batchSize) === 0)
                {
                    $this->getEntityManager()->clear();
                }

                if ($transactionStarted)
                {
                    $this->commitTransaction();
                }
            } catch (\Exception $e)
            {
                if ($transactionStarted)
                {
                    $this->rollbackTransaction();
                }

                throw new Exception(Exception::UNEXPECTED_ERROR, 'Unable to save entity', $e);
            }
        }

        $this->getEntityManager()->clear();

        return true;
    }

Contrary to the Doctrine 2 documentation, I flush after every entity and only call clear() once per batch (instead of flush() + clear() once per batch), because some of the triggered events need to know whether the entity already has a database identifier.

@JesusTheHun thanks for your comments, which helped me a lot.
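The cost of that trade-off can be quantified with a counting stub. In this sketch, `CallCounter`, `flushEachClearPerBatch()` and `flushAndClearPerBatch()` are hypothetical names, and each flush() stands for one round of database writes. Flushing after every entity means one flush per entity, while the documented pattern flushes once per batch plus once for the remainder. MyCollection in the other answer still gives its save.post listeners IDs by triggering them only after the flushes.

```php
<?php

// Hypothetical counting stand-in for the EntityManager.
class CallCounter
{
    public $flushes = 0;
    public $clears = 0;

    public function flush() { $this->flushes++; }
    public function clear() { $this->clears++; }
}

// Strategy above: flush every entity (IDs available at once), clear once per batch
function flushEachClearPerBatch(int $count, int $batchSize, CallCounter $em)
{
    for ($i = 1; $i <= $count; $i++) {
        $em->flush();
        if ($i % $batchSize === 0) {
            $em->clear();
        }
    }
    $em->clear();
}

// Documented pattern: flush and clear together once per batch, plus the remainder
function flushAndClearPerBatch(int $count, int $batchSize, CallCounter $em)
{
    for ($i = 1; $i <= $count; $i++) {
        if ($i % $batchSize === 0) {
            $em->flush();
            $em->clear();
        }
    }
    $em->flush();
    $em->clear();
}

$perEntity = new CallCounter();
flushEachClearPerBatch(250, 100, $perEntity);

$perBatch = new CallCounter();
flushAndClearPerBatch(250, 100, $perBatch);
```

For 250 entities and a batch size of 100, the first strategy flushes 250 times and the second only 3 times; both clear 3 times.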

ceadreak
  • You start a transaction for every entity instead of once for your bulk. Why don't you use the method I gave you in the class MyCollection? ( http://stackoverflow.com/questions/36390063/zend-framework-2-doctrine-2-bulk-operations-and-events-triggering/36602128#36602128 ) Flushing on every entity defeats the purpose; you have to flush every $batchSize entities. Your batch size variable should be a parameter, a class constant, an object property initialized in the constructor, or a static property. Also, with the modulo condition, if you have 121 entities for example, you will not commit the last 21 entities. – JesusTheHun May 11 '16 at 10:31
  • I tried your method without merge and still get this error: `A new entity was found through the relationship ...`. My `beginTransaction()` method checks whether a transaction has already been started, so I don't start a transaction for every entity – ceadreak May 11 '16 at 15:14
  • I tried your method again with a merge before persisting, and it works now. For 500 entities, it takes 4 seconds less with your method than with mine ;). Answer accepted, +1 – ceadreak May 11 '16 at 15:27