I have a bunch of CSV files that need to be migrated to Elasticsearch. I am using Logstash version 7.16.3, and the index has already been created on Elasticsearch with the proper mapping. The configuration file is as below:

input {
    file {
        path => "C:/Users/fr-pa/Documents/wikidata/extracted/*.csv"
        start_position => "beginning"
        sincedb_path => "NULL"
    }
}

filter {
    csv {
        separator => ","
        columns => ["id", "type", "arlabel", "enlabel", "araliases",
                    "enaliases", "ardescription", "endescription", "maincategory",
                    "arwiki", "enwiki", "arwikiquote", "enwikiquote"]
    }
}

output {
    elasticsearch {
        hosts => "http://localhost:9200/"
        index => "wikidata_index"
    }
    stdout {
    }
}

But the data is not migrated. This is the Logstash output:

[screenshot: Logstash console output]

Does anyone have an idea what the problem is?

These are my index settings and mappings:

request_body = {
    "settings": {
    "analysis": {
      "filter": {
        "arabic_stop": {
          "type":       "stop",
          "stopwords":  "_arabic_"
        },
        "arabic_keywords": {
          "type":       "keyword_marker",
          "keywords":   ["مثال"]
        },
        "arabic_stemmer": {
          "type":       "stemmer",
          "language":   "arabic"
        },
        "english_stop": {
          "type":       "stop",
          "stopwords":  "_english_"
        },
        "english_keywords": {
          "type":       "keyword_marker",
          "keywords":   ["example"]
        },
        "english_stemmer": {
          "type":       "stemmer",
          "language":   "english"
        },
        "english_possessive_stemmer": {
          "type":       "stemmer",
          "language":   "possessive_english"
        }
      },
      "analyzer": {
        "rebuilt_arabic": {
          "tokenizer":  "standard",
          "filter": [
            "lowercase",
            "decimal_digit",
            "arabic_stop",
            "arabic_normalization",
            "arabic_keywords",
            "arabic_stemmer"
          ]
        },
       "comma_split":{
          "type" : "pattern",
          "pattern" : ","
        },
        "rebuilt_english": {
          "tokenizer":  "standard",
          "filter": [
            "english_possessive_stemmer",
            "lowercase",
            "english_stop",
            "english_keywords",
            "english_stemmer"
          ]
        }
      }
    }
  } ,
   "mappings": {
        "properties": {
          "id": {
          "type": "keyword",
          "ignore_above": 256
        },
          "type": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "arlabel": {
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
        "enlabel": {
          "type": "text",
          "analyzer": "rebuilt_english"
        },
        "araliases": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "enaliases": {
          "type": "text",
          "analyzer": "comma_split"
        },
        "ardescription":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
          "endescription":{
          "type": "text",
          "analyzer": "rebuilt_english"
        },
          "maincategory":{
          "type": "text",
          "analyzer": "comma_split"
        },
          "arwiki":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
          "enwiki":{
          "type": "text",
          "analyzer": "rebuilt_english"
        },
          "arwikiquote":{
          "type": "text",
          "analyzer": "rebuilt_arabic"
        },
        "enwikiquote": {
          "type": "text",
          "analyzer": "rebuilt_english"
        }
      }
    }
}
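
For context, here is a minimal sketch of how such a body can be applied with the Python client before loading any data (this assumes elasticsearch-py 7.x and the same local node as in the Logstash output section; `request_body` is the dict defined above):

    from elasticsearch import Elasticsearch

    # assumes a local single-node cluster, matching the Logstash output config
    es = Elasticsearch("http://localhost:9200")

    # create the index with the settings and mappings above;
    # ignore=400 makes the call a no-op if the index already exists
    es.indices.create(index="wikidata_index", body=request_body, ignore=400)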

Please note that some fields have empty values. I also tried to use the Python bulk helper to insert the data:

with open(full_path,encoding="utf8") as f:
    reader = csv.DictReader(f)
    print(reader)
    helpers.bulk(es, reader, index='wikidata_index') 

The error raised is:

C:\Users\fr-pa\Documents\wikidata\extracted\till_Q10091689_item.csv
<csv.DictReader object at 0x0000028E86C47EB0>
---------------------------------------------------------------------------
BulkIndexError                            Traceback (most recent call last)
<ipython-input-42-3849641bd8f9> in <module>
      5         reader = csv.DictReader(f)
      6         print(reader)
----> 7         helpers.bulk(es, reader, index='wikidata_index')

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, ignore_status, *args, **kwargs)
    408     # make streaming_bulk yield successful results so we can count them
    409     kwargs["yield_ok"] = True
--> 410     for ok, item in streaming_bulk(
    411         client, actions, ignore_status=ignore_status, *args, **kwargs
    412     ):

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, ignore_status, *args, **kwargs)
    327 
    328             try:
--> 329                 for data, (ok, info) in zip(
    330                     bulk_data,
    331                     _process_bulk_chunk(

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, ignore_status, *args, **kwargs)
    254             raise_on_error=raise_on_error,
    255         )
--> 256     for item in gen:
    257         yield item
    258 

C:\ProgramData\Anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error)
    185 
    186     if errors:
--> 187         raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
    188 
    189 

BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},
– Mai
  • Did you run this pipeline before? Try to change `sincedb_path` to `NUL` instead of `NULL`, this is what you need to not use a sincedb in windows. – leandrojmp Jan 28 '22 at 22:01
  • I tried to run the pipeline a few months ago and it worked, but yesterday it didn't work and I didn't get the error – Mai Jan 29 '22 at 07:23
  • I'm sorry, but I don't understand the issue anymore, first you said you had a problem with Logstash, then you edited the question and shared a Python stack trace which is completely unrelated to Logstash. You need to explain what you are doing and what is not working. But as the answer below says, you can't have empty field names. – leandrojmp Jan 29 '22 at 12:36
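  
For illustration, a minimal sketch of the file input with the change suggested in the first comment above (the rest of the pipeline unchanged):

    input {
        file {
            path => "C:/Users/fr-pa/Documents/wikidata/extracted/*.csv"
            start_position => "beginning"
            # "NUL" is the Windows null device, so no sincedb is persisted and the
            # files are re-read on every run; "NULL" is treated as an ordinary file
            # name, so positions from earlier runs are remembered and already-read
            # files are skipped
            sincedb_path => "NUL"
        }
    }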

2 Answers


The problem is evident from the last line of the stacktrace:

BulkIndexError: ('500 document(s) failed to index.', [{'index': {'_index': 'wikidata_index', '_type': '_doc', '_id': 'dbxzon4BOVq7OZfct2-t', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'field name cannot be an empty string'}},

You need to remove or replace the fields that have empty keys before indexing, for example as in the sketch below.
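
A minimal sketch (assuming the same `csv.DictReader` loop and the `full_path` and `es` objects from the question) that drops empty column names from each row before bulk indexing:

    import csv
    from elasticsearch import helpers

    def clean_rows(path):
        with open(path, encoding="utf8") as f:
            for row in csv.DictReader(f):
                # drop keys that are empty strings (e.g. from a trailing comma in
                # the CSV header line) or None (extra columns); an empty key is what
                # triggers "field name cannot be an empty string"
                yield {k: v for k, v in row.items() if k}

    helpers.bulk(es, clean_rows(full_path), index='wikidata_index')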

– dcolazin
The solution that worked for me:

actions = []
with open(full_path, encoding="utf8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        if row['type'] not in ['Q4167410', 'Q4167836']:
            if row['araliases'] != "<class 'str'>":
                araliases = row['araliases']
            else:
                araliases = None
            if row['enaliases'] != "<class 'str'>":
                enaliases = row['enaliases']
            else:
                enaliases = None
            if row['maincategory'] != "<class 'str'>":
                maincategory = row['maincategory']
            else:
                maincategory = None
            wikidata_item = {
                "id": row['id'],
                "type": row['type'],
                "arlabel": row['arlabel'],
                "enlabel": row['enlabel'],
                "araliases": araliases,
                "enaliases": enaliases,
                "ardescription": row['ardescription'],
                "endescription": row['endescription'],
                "maincategory": maincategory,
                "arwiki": row['arwiki'],
                "enwiki": row['enwiki'],
                "arwikiquote": row['arwikiquote']
            }
            actions.append(wikidata_item)
# print(actions)
helpers.bulk(es, actions, index='wikidata_index')
– Mai