
I have to make an HTTP POST request, passing a header and a body. In the body I need to provide a pageNumber, so I start with "1". After posting the data, I get a JSON response back like the one below.

{
    "response": {
        "pageNumber": 1,
        "entries": 200,
        "numberOfPages": 3
    },
    "list": [
        {
            // some stuff here
        }
    ],
    "total": 1000
}

Depending on the response for pageNumber 1, I decide how many more calls I need to make. In the response above, numberOfPages is 3, so I need to make a total of three calls to the same URL. Since I have already made one call, I make 2 more calls with pageNumber "2" and "3" in the body.

Below is my working code. I just need to call the same URL numberOfPages times, changing only the body: each call must carry the corresponding pageNumber, so if numberOfPages is 3 I make 3 calls in total. I populate two maps after collecting the data from each page.

public class AppParser {
  private static final ObjectMapper objectMapper = new ObjectMapper();
  private static final Gson gson = new Gson(); // used by getBody()
  private static final String lastParentIdJsonPath = "......";
  private final Map<String, String> processToTaskIdHolder = new HashMap<>();
  private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
  private final int entries;
  private final String siteId;

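  public AppParser(int entries, String id) throws IOException {
    this.entries = entries;
    this.siteId = id; // the field is named siteId, not id
    collect();
  }

  // this is only called from the constructor above
  // (objectMapper.readValue throws IOException, so it is declared here)
  private void collect() throws IOException {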
    String endpoint = "url_endpoint";
    int number = 1;
    int expectedNumber;
    do {
      HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(number), getHeader());
      ResponseEntity<String> responseEntity =
          HttpClient.getInstance().getClient()
              .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
      String jsonInput = responseEntity.getBody();
      Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
      expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
      if (expectedNumber <= 0) {
        break;
      }
      List<Postings> postings = response.getPostings();
      for (Postings posting : postings) {
        if (posting.getClientIds().isEmpty()) {
          continue;
        }
        List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
        String clientId = posting.getClientIds().get(0).getId();
        Category category = getCategory(posting);
        // populate two maps now
        itemsByCategory.put(clientId, category);
        processToTaskIdHolder.put(clientId, lastParent.get(0));
      }
      number++;
    } while (number <= expectedNumber);
  }

  private String getBody(final int number) {
    Input input = new Input(entries, number, 0);
    Body body = new Body("Stuff", input);
    return gson.toJson(body);
  }

  // getters for those two above maps
}

My code above collects the data sequentially, one page at a time, so with a high numberOfPages it takes a while to collect everything. For example, if numberOfPages is 500, my code runs sequentially through all 500 pageNumbers. Is there any way to parallelize the code above so that we can collect data for, say, 5 pages at the same time? Is this possible? I guess I would then also need to make sure my code is thread safe.

Note: HttpClient is a thread-safe singleton class.

user1950349

2 Answers


I tried to modify your code to use multithreading, but it wasn't easy because you didn't provide the full class source with all imports, and the code isn't as clean as it could be. Your task is a common case for asynchronous requests. I wrapped your collect code into a java.util.concurrent.Callable, which lets me run each page as a task on an ExecutorService asynchronously and get the result back as a ParseResult object if needed. In the code below I make one request to fill in the expectedNumber variable, and then in a loop the app creates tasks and submits them to the executorService, whose dedicated thread pool runs them. Code:

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String URL_ENDPOINT = "url_endpoint";
private static final String lastParentIdJsonPath = "......";
private final Map<String, String> processToTaskIdHolder = new HashMap<>();
private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();

// Everything parsed from one page. Each task fills its own instance,
// so worker threads never share these maps.
class ParseResult {
    private int expectedNumber;
    private final Map<String, String> processToTaskIdHolder = new HashMap<>();
    private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
}

class ParseTask implements Callable<ParseResult> {

    private final int pageNumber;

    public ParseTask(int pageNumber) {
        this.pageNumber = pageNumber;
    }

    @Override
    public ParseResult call() throws Exception {
        HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(pageNumber), getHeader());
        ResponseEntity<String> responseEntity =
                HttpClient.getInstance().getClient()
                        .exchange(URI.create(URL_ENDPOINT), HttpMethod.POST, requestEntity, String.class);
        String jsonInput = responseEntity.getBody();
        Stuff response = objectMapper.readValue(jsonInput, Stuff.class);

        ParseResult parseResult = new ParseResult();
        parseResult.expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
        if (parseResult.expectedNumber <= 0) {
            return parseResult; // nothing to collect
        }
        for (Postings posting : response.getPostings()) {
            if (posting.getClientIds().isEmpty()) {
                continue;
            }
            List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
            String clientId = posting.getClientIds().get(0).getId();
            // collect every posting on the page, not only the first one
            parseResult.itemsByCategory.put(clientId, getCategory(posting));
            parseResult.processToTaskIdHolder.put(clientId, lastParent.get(0));
        }
        return parseResult; // always reached, so the method compiles
    }
}

public AppParser(int entries, String id) {
    // .....
    collect();
}

// this is only called from the constructor above
private void collect() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        // page 1 is requested synchronously because it tells us how many pages there are
        ParseResult firstResult = new ParseTask(1).call();
        writeResult(firstResult);

        // submit the remaining pages; the pool runs them concurrently
        List<Future<ParseResult>> futures = new ArrayList<>();
        for (int number = 2; number <= firstResult.expectedNumber; number++) {
            futures.add(executorService.submit(new ParseTask(number)));
        }
        // Future.get() waits for each page; only this thread writes to the
        // shared maps, so they need no extra synchronization
        for (Future<ParseResult> future : futures) {
            writeResult(future.get());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executorService.shutdown(); // let the pool threads exit
    }
}

private String getBody(final int number) {
    Input input = new Input(entries, number, 0);
    Body body = new Body("Stuff", input);
    return gson.toJson(body);
}

private void writeResult(ParseResult result) {
    // merge one page's data into the two maps
    itemsByCategory.putAll(result.itemsByCategory);
    processToTaskIdHolder.putAll(result.processToTaskIdHolder);
}

We could spend a lot of time upgrading your code, but this is a raw version with multithreading. I'm not sure it will work as-is because, as I said before, you didn't provide the full version; it may need some syntax fixes.
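For a version that runs on its own, here is a sketch of the same Callable/Future idea on Java 7 with the HTTP call replaced by a hypothetical `fetchPage` stub (the class and method names are illustrative, not from the question). Page 1 is fetched first to learn the page count, the remaining pages are submitted as `Callable`s, and their `Future`s are drained on the calling thread, so the result map is never touched by two threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PagedCollector {

    // Hypothetical per-page result: the page count the server reports plus the parsed entries.
    static final class PageResult {
        final int numberOfPages;
        final Map<String, String> entries = new HashMap<String, String>();
        PageResult(int numberOfPages) { this.numberOfPages = numberOfPages; }
    }

    // Hypothetical stand-in for the real HTTP POST + JSON parsing of one page.
    static PageResult fetchPage(int page, int totalPages) {
        PageResult result = new PageResult(totalPages);
        result.entries.put("client-" + page, "parent-" + page);
        return result;
    }

    static Map<String, String> collect(final int totalPages, int threads) {
        Map<String, String> merged = new HashMap<String, String>(); // only this thread writes it
        // Page 1 is fetched synchronously because it tells us how many pages exist.
        PageResult first = fetchPage(1, totalPages);
        merged.putAll(first.entries);

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<PageResult>> futures = new ArrayList<Future<PageResult>>();
            for (int page = 2; page <= first.numberOfPages; page++) {
                final int currentPage = page; // Java 7 anonymous classes need a final copy
                futures.add(executor.submit(new Callable<PageResult>() {
                    @Override
                    public PageResult call() {
                        return fetchPage(currentPage, totalPages);
                    }
                }));
            }
            // get() blocks until that page is done; merging here keeps the map single-threaded.
            for (Future<PageResult> future : futures) {
                merged.putAll(future.get().entries);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting pages", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a page request failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(collect(3, 5).size()); // 3
    }
}
```

Using a fixed pool of 5 here matches the "5 pages at a time" goal from the question.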

Lev Khruschev
  • Is it necessary to use `Executors.newCachedThreadPool()`, or is there some benefit to using it? Can we not use a fixed thread pool (`Executors.newFixedThreadPool`) here? – user1950349 Jan 26 '18 at 17:15
  • That was just an example. If you know how many parallel threads you want, use a fixed thread pool. You can find more information in the book "Thinking in Java". – Lev Khruschev Jan 26 '18 at 19:46
  • OK. I have provided my full class in the question now. Can you check it out? Also, is your code thread safe? – user1950349 Jan 26 '18 at 19:48
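On the cached-versus-fixed question from these comments: `Executors.newCachedThreadPool()` creates a new thread whenever no idle one is available, so submitting hundreds of page tasks at once can start hundreds of threads, while `newFixedThreadPool(n)` never runs more than `n` tasks at the same time. A small sketch that measures this on a fixed pool, assuming a 20 ms sleep as a hypothetical stand-in for an HTTP call:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolConcurrencyDemo {

    // Runs `tasks` short tasks on a fixed pool of `poolSize` threads and
    // returns the largest number of tasks observed running at the same time.
    static int peakConcurrency(int poolSize, int tasks) {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    int now = running.incrementAndGet();
                    int seen = peak.get();
                    // raise the recorded peak, retrying if another thread changed it first
                    while (now > seen && !peak.compareAndSet(seen, now)) {
                        seen = peak.get();
                    }
                    try {
                        Thread.sleep(20); // hypothetical stand-in for a slow HTTP call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(5, 20));
    }
}
```

The peak can never exceed the pool size; with a cached pool the same 20 tasks could all run at once.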

Java 8 solution (in-order execution):

Parallel streams can be your friend here:

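IntStream.rangeClosed(1, numberOfPages)   // rangeClosed, so the last page is included
        .parallel()
        // fetchPage(page): the POST + JSON parsing from the question, moved into a method;
        // doing it in mapToObj lets several pages be fetched at once
        .mapToObj(page -> fetchPage(page))
        .forEachOrdered(response -> {
            // forEachOrdered handles one page at a time, in page order
            for (Postings posting : response.getPostings()) {
                // ... populate the maps
            }
        });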

Any local variables used inside the lambdas must be final or effectively final.

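`forEachOrdered` may be replaced by `forEach` if the order of output is not important, but then the action runs on several threads at once, so whatever it writes to must be thread safe.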
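Because `forEachOrdered` runs its action for one element at a time, putting the HTTP call inside it would serialize the requests; the slow work belongs in an intermediate step such as `mapToObj`. A runnable sketch of that pattern, assuming a hypothetical `fetchPage` stub in place of the real request and parsing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class ParallelPages {

    // Hypothetical stand-in for one page's HTTP POST + JSON parsing.
    static String fetchPage(int page) {
        return "page-" + page;
    }

    static List<String> fetchAllInOrder(int numberOfPages) {
        // Safe without synchronization: forEachOrdered runs its action for one
        // element at a time, with a happens-before edge between elements.
        List<String> results = new ArrayList<>();
        IntStream.rangeClosed(1, numberOfPages)      // 1..numberOfPages inclusive
                .parallel()
                .mapToObj(ParallelPages::fetchPage)  // pages may be fetched concurrently
                .forEachOrdered(results::add);       // but are consumed in page order
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAllInOrder(5)); // [page-1, page-2, page-3, page-4, page-5]
    }
}
```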

See this topic for controlling how many threads are running concurrently: How many threads are spawned in parallelStream in Java 8?
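One commonly used way to cap the thread count is to start the parallel stream from inside a dedicated `ForkJoinPool`, so its tasks run in that pool rather than the common pool. This relies on current JDK behavior, not on a documented guarantee, so treat it as an assumption; the `fetchPage` stub is hypothetical. (The common pool's size can also be set globally with the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.)

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BoundedParallelPages {

    // Hypothetical stand-in for one page's HTTP POST + JSON parsing.
    static String fetchPage(int page) {
        return "page-" + page;
    }

    // Runs the parallel stream inside a pool of `parallelism` workers.
    // Assumes current JDK behavior: tasks forked from a pool worker stay in that pool.
    static List<String> fetchAll(int numberOfPages, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> IntStream.rangeClosed(1, numberOfPages)
                    .parallel()
                    .mapToObj(BoundedParallelPages::fetchPage)
                    .collect(Collectors.toList()))   // toList keeps page order
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(5, 2)); // [page-1, page-2, page-3, page-4, page-5]
    }
}
```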

Java 7 solution (non-ordered execution):

Inspired by: wait until all threads finish their work in java

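ExecutorService es = Executors.newFixedThreadPool(4);
for (int page = 1; page <= numberOfPages; ++page) {   // <= so the last page is included
    final int currentPage = page; // Java 7 anonymous classes can only capture final locals
    es.execute(new Runnable() {
        @Override
        public void run() {
            /*  your task for currentPage */
        }
    });
}
es.shutdown(); // accept no new tasks; already submitted ones keep running
boolean finished = false;
try {
    finished = es.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // don't swallow the interrupt
}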

The 4 can be replaced with the maximum number of threads you want running at any one time, and awaitTermination needs an appropriate timeout.
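Filling in the Java 7 skeleton with a hypothetical `fetchPage` stub gives a runnable sketch. Because several worker threads write results at the same time, it collects them into a `ConcurrentHashMap` (an assumption standing in for the question's maps), and it treats a timeout or an interrupt as an error instead of silently continuing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class Java7PageFetcher {

    // Hypothetical stand-in for one page's HTTP POST + parsing.
    static String fetchPage(int page) {
        return "parent-" + page;
    }

    static Map<String, String> fetchAll(int numberOfPages, int maxThreads) {
        // Written from several worker threads at once, so it must be thread safe.
        final Map<String, String> results = new ConcurrentHashMap<String, String>();
        ExecutorService es = Executors.newFixedThreadPool(maxThreads);
        for (int page = 1; page <= numberOfPages; ++page) {
            final int currentPage = page; // Java 7 anonymous classes capture only final locals
            es.execute(new Runnable() {
                @Override
                public void run() {
                    results.put("client-" + currentPage, fetchPage(currentPage));
                }
            });
        }
        es.shutdown(); // no new tasks; submitted ones keep running
        try {
            if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
                es.shutdownNow();
                throw new IllegalStateException("pages did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pages", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(10, 4).size()); // 10
    }
}
```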

DHa
  • I am still working with Java 7, so I can't use Java 8 yet. – user1950349 Jan 26 '18 at 16:42
  • There are backports of streams, for example https://github.com/streamsupport/streamsupport . I haven't tried these myself, but my guess is they could help you. – DHa Jan 26 '18 at 17:17
  • Hmm, I have never worked with backports before. Is there any other way to do this with plain Java? – user1950349 Jan 26 '18 at 19:46
  • Added a Java 7 way of doing it. It will have unordered execution, but you are populating maps, so hopefully that is okay. You might want to check that the maps you are using tolerate multiple threads writing to them as well. – DHa Jan 26 '18 at 21:48
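On that last point: `HashMap` and Guava's `LinkedListMultimap` are not thread safe. If worker threads write to them directly, one option is Guava's `Multimaps.synchronizedListMultimap(...)` wrapper. Another, using only JDK classes and Java 7 APIs, is a `ConcurrentMap` of concurrent queues, sketched below. The class name and string values are hypothetical, and unlike `LinkedListMultimap`, the order of values written by different threads is not predictable.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrentMultimapSketch {

    private final ConcurrentMap<String, Queue<String>> itemsByKey =
            new ConcurrentHashMap<String, Queue<String>>();

    // Thread-safe "multimap put": the value queue for a key is created at most once.
    void put(String key, String value) {
        Queue<String> values = itemsByKey.get(key);
        if (values == null) {
            Queue<String> fresh = new ConcurrentLinkedQueue<String>();
            values = itemsByKey.putIfAbsent(key, fresh);
            if (values == null) {
                values = fresh; // this thread won the race to create it
            }
        }
        values.add(value);
    }

    int size(String key) {
        Queue<String> values = itemsByKey.get(key);
        return values == null ? 0 : values.size();
    }

    // Demo: several threads appending to the same three keys at once.
    static ConcurrentMultimapSketch fill(int threads, final int putsPerThread) {
        final ConcurrentMultimapSketch map = new ConcurrentMultimapSketch();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int thread = t;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < putsPerThread; i++) {
                        map.put("client-" + (i % 3), "value-" + thread + "-" + i);
                    }
                }
            });
        }
        es.shutdown();
        try {
            es.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 30).size("client-0")); // 40
    }
}
```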