0

JDBC Plugin Polymorphic Index

Hi, we have a table that's polymorphic to items and we'd like to find a way to update different indexes in one logstash config.

Table Structure

Below is an example table. The item_type column denotes the type (such as Pen, Post, Collection), the item_id is a foreign key to the item in our DB, and the score is calculated on a cron and updated every once in a while, which updates our updated_at column.

popularity_scores

Process

Using logstash jdbc plugins, we'd like to query the data, then push it to ES. However, I don't see a way (other than a logstash config and sql query for each item type) to dynamically push updates to indexes. In a perfect world, we'd like to take input from the table above (see input code below)

input

    input {
        jdbc {
            jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-8.0.15.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            # useCursorFetch needed cause jdbc_fetch_size not working??
            # https://discuss.elastic.co/t/logstash-jdbc-plugin/84874/2
            # https://stackoverflow.com/a/10772407
            jdbc_connection_string => "jdbc:mysql://${CP_LS_SQL_HOST}:${CP_LS_SQL_PORT}/${CP_LS_SQL_DB}?useCursorFetch=true&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
            statement => "select * from view_elastic_popularity_all where updated_at > :sql_last_value"
            jdbc_user => "${CP_LS_SQL_USER}"
            jdbc_password => "${CP_LS_SQL_PASSWORD}"
            jdbc_fetch_size => "${CP_LS_FETCH_SIZE}"
            last_run_metadata_path => "/usr/share/logstash/cp/last_run_files/last_run_popularity_live"
            jdbc_page_size => '10000'
            use_column_value => true
            tracking_column => 'updated_at'
            tracking_column_type => 'timestamp'
            schedule => "* * * * *"
        }
    }

Then run update queries to ES via an output plugin (see output code below)

output

    output {
      elasticsearch {
          index => "HOW_DO_WE_DYNAMICALLY_SET_INDEX_BASED_ON_ITEM_TYPE?"
          document_id => "%{id}"
          hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
          user => "${CP_LS_ES_USER}"
          password => "${CP_LS_ES_PASSWORD}"
      }
    }

Help?

We can't be the first company with this problem. How would we structure the output?

timsabat
  • 2,208
  • 3
  • 25
  • 34

1 Answers1

0

You can dynamically set the name of the index by using a field in the event message in the same way that you dynamically setup the document_id.

output {
  elasticsearch {
      index => "%{item_type}"
      document_id => "%{id}"
      hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
      user => "${CP_LS_ES_USER}"
      password => "${CP_LS_ES_PASSWORD}"
  }
}
mihomir
  • 197
  • 6
  • 15