I have 150k documents in an index and need to dump the raw data for all of them; yes, I really do need all of it.
I am trying to run a scroll query asynchronously, but I haven't been able to figure out how to make the ActionListener reference itself so that it keeps issuing scroll calls until the documents in the scroll are exhausted.
A synchronous approach is pretty straightforward:
SearchRequest sr = new SearchRequest();
// search source builder etc. for index, query, and so on
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
sr.scroll(scroll);
SearchResponse response = client.search(sr, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();
SearchHit[] hits = response.getHits().getHits();
while (hits != null && hits.length > 0) {
    // add the hits to a data structure
    SearchScrollRequest newRequest = new SearchScrollRequest(scrollId);
    newRequest.scroll(scroll);
    response = client.scroll(newRequest, RequestOptions.DEFAULT);
    scrollId = response.getScrollId(); // always continue from the most recent scroll ID
    hits = response.getHits().getHits();
}
So how do I implement an async callback that sends the data to a processEmulator and then continues to call for more data? I think the issue is that the callback needs to hold a reference to itself.
SearchRequest req = new SearchRequest();
SearchSourceBuilder sb = new SearchSourceBuilder();
sb.size(10000); // batches of 10k please
req.source(sb);
req.indices("my-data-index");
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(2L));
req.scroll(scroll);
SearchResponse res = client.search(req, RequestOptions.DEFAULT);
String scrollId = res.getScrollId();
SearchScrollRequest sr = new SearchScrollRequest(scrollId);
ActionListener<SearchResponse> callback = new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
        List<String> results = getDataFromResponse(searchResponse);
        if (results != null && results.size() > 0) {
            emulateSendDateElsewhere(results);
            // call again? how do I self ref the callback from within the callback??
        }
    }

    @Override
    public void onFailure(Exception e) {
        LogUtil.writeLog(e);
    }
};
client.scrollAsync(sr, RequestOptions.DEFAULT, callback);
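On the self-reference question: inside an anonymous class, `this` refers to the listener instance itself, so it can be handed to the next async call. Below is a minimal sketch of that pattern using only JDK types. `Listener` and `FakePagedSource` are hypothetical stand-ins for ActionListener and client.scrollAsync, invented here so the snippet runs without Elasticsearch; it is not the client's actual API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SelfReferencingListenerSketch {

    // Hypothetical stand-in for ActionListener<T>.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Hypothetical stand-in for client.scrollAsync(...): each call delivers the
    // next page on another thread; an empty page means the data is exhausted.
    static final class FakePagedSource {
        private final List<List<String>> pages;
        private int next = 0;

        FakePagedSource(List<List<String>> pages) {
            this.pages = pages;
        }

        synchronized void nextPageAsync(Listener<List<String>> listener) {
            List<String> page = next < pages.size()
                    ? pages.get(next++)
                    : Collections.<String>emptyList();
            CompletableFuture.runAsync(() -> listener.onResponse(page));
        }
    }

    static List<String> drain(FakePagedSource source) {
        List<String> collected = new ArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();

        Listener<List<String>> callback = new Listener<List<String>>() {
            @Override
            public void onResponse(List<String> page) {
                if (page.isEmpty()) {
                    done.complete(null); // nothing left, stop the chain
                    return;
                }
                collected.addAll(page);
                // Inside an anonymous class, `this` is the listener itself.
                source.nextPageAsync(this);
            }

            @Override
            public void onFailure(Exception e) {
                done.completeExceptionally(e);
            }
        };

        source.nextPageAsync(callback);
        done.join(); // block only so the demo can print the result
        return collected;
    }

    public static void main(String[] args) {
        FakePagedSource source = new FakePagedSource(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(drain(source)); // prints [a, b, c]
    }
}
```

With the real client, the equivalent line would presumably be client.scrollAsync(nextRequest, RequestOptions.DEFAULT, this), where nextRequest is a new SearchScrollRequest built from searchResponse.getScrollId() with scroll(scroll) set on it.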
I have tried a couple of things, one being a self-calling method like the following:
private void processAsyncAssetCall(String scrollId) {
    SearchScrollRequest sr = new SearchScrollRequest(scrollId);
    ActionListener<SearchResponse> callback = new ActionListener<SearchResponse>() {
        @Override
        public void onResponse(SearchResponse searchResponse) {
            List<String> results = getIdsFromSearchResponse(searchResponse);
            if (results != null && results.size() > 0) {
                emulateSendDateElsewhere(results);
                processAsyncAssetCall(scrollId);
            }
        }

        @Override
        public void onFailure(Exception e) {
            LogUtil.writeLog(e);
        }
    };
    RestHighLevelClient client = getInternalRepo().getClient();
    client.scrollAsync(sr, RequestOptions.DEFAULT, callback);
}
However, this still only runs twice and then fails with an error while parsing the results.
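One possible shape for the self-calling method, again as a sketch with hypothetical stand-ins (`Page`, `FakeScrollService`, and the `cursor-N` IDs are invented for illustration and are not Elasticsearch types). The difference from the attempt above is that each recursive call receives the cursor taken from the page just returned, not the one the method was originally called with; the Elasticsearch scroll documentation says only the most recently received scroll ID should be used. Whether that is what causes the parsing error here is an assumption, not something verified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class LatestCursorSketch {

    // Hypothetical stand-in for SearchResponse: one batch plus the cursor for the next call.
    static final class Page {
        final String nextCursor;
        final List<String> hits;

        Page(String nextCursor, List<String> hits) {
            this.nextCursor = nextCursor;
            this.hits = hits;
        }
    }

    // Hypothetical stand-in for the scroll endpoint. It is deliberately strict:
    // only the most recently issued cursor is accepted.
    static final class FakeScrollService {
        private final List<List<String>> batches;
        private int position = 0;

        FakeScrollService(List<List<String>> batches) {
            this.batches = batches;
        }

        String firstCursor() {
            return "cursor-0";
        }

        void scrollAsync(String cursor, BiConsumer<Page, Exception> callback) {
            CompletableFuture.runAsync(() -> {
                Page page;
                try {
                    page = nextPage(cursor);
                } catch (IllegalStateException e) {
                    callback.accept(null, e);
                    return;
                }
                callback.accept(page, null);
            });
        }

        private synchronized Page nextPage(String cursor) {
            if (!cursor.equals("cursor-" + position)) {
                throw new IllegalStateException("stale cursor: " + cursor);
            }
            List<String> hits = position < batches.size()
                    ? batches.get(position)
                    : Collections.<String>emptyList();
            position++;
            return new Page("cursor-" + position, hits);
        }
    }

    // Recursive step: handle one page, then request the next one with the
    // cursor that came back with this page.
    static void fetch(FakeScrollService service, String cursor,
                      List<String> sink, CompletableFuture<List<String>> done) {
        service.scrollAsync(cursor, (page, error) -> {
            if (error != null) {
                done.completeExceptionally(error);
            } else if (page.hits.isEmpty()) {
                done.complete(sink); // empty page: scroll exhausted
            } else {
                sink.addAll(page.hits);
                fetch(service, page.nextCursor, sink, done);
            }
        });
    }

    static List<String> fetchAll(FakeScrollService service) {
        CompletableFuture<List<String>> done = new CompletableFuture<>();
        fetch(service, service.firstCursor(), new ArrayList<>(), done);
        return done.join();
    }

    public static void main(String[] args) {
        FakeScrollService service = new FakeScrollService(
                Arrays.asList(Arrays.asList("doc1", "doc2"), Arrays.asList("doc3")));
        System.out.println(fetchAll(service)); // prints [doc1, doc2, doc3]
    }
}
```

Applied to the method above, that would mean calling processAsyncAssetCall(searchResponse.getScrollId()) and setting the keep-alive on each new SearchScrollRequest with scroll(...), as the synchronous loop does.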
Thanks