I am new to Spark in general, so I wanted to test out some Spark Streaming functionality that I am likely to need for a larger project.
My toy problem: consume a stream and learn a linear regression on it with stochastic gradient descent (I am aware that StreamingLinearRegression is already provided, but I want to implement it myself):
import numpy as np

class OnlineLinearRegression:

    def __init__(self, eta=0.1, nr_features=8):
        self.w = np.zeros(nr_features)
        self.nr_features = nr_features
        self.eta = eta

    def evaluate(self, x):
        return np.dot(self.w, x)

    def loss(self, pred, true_value):
        # squared error
        return (pred - true_value) * (pred - true_value)

    def update_model(self, x, pred, y):
        bef_update = self.w
        # one SGD step on the squared loss
        self.w = self.w - self.eta * (pred - y) * np.array(x)
        print "=========================="
        print "UPDATE MODEL"
        print "self id " + str(id(self))
        print "w_before_update " + str(bef_update)
        print "w_after_update " + str(self.w)
        print "=========================="

    def prequential(self, e):
        # predict first, then learn from the same example
        y_hat = self.evaluate(e[0])
        ell = self.loss(y_hat, e[1])
        self.update_model(e[0], y_hat, e[1])
        return ell

    def processRDD(self, time, RDD):
        RDD.foreach(self.prequential)
My main method looks as follows:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    model = OnlineLinearRegression(eta=0.1, nr_features=8)
    print "ID-MODEL-CREATED: " + str(id(model))

    sc = SparkContext("local[*]", "Test Scenario")
    ssc = StreamingContext(sc, 0.5)

    text = ssc.socketTextStream("localhost", 9997)
    tuples = text.map(parser)  # parser turns a line into an (x, y) tuple
    tuples.foreachRDD(model.processRDD)

    ssc.start()
    ssc.awaitTermination()
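(parser is not shown above; here is a minimal stand-in for it, a sketch that assumes each line carries eight comma-separated feature values followed by the label:)

import numpy as np

# Hypothetical parser -- the real one just has to turn a text line
# into a (features, label) tuple for prequential.
def parser(line):
    values = [float(v) for v in line.split(",")]
    return (np.array(values[:8]), values[8])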
Data is generated once per second and Spark Streaming has a batch interval of one second, so only one sample per batch is processed.
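The generator feeding port 9997 is not really part of the question, but for reproducibility a toy sketch could look like this (hypothetical; it assumes the line format from the parser sketch above and emits one sample per second):

import random
import socket
import time

# Hypothetical toy generator: accept Spark's connection on port 9997
# and write one "f1,...,f8,label" line per second.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9997))
server.listen(1)
conn, _ = server.accept()

true_w = [random.random() for _ in range(8)]
while True:
    x = [random.random() for _ in range(8)]
    y = sum(wi * xi for wi, xi in zip(true_w, x))
    conn.sendall((",".join(str(v) for v in x + [y]) + "\n").encode("utf-8"))
    time.sleep(1.0)

Here is some output generated in the OnlineLinearRegression update_model function: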
Printed once at start:
ID-MODEL-CREATED: 140184763844800
1. Sample
self id 140411103203200
w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]
w_after_update [ 0. 0.6825 0.5475 0.1425 0.771 0.33675 0.1515 0.225 ]
2. Sample
self id 140411106740920
w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]
w_after_update [ 0. 0.245 0.1855 0.063 0.15785 0.06965 0.03395 0.049 ]
3. Sample
self id 140411106740704
w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]
w_after_update [ 1.8 0.477 0.378 0.1215 0.6093 0.23085 0.12735 0.189 ]
4. Sample
self id 140411106738904
w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]
w_after_update [ 0. 0.44 0.365 0.125 0.516 0.2155 0.114 0.155 ]
5. Sample
self id 140411106738904 (comment: same id as in sample 4, yet w_before_update is still the zero vector)
w_before_update [ 0. 0. 0. 0. 0. 0. 0. 0.]
w_after_update [ 0.7 0.231 0.1785 0.056 0.1435 0.06265 0.02765 0.0385 ]
My questions:

1. Why do the self ids change, but at the same time sometimes remain the same? I suspect this is why w keeps reverting to the zero vector, even though __init__ is only called once.
2. What are my misunderstandings of Spark, and what would be a possible way to fix this so that I have one global model that gets iteratively updated? (A sketch of one idea I had follows below.)
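The idea I had for question 2 (a minimal sketch, assuming each micro-batch is small enough to collect to the driver) is to pull the batch into the driver process and update the single model instance there:

def processRDD(self, time, RDD):
    # Sketch: collect() ships the batch to the driver, so prequential
    # mutates the one driver-side model instead of per-executor copies.
    for e in RDD.collect():
        self.prequential(e)

But I am not sure whether that is the idiomatic way to keep state in Spark Streaming.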
Thanks a lot.
Edit: Changing tuples.foreachRDD(model.processRDD) to tuples.map(model.prequential) didn't help; the output is similar, with the same issues.
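That variant looked roughly like this (a sketch; since map is lazy, I added an output operation such as pprint to force evaluation):

losses = tuples.map(model.prequential)
losses.pprint()  # output action so the map actually runs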