I am using Elasticsearch 7.17 and I am trying to get all the document IDs, version numbers, full index names (I search using an alias) and one other field. The data will eventually contain about 500 million records, so I need a solution that scales. I have read that the best approach is Point In Time with slices; however, I am seeing a mismatch between the number of hits returned by the search and the total count. I have written a simplified version of the code that includes various things I have tried but commented out:
Method that starts the process off (client is a RestHighLevelClient):
public void start() {
    ElasticPointInTime elasticPointInTime = new ElasticPointInTime(client);
    String indexName = "myindex";
    long keepAliveMins = 1;
    String pointInTimeId = elasticPointInTime.getPointInTimeId(indexName, keepAliveMins);
    long count = elasticPointInTime.countPointInTime(pointInTimeId, keepAliveMins);
    if (count > 0) {
        int maxSlices = 10; // Don't know how to work out what this value should be
        Object[] searchAfterValue = null;
        for (int currentSlice = 0; currentSlice < maxSlices; currentSlice++) {
            DocumentDataResponse response = elasticPointInTime.searchPointInTime(pointInTimeId,
                    keepAliveMins, currentSlice, maxSlices, searchAfterValue);
            searchAfterValue = response.getSearchAfterValue();
            processDocumentData(response.getPointInTimeData());
        }
    }
}
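I also wondered whether each slice needs its own search_after state and should be paged until it returns no more hits, rather than carrying one searchAfterValue across slices, so I tried a variant along these lines (same ElasticPointInTime class as below; the method name is just for illustration):
public void startPerSlicePaging() {
    ElasticPointInTime elasticPointInTime = new ElasticPointInTime(client);
    String indexName = "myindex";
    long keepAliveMins = 1;
    String pointInTimeId = elasticPointInTime.getPointInTimeId(indexName, keepAliveMins);
    int maxSlices = 10; // Still unsure how to choose this
    for (int currentSlice = 0; currentSlice < maxSlices; currentSlice++) {
        Object[] searchAfterValue = null; // reset the search_after state for each slice
        while (true) {
            DocumentDataResponse response = elasticPointInTime.searchPointInTime(pointInTimeId,
                    keepAliveMins, currentSlice, maxSlices, searchAfterValue);
            if (response.getPointInTimeData().isEmpty()) {
                break; // this slice has no more hits
            }
            processDocumentData(response.getPointInTimeData());
            searchAfterValue = response.getSearchAfterValue();
        }
    }
}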
The class that handles the Elasticsearch point-in-time queries:
import java.io.IOException;

import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;

public class ElasticPointInTime {

    private static final String FIELD_NAME = "field";
    private static final int MAX_SIZE = 10000;

    private final RestHighLevelClient client;

    public ElasticPointInTime(RestHighLevelClient client) {
        this.client = client;
    }
    public String getPointInTimeId(String indexName, long keepAliveMins) {
        String pointInTimeId = null;
        OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indexName);
        openRequest.keepAlive(TimeValue.timeValueMinutes(keepAliveMins));
        try {
            OpenPointInTimeResponse openResponse = client.openPointInTime(openRequest, RequestOptions.DEFAULT);
            if (openResponse != null) {
                pointInTimeId = openResponse.getPointInTimeId();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return pointInTimeId;
    }
    public long countPointInTime(String pointInTimeId, long keepAliveMins) {
        long count = -1;
        CountRequest countRequest = new CountRequest();
        final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pointInTimeId);
        pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(keepAliveMins));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        countRequest.source(searchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder));
        try {
            CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
            if (countResponse != null) {
                count = countResponse.getCount();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return count;
    }
    public DocumentDataResponse searchPointInTime(String pointInTimeId, long keepAliveMins,
                                                  int currentSlice, int maxSlices, Object[] searchAfterValue) {
        DocumentDataResponse response = new DocumentDataResponse();
        SearchRequest searchRequest = new SearchRequest();
        final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pointInTimeId);
        pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(keepAliveMins));
        SliceBuilder sliceBuilder = new SliceBuilder(currentSlice, maxSlices);
        // Tried various field sorts, but I really just want the data in the order it was ingested
        //FieldSortBuilder shardDocSortBuilder = SortBuilders.fieldSort("_shard_doc").order(SortOrder.ASC);
        FieldSortBuilder pitSortBuilder = SortBuilders.pitTiebreaker();
        //FieldSortBuilder docSortBuilder = SortBuilders.fieldSort("_doc").order(SortOrder.ASC);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .fetchSource(false)                     // Don't need the full document source
                .version(true)                          // Get the document version
                .fetchField(FIELD_NAME)                 // Get the specific field
                .size(MAX_SIZE)                         // Without this only 10 rows are returned
                .sort(pitSortBuilder)                   // Sort (PIT tiebreaker)
                .pointInTimeBuilder(pointInTimeBuilder) // Point in time
                .slice(sliceBuilder);                   // Slice
        if (searchAfterValue != null && searchAfterValue.length > 0) {
            searchSourceBuilder.searchAfter(searchAfterValue);
        }
        searchRequest.source(searchSourceBuilder);
        Object[] lastSearchAfterValue = null;
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse != null) {
                SearchHits hits = searchResponse.getHits();
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {
                    String docId = hit.getId();
                    String indexName = hit.getIndex(); // concrete index name, even when searching via an alias
                    long versionNumber = hit.getVersion();
                    DocumentField docField = hit.field(FIELD_NAME);
                    String fieldValue = (docField == null || docField.getValue() == null) ? "" : docField.getValue().toString();
                    DocumentData docData = new DocumentData();
                    docData.setDocumentId(docId);
                    docData.setIndexName(indexName);
                    docData.setVersionNumber(versionNumber);
                    docData.setFieldData(fieldValue);
                    response.getPointInTimeData().put(docId, docData);
                    lastSearchAfterValue = hit.getSortValues();
                }
                response.setSearchAfterValue(lastSearchAfterValue);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }
}
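For completeness, I plan to close the point in time once processing finishes; this is my sketch using the client's closePointInTime method (I don't think it affects the mismatch):
import org.elasticsearch.action.search.ClosePointInTimeRequest;

public void closePointInTime(String pointInTimeId) {
    ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pointInTimeId);
    try {
        client.closePointInTime(closeRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}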
Can someone tell me what I am doing wrong? I am not sure whether I should be using a sort at all, since I only need the data in the order it was ingested and sorting could hurt performance. Without size I end up with just 10 rows per slice, but I thought slicing handled the number of rows returned. And how should I calculate the number of slices?
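On the last question: I have read that slicing works best when the number of slices does not exceed the number of shards, so I considered deriving maxSlices from the index settings, something like this sketch (I am not sure this is the right heuristic, and I am searching via an alias, which complicates it):
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;

private int lookupShardCount(String indexName) throws IOException {
    GetSettingsRequest request = new GetSettingsRequest().indices(indexName);
    GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT);
    // Note: when indexName is an alias, the settings come back keyed by the
    // concrete index name(s), so the alias may need resolving first
    return Integer.parseInt(response.getSetting(indexName, "index.number_of_shards"));
}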