
I want to create a global cache in Spark Streaming. The cache should hold elements for 24 hours. Since the cache will be updated by all the executors, how do we maintain a global state for it so that it is updated and refreshed dynamically?

Can I create a global RDD that is cached with MEMORY_AND_DISK and refreshed across every Spark Streaming batch?
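
A rough sketch of what I have in mind (`ssc` is the StreamingContext and `stream` a keyed DStream[(String, String)]; both are placeholders):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    val ttlMs = 24 * 60 * 60 * 1000L   // keep entries for 24 hours

    // (key, (value, insertion time)), carried across batches on the driver
    var historyRDD: RDD[(String, (String, Long))] =
      ssc.sparkContext.emptyRDD[(String, (String, Long))]

    stream.foreachRDD { (batchRDD, batchTime) =>
      val now = batchTime.milliseconds
      val stamped = batchRDD.mapValues(v => (v, now))

      // Union the new batch in, drop expired entries, and cache the result.
      val refreshed = historyRDD
        .union(stamped)
        .filter { case (_, (_, ts)) => now - ts <= ttlMs }
        .persist(StorageLevel.MEMORY_AND_DISK)

      refreshed.count()                        // materialize the new cache
      historyRDD.unpersist(blocking = false)   // release the previous one
      historyRDD = refreshed

      // historyRDD can now be joined against the current batch, etc.
    }

One concern with this: the lineage of historyRDD grows with every batch, so it would presumably need periodic checkpointing.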

Alchemist
  • Did you already evaluate 'StateDStream' and 'mapWithState'? (a minimal sketch of mapWithState follows these comments) – maasg Jun 16 '17 at 13:33
  • Thanks so much for your response. I am using Spark 1.5; these APIs were introduced in Spark 1.6. I am thinking of using a timer-based Guava caching library to manage the global cache. The only question is how to make this cache distributed across different executors. – Alchemist Jun 16 '17 at 13:43
  • I would advise an upgrade; 1.5 is pretty old by now. A Guava cache will not work - it's local to one JVM. If you plan to do that on your own, you need an external service, like EhCache (clustered), Alluxio, or even an external 'fast' db like Redis. – maasg Jun 16 '17 at 13:52
  • We are using CDH5.5.1 with Spark Streaming. We were having issues when we wanted to upgrade to CDH5.8. Thanks so much for your advice. – Alchemist Jun 16 '17 at 14:04
  • You could consider this method as well, but expiration is going to be tricky: https://stackoverflow.com/a/44437251/764040 – maasg Jun 16 '17 at 14:56
  • Thanks so much!! So you are suggesting that each time we process a batch, we create a new "history" RDD by removing expired entries and persisting that history RDD in the disk cache. The next time a new batch comes in, we join this history RDD with the current batch to create a dynamic cache. – Alchemist Jun 16 '17 at 16:59
  • That could work, but it will depend very much on the size of the data you have. Also, the context of the other question was removing already-matched items, so that dataset tends to be stable in size. So it will very much depend on your context. – maasg Jun 16 '17 at 17:08
  • Agreed. Thanks so much for your responses, maasg. – Alchemist Jun 16 '17 at 18:18
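
For anyone on Spark 1.6+, here is a minimal sketch of the mapWithState route suggested above. The sum-per-key state and the key/value types are just placeholders; StateSpec.timeout handles the 24-hour expiry, and mapWithState requires checkpointing to be enabled on the StreamingContext:

    import org.apache.spark.streaming.{Minutes, State, StateSpec}

    // keyedStream: DStream[(String, Long)] (placeholder)
    def updateCache(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
      if (state.isTimingOut()) {
        None                                   // entry is being expired; update() must not be called here
      } else {
        val newTotal = state.getOption.getOrElse(0L) + value.getOrElse(0L)
        state.update(newTotal)
        Some((key, newTotal))
      }
    }

    val spec = StateSpec.function(updateCache _).timeout(Minutes(24 * 60))
    val cachedState = keyedStream.mapWithState(spec)

    // cachedState.stateSnapshots() yields the full (key, state) cache at every batch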

2 Answers


Expanding a bit on what @maasg added in the comments: the best solution for what you're proposing is probably a third-party datastore that integrates or connects with Spark. He mentioned a few (Alluxio, Redis, EhCache). I thought I would link you to a previous answer of mine that tries to catalog many of them and provide a (small) bit of context.
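
As a rough illustration of that pattern (using the Jedis client for Redis; the host, port, and key layout below are just placeholders), each executor can write its entries from inside foreachPartition and let Redis expire them after 24 hours:

    import redis.clients.jedis.Jedis

    val ttlSeconds = 24 * 60 * 60   // 24-hour expiry handled by Redis

    // stream: DStream[(String, String)] (placeholder)
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One connection per partition, opened on the executor (a connection pool would be better).
        val jedis = new Jedis("redis-host", 6379)
        partition.foreach { case (key, value) =>
          // SETEX lets Redis drop the entry after the TTL, giving the rolling 24-hour cache for free.
          jedis.setex(key, ttlSeconds, value)
        }
        jedis.close()
      }
    }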

plamb
  • Thanks so much for the details, plamb. I need to manage one day of data in the cache. I will try to see if I can create an in-memory cache of the last 24 hours of data. The goal is to not use extra storage and tools if I can do without them. Thanks again – Alchemist Jun 17 '17 at 18:51

Third-party caches like Redis, EhCache, etc. will not help us manage a changing cache within Spark, especially when the changing cache belongs to a machine-learning, active-learning system. For example, k-means clustering has to keep learning the cluster centroids and update them after each iteration. If you are dealing with a cache of this nature, then you can think of the following two main steps:

  1. Broadcast the data to have a local cached copy per executor and
  2. Iteratively keep refining this broadcasted cache.

But because a broadcast variable is final and cannot be changed, I took an alternative route to solve this in which the cached data is not broadcast. It basically involves learning a local corpus per partition and aggregating the partition-wise corpora into one.
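
Roughly, the per-partition learning plus aggregation looks like the sketch below, using a single k-means update step as the example (an illustration only, not the exact code from the blog):

    import org.apache.spark.rdd.RDD

    // One k-means update step: each partition learns a local "corpus"
    // (per-centroid sums and counts), and the partial results are aggregated into one.
    def updateCentroids(points: RDD[Array[Double]],
                        centroids: Array[Array[Double]]): Array[Array[Double]] = {
      val k = centroids.length
      val dims = centroids.head.length

      def closest(p: Array[Double]): Int =
        centroids.indices.minBy { i =>
          centroids(i).zip(p).map { case (c, x) => (c - x) * (c - x) }.sum
        }

      val (sums, counts) = points.mapPartitions { iter =>
        // local corpus for this partition
        val localSums = Array.fill(k, dims)(0.0)
        val localCounts = Array.fill(k)(0L)
        iter.foreach { p =>
          val c = closest(p)
          localCounts(c) += 1
          var d = 0
          while (d < dims) { localSums(c)(d) += p(d); d += 1 }
        }
        Iterator((localSums, localCounts))
      }.reduce { case ((s1, c1), (s2, c2)) =>
        // aggregate the partition-wise corpora into one
        (Array.tabulate(k, dims)((i, d) => s1(i)(d) + s2(i)(d)),
         Array.tabulate(k)(i => c1(i) + c2(i)))
      }

      // new centroids (keep the old one if a cluster received no points)
      Array.tabulate(k) { i =>
        if (counts(i) == 0) centroids(i) else sums(i).map(_ / counts(i))
      }
    }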

Please refer to part 1 of my blog, which talks about keeping track of a changing cache and demonstrates how to aggregate the partition-wise corpora. Hope it helps.

Sruthi Poddutur