This is part of my Spark code, which is very slow. By slow I mean that for 70 million rows it takes almost 7 minutes to run, but I need it to run in under 5 seconds if possible. I have a cluster with 5 Spark nodes, 80 cores, and 177 GB of memory, of which 33 GB are currently used.
from datetime import datetime, timedelta
from pyspark.sql.functions import col

# timespan and time_delta() are defined elsewhere in my code
range_expr = col("created_at").between(
    datetime.now() - timedelta(hours=timespan),
    datetime.now() - timedelta(hours=time_delta(timespan))
)
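(For clarity: timespan and time_delta() are my own variable and helper. With made-up stand-in values, the filter just compares created_at against two absolute timestamps, e.g.:)

# Illustration only, with made-up values: timespan = 24 and time_delta(timespan) = 12
# would keep rows whose created_at lies between 24 and 12 hours ago.
lower_bound = datetime.now() - timedelta(hours=24)
upper_bound = datetime.now() - timedelta(hours=12)
example_expr = col("created_at").between(lower_bound, upper_bound)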
article_ids = (sqlContext.read.format("org.apache.spark.sql.cassandra")
               .options(table="table", keyspace=source)
               .load()
               .where(range_expr)
               .select('article', 'created_at')
               .repartition(64 * 2))
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
#article_ids.join(axes,article_ids.article==axes.article)
speed_df = (article_ids.join(axes, article_ids.article == axes.article)
            .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares)
            # collect every row per article, then sort each article's rows by event time
            .map(lambda x: (x.article, [x])).reduceByKey(lambda x, y: x + y)
            .map(lambda x: (x[0], sorted(x[1], key=lambda y: y.at)))
            # keep only articles with at least two events and take the most recent one
            .filter(lambda x: len(x[1]) >= 2)
            .map(lambda x: x[1][-1])
            # pair the article with its latest row and a null-safe engagement total
            .map(lambda x: (x.article, (x, (x.comments if x.comments else 0) + (x.likes if x.likes else 0)
                                        + (x.reads if x.reads else 0) + (x.shares if x.shares else 0)))))
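One thing I have been wondering about (not something I have verified to be faster) is whether the same logic could stay entirely in the DataFrame API, so the rows never cross into Python at all. A rough sketch of what I mean, reusing article_ids and axes from above and assuming window functions are available (a HiveContext on older Spark versions):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank each article's events by time (newest first) and count events per article
w_rank = Window.partitionBy("article").orderBy(F.col("at").desc())
w_count = Window.partitionBy("article")

latest = (article_ids.join(axes, article_ids.article == axes.article)
          .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares)
          .withColumn("rn", F.row_number().over(w_rank))
          .withColumn("cnt", F.count(F.lit(1)).over(w_count))
          # keep only the newest event of every article that has at least two events
          .where((F.col("rn") == 1) & (F.col("cnt") >= 2))
          # null-safe engagement total, same idea as the "x.comments if x.comments else 0" sums above
          .withColumn("total",
                      F.coalesce(F.col("comments"), F.lit(0))
                      + F.coalesce(F.col("likes"), F.lit(0))
                      + F.coalesce(F.col("reads"), F.lit(0))
                      + F.coalesce(F.col("shares"), F.lit(0))))

I don't know whether this is actually equivalent or faster in my setup; it is only the direction I am considering.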
I believe this part of the code in particular is what makes it slow:
sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()
When run in Spark, this shows up as the following stage, which I think is the cause of the slowness:
javaToPython at NativeMethodAccessorImpl.java:-2
Any help would really be appreciated. Thanks
EDIT
The biggest speed problem seems to be javaToPython. The attached screenshot covers only part of my data and is already very slow.
EDIT (2)
About len(x[1]) >= 2:
Sorry for the long elaboration, but I really hope to solve this problem, and for that it's crucial that people understand this fairly complex problem in detail:
This is my RDD example:
rdd1 = [(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8),(4,87),(4,96),(4,109),(5,10),(6,19),(6,18),(6,65),(6,43),(6,81),(7,12),(7,96),(7,452),(8,59)]
After the Spark transformations, the result should have this form: rdd_result = [(1,9),(2,76),(4,109),(6,81),(7,452)]. It does not contain (3,8), (5,10), or (8,59), because the keys 3, 5, and 8 each occur only once, and I don't want those keys to appear.
Below are the steps of my program:
First: after the reduceByKey on rdd1, the result is:
rdd_reduceByKey = [(1,[3,5,6,9]),(2,[10,76]),(3,[8]),(4,[87,96,109]),(5,[10]),(6,[19,18,65,43,81]),(7,[12,96,452]),(8,[59])]
Second: after filtering rdd_reduceByKey by len(x[1]) >= 2, the result is:
rdd_filter = [(1,[3,5,6,9]),(2,[10,76]),(4,[87,96,109]),(6,[19,18,65,43,81]),(7,[12,96,452])]
So the len(x[1]) >= 2 filter is necessary, but it is slow.
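To make the toy example above easy to run, this is roughly how I exercise those steps in isolation (sc is my SparkContext; the values here are plain numbers instead of the Row objects in my real job):

rdd1 = sc.parallelize([(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8),(4,87),(4,96),(4,109),
                       (5,10),(6,19),(6,18),(6,65),(6,43),(6,81),(7,12),(7,96),(7,452),(8,59)])

rdd_reduceByKey = rdd1.map(lambda x: (x[0], [x[1]])).reduceByKey(lambda a, b: a + b)
rdd_filter = rdd_reduceByKey.filter(lambda x: len(x[1]) >= 2)
rdd_result = rdd_filter.map(lambda x: (x[0], sorted(x[1])[-1]))   # latest/largest value per key

print(rdd_result.collect())
# e.g. [(1, 9), (2, 76), (4, 109), (6, 81), (7, 452)]  (order may vary across partitions)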
Any recommended improvements would be hugely appreciated.