
I would like to implement a very simple Beam pipeline:

read Google Cloud Storage links to text files from a Pub/Sub topic -> read each file line by line -> write to BigQuery.

Apache Beam has a pre-implemented PTransform for each of these steps.

So the pipeline would be:

Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")
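Spelled out as a minimal script, this is roughly what I am trying to do (the project, topic, table and schema names below are just placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub source, so run in streaming mode

with beam.Pipeline(options=options) as p:
    (p
     | beam.io.ReadFromPubSub(topic="projects/my-project/topics/topic_name")
     | beam.Map(lambda msg: msg.decode("utf-8"))  # Pub/Sub payloads are bytes -> gs:// path strings
     | beam.io.ReadAllFromText()
     | beam.Map(lambda line: {"line": line})
     | beam.io.WriteToBigQuery("my-project:dataset.table_name",
                               schema="line:STRING"))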

However, ReadAllFromText() somehow blocks the pipeline. A custom PTransform that reads the file linked in the Pub/Sub message and returns a random line, which is then written to the BigQuery table, works normally (no blocking). Adding a fixed window of 3 seconds or triggering on each element doesn't solve the problem either.
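For reference, the workaround that does not block looks roughly like this (just a sketch; the DoFn name and the way the line is picked are mine, the point is only that it bypasses ReadAllFromText):

import random

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class ReadRandomLine(beam.DoFn):
    def process(self, message):
        # The Pub/Sub payload is the gs:// link to a text file (as bytes).
        path = message.decode("utf-8").strip()
        with FileSystems.open(path) as f:
            lines = f.read().decode("utf-8").splitlines()
        if lines:
            yield random.choice(lines)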

Each file is around 10MB and 23K lines.

Unfortunately, I cannot find any documentation on how ReadAllFromText is supposed to work. It would be really strange if it blocked the pipeline until all the files have been read; I would expect it to push each line into the pipeline as soon as the line is read.

Is there any known reason for the above behavior? Is it a bug or am I doing something wrong?

Pipeline code:

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, ReadAllFromText, WriteToBigQuery
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as p:
    # Read gs:// links from Pub/Sub, window them, and read the linked files line by line.
    lines = (p
             | ReadFromPubSub(subscription=source_dict["input"])
             | 'window' >> beam.WindowInto(window.FixedWindows(3, 0))
             | ReadAllFromText(skip_header_lines=1))

    elements = lines | beam.ParDo(SplitPayload())

    elements | WriteToBigQuery(source_dict["output"],
                               write_disposition=BigQueryDisposition.WRITE_APPEND)
.
.
.
class SplitPayload(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Each line is "timestamp;id;payload"; turn it into a dict for BigQuery.
        timestamp, id, payload = element.split(";")
        return [{
            'timestamp': timestamp,
            'id': id,
            'payload': payload
        }]
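For context, SplitPayload assumes every line in the text files is a semicolon-separated record (the values below are made up), e.g.

2018-09-07T12:00:00;42;some-payload

which it turns into a {'timestamp': ..., 'id': ..., 'payload': ...} dict for WriteToBigQuery.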
  • Can you post an [MCVE](https://stackoverflow.com/help/mcve)? I tried to reproduce the pipeline you described very minimally with something like (pipeline | beam.io.ReadFromPubSub(topic) | beam.io.textio.ReadAllFromText() | beam.Map(lambda x: {'i': x}) | beam.io.WriteToBigQuery(output, schema='i:STRING')) And ReadAllFromText doesn't block the streaming. Are you doing any other processing to your data that could block it? Make sure to pass the "--streaming" flag too. – rilla Sep 04 '18 at 10:22
  • @rilla please check the edited question. I am currently testing with a direct runner. If I publish one GCS link to a text file to the topic and read it with `ReadFromPubSub`, it takes around 10 seconds before lines from this file start being written to the BigQuery table. When I publish 20 files and then start to read, it takes around 30 seconds before writing to the table starts. When I try 360 files, it takes around 90 seconds. – korujzade Sep 07 '18 at 12:40
  • So is it normal behavior that `ReadAllFromText` takes longer to start sending lines to `WriteToBigQuery` as the number of elements (GCS links from `ReadFromPubSub`) increases? Right now I cannot test with the Dataflow runner. Could it be because of the direct runner? – korujzade Sep 07 '18 at 12:52
  • If I change the scenario to `ReadFromPubSub | WriteToBigQuery`, the above issue does not occur and it starts writing to the table immediately. That supports my assumption that `ReadAllFromText` actually blocks the pipeline. – korujzade Sep 09 '18 at 11:00
  • Have you tried ReadFromText instead of ReadAllFromText? Seems like ReadAllFromText is optimized for larger files than your use case [(1)](https://stackoverflow.com/a/45942931/9908267). – Yurci Sep 10 '18 at 14:59
  • @Yurci The `ReadFromText` function takes a source file pattern as a string parameter, not a PCollection. However, I receive the links to the GCS source files from `ReadFromPubSub`, which yields a PCollection. I suppose `ReadFromText` would be useful in batch mode, since it can be used as an input adapter. – korujzade Sep 10 '18 at 15:06
  • @Yurci Also, it is mentioned in the referenced answer that `ReadAllFromText` is for reading a very large number of files, which is exactly what it is in my situation. – korujzade Sep 11 '18 at 13:29
  • You can find the code for ReadAllFromText [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L413) and the documentation for reading input data [here](https://beam.apache.org/documentation/programming-guide/#pipeline-io) – Yurci Sep 12 '18 at 13:55
  • I have already read how reading input data with `ReadFromText` works, and it is as I mentioned above: it takes the external source as a file-pattern parameter and returns a PCollection with one element per line. And looking at the code of `ReadAllFromText` doesn't change the fact that it doesn't work properly with the DirectRunner. Do you want me to find out what the reason for the bug is, or what? – korujzade Sep 12 '18 at 14:05
  • Hi @korujzade, is it possible for you to test this with Dataflow Runner as well? On my end I couldn't reproduce this. You can use [Cloud Pub/Sub to BigQuery template](https://cloud.google.com/dataflow/docs/templates/provided-templates#cloudpubsubtobigquery). – Yurci Sep 18 '18 at 07:16
  • Hi @Yurci. The issue does not happen with the Dataflow runner. However, the pipeline hangs for a couple of minutes after the workers are successfully initialized, then works normally once it starts. But I guess that is another issue that I should address in a separate question. – korujzade Sep 24 '18 at 14:32

0 Answers