1

I have an array of very large size. I want to do linear regression on each column of the array. To speed up the calculation, I created a list with each column of the array as its element. I then employed pyspark to create a RDD and further applied a defined function on it. I had memory problems in creating that RDD (i.e. parallelization).

I have tried to improve the spark.driver.memory to 50g by setting the spark-defaults.conf but the program still seems dead.

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from pyspark import SparkContext
sc = SparkContext("local", "get Linear Coefficients")

def getLinearCoefficients(column):
    y=column[~np.isnan(column)] # Extract column non-nan values
    x=np.where(~np.isnan(column))[0]+1 # Extract corresponding indexs plus 1
    # We only do linear regression interpolation when there are no less than 3 data pairs exist.
    if y.shape[0]>=3:
        model=LinearRegression(fit_intercept=True) # Intilialize linear regression model
        model.fit(x[:,np.newaxis],y) # Fit the model using data
        n=y.shape[0]
        slope=model.coef_[0]
        intercept=model.intercept_
        r2=r2_score(y,model.predict(x[:,np.newaxis]))
        rmse=np.sqrt(mean_squared_error(y,model.predict(x[:,np.newaxis])))
    else:
        n,slope,intercept,r2,rmse=np.nan,np.nan,np.nan,np.nan,np.nan
    return n,slope,intercept,r2,rmse

random_array=np.random.rand(300,2000*2000) # Here we use a random array without missing data for testing purpose.
columns=[col for col in random_array.T]
columnsRDD=sc.parallelize(columns)
columnsLinearRDD=columnsRDD.map(getLinearCoefficients)
n=np.array([e[0] for e in columnsLinearRDD.collect()])
slope=np.array([e[1] for e in columnsLinearRDD.collect()])
intercept=np.array([e[2] for e in columnsLinearRDD.collect()])
r2=np.array([e[3] for e in columnsLinearRDD.collect()])
rmse=np.array([e[4] for e in columnsLinearRDD.collect()])

The program output was stagnant like the following.

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:486)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:467)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:412)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:409)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:409)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:396)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I guess it is possible to use pyspark to speed up the calculation but how could I make it? Modifying other parameters in spark-defaults.conf? Or vectorize each column of the array (I do know range() function in Python3 do that way and it is really faster.)?

Fei Yao
  • 1,502
  • 10
  • 10

2 Answers2

1

That is not going to work that way. You are basically doing three things:

  1. you are using a RDD for parallelization,
  2. you are calling your getLinearCoefficients() function and finally
  3. you call collect() on it to use your existing code.

There is nothing wrong with the frist point, but there is a huge mistake in the second and third step. Your getLinearCoefficients() function does not benefit from pyspark, as you use numpy and sklearn (Have a look at this post for a better explanation). For most of the functions you are using, there is a pyspark equivalent. The problem with the third step is the collect() function. When you call collect(), pyspark is bringing all the rows of the RDD to the driver and executes the sklearn functions there. Therefore you get only the parallelization which is allowed by sklearn. Using pyspark is completely pointless in the way you are doing it currently and maybe even a drawback. Pyspark is not a framework which allows you to run your python code in parallel. When you want to execute your code in parallel with pyspark, you have to use the pyspark functions.

So what can you?

  • First of all you could use the n_jobs parameter of the LinearRegession class to use more than one core for your calculation. This allows you at least to use all cores of one machine.
  • Another thing you could do, is stepping away from sklearn and use the linearRegression of pyspark (have a look at the guide and the api). With this you can use a whole cluster for your linear regression.
cronoik
  • 15,434
  • 3
  • 40
  • 78
  • 1
    Thanks for your comment. I agree with you that it is inadvisable to use .collect() method in pyspark since that consumes too much memory. But I guess those codes with .collect() do not involve getLinearCoefficients() function. I suppose I also have problems with the code: columnsRDD=sc.parallelize(columns). To illustrate this, I also wrote a skeleton program appended to the original question. Your suggestion of using pyspark function is really appreciated. I am relatively a novice of pyspark and will learn it via time! – Fei Yao Jun 18 '19 at 13:54
  • I have extended my post to address your getLinearCoefficients() function a bit. I think your other question should regarding your skeleton program should be addressed in a seperate question. – cronoik Jun 18 '19 at 22:06
  • Despite not giving a try, I think your answer is good. I should use pyspark equivalent functions in order to take its advantage. I have moved my question on the skeleton program to another page (https://stackoverflow.com/questions/56663268/is-there-a-way-converting-np-array-or-list-to-something-like-python3-range-i). Hope you could also help. – Fei Yao Jun 19 '19 at 08:30
0

For large datasets with more than 100k samples, using LinearRegression is discouraged. General advice is to use the SGDRegressor and set the parameters correctly, so that OLS loss is being used:

from sklearn.linear_model import SGDRegressor

And replace your LinearRegression with:

model = SGDRegressor(loss=’squared_loss’, penalty=’none’, fit_intercept=True)

Setting loss=’squared_loss’ and penalty=’none’ sets the SGDRegressor to use OLS and no regularization, thus it should produce results similar to LinearRegression.

Try out some options like learning_rate and eta0/power_t to find an optimum in the performance.

Furthermore I recommend using train_test_split to split the data set and use the test set for scoring. A good test size to begin with is test_size=.3.

JE_Muc
  • 5,403
  • 2
  • 26
  • 41
  • Thanks for commenting. Your answer is good but honestly not suitable for my question because mine is about doing 100k regressions each with 300 samples instead of doing a regression with 100k samples. Anyway, I think your answer is useful and might benefit me in the future. – Fei Yao Jun 19 '19 at 08:38