
Update: It seems my errors are probably because of how I installed Spark and/or Hive. Working with window functions seems pretty straightforward in a Databricks (hosted) notebook. I need to figure out how to set this up locally.

I have a Spark DataFrame that I need to use a Window function on.* I tried following the instructions over here, but I ran into some problems.

Setting up my environment:

import os
import sys
import datetime as dt

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')

import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict

Setting up my data:

test_ts = {'adminDistrict': None,
 'city': None,
 'country': {'code': 'NA', 'name': 'UNKNOWN'},
 'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
  {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
  {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
  {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
  {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
  {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
  {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
  {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
  {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
  {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
  {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
  {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
  {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
  {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
  {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
  {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
  {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
  {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
  {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
  {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
  {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
  {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
  {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
  {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
  {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
 'maxDate': '2015-12-28T00:00:00Z',
 'minDate': '2005-08-25T00:00:00Z',
 'name': 'S&P GSCI Crude Oil Spot',
 'offset': 0,
 'resolution': 'DAY',
 'sources': ['trf'],
 'subtype': 'Index',
 'type': 'Commodities',
 'uid': 'TRF_INDEX_Z39824_PI'}

A function to turn that JSON into a DataFrame:

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])

Getting a DataFrame and taking a look at what's inside:

test_df = ts_to_df(test_ts)
test_df.show()

That shows me this:

+----------+----------------------+
|      Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25|                369.89|
|2005-08-26|                362.44|
|2005-08-29|                 368.3|
|2005-08-30|                 382.6|
|2005-08-31|                377.84|
|2005-09-01|                380.74|
|2005-09-02|                370.33|
|2005-09-05|                370.33|
|2005-09-06|                 361.5|
|2005-09-07|                352.79|
|2005-09-08|                 354.3|
|2005-09-09|                 353.0|
|2005-09-12|                349.35|
|2005-09-13|                348.82|
|2005-09-14|                360.24|
|2005-09-15|                357.61|
|2005-09-16|                347.14|
|2005-09-19|                 370.0|
|2005-09-20|                362.82|
|2005-09-21|                366.11|
+----------+----------------------+

And here is where I have no idea what I'm doing and everything starts to go wrong:

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

That gives me this error:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

So it looks like I need a HiveContext, right? Do I need to create my DataFrame using a HiveContext? Let me try to create a DataFrame explicitly using the HiveContext:

def ts_to_hive_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
    return hiveContext.createDataFrame(temp_rdd)

test_df = ts_to_hive_df(test_ts)
test_df.show()

But that gives me this error:

TypeError: 'JavaPackage' object is not callable

So how do I use Window functions? Do I need to create the DataFrames using a HiveContext? If so, then how do I do that? Can someone tell me what I'm doing wrong?
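In case it helps with diagnosis, here is a quick check I can run (just a sketch; I'm assuming, based on the 'JavaPackage' error, that the real question is whether the Hive classes are on my Spark build's classpath at all):

# Rough diagnostic (assumption: the 'JavaPackage' error means the Hive classes
# are missing from this Spark build). If the reference below is a py4j
# JavaClass, Hive support is compiled in; if it is a JavaPackage, the build
# was made without Hive support and HiveContext cannot be constructed.
hive_ctx_ref = sc._jvm.org.apache.spark.sql.hive.HiveContext
print(type(hive_ctx_ref))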

*I need to know if there are gaps in my data. I have a 'Date' column, and for each row, ordered by Date, I want to know what's on the next row; if I have missing days or bad data, I want to use the previous day's data on that row. If you know of a better way of doing that, let me know, but I would still like to know how to get these Window functions working.
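For reference, here is roughly the check I'm hoping to end up with once the window function resolves (just a sketch, assuming lead and datediff behave as documented; Next_Date and Gap_Days are column names I made up):

# Sketch of the end goal (assumes the window function resolves correctly):
# pair each Date with the next Date and flag gaps of more than one day.
from pyspark.sql.functions import lead, datediff, col
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
gaps = (test_df
        .withColumn('Next_Date', lead(col('Date'), 1).over(w))
        .withColumn('Gap_Days', datediff(col('Next_Date'), col('Date')))
        .filter(col('Gap_Days') > 1))
gaps.show()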

  • Sorry. Added specific code. I hope that leads us somewhere. Thanks for taking a look. – Nathaniel Dec 30 '15 at 14:24
  • Alright, well it looks like something might be messed up with how I have Spark (or Hive?) installed locally, because I can get this to work in a Databricks notebook. Databricks doesn't want us making our own HiveContexts or SQLContexts. To get it working there, I left out the creation of my own contexts and I used the above ts_to_hive_df function, replacing my hiveContext with their sqlContext (a rough sketch of that workaround is below). I'll have to get this working in my own install eventually. I'll come back and write a solution when I figure it out. – Nathaniel Dec 31 '15 at 14:51
  • It looks like your Spark binaries have been built without Hive support. – zero323 Jan 04 '16 at 17:09
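For reference, a rough sketch of the workaround Nathaniel describes in the comment above (assumption: the Databricks notebook already provides sc and a Hive-enabled sqlContext, so no contexts are created by hand):

# Sketch of the Databricks workaround from the comments (assumption: the
# notebook-provided sqlContext is Hive-enabled, so no SparkContext/HiveContext
# is created manually).
def ts_to_hive_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date': dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                     ts['name'].replace('&', '').replace(' ', '_'): line['value']})
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
    return sqlContext.createDataFrame(temp_rdd)  # notebook-provided sqlContext

test_df = ts_to_hive_df(test_ts)
test_df.show()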

1 Answer


This is an older question and thus moot, since you've probably moved on to a newer version of Spark. I'm running Spark 2.0 myself, so this may be cheating.

But FWIW: two possible issues. In the first example, I think .toDF() may be defaulting to the SQLContext, since you had created both contexts. In the second, when you refactored, could it be that you are calling the HiveContext inside the function?
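A minimal sketch of the first point (assumption: RDD.toDF() gets wired to whichever SQL context was created, so creating only a HiveContext removes the ambiguity):

# Sketch for the first point (assumption: toDF() binds to whichever context
# was created last, so creating only the HiveContext avoids the plain
# SQLContext being used for the window query).
import pyspark

sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)  # no separate SQLContext this time

test_df = ts_to_df(test_ts)  # the original toDF()-based function from the question
test_df.show()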

If I refactor your second function (ts_to_hive_df) so the HiveContext is used outside the function, everything is fine.

def ts_to_df(ts):
    data = []
    for line in ts['data']:
        data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
                 ts['name'].replace('&', '').replace(' ', '_'):line['value']})
    return data

data = ts_to_df(test_ts)
test_rdd = sc.parallelize(data).map(lambda x: Row(**x))
test_df = hiveContext.createDataFrame(test_rdd)

from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()

I get this output:

+----------+
| Next_Date|
+----------+
|2005-08-26|
|2005-08-29|
|2005-08-30|
|2005-08-31|
|2005-09-01|
|2005-09-02|
.....
data_steve