
I have a large number of fairly large daily files stored in a blob storage engine (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to perform a rolling N-day linear regression, but I am having trouble with the data-loading aspect. I am not sure how to do this without nested RDDs. The schema for every .csv file is the same.

In other words, for every date d_t, I need the data x_t joined with the trailing data (x_{t-1}, x_{t-2}, ..., x_{t-N}).

How can I use PySpark to load an N-day Window of these daily files? All of the PySpark examples I can find seem to load from one very large file or data set.

Here's an example of my current code:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run)

p.map(test_run)  # fails with the same error as p.filter

I'm on PySpark version '2.1.0'

I'm running this on an Azure HDInsight cluster jupyter notebook.

spark here is of type <class 'pyspark.sql.session.SparkSession'>

A smaller, more reproducible example is as follows:

p = sc.parallelize([1, 2, 3])
def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()
  • see my updated answer below – Pushkr Apr 27 '17 at 16:05
  • @Pushkr I updated it again to make it more clear, removing the window function call and putting an example of the dates. Just that example fails for me, I can run it fine by directly calling `test_run(dates[0])` – pyCthon Apr 27 '17 at 16:27
  • 1
    Have you tried loading all the data directly in a Dataframe using `spark.read.csv(folder)` and then add a new column named (file_name) `.withColumn("filename", input_file_name())` Then just use grouping based on this column to manipulate the dataframe further so you can perform the N-day linear regression? – Boggio May 04 '17 at 10:53
  • @Teodor-BogdanBarbieru No I have not tried this, will try shortly thank you for the suggestion. – pyCthon May 04 '17 at 11:50
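For reference, a minimal sketch of the approach suggested in Boggio's comment above: load the whole folder in one pass and tag each row with its source file via input_file_name(). The /daily/ folder path, header option, and mode are assumptions for illustration, not from the original post.

from pyspark.sql.functions import input_file_name

# Read every daily file at once and remember which file each row came from
df = (spark.read.csv('/daily/', header=True, mode='DROPMALFORMED')
      .withColumn('filename', input_file_name()))

# The filename column can then be parsed into a date and used to group rows
# into rolling N-day windows before running the regression.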

1 Answer


You are better off using DataFrames rather than RDDs. The DataFrame `read.csv` API accepts a list of paths, like this:

pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)

Have a look at the documentation for read.csv.

You can form the list of paths to your data files by doing some date arithmetic over a window of N days, e.g. `"path/to/data" + datetime.today().strftime("%Y-%m-%d") + ".csv"` (this only gives you today's file name, but it is not hard to extend the date calculation to N days).
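A minimal sketch of that date arithmetic, assuming the files live under /daily/ as in the question and that N counts calendar days (the concrete N and anchor date are illustrative values only):

from datetime import date, timedelta

N = 3  # window length in days (illustrative value)
anchor = date(1995, 1, 5)  # most recent date in the window; date.today() in practice

# Trailing N-day window of file paths, oldest first
path_list = ['/daily/data{}.csv'.format((anchor - timedelta(days=i)).isoformat())
             for i in range(N - 1, -1, -1)]
# path_list == ['/daily/data1995-01-03.csv', '/daily/data1995-01-04.csv', '/daily/data1995-01-05.csv']

df = spark.read.csv(path_list, header=True, mode='DROPMALFORMED')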

However, keep in mind that the schema of all the daily CSVs must be the same for the above to work.

Edit: When you parallelize the list of dates, i.e. p, each element gets processed individually by a different executor, so the input to test_run wasn't the full list of dates; each call received one individual element, such as '1995-01-01'. More importantly, spark (the SparkSession) exists only on the driver, so spark.read cannot be called from inside a function that is shipped to the executors, which is why both p.filter(test_run) and p.map(test_run) fail.

Try this instead and see if it works:

# Get the list of dates for one N-day window (window() is the asker's own helper)
date_range = window(dates, N)
s = '/daily/data{}.csv'

dt0 = date_range[-1]  # most recent date in the window
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')

# read the trailing files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')

# computeLinearRegression is the asker's own routine
r, resid = computeLinearRegression(df0, df1)
r.write.save('/daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
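Since spark.read can only be called on the driver, the rolling computation can be driven by a plain Python loop rather than sc.parallelize. A hedged sketch under the same assumptions: window() here is a hypothetical helper yielding consecutive N-day windows from a sorted list of date strings, all_dates is assumed to be that full list, and computeLinearRegression remains the asker's own routine.

def window(dates, N):
    # Hypothetical helper: yield each consecutive N-day window from a sorted list of date strings
    for i in range(len(dates) - N + 1):
        yield dates[i:i + N]

s = '/daily/data{}.csv'
for date_range in window(sorted(all_dates), N):
    dt0 = date_range[-1]  # most recent date in this window
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    df1 = spark.read.csv([s.format(dt) for dt in date_range[:-1]],
                         header=True, mode='DROPMALFORMED')
    r, resid = computeLinearRegression(df0, df1)
    r.write.save('/daily/r{}.csv'.format(dt0))
    resid.write.save('/daily/resid{}.csv'.format(dt0))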
  • My understanding was that `spark.read` is itself an `RDD` operation so I can't call `spark.read` from within a `sc.parallelize` operation? – pyCthon Apr 27 '17 at 14:34
  • No, you can use spark.read inside sc.parallelize. `sc.parallelize` creates RDDs while spark.read creates a `DataFrame`; if you really want those files as an `RDD` only, you can do `df.rdd` to convert the DataFrame to an RDD. This will create an `RDD` of `Row`s. – Pushkr Apr 27 '17 at 14:39
  • I get the error described here when I try this http://stackoverflow.com/questions/40470487/pyspark-throwing-error-method-getnewargs-does-not-exist – pyCthon Apr 27 '17 at 14:40
  • Can you post the code you tried when you got this error? – Pushkr Apr 27 '17 at 14:43
  • I added an example above – pyCthon Apr 27 '17 at 15:09
  • What is the output of `window(dates, N)`? Also, you are not using file_list anywhere, so both df0 and df1 are basically reading the same date_range[0]. And maybe you meant to do `.map` instead of filter in the step `p.filter(test_run2)`. Because your method always returns the value 1, it's like saying `p.filter(1).count()`, which doesn't make sense; that's why it's failing. – Pushkr Apr 27 '17 at 15:25
  • sorry I cleaned it up, just that loading example is failing for me, `window(dates, N) ` just gives a list of lists with list[0] being `['1995-01-01', ...'1996-01-01']` for example – pyCthon Apr 27 '17 at 15:34
  • I improved my question and example quite a bit based on your suggestions, but it still isn't working for me; if I run everything outside `sc.parallelize` it works, and I'm not sure why it won't otherwise. – pyCthon Apr 27 '17 at 17:59
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/142853/discussion-between-pushkr-and-pycthon). – Pushkr Apr 27 '17 at 19:33