
In my application, when I INSERT into Oracle I get a lot of exceptions about duplicate inserts.

My test code looks like this:

class SomeClass {
    EntityManager em;
    Dao dao;

    @Override
    void insert(String a, String b) {
        MyObject object = new MyObject(a, b);
        dao.insertObject(object);
    }
}

class OtherClass{
    private final ExecutorService completableFutureExecutor =
            new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a","b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }
    }
}

In the OpenJPA log I see something like this:

240981  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 743213969 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240983  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 468904024 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1872262728 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1258578230 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1733696457 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]

240998  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 752805342 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 1035550395 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1439514282 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1158780577 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1082517334 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]



41018  JpaPersistenceUnit  TRACE  [pool-25-thread-4] openjpa.Runtime - An exception occurred while ending the transaction.  This exception will be re-thrown.<openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.StoreException: The transaction has been rolled back.  See the nested exceptions for details on the errors that occurred.
FailedObject: com.test.SomeClass@19df04ab
 at org.apache.openjpa.kernel.BrokerImpl.newFlushException(BrokerImpl.java:2368)
 at org.apache.openjpa.kernel.BrokerImpl.flush(BrokerImpl.java:2205)
 at org.apache.openjpa.kernel.BrokerImpl.flushSafe(BrokerImpl.java:2103)
 at org.apache.openjpa.kernel.BrokerImpl.beforeCompletion(BrokerImpl.java:2021)
 at org.apache.openjpa.kernel.LocalManagedRuntime.commit(LocalManagedRuntime.java:81)
 at org.apache.openjpa.kernel.BrokerImpl.commit(BrokerImpl.java:1526)
 at org.apache.openjpa.kernel.DelegatingBroker.commit(DelegatingBroker.java:932)
 at org.apache.openjpa.persistence.EntityManagerImpl.commit(EntityManagerImpl.java:569)
 at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:514)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
 at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:475)
 at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:270)
 at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:633)
 at com.test.Dao$$EnhancerBySpringCGLIB$$c4aa5f08.insertObject(<generated>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
 at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58)
 at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56)
 at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
 at com.sun.proxy.$Proxy255.insertObject(Unknown Source)
 at com.test.OtherClass.lambda$method$5(OtherClass.java:146)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: <openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.ObjectExistsException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
FailedObject: com.test.entities.Table@19df04ab
 at org.apache.openjpa.jdbc.sql.DBDictionary.narrow(DBDictionary.java:4986)
 at org.apache.openjpa.jdbc.sql.DBDictionary.newStoreException(DBDictionary.java:4961)
 at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:133)
 at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:75)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:225)
 at org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager.flush(BatchingConstraintUpdateManager.java:63)
 at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:104)
 at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:77)
 at org.apache.openjpa.jdbc.kernel.JDBCStoreManager.flush(JDBCStoreManager.java:731)
 at org.apache.openjpa.kernel.DelegatingStoreManager.flush(DelegatingStoreManager.java:131)
 ... 43 more
Caused by: org.apache.openjpa.lib.jdbc.ReportingSQLException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:218)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:194)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.access$1000(LoggingConnectionDecorator.java:58)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator$LoggingConnection$LoggingPreparedStatement.executeUpdate(LoggingConnectionDecorator.java:1133)
 at org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:275)
 at org.apache.openjpa.jdbc.kernel.JDBCStoreManager$CancelPreparedStatement.executeUpdate(JDBCStoreManager.java:1791)
 at org.apache.openjpa.jdbc.kernel.PreparedStatementManagerImpl.executeUpdate(PreparedStatementManagerImpl.java:268)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushSingleRow(BatchingPreparedStatementManagerImpl.java:254)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:157)
 ... 48 more

How can I avoid this? I get a lot of errors like this in production.

UPD: I added the new log to the snippet above.

My application runs on two servers (nodes), and each server is connected to the DB, so you can multiply my test by 2.

Dred
  • *"See the nested exceptions for details on the errors that occurred."* - What were those nested exceptions? – Stephen C Apr 18 '19 at 12:06
  • The nested exception said that the object is already in the DB – Dred Apr 18 '19 at 12:07
  • Please show us the exception + stacktrace. Put it into the question. – Stephen C Apr 18 '19 at 12:08
  • You could centralize the insertion in a single thread using `Executors.newSingleThreadExecutor()`. That "queue" is filled by multiple threads (assuming the threads need some time to create the insertion). Then, based on the number of insertions, you can use a batch that is sent every # seconds. This is the simplest management of concurrent insertion -> removing the concurrency! – AxelH Apr 18 '19 at 12:12
  • 1
    You need to do an "insert if not exists". There are at least 3 ways to do this. See the following questions: https://stackoverflow.com/questions/1702832/ , https://stackoverflow.com/questions/3147874 , https://stackoverflow.com/questions/10824764 ... and probably others. (Pick a description that you can understand.) – Stephen C Apr 18 '19 at 12:17
  • @StephenC, I added the log. And now I'm reading your examples – Dred Apr 18 '19 at 12:19
  • Note that the above links are for vanilla SQL. With JPA ... there may be another approach. – Stephen C Apr 18 '19 at 12:21
  • In the database set a unique field and the database should throw an error back if you try to double insert – Starmixcraft Apr 18 '19 at 12:22
  • @Starmixcraft, the primary key is both columns, so it is a composite primary key – Dred Apr 18 '19 at 12:23
  • Maybe there is something comparable in Oracle DB – Starmixcraft Apr 18 '19 at 12:28
  • @AxelH, your variant didn't reproduce the exception, but it did reproduce multiple insertions into the DB. And I don't understand: first, how can I create a batch with only one thread, and second, doesn't that amount to the `synchronized` approach from the answer? – Dred Apr 18 '19 at 12:34
  • 1
    What do you *want* to happen when inserting data which is already there? Why do you try to insert the same data concurrently in the first place? – JimmyB Apr 18 '19 at 12:47
  • I want it to `INSERT` just once and, if it is already in the DB, to select it. But I have about 5 methods in different classes which can call that `INSERT` at the same time; because of that, this exception has to be handled – Dred Apr 18 '19 at 12:50
  • To prevent duplication, you can create a unique constraint and catch the exception in your DAO. I don't see why the backend should filter those duplicates; this logic is better in the database. Concurrency =/= Duplication – AxelH Apr 18 '19 at 13:10
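Following up on Stephen C's "insert if not exists" comment above, here is a minimal, hedged sketch of one of the vanilla-SQL options: an Oracle MERGE issued as a JPA native query. The table and column names are just the placeholders from the log above, the method name is illustrative, and the exact parameter-binding style may need adjusting to the JPA provider. Note that the unique constraint still remains the final backstop, since two transactions can both see "not matched" before either commits.

// Sketch only: insert the row only when it is not there yet, using an Oracle MERGE.
// Table/column names are the placeholders from the log above.
public void insertIfAbsent(EntityManager em, String a, String b) {
    em.createNativeQuery(
            "MERGE INTO TABLE t " +
            "USING (SELECT ? AS c1, ? AS c2 FROM dual) src " +
            "ON (t.COLUMN1 = src.c1 AND t.COLUMN2 = src.c2) " +
            "WHEN NOT MATCHED THEN INSERT (COLUMN1, COLUMN2) VALUES (src.c1, src.c2)")
        .setParameter(1, a)
        .setParameter(2, b)
        .executeUpdate();
}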

4 Answers

1

You could possibly synchronize on your dao object, so that it can only be used by one thread at a time:

  @Override
  void insert(String a, String b) {
    MyObject object = new MyObject(a, b);
    synchronized (dao) {
      dao.insertObject(object);
    }
  }

Something like the above.

The main advantage of the `synchronized` keyword is that it resolves data-inconsistency problems. The main disadvantage is that it increases thread waiting time and affects system performance. Hence, if there is no specific requirement, it is generally not recommended.

forzend
  • Thank you for your answer. I thought about that, but I have about 10-50 calls per second with queries like that. That will slow down the DB exchange, won't it? – Dred Apr 18 '19 at 12:06
  • 2
    But if you can create two instance of your `dao`, the lock will not work – AxelH Apr 18 '19 at 12:07
  • I can't comment on the OP's main post yet, but are you expecting to upsert data if it already exists, @Dred? Otherwise your composite key is working as intended: A + B can only be inserted once – forzend Apr 18 '19 at 12:29
  • @forzend, I know that it can be inserted only once; that's why I use `merge`. OpenJPA itself does a SELECT and, if it returns NULL, performs the INSERT – Dred Apr 18 '19 at 12:31
  • [this](https://stackoverflow.com/questions/21755787/jpa-multithreaded-add-if-not-exist) thread might not have the answer but also explains the issue. We actually had a very similar issue in our application with transactional data. Our workaround currently is `synchronized`. Although it does make things slower, it was the best way to ensure consistent data – forzend Apr 18 '19 at 12:41
0

Using a single EntityManager from concurrent threads is most likely a bad idea, as you may have seen from the logs: the best case is that an exception in one thread rolls back the single transaction of the EM which all the other threads are trying to use too. The worst case is all kinds of concurrency problems, race conditions and whatnot.

> I want it to INSERT just once and, if it is already in the DB, to select it. But I have about 5 methods in different classes which can call that INSERT at the same time; because of that, this exception has to be handled

In that case you will probably have no other choice than to synchronize your DAO calls via some other means; this could be done by locking on some existing record in the DB, or in any other way. You may also want to try EntityManager.merge(), but I don't think that would solve your problem of two separate machines writing concurrently.
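To make the "locking on some existing record in the DB" idea more concrete, here is a rough sketch using JPA's pessimistic locking. The LockAnchor entity, its "MYOBJECT_LOCK" id and the MyObjectId key class are purely hypothetical stand-ins, and the method is assumed to run inside a transaction (for example via Spring's @Transactional). Because the lock lives in the database, it also serializes writers on both nodes.

// Rough sketch, not the answerer's exact proposal: serialize the
// check-and-insert through a row lock held in the database.
public void insertOnce(EntityManager em, String a, String b) {
    // Hypothetical marker row that all writers agree to lock on; this blocks
    // other transactions doing the same until this transaction ends.
    em.find(LockAnchor.class, "MYOBJECT_LOCK", LockModeType.PESSIMISTIC_WRITE);

    // MyObjectId is a hypothetical composite-key class for (a, b).
    MyObject existing = em.find(MyObject.class, new MyObjectId(a, b));
    if (existing == null) {
        em.persist(new MyObject(a, b));
    }
}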

JimmyB
  • About `EntityManager.merge()`: I use it now. What about memcached and the like? – Dred Apr 18 '19 at 13:11
  • And if EntityManager is a bad idea for concurrency, what is better? I'm very new to this – Dred Apr 18 '19 at 13:13
  • Each thread should have its own instance of `EntityManager`; and with it its own connection to the database and transaction. – JimmyB Apr 18 '19 at 13:14
  • "What's about memcached and etc?" - What do you mean? – JimmyB Apr 18 '19 at 13:16
  • So, if I have about 100 threads in the application, it would be about 2*100 connections to the DB at a time? And even then, will that solve my problem when I have to insert at a 0.001-second interval? `Memcache` is a library which allows saving information in memory. – Dred Apr 18 '19 at 13:18
  • Yes, if you want 100 DB accesses to happen concurrently, you will definitely need 100 connections to the database. - I fail to understand how a memory cache would resolve your problem of getting consistent data into a database. – JimmyB Apr 18 '19 at 14:55
  • Note that you can of course use some kind of connection pooling to limit the number of connections. Nevertheless each thread should have its own *transaction(s)*. – JimmyB Apr 18 '19 at 14:56
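As a hedged illustration of the point in the comments above that each thread should work with its own EntityManager and transaction: with Spring's JpaTransactionManager (visible in the stack trace) this normally happens through @Transactional DAO methods, but the plain-JPA, resource-local equivalent looks roughly like this, assuming an injected EntityManagerFactory (the names are illustrative).

// Sketch only: one EntityManager and one resource-local transaction per task.
Runnable task = () -> {
    EntityManager em = entityManagerFactory.createEntityManager();
    try {
        em.getTransaction().begin();
        em.merge(new MyObject("a", "b"));
        em.getTransaction().commit();
    } catch (RuntimeException e) {
        if (em.getTransaction().isActive()) {
            em.getTransaction().rollback(); // keep failures isolated to this task
        }
        throw e;
    } finally {
        em.close();
    }
};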
0

I now use memcache, but it is not the final solution.

 private int expire = 2; // cache TTL in seconds

 public <T> void insert(Supplier<T> supplier, String... keyLine) throws InterruptedException, MemcachedException, TimeoutException {
        String key = ParseUtils.collectToKeyWithDot(keyLine);
        T value = getCache(key);
        if (value == null) {
            // not cached yet: run the actual DB insert and remember its result
            value = supplier.get();
            setCache(key, value);
        }
    }

 private <T> Boolean setCache(String key, T value) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.set(key, expire, value);
    }

 private <T> T getCache(String key) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.get(key);
    }

Here, I do the first INSERT into the DB and also store the value in the cache for 2 seconds. During those 2 seconds, if any thread tries to insert the same values into the DB, it won't happen: the cache is checked first. For me, 2 seconds is enough to avoid the exceptions.

P.S. I'm still looking for a more elegant solution.
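One caveat with the snippet above is that the get-then-set is itself a race: two threads can both see null before either has written the key. If the memcached client in use exposes an atomic add(...) that only succeeds when the key does not exist yet (xmemcached does, for example), a sketch along these lines removes that window; the names mirror the snippet above, and the unique constraint still stays as the backstop across nodes and beyond the expiry.

// Sketch only: let memcached's atomic add() pick a single winner per key.
public <T> void insert(Supplier<T> supplier, String... keyLine)
        throws InterruptedException, MemcachedException, TimeoutException {
    String key = ParseUtils.collectToKeyWithDot(keyLine);
    // add() stores the value only if the key is absent, so exactly one
    // concurrent caller gets 'true' within the expiry window.
    boolean firstWriter = memcacheClient.add(key, expire, Boolean.TRUE);
    if (firstWriter) {
        supplier.get(); // perform the actual DB insert
    }
}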

Dred
0

You might want to consider a delayed write from the data access object, something like this:

class SomeClass {
    private EntityManager em;
    private Dao dao;
    private Set<MyObject> writableObjects = new HashSet<>();

    @Override
    public void insert(String a, String b) {
        MyObject object = new MyObject(a, b);
        writableObjects.add(object);
    }

    @Override
    public void commit() {
        writableObjects.forEach(object -> dao.insertObject(object));
    }
}

class OtherClass {
    private final ExecutorService completableFutureExecutor = new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a", "b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }

        dao.commit();
    }
}
Rahul R.
  • It's a good idea to use a `Set`. I thought about it, but putting into the `Set` not the object but the full insert as a `String`, like `String insert = "INSERT INTO TABLE ... "; inserts.add(insert)`, to avoid duplicate identical inserts. It looks like my own batch. The `Set` could have a size of about 5-10 and it should be atomic or concurrent. – Dred Apr 19 '19 at 05:32
  • Well, either way would be fine, as long as you have a reliable way to identify unique instances. Ideally, should you want to add a custom object like that to a Set, you might want to override the hashCode() and equals(...) methods. These days any good IDE does a fairly good job of generating them for you. – Rahul R. Apr 19 '19 at 15:24
  • How can overriding hashCode() help me here? – Dred Apr 19 '19 at 15:28
  • A HashSet in Java is backed internally by the HashMap data structure, which uses the hash code to determine the object's primary storage index; therefore implementing a reliable way to compute a hash code (i.e. overriding the hashCode() method) would help you achieve efficient reads. – Rahul R. Apr 19 '19 at 16:06
  • Meanwhile, to further address your point about having an atomic or concurrent set: you are absolutely right about that, as your code would be modifying the same Set from multiple threads. The following are some of the ways in which you can create a concurrent Set: [1] Set myConcurrentSet = ConcurrentHashMap.newKeySet(); [2] Set myConcurrentSet = new ConcurrentSkipListSet(); – Rahul R. Apr 19 '19 at 16:10
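Putting the last few comments together, here is a small illustrative sketch, not the OP's actual entity: a value object whose equals()/hashCode() cover both key columns, collected in a concurrent Set so that several threads can queue writes and identical (a, b) pairs collapse to a single entry. A single writer (or the commit() method from the answer) could then drain the set and flush it through the DAO.

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PendingInserts {
    // Thread-safe set: duplicates submitted by concurrent callers are dropped here.
    private final Set<MyObject> pending = ConcurrentHashMap.newKeySet();

    void queue(String a, String b) {
        pending.add(new MyObject(a, b)); // equal (a, b) pairs collapse to one entry
    }

    static final class MyObject {
        private final String a;
        private final String b;

        MyObject(String a, String b) { this.a = a; this.b = b; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MyObject)) return false;
            MyObject other = (MyObject) o;
            return a.equals(other.a) && b.equals(other.b);
        }

        @Override
        public int hashCode() {
            return Objects.hash(a, b);
        }
    }
}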