We are implementing a new data warehouse on Google BigQuery, and all of our sources are on-prem databases. We are using Dataflow for ETL, with Maven and the Apache Beam SDK, to run 30 pipelines on the Google Cloud Dataflow service.
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
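/**
* Batch pipeline that reads two on-prem tables over JDBC and appends the rows
* into the matching BigQuery staging tables.
*/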
public class MToBQ {
private static ValueProvider<String> maybeDecrypt(
ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
}
public static void main(String[] args) {
JdbcConverters.JdbcToBigQueryOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(JdbcConverters.JdbcToBigQueryOptions.class);
run(options);
}
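// Builds a single pipeline containing two independent source-to-target branches and submits it as one Dataflow job.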
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
"source",
DynamicJdbcIO.<TableRow>read()
.withDataSourceConfiguration(
DynamicJdbcIO.DynamicDataSourceConfiguration.create(
options.getDriverClassName(),
maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
.withUsername(
maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
.withPassword(
maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
.withDriverJars(options.getDriverJars())
.withConnectionProperties(options.getConnectionProperties()))
.withQuery("select * from abcc")
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow()))
.apply(
"Target",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to("dev-27:staging.STG_ABC"));
pipeline
.apply(
"SOURCE",
DynamicJdbcIO.<TableRow>read()
.withDataSourceConfiguration(
DynamicJdbcIO.DynamicDataSourceConfiguration.create(
options.getDriverClassName(),
maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
.withUsername(
maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
.withPassword(
maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
.withDriverJars(options.getDriverJars())
.withConnectionProperties(options.getConnectionProperties()))
.withQuery("SELECT * FROM XYZ")
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow()))
.apply(
"TARGET",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to("dev-27:staging.STG_XYZ"));
return pipeline.run();
}
}
When the tables contain only a small amount of data, the job runs successfully. When they contain millions of rows, it throws an error like the one below:
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out
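For reference, Beam's standard JdbcIO exposes a fetch size, which as far as I can tell the template's DynamicJdbcIO does not, and which I understand can matter for large result sets, since without it some drivers buffer the whole table in memory. Below is a minimal sketch of such a read; the driver class, URL, credentials, query, and column names are placeholders I made up, and whether rows are actually streamed in chunks depends on the specific JDBC driver.
package com.example;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FetchSizeSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        "source",
        JdbcIO.<TableRow>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://host:5432/db") // placeholder
                    .withUsername("user")
                    .withPassword("pass"))
            .withQuery("SELECT id, name FROM SRC_TABLE") // placeholder query
            // Ask the driver to pull rows in chunks instead of all at once.
            .withFetchSize(50_000)
            // Reshuffle after the read so downstream steps can parallelize.
            .withOutputParallelization(true)
            .withCoder(TableRowJsonCoder.of())
            .withRowMapper(
                resultSet ->
                    new TableRow()
                        .set("id", resultSet.getLong("id"))
                        .set("name", resultSet.getString("name"))));
    pipeline.run();
  }
}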
To compile the Java class and run its main method with arguments, I execute the following command.
mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.MToBQ \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=dev-27 \
--region=australia-southeast1 \
--workerMachineType=n1-highmem-8 \
--workerDiskType=compute.googleapis.com/projects/dev-27/zones/australia-southeast1-c/diskTypes/pd-ssd \
--diskSizeGb=50 \
--stagingLocation=gs://dev-dataset/Data/stagingCustomDataFlow/MToBQ \
--tempLocation=gs://dev-dataset/Data/temp \
--templateLocation=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json \
--experiments=upload_graph \
--runner=DataflowRunner"
Kindly let me know if I am doing this right. What are the right arguments, and can Dataflow execute multiple pipelines in parallel?