I am pulling a lot of data from a service through its built-in REST API. It takes about ~600ms for the service to return a JSON-formatted file, and I need 495 such files returned.

For my original POC I just called them sequentially on the main thread (I didn't want the program to advance until all the queries were complete), and that took about ~300 seconds. Now that the POC works, I need to optimize it quite a bit, because a 5-minute query is far from ideal. What I am doing now is using an ExecutorService with a fixed thread pool, adding 495 tasks to it, and running them with invokeAll().

My only issue is that I am now getting bad data. Logically nothing should change: the queries only return 50 elements at a time, so all I am doing is changing the starting point (and I've checked that there are no overlaps in the URLs). Yet for some reason I have results that are missing and duplicates of existing results. The code that processes the JSON has not changed; the only thing that changed is how the results are obtained.

I initially suspected an issue with a variable being shared across threads without being atomic, but all that really happens after I get the JSON is that I parse it, create a Requirement object, and add it to a Set. Since the Set is never reassigned, only added to, I was under the impression that atomicity wouldn't make a difference (I could be 100% wrong, however).
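
A quick standalone way to test that assumption (this sketch is separate from my actual program) is to hammer a plain HashSet and a concurrent set from several threads and compare the final sizes:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SetRaceDemo {
    public static void main(String[] args) throws Exception {
        Set<Integer> plain = new HashSet<>();
        Set<Integer> concurrent = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int t = 0; t < 8; t++) {
            final int base = t * 10_000; // each task adds a disjoint range
            tasks.add(() -> {
                for (int i = 0; i < 10_000; i++) {
                    plain.add(base + i);      // unsynchronized: adds can be lost
                    concurrent.add(base + i); // thread-safe
                }
                return null;
            });
        }
        pool.invokeAll(tasks);
        pool.shutdown();
        // The plain set usually comes up short (and can even corrupt itself);
        // the concurrent set always ends at 80000.
        System.out.println("plain: " + plain.size() + ", concurrent: " + concurrent.size());
    }
}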

Of the two snippets below, the first is how I run it linearly on the main thread; the second is my multithreaded version. I know it is a bit messy; it is currently a POC to determine how much faster the multithreading is (it goes from ~300 seconds to ~45 seconds) and whether it's worth applying to other calls within my program.

I just need to figure out why values are duplicated and missing when using multiple threads (there are no duplicates or missing values when it runs linearly). The URL determines the starting point, and the page size never changes, yet I end up roughly 2000 requirements short with 224 duplicate entries where there shouldn't be ANY at all.

The only things that changed are the ExecutorService and the loop in which I obtain the starting point (i.e., I calculate up front how many loops I need instead of relying on the returned current position). All the createRequirements(obj) function does is parse the JSON further and create a Requirement object from the data passed into the constructor.

private void obtainAllRequirements() {
    int startingLocation = 0;
    boolean continueQueries = true;
    String output = null;
    do {
        output = executeRESTCall(baseUrl + "/abstractitems?maxResults=50&itemType=43&startAt=" + startingLocation);
        JSONObject obj = new JSONObject(output);
        if ((obj.getJSONObject("meta").getJSONObject("pageInfo").getInt("totalResults") - startingLocation) <= 50) {
            continueQueries = false;
        }
        createRequirements(obj);
        startingLocation += 50;

    } while (continueQueries);
}


private void obtainAllRequirements() {
    String output = executeRESTCall(baseUrl + "/abstractitems?itemType=43&startAt=0");
    int totalResults = new JSONObject(output).getJSONObject("meta").getJSONObject("pageInfo").getInt("totalResults");
    ExecutorService service = Executors.newFixedThreadPool(THREADS);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < Math.ceil(totalResults/MAX_RESULTS); i++){
        final int iteration = i;
        tasks.add(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                System.out.println(baseUrl + "/abstractitems?maxResults="+MAX_RESULTS+"&itemType=43&startAt=" + (iteration*MAX_RESULTS));
                String o = executeRESTCall(baseUrl + "/abstractitems?maxResults="+MAX_RESULTS+"&itemType=43&startAt=" + (iteration*MAX_RESULTS));
                JSONObject obj = new JSONObject(o);
                createRequirements(obj);
                return null;
            }
        });
    }
    try {
        service.invokeAll(tasks);
        service.shutdown();
    }catch (InterruptedException e){
        e.printStackTrace();
    }

}

Edit: Here is what happens inside createRequirements; the Requirement constructor just takes the JSON data and assigns the values to specific private member variables (a rough sketch of the class follows the snippet).

private void createRequirements(JSONObject json) {
    JSONArray dataArray = json.getJSONArray("data"); // Gets the data array in the JSON file
    for (int i = 0; i < dataArray.length(); i++) {
        JSONObject req = dataArray.getJSONObject(i);
        Requirement requirement = new Requirement(req);
        if (!requirement.INVALID_PROJECT) {
            requirements.add(requirement);
        }
    }
}
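
For reference, Requirement looks roughly like the sketch below (hypothetical; the real fields and the INVALID_PROJECT check are assumptions here). Note that whether the Set treats two objects as duplicates depends entirely on their equals/hashCode:

import org.json.JSONObject;

// Hypothetical sketch of Requirement; the actual fields come from the JSON
// and are assumptions here.
public class Requirement {
    public final boolean INVALID_PROJECT;
    private final int id;
    private final String name;

    public Requirement(JSONObject req) {
        this.id = req.getInt("id");
        this.name = req.getString("name");
        this.INVALID_PROJECT = false; // the real project-validity check is not shown
    }

    // Without equals/hashCode overrides, a Set compares by object identity,
    // so two Requirement objects built from identical JSON would both be kept.
    @Override
    public boolean equals(Object o) {
        return o instanceof Requirement && ((Requirement) o).id == id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }
}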

EDIT: Changed the requirements set to one backed by a ConcurrentHashMap, but no change.

this.requirements = ConcurrentHashMap.newKeySet();
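
With the declaration included, that amounts to something like this (the field's declared type is an assumption, since the class isn't shown):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Assumed field declaration. newKeySet() is a static factory added in Java 8,
// so no throwaway ConcurrentHashMap instance is needed to call it.
private final Set<Requirement> requirements = ConcurrentHashMap.newKeySet();
// A pre-Java-8 alternative with the same effect:
// Collections.synchronizedSet(new HashSet<Requirement>())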

EDIT: Added executeRESTCall.

public String executeRESTCall(String urlValue) {
    try {
        URL url = new URL(urlValue);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");
        String encoding = Base64.getEncoder()
                .encodeToString((Credentials.XXX + ":" + Credentials.XXX).getBytes("UTF-8"));
        conn.setRequestProperty("Authorization", "Basic " + encoding);
        if (conn.getResponseCode() != 200) {
            throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
        }

        BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
        return br.readLine();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "";
}
97WaterPolo
    "Since the Set is never redefined only added..." well thread safety is not only about object allocation safety. The manipulation of objects is just as important. So, no, you can not expect to add data to any `Set` from multiple thread, and have it work reliably. So definietly don't do that, and start with a Concurrent Set implementation. Could you show what happens inside `createRequirements` ? – GPI Jul 11 '18 at 16:44
  • @GPI I added the code for createRequirements. Could you explain in a bit more detail why it wouldn't work reliably? The data values should never overlap, as each REST call should return unique information, and it is only adding to the Set; since I don't care about order, I don't care if the first query is added last and vice versa. ConcurrentSet is not a valid object type, or did you mean implement one myself (using this)? https://javarevisited.blogspot.com/2017/08/how-to-create-thread-safe-concurrent-hashset-in-java-8.html#axzz5Ky2S76GY – 97WaterPolo Jul 11 '18 at 17:13
  • @GPI I implemented the Set to be backed by a ConcurrentHashMap. All the code still works and is generated properly (outside of my error here), so it accepted the change. But I still have the issue with results from the query being duplicated. It is odd because I am doing multiple other queries, threaded in exactly the same way (but with sub-10 calls), with no issue at all; it is just the Requirements query, with 495+ calls, that is the issue. – 97WaterPolo Jul 11 '18 at 17:24
  • @97WaterPolo If number of threads is affecting your results then *you can't trust your threading, even if it looks like it's working with fewer threads*. With proper multi-threading the number of threads should not impact your results; Only the speed should be impacted by the number of threads, which you should tune to get optimal performance. – xtratic Jul 11 '18 at 18:00
  • Could you show the body of the executeRESTCall function? Is it thread-safe? Because everything else looks OK at first sight. – dyrkin Jul 11 '18 at 22:00
  • @dyrkin Updated initial post – 97WaterPolo Jul 12 '18 at 16:11
  • @97WaterPolo 1) About the concurrent set: it's all about race conditions. Adding data to a java.util.HashSet means adding data to a java.util.HashMap (internally it's the same). Adding data to a HashMap means resizing internal tables (called buckets) and updating counters. Incrementing a counter means calling "size = size + 1" on it. Calling "size = size + 1" from 2 different threads means getting a false result (one update overlaps and erases the other). And that's the good case. Worst case: you end up in infinite loops... https://stackoverflow.com/questions/10219724/infinite-loop-in-java-util-hashmap – GPI Jul 13 '18 at 12:30
  • 2) Just add logging. Log each unit of work with an identifier, log the URL each identifier calls, log the HTTP result line you get back, log how many "requirements" you got out of it, log the size of the set before and after adding them all. 3) Do wait for the executor to end its work before collecting results (that's `awaitTermination`, after `shutdown`), and check that the results completed OK (that is, call `get` on each `Future` object that `invokeAll` returned); that's a must (a sketch follows these comments). – GPI Jul 13 '18 at 12:34
  • 5) Properly close the input stream of your HTTP call to avoid leaking system resources. – GPI Jul 13 '18 at 16:38
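
A sketch of points 3 and 5 from the comments above, using only standard java.util.concurrent and java.io APIs (the five-minute timeout is an arbitrary placeholder). The first fragment belongs in obtainAllRequirements, the second in executeRESTCall:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Point 3: keep the Futures that invokeAll returns, wait for the pool to
// drain, then call get() on each one so any exception a task threw resurfaces.
List<Future<Void>> futures = service.invokeAll(tasks);
service.shutdown();
if (!service.awaitTermination(5, TimeUnit.MINUTES)) {
    throw new IllegalStateException("REST calls did not finish in time");
}
for (Future<Void> future : futures) {
    future.get(); // rethrows task failures wrapped in ExecutionException
}

// Point 5: try-with-resources guarantees the connection's stream is closed,
// and the loop also handles a response body that spans several lines.
try (BufferedReader br = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
    StringBuilder body = new StringBuilder();
    for (String line; (line = br.readLine()) != null; ) {
        body.append(line);
    }
    return body.toString();
}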

2 Answers


Don't lose track of exceptions

Also you'll probably want to wait for the tasks to actually complete.

You'll need some better exception handling, but for now please test with this (with the many threads) and post the output:

private void obtainAllRequirements() {

    String output = executeRESTCall(baseUrl + "/abstractitems?itemType=43&startAt=0");
    int totalResults = new JSONObject(output).getJSONObject("meta").getJSONObject("pageInfo")
            .getInt("totalResults");

    ExecutorService service = Executors.newFixedThreadPool(THREADS);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < Math.ceil(totalResults / MAX_RESULTS); i++) {
        final int iteration = i;
        tasks.add(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                try {
                    final String request = baseUrl + "/abstractitems?maxResults=" + MAX_RESULTS
                            + "&itemType=43&startAt=" + (iteration * MAX_RESULTS);
                    // hash codes tie each request to its response, since
                    // multithreading will have them printing interleaved
                    System.out.println(hashCode() + ":request: " + request);
                    String response = executeRESTCall(request);
                    System.out.println(hashCode() + ":response: " + response);
                    JSONObject obj = new JSONObject(response);
                    createRequirements(obj);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
    }
    try {
        service.invokeAll(tasks);

        service.shutdown();
        // you might want to await termination ?
        service.awaitTermination(1, TimeUnit.MINUTES);

        // catch all exceptions ?
        // you'll need some better error handling
    } catch (Exception e) {
        e.printStackTrace();
    }
}
xtratic
  • When running the code given, I get the following error on invokeAll(): https://bitbucket.org/snippets/97WaterPolo/jer78q – 97WaterPolo Jul 11 '18 at 20:47
  • @97WaterPolo What about https://stackoverflow.com/questions/8183205/what-could-be-the-cause-of-rejectedexecutionexception? – maaartinus Jul 11 '18 at 21:28
  • @97WaterPolo I guess you need `shutdown` and *then* `awaitTermination`. The exception happens already in `invokeAll`, i.e., the executor rejects that much work. You probably need to increase its `workQueue` size; see [here](https://stackoverflow.com/questions/8183205/what-could-be-the-cause-of-rejectedexecutionexception) for rejection reasons. In any case: *make sure all exceptions are at least logged somewhere* (a sketch of an explicit pool follows these comments). – maaartinus Jul 11 '18 at 21:47
  • @maaartinus Thanks for the assist! Yea, the primary point I was going for was to not lose the exceptions and to properly debug the multithreading code. – xtratic Jul 13 '18 at 20:00
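
For reference, Executors.newFixedThreadPool is documented to use an unbounded LinkedBlockingQueue, so constructing the ThreadPoolExecutor directly is one way to make the queue capacity and rejection policy explicit. A sketch with placeholder sizes, not a diagnosis of the error above:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Explicit fixed-size pool with a bounded queue; when the queue fills up,
// CallerRunsPolicy runs the task on the submitting thread instead of
// throwing RejectedExecutionException. THREADS and 1000 are placeholders.
ExecutorService service = new ThreadPoolExecutor(
        THREADS, THREADS,          // fixed pool: core == max
        0L, TimeUnit.MILLISECONDS, // idle timeout is unused for a fixed pool
        new ArrayBlockingQueue<>(1000),
        new ThreadPoolExecutor.CallerRunsPolicy());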

What I did was reduce the fixed thread pool size from 400 to 10. I'm not sure why I picked 400; I didn't think it through and was under the impression the JVM would handle the threads and I wouldn't really have to worry about it. Dropping it down to 10 fixes the issues where data was missing and duplicated. I have no clue why this is the case and would love to understand why.
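
A common starting point for sizing an I/O-bound pool is a small multiple of the core count rather than one thread per request. A sketch (the multiplier and cap are arbitrary tuning knobs, not rules):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 400 threads for ~495 short HTTP calls mostly burns memory on thread stacks
// and hammers the remote service; a small multiple of the available cores is
// a saner default for I/O-bound work, then tune from there.
int cores = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(Math.min(cores * 4, 50));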

97WaterPolo
  • If number of threads is affecting your results then *you can't trust your threading, even if it looks like it's working with fewer threads*. With proper multi-threading the number of threads should not impact your results; only the speed should be impacted by the number of threads, which you should tune to get optimal performance. – xtratic Jul 11 '18 at 18:00
  • @xtratic Is there any other possible reason I am having this data duplication/missing error then? The calls should never return the same data twice; the results returned depend on a 3rd-party application which **should** have consistent values at time of query. – 97WaterPolo Jul 11 '18 at 20:55
  • You can validate the responses from the server since you log them now. If the responses are valid then the problem must be on the client side, and if serial execution works fine but concurrent does not, then it is likely the threading code that is causing the problems. – xtratic Jul 11 '18 at 21:20