
This is part of my PySpark code, parts of which run too slowly for my needs. I would especially like to improve the speed of this part, but I don't know how. It currently takes around 1 minute for 60 million data rows and I would like to get it under 10 seconds.

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

More context of my spark app:

article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \
     .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \
     .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \
     .filter(lambda x:len(x[1])>=2) \
     .map(lambda x:x[1][-1]) \
     .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))    

Thanks a lot for your suggestions.

EDIT:

The count takes up most of the time (50 s), not the join.

I also tried increasing the parallelism with the following, but it didn't have any obvious effect:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

and

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

This is the picture from the Spark UI showing how long each operation takes.


1 Answer


First you should figure out what's actually taking the most time.

For example, determine how long just reading the data takes:

axes = sqlContext \
  .read \
  .format("org.apache.spark.sql.cassandra") \
  .options(table="axes", keyspace=source) \
  .load() \
  .count()

Increasing the parallelism or the number of parallel readers may help, but only if you aren't already maxing out the I/O of your Cassandra cluster.

Second, see if you can do everything with the DataFrame API. Every time you use a Python lambda you incur serialization costs between the Python and Scala types.
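
For example, the pipeline from the question could be expressed with DataFrame operations only, so all the work stays in the JVM. This is a rough sketch, not tested against your schema: column names are taken from the question, and the tie-breaking differs slightly from the RDD version (if two rows share the same latest "at", both are kept).

from pyspark.sql import functions as F

joined = article_ids.join(axes, article_ids.article == axes.article) \
    .select(axes.article, axes.at, axes.comments, axes.likes, axes.reads, axes.shares)

# Latest timestamp and row count per article, computed without Python lambdas
per_article = joined.groupBy("article") \
    .agg(F.max("at").alias("max_at"), F.count("*").alias("n"))

# Keep articles with at least two rows and take the row with the latest "at"
latest = joined.join(per_article.where(F.col("n") >= 2), "article") \
    .where(F.col("at") == F.col("max_at"))

# Sum of interactions, treating nulls as 0
speed_df = latest.withColumn(
    "speed",
    F.coalesce(F.col("comments"), F.lit(0)) + F.coalesce(F.col("likes"), F.lit(0))
    + F.coalesce(F.col("reads"), F.lit(0)) + F.coalesce(F.col("shares"), F.lit(0)))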

Edit:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

Will only take effect after the load has completed, so this won't help you.

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load()

Is not a valid parameter for the Spark Cassandra Connector, so this won't do anything.

See https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters. The input split size determines how many C* partitions are put into a single Spark partition.
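
As a sketch (the exact property name and default depend on your connector version, so check the reference linked above), you could lower the input split size when building the context so that more Spark partitions, and therefore more parallel read tasks, are created. The host below is a placeholder.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf() \
    .set("spark.cassandra.connection.host", "127.0.0.1") \
    .set("spark.cassandra.input.split.size_in_mb", "32")  # smaller splits -> more partitions

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

axes = sqlContext.read.format("org.apache.spark.sql.cassandra") \
    .options(table="axes", keyspace=source) \
    .load()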

  • I added some details above. I tried increasing parallelism with the two methods in the edit above, but it didn't have any effect. Could you specify what you mean by doing everything in the DataFrame API? Thanks – peter Jun 12 '16 at 06:23
  • @Peter I provided you with a link to the approach using only DataFrames [previous time](http://stackoverflow.com/a/37507116/1560062). – zero323 Jun 12 '16 at 12:25
  • @zero323 I tried using only DataFrames, but a DataFrame doesn't seem to have a keyBy or reduceByKey method, so it seems I would need to go back to using RDDs. I get this error message when I try: AttributeError: 'DataFrame' object has no attribute 'keyBy'. Any idea what to do? Thanks – peter Jun 13 '16 at 03:08
  • @peter Trust me. If you follow the links you'll find the code you need to achieve the same result. – zero323 Jun 13 '16 at 03:47
  • @zero323 I added your DataFrame advice; could you please take a look at this question? Thanks a lot! http://stackoverflow.com/questions/37848388/how-can-i-filter-filterlambda-xlenx1-2-in-dataframe/37848771#37848771 – peter Jun 17 '16 at 01:56