
The requirement is to read the latest updated records from BigQuery and load them into CloudSQL.

Here are the steps executed:

  1. Read the BQ table records whose LAST_UPD_TS is greater than or equal to a parameter value:

         PCollection<TableRow> read_from_bq = pipeline.apply("read from bq", BigQueryIO.readTableRows()
                 .withTemplateCompatibility()
                 .fromQuery("select * from pdata.DEPT WHERE LAST_UPD_TS >= 'parametervalue'")
                 .withoutValidation()
                 .usingStandardSql());

  2. Insert or update those records into CloudSQL:

         read_from_bq.apply("Insert and Update", JdbcIO.<TableRow>write()
                 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                 .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
                         " values(?,?,?,?,?,?,?,?,?,?)" +
                         " ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
                 .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()));

         PipelineResult.State state = pipeline.run().waitUntilFinish();

The above code executed without any issues.

The new requirement is to first read the LAST_UPD_TS column from the existing CloudSQL table records and pass that value as a parameter to the BigQuery query in the same pipeline. The code below throws a NullPointerException when printing the column with System.out.println(tableRowTypedRead.getTable().get("loc_dim")); (getTable() returns null when the read is configured with fromQuery instead of a table). How do I pass the LAST_UPD_TS value fetched from CloudSQL in the first step of the pipeline as an inline parameter to the BQ query in the same pipeline?

       PCollection<String> last_upd_td = pipeline.apply("get latest ts", JdbcIO.<String>read()
                        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                        .withQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")
                        .withCoder(AvroCoder.of(String.class))
                        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
                            return resultSet.getString("last_upd_ts");
                        })
        );
        
        PCollection<BigQueryIO.TypedRead<TableRow>> bq = last_upd_td.apply(ParDo.of(new DoFn<String, BigQueryIO.TypedRead<TableRow>>() {
            @ProcessElement
            public void processElement(ProcessContext c, @Element String last_upd_td ) throws Exception {
                BQ_TO_CSQL_Options as = c.getPipelineOptions().as(BQ_TO_CSQL_Options.class);
                BigQueryIO.TypedRead<TableRow> tableRowTypedRead = BigQueryIO.readTableRows()
                        .withTemplateCompatibility()
                        .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= " + last_upd_td))
                        .withoutValidation().usingStandardSql();
                        System.out.println(tableRowTypedRead.getTable().get("loc_dim"));
                c.output(tableRowTypedRead);
            }
        }));

        bq.apply("Insert and Update", JdbcIO.<BigQueryIO.TypedRead<TableRow>>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
                        "values( ?,?,?,?,?,?,?,?,?,?)" +
                        "ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
                .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters())
        );

        PipelineResult.State state = pipeline.run().waitUntilFinish();
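
Note: the reason this fails is that Beam transforms can only be applied while the pipeline graph is being constructed; a BigQueryIO.TypedRead built inside a DoFn is never executed by the runner, and getTable() returns null for a query-based read, which is where the NullPointerException comes from. A minimal sketch of one possible workaround, assuming the connection details are available at graph-construction time (jdbcUrl, dbUser, and dbPassword below are placeholders, not from the original code), is to fetch the timestamp with plain JDBC before building the pipeline and splice it into the query string, exactly like 'parametervalue' in the working version:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    // Fetch MAX(LAST_UPD_TS) from CloudSQL before the pipeline is constructed.
    String lastUpdTs;
    try (Connection conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPassword);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")) {
        rs.next(); // MAX() always returns exactly one row
        lastUpdTs = rs.getString("last_upd_ts");
    }

    // The timestamp is now a plain String available at construction time,
    // so it can be inlined into the query like 'parametervalue' above.
    PCollection<TableRow> readFromBq = pipeline.apply("read from bq",
            BigQueryIO.readTableRows()
                    .withTemplateCompatibility()
                    .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + lastUpdTs + "'")
                    .withoutValidation()
                    .usingStandardSql());

If the lookup has to stay inside the pipeline, another option could be to pass the single-element JdbcIO result as a side input to a DoFn that queries BigQuery through the BigQuery client library directly, at the cost of bypassing BigQueryIO.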
Akshay
  • Hi @Akshay, were you able to solve this problem…? I'm facing a similar issue, trying to pass a value from a PCollection to query parameters. – Nag Oct 29 '22 at 23:00

0 Answers