0

Currently I am using join() to wait for the threads I start from my current thread. As far as I know, join() makes the code run as plain sequential code. I want it to run in a multithreaded way instead.

/**
 * Looks up products for a list of search keys by splitting the list into
 * chunks of at most {@code query.interval} keys and querying each chunk on
 * its own thread.
 *
 * <p>The results of the most recent {@link #methodA(List)} call are also kept
 * in instance state ({@link #getMissingKeys()}, {@link #getErroredKeys()}).
 * NOTE(review): that state is replaced on every call without synchronization,
 * so concurrent calls on one shared instance would overwrite each other's
 * results - confirm how instances of this class are shared.
 */
public class A {

    private static final Logger logger = LoggerFactory.getLogger(A.class);

    /** Maximum number of search keys per lookup thread, injected as text. */
    @Value("${query.interval}")
    private String queryInterval;

    private Set<B> missingKeys = new HashSet<>();
    // Initialised eagerly so getErroredKeys() never returns null before the first lookup.
    private Map<E, String> erroredKeys = new HashMap<>();

    /**
     * Fetches the products for all given search keys, one thread per chunk.
     *
     * <p>All worker threads are started before any of them is joined, so the
     * chunks are queried concurrently; joining only makes the caller wait
     * until every chunk has been answered before the results are merged.
     *
     * @param searchKeyList keys to look up
     * @return the found data of all chunks merged into one map
     * @throws NumberFormatException if {@code query.interval} is not an integer
     * @throws IllegalStateException if {@code query.interval} is not positive,
     *         if the calling thread is interrupted while waiting, or if a
     *         worker thread ends without a response
     */
    public Map<B, Collection<Product>> methodA(
            List<B> searchKeyList) {

        long startTime = System.currentTimeMillis();
        missingKeys = new HashSet<>();
        erroredKeys = new HashMap<>();

        int size = searchKeyList.size();
        int qrySize = Integer.parseInt(queryInterval);
        if (qrySize <= 0) {
            // A zero or negative chunk size cannot partition the key list.
            throw new IllegalStateException(
                    "query.interval must be a positive integer but was [" + queryInterval + "]");
        }

        logger.info("Size of searchKeyList [{}] of instrument look up", size);

        List<C> c = new ArrayList<>();
        int threadNumber = 0;
        int startIndex = 0;
        while (startIndex < size) {
            // The last chunk may be shorter than qrySize.
            int endIndex = startIndex + Math.min(qrySize, size - startIndex);

            if (endIndex - startIndex == qrySize) {
                logger.debug("Creating thread for Instrument LookUp");
            } else {
                logger.debug("Creating EXTRA thread for Instrument LookUp");
            }
            c = createThread(threadNumber, startIndex, endIndex, searchKeyList, c);

            startIndex = endIndex;
            threadNumber++;
        }

        // Every worker is already running at this point. join() does not
        // serialise the lookups; it only blocks this thread until each worker
        // is done, which is required before their responses can be read.
        for (C lookUpThread : c) {
            try {
                lookUpThread.join();
            } catch (InterruptedException e) {
                // Restore the flag for callers and stop: merging now would
                // read responses of workers that may not have finished.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while waiting for [" + lookUpThread.getName() + "]", e);
            }
        }

        Map<B, Collection<Product>> responseDataList = new HashMap<>();
        for (C lookUpThread : c) {
            ClientResponse<B, Product, E> response = lookUpThread.getInstrumentResponse();
            if (response == null) {
                // The worker set no response, e.g. because the lookup threw inside run().
                throw new IllegalStateException(
                        "[" + lookUpThread.getName() + "] finished without producing a response");
            }
            missingKeys.addAll(response.getMissingKeys());
            erroredKeys.putAll(response.getErroredKeys());
            responseDataList.putAll(response.getFoundData());
        }

        long stopTime = System.currentTimeMillis();

        logger.info(
                "[{}] milliseconds taken to fetch [{}] instruments from RDS divided in [{}] threads  ",
                stopTime - startTime, size, c.size());

        return responseDataList;
    }

    /**
     * Starts one lookup thread for the keys in {@code [startIndex, endIndex)}.
     *
     * @param threadNumber  number used in the thread name and the log line
     * @param startIndex    index of the first key of the chunk (inclusive)
     * @param endIndex      index after the last key of the chunk (exclusive)
     * @param searchKeyList full list of search keys
     * @param c             list the started thread is appended to
     * @return the same list {@code c}, now containing the new thread
     */
    private List<C> createThread(int threadNumber, int startIndex, int endIndex,
            List<B> searchKeyList, List<C> c) {

        // Copy the chunk so the request does not hold a view of the caller's list.
        List<B> searchKeys = new ArrayList<>(searchKeyList.subList(startIndex, endIndex));
        ProductRequest<B> request = new ProductRequest<>(searchKeys);

        logger.info("Creating  thread no [{}] for Instrument LookUp", threadNumber);
        C lookUpThread = new C("RDS Instrument Thread - " + threadNumber);
        lookUpThread.setRequest(request);
        lookUpThread.start();

        c.add(lookUpThread);
        return c;
    }

    /** Returns the missing keys collected by the most recent {@link #methodA(List)} call. */
    public Set<B> getMissingKeys() {
        return missingKeys;
    }

    public void setMissingKeys(Set<B> missingKeys) {
        this.missingKeys = missingKeys;
    }

    /** Returns the errored keys collected by the most recent {@link #methodA(List)} call. */
    public Map<E, String> getErroredKeys() {
        return erroredKeys;
    }

    public void setErroredKeys(Map<E, String> erroredKeys) {
        this.erroredKeys = erroredKeys;
    }

    /**
     * Worker thread that performs the lookup for one chunk of search keys and
     * keeps the response until the creating thread collects it after join().
     */
    private class C extends Thread {

        ClientResponse<B, Product, E> instrumentResponse = null;
        ProductRequest<B> request = null;

        C(String name) {
            super(name);
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            // NOTE(review): rdsDao is not declared in the visible source;
            // presumably a field of the enclosing class - confirm.
            instrumentResponse = rdsDao.getByKey(request);
            long stopTime = System.currentTimeMillis();

            logger.info("RDS responded in [{}] milliseconds for thread [{}] while Instrument Lookup",
                    stopTime - startTime, getName());
        }

        public void setInstrumentResponse(
                ClientResponse<B, Product, E> instrumentResponse) {
            this.instrumentResponse = instrumentResponse;
        }

        public ClientResponse<B, Product, E> getInstrumentResponse() {
            return instrumentResponse;
        }

        public void setRequest(ProductRequest<B> request) {
            this.request = request;
        }

        public ProductRequest<B> getRequest() {
            return request;
        }
    }
}
Kedar Mhaswade
  • 4,535
  • 2
  • 25
  • 34
ashish
  • 1
  • 2
  • `ExecutorService`, `CompletionService` etc. But if you need to wait for them all to finish anyway... why do you need anything different? – Andy Turner Jun 06 '16 at 14:15
  • You can have a look at ``java.util.concurrent.CountDownLatch`` – Alexandre Cartapanis Jun 06 '16 at 14:29
  • 1
    [`ExecutorService#invokeAll`](http://stackoverflow.com/a/18203093) is pretty much the same thing you do, except for using a thread pool. I don't see much benefit in using a [CompletionService](http://stackoverflow.com/questions/4912228/when-should-i-use-a-completionservice-over-an-executorservice) since there seems to be nothing much you can do while only some of the threads have finished (you can achieve similar by handling the result of each joined thread directly within the joining loop). – zapl Jun 06 '16 at 14:59

1 Answers1

1

Your code is running concurrently (not sequentially as you mentioned).

Calling threadT.join() makes the current thread wait until threadT has finished.

As you are spawning multiple threads, and joining with the Main thread, those non Main threads will still run concurrently (as you are calling Thread.start() in createThread()).

If you do not join the non-main threads as shown above, your main thread/method may complete before those other threads have finished, which I guess is not what you want.

Vladimir Vagaytsev
  • 2,871
  • 9
  • 33
  • 36
Vijay
  • 542
  • 4
  • 15