
I'm trying to figure out how to load a CSV file from GCS into BigQuery. Pipeline below:

    // Create the pipeline
    Pipeline p = Pipeline.create(options);

    // Create the PCollection from csv
    PCollection<String> lines = p.apply(TextIO.read().from("gs://impression_tst_data/incoming_data.csv"));


    // Transform into TableRow
    PCollection<TableRow> row = lines.apply(ParDo.of(new StringToRowConverter()));


    // Write table to BigQuery
    row.apply(BigQueryIO.writeTableRows()
            .to("project_id:dataset.table")
            .withSchema(getSchema())
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

Here is the StringToRowConverter class I'm using in the ParDo to create a TableRow PCollection:

    // StringToRowConverter
    static class StringToRowConverter extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(new TableRow().set("string_field", c.element()));
        }
    }

Looking at the staging files, it looks like this creates TableRows of JSON that lump the whole CSV line into a single column named "string_field". If I don't define string_field in my schema the job fails. When I do define string_field, it writes each row of the CSV into that column and leaves all the other columns defined in my schema empty. I know this is expected behavior.

So my question: How do I take this JSON output and write it into the schema? Sample output and schema below...

"string_field": "6/26/17 21:28,Dave Smith,1 Learning Drive,867-5309,etc"}

Schema:

    static TableSchema getSchema() {
        return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
            // Compose the list of TableFieldSchema from tableSchema.
            {
                add(new TableFieldSchema().setName("Event_Time").setType("TIMESTAMP"));
                add(new TableFieldSchema().setName("Name").setType("STRING"));
                add(new TableFieldSchema().setName("Address").setType("STRING"));
                add(new TableFieldSchema().setName("Phone").setType("STRING"));
                add(new TableFieldSchema().setName("etc").setType("STRING"));
            }
        });
    }

Is there a better way of doing this than using the StringToRowConverter?

I need to use a ParDo to create a TableRow PCollection before I can write it out to BQ. However, I'm unable to find a solid example of how to take in a CSV PCollection, transform it to TableRow, and write it out.

Yes, I am a noob trying to learn here. I'm hoping somebody can help me with a snippet or point me in the right direction on the easiest way to accomplish this. Thanks in advance.

R. Gault

1 Answer


The code in your StringToRowConverter DoFn should parse the string and produce a TableRow with multiple fields. Since each row is comma-separated, this would likely involve splitting the string on commas and then using your knowledge of the column order to do something like:

    String inputLine = c.element();

    // May need to make the line parsing more robust, depending on your
    // files. Look at how to parse rows of a CSV using Java.
    String[] split = inputLine.split(",");

    // Also, you may need to handle errors such as not enough columns, etc.

    TableRow output = new TableRow();
    output.set("Event_Time", split[0]); // may want to parse the string
    output.set("Name", split[1]);
    // ... and so on for the remaining columns
    c.output(output);
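
Putting that together, a minimal sketch of a complete DoFn might look like the following. This is only a sketch: the class name, the five-column minimum, and the decision to drop short rows are assumptions based on the sample row and schema in the question, not tested code.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

    // Sketch only: assumes columns arrive in the order
    // Event_Time, Name, Address, Phone, etc.
    static class CsvToTableRowFn extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Naive split; use a real CSV parser (e.g. Apache Commons CSV)
            // if fields can contain quoted commas. The -1 limit keeps
            // trailing empty fields instead of silently dropping them.
            String[] split = c.element().split(",", -1);
            if (split.length < 5) {
                return; // skip malformed rows (or route them to a side output)
            }
            c.output(new TableRow()
                .set("Event_Time", split[0]) // may need reformatting into a
                                             // timestamp format BigQuery accepts
                .set("Name", split[1])
                .set("Address", split[2])
                .set("Phone", split[3])
                .set("etc", split[4]));
        }
    }

You would then apply it exactly where the question already applies StringToRowConverter, i.e. lines.apply(ParDo.of(new CsvToTableRowFn())).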
Ben Chambers
  • Note also that if your CSV files contain a header in the first line, you'll need to manually skip the header - the PCollection of lines produced by TextIO is unordered, so there's no way to know which line was the "first line" anymore, and you need to filter it out somehow. See also https://issues.apache.org/jira/browse/BEAM-123. – jkff Jul 14 '17 at 18:22
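
A minimal sketch of that header-skipping step, assuming the header line starts with the literal column name Event_Time (adjust the predicate to whatever your header actually looks like):

    import org.apache.beam.sdk.transforms.Filter;

    // Drop the header row before the ParDo. Assumes no data line
    // starts with "Event_Time".
    PCollection<String> data = lines.apply("SkipHeader",
            Filter.by((String line) -> !line.startsWith("Event_Time")));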