
I have a large list of values, drawn from the range 0 to 100,000 (represented here as letters for clarity). There might be a few thousand items in each input.

[a a a a b b b b c f d b c f ... ]

I want to find the values whose counts exceed a certain threshold, together with those counts. For example, if the threshold is 3, the answer is {a: 4, b: 5}.

The obvious way to do this is to group by identity, count each group, and then filter.

This is a language agnostic question, but in Clojure (don't be put off if you don't know Clojure!):

(filter (fn [[k cnt]] (> cnt threshold)) (frequencies input))

This function runs over a very large number of inputs, and each input is itself very large, so the grouping and filtering are expensive operations. I want to find some kind of guard function that returns early if the input can never produce any outputs over the given threshold, or that otherwise partitions the problem space. For example, the simplest guard is: if the size of the input is less than the threshold, return nil.
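
A minimal sketch of that trivial guard (the function name and shape are illustrative, not part of the question):

(defn counts-over-threshold
  "Trivial guard sketched above: if the input has fewer items than the
   threshold, no single value can possibly exceed the threshold, so the
   `when` returns nil and the expensive grouping is skipped."
  [threshold input]
  (when (>= (count input) threshold)
    (filter (fn [[_ cnt]] (> cnt threshold)) (frequencies input))))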

I'm looking for a better guard function that will skip the computation if the input can't produce any outputs, or a quicker way to produce the output.

Obviously it has to be less expensive than the grouping itself. One promising solution involved comparing the count of the input with the count of its distinct values, but computing the distinct set ended up being as expensive as the grouping...

I have an idea that probabilistic data structures might hold the key. Any ideas?

(I tagged hyperloglog, although I don't think it applies because it doesn't provide counts.)

Joe
  • Do you know anything about the distribution? You mentioned it is just between 0 and 100k. – Thomas Jungblut Oct 11 '15 at 20:41
  • I think the whole question boils down to 'what is the distribution?'. Ideal case a small number of clusters of size = threshold, worst case even distribution of one each. – Joe Oct 11 '15 at 20:42
  • Did you try to sample 1-10% of the data and see how accurate this would be? – Thomas Jungblut Oct 11 '15 at 20:47
  • Interesting idea, thanks. I'll look at the numbers and see if it fits. – Joe Oct 11 '15 at 20:49
  • Let me know :) Otherwise there are sketching data structures which give you a rough idea of the frequency of the most common elements using some hashing tricks. (http://lkozma.net/blog/sketching-data-structures/) – Thomas Jungblut Oct 11 '15 at 20:54
  • Do you run many queries on the same set of inputs? If so, it's probably worth a one-off preprocessing step in which, within each input, you first count frequencies in each group, and then sort these frequencies in decreasing order. Then, sort the inputs themselves in lexicographically decreasing order. Together, this will mean that (a) within an input, you can read all satisfying frequencies from the left end and stop as soon as the frequency dips below the threshold, and (b) as soon as the leftmost frequency in an input is below the threshold, you can totally stop. – j_random_hacker Oct 12 '15 at 12:10
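
A rough, hypothetical sketch of the 1-10% sampling idea suggested in the comments above (sample-rate and margin are tuning knobs; random-sample needs Clojure 1.7+):

(defn sampled-guard
  "Probabilistic guard: frequency-count a small random sample and scale the
   best count back up; only when that estimate (padded by `margin`) clears
   the threshold is the full pass done. False negatives are possible, so
   pick a generous margin."
  [threshold sample-rate margin input]
  (let [sample (random-sample sample-rate input)
        best   (if (seq sample) (apply max (vals (frequencies sample))) 0)]
    (when (> (* (/ best sample-rate) margin) threshold)
      (filter (fn [[_ cnt]] (> cnt threshold)) (frequencies input)))))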

3 Answers

1

You might like to look at Narrator. It's designed for 'analyzing and aggregating streams of data'.

A simple query-seq to do what you're initially after is:

(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])

(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}

You can then filter the result as you suggested.
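
For instance, applying the question's threshold filter to that result (threshold of 2, purely for illustration; map entry order may vary):

(filter (fn [[_ cnt]] (> cnt 2))
        (query-seq (n/group-by identity n/rate) my-seq))
==> ([:a 3] [:b 5])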

You can use quasi-cardinality to quickly estimate the number of unique items in your sample (and thus inform your partitioning question). It uses the HyperLogLog cardinality-estimation algorithm, e.g.

(query-seq (n/quasi-cardinality) my-seq)
==> 3
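
One way that estimate could back a cheap guard (a sketch, not part of Narrator itself): with `total` items and `d` distinct values, no single value can occur more than total - d + 1 times, so if that bound can't beat the threshold the grouping can be skipped. Because quasi-cardinality is only an estimate, treat the bound as approximate.

(defn worth-grouping?
  "True when some value could still exceed the threshold: the most frequent
   value can appear at most (total - (d - 1)) times."
  [threshold s]
  (let [total (count s)
        d     (query-seq (n/quasi-cardinality) s)]
    (> (- total (dec d)) threshold)))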

quasi-frequency-by is demonstrated here:

(defn freq-in-seq
  "returns a function that, when given a value, returns the frequency of that value in the sequence s
   e.g. ((freq-in-seq [:a :a :b :c]) :a)  ==> 2"
  [s]
  (query-seq (n/quasi-frequency-by identity) s))

((freq-in-seq my-seq) :a) ==> 3

quasi-distinct-by:

(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]

There's also real-time stream analysis with query-stream.

Here's something showing how you can sample the stream to get a count of changes over every 'period' values read:

(s/stream->seq 
  (->> my-seq
       (map #(hash-map :timestamp %1 :value %2) (range))
       (query-stream (n/group-by identity n/rate) 
                     {:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})

The result is a sequence of changes every 3 items (period 3), with the appropriate timestamp.

You can also write custom stream aggregators, which would probably be how you'd go about accumulating the values in the stream above. I had a quick go with these and failed abysmally to get them working (I'm only on my lunch break at the moment), but this works in their place:

(defn lazy-value-accum
  ([s] (lazy-value-accum s {}))
  ([s m]
   (when-not (empty? s)
     (lazy-seq
      (let [new-map (merge-with + m (:value (first s)))]
        (cons new-map
              (lazy-value-accum (rest s) new-map))))))


(lazy-value-accum
  (s/stream->seq 
    (->> my-seq
         (map #(hash-map :timestamp %1 :value %2) (range))
         (query-stream (n/group-by identity n/rate) 
                       {:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})

which shows a gradually accumulating count of each value after every `period` samples, and which can be read lazily.

Mark Fisher
  • Thanks very much! I'll have a good look at that. I'm avoiding parallelism because this process itself runs many times in a `pmap` (although it may be that running the outer loop sequentially and having parallel operations inside may be better). – Joe Oct 12 '15 at 15:46
  • The stream aggregators allow for parallelisation by providing hooks to merge data from multiple threads, if that's how they are being aggregated. – Mark Fisher Oct 12 '15 at 19:37
  • Thanks for your effort. I've not had time to fully concentrate on it, might be a few days. – Joe Oct 13 '15 at 10:21
1

What about using partition-all to produce a lazy list of partitions of maximum size n, applying frequencies to each partition, merging them, and then filtering the final map?

(defn lazy-count-and-filter
  [coll n threshold]
  (filter #(< threshold (val %))
          (apply (partial merge-with +) 
                 (map frequencies 
                      (partition-all n coll)))))

ex:

(lazy-count-and-filter [:a :c :b :c :a :d :a] 2 1)
==> ([:a 3] [:c 2])
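
A hypothetical variant of the same idea, counting the chunks on separate threads before the merge (only worthwhile when the chunks are fairly large):

(defn parallel-count-and-filter
  "Like lazy-count-and-filter, but counts each chunk with pmap before merging."
  [coll n threshold]
  (filter #(< threshold (val %))
          (apply merge-with +
                 (pmap frequencies (partition-all n coll)))))
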
0

If you're looking to speed up the work on a single node, consider reducers or core.async, as this blog post illustrates.

If this is a very large dataset, and this operation is needed frequently, and you have resources to have a multi-node cluster, you could consider setting up either Storm or Onyx.

Realistically, it sounds like reducers will give you the most benefit for the least amount of work. Of all the options I've listed, the more powerful/flexible/faster solutions require more time upfront to understand. In order from simplest to most powerful, they are reducers, core.async, Storm, and Onyx.
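
For reference, a minimal reducers-based sketch of the counting step, assuming the input can be materialized as a vector so r/fold can actually run in parallel:

(require '[clojure.core.reducers :as r])

(defn fold-count-and-filter
  "Count values with r/fold (parallel over a vector), then filter by threshold."
  [threshold coll]
  (let [combinef (fn ([] {}) ([m1 m2] (merge-with + m1 m2)))
        reducef  (fn [acc x] (update acc x (fnil inc 0)))]
    (filter (fn [[_ cnt]] (> cnt threshold))
            (r/fold combinef reducef (vec coll)))))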

Elango