
If I call map or mapPartitions and my function receives rows from PySpark, what is the natural way to create either a local PySpark or Pandas DataFrame? Something that combines the rows and retains the schema?

Currently I do something like:

import pandas as pd

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows, columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartitions(combine)
retrocookie

4 Answers


Spark >= 2.3.0

Since Spark 2.3.0 it is possible to operate on a Pandas Series or DataFrame per partition or per group, for example with pandas UDFs (pandas_udf).
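A grouped map pandas UDF receives each group as a plain Pandas DataFrame. A minimal sketch of that route (assuming a Spark DataFrame `df` with the integer columns `x` and `y` from the example below; the grouping column and the function are illustrative):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Each group arrives as a pandas.DataFrame; the returned
# pandas.DataFrame must match the declared schema.
@pandas_udf("x long, y double", PandasUDFType.GROUPED_MAP)
def demean_y(pdf):
    return pdf.assign(y=pdf.y - pdf.y.mean())

df.groupBy("x").apply(demean_y).show()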

Spark < 2.3.0

what is the natural way to create either a local PySpark

There is no such thing. Spark distributed data structures cannot be nested, or, if you prefer another perspective, you cannot nest actions or transformations.

or Pandas DataFrame

It is relatively easy, but you have to remember at least a few things:

  • Pandas and Spark DataFrames are not even remotely equivalent. These are different structures, with different properties, and in general you cannot replace one with the other.
  • Partitions can be empty.
  • It looks like you're passing dictionaries. Remember that a base Python dictionary is unordered (unlike collections.OrderedDict, for example), so passing columns may not work as expected.
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1
zero323
  • Thank you, the explanation helps. The approach is similar to what I'm using now, but is there a natural way to pass the row schema other than just column names? – retrocookie Dec 24 '15 at 06:07
  • I am not sure if I understand the question. Pandas DataFrames use columns and dtype parameters which can be passed in a closure, but that is not something that will be recognized by Spark. If you want a Spark DataFrame you should pass this to `createDataFrame` and pass a schema (which is different from Pandas dtypes) there. – zero323 Dec 24 '15 at 06:32
  • I want to maintain the schema, in the same way I'd imagine the normal toPandas does. Calling createDataFrame and then toPandas would be fine if I knew how to call createDataFrame and maintain the row schema. Although I'm guessing there might be a loss of efficiency? – retrocookie Dec 24 '15 at 06:56
  • `toPandas` simply collects and creates a local data structure with the same column names as the Spark data frame. Nothing more, nothing less. `Row` (as `pyspark.sql.Row`) has no schema - it is just a `tuple` with a few added methods and a `__fields__` attribute which stores names. – zero323 Dec 24 '15 at 07:04
  • Interesting, toPandas doesn't also enforce column types? And types aren't built into the rows? – retrocookie Dec 24 '15 at 07:12
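
To make column order, dtypes, and schema explicit, as discussed in the comments above, here is a minimal sketch of both routes; it assumes a SparkSession named `spark` (use `sqlContext` on 1.x) and reuses the `x`/`y` columns from the example above:

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

def combine(partition):
    rows = list(partition)
    # Fix column order and dtypes inside the closure instead of relying on
    # dictionary ordering; Spark itself does not inspect these parameters.
    return [pd.DataFrame(rows, columns=["x", "y"]).astype("int64")] if rows else []

# Alternatively, keep the data in Spark, attach an explicit schema with
# createDataFrame, and let toPandas build the local frame afterwards.
schema = StructType([
    StructField("x", LongType(), False),
    StructField("y", LongType(), False),
])
rdd = sc.parallelize([Row(x=1, y=-1), Row(x=-3, y=0), Row(x=0, y=4)])
pdf = spark.createDataFrame(rdd, schema).toPandas()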

It's actually possible to convert Spark rows to Pandas inside the executors and finally create a Spark DataFrame from that output using mapPartitions. See my gist on GitHub.

import pandas as pd

# Convert function to use in mapPartitions
def rdd_to_pandas(rdd_):
    # convert rows to dict
    rows = (row_.asDict() for row_ in rdd_)
    # create pandas dataframe
    pdf = pd.DataFrame(rows)

    # Rows/Pandas DF can be empty depending on partition logic.
    # Make sure to check it here, otherwise it will throw a hard-to-trace error
    if len(pdf) > 0:
        #
        # Do something with pandas DataFrame 
        #
        pass

    return pdf.to_dict(orient='records')

# Create Spark DataFrame from resulting RDD
rdf = spark.createDataFrame(df.rdd.mapPartitions(rdd_to_pandas))
chhantyal

You could use toPandas():

pandasdf = mydf.toPandas()
WoodChopper
  • That doesn't answer my question; I need it to operate inside a map call on a partition. If there's a map that passes a DataFrame that would be good too. – retrocookie Dec 23 '15 at 17:03
  • Sorry, I couldn't understand `map that passes dataframe`. What is expected as the output, a Spark DataFrame? Do you want to create a DataFrame for each partition? – WoodChopper Dec 23 '15 at 17:22
  • mapPartitions passes a Row iterator for each partition, so I can't use DataFrame functions – retrocookie Dec 23 '15 at 17:25

In order to create a Spark SQL DataFrame you need a Hive context:

hc = HiveContext(sparkContext)

With the HiveContext you can create a SQL DataFrame via the inferSchema function:

sparkSQLdataframe = hc.inferSchema(rows)  
Jonathan
  • Good point. This only works on an RDD, so you would invoke it for your variable 'combine' before you call mapPartitions. – Jonathan Dec 23 '15 at 17:16
  • Alternatively, it would be even better if you read in your data as a DataFrame right away. You can do that for several input sources, among them JSON, Hive tables and many more. – Jonathan Dec 23 '15 at 17:25
  • I have a DataFrame, but when I call mapPartitions each slave node sees a row iterator; for convenience I'd like to combine these rows – retrocookie Dec 23 '15 at 17:29
  • Ah, now I understand. Unfortunately, I can't think of a good solution. Which dataframe operation do you want to apply if you had the rows combined into a dataframe? – Jonathan Dec 23 '15 at 18:05
  • Ultimately toPandas and then a pandas function. The approach sketched above works but it feels hacky – retrocookie Dec 23 '15 at 18:21