
I'm stuck at the point of indexing a data collection in Elasticsearch.

Following is the code I'm using to index the data from Mongo.

const elasticsearch = require('elasticsearch');
// instantiate an Elasticsearch client

var MongoClient = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

var esIndexName = 'new-collection'; // Elasticsearch index name will go here
var bulk = [];
const client = new elasticsearch.Client({
   hosts: [ 'http://localhost:9200']
});
// ping the client to be sure Elasticsearch is up
client.ping({
    requestTimeout: 30000,
}, function(error) {
    if (error) {
        // at this point Elasticsearch is down, please check your Elasticsearch service
        console.error('Elasticsearch cluster is down!');
    } else {
        console.log('Everything is ok');
    }
});


MongoClient.connect(connectionString+mongoDBName, function(err, db) {
    if(err) throw err;

    // for each object in a collection
    var collection = db.collection(mongoCollectionName);
    var counter = 0;
    collection.find().each(function(err, item) {
        console.log(item);
        Array.from(item).forEach(itemdata => {
            bulk.push({
                index: {
                    _index: esIndexName,
                    _type: mongoCollectionName,
                }
            });
            bulk.push(itemdata);
        });
        // perform bulk indexing of the data passed
        client.bulk({ body: bulk }, function(err, response) {
            if (err) {
                console.log("Failed bulk operation", err);
            } else {
                console.log("Successfully imported %s", mongoCollectionName);
            }
            console.log(response);
        });

        if (item != null) {
            if (counter % 100 == 0) console.log("Syncing object id: " + item['_id'] + " #: " + counter);
            client.indices.create(
                { index: esIndexName },
                function(error, response) {
                    if (error) {
                        console.log(error);
                    } else {
                        console.log("created a new index", response);
                    }
                }
            );
        }
        counter += 1;
    });
});

So here I'm trying to index data into Elasticsearch. I'm able to create the index, but I fail to insert the data into it. Can anyone help me see where I'm going wrong and what mistake I'm making? I'm using Node.js here, just a simple function to test; later I'll add a Lambda function to handle updates/deletes and any other changes.

XFW

3 Answers


First of all, I would suggest tidying up your code; it's very difficult to see how the blocks are nested.

Now, there are several problems with your code:

  1. Why are you doing Array.from(item).forEach(itemdata => {? item is a single document object from Mongo, not an array, so Array.from(item) yields an empty array and the forEach body never runs.
  2. You are calling the bulk API inside the .each callback, meaning you'll make one API call per document. I don't think this is what you want.
  3. You are creating the index after the bulk operation. This is wrong. You should create your ES index once and for all before inserting documents. This matters because in the future you'll want a more advanced configuration to process your documents.
  4. Your ping call to ES is nice, but it doesn't prevent the rest of your code from running if the cluster is down.

So what you should do:

  1. Create your ES index before iterating over your documents.
  2. Iterate over your MongoDB documents and accumulate them in your bulk body.
  3. When you have a batch of n documents, call the bulk API and reset your body (see the sketch below).
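Here's a minimal sketch of that flow, reusing the host, database, collection, and index names from the question. It uses the same legacy elasticsearch client, but in promise mode; BATCH_SIZE, the syncToEs name, and mapping Mongo's _id onto the Elasticsearch document id are illustrative choices, not part of the original code:

const elasticsearch = require('elasticsearch');
const { MongoClient } = require('mongodb');

const esClient = new elasticsearch.Client({ hosts: ['http://localhost:9200'] });
const BATCH_SIZE = 500; // illustrative; tune to your document size

async function syncToEs() {
  // 1. Create the index once, up front (skip if it already exists).
  const exists = await esClient.indices.exists({ index: 'new-collection' });
  if (!exists) {
    await esClient.indices.create({ index: 'new-collection' });
  }

  const mongo = await MongoClient.connect('mongodb://127.0.0.1:27017/', { useNewUrlParser: true });
  const cursor = mongo.db('mydb').collection('mycollection').find();

  // 2. Iterate over the documents, accumulating action/source pairs.
  let body = [];
  while (await cursor.hasNext()) {
    const { _id, ...source } = await cursor.next();
    // add a _type field to the action if your cluster is pre-ES7
    body.push({ index: { _index: 'new-collection', _id: _id.toString() } });
    body.push(source);

    // 3. Every BATCH_SIZE documents (two bulk entries each), flush and reset.
    if (body.length >= BATCH_SIZE * 2) {
      await esClient.bulk({ body });
      body = [];
    }
  }
  if (body.length > 0) {
    await esClient.bulk({ body }); // flush the final partial batch
  }

  await mongo.close();
}

syncToEs().catch(console.error);

Flushing every BATCH_SIZE documents keeps each bulk request at a bounded size, which is easier on both the Mongo cursor and the Elasticsearch cluster than one call per document or one giant call.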
frankie567

Here is the solution you are looking for:

index.js

//MongoDB client config
var MongoClient = require('mongodb').MongoClient;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

//Elasticsearch client config
const { Client } = require('@elastic/elasticsearch')
const esClient = new Client({ node: 'http://localhost:9200' });
var esIndexName = 'new-collection'; // Elasticsearch index name will go here

let bulk = [];

async function indexData() {

  const client = await MongoClient.connect(connectionString, { useNewUrlParser: true })
    .catch(err => { console.log(err); });

  if (!client) {
    return;
  }

  try {

    const db = client.db(mongoDBName);

    let collection = db.collection(mongoCollectionName);
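    // cursor.forEach returns a Promise when no callback is given (driver 3.x), so it can be awaited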
    await collection.find().forEach((doc) => {
      bulk.push({
        index: {
          _index: esIndexName,
        }
      })

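      // strip Mongo's _id: Elasticsearch rejects _id as a field inside the document source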
      let { _id, ...data } = doc;
      bulk.push(data);
    })
    console.log(bulk);

    await esClient.indices.create({
      index: esIndexName,
    }, { ignore: [400] })

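    // refresh: true makes the indexed documents searchable immediately;
    // handy for a test like this, but costly for large production loads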
    const { body: bulkResponse } = await esClient.bulk({ refresh: true, body: bulk })

    if (bulkResponse.errors) {
      const erroredDocuments = []
      // The items array has the same order as the dataset we just indexed.
      // The presence of the `error` key indicates that the operation
      // that we did for the document has failed.
      bulkResponse.items.forEach((action, i) => {
        const operation = Object.keys(action)[0]
        if (action[operation].error) {
          erroredDocuments.push({
            // If the status is 429 it means that you can retry the document,
            // otherwise it's very likely a mapping error, and you should
            // fix the document before trying it again.
            status: action[operation].status,
            error: action[operation].error,
            operation: bulk[i * 2],
            document: bulk[i * 2 + 1]
          })
        }
      })
      console.log(erroredDocuments)
    }

    const { body: count } = await esClient.count({ index: esIndexName })
    console.log(count)

  } catch (err) {

    console.log(err);
  } finally {
    client.close();
  }
}

indexData();

package.json

{
  "name": "elastic-node-mongo",
  "version": "1.0.0",
  "description": "Simple example to connect ElasticSearch, MongoDB and NodeJS",
  "main": "index.js",
  "dependencies": {
    "@elastic/elasticsearch": "^7.3.0",
    "mongodb": "^3.3.2",
    "nodemon": "1.18.3"
  },
  "scripts": {
    "dev": "nodemon",
    "start": "node index.js"
  },
  "keywords": [
    "nodejs",
    "node",
    "mongodb",
    "elasticsearch",
    "docker"
  ],
  "author": "Sathishkumar Rakkiasmy",
  "license": "ISC"
}
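
With these two files in place, npm install followed by npm start runs the importer once; the dev script runs it under nodemon so it restarts on file changes.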

Clarifications

I'm able to create the index, but I fail to insert the data into it.

The above sentence makes sense, because the bulk variable is unaltered.

Refer to the links below for why the bulk variable is unaltered.

Why is my variable unaltered after I modify it inside of a function? - Asynchronous code reference

How do I return the response from an asynchronous call?

To learn more about asynchronous programming:

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Async_await

Sathishkumar Rakkiyasamy

You can use Logstash to import data from MongoDB into Elasticsearch. Please find a sample configuration below for reference.

input {
  mongodb {
    codec => "json"
    uri => 'mongodb://localhost:27017/NewDb'
    placeholder_db_dir => '/home/devbrt.shukla/Desktop/scalaoutput/ELK/logstash-6.4.1/db_dir'
    placeholder_db_name => 'Employee_sqlite.db'
    collection => 'Employee'
    batch_size => 5000
    generateId => 'true'
    parse_method => "simple"
  }
}

filter {
  mutate {
    remove_field => [ "_id" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "employee-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
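
Note that the mongodb input above is not bundled with Logstash; it comes from the community logstash-input-mongodb plugin, typically installed with bin/logstash-plugin install logstash-input-mongodb before running the pipeline with bin/logstash -f <your config file>.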

In Logstash there are three sections: input, filter, and output.

Input: takes data in from sources such as MongoDB, MySQL, other SQL databases, etc.
Filter: in this section you can shape customized JSON to index into Elasticsearch.
Output: in this section you set the index name, doc type, and host address of the destination, i.e. Elasticsearch.