
I am trying to fetch a large dataset from Couchbase reactively. I use the ReactiveCouchbaseRepository provided by Spring Data.

public interface ReactiveFooRepository extends ReactiveCouchbaseRepository<Foo, String> {

    @Query("#{#n1ql.selectEntity} WHERE ... ORDER BY ...")
    Flux<Foo> findAll();
}

In my service I subscribe as follows:

repository.findAll()
          .subscribe(this::process,
                     t -> LOGGER.error("Failed to process.", t));

This works fine on my test dataset. In production, however, the query returns a large result set and runs for about 20-25 seconds.

My understanding was that this is exactly what reactive repositories are meant for: consuming large results without the need for explicit paging etc.

What I am getting, however, is an IllegalStateException:

Failed to process.
java.lang.IllegalStateException: The content of this Observable (queryRow.13de03e2-9271-47e2-9d56-df01038011f9) is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.
...

Raising the autoreleaseAfter timeout does not seem to be a reliable solution. It looks as if the entire result is buffered in Couchbase before the first result element is published.

Am I getting something wrong? Any ideas on what the problem or solution might be?

Edit

One problem I found was ordering. I assume that Couchbase has to fetch the entire result set before it can order it. After removing the ORDER BY clause I can stream the results, but only when using the Java SDK directly.

The following piece of code works and starts streaming data immediately:

public void applyToAllFooAsync(Consumer<Optional<Foo>> consumer) {
    String queryString = String.format("SELECT meta().id, _class, field1, field2, ... "
                                           + "FROM %s "
                                           + "WHERE ... ",
                                       getQuotedBucketName());

    N1qlParams params = N1qlParams.build().consistency(ScanConsistency.STATEMENT_PLUS).pretty(false);
    N1qlQuery query = N1qlQuery.simple(queryString, params);

    asyncBucket
        .query(query)
        .flatMap(AsyncN1qlQueryResult::rows)
        .map(this::getFoo)
        .forEach(consumer::accept);
}

protected Optional<Foo> getFoo(AsyncN1qlQueryRow row) {
    try {
        return Optional.of(objectMapper.readValue(row.byteValue(), clazz));
    } catch (IOException e) {
        LOGGER.warn("Could not map Foo to object.", e);
        return Optional.empty();
    }
}

applyToAllFooAsync(System.out::println);

If I use the exact same query in a ReactiveCouchbaseRepository, however, it takes a couple of seconds and then the exception shown above is thrown.

Does anybody have an idea where the different behaviour might stem from? Can somebody point me to the class or method where the Java SDK is actually called in the Spring Data code?

Johannes Jasper
  • when you run the query in the `n1ql` tool does it stream results? how long before the server starts answering? that might be what prevents the query from subscribing to the stream of rows in Java – Simon Baslé Jul 17 '18 at 16:06
  • Hey @SimonBaslé, sorry for the long delay. By `n1ql` tool, do you mean the Java SDK? I updated my question accordingly. I found a working solution using the SDK directly, but I would love to use the ReactiveRepository instead. Can you tell me where the SDK is used so that I can debug it? – Johannes Jasper Jul 23 '18 at 11:07
