I would like to implement a very simple Beam pipeline:
read Google Cloud Storage links to text files from a Pub/Sub topic -> read each file line by line -> write the lines to BigQuery.
Apache Beam has a pre-implemented PTransform for each of these steps, so the pipeline would be:
Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")
However, ReadAllFromText() blocks the pipeline somehow. Replacing it with a custom PTransform that returns a random line from each file received from Pub/Sub, then writing that line to the BigQuery table, works normally (no blocking); a simplified sketch is below. Adding a fixed window of 3 seconds or triggering on each element doesn't solve the problem either.
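For reference, the workaround transform was roughly along these lines (simplified; the class name ReadRandomLine and the use of FileSystems here are illustrative rather than my exact code):

import random

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class ReadRandomLine(beam.DoFn):
    def process(self, gcs_path):
        # Pub/Sub delivers message bodies as bytes in the Python SDK
        if isinstance(gcs_path, bytes):
            gcs_path = gcs_path.decode("utf-8")
        # Open the GCS file directly and emit a single random line
        with FileSystems.open(gcs_path) as f:
            lines = f.read().decode("utf-8").splitlines()
        if lines:
            yield random.choice(lines)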
Each file is around 10MB and 23K lines.
Unfortunately, I cannot find any documentation about how ReadAllFromText is supposed to work. It would be really weird if it blocked the pipeline until all the files are read; I would expect it to push each line into the pipeline as soon as it reads it.
Is there any known reason for the above behavior? Is it a bug or am I doing something wrong?
Pipeline code:
import apache_beam as beam
from apache_beam.io import ReadAllFromText
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
              | 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
              | ReadAllFromText(skip_header_lines=1)
    elements = lines | beam.ParDo(SplitPayload())
    elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
...
class SplitPayload(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Each line is expected to look like "<timestamp>;<id>;<payload>"
        timestamp, id, payload = element.split(";")
        return [{
            'timestamp': timestamp,
            'id': id,
            'payload': payload
        }]
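For completeness, the triggering variant I tried looked roughly like this (reconstructed from memory; the exact trigger and accumulation mode shown are my best recollection, not verified code):

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# Drop-in replacement for the 'window' step in the pipeline above:
# fire on every element instead of waiting for the 3-second window to close
windowing = 'window' >> beam.WindowInto(
    window.FixedWindows(3, 0),
    trigger=AfterCount(1),
    accumulation_mode=AccumulationMode.DISCARDING)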