The requirement is to read the latest updated records from BigQuery and load them into CloudSQL.
Here are the steps executed:
Read the BQ table records whose LAST_UPD_TS is greater than or equal to a parameter value:

PCollection<TableRow> read_from_bq = pipeline.apply("read from bq", BigQueryIO.readTableRows()
    .withTemplateCompatibility()
    .fromQuery("select * from pdata.DEPT WHERE LAST_UPD_TS >= 'parametervalue'")
    .withoutValidation().usingStandardSql());

The records are then upserted into CloudSQL:

read_from_bq.apply("Insert and Update", JdbcIO.<TableRow>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
    .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
        " values (?,?,?,?,?,?,?,?,?,?)" +
        " ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
    .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()));
PipelineResult.State state = pipeline.run().waitUntilFinish();
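One detail worth noting about the statement above: the insert contributes ten `?` placeholders and the ON DUPLICATE KEY UPDATE clause contributes six more, so the prepared-statement setter has to bind sixteen parameters in order. A minimal sketch of that count (the real DEPT_BULKPreparedStatementSetters class is not shown in this question):

```java
public class UpsertStatement {
    // Same upsert statement as in the pipeline above.
    static final String SQL =
        "insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,"
      + "PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)"
      + " values (?,?,?,?,?,?,?,?,?,?)"
      + " ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,"
      + "PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?";

    // Count the '?' placeholders the setter must bind, in order:
    // 10 for the insert list, 6 for the update clause.
    static int countPlaceholders(String sql) {
        int n = 0;
        for (char c : sql.toCharArray()) {
            if (c == '?') n++;
        }
        return n;
    }
}
```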
The above code executed perfectly without any issues.

The new requirement is: first read the LAST_UPD_TS column from the existing CloudSQL table records, then use it as a parameter value in the BigQuery query within the same pipeline. The code below throws a NullPointerException while printing the column with System.out.println(tableRowTypedRead.getTable().get("loc_dim"));. How do we pass the LAST_UPD_TS value fetched from CloudSQL in the first step of the pipeline as an inline parameter to the BQ query in the same pipeline?
PCollection<String> last_upd_td = pipeline.apply("get latest ts", JdbcIO.<String>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
    .withQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")
    .withCoder(AvroCoder.of(String.class))
    .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> resultSet.getString("last_upd_ts")));
PCollection<BigQueryIO.TypedRead<TableRow>> bq = last_upd_td.apply(ParDo.of(
    new DoFn<String, BigQueryIO.TypedRead<TableRow>>() {
        @ProcessElement
        public void processElement(ProcessContext c, @Element String last_upd_td) throws Exception {
            BQ_TO_CSQL_Options as = c.getPipelineOptions().as(BQ_TO_CSQL_Options.class);
            BigQueryIO.TypedRead<TableRow> tableRowTypedRead = BigQueryIO.readTableRows()
                .withTemplateCompatibility()
                .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + last_upd_td + "'")
                .withoutValidation().usingStandardSql();
            System.out.println(tableRowTypedRead.getTable().get("loc_dim"));
            c.output(tableRowTypedRead);
        }
    }));
bq.apply("Insert and Update", JdbcIO.<BigQueryIO.TypedRead<TableRow>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
    .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
        " values (?,?,?,?,?,?,?,?,?,?)" +
        " ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
    .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()));
PipelineResult.State state = pipeline.run().waitUntilFinish();
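For reference, the only workaround I can think of so far is to fetch the timestamp with plain JDBC at pipeline-construction time (outside any DoFn), so the value is an ordinary string when .fromQuery(...) is built. A minimal sketch, assuming jdbcUrl, user, and password are available before the pipeline is constructed (the helper names below are hypothetical, not part of the code above):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class FetchLastUpdTs {
    // Hypothetical helper: runs at pipeline-construction time, not inside a DoFn,
    // so the result can be concatenated into the BigQuery query string directly.
    static String fetchLastUpdTs(String jdbcUrl, String user, String password) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")) {
            rs.next();
            return rs.getString("last_upd_ts");
        }
    }

    // Build the BigQuery query with the fetched value inlined as a quoted literal.
    static String buildQuery(String lastUpdTs) {
        return "select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + lastUpdTs + "'";
    }
}
```

With this, the BigQueryIO read can be the first transform in the pipeline, e.g. .fromQuery(FetchLastUpdTs.buildQuery(ts)), instead of trying to construct a TypedRead inside a DoFn. Whether this is the idiomatic Beam way, I'm not sure.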