Im new to beam and Google dataflow, I've created a simple class to migrate data from cloud sql to Elastic Search using batch processing by writing this class below:
package com.abc;
class DataFlowTest{
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("staging"); options.setTempLocation("gs://csv_to_sql_staging/temp"); options.setStagingLocation("gs://csv_to_sql_staging/staging"); options.setRunner(DataflowRunner.class); options.setGcpTempLocation("gs://csv_to_sql_staging/temp");
Pipeline p = Pipeline.create(options);
p.begin();
p.apply(JdbcIO.read().withQuery("select * from user_table").withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://"+EnvironmentVariable.getDatabaseIp()+"/" + EnvironmentVariable.getDatabaseName()+ "&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user="+Credentials.getDatabaseUsername()+"&password="+Credentials.getDatabasePassword()+"&useSSL=false")));
Write w = ElasticsearchIO.write().withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(new String [] {"host"}, "user-temp", "String").withUsername("elastic").withPassword("password")
);
p.apply(w);
p.run().waitUntilFinish(); } }
and find below my dependnecies in pom.xml
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.19.0</version>
<exclusions>
<exclusion>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-elasticsearch</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.19.0</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</exclusion>
</exclusions>
</dependency>
and now the problem is a compilation error it says:
The method apply(PTransform) in the type Pipeline is not applicable for the arguments (ElasticsearchIO.Write) At that line : p.apply(w);
Any one can help on that plz? I have done some exclusion in the pom file to fix some dependceies conflicts