I am using Apache Beam 2.10 and am trying to understand exactly what FlatMap does when it returns the PCollection to the caller.
From the explanation in the online documentation, I thought FlatMap simply splits a collection of elements into the individual elements of a single PCollection, as the documentation describes:
The callable must return an iterable for each element of the input PCollection. The elements of these iterables will be flattened into the output PCollection.
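The documented rule can be modeled in plain Python. This is a sketch, not Beam's implementation: `flat_map_model` is a hypothetical helper that calls the function on each input element and chains the returned iterables into one list, which stands in for the output PCollection.

```python
from itertools import chain


def flat_map_model(fn, elements):
    # Hypothetical stand-in for beam.FlatMap: call fn on each element,
    # then iterate over whatever fn returned and collect every item.
    return list(chain.from_iterable(fn(e) for e in elements))


lines = ["to be", "or not"]
# Each call returns a list of words; the lists are flattened into one output.
words = flat_map_model(lambda line: line.split(), lines)
print(words)  # ['to', 'be', 'or', 'not']
```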
But with the code below, FlatMap emits every single character of "Hello World" as a separate element instead of returning "Hello World" as a whole.
import logging

import apache_beam as beam


def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Just return
    return x


def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
               | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )


def main():
    run()


if __name__ == "__main__":
    # Show the logging.info output from simple()
    logging.getLogger().setLevel(logging.INFO)
    main()
RESULT
H
e
l
l
o
W
o
r
l
d
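This output follows from the rule quoted above: a Python `str` is itself an iterable of one-character strings, so when `simple` returns `x` unchanged, FlatMap iterates over the string. A minimal plain-Python illustration (no Beam involved; the function names are illustrative), assuming the goal is to keep the whole string as one element, is to wrap the return value in a one-element list:

```python
def simple_returns_str(x):
    # Returning the string itself: iterating over it yields characters.
    return x


def simple_returns_list(x):
    # Returning a one-element list: iterating over it yields the whole string.
    return [x]


element = "Hello World"
chars = [item for item in simple_returns_str(element)]
whole = [item for item in simple_returns_list(element)]
print(chars[:5])  # ['H', 'e', 'l', 'l', 'o']
print(whole)      # ['Hello World']
```

In a pipeline, `beam.FlatMap(lambda x: [x])` or `beam.Map(simple)` would keep "Hello World" as a single element.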
But when I yield the value instead (which turns simple into a generator), as in the code below, the whole string becomes a single element of the output PCollection.
import logging

import apache_beam as beam


def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Just return
    yield x


def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
               | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )


def main():
    run()


if __name__ == "__main__":
    # Show the logging.info output from simple()
    logging.getLogger().setLevel(logging.INFO)
    main()
RESULT
Hello World
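The generator version behaves differently because calling a function that contains `yield` returns a generator object, and that generator produces `x` exactly once, so FlatMap sees a one-element iterable. The plain-Python sketch below (function names are illustrative) shows this, along with a generator that yields several times and would therefore produce several output elements:

```python
def simple_gen(x):
    # A generator function: calling it returns a generator
    # that yields the whole input once.
    yield x


def split_gen(x):
    # Yields one item per word, so FlatMap would emit one element per word.
    for word in x.split():
        yield word


print(list(simple_gen("Hello World")))  # ['Hello World']
print(list(split_gen("Hello World")))   # ['Hello', 'World']
```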
Added on May 2, 2019 (JST):
If I literally return an iterator, the string is iterated and each character is flattened into the PCollection.
import logging

import apache_beam as beam


def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))
    # Just return
    return iter(x)


def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])
        #
        # Sample with FlatMap
        #
        (elems | 'flatmap:exec' >> beam.FlatMap(simple)  # 1 to 1 relation with flatmap
               | 'flatmap:out' >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )


def main():
    run()


if __name__ == "__main__":
    # Show the logging.info output from simple()
    logging.getLogger().setLevel(logging.INFO)
    main()
RESULT
H
e
l
l
o
W
o
r
l
d
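Returning `iter(x)` and yielding `x` both hand FlatMap an iterator; what differs is what that iterator walks over. `iter(x)` iterates over the characters of the string, while the generator yields the string as a single item. A small plain-Python comparison (function names are illustrative):

```python
from collections.abc import Iterator


def via_iter(x):
    # iter() on a str produces an iterator over its characters.
    return iter(x)


def via_yield(x):
    # A generator that produces the whole string once.
    yield x


s = "Hello World"
print(isinstance(via_iter(s), Iterator))   # True
print(isinstance(via_yield(s), Iterator))  # True
print(list(via_iter(s)) == list(s))        # True
print(list(via_yield(s)))                  # ['Hello World']
```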
So what exactly does FlatMap do with the callable's return value when it builds the PCollection returned to the caller?
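One way to frame the difference between Map and FlatMap, sketched with plain list comprehensions rather than Beam's actual code: FlatMap iterates over each return value and emits every item, while Map keeps each return value as exactly one element. Map can therefore be expressed as a FlatMap whose callable wraps its result in a one-element list.

```python
def simple(x):
    return x


data = ["Hello World"]

# Map-like: one output element per input element, whatever simple returns.
map_like = [simple(e) for e in data]

# FlatMap-like: iterate over each return value and emit every item.
flat_map_like = [item for e in data for item in simple(e)]

# Map expressed as FlatMap: wrap the result so iteration yields it unchanged.
map_via_flat = [item for e in data for item in [simple(e)]]

print(map_like)                  # ['Hello World']
print(flat_map_like[:3])         # ['H', 'e', 'l']
print(map_via_flat == map_like)  # True
```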