I'm trying to figure out how to load a CSV file from GCS into BigQuery. Pipeline below:
// Create the pipeline
Pipeline p = Pipeline.create(options);
// Create the PCollection from csv
PCollection<String> lines = p.apply(TextIO.read().from("gs://impression_tst_data/incoming_data.csv"));
// Transform into TableRow
PCollection<TableRow> row = lines.apply(ParDo.of(new StringToRowConverter()));
// Write table to BigQuery
row.apply(BigQueryIO.writeTableRows()
    .to("project_id:dataset.table")
    .withSchema(getSchema())
    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
Here is the StringToRowConverter class I'm using in the ParDo to create a TableRow PCollection:
// StringToRowConverter
static class StringToRowConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
}
Looking at the staging files, it appears this creates TableRows of JSON that lump the whole CSV line into a single column named "string_field". If I don't define string_field in my schema, the job fails. When I do define it, each row of the CSV is written into the string_field column and all the other columns defined in my schema are left empty. I know this is expected behavior.
So my question: How do I take this JSON output and write it into the schema? Sample output and schema below...
{"string_field": "6/26/17 21:28,Dave Smith,1 Learning Drive,867-5309,etc"}
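For contrast, what I assume the staged rows should look like instead, with the sample line above split across the schema's columns (field values are my guess at the mapping):

```json
{
  "Event_Time": "6/26/17 21:28",
  "Name": "Dave Smith",
  "Address": "1 Learning Drive",
  "Phone": "867-5309",
  "etc": "etc"
}
```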
Schema:
static TableSchema getSchema() {
    return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
        // Compose the list of TableFieldSchema from tableSchema.
        {
            add(new TableFieldSchema().setName("Event_Time").setType("TIMESTAMP"));
            add(new TableFieldSchema().setName("Name").setType("STRING"));
            add(new TableFieldSchema().setName("Address").setType("STRING"));
            add(new TableFieldSchema().setName("Phone").setType("STRING"));
            add(new TableFieldSchema().setName("etc").setType("STRING"));
        }
    });
}
Is there a better way of doing this than using the StringToRowConverter?
I know I need a ParDo to create a TableRow PCollection before I can write it out to BQ, but I haven't been able to find a solid example of how to take a PCollection of CSV lines, transform it into TableRows, and write it out.
Yes, I am a noob trying to learn here. I'm hoping somebody can help me with a snippet or point me in the right direction on the easiest way to accomplish this. Thanks in advance.