
I'm new to Beam and Google Dataflow. I've created a simple class to migrate data from Cloud SQL to Elasticsearch using batch processing by writing the class below:

package com.abc;

class DataFlowTest {

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("staging");
        options.setTempLocation("gs://csv_to_sql_staging/temp");
        options.setStagingLocation("gs://csv_to_sql_staging/staging");
        options.setRunner(DataflowRunner.class);
        options.setGcpTempLocation("gs://csv_to_sql_staging/temp");

        Pipeline p = Pipeline.create(options);

        p.begin();

        p.apply(JdbcIO.read()
                .withQuery("select * from user_table")
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.jdbc.Driver",
                        "jdbc:mysql://" + EnvironmentVariable.getDatabaseIp() + "/" + EnvironmentVariable.getDatabaseName()
                                + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory"
                                + "&user=" + Credentials.getDatabaseUsername()
                                + "&password=" + Credentials.getDatabasePassword()
                                + "&useSSL=false")));

        Write w = ElasticsearchIO.write().withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration
                        .create(new String[] {"host"}, "user-temp", "String")
                        .withUsername("elastic")
                        .withPassword("password"));
        p.apply(w);

        p.run().waitUntilFinish();
    }
}

Below are my dependencies in pom.xml:


<dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>2.19.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>2.19.0</version>

      <exclusions>
        <exclusion>
          <groupId>com.google.api-client</groupId>
          <artifactId>google-api-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.google.http-client</groupId>
          <artifactId>google-http-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.google.http-client</groupId>
          <artifactId>google-http-client-jackson2</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
    <version>2.19.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.19.0</version>
    </dependency>



    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>2.19.0</version>
      <exclusions>
        <exclusion>
          <groupId>io.grpc</groupId>
          <artifactId>grpc-core</artifactId>
        </exclusion>
      </exclusions>

    </dependency>

The problem now is a compilation error:

The method apply(PTransform) in the type Pipeline is not applicable for the arguments (ElasticsearchIO.Write)

at the line p.apply(w);

Can anyone help with this, please? I have added some exclusions in the pom file to fix dependency conflicts.

Tamer Saleh
  • Have you tried to invoke `PTransform` replacing `Write w = ElasticsearchIO.write()...p.apply(w)` with `p.apply(ElasticsearchIO.write()...)`? – Nick_Kh Feb 12 '20 at 14:32
  • Yes, and still the same result. – Tamer Saleh Feb 13 '20 at 05:54
  • I'm trying to understand your use case; let me know if my understanding is wrong. You are trying to read data from Cloud SQL and write the result to Elasticsearch, right? If so, I can help you with that. – miles212 Feb 13 '20 at 11:44
  • Yes, please, that's exactly what I'm trying to do. I've gotten past this error and I'm now facing another problem, which I've posted here: https://stackoverflow.com/questions/60197434/how-to-connect-to-cloud-sql-from-google-dataflow/60206404#60206404 – Tamer Saleh Feb 13 '20 at 13:42
  • @Tamer Saleh, if you have already fixed the current problem, then it is a good habit to write the answer in order to help future contributors researching solutions to similar problems. – Nick_Kh Feb 17 '20 at 07:40

1 Answer


It's not possible to apply ElasticsearchIO.Write directly to the Pipeline object. First create a PCollection by reading from JDBC, then apply ElasticsearchIO.write() to that PCollection. Please refer to the code below.

PCollection<String> sqlResult1 = p.apply(
        JdbcIO.<String>read()
                .withDataSourceConfiguration(config) // your JdbcIO.DataSourceConfiguration
                .withQuery("select * from test_table")
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new JdbcIO.RowMapper<String>() {

                    private static final long serialVersionUID = 1L;

                    public String mapRow(ResultSet resultSet) throws Exception {
                        // JDBC column indexes are 1-based.
                        StringBuilder val = new StringBuilder();
                        return val.append(resultSet.getString(1)).append(resultSet.getString(2)).toString();
                        // return KV.of(resultSet.getString(1), resultSet.getString(2));
                    }
                }));

sqlResult1.apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration
        .create(new String[] { "https://host:9243" }, "user-temp", "String").withUsername("").withPassword("")));

I think the above piece of code should work for your use case.
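
One thing to keep in mind: ElasticsearchIO.write() expects every element of the input PCollection<String> to be a JSON document, so the row mapper should emit JSON rather than a bare concatenation of column values. Below is a minimal sketch of that idea; the column names id and name, the index/type names and the host URL are placeholders, and config stands for your own JdbcIO.DataSourceConfiguration:

PCollection<String> jsonDocs = p.apply("ReadFromCloudSQL",
        JdbcIO.<String>read()
                .withDataSourceConfiguration(config) // placeholder: your data source configuration
                .withQuery("select id, name from user_table")
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new JdbcIO.RowMapper<String>() {

                    private static final long serialVersionUID = 1L;

                    public String mapRow(ResultSet resultSet) throws Exception {
                        // Build one JSON document per row (JDBC columns are 1-based).
                        // A real pipeline should use a JSON library to escape values properly.
                        return String.format("{\"id\":\"%s\",\"name\":\"%s\"}",
                                resultSet.getString(1), resultSet.getString(2));
                    }
                }));

jsonDocs.apply("WriteToElasticsearch",
        ElasticsearchIO.write().withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration
                        .create(new String[] { "https://host:9243" }, "user-temp", "String")
                        .withUsername("elastic").withPassword("password")));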

miles212