I have a pipeline that reads files from GCS through Pub/Sub:
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]

class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element
...
(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
    | 'Log Results' >> beam.ParDo(LogFn())
    # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)
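For reference, here is a small plain-Python sketch of what the expression in ExtractFileNameFn computes. It assumes the message's id field has the form bucket/object/generation; the sample id and the helper name extract_file_name are made up for illustration.

```python
def extract_file_name(element):
    # Same expression as in ExtractFileNameFn: drop the last "/"-separated
    # segment of the id and prefix the rest with the gs:// scheme.
    return 'gs://' + "/".join(element['id'].split("/")[:-1])

# Hypothetical message payload, assuming an id of the form bucket/object/generation.
sample = {'id': 'my-bucket/path/to/data.txt/1565000000000000'}

print(extract_file_name(sample))  # gs://my-bucket/path/to/data.txt
```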
The difference between LogFn and LogPassThroughFn is the type of the return value: LogFn returns a list, while LogPassThroughFn returns the string itself. LogFn works well in my test code, but LogPassThroughFn makes the pipeline fail to run. Per the answer to this issue:
Beam Python SDK still tries to interpret the output of that ParDo as if it was a collection of elements. And it does so by interpreting the string you emitted as collection of characters.
So we know LogFn should work correctly.
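The quoted explanation can be illustrated with a minimal plain-Python sketch, with no Beam involved. It assumes, as a simplification, that the runner just iterates over whatever process returns; collect_outputs is a hypothetical helper rather than a Beam API, and the path is a made-up example.

```python
def collect_outputs(process_result):
    # Hypothetical stand-in for the runner: treat the value returned by
    # process() as an iterable and collect each item as one output element.
    return list(process_result)

def log_fn_process(element):
    # Mirrors LogFn.process: wraps the element in a list.
    return [element]

def log_pass_through_process(element):
    # Mirrors LogPassThroughFn.process: returns the bare string.
    return element

path = "gs://my-bucket/data.txt"  # made-up example path

from_list = collect_outputs(log_fn_process(path))
from_string = collect_outputs(log_pass_through_process(path))

print(from_list)         # ['gs://my-bucket/data.txt']
print(from_string[:5])   # ['g', 's', ':', '/', '/']
print(len(from_string))  # 23
```

Under this model, the list produces one element (the whole path), while the bare string produces one element per character.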
However, I notice that ExtractFileNameFn yields a string rather than a list. Is that correct? To test it, I changed it to yield a list in ExtractFileNameFn1:
class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]
...
(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)
Now the pipeline fails to run.
My question is: what is the difference between returning a string and returning a list in a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, but a list from LogFn?
Beam version: 2.14.0
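The yield versus return distinction behind ExtractFileNameFn, ExtractFileNameFn1 and LogFn can be sketched in plain Python. This is only a simplified model that assumes the runner iterates over the result of process; collect_outputs and the three small functions are hypothetical stand-ins, not Beam APIs.

```python
def collect_outputs(process_result):
    # Hypothetical stand-in for the runner: iterate over the result of
    # process() and collect each item as one output element.
    return list(process_result)

def yield_string(file_name):
    # Like ExtractFileNameFn: a generator that yields the string once.
    yield file_name

def yield_list(file_name):
    # Like ExtractFileNameFn1: a generator that yields a list once.
    yield [file_name]

def return_list(file_name):
    # Like LogFn: returns a list containing the string.
    return [file_name]

path = "gs://my-bucket/data.txt"  # made-up example path

print(collect_outputs(yield_string(path)))  # ['gs://my-bucket/data.txt']
print(collect_outputs(return_list(path)))   # ['gs://my-bucket/data.txt']
print(collect_outputs(yield_list(path)))    # [['gs://my-bucket/data.txt']]
```

In this model, yield file_name and return [file_name] both emit a single string element, whereas yield [file_name] emits a single element that is itself a list, which a downstream step expecting a file pattern string would presumably not accept.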