I'd like to run three requests in parallel on three threads and accumulate their results. As each result comes in, I add it to a running total. If the total exceeds 99, the other two threads should be stopped and the main thread should return "99+". Otherwise the value is stored as is (an integer) and the main thread waits for the remaining threads to finish and adds their values. In short: accumulate the values from all of the threads, and if the total exceeds 99, return "99+" and stop any unfinished thread. This is how I implemented it:

    RequestDTO request; //this is http request data
    ExecutorService executor = Executors.newFixedThreadPool(3);
    
    //for flagging purpose, for counting how many sub threads end
    //but I can't reference it directly just like I did to DTOResponse totalAll;
    short asyncFlag = 0;

    Cancellable
        cancellableThreads1,
        cancellableThreads2,
        cancellableThreads3;
    
    DTOResponse totalAll = new DTOResponse(); totalAll.total = 0;

    LOGGER.info("start threads 1");
    cancellableThreads1 =
        Uni.createFrom().item(asyncFlag)
        .runSubscriptionOn(executor).subscribe().with(consumer ->
        {//it runs on new thread
            Response response = method1(request).await().indefinitely();
            LOGGER.info("got uniMethod1!");
            DTOResponse totalTodo = response.readEntity(DTOResponse.class);
            Integer total =(Integer) totalTodo.total;
            totalAll.total = (Integer) totalAll.total + total;
            LOGGER.info("total thread1 done: "+total);
            if ((Integer) totalAll.total > 99){
                totalAll.total = "99+";
            }
            //as I mentioned on comments above, I can't refer asyncFlag directly, so I put those as .item() parameter
            //then I just refer it as consumer, but no matter how many consumer increase, it not change the asyncFlag on main thread
            consumer++;
    });
    LOGGER.info("thread 1 already running asynchronus");

    LOGGER.info("start threads 2");
    cancellableThreads2 =
        Uni.createFrom().item(asyncFlag)
        .runSubscriptionOn(executor).subscribe().with(consumer ->
        {//it runs on new thread
            Response response = method2(request).await().indefinitely();
            LOGGER.info("got uniMethod2!");
            DTOResponse totalTodo = response.readEntity(DTOResponse.class);
            Integer total =(Integer) totalTodo.total;
            totalAll.total = (Integer) totalAll.total + total;
            LOGGER.info("total thread2 done: "+total);
            if ((Integer) totalAll.total > 99){
                totalAll.total = "99+";
            }
            //as I mentioned on comments above, I can't refer asyncFlag directly, so I put those as .item() parameter
            //then I just refer it as consumer, but no matter how many consumer increase, it not change the asyncFlag on main thread
            consumer++;
    });
    LOGGER.info("thread 2 already running asynchronus");

    LOGGER.info("start threads 3");
    cancellableThreads3 =
        Uni.createFrom().item(asyncFlag)
        .runSubscriptionOn(executor).subscribe().with(consumer ->
        {//it runs on new thread
            Response response = method3(request).await().indefinitely();
            LOGGER.info("got uniMethod3!");
            DTOResponse totalTodo = response.readEntity(DTOResponse.class);
            Integer total =(Integer) totalTodo.total;
            totalAll.total = (Integer) totalAll.total + total;
            LOGGER.info("total thread3 done: "+total);
            if ((Integer) totalAll.total > 99){
                totalAll.total = "99+";
            }
            //as I mentioned on comments above, I can't refer asyncFlag directly, so I put those as .item() parameter
            //then I just refer it as consumer, but no matter how many consumer increase, it not change the asyncFlag on main thread
            consumer++;
    });
    LOGGER.info("thread 3 already running asynchronus");
    
    do{
        //executed by main threads.
        //I wanted to block in here until those condition is met
        //actually is not blocking thread but forever loop instead
        if(totalAll.total instanceof String || asyncFlag >=3){
            cancellableThreads1.cancel();
            cancellableThreads2.cancel();
            cancellableThreads3.cancel();
        }
        //asyncFlag isn't increase even all of 3 threads has execute consumer++
    }while(totalAll.total instanceof Integer && asyncFlag <3);

    ResponseBuilder responseBuilder = Response.ok().entity(totalAll);
    return Uni.createFrom().item("").onItem().transform(s->responseBuilder.build());

`totalAll` can be accessed from those sub-threads, but `asyncFlag` cannot. If I use `asyncFlag` inside a sub-thread block, my editor underlines it in red with `Local variable asyncFlag defined in an enclosing scope must be final or effectively final Java(536871575)`. So I used `consumer` instead, but incrementing it has no effect on `asyncFlag`. As a result, the loop never ends unless the total turns into a String (the first condition).
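To illustrate why `consumer++` has no effect: the lambda parameter is a copy of the boxed value, so incrementing it only rebinds that local copy. A common workaround is to capture a mutable, thread-safe holder instead. The sketch below uses only the JDK; the class and method names (`SharedCounterDemo`, `runAndCount`) are made up for illustration, and it is not the Mutiny code from above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedCounterDemo {

    // Runs `tasks` trivial jobs on a pool and returns how many reported completion.
    static int runAndCount(int tasks) {
        // The reference is effectively final, so the lambda may capture it.
        // The value behind it can still change, and updates are visible across threads.
        AtomicInteger finished = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> finished.incrementAndGet());
        }
        executor.shutdown();
        try {
            // Wait for the submitted jobs instead of spinning in a do/while loop.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(3)); // prints 3
    }
}
```

The same idea would apply to the running total: a plain field mutated from several threads (as `totalAll.total` is above) can lose updates, whereas an `AtomicInteger` cannot.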

RxGianYagami
  • Does this answer your question? [Is Java "pass-by-reference" or "pass-by-value"?](https://stackoverflow.com/questions/40480/is-java-pass-by-reference-or-pass-by-value) – tgdavies Jun 28 '22 at 07:31
  • Yes, I am able to pass by reference once I make `asyncFlag` an object or an array. But that post doesn't cover passing references between threads, which is a somewhat different topic. – RxGianYagami Jun 30 '22 at 05:15

1 Answer

You would be better off switching gears to a reactive(-native) approach to your problem.

Instead of subscribing to each Uni and then collecting the results individually, monitoring their progress imperatively, here is the series of steps you should use in an rxified way:

  • Create all your request-representing Uni objects with whatever concurrency construct you like: Uni#emitOn
  • Combine all your request Unis into a Multi by merging them, so that they execute concurrently (not in an ordered fashion): MultiCreateBy#merging
  • Scan the items emitted by the Multi, which are your request results, adding each item to an initial seed as it arrives: MultiOnItem#scan
  • Keep skipping the running sums until you first see a value exceeding the threshold (99 in your case), at which point you let the result flow through your stream pipeline: MultiSkip#first (note that the skip stage will automatically cancel upstream requests, hence stopping any useless request processing already in flight)
  • In case no item has been emitted downstream, meaning that the sum of the requests has not exceeded the threshold, sum up the initial Uni results (which are cached to avoid re-triggering the requests): UniOnItem#ifNull
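The scan-then-skip logic in the steps above can be sketched without any reactive library, as a plain loop over results in arrival order. This is only an illustration of the intended semantics, not Mutiny code; `RunningTotalDemo`, `summarize` and `THRESHOLD` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

class RunningTotalDemo {
    static final int THRESHOLD = 99;

    // Mirrors scan + skip: keep a running sum and stop at the first sum above the threshold.
    // If the threshold is never exceeded, the plain sum is returned as an Integer.
    static Object summarize(List<Integer> totalsInArrivalOrder) {
        int running = 0;
        for (int total : totalsInArrivalOrder) {
            running += total;
            if (running > THRESHOLD) {
                return THRESHOLD + "+"; // later results are never looked at
            }
        }
        return running;
    }

    public static void main(String[] args) {
        System.out.println(summarize(Arrays.asList(40, 70, 5)));  // prints 99+
        System.out.println(summarize(Arrays.asList(10, 20, 30))); // prints 60
    }
}
```

Note that a total of exactly 99 is returned as the integer 99, since the requirement is "more than 99".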

Here is a pseudo implementation of the stages described above:

    public Uni<Response> request() {
        RequestDTO request; //this is http request data
        // Each result is memoized for a short while so that the fallback sum
        // at the end reuses it instead of re-triggering the request.
        Uni<Object> requestOne = method1(request)
                .emitOn(executor)
                .map(response -> response.readEntity(DTOResponse.class))
                .map(dtoResponse -> dtoResponse.total)
                .memoize()
                .atLeast(Duration.ofSeconds(3));
        Uni<Object> requestTwo = method2(request)
                .emitOn(executor)
                .map(response -> response.readEntity(DTOResponse.class))
                .map(dtoResponse -> dtoResponse.total)
                .memoize()
                .atLeast(Duration.ofSeconds(3));
        Uni<Object> requestThree = method3(request)
                .emitOn(executor)
                .map(response -> response.readEntity(DTOResponse.class))
                .map(dtoResponse -> dtoResponse.total)
                .memoize()
                .atLeast(Duration.ofSeconds(3));

        return Multi.createBy()
                .merging()
                // subscribe to all three requests at once; a concurrency of 1 would run them one after another
                .withConcurrency(3)
                .streams(requestOne.toMulti(), requestTwo.toMulti(), requestThree.toMulti())
                .onItem()
                // running sum of the totals, in arrival order
                .scan(() -> 0, (result, itemTotal) -> result + (Integer) itemTotal)
                .skip()
                // skip running sums until one is strictly greater than 99
                .first(total -> total <= 99)
                .<Object>map(ignored -> "99+")
                .toUni()
                .onItem()
                .ifNull()
                // threshold never exceeded: fall back to the plain sum of the memoized results
                .switchTo(
                        Uni.combine()
                                .all()
                                .unis(requestOne, requestTwo, requestThree)
                                .combinedWith((one, two, three) -> (Integer) one + (Integer) two + (Integer) three)
                )
                .map(result -> Response.ok().entity(result).build());
    }
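For comparison, here is a rough JDK-only sketch of the same accumulate-and-stop-early behaviour, using `ExecutorCompletionService` to consume results as they complete. It is an analogue under stated assumptions, not a replacement for the Mutiny pipeline: `CappedTotalDemo` and `cappedTotal` are hypothetical names, and the lambdas in `main` merely stand in for `method1`, `method2` and `method3`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CappedTotalDemo {

    // Runs all requests concurrently and sums results in completion order.
    // Returns "<threshold>+" as soon as the running sum exceeds the threshold,
    // cancelling whatever is still in flight; otherwise returns the Integer sum.
    static Object cappedTotal(List<Callable<Integer>> requests, int threshold) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, requests.size()));
        CompletionService<Integer> completed = new ExecutorCompletionService<>(executor);
        List<Future<Integer>> pending = new ArrayList<>();
        try {
            for (Callable<Integer> request : requests) {
                pending.add(completed.submit(request));
            }
            int running = 0;
            for (int i = 0; i < requests.size(); i++) {
                running += completed.take().get(); // blocks until the next result arrives
                if (running > threshold) {
                    return threshold + "+";
                }
            }
            return running;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Interrupt unfinished requests; a no-op for those already done.
            for (Future<Integer> future : pending) {
                future.cancel(true);
            }
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> requests = new ArrayList<>();
        requests.add(() -> 60);
        requests.add(() -> 50);
        requests.add(() -> { Thread.sleep(5_000); return 1; }); // slow request, gets cancelled
        System.out.println(cappedTotal(requests, 99)); // prints 99+
    }
}
```

Unlike the reactive version, this blocks the calling thread while waiting, so it would not be suitable on an event-loop thread.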
tmarwen