Elasticsearch is in the process of removing _type from its API, so I need to keep our product compatible with ES across different versions.
To resolve the conflicts between 2.x and 5.x on one side and 6.x and higher, which do not support _type, on the other, I unified our API but wrote different implementations for the requests sent to ES. For versions that support _type, I use the ES low-level REST client to send raw DSL to ES, while for those that do not, I invoke the high-level REST client. Unfortunately, when implementing bulk(), I had no idea how to handle BulkResponse.
For 2.x and 5.x, my method returns a JSONObject; the implementation is below:

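    /**
     * Sends a bulk request through the low-level REST client (ES 2.x and 5.x).
     *
     * @param bulkRequest requests to be sent
     * @param type target type; for ES 2.x and 5.x, this field is set by the user
     * @param refresh whether to force ES to refresh and create a new Lucene segment
     * @param options request options applied to the low-level request
     * @param headers additional HTTP headers (not used by this implementation)
     * @return the parsed JSON response wrapped in a BulkResponse
     * @throws IOException if the request fails or the response cannot be read
     */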
    @Override
    public BulkResponse bulk(BulkRequest bulkRequest, String type, boolean refresh, RequestOptions options, Header... headers) throws IOException {

        String endpoint = connection.getHttpSchema() + "://" + connection.getConnectedHosts()
                + ":" + connection.getConnectedPort() + "/_bulk";
        HttpEntity entity = new NStringEntity(wrapBulkRequests(bulkRequest, type), ContentType.APPLICATION_JSON);
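        Request request = new Request(POST, endpoint);
        request.setOptions(options);
        // Pass the refresh flag through; it was documented but never sent.
        request.addParameter("refresh", String.valueOf(refresh));
        request.setEntity(entity);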
        Response response = this.connection.getLowLevelClient().performRequest(request);
        HttpEntity httpEntity = response.getEntity();
        JSONObject res = JSON.parseObject(EntityUtils.toString(httpEntity, "utf-8"));
        LOG.debug("bulk response is {}", res);
        return new BulkResponse(res);
    }

    private String wrapBulkRequests(BulkRequest bulkRequest, String type) {
        StringBuilder bulkRequestBody = new StringBuilder();
        for (DocWriteRequest<?> request : bulkRequest.requests()) {
            String requestStr = request.toString();
            String targetIndex = request.index();
            String _id = request.id();
            String opType = request.opType().getLowercase();
            if (INDEX_OP_TYPE.equals(opType)) {
                if (_id == null || "".equals(_id)) {
                    bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
                } else {
                    bulkRequestBody.append(String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                }
                bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
            } else if (CREATE_OP_TYPE.equals(opType)) {
                if (_id == null || "".equals(_id)) {
                    bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", targetIndex, type));
                } else {
                    bulkRequestBody.append(String.format("{ \"create\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                }
                bulkRequestBody.append(matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]") + "\n");
            } else if (UPDATE_OP_TYPE.equals(opType)) {
                int retryOnConflict = ((UpdateRequest) request).retryOnConflict();
                if (retryOnConflict == 0) {
                    bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
                } else {
                    bulkRequestBody.append(String.format("{ \"update\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\",  \"retry_on_conflict\" : %d} }\n", targetIndex, type, _id, retryOnConflict));
                }
                boolean isUpsert = ((UpdateRequest) request).docAsUpsert();
                bulkRequestBody.append(matchSourceFromUpdateRequest(requestStr, isUpsert));
            } else if (DELETE_OP_TYPE.equals(opType)) { // delete ops
                bulkRequestBody.append(String.format("{ \"delete\" : { \"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\"  } }\n", targetIndex, type, _id));
            } else {
                LOG.error("bad bulk request op_type {}", opType);
            }
        }
        LOG.debug("bulk request body is {}", bulkRequestBody.toString());
        return bulkRequestBody.toString();
    }

    private String matchSourceFromUpdateRequest(String requestStr, boolean isUpsert) {
        if (isUpsert) {
            return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + ", \"doc_as_upsert\" : true }\n";
        } else {
            return "{ \"doc\" : " + matchSourceFromIndexRequest(requestStr, "source\\[(\\{.+\\})\\]\\}\\]") + " }\n";
        }
    }

    private String matchSourceFromIndexRequest(String requestStr, String regex) {
        Pattern p = Pattern.compile(regex);
        Matcher m = p.matcher(requestStr);
        if (m.find()) {
            LOG.debug("index bulk request source is {}", m.group(1));
            return m.group(1);
        }
        LOG.error("cannot match source from index bulk request, requestStr is {}", requestStr);
        return "";
    }
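
The repeated `String.format` branches in `wrapBulkRequests` all build the same kind of action line, so they could probably be collapsed into one helper. Below is a minimal sketch under stated assumptions: `BulkActionLines`, `quote`, and `actionLine` are hypothetical names that do not appear in the code above, and the escaping only covers backslashes and double quotes (a JSON library would be the safer choice). Unlike the original update and delete branches, this sketch leaves out `_id` when it is null or empty instead of writing the string "null".

```java
// Hypothetical helper, not part of the original code: builds the action
// metadata line that precedes each document in a _bulk request body.
final class BulkActionLines {
    private BulkActionLines() {}

    // Escapes backslashes and double quotes only; assumption: index, type
    // and id contain no control characters. Prefer a JSON library in practice.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // op is "index", "create", "update" or "delete".
    // id may be null or empty, in which case _id is omitted.
    // retryOnConflict is only written when it is greater than zero.
    static String actionLine(String op, String index, String type,
                             String id, int retryOnConflict) {
        StringBuilder sb = new StringBuilder();
        sb.append("{ ").append(quote(op)).append(" : { \"_index\" : ").append(quote(index));
        sb.append(", \"_type\" : ").append(quote(type));
        if (id != null && !id.isEmpty()) {
            sb.append(", \"_id\" : ").append(quote(id));
        }
        if (retryOnConflict > 0) {
            sb.append(", \"retry_on_conflict\" : ").append(retryOnConflict);
        }
        return sb.append(" } }\n").toString();
    }
}
```

With such a helper, each branch of the loop would reduce to one `actionLine(...)` call followed by the source line for that operation.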

To be compatible with 6.x and 7.x, I need to return a BulkResponse. For cases like GetResponse, UpdateResponse, and IndexResponse, I can simply write our own wrapper classes that hold either a JSONObject or the real ES response, depending on the current ES version, because the commonly used accessors on those responses return only primitive types. BulkResponse, however, returns many nested classes such as BulkItemResponse. So I think I might have to build a BulkResponse from the JSONObject response.
I googled a lot and found a similar question: How do you convert an Elasticsearch JSON String Response, with an Aggregation, to an Elasticsearch SearchResponse Object
That poster had a similar need: he was trying to convert JSON to a SearchResponse, while I am trying to convert JSON to a BulkResponse. The idea I took away was to build something like a NamedXContentRegistry. But this part of ES is something of a black box that is not readily exposed to developers. I searched many docs and articles but could not work out how to write my own XContentRegistry. So I would like to ask someone with solid Elasticsearch knowledge to help me out of this mess.
Huge thanks!

Yifeng
  • Did you find any solution? – saba safavi Aug 19 '20 at 10:42
  • Direct conversion seemed hard to achieve, so I used a workaround that I also use for other ES response types: I created my own BulkResponse with an Object field called content. This field holds a JSONObject if the current ES server supports _type; otherwise it holds an *org.elasticsearch.action.bulk.BulkResponse*. At runtime, use the `instanceof` keyword to determine the actual response type and run your business logic. BTW, I hope you find a better and more elegant way to solve this problem – Yifeng Aug 20 '20 at 23:12
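
The workaround described in the comment above might look roughly like the following. This is only a sketch under stated assumptions, not the author's actual class: `UnifiedBulkResponse` and `NativeBulkResponse` are hypothetical names, `NativeBulkResponse` stands in for org.elasticsearch.action.bulk.BulkResponse so the example compiles without the ES client, and a plain `Map` stands in for the JSONObject. The only response detail relied on is the top-level boolean `errors` field of the raw _bulk response.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.elasticsearch.action.bulk.BulkResponse.
final class NativeBulkResponse {
    private final boolean failures;

    NativeBulkResponse(boolean failures) {
        this.failures = failures;
    }

    boolean hasFailures() {
        return failures;
    }
}

// Version-neutral wrapper: content is either the raw JSON (here a Map,
// assumed to mirror the JSONObject) or the native client response.
final class UnifiedBulkResponse {
    private final Object content;

    UnifiedBulkResponse(Object content) {
        this.content = Objects.requireNonNull(content, "content");
    }

    // Dispatches on the runtime type of content, as the comment describes.
    boolean hasFailures() {
        if (content instanceof NativeBulkResponse) {
            return ((NativeBulkResponse) content).hasFailures();
        }
        if (content instanceof Map) {
            // The raw _bulk response carries a top-level boolean "errors" field.
            return Boolean.TRUE.equals(((Map<?, ?>) content).get("errors"));
        }
        throw new IllegalStateException("unsupported content type: " + content.getClass());
    }
}
```

Callers then work only with `UnifiedBulkResponse` and never need to know which client produced the result; the cost is that every accessor needed by the business logic has to be written twice, once per branch.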

0 Answers