I am trying to fetch a large dataset from Couchbase reactively.
I use the ReactiveCouchbaseRepository provided by Spring Data:
public interface ReactiveFooRepository extends ReactiveCouchbaseRepository<Foo, String> {
    @Query("#{#n1ql.selectEntity} WHERE ... ORDER BY ...")
    Flux<Foo> findAll();
}
In my service I subscribe as follows:
repository.findAll()
        .subscribe(this::process,
                t -> LOGGER.error("Failed to process.", t));
On my test dataset this works fine. In production, however, the query returns a large result set and runs for about 20-25 seconds.
My understanding was that this is exactly what reactive repositories are meant for: consuming large results without the need for explicit paging etc.
What I am getting, however, is an IllegalStateException:
Failed to process.
java.lang.IllegalStateException: The content of this Observable (queryRow.13de03e2-9271-47e2-9d56-df01038011f9) is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.
...
Raising the autoreleaseAfter timeout does not seem to be a reliable solution.
It seems as if the entire result is buffered in Couchbase before publishing the first result element.
Am I getting something wrong? Any ideas on what the problem or solution might be?
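To make the error message concrete, here is a toy model in plain Java of what it seems to describe: result content that is held only for a limited time and is gone if the subscriber arrives too late. This is a sketch under assumptions, not the SDK's implementation. The class `AutoReleasingRows`, its parameters, and the timings are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT the Couchbase SDK's implementation.
// All names and timings here are hypothetical.
final class AutoReleasingRows<T> {
    private final List<T> rows;
    private final long releaseAtMillis;

    AutoReleasingRows(List<T> rows, long createdAtMillis, long autoreleaseAfterMillis) {
        this.rows = rows;
        this.releaseAtMillis = createdAtMillis + autoreleaseAfterMillis;
    }

    // Delivers every row to onNext, or fails if the caller arrives after the release deadline.
    void subscribe(long nowMillis, Consumer<T> onNext) {
        if (nowMillis > releaseAtMillis) {
            throw new IllegalStateException(
                    "Content already released. Subscribe earlier or raise the timeout.");
        }
        rows.forEach(onNext);
    }

    public static void main(String[] args) {
        AutoReleasingRows<String> rows =
                new AutoReleasingRows<>(Arrays.asList("row-1", "row-2"), 0L, 2_000L);
        // Inside the release window: both rows are delivered.
        rows.subscribe(1_500L, System.out::println);
        try {
            // Long after the release window: the content is gone.
            rows.subscribe(20_000L, System.out::println);
        } catch (IllegalStateException e) {
            System.out.println("late subscriber failed: " + e.getMessage());
        }
    }
}
```

In this model a subscriber arriving at 1,500 ms gets both rows, while one arriving at 20,000 ms (roughly my production query time) gets the IllegalStateException. If the real behaviour is similar, raising the timeout would only help while it stays above the query's run time.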
Edit
One problem I found was ordering. I assume that Couchbase has to fetch the entire result set before it can order it.
After removing the ORDER BY clause I can stream the results, but only when using the Java SDK directly.
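The ordering assumption can be illustrated with plain `java.util.stream`, which has the same general property: a sort stage cannot emit its first element until it has seen all of them. This only demonstrates the principle and says nothing about how the Couchbase query engine is implemented. `SortBarrierDemo` and `pulledBeforeFirst` are names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Illustration of why sorting blocks streaming; unrelated to Couchbase internals.
final class SortBarrierDemo {

    // Returns how many source elements had to be pulled before the first result was available.
    static int pulledBeforeFirst(boolean sorted) {
        List<Integer> pulled = new ArrayList<>();
        // peek records every element that actually leaves the source.
        Stream<Integer> source = Stream.of(5, 3, 8, 1, 9).peek(pulled::add);
        Stream<Integer> pipeline = sorted ? source.sorted() : source;
        pipeline.findFirst(); // ask for the first result only
        return pulled.size();
    }

    public static void main(String[] args) {
        System.out.println("unsorted: " + pulledBeforeFirst(false)); // prints "unsorted: 1"
        System.out.println("sorted: " + pulledBeforeFirst(true));    // prints "sorted: 5"
    }
}
```

Without the sort, the first element is available after pulling a single source element. With the sort, all five have to be consumed first, which matches what I see once ORDER BY is involved.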
The following piece of code works and starts streaming data immediately:
public void applyToAllFooAsync(Consumer<Optional<Foo>> consumer) {
    String queryString = String.format("SELECT meta().id, _class, field1, field2, ... "
            + "FROM %s "
            + "WHERE ... ",
            getQuotedBucketName());
    N1qlParams params = N1qlParams.build().consistency(ScanConsistency.STATEMENT_PLUS).pretty(false);
    N1qlQuery query = N1qlQuery.simple(queryString, params);
    // Rows are mapped and handed to the consumer as they arrive.
    asyncBucket
            .query(query)
            .flatMap(AsyncN1qlQueryResult::rows)
            .map(this::getFoo)
            .forEach(consumer::accept);
}

// Maps a raw row to a Foo; returns Optional.empty() if deserialization fails.
protected Optional<Foo> getFoo(AsyncN1qlQueryRow row) {
    try {
        return Optional.of(objectMapper.readValue(row.byteValue(), clazz));
    } catch (IOException e) {
        LOGGER.warn("Could not map Foo to object.", e);
        return Optional.empty();
    }
}
applyToAllFooAsync(System.out::println);
If I use the exact same query in a ReactiveCouchbaseRepository, however, it takes a couple of seconds and then the exception shown above is thrown.
Does anybody have an idea where the difference in behaviour might stem from? Can somebody point me to the class or method where the Java SDK is actually used in the Spring Data code?