I have the following test code:
FlowableOnSubscribe<SomeObj> fos;

private void init() {
    fos = emitter -> {
        try {
            // Reads from the shared data input stream until an exception is thrown
            while (true) {
                SomeObj someObj = readFromDataInputStream();
                emitter.onNext(someObj);
                System.out.println("Emitted object");
            }
        }
        catch (Exception e) {
            emitter.onError(e);
        }
    };
}

public Single<String> doWork() {
    Flowable<String> myFlow = Flowable.defer(() ->
        Flowable.create(fos, BackpressureStrategy.BUFFER)
            .cache()
            .subscribeOn(Schedulers.io(), false)
            .doOnSubscribe(x -> doSomethingToTriggerDataInputStream())
            .map(x -> convertMyCustomObjectToString(x))
    );
    // lastOrError/singleOrError end up blocking :(
    return myFlow.firstOrError();
}

public VertxDataFetcher<CompletionStage<String>> vertxDataFetcherTest() {
    return new VertxDataFetcher<>((env, future) -> {
        try {
            future.complete(doWork().to(SingleInterop.get()));
        }
        catch (Exception e) {
            // Pass the cause along so the failure is not swallowed
            future.fail(e);
        }
    });
}

public DataFetcher<CompletionStage<String>> dataFetcherTest() {
    return env -> doWork().to(SingleInterop.get());
}
When I run the example code, it hangs after the first successful use. In other words, it completes once on the initial page load, but if I refresh the browser (Ctrl+F5), the call hangs and never completes.
FWIW, a little debugging suggests that the defer/map calls run on RxCachedThreadScheduler1, and that after a page refresh the call hangs in defer on RxCachedThreadScheduler2.
If I switch to using a separate input stream for each call, it does not hang. The example I followed is here (RxJava: Feed one stream (Observable) as the input of another stream…).
However, that does not work with my design, which needs one shared connection that remains open the entire time. This is because I want the GraphQL subscription to capture anything emitted on the data input stream. If I have multiple separate socket connections, the GraphQL subscription input stream will miss anything emitted on the non-subscription input streams. (Unless the best option is to have every other input stream also emit to the GraphQL subscription stream, which I'm not sure how to do...)
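To make the requirement concrete, here is a minimal sketch of the fan-out behaviour I am after. It deliberately uses only JDK classes rather than RxJava, and it is not my actual code: the class `SharedStreamFanOut` and its methods `subscribe`, `next`, `pump` and `demo` are hypothetical names, messages are assumed to be UTF strings, and an in-memory byte stream stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical illustration: a single reader drains the shared stream and
// forwards every message to all listeners registered at that moment.
final class SharedStreamFanOut {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Long-lived listener (stands in for the GraphQL subscription).
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    // One-shot listener (stands in for a single query call): it completes
    // with the next message and then deregisters itself.
    CompletableFuture<String> next() {
        CompletableFuture<String> result = new CompletableFuture<>();
        listeners.add(new Consumer<String>() {
            @Override
            public void accept(String message) {
                listeners.remove(this);
                result.complete(message);
            }
        });
        return result;
    }

    // Reads until end of stream. In a real service this would run on one
    // dedicated reader thread; here it runs on the caller's thread.
    void pump(DataInputStream in) throws IOException {
        try {
            while (true) {
                String message = in.readUTF();
                for (Consumer<String> listener : listeners) {
                    listener.accept(message);
                }
            }
        } catch (EOFException endOfStream) {
            // The stream was closed; stop reading.
        }
    }

    // Small demo: two messages, one subscription, one one-shot call.
    static String demo() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF("a");
            out.writeUTF("b");

            SharedStreamFanOut fanOut = new SharedStreamFanOut();
            List<String> seen = new ArrayList<>();
            fanOut.subscribe(seen::add);
            CompletableFuture<String> first = fanOut.next();
            fanOut.pump(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return seen + " first=" + first.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a, b] first=a"
    }
}
```

The point of the sketch is that the one-shot call and the subscription both see the first message, and the subscription keeps receiving later ones, all from a single read loop over one stream. That is the behaviour I would like to get from the RxJava pipeline.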
As a side note, does it matter whether I use VertxDataFetcher or DataFetcher in this scenario? I am currently using the DataFetcher to run my example. If I need to switch to the VertxDataFetcher, I'm not sure how to cast the types properly to get that method to work.