_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
I'm working on implementing a PageRank algorithm in Spark with a large DataFrame.
I'm using a UDF to calculate the new rank of each page:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

new_pagerank_udf = udf(lambda x, y: new_pagerank(x, pageRankDF, y), FloatType())
The function:
def new_pagerank(links, current_pr, counters):
    n_pr = 0
    # for each incoming link to the page, calculate its PageRank contribution
    # zip the links and counters to get them both in the loop
    for l, c in zip(links, counters):
        # get the current PR of the link l
        try:
            current_link_pr = current_pr[current_pr['id'] == l].PR.item()
        except:
            current_link_pr = 0.85 / N
        n_pr += current_link_pr / c
    new_pr = 0.85 / N + 0.15 * n_pr
    return new_pr
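On its own, with small made-up inputs (the values and the N below are purely illustrative), the function can be called like this:

import pandas as pd

N = 3  # made-up total number of pages; new_pagerank reads N as a global

toy_pr = pd.DataFrame({"id": [1, 2, 3], "PR": [0.4, 0.35, 0.25]})

# rank of a page with incoming links from pages 1 and 2,
# which have 2 and 5 outgoing links respectively
print(new_pagerank(links=[1, 2], current_pr=toy_pr, counters=[2, 5]))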
pageRankDF:
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 id 77 non-null int64
1 PR 77 non-null float64
Where the UDF is called:
count = 0
display(pageRankPDF)
previous_pr_sum = 0
while not converged(previous_pr_sum, pageRankPDF["PR"].sum()) and count < 7:
    previous_pr_sum = pageRankPDF["PR"].sum()
    NewPageRankDF = ReverseDF.select(
        ReverseDF["id"],
        new_pagerank_udf(ReverseDF["links"], ReverseDF["counters"]).alias("PR"))
    pageRankDF = NewPageRankDF
    display(pageRankDF)
    count = count + 1
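converged is a small helper of mine that compares the previous and the current total PR; a rough sketch of it, not my exact code (the tolerance here is only an example value):

def converged(previous_sum, current_sum, tol=1e-4):
    # illustrative sketch only: treat the ranks as converged when the total PR
    # changes by less than an (arbitrary) tolerance between iterations
    return abs(current_sum - previous_sum) < tol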
ReverseDF:
root
|-- id: long (nullable = false)
|-- Links: array (nullable = false)
| |-- element: integer (containsNull = false)
|-- counters: array (nullable = false)
| |-- element: integer (containsNull = false)
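For concreteness, a tiny ReverseDF with this schema could be built like this (the rows are made-up values just to illustrate the shape of the data; a SparkSession named spark is assumed):

from pyspark.sql.types import StructType, StructField, LongType, ArrayType, IntegerType

reverse_schema = StructType([
    StructField("id", LongType(), False),
    StructField("Links", ArrayType(IntegerType(), False), False),
    StructField("counters", ArrayType(IntegerType(), False), False),
])

# each row: a page id, the ids of the pages linking to it, and the
# corresponding counts used as divisors in new_pagerank
toy_reverse_df = spark.createDataFrame(
    [(1, [2, 3], [1, 2]),
     (2, [3], [2]),
     (3, [1], [1])],
    reverse_schema,
)
toy_reverse_df.show()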
The only way I have found that works is to convert the Spark DataFrame to a pandas DataFrame using .toPandas(), but that is too slow for a large DataFrame.
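To be concrete, the workaround I mean is roughly the following sketch (variable names here are only for illustration): pull the rank table to the driver as pandas, so that the UDF captures a plain, picklable pandas object instead of a Spark DataFrame.

# rough sketch of the slow workaround, not my exact code:
pageRankPDF = pageRankDF.toPandas()
new_pagerank_udf = udf(lambda x, y: new_pagerank(x, pageRankPDF, y), FloatType())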
Can I fix this without using a pandas DataFrame?