1

This is a part of my spark app. The first part is the part where I get all the articles within the last 1 hour and the second part of the code grabs all these articles comments. The third part adds the comments to the articles. The problem is that the articles.map(lambda x:(x.id,x.id)).join(axes) part is too slow, it takes around 1 minute. I would like to improve this to 10 seconds or even less but don't know how to? Thanks for your reply.

articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \
                        .map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache()

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x))

speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes)

EDIT

This is my new code, which I changed according to your suggestions. It is now already 2 times as fast as before, so thanks for that ;). Just another improvement I would like to make with the last part of my code in the axes part, which is still too slow and needs 38 seconds for 30 million data:

range_expr = col("created_at").between(
                            datetime.now()-timedelta(hours=timespan),
                            datetime.now()-timedelta(hours=time_delta(timespan))
                        )
        article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist()


        axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()

I tried this here (which should substitute the last axes part of my code) and this is also the solution I would like to have but it doesn't seem to work properly:

in_expr = col("article").isin(article_ids.collect())
        axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr)

I always get this error message:

in_expr = col("article").isin(article_ids.collect())
Traceback (most recent call last):                                              
  File "<stdin>", line 1, in <module>
TypeError: 'Column' object is not callable

Thanks for your help.

peter
  • 674
  • 1
  • 12
  • 33

2 Answers2

3

1) Predicate Pushdown is automatically detected by the Spark-Cassandra connector, as long as the filtering is possible in Cassandra (using primary key for filtering or secondary index): https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra

2) For more efficient joins, you can call the method repartitionByCassandraReplica. Unfortunately this method may not be available for PySpark, only for Scala/Java API. Read the doc here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

3) Another hint is to try to debug and understand how the connector is creating Spark partitions. There are some examples and caveats mentioned in the docs: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md

doanduyhai
  • 8,712
  • 27
  • 26
2

As mentioned before if you want to achieve reasonable performance don't convert your data to RDD. It not not only makes optimizations like predicate pushdown impossible, but also introduces as huge overhead of moving data out of JVM to Python.

Instead you should use use SQL expressions / DataFrame API in a way similar to this:

from pyspark.sql.functions import col, expr, current_timestamp

range_expr = col("created_at").between(
    current_timestamp() - expr("INTERVAL 1 HOUR"),
    current_timestamp())

articles = (sqlContext.read.format("org.apache.spark.sql.cassandra")
    .options(...).load()
    .where(col("created_at").isNotNull())  # This is not really required
    .where(range_expr))

It should be also possible to formulate predicate expression using standard Python utilities as you've done before:

import datetime

range_expr = col("created_at").between(
    datetime.datetime.now() - datetime.timedelta(hours=1),
    datetime.datetime.now()
)

Subsequent join should be performed without moving data out of data frame as well:

axes = (sqlContext.read.format("org.apache.spark.sql.cassandra")
    .options(...)
    .load())

articles.join(axes, ["id"])
Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 1
    a) `isin` has been introduced in 1.5 b) I am pretty sure what you want is to flatten this `article_ids.collect()` first. – zero323 May 31 '16 at 08:42
  • I tried to flatten it but it didn't really increase speed. When loading, this part sqlContext.read.format has a default partition number of 255 and I would like to make it smaller, since this part reads from cassandra according to partitions is slow, but don't know how to. Any ideas? Thanks – peter Jun 07 '16 at 01:50