
According to the Beam website,

Often it is faster and simpler to perform local unit testing on your pipeline code than to debug a pipeline’s remote execution.

I want to use test-driven development for my Beam/Dataflow app that writes to Bigtable for this reason.

However, following the Beam testing documentation I hit an impasse: PAssert isn't useful because the output PCollection contains org.apache.hadoop.hbase.client.Put objects, which don't override the equals method.

I can't extract the contents of the PCollection to validate them either, since

It is not possible to get the contents of a PCollection directly - an Apache Beam or Dataflow pipeline is more like a query plan of what processing should be done, with PCollection being a logical intermediate node in the plan, rather than containing the data.

So how can I test this pipeline, other than manually running it? I'm using Maven and JUnit (in Java since that's all the Dataflow Bigtable Connector seems to support).


2 Answers


The Bigtable Emulator Maven plugin can be used to write integration tests for this:

  • Configure the Maven Failsafe plugin and change your test class's suffix from *Test to *IT so it runs as an integration test.
  • Install the Bigtable Emulator component of the gcloud SDK from the command line:

    gcloud components install bigtable   
    

    Note that this required step reduces code portability (e.g., will it run on your build system? On other developers' machines?), so I plan to containerize it with Docker before deploying to the build system.

  • Add the emulator plugin to the pom per its README (a rough pom sketch follows this list).

  • Use the HBase Client API and see the example Bigtable Emulator integration test to set up your session and table(s).

  • Write your test as normal per the Beam documentation, except that instead of using PAssert you actually call CloudBigtableIO.writeToTable and then use the HBase client to read the data back from the table to verify it.
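
Both pom-related steps above (the Failsafe configuration and the emulator plugin) go in the build section of pom.xml. As a rough sketch only, not copied from the plugin's README, the Failsafe part might look like this; the emulator plugin is left as a placeholder whose coordinates and goal bindings you should take from its README:

<!-- pom.xml, inside <build><plugins>: minimal Failsafe setup (sketch) -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-failsafe-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <!-- Runs *IT classes in the integration-test phase, then checks the results. -->
        <goal>integration-test</goal>
        <goal>verify</goal>
      </goals>
    </execution>
  </executions>
</plugin>
<!-- Bigtable Emulator Maven plugin: add it here too, copying its
     groupId/artifactId/version and start/stop executions from its README. -->

By default Failsafe only picks up classes whose names end in *IT (among a few other patterns), which is why the first bullet renames the test class.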

Here's an example integration test:

package adair.example;

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Assert;
import org.junit.Test;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

/**
 *  A simple integration test example for use with the Bigtable Emulator maven plugin.
 */
public class DataflowWriteExampleIT {

  private static final String PROJECT_ID = "fake";
  private static final String INSTANCE_ID = "fakeinstance";
  private static final String TABLE_ID = "example_table";
  private static final String COLUMN_FAMILY = "cf";
  private static final String COLUMN_QUALIFIER = "cq";

  private static final CloudBigtableTableConfiguration TABLE_CONFIG =
    new CloudBigtableTableConfiguration.Builder()
      .withProjectId(PROJECT_ID)
      .withInstanceId(INSTANCE_ID)
      .withTableId(TABLE_ID)
      .build();

  public static final List<String> VALUES_TO_PUT = Arrays
    .asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");

  @Test
  public void testPipelineWrite() throws IOException {
    try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
      Admin admin = connection.getAdmin();
      createTable(admin);

      List<Mutation> puts = createTestPuts();

      // Use Dataflow to write the data; this is where you'd call the pipeline you want to test.
      Pipeline p = Pipeline.create();
      p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
      p.run().waitUntilFinish();

      // Read the data back from the table using the regular HBase API for validation.
      ResultScanner scanner = getTableScanner(connection);
      List<String> resultValues = new ArrayList<>();
      for (Result row : scanner) {
        String cellValue = getRowValue(row);
        System.out.println("Found value in table: " + cellValue);
        resultValues.add(cellValue);
      }

      Assert.assertThat(resultValues,
        IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
    }
  }

  private void createTable(Admin admin) throws IOException {
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
    tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

    admin.createTable(tableDesc);
  }

  private ResultScanner getTableScanner(Connection connection) throws IOException {
    Scan scan = new Scan();
    Table table = connection.getTable(TableName.valueOf(TABLE_ID));
    return table.getScanner(scan);
  }

  private String getRowValue(Result row) {
    return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
  }

  private List<Mutation> createTestPuts() {
    return VALUES_TO_PUT
          .stream()
          .map(this::stringToPut)
          .collect(Collectors.toList());
  }

  private Mutation stringToPut(String cellValue) {
    String key = UUID.randomUUID().toString();
    Put put = new Put(toBytes(key));
    put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
    return put;
  }

}

In Google Cloud you can easily do end-to-end (e2e) testing of your Dataflow pipeline using real cloud resources such as Pub/Sub topics and BigQuery tables.

By using the JUnit 5 extension model (https://junit.org/junit5/docs/current/user-guide/#extensions) you can create custom classes that handle the creation and deletion of the resources your pipeline requires, as sketched below.

You can find a demo/seed project here https://github.com/gabihodoroaga/dataflow-e2e-demo and a blog post here https://hodo.dev/posts/post-31-gcp-dataflow-e2e-tests/.
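
As an illustration only (not code from the linked demo project), a minimal sketch of such an extension could create a Pub/Sub topic before all tests and delete it afterwards; the project ID and topic name below are hypothetical placeholders:

package example;

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.TopicName;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

/**
 * Sketch of a JUnit 5 extension that provisions a Pub/Sub topic for an e2e
 * Dataflow test and tears it down afterwards. The IDs are placeholders;
 * real code would read them from configuration.
 */
public class PubSubTopicExtension implements BeforeAllCallback, AfterAllCallback {

  private static final String PROJECT_ID = "my-test-project"; // placeholder
  private static final String TOPIC_ID = "dataflow-e2e-" + System.currentTimeMillis();

  private final TopicName topicName = TopicName.of(PROJECT_ID, TOPIC_ID);

  @Override
  public void beforeAll(ExtensionContext context) throws Exception {
    // Create the topic the pipeline under test will read from.
    try (TopicAdminClient client = TopicAdminClient.create()) {
      client.createTopic(topicName);
    }
  }

  @Override
  public void afterAll(ExtensionContext context) throws Exception {
    // Delete the topic once every test in the class has finished.
    try (TopicAdminClient client = TopicAdminClient.create()) {
      client.deleteTopic(topicName);
    }
  }

  public String getTopicName() {
    return topicName.toString();
  }
}

A test class would register it with @RegisterExtension on a static field (so the test can ask the extension for the generated topic name) or simply annotate itself with @ExtendWith(PubSubTopicExtension.class); the same pattern applies to BigQuery datasets or any other resource the pipeline needs.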