
I tried the new pivot function in Spark 1.6 on a larger stacked dataset. It has 5,656,458 rows, and the IndicatorCode column has 1,344 distinct codes.

The idea was to use pivot to "unstack" (in pandas terms) this dataset and end up with one column per IndicatorCode.
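
For comparison, a rough pandas sketch of the intended wide shape (illustrative only; the local file path and the use of max for duplicate values are assumptions, not part of the question):

import pandas as pd

# Hypothetical pandas equivalent of the intended reshape: one row per
# (CountryCode, Year), one column per IndicatorCode, taking the maximum
# Value where duplicates exist.
pdf = pd.read_csv("Indicators.csv")
wide = pdf.pivot_table(index=["CountryCode", "Year"],
                       columns="IndicatorCode",
                       values="Value",
                       aggfunc="max")

The Spark code below works toward the same shape.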

from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

schema = StructType([
    StructField("CountryName", StringType(), True),
    StructField("CountryCode", StringType(), True),
    StructField("IndicatorName", StringType(), True),
    StructField("IndicatorCode", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Value", DoubleType(), True)
])

data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv', 
                            format='com.databricks.spark.csv', 
                            header='true', 
                            schema=schema)

from pyspark.sql.functions import regexp_replace

data2 = data.withColumn("IndicatorCode2", regexp_replace("IndicatorCode", r"\.", "_"))\
            .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

While this call returned successfully (the transformation itself is lazy), data3.first() never returned a result; I interrupted it after 10 minutes on my standalone setup using 3 cores.
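
One way to inspect what is being planned without triggering execution (an illustrative suggestion, not part of the original question) is to print the query plan and the output width:

# Print the logical/physical plan and the number of output columns; with
# roughly 1,344 pivot values, each output row is very wide.
data3.explain(True)
print(len(data3.columns))  # 2 grouping columns + one column per IndicatorCode2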

My approach using RDDs and aggregateByKey worked well, so I'm not looking for a solution for how to do it, but for whether pivot with DataFrames can also do the trick.
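
A hypothetical sketch of such an RDD-based approach (not the actual code behind the question; it assumes the same max-per-indicator semantics as the pivot above) might look like:

# Build a dict {IndicatorCode2: max(Value)} per (Year, CountryCode) key,
# then expand each dict into a wide tuple following the order in `columns`.
def seq_op(acc, code_value):
    code, value = code_value
    if value is not None and (code not in acc or value > acc[code]):
        acc[code] = value
    return acc

def comb_op(left, right):
    for code, value in right.items():
        if code not in left or value > left[code]:
            left[code] = value
    return left

wide_rdd = (data2.rdd
            .map(lambda r: ((r.Year, r.CountryCode), (r.IndicatorCode2, r.Value)))
            .aggregateByKey({}, seq_op, comb_op)
            .map(lambda kv: kv[0] + tuple(kv[1].get(c) for c in columns)))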

Bernhard

2 Answers


Well, pivoting is not a very efficient operation in general, and there is not much you can do about it using the DataFrame API. One thing you can try, though, is to repartition your data:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

or even aggregate:

from pyspark.sql.functions import max as max_  # aliased to avoid shadowing the builtin max

(data2
    .groupBy("Year", "CountryCode", "IndicatorCode2")
    .agg(max_("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode2", columns)
    .max("Value"))

before applying pivot. The idea behind both solutions is the same: instead of moving large expanded rows around, move narrow, dense data and expand locally.

zero323
  • Thanks. I find it interesting that something that can be done on one node with pandas in 26 sec (16 sec loading/parsing the CSV and 10 sec for unstacking) – Bernhard Feb 17 '16 at 07:14
  • ... is that difficult using Spark with four of these machines. With RDDs I am currently at about 100 sec - I will try to find a faster way in Spark. – Bernhard Feb 17 '16 at 07:27
  • It doesn't surprise me. Local processing is fast because it is just moving things around in memory. Spark has to perform a lot of work, even locally, and it makes specific assumptions about the data which can make this type of operation inefficient. – zero323 Feb 17 '16 at 07:30

Spark 2.0 introduced SPARK-13749, an implementation of pivot that is faster for a large number of pivot column values.

Testing with Spark 2.1.0 on my computer, your example now runs in 48 seconds.
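
The answer does not include its benchmark code; an illustrative way such a timing could be reproduced (reusing data2 and columns from the question) is:

import time

start = time.time()
wide = (data2
        .groupBy("Year", "CountryCode")
        .pivot("IndicatorCode2", columns)
        .max("Value"))
wide.count()  # force execution; the pivot itself is lazy
print("pivot took %.1f s" % (time.time() - start))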

Andrew Ray