
This is part of my Spark code, which is very slow. By slow I mean that for 70 million data rows it takes almost 7 minutes to run, but I need it to run in under 5 seconds if possible. I have a cluster with 5 Spark nodes, 80 cores and 177 GB of memory, of which 33 GB are currently used.

range_expr = col("created_at").between(
                            datetime.now()-timedelta(hours=timespan),
                            datetime.now()-timedelta(hours=time_delta(timespan))
                        )
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(table="table", keyspace=source).load() \
    .where(range_expr).select('article', 'created_at').repartition(64*2)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load()

#article_ids.join(axes,article_ids.article==axes.article)
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
     .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
     .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
     .filter(lambda x:len(x[1])>=2) \
     .map(lambda x:x[1][-1]) \
     .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))
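
For reference, here is a DataFrame-only sketch of the same logic that I think would avoid the Python lambdas entirely (the column names are the ones above; I am not sure this is fully equivalent, and on Spark 1.x the window functions may need a HiveContext):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch only: same join and selection as above, but kept in the DataFrame API
# so rows are never shipped to Python workers (no javaToPython stage).
joined = article_ids.join(axes, article_ids.article == axes.article) \
    .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares)

w_latest = Window.partitionBy("article").orderBy(F.col("at").desc())
w_count = Window.partitionBy("article")

speed_df = joined \
    .withColumn("rn", F.row_number().over(w_latest)) \
    .withColumn("cnt", F.count(F.lit(1)).over(w_count)) \
    .filter((F.col("rn") == 1) & (F.col("cnt") >= 2)) \
    .withColumn("speed",
                F.coalesce(F.col("comments"), F.lit(0)) +
                F.coalesce(F.col("likes"), F.lit(0)) +
                F.coalesce(F.col("reads"), F.lit(0)) +
                F.coalesce(F.col("shares"), F.lit(0))) \
    .drop("rn").drop("cnt")

This produces one row per article (the latest one) with an extra speed column, rather than the (article, (row, speed)) pairs above, so the code consuming speed_df would need a small adjustment.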

I believe this part of the code in particular is slow:

  sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="table", keyspace=source).load() 

When run in Spark it turns into the following, which I think is what makes it slow:

javaToPython at NativeMethodAccessorImpl.java:-2

This is my Spark UI.

Any help would really be appreciated. Thanks

EDIT

The biggest speed problem seems to be javaToPython. The attached picture is only for part of my data and it is already very slow.


EDIT (2)

About len(x[1]) >= 2:

Sorry for the long elaboration, but I really hope I can solve this problem, so it is crucial that people understand this fairly complex problem in detail.

This is my RDD example:

rdd1 = [(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8),(4,87),(4,96),(4,109),(5,10),(6,19),(6,18),(6,65),(6,43),(6,81),(7,12),(7,96),(7,452),(8,59)]

After the Spark transformations, the result has this form: rdd_result = [(1,9),(2,76),(4,109),(6,81),(7,452)]. The result does not contain (3,8), (5,10) or (8,59), because the keys 3, 5 and 8 occur only once; I don't want those keys to appear.

Below are the steps of my program:

First: after reduceByKey on rdd1 the result is:

rdd_reduceByKey = [(1,[3,5,6,9]),(2,[10,76]),(3,[8]),(4,[87,96,109]),(5,[10]),(6,[19,18,65,43,81]),(7,[12,96,452]),(8,[59])]

Second: filtering rdd_reduceByKey by len(x[1]) >= 2 gives:

rdd_filter = [(1,[3,5,6,9]),(2,[10,76]),(4,[87,96,109]),(6,[19,18,65,43,81]),(7,[12,96,452])]

So the len(x[1]) >= 2 filter is necessary, but slow.
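
Just to illustrate on this toy example, the same rdd_result can be produced without Python lambdas, e.g. with a DataFrame groupBy (a sketch, assuming sc and sqlContext are available; on my real data I would pick the latest row by at instead of the maximum value):

from pyspark.sql import functions as F

rdd1 = sc.parallelize([(1,3),(1,5),(1,6),(1,9),(2,10),(2,76),(3,8),(4,87),(4,96),
                       (4,109),(5,10),(6,19),(6,18),(6,65),(6,43),(6,81),(7,12),
                       (7,96),(7,452),(8,59)])
df = sqlContext.createDataFrame(rdd1, ["k", "v"])

# keep only keys that occur at least twice, then take the largest value per key
rdd_result = df.groupBy("k") \
    .agg(F.max("v").alias("v"), F.count("v").alias("cnt")) \
    .filter(F.col("cnt") >= 2) \
    .select("k", "v")

rdd_result.show()  # (1,9),(2,76),(4,109),(6,81),(7,452) - row order may vary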

Any recommendation improvements would be hugely appreciated.

peter
  • isn't this the same? http://stackoverflow.com/questions/37770354/improve-speed-of-spark-app – eliasah Jun 29 '16 at 09:12
  • It is similar, but I changed my app and I am unfortunately not able to solve this issue with the answers provided, and some issues are also different than before. So all I can do is ask and try to explain in more detail. I would also appreciate it if people who downvote would explain why they do so. – peter Jun 29 '16 at 09:31

1 Answer


A few things I would do when I meet a performance issue:

  1. Check the Spark web UI and find the slowest part.
  2. The lambda functions are really suspicious.
  3. Check the executor configuration.
  4. Store some of the data in an intermediate table.
  5. Check whether storing the data in Parquet helps (see the sketch after this list).
  6. Check whether using Scala helps.
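
For points 4 and 5, a minimal sketch in PySpark (the Parquet path is just a placeholder, and joined stands for the join from the question):

# write the joined data out as Parquet once, then let the downstream
# aggregation read the Parquet copy instead of going back to Cassandra
joined = article_ids.join(axes, article_ids.article == axes.article)
joined.write.mode("overwrite").parquet("/tmp/axes_joined.parquet")
joined_pq = sqlContext.read.parquet("/tmp/axes_joined.parquet")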

EDIT:

Using Scala instead of Python could do the trick if javaToPython is the slowest part.

Here is the code for finding the latest/largest value per key. It should be O(N log N), and most likely close to O(N), since the sorting is done on small per-key sets.

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val data = Seq((1,3),(1,5),(1,6),(1,9),(2,10),
               (2,76),(3,8),(4,87),(4,96),(4,109),
               (5,10),(6,19),(6,18),(6,65),(6,43),
               (6,81),(7,12),(7,96),(7,452),(8,59))
val df = sqlContext.createDataFrame(data)
val dfAgg = df.groupBy("_1").agg(collect_set("_2").alias("_2"))

val udfFirst = udf[Int, WrappedArray[Int]](_.head)
val dfLatest = dfAgg.filter(size($"_2") > 1).
    select($"_1", udfFirst(sort_array($"_2", asc=false)).alias("latest"))
dfLatest.show()
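
On the sample data this should show the rows (1,9), (2,76), (4,109), (6,81) and (7,452), i.e. the same rdd_result as in the question; the row order of the groupBy output is not guaranteed.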
Rockie Yang
  • Thanks. Do you have any suggestions on how to change the lambda function? – peter Jun 29 '16 at 06:31
  • Not sure if I understand correctly, since the transformation is pretty complex. I guess the purpose is to get the latest articles by axes.article. Perhaps you could first group the axes themselves by article using a max aggregation, then join. I don't understand the reason for len(x[1]) >= 2. – Rockie Yang Jun 29 '16 at 07:05
  • I need this because I want to have a continuous graph afterwards with at least 2 data points, since only 1 or 0 would not make for a good graph. So the method you proposed wouldn't really work. I would like to use DataFrames here since it might increase speed, but I don't know how. Any thoughts? Thanks – peter Jun 29 '16 at 07:21
  • You are already using DataFrames. Even if you weren't, that would not help much. len(x[1]) >= 2 tests that there are at least two articles, but only the last one will be taken? It might be faster to get the latest article and, if you want more, get the next latest one by first filtering out the latest article. – Rockie Yang Jun 29 '16 at 07:45
  • I added an edit to my post above, hopefully it clarifies my thoughts. thanks – peter Jun 29 '16 at 08:20