3

I have an Amazon Elasticsearch instance which is active, and I'm able to connect and execute statements through 'Sense' from Chrome. But when I try to do bulk inserts, it shows 'timeout' error. I've been trying through both Python (bulk helper) and logstash module, getting the same error both ways.

Below is the code used

import psycopg2
from elasticsearch import Elasticsearch, helpers
import time

connection = psycopg2.connect(database='dbname', user='username', password='password', host='abc.def.com', port=5432)
es = Elasticsearch('elasticsearchinstance.amazonaws.com', max_retries=3, retry_on_timeout=True, request_timeout='10m')
cursor = connection.cursor()

query = """
select column1,column2,column3 from table
"""
cursor.execute(query)
rows = cursor.fetchall()
dict_list = []
for i in range(len(rows)):
    dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

print len(dict_list)

es.indices.delete(index='es_index', ignore=[400, 404])

time.sleep(2)

mapping = "{\"settings\" : {\"analysis\" : { \"analyzer\" : { \"my_ngram_analyzer\" : { \"tokenizer\" : \"my_ngram_tokenizer\" }},\"tokenizer\" : {\"my_ngram_tokenizer\" : {\"type\" : \"nGram\" , \"min_gram\" : \"2\" , \"max_gram\" : \"50\" }}}}, \"mappings\": { \"doc\": { \"_id\" : { \"path\" : \"id\" }, \"properties\": { \"column2\": { \"type\": \"string\", \"analyzer\": \"my_ngram_analyzer\" }, \"id\": { \"type\": \"long\" }, \"column3\": { \"type\": \"integer\" }}}}}"
es.indices.create(index='es_index', ignore=400, body=mapping)

helpers.bulk(es, dict_list)

The error obtained through Python Bulk helper is as below

Traceback (most recent call last):
File "D:\Python\refresh_data.py", line 21, in <module>
es.indices.delete(index='es_index', ignore=[400, 404])
File "C:\Python27\lib\site-packages\elasticsearch\client\utils.py", line 69, in _wrapped
return func(*args, params=params, **kwargs)
File "C:\Python27\lib\site-packages\elasticsearch\client\indices.py", line 198, in delete
params=params)
File "C:\Python27\lib\site-packages\elasticsearch\transport.py", line 307, in perform_request
status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
File "C:\Python27\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 89, in perform_request
raise ConnectionError('N/A', str(e), e)

elasticsearch.exceptions.ConnectionError:
ConnectionError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)')) 
caused by:
ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))

Similar timeout error with Logstash (for bulk insert) as well (will edit and update the error of logstash if needed).

In need of help to solve this timeout issue with Amazon Elasticsearch Service.

Thanks in advance.

Edit:

Here is the error I'm getting with 'Logstash' when I execute bulk insert into Amazon ES

C:\logstash-1.5.4\bin>logstash agent -f feed_load_amazon_es.conf
io/console not supported; tty will not be manipulated
←[31mFailed to install template: connect timed out {:level=>:error}←[0m
Logstash startup completed
←[31mGot error to send bulk of actions: connect timed out {:level=>:error}←[0m
←[33mFailed to flush outgoing items {:outgoing_count=>3, :exception=>"Manticore::ConnectTimeout", 
:backtrace=>["C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:35:in `initialize'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:70:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:245:in `call_once'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:148:in `code'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:71:in `perform_request'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/base.rb:190:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:54:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/client.rb:119:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.0.12/lib/elasticsearch/api/actions/bulk.rb:80:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch/protocol.rb:104:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:542:in `submit'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:566:in `flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:219:in `buffer_flush'", 
"org/jruby/RubyHash.java:1341:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:216:in `buffer_flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:600:in `teardown'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:248:in `outputworker'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:247:in `outputworker'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:166:in `start_outputs'"], :level=>:warn}←[0m
Hari Shreyas
  • 65
  • 1
  • 8
  • 1
    Can you show how you connect to ES from Sense (i.e. what URL you're using)? – Val Dec 07 '15 at 10:16
  • Had edited the specifics (server names and username-password) for privacy concerns. I'm using the endpoint that was mentioned in the AWS console. I'm even able to hit ES URL without 'Sense' : ` { "status" : 200, "name" : "Ardina", "cluster_name" : "12345678:clustername, "version" : { "number" : "1.5.2", "build_hash" : "aabcabcabcabcabcabcabc", "build_timestamp" : "2015-04-27T09:21:06Z", "build_snapshot" : false, "lucene_version" : "4.10.4" }, "tagline" : "You Know, for Search" }` – Hari Shreyas Dec 07 '15 at 10:22

1 Answers1

0

You are doing it wrong I think.

A bulk request is a 2 lines combination in the "body" field of the bulk method.

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }

Here is what you should have in your body field.

The first line contains the type of request, the index where you bulk and a lot of other params that you can set or not (check documentation). Add a \r\n at the end of the first line.

The second line must contains what you're trying to insert.

If you check what you're putting into dict_list, you forgot the index method call.

Wrong structure :

dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

Right structure :

{ "index" : {'_type':'doc', '_index':'es_index', '_id':rows[i][0]} }

And then add your document on a second line.

JonathanG
  • 292
  • 1
  • 10
  • 1
    I used the same code for another ES instance which is hosted on a Windows Virtual Machine, and it worked without a hitch. So i'm sure the python code works fine. I get this error only with Amazon Elasticsearch.. Should I make any config changes to the Amazon ES instance ? – Hari Shreyas Dec 07 '15 at 11:12
  • Yeah your ES instance is just not connecting. Right port? Ip? Port open? – JonathanG Dec 07 '15 at 14:46
  • 1
    Yes, I'm able to connect to the ES Instance. I get a response when I hit the URL, and I'm also able to connect through 'Sense' chrome AddOn. I've added my IP range in the allowed IPs, so the connection route is clear. The timeout error occurs during bulk insert, through Python or Logstash. – Hari Shreyas Dec 08 '15 at 05:08
  • 1
    I saw the thread at http://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python/ @diolor requesting your comments on this, I'm using a code similar to the one mentioned in the answer, any hunch on why the 'timeout' error occurs ? – Hari Shreyas Dec 08 '15 at 07:04
  • How much datas are you sending? Maybe the requests takes too much time and your ES instance times you out. Got any way to configure it on amazon? – JonathanG Dec 08 '15 at 08:55
  • For the sake of example, I have made the query as "select column1,column2,column3 from table" but I had used 'limit 10' to get and push 10 records. Got the same error. – Hari Shreyas Dec 08 '15 at 10:09
  • Does it take a bunch of time to throw you the error or does it pops instant? – JonathanG Dec 08 '15 at 10:28
  • 1
    It takes like 10-15 seconds for the first error to appear, then the same error is printed in the console again and again. I've printed the data once it is read from the database, so I know for sure the db call doesn't delay to read and fetch data. The exact same code works perfectly fine with the ES instance I've exposed from a separate VM. The error just occurs with Amazon Elasticsearch Service. – Hari Shreyas Dec 08 '15 at 13:36
  • Okay easy test, try to change the size of what you're bulking. From a huuuuuuge file to a small one. If it times out after the same amount of time it's because in some way it "doesn't connect", if it does not takes the same amount of time, then your bulking works but is just slow. Anyway, you should contact Amazon and report them your problem, it seems like... Something is not active or misconfigured. – JonathanG Dec 08 '15 at 15:41