
I am trying to connect to an AWS RDS MySQL instance from Google Dataflow. I wrote a Java program to build the pipeline. The job is created successfully, but the MySQL connection always fails with the following error:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:338)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:308)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:154)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:308)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:264)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:133)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:113)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:100)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeSetup(Unknown Source)
    at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:63)
    at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:45)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:94)
    at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:415)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:326)
    ... 14 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException:
Communications link failure
Caused by: java.net.SocketTimeoutException: connect timed out

The Java source code is below:

public class MySQLToBQ {
    public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("project_name");
        options.setStagingLocation("gs://staging");
        options.setTempLocation("gs://temp");
        options.setRunner(DataflowRunner.class);
        options.setJobName("MySQL-To-BQ-" + new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()));
        System.out.println("Job Name " + options.getJobName());
        Pipeline p = Pipeline.create(options);
        String table = "table_name"; // placeholder, like the other connection values

        DataSourceConfiguration mysqlConfig = JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://mysql_host:3306/mysql_database")
                .withUsername("user")
                .withPassword("password");

        p.apply("mysql_source", JdbcIO.<SourceRow>read()
            .withDataSourceConfiguration(mysqlConfig)
            .withQuery("query")
            .withCoder(SerializableCoder.of(SourceRow.class))
            .withRowMapper(new JdbcIO.RowMapper<SourceRow>() {
                    @Override
                    public SourceRow mapRow(ResultSet resultSet) throws Exception {
                        SourceRow datarow = new SourceRow();
                        ResultSetMetaData rsmd = resultSet.getMetaData();
                        for(int i = 1; i <= rsmd.getColumnCount(); i++) {
                            datarow.add(rsmd.getColumnName(i), resultSet.getString(i));
                        }
                        return datarow;
                    }
                }
            )
        )
        .apply(table + "_transform", ParDo.of(new TransformToTableRow()))
        .apply(table + "_destination", BigQueryIO.writeTableRows()
            .to("table_name")
            .withSchema(getSchema())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        p.run();
    }
}
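A connect timeout this immediate usually points to a network path problem rather than credentials, but while debugging it can help to make the JDBC timeouts explicit so a blocked route fails fast with a clearer error. A sketch of the configuration above with timeout parameters added to the URL (the parameter names are MySQL Connector/J options; the values here are assumptions):

```java
// Hypothetical variant of the DataSourceConfiguration above with explicit
// connect/read timeouts (milliseconds) appended as Connector/J URL parameters.
DataSourceConfiguration mysqlConfig = JdbcIO.DataSourceConfiguration.create(
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://mysql_host:3306/mysql_database"
            + "?connectTimeout=10000&socketTimeout=60000")
        .withUsername("user")
        .withPassword("password");
```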

I was able to create a Compute Engine VM instance and successfully connect to the MySQL database from it.
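Since the VM can reach the database but the Dataflow workers cannot, one way to isolate the problem is a plain TCP check run from each environment. A minimal sketch (`mysql_host` is a placeholder, matching the code above):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ReachabilityCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // A false result here corresponds to the SocketTimeoutException in the
    // Dataflow worker logs: the packets never reach the MySQL server.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isReachable("mysql_host", 3306, 5000)); // placeholder host
    }
}
```

If this returns true from the VM but would time out from the workers' network, the problem is routing or firewalling, not the pipeline code.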

A Kong
  • The last part of the log is "Caused by: java.net.SocketTimeoutException: connect timed out" – A Kong May 04 '18 at 17:41
  • consider adding source code? – JDOaktown May 04 '18 at 17:56
  • I don't think the Java source code is relevant. This is more of a Google Dataflow question. The job was created successfully but cannot connect to the MySQL instance. I verified that I can connect to it manually from a Compute Engine VM instance. – A Kong May 04 '18 at 18:40
  • Apologies @JDOaktown. I realize the source code is useful to help debug and have added it. Thanks. – A Kong May 08 '18 at 15:04
  • Are you using private IP addresses? If this is set then your dataflow workers will only be able to contact other google cloud services, in this case your JDBC connection tries to make requests outside of google cloud. If this is the case, it would explain your issue. https://cloud.google.com/vpc/docs/private-google-access Also, could you please post a job_id? – Alex Amato May 08 '18 at 21:06
  • Hi @AlexAmato I am using an external hostname – A Kong May 09 '18 at 21:44
  • Perhaps it's worth seeing if you can ssh to a GCE instance and ping your jdbc instance. It seems like something might be blocking the connection. Normally outbound connections are open IIRC. Did you set up any outbound firewall rules or networking tools? – Alex Amato May 14 '18 at 21:57
  • Have you checked the general troubleshooting for this error? https://stackoverflow.com/questions/2983248/com-mysql-jdbc-exceptions-jdbc4-communicationsexception-communications-link-fai I think @Alex did make the point that 7. firewall or proxy could be overlooked. – Jinjun Jun 01 '18 at 21:17
  • Hi @AlexAmato, the job_id is "2018-06-05_13_20_38-12074118063903631599" – A Kong Jun 06 '18 at 15:41

3 Answers


With Dataflow you cannot whitelist a fixed IP to give the workers access to a SQL instance, because the worker IPs are ephemeral. I'm not sure about AWS RDS, but for Cloud SQL you should use the JDBC socket factory instead: https://cloud.google.com/sql/docs/mysql/connect-external-app#java
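For the Cloud SQL case the link describes, the JDBC URL drops the host and port in favor of the instance connection name plus a socket factory class. A sketch (the instance connection name below is a placeholder):

```java
public class CloudSqlUrl {
    // Builds a hypothetical Cloud SQL JDBC URL using the socket factory
    // described in the linked docs. "project:region:instance" is a
    // placeholder for a real instance connection name.
    static String buildUrl(String database, String instance) {
        return String.format(
            "jdbc:mysql:///%s?cloudSqlInstance=%s"
                + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory",
            database, instance);
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("mysql_database", "project:region:instance"));
    }
}
```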

MonicaPC

For Java, you can make the instance publicly accessible and use the Cloud SQL JDBC socket factory: https://github.com/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory.

Brachi

Did you follow "Connect to an Amazon Aurora DB cluster from outside a VPC" to make the instance publicly accessible?

To connect to an Amazon Aurora DB cluster directly from outside the VPC, the instances in the cluster must meet the following requirements:
The DB instance must have a public IP address
The DB instance must be running in a publicly accessible subnet
As well as configuring your DB instance so that it can be connected to from outside a VPC, you can also secure the connections using Transport Layer Security (TLS), formerly known as Secure Sockets Layer (SSL).

You'll need to do that prior to being able to connect to it. Based on the code sample (jdbc:mysql://mysql_host:3306/mysql_database), it doesn't look like that's a public host.
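One quick sanity check is whether the RDS hostname resolves to a public address at all. A small illustrative helper (`isSiteLocalAddress` covers the RFC 1918 ranges such as 10.0.0.0/8 and 192.168.0.0/16):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    // Returns true if the host resolves to a private (RFC 1918) or loopback
    // address, meaning it is only reachable from inside its own network.
    static boolean isPrivate(String host) throws UnknownHostException {
        InetAddress addr = InetAddress.getByName(host);
        return addr.isSiteLocalAddress() || addr.isLoopbackAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(isPrivate("10.0.0.5")); // prints "true": private range
    }
}
```

If this returns true for the RDS endpoint, Dataflow workers outside that VPC have no route to it, which matches the connect timeout in the question.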

Pat