1

I am implementing an UDAF according to UDAF example. the update phase there looks like this:

    public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
        String inputKey = input.getString(0);
        Map<String, Long> inputValues = input.<String, Long>getJavaMap(1);
        Map<String, Map<String, Long>> newData = new HashMap<>();

        if (!buffer.isNullAt(0)) {
            Map<String, Map<String, Long>> currData = buffer.<String, Map<String, Long>>getJavaMap(0);
            newData.putAll(currData);
        }
        newData.put(inputKey, inputValues);
        buffer.update(0, newData);
    }
}

You can see that on every step a new HashMap is created (newData) and the data from the previous buffer is copied into it. It looks like an awful waste, having to create new Maps and copying all the elements. So I tried (in my case I have a map with a slightly different types):

bufferJavaMap = buffer.<String, Integer>getJavaMap(0);
bufferJavaMap.put("aaaa", 1);
buffer.update(0, bufferJavaMap);

I receive the following error:

java.lang.UnsupportedOperationException
   at java.util.AbstractMap.put(AbstractMap.java:209)
   at dns.MergeMapUDAF.update(MergeMapUDAF.java:84)

Isn't it possible to update the existing Map? what is the best method update this Map?

zero323
  • 322,348
  • 103
  • 959
  • 935
antonpuz
  • 3,256
  • 4
  • 25
  • 48

1 Answers1

1

Isn't it possible to update the existing Map?

It is not possible, but the problem is more complex than the one identified in your. Spark makes a full copy of the structure on both get and update so even removing explicit copy wouldn't resolve the problem.

If performance is required, you should avoid using UserDefinedAggregateFunction with non-atomic types.

Alper t. Turker
  • 34,230
  • 9
  • 83
  • 115
  • thx for the answer, if UDAF should be avoided, I have a usecase where each row has a list of unique names. I want to find a unique set of all those names. So I can use UDAF to aggregate the names in to Set or use explode->functions.collect_set. The latter sounds like a great load on Spark (copying the row by the number of items in the list). Under those assumptions, is the explode version still better than the UDAF? – antonpuz Jan 04 '18 at 08:19
  • If duplicates are rare I would collect_list and flatten to set, if there are common I'd benchmark and consider using RDD combineByKey. – Alper t. Turker Jan 04 '18 at 17:24