
I am trying to learn how to use pyspark.pandas and I am running into an issue that I don't know how to solve. My actual DataFrame has about 700k rows and 7 columns. Here is a sample of my data:

import pyspark.pandas as ps
import pandas as pd

data = {'Region': ['Africa', 'Africa', 'Africa', 'Africa', 'Africa', 'Africa', 'Africa', 'Asia', 'Asia', 'Asia'],
        'Country': ['South Africa', 'South Africa', 'South Africa', 'South Africa', 'South Africa', 'South Africa', 'South Africa', 'Japan', 'Japan', 'Japan'],
        'Product': ['ABC', 'ABC', 'ABC', 'XYZ', 'XYZ', 'XYZ', 'XYZ', 'DEF', 'DEF', 'DEF'],
        'Year': [2016, 2018, 2019, 2016, 2017, 2018, 2019, 2016, 2017, 2019],
        'Price': [500, 0, 450, 750, 0, 0, 890, 19, 120, 3],
        'Quantity': [1200, 0, 330, 500, 190, 70, 120, 300, 50, 80],
        'Value': [600000, 0, 148500, 350000, 0, 29100, 106800, 74300, 5500, 20750]}

df = ps.DataFrame(data)

Even when I run the simplest of operations like df.head(), I get the following warning and I'm not sure how to fix it:

WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

I know how to work around this with PySpark DataFrames, roughly like the sketch below, but I'm not sure how to define a partition for the window operation when using the pandas API on Spark.
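
(A minimal sketch of the kind of workaround I mean on a plain PySpark DataFrame; the prev_value column is only there to illustrate an explicitly partitioned window.)

from pyspark.sql import Window
import pyspark.sql.functions as F

# Drop down to a plain PySpark DataFrame
sdf = df.to_spark()

# Giving the window an explicit partition keeps the data distributed
# instead of pulling everything into a single partition.
w = Window.partitionBy("Country", "Product").orderBy("Year")
sdf = sdf.withColumn("prev_value", F.lag("Value").over(w))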

Does anyone have any suggestions?

A.N.
  • This might help: https://stackoverflow.com/questions/41313488/avoid-performance-impact-of-a-single-partition-mode-in-spark-window-functions – waterlily74 Aug 10 '22 at 15:51

1 Answer


For Koalas (the predecessor of pyspark.pandas), repartition only seems to take a number of partitions, not partitioning columns: https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.spark.repartition.html
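
A quick sketch of what I mean, based on the linked docs (the DataFrame contents and the partition count here are just for illustration):

import pyspark.pandas as ps

psdf = ps.DataFrame({"id": ["A", "B", "A", "B"], "value": [1, 2, 3, 4]})

# repartition takes a target number of partitions, not partitioning columns
psdf = psdf.spark.repartition(4)
print(psdf.spark.frame().rdd.getNumPartitions())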

I think the goal here is to run Pandas functions on a Spark DataFrame, one partition at a time. One option you can use is Fugue, which can take a plain Python function and apply it on Spark per partition. Example code below.

from typing import List, Dict, Any
import pandas as pd 

df = pd.DataFrame({"date":["2021-01-01", "2021-01-02", "2021-01-03"] * 3,
                   "id": (["A"]*3 + ["B"]*3 + ["C"]*3),
                   "value": [3, 4, 2, 1, 2, 5, 3, 2, 3]})


def count(df: pd.DataFrame) -> pd.DataFrame:
    # this assumes the data is already partitioned by id,
    # so every row in df shares the same id value
    group_id = df.iloc[0]["id"]
    n_rows = df.shape[0]
    return pd.DataFrame({"id": [group_id], "count": [n_rows]})

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame(df)

from fugue import transform

# Run locally on Pandas (the default engine when none is given)
pdf = transform(df.copy(),
          count,
          schema="id:str, count:int",
          partition={"by": "id"})
print(pdf.head())

# Run distributed on Spark by passing the SparkSession as the engine
transform(sdf,
          count,
          schema="id:str, count:int",
          partition={"by": "id"},
          engine=spark).show()

You just need to annotate your function with input and output types, and then you can use it with Fugue's transform function. The schema is a requirement for Spark, so you need to pass it. If you supply the SparkSession as the engine, execution happens on Spark; otherwise it runs on Pandas by default.
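
As a side note, pandas DataFrames are not the only annotations transform understands. If I remember the Fugue API correctly (which is presumably what the typing import at the top was for), list-of-dict annotations also work, for example:

from typing import Any, Dict, List

# Same per-partition logic, expressed with list-of-dict type hints;
# Fugue uses the annotations to decide how to hand each partition to the function.
def count_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{"id": rows[0]["id"], "count": len(rows)}]

transform(sdf,
          count_rows,
          schema="id:str, count:int",
          partition={"by": "id"},
          engine=spark).show()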

Kevin Kho