Update: It seems my errors are probably because of how I installed Spark and/or Hive. Working with window functions seems pretty straightforward in a Databricks (hosted) notebook. I need to figure out how to set this up locally.
I have a Spark DataFrame that I need to use a Window function on.* I tried following the instructions over here, but I ran into some problems.
Setting up my environment:
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
Setting up my data:
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
A function to turn that json into a DataFrame:
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
Getting a dataframe and taking a look at what's inside:
test_df = ts_to_df(test_ts)
test_df.show()
That shows me this:
+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
And here is where I have no idea what I'm doing and everything starts to go wrong:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
That gives me this error:
Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;
So it looks like I need a HiveContext, right? Do I need to create my DataFrame using a HiveContext? Then let me try to create a DataFrame explicitly using HiveContext:
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
But that gives me this error:
TypeError: 'JavaPackage' object is not callable
So how do I use Window functions? Do I need to create the DataFrames using a HiveContext? If so, then how do I do that? Can someone tell me what I'm doing wrong?
*I need to know if there are gaps in my data. I have the column 'Date' and for each row, ordered by Date, I want to know what's on the next row, and if I have missing days or bad data, then I want to use the last day's data on that row. If you know of a better way of doing that, let me know. But I still would like to know how to get these Window functions working.