
I am using the Logstash JDBC input to keep things synced between MySQL and Elasticsearch. It's working fine for one table, but now I want to do it for multiple tables. Do I need to open multiple terminals, each running

logstash agent -f /Users/logstash/logstash-jdbc.conf

with a different select query in each, or is there a better way of doing it so that multiple tables get updated?

My config file:

input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table1"
  }
}
output {
    elasticsearch {
        index => "testdb"
        document_type => "table1"
        document_id => "%{table_id}"
        hosts => "localhost:9200"
    }
}
Asim Zaidi
  • You can have a single config with multiple `jdbc` inputs and then parametrize the `index` and `document_type` in your `elasticsearch` output depending on which table the event is coming from. – Val Jun 03 '16 at 11:53
  • Do you have any example or sample? – Asim Zaidi Jun 03 '16 at 11:57

3 Answers


You can definitely have a single config with multiple jdbc inputs and then parametrize the index and document_type in your elasticsearch output depending on which table the event is coming from.

input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table1"
    type => "table1"
  }
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table2"
    type => "table2"
  }
  # add more jdbc inputs to suit your needs 
}
output {
    elasticsearch {
        index => "testdb"
        document_type => "%{type}"   # <- use the type from each input
        hosts => "localhost:9200"
    }
}
Val
  • Hmm... I think the issue would be `document_id => "%{table_id}"` unless I can auto-generate a unique document_id – Asim Zaidi Jun 03 '16 at 22:58
  • Can you elaborate please? Do you have different ID fields per table you'd like to use as the document ID? – Val Jun 04 '16 at 05:28
  • Yes, I have different IDs, and I'd rather have Elasticsearch create the ID instead of trying to use the existing MySQL IDs – Asim Zaidi Jun 04 '16 at 19:04
  • Then simply remove the `document_id` stanza and ES will autogenerate its own IDs. I've updated my answer. – Val Jun 05 '16 at 05:54
  • Thank you... can you please tell me how to get output on the screen? I am running `logstash -f logstash-jdbc.conf` but that does not show any output, so I don't know if it's done or still running – Asim Zaidi Jun 07 '16 at 18:47
  • Yes, you can add an output like this `stdout { codec => "dots"}` and it will show one dot per event sent. – Val Jun 07 '16 at 19:57
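
For reference, a minimal sketch of the first answer's output section with the `dots` debug codec Val suggests added alongside it (the `elasticsearch` settings simply mirror the answer above; the extra `stdout` line prints one dot per event so you can see progress):

output {
    stdout { codec => "dots" }   # one dot per event sent, useful as a progress indicator
    elasticsearch {
        index => "testdb"
        document_type => "%{type}"
        hosts => "localhost:9200"
    }
}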

This will not create duplicate data, and it is compatible with Logstash 6.x.

# YOUR_DATABASE_NAME : test
# FIRST_TABLE :  place  
# SECOND_TABLE :  things    
# SET_DATA_INDEX : test_index_1_12, test_index_2_13

input {
    jdbc {
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        # The user we wish to execute our statement as
        jdbc_user => "root"
        jdbc_password => ""
        schedule => "* * * * *"
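        # the @slno user variable attaches a running row number (aut_es_1) to each row,
        # which is then used as the tracking_column below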
        statement => "SELECT  @slno:=@slno+1 aut_es_1, es_qry_tbl.* FROM (SELECT * FROM `place`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
        type => "place"
        add_field => { "queryFunctionName" => "getAllDataFromFirstTable" }
        use_column_value => true
        tracking_column => "aut_es_1"
    }

    jdbc {
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        # The user we wish to execute our statement as
        jdbc_user => "root"
        jdbc_password => ""
        schedule => "* * * * *"
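        # same row-numbering trick as above; aut_es_2 is this table's tracking column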
        statement => "SELECT  @slno:=@slno+1 aut_es_2, es_qry_tbl.* FROM (SELECT * FROM `things`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
        type => "things"
        add_field => { "queryFunctionName" => "getAllDataFromSecondTable" }
        use_column_value => true
        tracking_column => "aut_es_2"
    } 
}

# install uuid plugin 'bin/logstash-plugin install logstash-filter-uuid'
# The uuid filter allows you to generate a UUID and add it as a field to each processed event.

filter {

    # pick the tracking column that exists for this event's table; concatenating
    # "%{aut_es_1}%{aut_es_2}" would leave the missing field as a literal
    # "%{...}" string inside the document id
    if [type] == "place" {
        mutate {
            add_field => { "[@metadata][document_id]" => "%{aut_es_1}" }
        }
    } else {
        mutate {
            add_field => { "[@metadata][document_id]" => "%{aut_es_2}" }
        }
    }

    uuid {
        target    => "uuid"
        overwrite => true
    }
}

output {
    stdout {codec => rubydebug}
    if [type] == "place" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "test_index_1_12"
            #document_id => "%{aut_es_1}"
            document_id => "%{[@metadata][document_id]}"
        }
    }
    if [type] == "things" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "test_index_2_13"
            document_id => "%{[@metadata][document_id]}"
            # document_id => "%{aut_es_2}"
            # you can set document_id; otherwise ES will generate a unique id.
        }
    }
}
Gobinda Nandi
  • I am trying to achieve something similar with multiple JDBC blocks. Can someone please help me get the table value returned from the first JDBC block? `jdbc { ... statement => "select * from users" ... }` `jdbc { ... statement => "select * from customer where user_id = '%{users.id}'" // How to achieve this? ... }` – nitin7805 Apr 13 '20 at 14:58

If you need to run more than one pipeline in the same process, Logstash provides a way to do this through a configuration file called pipelines.yml. Using multiple pipelines is especially useful if your current configuration has event flows that don't share the same inputs, filters, and outputs and are being separated from each other using tags and conditionals; see the multiple pipelines documentation for more details.
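
A minimal sketch of what such a pipelines.yml could look like, assuming one config file per table (the pipeline IDs and file paths below are made up for illustration; put the file in your Logstash settings directory and adjust the paths to your own configs):

# pipelines.yml -- one entry per pipeline; each runs its own config in isolation
- pipeline.id: table1-sync                                   # hypothetical pipeline name
  path.config: "/Users/logstash/logstash-jdbc-table1.conf"   # hypothetical per-table config
- pipeline.id: table2-sync
  path.config: "/Users/logstash/logstash-jdbc-table2.conf"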

zabusa