
I have 150k documents in an index and need to dump the raw data for all of them (yes, I really do need all of it).

I am trying to run a scroll query asynchronously, but I haven't been able to figure out how to make the ActionListener reference itself so that it keeps making calls until the documents in the scroll are exhausted.

A synchronous approach is pretty straightforward:

SearchRequest sr = new SearchRequest();
// search source builder, etc for index, query, etc

final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
sr.scroll(scroll);

SearchResponse response = client.search(sr, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();
SearchHit[] hits = response.getHits().getHits();

while (hits != null && hits.length > 0) {
    // add the hits to a data structure
    SearchScrollRequest newRequest = new SearchScrollRequest(scrollId);
    newRequest.scroll(scroll);
    response = client.scroll(newRequest, RequestOptions.DEFAULT);
    scrollId = response.getScrollId(); // use the most recent scroll id for the next call
    hits = response.getHits().getHits();
}

So how do I implement an async callback that sends the data to a processEmulator and then continues to call for more data? I think the issue is that the callback needs to include a reference to itself.

SearchRequest req = new SearchRequest();
SearchSourceBuilder sb = new SearchSourceBuilder();
sb.size(10000); // batches of 10k please
req.source(sb);
req.indices("my-data-index");
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(2L));
req.scroll(scroll);
                
SearchResponse res = client.search(req, RequestOptions.DEFAULT);
String scrollId = res.getScrollId();
SearchScrollRequest sr = new SearchScrollRequest(scrollId);

ActionListener<SearchResponse> callback = new ActionListener<SearchResponse>() {
     @Override
     public void onResponse(SearchResponse searchResponse) {
          List<String> results = getDataFromResponse(searchResponse);
          if (results != null && results.size() > 0) {
              emulateSendDateElsewhere(results);
              // call again? how do I self ref the callback from within the callback??
          }
     }

     @Override
     public void onFailure(Exception e) {
         LogUtil.writeLog(e);
     }
};


client.scrollAsync(sr, RequestOptions.DEFAULT, callback);
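On the self-reference question in the listener's comment: inside an anonymous class, `this` refers to the anonymous instance, so the callback can hand itself to the next async call. Below is a minimal sketch of that pattern only. `Listener`, `FakePager` and `drainAll` are hypothetical stand-ins made up for illustration (so the example runs without Elasticsearch); they are not part of the real client API, and the fake pager calls back on the same thread, whereas a real client would call back from another one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Elasticsearch types; not the real client API.
class SelfRefScrollDemo {

    /** Minimal analogue of ActionListener<SearchResponse>. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Fake paged source: each call hands the next batch to the listener. */
    static class FakePager {
        private final List<String> docs;
        private final int batchSize; // assumed to be > 0
        private int offset = 0;

        FakePager(List<String> docs, int batchSize) {
            this.docs = docs;
            this.batchSize = batchSize;
        }

        void nextAsync(Listener<List<String>> listener) {
            int end = Math.min(offset + batchSize, docs.size());
            List<String> batch = new ArrayList<>(docs.subList(offset, end));
            offset = end;
            listener.onResponse(batch); // invoked synchronously in this sketch
        }
    }

    /** Pages through all docs using a listener that re-registers itself. */
    static List<String> drainAll(List<String> docs, int batchSize) {
        FakePager pager = new FakePager(docs, batchSize);
        List<String> collected = new ArrayList<>();

        Listener<List<String>> callback = new Listener<List<String>>() {
            @Override
            public void onResponse(List<String> batch) {
                if (batch.isEmpty()) {
                    return; // exhausted: stop issuing further requests
                }
                collected.addAll(batch);
                pager.nextAsync(this); // `this` is the listener itself
            }

            @Override
            public void onFailure(Exception e) {
                throw new RuntimeException(e);
            }
        };

        pager.nextAsync(callback); // kick off the first page
        return collected;
    }
}
```

With the real client, the analogous step would presumably be to build a new SearchScrollRequest from `searchResponse.getScrollId()` inside `onResponse` and pass `this` as the listener argument to `client.scrollAsync(...)`.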

I have tried a couple of things, one being a self-calling method like the following:

private void processAsyncAssetCall(String scrollId) {
    SearchScrollRequest sr = new SearchScrollRequest(scrollId);
    ActionListener<SearchResponse> callback = new ActionListener<SearchResponse>() {
        @Override
        public void onResponse(SearchResponse searchResponse) {
            List<String> results = getIdsFromSearchResponse(searchResponse);
            if (results != null && results.size() > 0) {
                emulateSendDateElsewhere(results);
                // continue with the scroll id from the latest response
                processAsyncAssetCall(searchResponse.getScrollId());
            }
        }

        @Override
        public void onFailure(Exception e) {
            LogUtil.writeLog(e);
        }
    };

    RestHighLevelClient client = getInternalRepo().getClient();
    client.scrollAsync(sr, RequestOptions.DEFAULT, callback);
}

However, this still only runs twice and then errors while parsing the results.

Thanks

Justin
  • Can close, this was another error within other logic that was causing it to fail, bottom solution works fine. Apologies. – Justin Oct 18 '22 at 14:09
