2

The application I am working on is connecting to two PostgreSQL databases at the same time.

The application is trying to perform an insert in a table with 3 columns on one of the two databases. The columns of the table are a string, a json and a timestamp.

When the application tries to perform the insert, it is getting the following error:

org.springframework.dao.InvalidDataAccessApiUsageException: Nested entities are not supported
    at org.springframework.data.r2dbc.convert.MappingR2dbcConverter.writePropertyInternal(MappingR2dbcConverter.java:413)
    at org.springframework.data.r2dbc.convert.MappingR2dbcConverter.writeProperties(MappingR2dbcConverter.java:380)
    at org.springframework.data.r2dbc.convert.MappingR2dbcConverter.writeInternal(MappingR2dbcConverter.java:358)
    at org.springframework.data.r2dbc.convert.MappingR2dbcConverter.write(MappingR2dbcConverter.java:350)
    at org.springframework.data.r2dbc.convert.MappingR2dbcConverter.write(MappingR2dbcConverter.java:64)
    at org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy.getOutboundRow(DefaultReactiveDataAccessStrategy.java:200)
    at org.springframework.data.r2dbc.core.R2dbcEntityTemplate.lambda$doInsert$9(R2dbcEntityTemplate.java:548)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:152)
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
    at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:87)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:102)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)

In the stack it says that "nested entities are not supported" but I think that the issue here is that the framework is not using the converters I have defined, in fact the entity doesn't have any relations with other entities. Following are the details of the entity the application fails to insert:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("my_tab")
public class MyTab implements Persistable<String> {

    // Primary key; a plain string identifier.
    @Id
    @Column("identifier")
    private String identifier;

    // Payload persisted to a JSON column via the custom read/write converters.
    @Column("data")
    private MyData data;

    // Creation timestamp column.
    @Column("created")
    private LocalDateTime created;


    @Transient
    @Override
    public String getId() {
        return identifier;
    }

    /**
     * Required by {@link Persistable}: tells Spring Data whether to INSERT (new)
     * or UPDATE (existing). FIX: Persistable declares both getId() and isNew()
     * abstract, so the original snippet — which only overrode getId() — does not
     * compile. Treating a row with no creation timestamp as new is a common
     * heuristic; adjust if `created` is populated by the application before the
     * first save.
     */
    @Transient
    @Override
    public boolean isNew() {
        return created == null;
    }
}

MyData is basically a POJO with other fields.

In order to allow the application to connect to two databases I have followed what is on the guide:

https://docs.spring.io/spring-data/r2dbc/docs/current/reference/html/#r2dbc.multiple-databases

Following are the connection factory objects I have defined:

@Configuration
@EnableR2dbcRepositories(basePackages = {"com.my.package.entity.persistence", "com.my.package.repository.myTab"}, entityOperationsRef =
        "myTabR2dbcEntityOperations")
public class FirstDatabaseConfig {

    private static final Logger LOG = LoggerFactory.getLogger(FirstDatabaseConfig.class);

    // FIX: url/username/password were referenced but never declared;
    // bind them from application.properties explicitly.
    @Value("${db.first.url}")
    private String url;
    @Value("${db.first.username}")
    private String username;
    @Value("${db.first.password}")
    private String password;

    @Autowired
    private MyDataToJsonConverter myDataWriteConverter;
    @Autowired
    private JsonToMyDataConverter myDataReadConverter;

    /**
     * Custom object converters (MyData <-> JSON) for this database.
     */
    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions() {
        Collection<?> converters = Arrays.asList(myDataWriteConverter, myDataReadConverter);
        return R2dbcCustomConversions.of(PostgresDialect.INSTANCE, converters);
    }

    /**
     * Connection factory for the first database, built from the R2DBC URL
     * plus optional credentials.
     */
    @Bean(name = "firstDatabaseConnectionFactory")
    public ConnectionFactory connectionFactory() {

        ConnectionFactoryOptions connectionFactoryOptions = ConnectionFactoryOptions.parse(url);

        ConnectionFactoryOptions.Builder builder = ConnectionFactoryOptions.builder().from(connectionFactoryOptions);

        // FIX: StringUtils.isEmpty(Object) is deprecated since Spring 5.3;
        // hasLength(String) is the exact non-deprecated equivalent.
        if (StringUtils.hasLength(username)) {
            builder = builder.option(ConnectionFactoryOptions.USER, username);
        }
        if (StringUtils.hasLength(password)) {
            builder = builder.option(ConnectionFactoryOptions.PASSWORD, password);
        }

        return ConnectionFactories.get(builder.build());
    }

    /**
     * Entity template for this database.
     *
     * BUG FIX: the original built the template with only the dialect, so the
     * custom conversions bean above was never consulted and inserting MyTab
     * failed with "Nested entities are not supported". We now build a
     * MappingR2dbcConverter that carries the custom conversions — and a mapping
     * context whose simple-type holder registers MyData as convertible rather
     * than a nested entity — and hand it to the template explicitly.
     */
    @Bean
    public R2dbcEntityOperations myTabR2dbcEntityOperations(@Qualifier("firstDatabaseConnectionFactory") ConnectionFactory connectionFactory) {

        DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

        R2dbcCustomConversions conversions = r2dbcCustomConversions();
        R2dbcMappingContext mappingContext = new R2dbcMappingContext(NamingStrategy.INSTANCE);
        // Register converter-handled types as "simple" so the mapper converts
        // them instead of recursing into them as nested entities.
        mappingContext.setSimpleTypeHolder(conversions.getSimpleTypeHolder());
        MappingR2dbcConverter converter = new MappingR2dbcConverter(mappingContext, conversions);

        return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE, converter);
    }
}

The other configuration class is similar to the one above but it doesn't use any converters as the other table in the other database has only simple data types.

Moreover, the guide https://docs.spring.io/spring-data/r2dbc/docs/current/reference/html/#mapping.configuration suggests how to use custom converters but it doesn't work either in my case. Also, it says to extend AbstractR2dbcConfiguration, which shouldn't be used if you are connecting to multiple databases, as underlined at the link on the same guide I posted before https://docs.spring.io/spring-data/r2dbc/docs/current/reference/html/#r2dbc.multiple-databases.

Finally the details of the r2dbc version I am using (gradle):

implementation group: "io.r2dbc", name: "r2dbc-postgresql", version: "0.8.8.RELEASE"
implementation "org.springframework.boot:spring-boot-starter-data-r2dbc"

The strange thing is that it was working before I introduced the connection to the second database.

If someone might be able to help, that would be really appreciated.

Antonio
  • 61
  • 7
  • i am also stuck on custom converter for single DB. can you please help me here https://stackoverflow.com/questions/70584887/spring-r2dbc-custom-json-converter-not-working. – user1111880 Jan 07 '22 at 18:26
  • Hi @user1111880, I have had a look at your question. However you are connecting to a single database... It might be slightly different, but maybe you can try anyway. – Antonio Jan 14 '22 at 17:59

1 Answer

3

I have solved this issue.

I've had to manually register a converter that is used with the connection to the database with the table for which a conversion is needed. This is the class where I have registered the converters:

@Component
public class FirstR2dbcConverters {

    // These are the converters. Data is what is persisted to the DB whilst
    // Json is what is read from the DB. Made final: both are required and
    // assigned exactly once in the constructor.
    private final DataToJsonConverter dataWriteConverter;
    private final JsonToDataConverter dataReadConverter;

    @Autowired
    public FirstR2dbcConverters(DataToJsonConverter dataWriteConverter, JsonToDataConverter dataReadConverter) {
        this.dataWriteConverter = dataWriteConverter;
        this.dataReadConverter = dataReadConverter;
    }

    /*
     * Creates a new r2dbc converter using the converters we have defined. This
     * r2dbc converter will replace the default r2dbc converter used by the
     * Spring framework.
     *
     * FIX: the original called r2dbcCustomConversions() twice (once here and
     * once inside r2dbcMappingContext()), building two separate
     * R2dbcCustomConversions instances. We now build it once and share it
     * between the mapping context and the converter.
     */
    public MappingR2dbcConverter r2dbcConverter() {

        R2dbcCustomConversions r2dbcCustomConversions = r2dbcCustomConversions();
        R2dbcMappingContext mappingContext = r2dbcMappingContext(r2dbcCustomConversions);

        return new MappingR2dbcConverter(mappingContext, r2dbcCustomConversions);
    }

    private R2dbcMappingContext r2dbcMappingContext(R2dbcCustomConversions r2dbcCustomConversions) {
        // Interface and default implementation of a naming strategy.
        NamingStrategy namingStrategy = NamingStrategy.INSTANCE;

        R2dbcMappingContext context = new R2dbcMappingContext(namingStrategy);
        // Register converter-handled types as "simple" so they are not treated
        // as nested entities during mapping.
        context.setSimpleTypeHolder(r2dbcCustomConversions.getSimpleTypeHolder());

        return context;
    }

    /*
     * This is where we return our custom object converters for the database.
     */
    @ExcludeFromJacocoGeneratedReport
    private R2dbcCustomConversions r2dbcCustomConversions() {
        Collection<?> converters = Arrays.asList(dataWriteConverter, dataReadConverter);
        return R2dbcCustomConversions.of(PostgresDialect.INSTANCE, converters);
    }
}

The other connection doesn't need anything like the above as there aren't tables with complex data types.

Now we need to create the configurations for the connections to the two databases.

In FirstDatabaseConfig we are going to use the class FirstR2dbcConverters, which will return us the converters we are going to use by default when connecting to First database.

@Configuration
@EnableR2dbcRepositories(basePackages = {"packages where the entities to persist are stored"}, entityOperationsRef =
    "firstR2dbcEntityOperations")
public class FirstDatabaseConfig {

    @Value("${db.first.url}")
    private String url;
    @Value("${db.first.username}")
    private String username;
    @Value("${db.first.password}")
    private String password;

    @Autowired
    private FirstR2dbcConverters firstR2dbcConfiguration;

    /**
     * Connection factory for the first database, built from the R2DBC URL in
     * application.properties plus optional credentials.
     */
    @Bean(name = "firstConnectionFactory")
    public ConnectionFactory connectionFactory() {

        // Parses a R2DBC Connection URL and returns the parsed ConnectionFactoryOptions
        // The ConnectionFactoryOptions is a holder for configuration options related to ConnectionFactory.
        ConnectionFactoryOptions connectionFactoryOptions = ConnectionFactoryOptions.parse(url);

        // Create and populate a builder with the existing values from a configured
        // ConnectionFactoryOptions and return it
        ConnectionFactoryOptions.Builder builder = ConnectionFactoryOptions.builder().from(connectionFactoryOptions);

        // FIX: StringUtils.isEmpty(Object) is deprecated since Spring 5.3;
        // hasLength(String) is the exact non-deprecated equivalent.
        if (StringUtils.hasLength(username)) {
            builder = builder.option(ConnectionFactoryOptions.USER, username);
        }
        if (StringUtils.hasLength(password)) {
            // Don't take this as an example... the password should be kept in a config pwd encrypted.
            builder = builder.option(ConnectionFactoryOptions.PASSWORD, password);
        }
        // Returns a ConnectionFactory from an available implementation, created from a collection of ConnectionFactoryOptions
        return ConnectionFactories.get(builder.build());
    }

    /**
     * Entity template for the first database. The custom MappingR2dbcConverter
     * from FirstR2dbcConverters is passed explicitly so the MyData<->JSON
     * converters are actually used for this connection.
     */
    @Bean
    public R2dbcEntityOperations firstR2dbcEntityOperations(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {

        // Here we are saying that we are going to use the converters when connecting to the DB targeted by the connectionFactory
        DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
        MappingR2dbcConverter customMappingConverter = firstR2dbcConfiguration.r2dbcConverter();
        return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE, customMappingConverter);
    }
}

However, when connecting to the other database, we don't need any converter:

  @Configuration
  @EnableR2dbcRepositories(basePackages = {"packages where the entities to persist are stored"}, entityOperationsRef =
    "secondR2dbcEntityOperations")
public class secondDatabaseConfig {

@Value("${db.second.url}")
private String url;
@Value("${db.second.username}")
private String username;
@Value("${db.second.password}")
private String password;

@Bean(name = "SecondDatabaseConfig")
public ConnectionFactory connectionFactory() {


    // Parses a R2DBC Connection URL and returns the parsed ConnectionFactoryOptions
    // The ConnectionFactoryOptions is  holder for configuration options related to ConnectionFactory.
    ConnectionFactoryOptions connectionFactoryOptions = ConnectionFactoryOptions.parse(url);

    // Create and populates a builder with the existing values from a ConnectionFactoryOptions and returns it
    ConnectionFactoryOptions.Builder builder = ConnectionFactoryOptions.builder().from(connectionFactoryOptions);

    if (!StringUtils.isEmpty(username)) {
        builder = builder.option(ConnectionFactoryOptions.USER, username);
    }
    if (!StringUtils.isEmpty(password)) {
       // Don't take this as an example... the password should be kept in a config pwd encrypted.
        builder = builder.option(ConnectionFactoryOptions.PASSWORD, password);
    }

    // Returns a ConnectionFactory from an available implementation, created from a collection of ConnectionFactoryOptions
    return ConnectionFactories.get(builder.build());
}

@Bean
public R2dbcEntityOperations secondR2dbcEntityOperations(@Qualifier("SecondDatabaseConfig")ConnectionFactory connectionFactory) {

    DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

    return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE);
}}

So Spring wasn't picking up the converters when connecting to two databases, so I had to explicitly specify which converters to use for each connection.

Antonio
  • 61
  • 7