I have a Kafka stream open in my PySpark code like so:

ssc.checkpoint('ckpt')
mystream = KafkaUtils.createStream(ssc, K_HOST, "sample", {"logs_queue": 1})

Now I am trying to split this stream into 2 separate streams on the basis of some condition. If I run the following code, I get the 2 streams perfectly:

s1 = mystream.filter(lambda s: s['key'].startswith("11")).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
s1.pprint()
s2 = mystream.filter(lambda s: s['key'].startswith("12")).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
s2.pprint()

The output for the above statements is correct:

-------------------------------------------
Time: 2017-02-08 14:09:26
-------------------------------------------
(u'11-59', 201)
(u'11-142', 225)
(u'11-68', 151)
(u'11-64', 161)
(u'11-60', 152)
(u'11-69', 106)
(u'11-65', 196)
(u'11-61', 208)
(u'11-143', 158)
(u'11-140', 112)
...

-------------------------------------------
Time: 2017-02-08 14:09:26
-------------------------------------------
(u'12-14', 62)
(u'12-10', 73)
(u'12-36', 95)
(u'12-32', 106)
(u'12-18', 82)
(u'12-21', 107)
(u'12-25', 68)
(u'12-29', 111)
(u'12-15', 134)
(u'12-28', 59)
...

Since only the filter prefix differs between the two statements, I changed the code to a for loop like so:

f = ["12", "11"]
for i in f:
    fs = mystream.filter(lambda s: s['key'].startswith(i)).map(lambda s: (s['key'], 1)).reduceByKey(lambda a, b: a + b)
    fs.pprint()

I was expecting the same output as before, but both pprint calls printed identical results, as shown below.

-------------------------------------------
Time: 2017-02-08 14:05:38
-------------------------------------------
(u'11-59', 102)
(u'11-68', 107)
(u'11-60', 93)
(u'11-142', 145)
(u'11-64', 150)
(u'11-61', 71)
(u'11-143', 155)
(u'11-65', 131)
(u'11-69', 110)
(u'11-140', 71)
...


-------------------------------------------
Time: 2017-02-08 14:05:38
-------------------------------------------
(u'11-59', 102)
(u'11-68', 107)
(u'11-60', 93)
(u'11-142', 145)
(u'11-64', 150)
(u'11-61', 71)
(u'11-143', 155)
(u'11-65', 131)
(u'11-69', 110)
(u'11-140', 71)
...

I think the variable i is being passed by reference, but I need to pass it by value. How do I get the for loop to pick up both filters correctly?
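
The suspected behavior can be reproduced without Spark. The following is a minimal sketch, assuming the cause is Python's late binding of closures: a lambda looks up i when it is called, not when it is defined. The names records, late and bound are hypothetical, and the plain list only stands in for the stream.

```python
# Plain-Python stand-in for the stream: a list of dicts with a 'key' field.
# (Hypothetical sample data; the real records come from Kafka.)
records = [{"key": "11-59"}, {"key": "12-14"}, {"key": "11-68"}]
prefixes = ["12", "11"]

# Late binding: each lambda looks up `i` when it is called,
# so after the loop every lambda sees the last value, "11".
late = []
for i in prefixes:
    late.append(lambda s: s["key"].startswith(i))

# Early binding: a default argument is evaluated when the lambda is
# defined, so each lambda keeps its own prefix.
bound = []
for i in prefixes:
    bound.append(lambda s, prefix=i: s["key"].startswith(prefix))

late_results = [[r["key"] for r in records if f(r)] for f in late]
bound_results = [[r["key"] for r in records if f(r)] for f in bound]

print(late_results)   # both filters match only the "11" keys
print(bound_results)  # first filter matches "12" keys, second matches "11" keys
```

This would be consistent with both pprint outputs showing only the 11 keys, since "11" is the last element of f. If the assumption holds, the same default-argument form (for example `lambda s, i=i: s['key'].startswith(i)`) might work in the filter call inside the loop, though that is untested against the streaming job here.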

Thanks!

Kunal Aggarwal