I have the following test code:

  FlowableOnSubscribe<SomeObj> fos;

  private void init() {
    fos = emitter -> {
      try {
        while(true) {
          SomeObj someObj = readFromDataInputStream();
          emitter.onNext(someObj);
          System.out.println("Emitted object");
        }
      }
      catch(Exception e) {
        emitter.onError(e);
      }
    };
  }

  public Single<String> doWork() {
    Flowable<String> myFlow = Flowable.defer(() -> 
      Flowable.create(fos, BackpressureStrategy.BUFFER)
        .cache()
        .subscribeOn(Schedulers.io(), false)
        .doOnSubscribe(x -> doSomethingToTriggerDataInputStream())
        .map(x -> convertMyCustomObjectToString(x))
    );

    // lastOrError/singleOrError end up blocking :(
    return myFlow.firstOrError(); 
  }

  public VertxDataFetcher<CompletionStage<String>> vertxDataFetcherTest() {
    return new VertxDataFetcher<>((env, future) -> {
      try {
        future.complete(doWork().to(SingleInterop.get()));
      }
      catch(Exception e) {
        future.fail(e);
      }
    });
  }

  public DataFetcher<CompletionStage<String>> dataFetcherTest() {
    return env -> doWork().to(SingleInterop.get());
  }

If I run the example code, it hangs after the first successful use. In other words, it runs once on the initial webpage load, but if I refresh the page in my browser (Ctrl+F5), it hangs and the call never completes.

FWIW, with a little bit of debugging it looks like the defer/map calls are happening on RxCachedThreadScheduler1, and after a webpage refresh it hangs in the defer call with RxCachedThreadScheduler2.

If I switch to using a separate input stream for each call, it does not get hung up. The example I followed is here (RxJava: Feed one stream (Observable) as the input of another stream…).

However, that does not work with my design as it needs 1 shared connection that remains open the entire time. This is because I want the GraphQL subscription to be able to capture anything emitted on the data input stream. If I have multiple/separate socket connections, the GraphQL subscription input stream will miss anything emitted on the non-subscription input streams. (Unless the best option is to have every other input stream also emit to the GraphQL subscription stream, which I'm not sure how to do...)

As a sidenote, does it matter if I use VertxDataFetcher vs DataFetcher in this scenario? I am currently using the DataFetcher to run my example. If I need to switch to the VertxDataFetcher I'm not sure how to properly cast the types to get that method to work.

ekjcfn3902039

1 Answer

There is a lot of concerning stuff going on.

Your emitter is clearly blocking, so the most important change you can try is to call .observeOn(RxHelper.blockingScheduler(vertx)) when creating your Flowable. Read more here about how to use the proper schedulers.

That will probably not fix your issue though. You're probably reading data in a blocking, conventional Java IO way with readFromDataInputStream(), which might be thread safe but almost certainly does not support concurrent readers. You are using concurrent readers implicitly, as long as readFromDataInputStream() eventually reads from the same IO. See Java: Concurrent reads on an InputStream.
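
To illustrate the concurrent-reader point, here is a minimal, JDK-only sketch of the alternative: exactly one thread owns the stream and fans each message out to every registered listener. Everything in it is hypothetical and not taken from the question: the class name SharedStreamReader, the addListener and demo methods, and the use of readUTF() as a stand-in for readFromDataInputStream(). In the posted code, each doWork() call appears to start a new while(true) loop through Flowable.defer, so a second call would compete with the first loop for the same stream. In RxJava, the single-reader idea is commonly expressed by creating the Flowable once and multicasting it (for example with publish().autoConnect()) rather than creating it inside defer, but I have not tested that against your setup.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch: a single thread reads the stream and broadcasts to all listeners. */
class SharedStreamReader {
    private final DataInputStream in;
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    SharedStreamReader(DataInputStream in) {
        this.in = in;
    }

    /** Registers a listener; listeners added after start() only see later messages. */
    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Starts the one and only reader thread and returns it so callers can join it. */
    Thread start() {
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    // Only this thread ever touches the stream.
                    String message = in.readUTF();
                    for (Consumer<String> listener : listeners) {
                        listener.accept(message);
                    }
                }
            } catch (EOFException endOfStream) {
                // The stream is exhausted: stop reading.
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "shared-stream-reader");
        reader.setDaemon(true);
        reader.start();
        return reader;
    }

    /** Demo: two listeners on one in-memory stream; returns what each one received. */
    static List<List<String>> demo() throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("a");
        out.writeUTF("b");
        out.writeUTF("c");

        SharedStreamReader reader = new SharedStreamReader(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        List<String> first = new CopyOnWriteArrayList<>();
        List<String> second = new CopyOnWriteArrayList<>();
        reader.addListener(first::add);
        reader.addListener(second::add);

        reader.start().join(); // wait until the reader reaches end of stream
        return List.of(first, second);
    }
}
```

Because the reader thread is the only consumer of the stream, no message can be taken by one caller and lost to another; each request would register as a listener instead of reading the stream itself.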

DoctorPangloss
  • Hi, thanks for the input but I'm not sure it's what I'm looking for. The example is for one instance of the emitter, with multiple observers on it. I have also tried the RxHelper since this original post but it didn't work (as you also mentioned) – ekjcfn3902039 Oct 29 '19 at 13:13