
I have to store some messages in Elasticsearch, integrated with my Python program. Right now, this is how I store a message:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

That means if I have 10 messages then I have to repeat my code 10 times. So what I want to do instead is prepare a script file or batch file. I've checked the Elasticsearch Guide, and the bulk API can be used. The format should be something like below:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

What I did is:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

I also used the curl tool to store the docs:

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

Now I want to use my Python code to store the file in Elasticsearch.
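
(For reference, the rough Python equivalent of that curl call would be something like the sketch below. This is only an illustrative sketch: it assumes the official elasticsearch-py client is installed and that message.json is the bulk-format file shown above.)

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Read the already-prepared bulk (NDJSON) file and send it in a single request.
with open("message.json") as f:
    bulk_body = f.read()

response = es.bulk(body=bulk_body)
print(response["errors"])   # False when every action succeeded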

chengji18
  • Have a look at some python clients like pyes : https://github.com/aparo/pyes OR the elasticsearch official client https://github.com/elasticsearch/elasticsearch-py – mconlin Nov 30 '13 at 14:48
  • Thank you so much. I checked some clients and tried pyelasticsearch, and I already managed to do a bulk index with pyelasticsearch. In pyelasticsearch the doc file has to live inside the code. Is it possible to put the doc file which I want to bulk index outside the program? – chengji18 Dec 12 '13 at 10:37

5 Answers

from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
  {
    "_index": "tickets-index",
    "_type": "tickets",
    "_id": j,
    "_source": {
        "any":"data" + str(j),
        "timestamp": datetime.now()}
  }
  for j in range(0, 10)
]

helpers.bulk(es, actions)
Justina Chen
  • I didn't leave any comment because I guess the code is quite clear. – Justina Chen Feb 07 '14 at 07:18
  • This worked for me, thanks. A side note: if you don't provide "_id", one will be automatically generated for you. – xamox Nov 17 '15 at 18:32
  • I'm using bulk() as described in this answer, but as I check my index while my script is running, I don't see anything being uploaded to the index. When I used the index() function to upload one-at-a-time, I immediately see results. Not sure what's happening. I'm also not getting any errors nor exceptions. Help please! – Soubriquet Oct 03 '16 at 17:59
  • @Soubriquet: facing same issue. Any luck? – Saad Abdullah Aug 24 '17 at 23:02
  • Same issue has anyone figured this out? – Ominus Sep 15 '17 at 13:58
  • See my answer below that uses Python generators, allowing you to add millions of documents to ES without needing more RAM. – Ethan Feb 25 '18 at 16:09
  • I changed the type from `_type` to `doc_type` - and it works for me – Yakir GIladi Edry Sep 11 '22 at 12:19

Although @justinachen's code helped me get started with py-elasticsearch, after looking into the source code let me suggest a simple improvement:

from datetime import datetime

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()
            }
        }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)

helpers.bulk() already does the segmentation for you, and by segmentation I mean the chunks sent to the server on each request. If you want to reduce the size of the sent chunks, do: helpers.bulk(es, actions, chunk_size=100)

Some handy info to get started:

helpers.bulk() is just a wrapper of helpers.streaming_bulk, but the former accepts a list, which makes it handy.

helpers.streaming_bulk is based on Elasticsearch.bulk(), so you do not need to worry about what to choose.

So in most cases, helpers.bulk() should be all you need.
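
To illustrate (a minimal sketch of my own, not part of the original answer, reusing the es client and an actions list like the one above):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
actions = [{"_index": "tickets-index", "_type": "tickets", "_id": j,
            "_source": {"any": "data" + str(j)}} for j in range(10)]

# bulk(): hand over the whole list and let the helper split it into chunks of 100.
helpers.bulk(es, actions, chunk_size=100)

# streaming_bulk(): a generator yielding one (success, info) tuple per document,
# handy when you want to inspect each result as it is indexed.
for success, info in helpers.streaming_bulk(es, actions, chunk_size=100):
    if not success:
        print("A document failed:", info)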

Diolor
  • In Python, I would strongly suggest the use of list/dictionary comprehensions (in combination with enumerate for the variable j and range-indexing to ensure j <= 10) instead of loops. This is a more readable style of coding and allows you to create generators (Python 3), preventing premature materialization. – Herbert Aug 31 '15 at 13:12
  • If you cannot load all your documents into memory to let helpers.bulk do the batching for you, then this won't work and you still have to batch your calls to helpers.bulk. – Chris Feb 20 '17 at 23:43

(The other approaches mentioned in this thread use a Python list for the ES update, which is not a good solution today, especially when you need to add millions of documents to ES.)

A better approach is to use Python generators: process gigs of data without running out of memory or compromising much on speed.

Below is an example snippet from a practical use case: adding data from an nginx log file to ES for analysis.

from elasticsearch import Elasticsearch, helpers

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp   = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'

def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately.
    ...

    es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
            "_index": "nginx",
            "_type" : "logs",
            "_id"   : idx,
            "_source": es_nginx_d,
         } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')

This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to, and you can keep expanding on it to tailor it to your needs quickly.

Python Elasticsearch reference here.

Ethan

There are two options which I can think of at the moment:

1. Define the index name and document type with each entry:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)

2. Provide the default index and document type with the method:

es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)
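
A small follow-up note (my addition, not part of the original answer): the low-level bulk() call returns the raw bulk response, so it is worth checking its errors flag, roughly like this:

# The bulk response has a top-level "errors" boolean and one entry per action in "items".
response = es_client.bulk(index='my_index', doc_type='doc', body=body)
if response.get('errors'):
    failed = [item for item in response['items']
              if list(item.values())[0].get('error')]
    print('%d bulk actions failed' % len(failed))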

Works with:

ES version: 6.4.0

ES Python lib: 6.3.1

Rafal Enden

My working code:

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import connections
import pandas as pd


# initialize list of lists
data = [['tom', 10, 'NY'], ['nick', 15, 'NY'], ['juli', 14, 'NY'], ['akshay', 30, 'IND'], ['Amit', 14, 'IND']]

# Create the pandas DataFrame
df = pd.DataFrame(data, columns = ['Name', 'Age', 'Country'])

es_client = connections.create_connection(hosts=['http://localhost:9200/'])
def doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": 'age_sample',
                "_type": "_doc",
                "_source": document,
            }

helpers.bulk(es_client, doc_generator(df))

# Get data back from Elasticsearch
from elasticsearch_dsl import Search
s = Search(index="age_sample").query("match", Name='nick')
response = s.execute()  # run the query; matching documents are in response.hits
Flair