1

I am using elasticsearch 7.17 and I am trying to get all the document id's, version numbers, full index name (using alias in search) and another field. The data will eventually contain about 500 million records so I need to use a solution that can scale effectively. I have read that the best solution is to use Point In Time with slices, however I seem to be having an mismatch between the number of hits that is returned from searching and the total count. I have written a simple version of the code that includes various things I have tried but commented out:

Method that starts the process off (client is HighLevelRestClient):

public void start() {
             ElasticPointInTime elasticPointInTime = new ElasticPointInTime(client);
             String indexName = "myindex";
             long keepAliveMins = 1;
             String pointInTimeId = elasticPointInTime.getPointInTimeId(indexName, keepAliveMins);
             double count = elasticPointInTime.countPointInTime(pointInTimeId, keepAliveMins);
             if (count > 0) {
                    int maxSlices = 10; //Don't now how to work out what this value should be
                    Object[] searchAfterValue = null;
                    for (int currentSlice = 0; currentSlice < maxSlices; currentSlice++)
                    {
                           DocumentDataResponse response = elasticPointInTime.searchPointInTime(pointInTimeId, keepAliveMins, currentSlice, maxSlices, searchAfterValue);
                           searchAfterValue = response.getSearchAfterValue();
                           processDocumentData(response.getPointInTimeData());
                    }
             }
       }

The class that handles the elasticsearch point in time queries:

public class ElasticPointInTime {
       
       private static final String FIELD_NAME = "field";
       private static final int MAX_SIZE = 10000;
       private final RestHighLevelClient client;
       
       public ElasticPointInTime(RestHighLevelClient client) {
             this.client = client;
       }
       
       public String getPointInTimeId(String indexName, long keepAliveMins) {
             String pointInTimeId = null;
             
             OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indexName);
              openRequest.keepAlive(TimeValue.timeValueMinutes(keepAliveMins));
             
             try {
                    OpenPointInTimeResponse openResponse = client.openPointInTime(openRequest, RequestOptions.DEFAULT);
                    if (openResponse != null) {
                           pointInTimeId = openResponse.getPointInTimeId();
                    }
             } catch (IOException e) {
                    e.printStackTrace();
             }
             
             return pointInTimeId;
       }
       
       public long countPointInTime(String pointInTimeId, long keepAliveMins) {
             long count = -1;
             CountRequest countRequest = new CountRequest();
             
             final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pointInTimeId);
       pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(keepAliveMins));
             
             SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
       countRequest.source(searchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder));
             
             try {
                    CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
                    if (countResponse != null) {
                           count = countResponse.getCount();
                    }
             } catch (IOException e) {
                    e.printStackTrace();
             }
             return count;
       }
       
       public DocumentDataResponse searchPointInTime(String pointInTimeId, long keepAliveMins,
                    int currentSlice, int maxSlices, Object[] searchAfterValue) {
             DocumentDataResponse response = new DocumentDataResponse();
             Map<String, DocumentData> pointInTimeData = new HashMap<String, DocumentData>();
             
             SearchRequest searchRequest = new SearchRequest();
             final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pointInTimeId);
       pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(keepAliveMins));
             
             SliceBuilder sliceBuilder = new SliceBuilder(currentSlice, maxSlices);
             
             //Tried various fieldSorters but really just want data in order it was ingested
             //FieldSortBuilder sharedDocSortBuilder = SortBuilders.fieldSort("_shard_doc").order(SortOrder.ASC);
             FieldSortBuilder pitSortBuilder = SortBuilders.pitTiebreaker();
             //FieldSortBuilder docSortBuilder = SortBuilders.fieldSort("_doc").order(SortOrder.ASC);
             
             SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                           .fetchSource(false)                      // Dont need all document data
                           .version(true)                           // Get document version
                           .fetchField(FIELD_NAME)                  // Get specific field
                           .size(MAX_SIZE)                          // Set number of rows to return otherwise only 10 is returned
                           .sort(pitSortBuilder)                    // Sort
                           .pointInTimeBuilder(pointInTimeBuilder)  // Point in time builder
                           .slice(sliceBuilder)                     // Sets the slice builder
                           ;
             
             if (searchAfterValue != null && searchAfterValue.length > 0) {
                    searchSourceBuilder.searchAfter(searchAfterValue);
             }
             searchRequest.source(searchSourceBuilder);
             Object[] lastSearchAfterValue = null;
             try {
                    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                    if (searchResponse != null) {
                           SearchHits hits = searchResponse.getHits();
                           SearchHit[] searchHits = hits.getHits();
                           for (SearchHit hit: searchHits) {
                                 String docId = hit.getId();
                                 String indexName = hit.getIndex();
                                 long versionNumber = hit.getVersion();
                                 DocumentField docField = hit.field(FIELD_NAME);
                                 String fieldValue = (docField == null || docField.getValue() == null) ? "" : docField.getValue().toString();
                                 DocumentData docData = new DocumentData();
                                 docData.setDocumentId(docId);
                                 docData.setIndexName(indexName);
                                 docData.setVersionNumber(versionNumber);
                                 docData.setFieldData(fieldValue);
                                 response.getPointInTimeData().put(docId, docData);
                                 lastSearchAfterValue = hit.getSortValues();
                           }
                           response.searchAfterValue = lastSearchAfterValue;
                    }
             } catch (IOException e) {
                    e.printStackTrace();
             }
             
             return response;
       }
}

Can someone tell me what I am doing wrong? I am not sure whether I should be using sort or not as I only need the data in the order it is ingested and could impact performance. Without size I end up with just 10 rows per slice but I thought slicing handled the number of rows. How should I calculate the number of slices?

karen
  • 893
  • 2
  • 13
  • 38

0 Answers0