5

What is the correct way to use elasticsearch-py in a multiprocessing script? Should I create one client object before starting the processes and share it, or should I create a new client inside each process? The second approach gives me connection errors from Elasticsearch.

Thanks, Kiran

kiran

2 Answers

3

The first method seems to work for me when I declare the client object as a global variable:

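from multiprocessing import Pool
from elasticsearch import Elasticsearch


def task(body):
    # `es` is the global client created under the __main__ guard below.
    # Workers only see it when processes are started with fork.
    result = es.index(index='test', doc_type='test', body=body)
    return result


def main():
    pool = Pool(processes=MAX_CONNECTS)
    result = []
    for x in range(10):
        result.append(pool.apply_async(task, ({'id': x},)))
    # get() blocks until the task has finished, so no sleep is needed.
    for rs in result:
        print(rs.get())
    # Let the workers exit cleanly once all results are in.
    pool.close()
    pool.join()


if __name__ == "__main__":
    MAX_CONNECTS = 5
    es = Elasticsearch(hosts="localhost", maxsize=MAX_CONNECTS)
    main()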

The output looks like this:

{'_index': 'test', '_type': 'test', '_id': 'xEjqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'w0jqBWcB9xsUYKqz-P6U', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'x0jqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xkjqBWcB9xsUYKqz-P6X', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'xUjqBWcB9xsUYKqz-P6W', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yEjqBWcB9xsUYKqz-P66', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'ykjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'yUjqBWcB9xsUYKqz-P7I', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'y0jqBWcB9xsUYKqz-P7P', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 4, '_primary_term': 1}
{'_index': 'test', '_type': 'test', '_id': 'zEjqBWcB9xsUYKqz-P7V', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 5, '_primary_term': 1}
iuyoy
0

The recommended way is to create a single client object. You can increase the number of simultaneous threads it can serve with the maxsize parameter (10 by default).

es = Elasticsearch("host1", maxsize=25)

Source
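To illustrate what maxsize is sized for, here is a minimal sketch of several threads sharing one client inside a single process. The helper name `index_all` and the `workers` parameter are made up for this example, and the `index(...)` call simply mirrors the one used in the other answer, so adjust it to your client version.

```python
from concurrent.futures import ThreadPoolExecutor


def index_all(es, docs, workers=25):
    """Index docs from several threads that all share the one client `es`.

    `index_all` and `workers` are hypothetical names for this sketch.
    """
    def index_one(body):
        # Same call shape as in the other answer; older clients need doc_type.
        return es.index(index="test", doc_type="test", body=body)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() returns results in the same order as `docs`.
        return list(executor.map(index_one, docs))
```

You would call it as `index_all(Elasticsearch("host1", maxsize=25), docs)`, keeping `workers` no larger than maxsize so threads are not left waiting for a free connection. This covers threads only, not separate processes.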

ML_TN
    Wrong. The link you posted above says: "Since we use persistent connections throughout the client it **means that the client doesn’t tolerate fork very well**. If your application calls for multiple processes make sure you create a fresh client after call to fork. Note that Python’s multiprocessing module uses fork to create new processes on POSIX systems." – Antoine Sep 15 '20 at 15:36