0

I have a fairly specific scenario where I've run into problems:

I have a Camel route which tries to fetch data from an arbitrary number of datasources and insert those into a central database. If the targeted table does not exist in the central database, I will try to get the DDL from the source database. The creation of missing tables in the central database works great, but when the Camel route runs on its schedule the next time, it can't find the table, even though I can see it in my database tool. If I simply restart the spring boot instance, it finds the table and it works.

To give you a better idea of how it works, here's a slice of the route where it gets wrong:

from("direct:ensureTargetTableExists")
    .routeId("ensureTargetTableExists")
    .doTry()
        .toD(String.format("sql:select * from ${in.header.%s} where false?dataSource=%s&outputHeader=%s", QueryHandler.Properties.TARGET_TABLE, DataSourceHandler.Properties.TARGET_DATASOURCE_NAME, "tableExists"))
        .to("direct:handleResultSet")
    .doCatch(BadSqlGrammarException.class)
        .log("entered catch")
        .log(LoggingLevel.INFO, String.format("Could not find table ${in.header.%s} at %s, will attempt to create it by fetching DDL.", QueryHandler.Properties.TARGET_TABLE, DataSourceHandler.Properties.TARGET_DATASOURCE_NAME))
        .to("direct:fetchTableDdlFromSource")
    .endDoTry();

from("direct:fetchTableDdlFromSource")
    .routeId("fetchTableDdlFromSource")
    .streamCaching()
    .bean(DumpHandler.class, "generateDumpCommand")
    .toD(String.format("ssh:{{sync.dump.ssh.user}}@${in.header.%s}?password={{sync.dump.ssh.password}}", DataSourceHandler.Properties.FETCHED_FROM_HOST))
    .to("direct:applyDdlToTargetDatabase");

from("direct:applyDdlToTargetDatabase")
    .routeId("applyDdlToTargetDatabase")
    .transacted("targetTransactionPolicy")
    .toD(String.format("sql:?dataSource=%s&useMessageBodyForSql=true", DataSourceHandler.Properties.TARGET_DATASOURCE_NAME))
    .bean(FileUtil.class, "saveDdl")
    // Nothing else is called after this, since I want the transaction to be completed and commited
    .log(LoggingLevel.INFO, "Table will be populated next scheduled run.");

I let it run two iterations, and this is what happens:

entered catch
Could not find table article_flags at target, will attempt to create it by fetching DDL.
Saving DDL to config/sql/ddl/article_flags-20181206_162310.sql
Successfully created table article_flags
Table will be populated next scheduled run.
entered catch
Could not find table article_flags at target, will attempt to create it by fetching DDL.
Failed delivery for (MessageId: ID-overlord-1544109787810-0-12 on ExchangeId: ID-overlord-1544109787810-0-9). Exhausted after delivery attempt: 1 caught: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar []; nested exception is org.postgresql.util.PSQLException: ERROR: relation "article_flags" already exists

I've tried to resolve this using transaction managers and transaction policies which I add when the application starts, but to no avail. As you can see I'm using the transaction policy targetTransactionPolicy in the camel route above.

public class ContextConfiguration implements BeanDefinitionRegistryPostProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(ContextConfiguration.class);

    private Map<String, BasicDataSource> datasources;

    public ContextConfiguration(Map<String, BasicDataSource> datasources) {
        this.datasources = datasources;
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        for (Entry<String, BasicDataSource> source : datasources.entrySet()) {
            LOG.info("Adding datasource {} targeting {} to Spring registry", source.getKey(), source.getValue().getUrl());
            PlatformTransactionManager transactionManager = new DataSourceTransactionManager(source.getValue());
            SpringTransactionPolicy transactionPolicy = new SpringTransactionPolicy(transactionManager);
            registry.registerBeanDefinition(source.getKey() + "TransactionPolicy", BeanDefinitionBuilder.genericBeanDefinition(SpringTransactionPolicy.class, () -> transactionPolicy).getBeanDefinition());
            registry.registerBeanDefinition(source.getKey() + "TransactionManager", BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager.class, () -> transactionManager).getBeanDefinition());
            registry.registerBeanDefinition(source.getKey(), BeanDefinitionBuilder.genericBeanDefinition(BasicDataSource.class, () -> source.getValue()).getBeanDefinition());
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { }

}

I've been losing my hear over this the last couple of days and I can't seem to get past this. Any help or insights would be greatly appreciated. My best guess is that it is related to transactions, or the way I'm creating my transaction managers, policies and datasources.

Additional information:

  • Spring Boot 2.1.1
  • Camel 2.23.0
  • Underlying datasource is Postgres 10.6
jokarl
  • 1,913
  • 2
  • 24
  • 52

2 Answers2

0

The select query is failing because of the error :

relation "article_flags" already exists

May be this thread will help. Also I don't think it is related to transactions.

VarunKrish
  • 179
  • 1
  • 9
  • This does not seem to be the same isuse. I can remove it programatically or manually and, without restarting, the camel route will recreate it without errors. – jokarl Dec 07 '18 at 07:55
0

I think I solved it in satisfactory way. It was transactions as I suspected.

I added a transaction policy to my target datasource:

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    registerTargetTransactionPolicy(datasources.get("target"), registry);
    for (Entry<String, BasicDataSource> source : datasources.entrySet()) {
        LOG.info("Adding datasource {} targeting {} to Spring registry", source.getKey(), source.getValue().getUrl());
        registry.registerBeanDefinition(source.getKey(), BeanDefinitionBuilder.genericBeanDefinition(BasicDataSource.class, () -> source.getValue()).getBeanDefinition());
    }
}

private void registerTargetTransactionPolicy(BasicDataSource datasource, BeanDefinitionRegistry registry) {
    PlatformTransactionManager transactionManager = new DataSourceTransactionManager(datasource);
    SpringTransactionPolicy transactionPolicy = new SpringTransactionPolicy(transactionManager);
    transactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRES_NEW");
    registry.registerBeanDefinition(TARGET_REQUIRES_NEW_POLICY, BeanDefinitionBuilder.genericBeanDefinition(SpringTransactionPolicy.class, () -> transactionPolicy).getBeanDefinition());
}

I then use it in my camel route using .policy(ContextConfiguration.TARGET_REQUIRES_NEW_POLICY) and it works flawlessly.

jokarl
  • 1,913
  • 2
  • 24
  • 52