4

I know we can use window functions in pyspark to calculate a cumulative sum. But window functions are only supported in HiveContext and not in SQLContext. I need to use SQLContext, as HiveContext cannot be run in multiple processes.

Is there an efficient way to calculate a cumulative sum using SQLContext? A simple way is to load the data into the driver's memory and use numpy.cumsum, but the downside is that the data needs to fit into memory.
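
For reference, something like the sketch below is what I mean by the driver-side approach (sqlContext and the table/column names are just placeholders for illustration):

import numpy as np

# Pull a single numeric column to the driver -- only viable if it fits in memory --
# and let numpy compute the running total locally.
rows = sqlContext.sql("SELECT revenue FROM some_table ORDER BY revenue").collect()
cumsum = np.cumsum([r.revenue for r in rows])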

Sergey Bushmanov
Michael
  • _need to use SQLContext as HiveContext cannot be run in multi processes_ - huh? Would you care to elaborate? – zero323 Jan 12 '16 at 00:09
  • I have extensively used Window functions with sqlContext. – KrisP Jan 12 '16 at 02:31
  • @zero323 A limitation of HiveContext. I am facing the same problem as https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201502.mbox/%3C1423026302857-21491.post@n3.nabble.com%3E – Michael Jan 12 '16 at 04:57
  • @KrisP Can you please give me an example of using Window with sqlContext? I would greatly appreciate it, as I am stuck in this problem. Thanks! My Window function can work when I use HiveContext, but it crashes when I use sqlContext, with the error 'Note that, using window functions currently requires a HiveContext;' – Michael Jan 12 '16 at 05:05
  • It is not a limitation of `HiveContext`. You simply use embedded Derby as a metastore, which is not intended for production. See my answer to http://stackoverflow.com/q/34705886/1560062 – zero323 Jan 12 '16 at 05:05
  • @KrisP Window functions are not supported without `HiveContext` (SPARK-11001). If you mean `sqlContext` in `spark-shell` / `pyspark` it is initialized with `HiveContext` as long as Spark has been built with Hive support. – zero323 Jan 12 '16 at 05:13
  • @zero323 seems pretty complicated to me, I have to dive deep into the spark code to make the changes? I think I will use numpy.cumsum instead. – Michael Jan 12 '16 at 05:21
  • No changes in the Spark code required. But you'll need some DevOps skills. – zero323 Jan 12 '16 at 05:27

4 Answers

11

Not sure if this is what you are looking for, but here are two examples of how to use sqlContext to calculate the cumulative sum:

First, when you want to partition by some categories:

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

Output:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

Second, when you want to take the cumulative sum over just one variable, change df2 to this:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

Output:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

Hope this helps. Collecting the data and then using np.cumsum is not very efficient, especially if the dataset is large. Another way you could explore is to use simple RDD transformations like groupByKey() and then map over each group to calculate its cumulative sum, combining the results at the end; a rough sketch follows.
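
To make that RDD idea a bit more concrete, here is a sketch (not from the original answer; it uses groupByKey plus flatMapValues rather than a separate map/reduce step, and assumes each group is small enough to sort in memory, using the (category, revenue) rdd defined above):

def cumsum_per_group(values):
    # Sort one group's revenues and emit (revenue, running total) pairs.
    out, total = [], 0
    for v in sorted(values):
        total += v
        out.append((v, total))
    return out

cumsum_rdd = (rdd
    .groupByKey()                      # (category, iterable of revenues)
    .flatMapValues(cumsum_per_group))  # (category, (revenue, running total))

cumsum_rdd.collect()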

Dat Tran
  • Thanks, but your solution works on hiveContext, and not sqlContext. Can you output your sqlContext? It should be showing that it is a hiveContext – Michael Jan 14 '16 at 09:15
5

Here is a simple example:

import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

data = sqlcontext.createDataFrame(
    [("Bob", "M", "Boston", 1, 20),
     ("Cam", "F", "Cambridge", 1, 25),
     ("Lin", "F", "Cambridge", 1, 25),
     ("Cat", "M", "Boston", 1, 20),
     ("Sara", "F", "Cambridge", 1, 15),
     ("Jeff", "M", "Cambridge", 1, 25),
     ("Bean", "M", "Cambridge", 1, 26),
     ("Dave", "M", "Cambridge", 1, 21)],
    ["name", "gender", "city", "donation", "age"])


data.show()

gives this output:

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

Define a window

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .rowsBetween(window.Window.unboundedPreceding, 0))

# window.Window.unboundedPreceding -- start the frame at the first row of the group
# .rowsBetween(..., 0)             -- 0 refers to the current row; if -2 were specified instead,
#                                     the frame would start 2 rows before the current row
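
As an aside (not part of the original example), a bounded frame gives a rolling sum instead of a running total; for instance, the current row plus the two preceding rows within each group, assuming we order by age:

rolling_spec = (window.Window
                      .partitionBy(['gender', 'city'])
                      .orderBy('age')
                      .rowsBetween(-2, 0))  # frame covers 2 rows before the current row, up to the current row

rolling = data.withColumn('rolling_donation', sf.sum(data.donation).over(rolling_spec))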

Now, here is a trap:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

which fails with this error:

TypeErrorTraceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

This is due to using Python's built-in sum function instead of PySpark's. The fix is to use the sum function from pyspark.sql.functions (imported above as sf):

temp = data.withColumn('CumSumDonation', sf.sum(data.donation).over(win_spec))
temp.show()

will give:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+
muon
  • win_spec is not defined in your example, could you add it? Would be most helpful to understand your great example – Mike Apr 04 '18 at 08:11
  • oops my bad @Mike will try to dig up my codebase ;) fingers crossed – muon Apr 04 '18 at 21:45
1

After landing on this thread while trying to solve a similar problem, I solved my issue using the code below. Not sure if I'm missing part of the OP's question, but this is a way to sum a column with SQLContext:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext

conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)  # pass the conf so the settings actually take effect
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

def sum_column(table, column):
    sc_table = sqlContext.table(table)
    return sc_table.agg({column: "sum"})

sum_column("db.tablename", "column").show()
Chris Marotta
0

It is not true that window functions work only with HiveContext. You can use them even with sqlContext:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sql_sum  # use Spark's sum, not Python's built-in

myPartition = Window.partitionBy(['col1', 'col2', 'col3'])

temp = temp.withColumn("#dummy", sql_sum(temp.col4).over(myPartition))
Mr. Xcoder
  • Only on Spark 2.0+ can one use window functions with SQLContext. For Spark versions 1.4 ~ 1.6, it is necessary to use HiveContext – Daniel de Paula Jan 17 '17 at 14:34
  • No, they were introduced in Spark version 1.4 – Abhishek Kgsk Jan 19 '17 at 06:07
  • They have existed since 1.4, but before Spark 2 it was necessary to use a HiveContext. However, in many distributions the default class for the "sqlContext" instance in both spark-shell and pyspark is, in fact, HiveContext, so this may cause some confusion, where people think it is possible to use window functions with the plain SQLContext. You can refer to this question for more info: http://stackoverflow.com/questions/36171349/using-windowing-functions-in-spark – Daniel de Paula Jan 21 '17 at 09:47