
I am attempting to index a DataFrame with the schema below in ElasticSearch using the elasticsearch-hadoop connector.

 |-- ROW_ID: long (nullable = false)
 |-- SUBJECT_ID: long (nullable = false)
 |-- HADM_ID: long (nullable = true)
 |-- CHARTDATE: date (nullable = false)
 |-- CATEGORY: string (nullable = false)
 |-- DESCRIPTION: string (nullable = false)
 |-- CGID: integer (nullable = true)
 |-- ISERROR: integer (nullable = true)
 |-- TEXT: string (nullable = true)

When writing this DataFrame to ElasticSearch, the "CHARTDATE" field is being written as a long. According to the documentation for the connector I am using (linked below), DateType fields in Spark should be written as string-formatted dates in ElasticSearch. As I was hoping to build some visualizations in Kibana leveraging the date fields, having them written as longs is proving problematic.

https://www.elastic.co/guide/en/elasticsearch/hadoop/6.4/spark.html

Code used to produce the error:

import org.elasticsearch.spark.sql._  // provides saveToEs on DataFrame

val elasticOptions = Map(
  "es.nodes"              -> esIP,
  "es.port"               -> esPort,
  "es.mapping.id"         -> primaryKey,
  "es.index.auto.create"  -> "yes",
  "es.nodes.wan.only"     -> "true",
  "es.write.operation"    -> "upsert",
  "es.net.http.auth.user" -> esUser,
  "es.net.http.auth.pass" -> esPassword,
  "es.spark.dataframe.write.null" -> "true",
  "es.mapping.date.rich"  -> "true"
)

castedDF.saveToEs(index, elasticOptions)

Is there a step I am missing to have these values written as ES dates?

mongolol

2 Answers


It's been a long time since I've used Spark with ElasticSearch, but this DateType problem was really annoying for me too.

What I did to make this work was:

* Convert the DateType to an epoch timestamp in Spark (not sure if this step is necessary here; see the sketch after the mapping below)
* Specify in Kibana, or with a curl PUT request when I initialise the index schema, that the field CHARTDATE will be of type date, like this:

PUT /spark
{
  "mappings": {
    "log": {
      "properties": {
        "CHARTDATE": {
          "type": "date"
        }
      }
    }
  }
}
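
For the first step, here is a minimal sketch of the epoch conversion, reusing the castedDF and CHARTDATE names from the question (Elasticsearch's default "date" mapping accepts epoch_millis, and Spark's unix_timestamp returns seconds, hence the multiplication):

import org.apache.spark.sql.functions.{col, unix_timestamp}

// Hypothetical sketch: turn the DateType column into epoch milliseconds
// so it matches the "date" mapping's default epoch_millis format.
val epochDF = castedDF.withColumn(
  "CHARTDATE",
  unix_timestamp(col("CHARTDATE")) * 1000  // unix_timestamp yields seconds
)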

I don't know if Elastic 6.4 changed anything, and if you find a better solution I would appreciate it if you could share it with us later!

I know this isn't really the best solution, having to PUT the index before running the saveToEs action from Spark, but that really was the thing that fixed it for me.

tricky
  • Using put requests to seed the ES index with the date fields resolved my issue. Thanks for the suggestion! – mongolol Sep 19 '18 at 05:15

According to the documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html#mapping-date

You must use the ISO 8601 format (https://en.wikipedia.org/wiki/ISO_8601).
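
A minimal sketch of that approach, again assuming the castedDF and CHARTDATE names from the question: render the column as an ISO 8601 string before writing, so Elasticsearch can parse it as a date.

import org.apache.spark.sql.functions.{col, date_format}

// Hypothetical sketch: format the DateType column as an ISO 8601 string.
// Mind the pattern letters: MM is month, mm would be minutes.
val isoDF = castedDF.withColumn("CHARTDATE", date_format(col("CHARTDATE"), "yyyy-MM-dd"))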

Thomas Decaux
  • Thanks for the response! I tried casting `currentDF.withColumn("CHARTDATE", to_date(col("CHARTDATE"), "yyyy-mm-dd'T'hh:mm:ss.SSSZ"))` and sadly got the same issue. I'll look into this casting in conjunction with @tricky's seeding method below. – mongolol Sep 10 '18 at 20:36
  • Also, to debug what Spark is sending, you could use a HTTP proxy like Charles, this helps a lot to understand how es4hadoop works. Keep us posted ! thanks – Thomas Decaux Sep 12 '18 at 08:20
  • Ended up just needing to build a method which seeded the index. TimeStamps and Dates were both handled correctly. – mongolol Sep 19 '18 at 05:16
  • @mongolol can you elaborate on what you mean by `seeded the index`? I'm having the same problem – eugene Jan 03 '20 at 06:02
  • @eugene I recurse through the schema returned by `df.schema` to build a mappings JSON and then post that to ES prior to writing the data. Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html#create-mapping for ES's mappings API and https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql (the flatten schema response) for recursing through the schema. In this case, you're looking for fields of `DateType` and `TimestampType`. – mongolol Jan 07 '20 at 15:16
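
For reference, a minimal sketch of the index-seeding approach mongolol describes, handling top-level fields only rather than recursing through nested structs; the "log" type name is a placeholder matching the other answer's mapping:

import org.apache.spark.sql.types.{DateType, StructType, TimestampType}

// Hypothetical sketch: collect date-like fields from a flat schema and
// build a mappings JSON to PUT to the index before calling saveToEs.
def dateMappingsJson(schema: StructType, typeName: String = "log"): String = {
  val props = schema.fields.collect {
    case f if f.dataType == DateType || f.dataType == TimestampType =>
      s""""${f.name}": { "type": "date" }"""
  }
  s"""{ "mappings": { "$typeName": { "properties": { ${props.mkString(", ")} } } } }"""
}

PUT the resulting JSON to the index (e.g. PUT /spark, as in the other answer) before the saveToEs call, so the date fields are mapped correctly from the start.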