1

I have been unable to solve the problem into elasticsearch Bulk method for several days, since I am not strong in Go and started learning it not so long ago, while executing the code :

package main

import (
    "bytes"
    json "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

type BulkInsertMetaData struct {
    Index []BulkInsertIndex `json:"index"`
}
type BulkInsertIndex struct {
    Index string `json:"_index"`
    ID    string `json:"_id"`
}

type BulInsertData struct {
    Url string `json:"url"`
}

func main() {
    dataMeta := BulkInsertMetaData{
        Index: []BulkInsertIndex{
            {
                Index: "Test",
                ID:    "1234567890",
            },
        },
    }
    data := BulInsertData{
        Url: "http://XXXX.XX",
    }
    TojsBulInsertData, _ := json.Marshal(data)
    TojsBulkInsertMetaData, _ := json.Marshal(dataMeta)
    BulkMetaData := bytes.NewBuffer(append(TojsBulkInsertMetaData, []byte("\n")...))
    BulkData := bytes.NewBuffer(append(TojsBulInsertData, []byte("\n")...))
    log.Println(BulkMetaData)
    log.Println(BulkData)
    respMetaData, err := http.Post("http://127.0.0.1:9200/_bulk", "application/json", BulkMetaData)
    if err != nil {
        log.Println(err)
    }
    body, err := ioutil.ReadAll(respMetaData.Body)
    if err != nil {
        log.Println(err)
    }
    fmt.Println(string(body))
    respBulkData, err := http.Post("http://127.0.0.1:9200/_bulk", "application/json", BulkData)
    if err != nil {
        log.Println(err)
    }
    body2, err := ioutil.ReadAll(respBulkData.Body)
    if err != nil {
        log.Println(err)
    }
    fmt.Println(string(body2))
}

but i get an error:

2022/02/09 14:37:02 {"index":[{"_index":"Test","_id":"1234567890"}]}

2022/02/09 14:37:02 {"url":"http://XXXX.XX"}

{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"}],"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"},"status":400}

please help and explain what I'm doing wrong, I searched the Internet for the answer to my question but did not find I test insert when using REST client passes without problems

Paulo
  • 8,690
  • 5
  • 20
  • 34
MaxVel
  • 11
  • 1
  • 3
  • Have a look at: https://stackoverflow.com/questions/45792309/bulk-api-malformed-action-metadata-line-3-expected-start-object-but-found. Bulk API expects no newlines (except the last line). The Docs mention: `The REST API endpoint is /_bulk, and it expects the following newline delimited JSON (NDJSON) structure` – Vedant Feb 09 '22 at 12:49
  • Check my comment above, also if you can share an excerpt of the JSON file, it will make it much easier to answer the question :) – Vedant Feb 09 '22 at 12:49
  • I don't use a file for json as you see in the code, I form json programmatically, then I output what happened and then I try to insert into elastick . According to the description in the documentation, an example is given: action_and_meta_data\n optional_source\n . Or am I missing something ? – MaxVel Feb 09 '22 at 13:10

2 Answers2

0

BulkMetaData should be {"index":{"_index":"Test","_id":"1234567890"}} (without []) and it should be sent to /_bulk together with BulkData, as a single payload:

{"index":{"_index":"Test","_id":"1234567890"}}
{"url":"http://XXXX.XX"}
ilvar
  • 5,718
  • 1
  • 20
  • 17
0

Sorry for kinda necroing but I also recently needed to design a Bulk connector in our codebase and the fact that there are NO NDJSON encoder/decoders out on the web is appalling. Here is my implementation:

func ParseToNDJson(data []map[string]interface{}, dst *bytes.Buffer) error {
    enc := json.NewEncoder(dst)
    for _, element := range data {
        if err := enc.Encode(element); err != nil {
            if err != io.EOF {
                return fmt.Errorf("failed to parse NDJSON: %v", err)
            }
            break
        }
    }
    return nil
}

Driver code to test:

func main() {
    var b bytes.Buffer
    var data []map[string]interface{}
    // pointless data generation...
    for i, name := range []string{"greg", "sergey", "alex"} {
        data = append(data, map[string]interface{}{name: i})
    }
    if err := ParseToNDJson(query, &body); err != nil {
        return nil, fmt.Errorf("error encoding request: %s", err)
    }
    res, err := esapi.BulkRequest{
        Index:   "tasks",
        Body:    strings.NewReader(body.String()),
    }.Do(ctx, q.es)

Hope this helps someone

zkscpqm
  • 69
  • 1
  • 5