
I am new to Spark in general, so I wanted to test out some Spark Streaming functionality that I am likely to need for another, larger project.

My toy problem is to consume a stream and learn a linear regression on it with stochastic gradient descent (I am aware that StreamingLinearRegression is already provided, but I want to implement it myself):

import numpy as np


class OnlineLinearRegression:
    def __init__(self, eta=0.1, nr_features=8):
        self.w = np.zeros(nr_features)
        self.nr_features = nr_features
        self.eta = eta

    def evaluate(self, x):
        return np.dot(self.w, x)

    def loss(self, pred, true_value):
        return (pred - true_value) ** 2

    def update_model(self, x, pred, y):
        bef_update = self.w
        self.w = self.w - self.eta * (pred - y) * np.array(x)
        print("==========================")
        print("UPDATE MODEL")
        print("self id " + str(id(self)))
        print("w_before_update " + str(bef_update))
        print("w_after_update " + str(self.w))
        print("==========================")

    def prequential(self, e):
        # Test-then-train: predict on the sample, measure the loss,
        # then train on that same sample.
        y_hat = self.evaluate(e[0])
        ell = self.loss(y_hat, e[1])
        self.update_model(e[0], y_hat, e[1])
        return ell

    def processRDD(self, time, RDD):
        RDD.foreach(self.prequential)

My main method looks as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    model = OnlineLinearRegression(eta=0.1, nr_features=8)
    print("ID-MODEL-CREATED: " + str(id(model)))

    sc = SparkContext("local[*]", "Test Scenario")
    ssc = StreamingContext(sc, 0.5)

    text = ssc.socketTextStream("localhost", 9997)
    tuples = text.map(parser)  # parser maps a line to (features, label)
    tuples.foreachRDD(model.processRDD)

    ssc.start()
    ssc.awaitTermination()

Data is generated once per second and Spark Streaming has an interval of one second, so each batch contains only one sample. Here is some output generated in the update_model function of OnlineLinearRegression:

Once at start:

ID-MODEL-CREATED: 140184763844800

Sample 1

  • self id 140411103203200

  • w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]

  • w_after_update [ 0. 0.6825 0.5475 0.1425 0.771 0.33675 0.1515 0.225 ]

Sample 2

  • self id 140411106740920

  • w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]

  • w_after_update [ 0. 0.245 0.1855 0.063 0.15785 0.06965 0.03395 0.049 ]

Sample 3

  • self id 140411106740704

  • w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]

  • w_after_update [ 1.8 0.477 0.378 0.1215 0.6093 0.23085 0.12735 0.189 ]

Sample 4

  • self id 140411106738904

  • w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]

  • w_after_update [ 0. 0.44 0.365 0.125 0.516 0.2155 0.114 0.155 ]

Sample 5

  • self id 140411106738904 (note: same id as in sample 4, yet w_before_update is still the zero vector)

  • w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]

  • w_after_update [ 0.7 0.231 0.1785 0.056 0.1435 0.06265 0.02765 0.0385 ]

My questions:

  1. Why do the self ids change between samples, but sometimes stay the same? I identify this as the reason w keeps resetting to the zero vector, yet __init__ is only called once.

  2. What am I misunderstanding about Spark, and what would be a possible way to fix it so that I have one global model that is updated iteratively?
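For what it's worth, the changing ids can be reproduced without Spark at all: shipping a bound method to a worker pickles the whole object, and the unpickled copy is a new object whose updates never reach the original. A minimal sketch (the Model class here is just a stand-in for OnlineLinearRegression):

```python
import pickle

import numpy as np


class Model:
    def __init__(self, nr_features=8):
        self.w = np.zeros(nr_features)

    def update(self, x):
        self.w = self.w + np.array(x)


model = Model()

# Simulate what happens when Spark ships a bound method to a worker:
# the method drags its object through pickle, producing a fresh copy.
worker_copy = pickle.loads(pickle.dumps(model))
worker_copy.update([1.0] * 8)

print(id(model) != id(worker_copy))  # True: a distinct object
print(model.w)                       # driver's weights are still all zeros
```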

Thanks a lot.

Edit: Changing tuples.foreachRDD(model.processRDD) to tuples.map(model.prequential) did not help; the output is similar, with the same issues.

1 Answer


Try running Spark on a single core, e.g. SparkContext("local[1]", "Test Scenario"). The problem is the sharing of the model between different workers (in your case, cores): each worker updates its own copy of the model.

If you want to keep distributed processing, here is what you can do for every batch:

  • Make the model a broadcast variable.
  • Unpersist it (blocking=True) before every update.
  • Rebroadcast the updated model.

Rebroadcasting after every single record (sample/example) can cause a lot of network traffic and latency. Instead, treat every batch as a sample from the whole population, find the weights that minimize the loss on that batch, and update the model with that value once per batch. You can read it more in detail here.
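The per-batch scheme can be sketched without Spark, with plain NumPy standing in for the driver-side step; the data, eta value, and batch_update helper below are all illustrative, not part of any Spark API:

```python
import numpy as np


def batch_update(w, X, y, eta=0.1):
    """One gradient step on a whole batch: average the per-sample
    gradients of the squared loss (pred - y)^2 and step once, the
    same rule as update_model above but over a batch of rows."""
    preds = X.dot(w)                    # predictions for the batch
    grad = (preds - y).dot(X) / len(y)  # averaged gradient
    return w - eta * grad


# Driver-side loop: each (X, y) stands for one micro-batch that the
# workers have already parsed/aggregated; after every update the
# driver would rebroadcast the new w to the workers.
w = np.zeros(2)
X = np.array([[1.0, 2.0], [2.0, 1.0]])
y = np.array([1.0, 1.0])
for _ in range(200):
    w = batch_update(w, X, y)
# w converges towards the exact solution of X w = y, i.e. [1/3, 1/3]
```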

You can explore further here.