
Is there an operator in RxJava, an external library, or some approach I'm missing for creating a Flowable/Observable that receives a function controlling the emission of data, like a valve?

I have a huge JSON file to process, but I need to read one portion of the file (a list of entities), process it, and only then read the next portion. I have tried window() and buffer(), but the BiFunction I pass to Flowable.generate() keeps executing after I have received the first list and before I have finished processing it. I also tried FlowableTransformers.valve() from hu.akarnokd.rxjava3.operators, but it just piles up the items before the flatMap() function that processes the list.

private Flowable<T> flowable(InputStream inputStream) {

    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        final var token = jsonParser.nextToken();

        if (token == null) {
            // End of input: complete and stop handling this invocation
            emitter.onComplete();
            return jsonParser;
        }

        // Array delimiters carry no entity, so nothing is emitted for them
        if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
            return jsonParser;
        }

        if (JsonToken.START_OBJECT.equals(token)) {
            // Bind the whole object to an entity and emit it
            emitter.onNext(reader.readValue(jsonParser));
        }

        return jsonParser;
    }, JsonParser::close);
}
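A minimal sketch of why the lambda keeps running: Flowable.generate calls its generator only in response to downstream requests, so how often it runs is decided by the operators below it. It assumes RxJava 3 (io.reactivex.rxjava3) on the classpath; the class name, the `numbers` helper, the `calls` counter and the integer source are hypothetical stand-ins for the JSON parser.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.concurrent.atomic.AtomicInteger;

public class GenerateOnDemand {

    // Hypothetical stand-in for the JSON source: emits 0, 1, 2, ... (one item per call) and counts the calls.
    static Flowable<Integer> numbers(AtomicInteger calls) {
        return Flowable.<Integer, Integer>generate(() -> 0, (i, emitter) -> {
            calls.incrementAndGet();
            emitter.onNext(i);
            return i + 1;
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // Subscribe with an initial request of 0: the generator has not been called yet.
        TestSubscriber<Integer> ts = numbers(calls).test(0L);
        System.out.println("calls after subscribe: " + calls.get());   // 0

        // Request exactly three items: the generator runs three times.
        ts.request(3);
        System.out.println("calls after request(3): " + calls.get());  // 3
        System.out.println("values: " + ts.values());                  // [0, 1, 2]

        ts.cancel();
    }
}
```

In the question's pipeline the requests come from buffer() and the flatMap-style operator after it, so that is where the pacing would have to be controlled.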

Edit: I need to control the emission of items so that I don't overload memory or the function that processes the data, because that function reads from and writes to a database. The processing also needs to be sequential. The function that processes the data isn't entirely mine; it's written in RxJava, and I'm expected to use Rx.

I managed to solve it like this, but if there is another way, please let me know:

public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        // When the supplier returns false, nothing is read or emitted; generate may call
        // this lambda again right away while downstream demand is still outstanding
        if (booleanSupplier.get()) {
            final var token = jsonParser.nextToken();

            if (token == null) {
                // End of input: complete and stop handling this invocation
                emitter.onComplete();
                return jsonParser;
            }

            if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
                return jsonParser;
            }

            if (JsonToken.START_OBJECT.equals(token)) {
                emitter.onNext(reader.readValue(jsonParser));
            }

        }

        return jsonParser;
    }, JsonParser::close);
}

Edit 2: This is one of the ways I'm currently consuming the function:

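public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
    final var atomicInteger = new AtomicInteger(0);
    final var atomicBoolean = new AtomicBoolean(true);

    // The supplier lets the generator read tokens only while atomicBoolean is true
    return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
            .buffer(pageSize)
            // With the default maxConcurrency, flatMapSingle requests pages without bound and
            // may run several of the Singles returned by `function` at the same time
            .flatMapSingle(list -> {

                final var counter = atomicInteger.incrementAndGet();

                // Close the "valve" once numberOfPages pages have been handed out
                if (counter == numberOfPages) {
                    atomicBoolean.set(false);
                }

                return function.apply(list)
                        .doFinally(() -> {
                            // If a Single terminates after the numberOfPages-th page was handed out,
                            // reset the page counter and reopen the valve
                            if (atomicInteger.get() == numberOfPages) {
                                atomicInteger.set(0);
                                atomicBoolean.set(true);
                            }
                        });
            });
}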
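A possible alternative to the AtomicBoolean/AtomicInteger flags, assuming backpressure alone is enough to pace the parser: replace flatMapSingle with concatMapSingle and a prefetch of 1, so pages are processed strictly one after another and generate is asked for at most one page beyond the one being processed. This is a sketch under assumptions, not a drop-in replacement: the integer source, `run`, `maxAhead`, the 5 ms sleep and Schedulers.io() are hypothetical stand-ins for the JSON parser and the database call.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PagingSketch {

    // Emits `total` items from a generate-based source (hypothetical stand-in for the JSON parser),
    // groups them into pages and processes the pages one at a time.
    // `maxAhead` records the largest observed gap between produced and processed items.
    static List<List<Integer>> run(int total, int pageSize, AtomicInteger maxAhead) {
        AtomicInteger generated = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();

        return Flowable.<Integer, Integer>generate(() -> 0, (i, emitter) -> {
                    if (i >= total) {
                        emitter.onComplete();
                    } else {
                        generated.incrementAndGet();
                        emitter.onNext(i);
                    }
                    return i + 1;
                })
                .buffer(pageSize)
                // prefetch = 1: demand stays at most one page ahead of the page being processed
                .concatMapSingle(page -> Single.fromCallable(() -> {
                    // Hypothetical stand-in for the expensive database step
                    maxAhead.accumulateAndGet(generated.get() - processed.get(), Math::max);
                    Thread.sleep(5);
                    processed.addAndGet(page.size());
                    return page;
                }).subscribeOn(Schedulers.io()), 1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        AtomicInteger maxAhead = new AtomicInteger();
        List<List<Integer>> pages = run(1_000, 50, maxAhead);
        System.out.println("pages: " + pages.size()); // 20
        System.out.println("max items ahead of processing: " + maxAhead.get());
    }
}
```

With a page size of 50, the gap between generated and processed items should stay between one and two pages, whereas the unbounded flatMapSingle lets the parser run ahead as far as its demand allows.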
Yaz
  • What problem are you trying to solve (see xyproblem.info)? Do you absolutely need to use RxJava, e.g. for academic purposes? This thread gives some solutions to stream a large JSON file using Java, but not using RxJava: https://stackoverflow.com/questions/9390368/java-best-approach-to-parse-huge-extra-large-json-file – DV82XL Jul 18 '20 at 15:40
  • @DV82XL I added more information to the question. I need to use RxJava since this is a small part of an RxJava chain – Yaz Jul 18 '20 at 17:24
  • Thanks for clarifying the question. – DV82XL Jul 18 '20 at 17:33
  • Generate will call the lambda as many times as there are outstanding requests. How are you processing the JSON values afterwards? Many operators have overloads with a prefetch or capacityHint parameter to limit the outstanding request amounts. – akarnokd Jul 20 '20 at 06:28
  • @akarnokd I updated the post with an example. The JSON values are processed and saved to a database, and from what I have been told this operation is very expensive, so I need a way to control how much data I'm sending – Yaz Jul 20 '20 at 18:06
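To illustrate the prefetch point, this sketch records, with doOnRequest, the request amounts that reach the source. It assumes RxJava 3 and uses Flowable.range as a stand-in for the parser; `requestsSeenBySource` is a hypothetical helper. With buffer(50), the default flatMapSingle ends up asking the source for Long.MAX_VALUE up front, whereas concatMapSingle(..., 1) asks for 50 items (one page) at a time.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RequestAmounts {

    // Hypothetical helper: returns the request amounts that reach the source when
    // buffer(pageSize) is followed by flatMapSingle (default) or concatMapSingle (prefetch = 1).
    static List<Long> requestsSeenBySource(boolean sequential, int pageSize) {
        List<Long> requests = new CopyOnWriteArrayList<>();

        Flowable<List<Integer>> pages = Flowable.range(0, 200)   // stand-in for the parsed entities
                .doOnRequest(n -> requests.add(n))               // log every request the source receives
                .buffer(pageSize);

        Flowable<List<Integer>> processed;
        if (sequential) {
            // Processes one page at a time and replenishes demand one page at a time
            processed = pages.concatMapSingle(page -> Single.just(page), 1);
        } else {
            // Default maxConcurrency: unbounded demand towards the source
            processed = pages.flatMapSingle(page -> Single.just(page));
        }

        processed.blockingSubscribe();
        return requests;
    }

    public static void main(String[] args) {
        System.out.println("flatMapSingle:           " + requestsSeenBySource(false, 50));
        System.out.println("concatMapSingle(..., 1): " + requestsSeenBySource(true, 50));
    }
}
```

flatMapSingle also has an overload taking delayErrors and maxConcurrency; passing a maxConcurrency of 1 should likewise turn its upstream demand into bounded requests.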

1 Answer


I managed to solve it like this:

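public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
    // defer() postpones reading the next token until this Flowable is subscribed to
    return Flowable.defer(() -> {
        final var token = jsonParser.nextToken();

        if (token == null) {
            // End of input: close the parser (logging, then swallowing, any close error) and complete
            return Completable.fromAction(jsonParser::close)
                    .doOnError(Throwable::printStackTrace)
                    .onErrorComplete()
                    .andThen(Flowable.empty());
        }

        if (JsonToken.START_OBJECT.equals(token)) {
            final var value = reader.readValue(jsonParser);
            // Each value gets its own valve, which starts open (defaultOpen = true); a PublishProcessor
            // does not replay, so a false signalled before this valve subscribes is not seen by it
            final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
            // concat subscribes to the recursive source, and so reads the next token, only after `just` completes
            return Flowable.concat(just, flowable(jsonParser, reader, valve));
        }

        // Any other token (e.g. array delimiters): move on to the next token
        return flowable(jsonParser, reader, valve);
    });
}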
Yaz