
I have a HashMap<String, CompletableFuture<HashMap<String, String>>> mapping an item to its attributes, e.g. { "pizza" -> { "calories" -> "120", "fat" -> "12" } }, where the attributes are retrieved from different data sources.

For example, we get the "fat" attribute from the database, while we get the "calories" attribute from Solr.

When I initially retrieve "fat" from the DB, I use supplyAsync in order to not block the main thread, something like:

  public CompletableFuture<HashMap<String, String>> getFat(String name, boolean flag) {
    return CompletableFuture.supplyAsync(new Supplier<HashMap<String, String>>() {
      @Override
      public HashMap<String, String> get() {
        HashMap<String, String> attributes = new HashMap<>();

        ... do work ...

        // fatValue is computed in the "do work" step above
        attributes.put("fat", fatValue);
        return attributes;
      }
    });
  }

I then chain this with an asynchronous call to Solr so that I eventually have an asynchronous map of items to their attributes, namely HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping; (I loop through the keys of the map and replace each value with a future that has the new attributes added, accessing the old ones using thenApply).
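
The chaining step looks roughly like this (simplified; getCaloriesFromSolr is just a stand-in name for my actual Solr lookup):

    // Simplified sketch of the chaining step; getCaloriesFromSolr stands in
    // for my actual Solr lookup
    for (String itemKey : itemsToAttributesMapping.keySet()) {
        CompletableFuture<HashMap<String, String>> withSolr =
            itemsToAttributesMapping.get(itemKey).thenApply(attributes -> {
                // the old attributes (e.g. "fat" from the DB) are already present;
                // add the Solr-backed attribute to the same map and return it
                attributes.put("calories", getCaloriesFromSolr(itemKey));
                return attributes;
            });
        itemsToAttributesMapping.put(itemKey, withSolr);
    }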

I finish by writing the data to a CSV, which is where the problem arises:

       File file = new File(home + "/Downloads/rmsSkuValidationResults.csv");

       try {
           FileWriter outputfile = new FileWriter(file);
           CSVWriter writer = new CSVWriter(outputfile);

           for (String itemKey : itemsToAttributesMapping.keySet()) {
               itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {
                   String[] row = { attributes.get("calories"),
                                    attributes.get("fat"),

                                    ... more attributes ...

                                  };
                   writer.writeNext(row);
               });
           }

           writer.close();
       }
       catch (Exception e) {
           e.printStackTrace();
       }

As is, writing to the CSV file works fine for some 800-1100 items, but then it stops and the program terminates.

I have tried variations on the above, including using get instead of thenAccept, and adding join after the thenAccept, which caused the program to hang (the async computations are quick, so it shouldn't hang).

I have also tried storing the results of the thenAccepts I run and then calling allOf on them, but this leads to strange behavior where the attributes from Solr stop showing up after a few hundred items. The attributes from the database do show up for every item, which makes me think the initial supplyAsync always works, but the subsequent thenApplys that add attributes to the futures in HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping; stop working.
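
That attempt looked roughly like this (again simplified, with the actual attribute list trimmed down):

    // Simplified sketch of the allOf attempt
    List<CompletableFuture<Void>> pending = new ArrayList<>();

    for (String itemKey : itemsToAttributesMapping.keySet()) {
        pending.add(itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {
            String[] row = { attributes.get("calories"), attributes.get("fat") };
            writer.writeNext(row);
        }));
    }

    // wait for every row to be written before closing the writer
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    writer.close();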

Any insight into what might be the problem would be much appreciated. Perhaps my understanding of CompletableFuture is incorrect, especially around chaining and resolving futures? Maybe this is a timeout issue, or threads are being lost? The last approach I described suggests the issue might be the thenApplys.

artin2
  • Once you get past this and complete your program I would recommend converting to using CompletionStage rather than CompletableFuture. https://stackoverflow.com/a/49158702/925913 – Andrew Cheong Dec 22 '20 at 10:46
  • @slackwing you had a great comment, I'll choose that as the accepted answer since it provides great insight, but I wanted to clarify here that I was able to get it to work after refactoring the `thenApply`s! The code is not trivial so I can't share all of it here (especially both before/after refactoring) but I had a thenApply where I retrieved the item name and did a Solr lookup, and instead of adding the attrib. right there, I returned a new `CompletableFuture` for those new attrib. and later double chained existing and new attrib. in two `thenApply`s. Now I just add new attrib. immediately. – artin2 Dec 22 '20 at 17:30
  • I'd also add that I used this approach to writing to the csv file, it takes quite a bit but it doesn't hang like the `get` did: I first add the results of the `thenAccept` to `ArrayList<CompletableFuture<Void>> async = new ArrayList<>();` and then run the following: `CompletableFuture[] asyncArray = async.toArray(CompletableFuture[]::new); var all = CompletableFuture.allOf(asyncArray); all.whenComplete((aVoid, throwable) -> { System.out.println("Completed all"); }); all.join();` Sorry for format – artin2 Dec 22 '20 at 17:31
  • Hm. Tbh, hard to picture the code you're describing, but glad it worked out. And yup, using `allOf` works. Typically though (if you have control over the server), we expose a batch API (or more advanced, streaming API). If you're using an external API, you may want to batch your requests to not hit a rate limit, e.g. 20 at a time. You should do this even if there isn't a rate limit because you may be clogging up threads / not allowing other tasks to get intermittent priority. (I'm actually not totally sure about this last statement though.) – Andrew Cheong Dec 22 '20 at 19:47
  • @slackwing we do have a streaming api which is what I was going to use initially, but there were complications forcing me not to use it, had to make the code asynchronous accordingly so it would run in a reasonable time frame. – artin2 Dec 22 '20 at 20:02
  • gotcha, makes sense – Andrew Cheong Dec 22 '20 at 20:04

1 Answer


Here's a rough illustration of what your code above, as you have it, is doing:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
get(itemKey4) then at some unspecified time in the future writeNext(attr4)
get(itemKey5) then at some unspecified time in the future writeNext(attr5)
get(itemKey6) then at some unspecified time in the future writeNext(attr6)
get(itemKey7) then at some unspecified time in the future writeNext(attr7)
attr1 finally delivered; writeNext(attr1)
get(itemKey8) then at some unspecified time in the future writeNext(attr8)
attr2 finally delivered; writeNext(attr2)
attr3 finally delivered; writeNext(attr3)
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
no more items; writer.close()
attr4 finally delivered; oops, writer closed
attr5 finally delivered; oops, writer closed
attr6 finally delivered; oops, writer closed
attr7 finally delivered; oops, writer closed
attr8 finally delivered; oops, writer closed
attr9 finally delivered; oops, writer closed

You mentioned you tried .get() and .join(). That would basically make the program synchronous, but it's a good debugging step. It would have changed execution to:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
attr1 finally delivered; writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
attr2 finally delivered; writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
attr3 finally delivered; writeNext(attr3)
...
...
...
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
attr9 finally delivered; writeNext(attr9)
no more items; writer.close()
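
In code, that per-item blocking version would look something like this (just a sketch, not your actual code):

    for (String itemKey : itemsToAttributesMapping.keySet()) {
        // join() blocks until this item's future completes, so the row is
        // written before the loop moves on to the next item
        HashMap<String, String> attributes = itemsToAttributesMapping.get(itemKey).join();
        String[] row = { attributes.get("calories"), attributes.get("fat") };
        writer.writeNext(row);
    }
    writer.close();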

This should have worked. What does adding output to each of your stages (the thenApplys you haven't shown as well as the thenAccept) reveal? Is it indeed going as quickly as you say?

Please show more code. Especially of the chaining part, if that's a place you think the problem may be.

Andrew Cheong
  • Thank you, this was very insightful :) please see the comments I left under the original post, it seems to be working now! – artin2 Dec 22 '20 at 17:35