I have logs that I am trying to push to Google BigQuery, and I am trying to build the entire pipeline using Google Dataflow. The logs have different structures and can be classified into four types. In my pipeline I read logs from Pub/Sub, parse them, and write them to a BigQuery table. The table a log is written to depends on one parameter in the log. I am stuck on how to change the table name for BigQueryIO.Write at runtime.
-
Looks similar to: http://stackoverflow.com/questions/30431840/writing-results-of-google-dataflow-pipeline-into-mulitple-sinks, please check it out, it might help you. – robosoul Dec 28 '16 at 07:35
-
@nikhil sharma - do you really need to hand-roll this solution yourself? Have you looked at using something like Fluentd instead? http://www.fluentd.org/ – Graham Polley Dec 28 '16 at 08:20
-
@Graham thanks a lot for answering. I need exactly what is described in http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelines but I am sorry to learn that this is not supported in Dataflow. I have not tried Fluentd; we are trying to implement the pipeline in Dataflow rather than anything else. Can you tell me whether Google has any plans to include this feature in Dataflow? It is very much needed for all our pipelines. Also, if I can contribute by implementing this feature, I would be more than happy to do it. Please tell me if I can. – nikhil sharma Dec 28 '16 at 16:17
-
@nikhilsharma It is planned (https://issues.apache.org/jira/browse/BEAM-92); however, AFAIK nobody is currently working on this. – jkff Dec 28 '16 at 18:30
1 Answer
You can use side outputs.
https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn
The following sample code reads a BigQuery table and splits it into 3 different PCollections. Each PCollection is then sent to a different Pub/Sub topic (the destinations could be different BigQuery tables instead).
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// One tag per output: the first is the main output, the other two are side outputs.
final TupleTag<String> readings2010 = new TupleTag<String>() {};
final TupleTag<String> readings2000plus = new TupleTag<String>() {};
final TupleTag<String> readingsOld = new TupleTag<String>() {};

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
    .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
    .of(new DoFn<TableRow, String>() {
        @Override
        public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
            // Route each row by the value in its third field (the year).
            if (c.element().getF().get(2).getV().equals("2010")) {
                c.output(c.element().toString());
            } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
                c.sideOutput(readings2000plus, c.element().toString());
            } else {
                c.sideOutput(readingsOld, c.element().toString());
            }
        }
    }));

// Each tagged PCollection gets its own sink.
collectionTuple.get(readings2010)
    .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
    .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
    .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));

p.run();
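
For the case in the question (a fixed set of four log types, one table each), the routing decision that the DoFn makes before calling c.output or c.sideOutput could look roughly like the plain-Java sketch below. It deliberately uses no Dataflow classes so the logic can be read and run on its own. The log types, the table names, and the "logType|payload" line format are all invented for illustration; they are assumptions, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LogRouter {
    // Hypothetical log types and BigQuery table specs; substitute the real ones.
    static final Map<String, String> TABLE_BY_LOG_TYPE = new LinkedHashMap<>();
    static {
        TABLE_BY_LOG_TYPE.put("access", "my-project:logs.access_logs");
        TABLE_BY_LOG_TYPE.put("error", "my-project:logs.error_logs");
        TABLE_BY_LOG_TYPE.put("audit", "my-project:logs.audit_logs");
        TABLE_BY_LOG_TYPE.put("debug", "my-project:logs.debug_logs");
    }

    // Assumes each log line looks like "logType|payload" (an invented format).
    static String logTypeOf(String logLine) {
        int sep = logLine.indexOf('|');
        return sep < 0 ? "" : logLine.substring(0, sep);
    }

    // Returns the table for a log line, or fails loudly on an unknown type.
    static String tableFor(String logLine) {
        String table = TABLE_BY_LOG_TYPE.get(logTypeOf(logLine));
        if (table == null) {
            throw new IllegalArgumentException("Unknown log type in line: " + logLine);
        }
        return table;
    }

    // Splits the input into one list per known table, mirroring one PCollection per tag.
    static Map<String, List<String>> splitByTable(List<String> logLines) {
        Map<String, List<String>> byTable = new LinkedHashMap<>();
        for (String table : TABLE_BY_LOG_TYPE.values()) {
            byTable.put(table, new ArrayList<>());
        }
        for (String line : logLines) {
            byTable.get(tableFor(line)).add(line);
        }
        return byTable;
    }
}
```

In the pipeline itself, each of the four lists corresponds to one TupleTag, and each tagged PCollection would be applied to its own BigQuery write in the same way the sample above applies each one to its own Pub/Sub write. Throwing on an unknown type is only one option; sending such lines to a separate output for later inspection may suit a streaming job better.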

Felipe Hoffa
-
Even with side outputs, wouldn't you still need to define the sink name (e.g. BigQueryIO) before the pipeline ran i.e. when defining the pipeline at compile time? My understanding is that the OP wants to dynamically write to a BigQuery table - the name of which is only known at run time. I don't see how this is possible. Maybe I am missing something obvious! – Graham Polley Dec 29 '16 at 02:17
-
Oh, right. Wondering if "The table to which the logs need to written is depending on one parameter in logs" means that the name needs to be dynamic or one of a known set. Hopefully @nikhil sharma can comment. – Felipe Hoffa Dec 29 '16 at 02:21
-
I think it's a set because "The log structure is different and can be classified into four different type." - so this would allow them to separate the logs into 4 different tables. – Felipe Hoffa Dec 29 '16 at 02:22
-
Hmm. I'm not sure. OP states "The problem is I am stuck on a point where how to change TableName for BigQueryIO.Write at **runtime**". Also, OP has confirmed that http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelines is what they need (which is not currently possible in Dataflow). But, let's confirm Felipe! If the names are set, then yes, it's easy peasy using side outputs. @nikhil sharma - can you clarify please? :-) – Graham Polley Dec 29 '16 at 02:37
-
I have four tables to which I have to write logs based on the logType parameter. The flow is: 1) Stream logs to PubSub 2) Read from PubSub and parse the logs 3) Get the table name based on logType 4) Write to the BigQuery table. All of this should happen in real time, i.e. we are trying to stream logs from PubSub to BigQuery. – nikhil sharma Dec 29 '16 at 05:06
-
then this solution will work - split the PCollection in 4 by type, send each resulting PCollection to one of the 4 tables – Felipe Hoffa Dec 29 '16 at 05:15
-
@Graham and Felipe thanks a lot for the answer. This approach worked for me! I will soon post my code. Thanks – nikhil sharma Jan 05 '17 at 16:11
-