
I am using Apache Beam 2.10 and trying to understand what exactly FlatMap does when it returns the PCollection to the caller.

From the online documentation, I understood that FlatMap simply flattens the elements returned for each input into a single PCollection, as described here:

The callable must return an iterable for each element of the input PCollection. The elements of these iterables will be flattened into the output PCollection.

But with the code below, FlatMap flattens every single character of "Hello World" instead of returning "Hello World" as a whole.

import logging

import apache_beam as beam

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    return x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with FlatMap
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

RESULT

H
e
l
l
o

W
o
r
l
d

But when I yield the value from a generator, as in this code, the whole string seems to become a single PCollection element.

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    yield x

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with FlatMap
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

RESULT

Hello World

ADDED ON May 2, 2019 JST

If I literally return an iterator, the string is iterated and each character is flattened into the PCollection.

def simple(x):
    logging.info("Inside simple type: {0}, val: {1}".format(type(x), x))

    # Just return
    return iter(x)

def run():
    with beam.Pipeline(runner='DirectRunner') as p:
        elems = p | 'map:init' >> beam.Create(["Hello World"])

        #
        # Sample with FlatMap
        #
        ( elems | 'flatmap:exec'   >> beam.FlatMap(simple) # 1 to 1 relation with flatmap
            | 'flatmap:out'    >> beam.io.textio.WriteToText(file_path_prefix='flatmap')
        )

def main():
    run()

if __name__ == "__main__":
  main()

RESULT

H
e
l
l
o

W
o
r
l
d

So what exactly does FlatMap do when returning the PCollection to the caller?

Yu Watanabe

1 Answer


FlatMap assumes that the given function returns an iterable. In your first example, simple returns "Hello World". As an iterable, "Hello World" can be treated as ['H','e','l','l','o',' ','W','o','r','l','d']. Therefore, your first example works as follows:

  1. [] -> create -> ["Hello World"]
  2. ["Hello World"] -> map -> [['H','e','l','l','o',' ','W','o','r','l','d']]
  3. [['H','e','l','l','o',' ','W','o','r','l','d']] -> flatten -> ['H','e','l','l','o',' ','W','o','r','l','d']

Final PCollection: ['H','e','l','l','o',' ','W','o','r','l','d']
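
The three steps above can be approximated in plain Python. This is only a sketch under stated assumptions: `flat_map` is a hypothetical helper written for illustration, not part of Apache Beam, and it models FlatMap as "apply the callable to each element, then flatten one level".

```python
from itertools import chain


def flat_map(fn, elements):
    """Hypothetical stand-in for FlatMap: apply fn, then flatten one level."""
    # fn(e) must be iterable; chain.from_iterable concatenates those iterables.
    return list(chain.from_iterable(fn(e) for e in elements))


def simple_return(x):
    # A str is itself an iterable of one-character strings.
    return x


result = flat_map(simple_return, ["Hello World"])
print(result)
```

Because the returned string is iterated character by character, `result` holds eleven one-character strings rather than the original string.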

In your second example, however, simple yields x. You can think of simple as returning an iterator that contains a single element, x. So your second example works like this:

  1. [] -> create -> ["Hello World"]
  2. ["Hello World"] -> map -> [["Hello World"]]
  3. [["Hello World"]] -> flatten -> ["Hello World"]

Final PCollection: ["Hello World"]
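
The generator case can be sketched the same way in plain Python. As an assumption for illustration, `flat_map` below is a hypothetical helper (not a Beam API) that applies the callable and flattens one level; `simple_yield` and `simple_list` are likewise made-up names.

```python
from itertools import chain


def flat_map(fn, elements):
    """Hypothetical stand-in for FlatMap: apply fn, then flatten one level."""
    return list(chain.from_iterable(fn(e) for e in elements))


def simple_yield(x):
    # Calling this returns a generator that produces x exactly once.
    yield x


def simple_list(x):
    # Wrapping x in a list gives an iterable with x as its only element.
    return [x]


yielded = flat_map(simple_yield, ["Hello World"])
wrapped = flat_map(simple_list, ["Hello World"])
print(yielded, wrapped)
```

Both callables hand back an iterable whose single element is the whole string, so flattening one level leaves the string intact. In Beam itself, `beam.Map` is the transform intended for a one-to-one mapping where no flattening is wanted.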

To answer your last question:

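yield x and return iter(x) are semantically different: yield x produces an iterator over the single element x, whereas iter(x) produces an iterator over the contents of x. The following example may help illustrate the difference.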

>>> list(iter("abc"))
['a', 'b', 'c']
>>> def x(): yield "abc"
>>> list(x())
['abc']
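
To make the comparison explicit, here is a small plain-Python sketch (the generator name `gen` is hypothetical). It suggests that a function that yields x behaves like `iter([x])`, an iterator over a one-element list, rather than like `iter(x)`.

```python
def gen(x):
    # Generator function: calling it returns an iterator that produces x once.
    yield x


s = "abc"
over_string = list(iter(s))    # iterates the characters of s
over_list = list(iter([s]))    # iterates a one-element list containing s
from_generator = list(gen(s))  # iterates the single yielded value

print(over_string, over_list, from_generator)
```

So "returning an iterator" only keeps the string whole when that iterator produces the string as one item; an iterator built directly from the string produces its characters.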
ihji
  • Thank you for posting the answer. I agree with your first point, but I don't think returning an *iterator* is the right way to understand it, because if I literally return an iterator via *iter()*, each character of "Hello World" is still flattened into the PCollection. – Yu Watanabe May 02 '19 at 01:24