
I would like to insert the contents of a PySpark DataFrame into Redis efficiently. I have tried a couple of methods, but neither gives the expected results.

  1. Converting the df to JSON takes about 30 seconds. The goal is to SET the JSON payload into a Redis cluster for consumption.

  2. I'm also trying the spark-redis library (https://github.com/RedisLabs/spark-redis/blob/master/doc/python.md) so that all the worker nodes insert the results into Redis, to see whether that makes a significant difference. This approach takes about the same amount of time.

I'm looking for expert suggestions on how to clear this bottleneck and bring the total time down to less than 5 seconds. Thanks.

I'm using an EMR cluster with 1+4 nodes, each with 16 cores and 64 GB of memory.

import json

js = json.dumps(df.toJSON().collect())  # takes 29 seconds
redis.set(key1, js)  # takes 1 second

df.write.format("org.apache.spark.sql.redis").option("table", key1).mode('append').save()  # takes 28 seconds

The first two lines convert the df to JSON, which takes 29 seconds, and set it in Redis, which takes 1 second.

or

The last line uses the worker nodes to insert the df content directly into Redis, but takes about 28 seconds.
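
A minimal sketch of how the serialization step could be timed in isolation, without Spark or Redis. The `timed` helper and the sample `rows` list are hypothetical stand-ins: `rows` plays the role of the list of JSON strings that `df.toJSON().collect()` returns. It also shows that calling `json.dumps` on that list encodes each row a second time, whereas joining the already-serialized rows yields a JSON array of objects.

```python
import json
import time


def timed(label, fn):
    """Hypothetical helper: run fn(), print the elapsed time, return both."""
    start = time.perf_counter()
    result = fn()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.4f}s")
    return result, elapsed


# Stand-in for df.toJSON().collect(): one JSON string per row.
rows = [json.dumps({"id": i, "value": i * 2}) for i in range(1000)]

# json.dumps on a list of JSON strings produces an array of *strings*,
# so every row is encoded twice.
double_encoded, _ = timed("json.dumps(rows)", lambda: json.dumps(rows))

# Joining the pre-serialized rows produces an array of JSON objects.
payload, _ = timed("join rows", lambda: "[" + ",".join(rows) + "]")
```

In the real job, wrapping `df.toJSON().collect()`, the serialization, and `redis.set(key1, js)` in separate `timed` calls would show which stage accounts for the 29 seconds.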

  • Could you check the partition number of df? – abiratsis Jun 18 '19 at 05:54
  • As @AlexandrosBiratsis said, check the number of partitions. The default is 200, which is usually far from optimal. Try a number like 4*16*2 or 4*16 or even less (the right number of partitions depends on your cores). Also, could you please share the size of the data you need to insert into Redis? – mkaran Jun 18 '19 at 06:54
  • Hey Alex/Karan, the data I'm trying to insert is a little less than 5 MB, so it's pretty small. I'm currently running 30 partitions, which gives better performance than 200. This is the configuration I'm currently using: spark-submit --master yarn --deploy-mode cluster --executor-memory 6g --executor-cores 5 --num-executors 6 --driver-memory 15g --conf spark.sql.autoBroadcastJoinThreshold=209715200 --conf spark.sql.shuffle.partitions=30 --conf spark.default.parallelism=30 – user2407164 Jun 18 '19 at 23:55
  • How long does the following take? df.foreach(lambda x: x) And what is the size of the DataFrame? – fe2s Jun 19 '19 at 20:10
  • Hi there, did you solve your issue? Indeed, since your data is very small, a parallelism of 4-8 would be sufficient. You do need to make sure, though, that this parallelism is actually being applied at execution time. – abiratsis Jun 22 '19 at 09:40
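
The partition counts suggested in the comments can be worked out as follows. This is a minimal sketch, assuming the 4 worker nodes with 16 cores each described in the question; the helper name `candidate_partitions` is hypothetical. The Spark calls in the trailing comments (`df.rdd.getNumPartitions()` and `df.coalesce(...)`) are not executed here and only indicate where the numbers would be used.

```python
def candidate_partitions(worker_nodes: int, cores_per_node: int) -> dict:
    """Hypothetical helper: partition counts derived from total worker cores."""
    total_cores = worker_nodes * cores_per_node
    return {
        "2 x cores": total_cores * 2,  # the 4*16*2 suggestion
        "1 x cores": total_cores,      # the 4*16 suggestion
    }


counts = candidate_partitions(worker_nodes=4, cores_per_node=16)
print(counts)

# In the Spark job itself (not run in this sketch):
#   df.rdd.getNumPartitions()  # check the current partition count
#   df = df.coalesce(8)        # reduce partitions without a full shuffle,
#                              # e.g. to the 4-8 range suggested for ~5 MB
```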

0 Answers