
We are newly implementing a data warehouse on Google BigQuery, and all our sources are on-prem databases. So we are using Dataflow for ETL, building with Maven and the Apache Beam SDK, in order to run about 30 pipelines on the Google Cloud Dataflow service.

package com.google.cloud.teleport.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class MToBQ {

  private static ValueProvider<String> maybeDecrypt(
      ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
    return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
  }
  public static void main(String[] args) {
    JdbcConverters.JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(JdbcConverters.JdbcToBigQueryOptions.class);

    run(options);
  }
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    // Branch 1: read table ABC over JDBC and append it to its BigQuery staging table.
    pipeline
        .apply(
            "source",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("select * from abcc")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            "Target",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_ABC"));
    // Branch 2: read table XYZ over JDBC and append it to its staging table.
    // Both branches are applied to the same Pipeline, so they run within a single Dataflow job.
    pipeline
        .apply(
            "SOURCE",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("SELECT * FROM XYZ")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            "TARGET",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_XYZ")); 
    return pipeline.run();
  }
}
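
Since all ~30 pipelines follow the same read-and-append pattern, the same chain could be built in a loop instead of being copy-pasted per table. The following is only a sketch of that idea, reusing the options, imports, and maybeDecrypt helper from the class above; the query-to-table map is a placeholder, not the real configuration:

  // Sketch only: one read/write branch per map entry, all inside the same Dataflow job.
  // The query/table pairs below are placeholders for the real ~30 tables.
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    Pipeline pipeline = Pipeline.create(options);

    // LinkedHashMap keeps the branch order stable between template builds.
    java.util.Map<String, String> queryToTarget = new java.util.LinkedHashMap<>();
    queryToTarget.put("select * from abcc", "dev-27:staging.STG_ABC");
    queryToTarget.put("SELECT * FROM XYZ", "dev-27:staging.STG_XYZ");

    for (java.util.Map.Entry<String, String> e : queryToTarget.entrySet()) {
      String target = e.getValue();
      pipeline
          .apply(
              "Read " + target,
              DynamicJdbcIO.<TableRow>read()
                  .withDataSourceConfiguration(
                      DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                              options.getDriverClassName(),
                              maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                          .withUsername(maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                          .withPassword(maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                          .withDriverJars(options.getDriverJars())
                          .withConnectionProperties(options.getConnectionProperties()))
                  .withQuery(e.getKey())
                  .withCoder(TableRowJsonCoder.of())
                  .withRowMapper(JdbcConverters.getResultSetToTableRow()))
          .apply(
              "Write " + target,
              BigQueryIO.writeTableRows()
                  .withoutValidation()
                  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                  .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                  .to(target));
    }
    return pipeline.run();
  }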

If the data in the tables is small, the job runs successfully. If a table has millions of rows, it throws an error like the one below:

org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out

To compile the class and run its main method with the pipeline arguments, I am executing the following command.

mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.MToBQ \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=dev-27 \
--region=australia-southeast1 \
--workerMachineType=n1-highmem-8 \
--workerDiskType=compute.googleapis.com/projects/dev-27/zones/australia-southeast1-c/diskTypes/pd-ssd \
--diskSizeGb=50 \
--stagingLocation=gs://dev-dataset/Data/stagingCustomDataFlow/MToBQ \
--tempLocation=gs://dev-dataset/Data/temp \
--templateLocation=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json \
--experiments=upload_graph \
--runner=DataflowRunner"
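
Because --templateLocation is set, this Maven command only stages the pipeline as a classic template; the staged template is then launched separately, roughly like this (the job name here is just an example, not a real value from my setup):

gcloud dataflow jobs run mtobq-daily-load \
  --project=dev-27 \
  --region=australia-southeast1 \
  --gcs-location=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json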

Kindly let me know if I am doing it right. What are the right arguments, and can Dataflow execute multiple pipelines in parallel?

  • Is the pipeline stopping or throwing many exceptions like the above? – bigbounty Jul 09 '20 at 08:49
  • Where is your data located? What is your goal? Transfer your data from your database to BigQuery? – rmesteves Jul 09 '20 at 09:16
  • @rmesteves The data is located in SQL Server and we want to transfer it to BQ. – Jambal Jul 09 '20 at 09:54
  • @bigbounty It is throwing exceptions and failing after some time. – Jambal Jul 09 '20 at 09:54
  • @Jambal Do you need to load only once, or will you update the data constantly? If you are doing a single load, one simpler approach is exporting a file from your SQL Server and loading it into BigQuery. – rmesteves Jul 09 '20 at 10:26
  • @rmesteves It's an everyday data warehouse load. – Jambal Jul 09 '20 at 10:32
  • In this case, have you taken a look at the BigQuery Transfer Service (https://cloud.google.com/bigquery-transfer/docs/transfer-service-overview)? – rmesteves Jul 09 '20 at 10:45
  • With the BQ Transfer Service you can use partner transfer sources like Fivetran (https://fivetran.com/blog/dts-announcement), which does exactly what you want. This would be the plug-and-play solution for this case. – rmesteves Jul 09 '20 at 10:48
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/217527/discussion-between-rmesteves-and-jambal). – rmesteves Jul 09 '20 at 12:06

0 Answers