I have a large number of fairly large daily files stored in a blob storage engine (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to perform a rolling N-day linear regression, but I am having trouble with the data loading aspect. I am not sure how to do this without nested RDDs.
The schema for every .csv file is the same.
In other words, for every date d_t, I need the data x_t joined with the trailing data (x_{t-1}, x_{t-2}, ..., x_{t-N}).
How can I use PySpark to load an N-day window of these daily files? All of the PySpark examples I can find seem to load from one very large file or dataset.
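To make the windowing concrete, here is roughly how I build the trailing date windows on the driver (build_windows, the start/end dates, and the window size here are just illustrative; in practice the dates come from the file listing):

from datetime import date, timedelta

def build_windows(start, end, n):
    # Collect every date between start and end (inclusive) as ISO strings,
    # then slide a window of n consecutive dates over them, oldest first.
    days = []
    d = start
    while d <= end:
        days.append(d.isoformat())
        d += timedelta(days=1)
    return [tuple(days[i - n + 1:i + 1]) for i in range(n - 1, len(days))]

windows = build_windows(date(1995, 1, 3), date(1995, 1, 6), 3)
# [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]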
Here's an example of my current code:
dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]
p = sc.parallelize(dates)

def test_run(date_range):
    dt0 = date_range[-1]  # get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]]  # get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run).count()
p.map(test_run).count()  # fails with the same error as p.filter
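For what it's worth, the same reads run fine when executed directly on the driver for a single window (no parallelize), e.g. with the paths above:

date_range = ('1995-01-03', '1995-01-04', '1995-01-05')
s = '/daily/data{}.csv'
df0 = spark.read.csv(s.format(date_range[-1]), header=True, mode='DROPMALFORMED')                    # latest day
df1 = spark.read.csv([s.format(dt) for dt in date_range[:-1]], header=True, mode='DROPMALFORMED')    # trailing days

So the problem seems to be specifically with calling spark.read inside the function passed to filter/map.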
I'm on PySpark version 2.1.0, running in a Jupyter notebook on an Azure HDInsight cluster. spark here is of type <class 'pyspark.sql.session.SparkSession'>.
A smaller, more reproducible example is as follows:
p = sc.parallelize([1, 2, 3])

def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1

p.filter(foo).count()
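As with the larger example, the same call succeeds when made on the driver; it only seems to fail once foo is shipped to the executors inside filter/map (shown here just to illustrate the contrast):

foo(None)                                                    # fine on the driver
spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"]).count()  # also fine on the driver
p.map(foo).count()                                           # fails with the same error as p.filter(foo).count()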