I'd like to run three tasks in parallel on separate threads. When a thread returns its output, I read it: if the value is more than 99, the two other threads should be stopped and the main thread should output "99+". Otherwise, the value is stored as is (an integer), and the main thread waits for the other threads to finish and adds their values to it. In short, I want to accumulate the values from all three threads; as soon as the total exceeds 99, output "99+" and stop any unfinished threads. This is how I implemented it:
RequestDTO request; //this is http request data
ExecutorService executor = Executors.newFixedThreadPool(3);
//flag for counting how many sub-threads have finished
//but I can't reference it directly inside the lambdas the way I do with DTOResponse totalAll
short asyncFlag = 0;
Cancellable
cancellableThreads1,
cancellableThreads2,
cancellableThreads3;
DTOResponse totalAll = new DTOResponse(); totalAll.total = 0;
LOGGER.info("start threads 1");
cancellableThreads1 =
Uni.createFrom().item(asyncFlag)
.runSubscriptionOn(executor).subscribe().with(consumer ->
{//it runs on new thread
Response response = method1(request).await().indefinitely();
LOGGER.info("got uniMethod1!");
DTOResponse totalTodo = response.readEntity(DTOResponse.class);
Integer total =(Integer) totalTodo.total;
totalAll.total = (Integer) totalAll.total + total;
LOGGER.info("total thread1 done: "+total);
if ((Integer) totalAll.total > 99){
totalAll.total = "99+";
}
//as mentioned in the comments above, I can't reference asyncFlag directly, so I pass it as the .item() parameter
//and refer to it as consumer, but however many times consumer is incremented, asyncFlag on the main thread doesn't change
consumer++;
});
LOGGER.info("thread 1 already running asynchronously");
LOGGER.info("start threads 2");
cancellableThreads2 =
Uni.createFrom().item(asyncFlag)
.runSubscriptionOn(executor).subscribe().with(consumer ->
{//it runs on new thread
Response response = method2(request).await().indefinitely();
LOGGER.info("got uniMethod2!");
DTOResponse totalTodo = response.readEntity(DTOResponse.class);
Integer total =(Integer) totalTodo.total;
totalAll.total = (Integer) totalAll.total + total;
LOGGER.info("total thread2 done: "+total);
if ((Integer) totalAll.total > 99){
totalAll.total = "99+";
}
//as mentioned in the comments above, I can't reference asyncFlag directly, so I pass it as the .item() parameter
//and refer to it as consumer, but however many times consumer is incremented, asyncFlag on the main thread doesn't change
consumer++;
});
LOGGER.info("thread 2 already running asynchronously");
LOGGER.info("start threads 3");
cancellableThreads3 =
Uni.createFrom().item(asyncFlag)
.runSubscriptionOn(executor).subscribe().with(consumer ->
{//it runs on new thread
Response response = method3(request).await().indefinitely();
LOGGER.info("got uniMethod3!");
DTOResponse totalTodo = response.readEntity(DTOResponse.class);
Integer total =(Integer) totalTodo.total;
totalAll.total = (Integer) totalAll.total + total;
LOGGER.info("total thread3 done: "+total);
if ((Integer) totalAll.total > 99){
totalAll.total = "99+";
}
//as mentioned in the comments above, I can't reference asyncFlag directly, so I pass it as the .item() parameter
//and refer to it as consumer, but however many times consumer is incremented, asyncFlag on the main thread doesn't change
consumer++;
});
LOGGER.info("thread 3 already running asynchronously");
do{
//executed by the main thread.
//I wanted to block here until one of these conditions is met,
//but instead of blocking the thread it just loops forever
if(totalAll.total instanceof String || asyncFlag >=3){
cancellableThreads1.cancel();
cancellableThreads2.cancel();
cancellableThreads3.cancel();
}
//asyncFlag doesn't increase even after all 3 threads have executed consumer++
}while(totalAll.total instanceof Integer && asyncFlag <3);
ResponseBuilder responseBuilder = Response.ok().entity(totalAll);
return Uni.createFrom().item("").onItem().transform(s->responseBuilder.build());
totalAll can be accessed by those sub-threads, but asyncFlag cannot. My editor underlines it in red with "Local variable asyncFlag defined in an enclosing scope must be final or effectively final Java(536871575)" if asyncFlag is written inside the sub-thread block. So I use consumer instead, but incrementing it has no effect on asyncFlag. As a result, the loop never ends unless the total value turns into a String (the first condition).
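For reference, here is a minimal sketch of the behaviour described above. It assumes plain java.util.concurrent in place of Mutiny, so it is not a drop-in replacement for the code in the question. The names CappedTotal, slowCount and cappedTotal are hypothetical, and slowCount only stands in for method1, method2 and method3. It illustrates two points. A lambda may capture a local variable only if it is never reassigned, which is why asyncFlag is rejected while the totalAll reference (whose field is mutated, not the variable itself) is accepted. And consumer++ only changes the lambda's own parameter, a copy of the item, so asyncFlag never moves. An AtomicInteger is a never-reassigned reference to a mutable counter, so lambdas can update it. The sketch also lets the main thread block on completed results instead of spinning in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class CappedTotal {
    static final int CAP = 99;

    // Hypothetical stand-in for method1/method2/method3: returns `value` after a delay.
    static Callable<Integer> slowCount(int value, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return value;
        };
    }

    // Runs all tasks in parallel and sums results in completion order.
    // Returns "99+" as soon as the sum exceeds CAP, cancelling unfinished tasks.
    static String cappedTotal(List<Callable<Integer>> tasks, AtomicInteger finished)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        ExecutorCompletionService<Integer> completion = new ExecutorCompletionService<>(executor);
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (Callable<Integer> task : tasks) {
                futures.add(completion.submit(() -> {
                    Integer value = task.call();
                    // Allowed inside the lambda: the reference is never reassigned,
                    // only the counter it points to is mutated.
                    finished.incrementAndGet();
                    return value;
                }));
            }
            int total = 0; // touched only by the calling thread, so no data race
            for (int i = 0; i < tasks.size(); i++) {
                total += completion.take().get(); // blocks until the next task finishes
                if (total > CAP) {
                    return CAP + "+";
                }
            }
            return Integer.toString(total);
        } finally {
            // Interrupt whatever is still running; a no-op for completed tasks.
            for (Future<Integer> future : futures) {
                future.cancel(true);
            }
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger finished = new AtomicInteger();
        System.out.println(cappedTotal(
                List.of(slowCount(10, 10), slowCount(20, 20), slowCount(30, 30)), finished)); // prints 60
        System.out.println(cappedTotal(
                List.of(slowCount(50, 10), slowCount(60, 20), slowCount(5, 5000)),
                new AtomicInteger())); // prints 99+
    }
}
```

Because only the calling thread adds to total, the sketch avoids the unsynchronized updates to totalAll.total from three threads. In the second call the slow task is interrupted once the total passes 99, so the method returns without waiting for it. Cancellation here relies on the task responding to interruption, which a real HTTP call may or may not do.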