I have written a Spark program (Python 3.6, Spark 2.3.2) for a Collaborative Filtering recommendation system that covers 2 cases:
- Case 1: Item-based CF recommendation system
- Case 2: User-based CF recommendation system with Min-Hash LSH
I have written train and predict programs covering both cases. My code works for the user-based recommendation, but when I try to train the model for item-based CF, I get the following error:
2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
    length = stream.read(4)
  File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out
I tried the solutions from this link: Pyspark socket timeout exception after application running for a while. They did not work.
I also found a suggestion to add "--spark.worker.timeout=120" to the spark-submit call, like this:
bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120
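(I believe the standard way to pass a config through spark-submit is actually --conf rather than a bare flag, e.g. something like the line below, though I am not sure which timeout property is the right one here.)
bin\spark-submit --conf spark.network.timeout=600s task3train.py train_review.json task3item.model item_based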
Either way, I still see the same error. I tried try/except blocks as well, but I am not sure how to do that correctly; roughly what I attempted is shown below.
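A sketch of what I mean (candidate_pair.collect() stands in for whichever action actually triggers the failing stage):

try:
    pairs = candidate_pair.collect()   # action that kicks off the failing stage
except Exception as e:
    # the executor-side socket.timeout surfaces on the driver as a Spark/Py4J error
    print("similarity stage failed:", e)
    raise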
What do I do?
My code for Item-based CF:
if model_type == ITEM_BASED_MODEL:
    # group the original data by bidx and drop unpopular businesses (rated fewer than 3 times)
    # tuple(bidx, (uidx, score))
    # [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0), ...]), ...]
    shrunk_bid_uids_rdd = input_lines \
        .map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
        .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
        .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
        .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
        .mapValues(lambda val: flatMixedList(val))
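
    # flatMixedList is one of my helpers: it merges the one-entry dicts
    # above into a single dict per business, i.e. something like:
    #   def flatMixedList(dict_list):
    #       merged = {}
    #       for d in dict_list:
    #           merged.update(d)
    #       return merged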

    candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

    # convert shrunk_bid_uids_rdd into dict form
    # dict(bidx: dict(uidx: score))
    # => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0, ...}), ...}
    bid_uid_dict = shrunk_bid_uids_rdd \
        .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
        .flatMap(lambda kv_items: kv_items.items()).collectAsMap()
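    # note: collectAsMap() pulls this whole dict onto the driver; the lambdas
    # below capture it in their closures, so it gets shipped to the executors
    # with each task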

    # generate all possible pairs of candidate bidx
    # and compute the Pearson similarity
    candidate_pair = candidate_bids.cartesian(candidate_bids) \
        .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
        .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
                                              bid_uid_dict[id_pair[1]])) \
        .map(lambda id_pair: (id_pair,
                              computeSimilarity(bid_uid_dict[id_pair[0]],
                                                bid_uid_dict[id_pair[1]]))) \
        .filter(lambda kv: kv[1] > 0) \
        .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
                         "b2": reversed_index_bus_dict[kv[0][1]],
                         "sim": kv[1]})