I have a Kafka stream open in my PySpark code like so:
ssc.checkpoint('ckpt')
mystream = KafkaUtils.createStream(ssc, K_HOST, "sample", {"logs_queue": 1})
Now I am trying to split this stream into two separate streams on the basis of a condition. If I run the following code, I get both streams as expected:
s1 = mystream.filter(lambda s: s['key'].startswith("11")).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
s1.pprint()
s2 = mystream.filter(lambda s: s['key'].startswith("12")).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
s2.pprint()
The output for the above statements is correct:
-------------------------------------------
Time: 2017-02-08 14:09:26
-------------------------------------------
(u'11-59', 201)
(u'11-142', 225)
(u'11-68', 151)
(u'11-64', 161)
(u'11-60', 152)
(u'11-69', 106)
(u'11-65', 196)
(u'11-61', 208)
(u'11-143', 158)
(u'11-140', 112)
...
-------------------------------------------
Time: 2017-02-08 14:09:26
-------------------------------------------
(u'12-14', 62)
(u'12-10', 73)
(u'12-36', 95)
(u'12-32', 106)
(u'12-18', 82)
(u'12-21', 107)
(u'12-25', 68)
(u'12-29', 111)
(u'12-15', 134)
(u'12-28', 59)
...
Since only the filter prefix differs between the two statements, I changed the code to a for loop like so:
f = ["12", "11"]
for i in f:
    fs = mystream.filter(lambda s: s['key'].startswith(i)).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
    fs.pprint()
I was expecting the same output as before, but both pprint calls printed identical results, as shown below:
-------------------------------------------
Time: 2017-02-08 14:05:38
-------------------------------------------
(u'11-59', 102)
(u'11-68', 107)
(u'11-60', 93)
(u'11-142', 145)
(u'11-64', 150)
(u'11-61', 71)
(u'11-143', 155)
(u'11-65', 131)
(u'11-69', 110)
(u'11-140', 71)
...
-------------------------------------------
Time: 2017-02-08 14:05:38
-------------------------------------------
(u'11-59', 102)
(u'11-68', 107)
(u'11-60', 93)
(u'11-142', 145)
(u'11-64', 150)
(u'11-61', 71)
(u'11-143', 155)
(u'11-65', 131)
(u'11-69', 110)
(u'11-140', 71)
...
I think the variable i is being passed by reference, but I need to pass it by value. How do I get the for loop to apply both filters correctly?
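To illustrate what I suspect is going on, here is a minimal sketch in plain Python, without Spark. The names keys, late and bound are made up for the illustration, and it assumes the lambdas are only evaluated after the loop has finished, which seems to be the case for the lazily evaluated stream:

```python
# Hypothetical sample keys standing in for s['key'] values from the stream.
keys = ["11-59", "12-14", "11-68", "12-10"]
prefixes = ["12", "11"]

# Late binding: each lambda looks up `i` when it is called,
# so after the loop every lambda sees the last value, "11".
late = []
for i in prefixes:
    late.append(lambda k: k.startswith(i))

# Early binding: a default argument is evaluated when the lambda is
# defined, so each lambda keeps the value `i` had in its own iteration.
bound = []
for i in prefixes:
    bound.append(lambda k, i=i: k.startswith(i))

late_results = [[k for k in keys if f(k)] for f in late]
bound_results = [[k for k in keys if f(k)] for f in bound]

print(late_results)   # [['11-59', '11-68'], ['11-59', '11-68']]
print(bound_results)  # [['12-14', '12-10'], ['11-59', '11-68']]
```

The first result mirrors the duplicated "11" output above. If the same idiom carries over to the stream, the filter in the loop would become lambda s, i=i: s['key'].startswith(i), though that is an assumption I have not verified against Kafka.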
Thanks!