
I'm running Spark on Hadoop's YARN. How does this conversion work? Does a collect() take place before the conversion?

Also, do I need to install Python and R on every slave node for the conversion to work? I'm struggling to find documentation on this.

user1956609

1 Answer


toPandas (PySpark) / as.data.frame (SparkR)

Data has to be collected before the local data frame is created. For example, the toPandas method looks as follows:

def toPandas(self):
    import pandas as pd
    # collect() pulls every row to the driver as a list of Rows,
    # which is then converted into a local pandas.DataFrame
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
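
A minimal usage sketch (the toy DataFrame here is an illustrative assumption, not from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1000)  # small example Spark DataFrame

# All 1000 rows are first collected to the driver, then converted
pdf = df.toPandas()
print(type(pdf))  # <class 'pandas.core.frame.DataFrame'>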

You need Python, ideally with all of its dependencies, installed on each node.

The SparkR counterpart, as.data.frame, is simply an alias for collect.

To summarize: in both cases data is collected to the driver node and converted to a local data structure (pandas.DataFrame in Python and base::data.frame in R, respectively).

Vectorized user-defined functions

Since Spark 2.3.0, PySpark also provides a set of pandas_udf variants (SCALAR, GROUPED_MAP, GROUPED_AGG; see the sketch below) which operate in parallel on chunks of data defined by:

  • Partitions, in the case of the SCALAR variant.
  • Grouping expressions, in the case of the GROUPED_MAP and GROUPED_AGG variants.

Each chunk is represented by:

  • One or more pandas.core.series.Series, in the case of the SCALAR and GROUPED_AGG variants.
  • A single pandas.core.frame.DataFrame, in the case of the GROUPED_MAP variant.
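
A minimal sketch of the SCALAR variant using the Spark 2.3 decorator API (the toy DataFrame and the plus_one function are illustrative assumptions):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["x"])

# SCALAR: the function receives a pandas.Series per chunk of a partition
# and must return a Series of the same length
@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1

df.withColumn("x_plus_one", plus_one("x")).show()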

Similarly, since Spark 2.0.0, SparkR provides the dapply and gapply functions, which operate on data.frames defined by partitions and grouping expressions, respectively.

The aforementioned functions:

  • Don't collect to the driver. Unless the data contains only a single partition (e.g. after coalesce(1)) or the grouping expression is trivial (e.g. groupBy(lit(1))), there is no single-node bottleneck.
  • Load their respective chunks into the memory of the corresponding executor. As a result, they are limited by the size of individual chunks and the memory available on each executor (see the GROUPED_MAP sketch below).
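
For illustration, a GROUPED_MAP sketch (the toy data and the demean function are assumptions; each group is materialized as a whole pandas.DataFrame in its executor's memory, so individual groups have to fit):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 3.0)], ["key", "value"])

# GROUPED_MAP: each group arrives as a single pandas.DataFrame on an
# executor; the result is assembled without passing through the driver
@pandas_udf("key string, value double", PandasUDFType.GROUPED_MAP)
def demean(pdf):
    return pdf.assign(value=pdf.value - pdf.value.mean())

df.groupby("key").apply(demean).show()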
zero323
  • So, toPandas is always executed on the driver node? And you can never use a pandas DataFrame in a map function on a worker node? – Matthias Aug 25 '16 at 16:04
  • @Matthias toPandas is always on the driver. You can use pandas objects inside map if you want, but it is not something that concerns Spark in particular. Whatever you get inside an executor thread is just a plain local object. – zero323 Aug 25 '16 at 16:07
  • Ahh, thanks for clarifying. I was just discussing a scenario with my colleagues today ([see this link](http://stackoverflow.com/questions/39142549/is-dataframe-topandas-always-on-driver-node-or-on-worker-nodes)). – Matthias Aug 25 '16 at 17:41
  • Hmm, according to your example it does work in a distributed manner? Meaning you can create many pandas data frames within the map function. Are they then located on the driver node? – Matthias Aug 25 '16 at 21:15