0

I'm prototyping a fraud application. We'll frequently have metrics like "total amount of cash transactions in the last 5 days" that we need to compare against some threshold to determine if we raise an alert.

We're looking to use Kafka Streams to create and maintain the aggregates and then create an enhanced version of the incoming transaction that has the original transaction fields plus the aggregates. This enhanced record gets processed by a downstream rules system.

I'm wondering the best way to approach this. I've prototyped creating the aggregates with code like this:

TimeWindows twoDayHopping TimeWindows.of(TimeUnit.DAYS.toMillis(2))
               .advanceBy(TimeUnit.DAYS.toMillis(1));
KStream<String, AdditiveStatistics> aggrStream = transactions
    .filter((key,value)->{
        return value.getAccountTypeDesc().equals("P") &&
               value.getPrimaryMediumDesc().equals("CASH");

    })
    .groupByKey()
    .aggregate(AdditiveStatistics::new,
               (key,value,accumulator)-> {                 
                   return AdditiveStatsUtil
                     .advance(value.getCurrencyAmount(),accumulator),
                              twoDayHopping,
                              metricsSerde,
                              "sas10005_store")
                } 
     .toStream()
     .map((key,value)-> {
                value.setTransDate(key.window().start());
                return new KeyValue<String, AdditiveStatistics>(key.key(),value);
            })
     .through(Serdes.String(),metricsSerde,datedAggrTopic);;

This creates a store-backed stream that has a records per key per window. I then join the original transactions stream to this window to produce the final output to a topic:

  JoinWindows joinWindow = JoinWindows.of(TimeUnit.DAYS.toMillis(1))
                                        .before(TimeUnit.DAYS.toMillis(1))
                                        .after(-1)
                                        .until(TimeUnit.DAYS.toMillis(2)+1);
    KStream<String,Transactions10KEnhanced> enhancedTrans = transactions.join(aggrStream,
                      (left,right)->{
                            Transactions10KEnhanced out = new Transactions10KEnhanced();
                            out.setAccountNumber(left.getAccountNumber());
                            out.setAccountTypeDesc(left.getAccountTypeDesc());
                            out.setPartyNumber(left.getPartyNumber());
                            out.setPrimaryMediumDesc(left.getPrimaryMediumDesc());
                            out.setSecondaryMediumDesc(left.getSecondaryMediumDesc());
                            out.setTransactionKey(left.getTransactionKey());
                            out.setCurrencyAmount(left.getCurrencyAmount());
                            out.setTransDate(left.getTransDate());
                            if(right != null) {
                                out.setSum2d(right.getSum());

                            }
                            return out;
                       },
                       joinWindow);

This produces the correct results, but it seems to run for quite a while, even with a low number of records. I'm wondering if there's a more efficient way to achieve the same result.

  • Care to elaborate? For example, what do you mean by "it seems to run for a quite a while" -- the application should be running continuously rather than "run for a while then stop"? Do you mean processing a new input record takes longer than you'd expect? If so, how long is "quite a while" -- a few seconds, a minute, an hour? – miguno Jun 20 '17 at 18:11
  • I put a sleep in the main() method to allow the background threads to process the input data (16 input records with the same key). I have to set it to over 30 seconds to see any output to the topic. It appears that if the program isn't left to run that long, it doesn't have enough time to actually process this few records. Wondering if there's a lot of network/inter-process shuffling going on to cause this. – Tim Stearn Jun 21 '17 at 15:43

1 Answers1

1

It's a config issues: cf http://docs.confluent.io/current/streams/developer-guide.html#memory-management

Disable caching by setting cache size to zero (parameter cache.max.bytes.buffering in StreamsConfig) will resolve the "delayed" delivery to the output topic.

You might also read this blog post for some background information about Streams design: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • Thanks for the info. This did make output come faster. However, aggrStream that's the right side of the join has multiple records for each key/window. My intention was to have one record per key/window so that I get one record on the output stream for each transaction coming in. If I could get differing numbers of aggregate records depending on my memory settings, not sure how I could depend on the results being accurate. Makes me wonder if I've taken the wrong approach. – Tim Stearn Jun 22 '17 at 06:10
  • BTW, I do understand that what I get for the aggregate depends on the timing/order of the events flowing through. I just want to find a way to make sure that I'm joining to only a single key/window for any incoming event. In other words, I want to join to only the latest key/window combinations. – Tim Stearn Jun 22 '17 at 06:18
  • Accepted the answer above - thx. Might need to post a separate question, but as my comments above indicate, this config change does allow me to get the response time I need, but reveals that the join method I'm using won't work because I can have multiple update events to the datedAggrTopic stream, which means my join is no longer 1:1. Tried a few different approaches, but still haven't figured this part out. – Tim Stearn Jun 22 '17 at 15:23
  • Compare: https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277 – Matthias J. Sax Jun 22 '17 at 16:32
  • Thanks Matthias. So given my goals, would you suggest using "manual de-duplication" in the lower level Kafka API as you do on that track? At some point, I still need to do a Stream-->Stream join to marry my original event up to the correct aggregate window - I just want to avoid joining to prior state updates. – Tim Stearn Jun 22 '17 at 18:01
  • This might work. But if you strictly want one output per window, than you need to figure out how to handle late arriving data. – Matthias J. Sax Jun 23 '17 at 08:10
  • I agree I'll have to look at that too. Still just trying to get this to work with on time data. Doesn't seem like windowing was built with this use case mind. Am I tilting at windmills trying to accomplish this with windowed aggregates? – Tim Stearn Jun 23 '17 at 11:21
  • Windows at DSL level are not designed for this use case -- they focus on event-time processing with late-arriving/out-of-order data; this is was make sense for most use cases. However, using a transform() with an attached windowed store should not be too hard to get to work. You can also look into the implementation of the DSL windows and c&p what make sense for your case. – Matthias J. Sax Jun 24 '17 at 03:24
  • Thanks for pulling me off the pure DSL track. Above you mentioned transform(), but I think you meant process(), as documented here: https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KStream.html#process(org.apache.kafka.streams.processor.ProcessorSupplier,%20java.lang.String...). I could see where calling process() on the transactions stream and accessing the window state store with windowStore.fetch() could work. Is that what you had in mind? Since I'd be querying the store rather than the change log, I should avoid dupes right? – Tim Stearn Jun 24 '17 at 09:59
  • So, I tried using KStream.prcoess(), but realized it doesn't generate a sink, so I can't get any of the data I produce to go downstream to a topic. I've started doing everything with the Processor API. I attempting to use the OOTB processors, particularly KStreamWindowAggregateProcessor. I can't figure out how to construct the org.apache.kafka.streams.state.WindowStore it needs. Could you provide/point me to an example? – Tim Stearn Jun 24 '17 at 19:47
  • I used this invocation. Not sure if the parameters are set the the correct values, but seems to work: StateStoreSupplier windowStore = new RocksDBWindowStoreSupplier(windowStoreName,TimeUnit.DAYS.toMillis(1),2,false,Serdes.String(),metricsSerde,TimeUnit.DAYS.toMillis(2),true,logConfig,true); – Tim Stearn Jun 24 '17 at 20:44
  • Since comments in StackOverflow don't accept formatting, going to ask folllow up questions in 3 comments below. – Tim Stearn Jun 24 '17 at 21:12
  • 1. Is the RocksDBWindowStoreSupplier construction above OK? Wasn't sure on the right settings for numSegments and retainDuplicates. – Tim Stearn Jun 24 '17 at 21:13
  • 2. Was able to use the OOTB KStreamWindowAggregate processor directly. However, KStreamFilter is only package public, so I had to copy the code. Is there some reason all these processors can't be public to allow reuse in the Processor API? – Tim Stearn Jun 24 '17 at 21:16
  • About process() vs transform() -- the plain processor API only support process() but you can mix-in processor API into the DSL and within the DSL you have process(), transform(), and transformValues() (cf http://docs.confluent.io/current/streams/developer-guide.html#applying-processors-and-transformers-processor-api-integration) -- transform() allows to produce downstream ;). The store supplier setting are ok. – Matthias J. Sax Jun 25 '17 at 19:25
  • `numSegments` must be at least 2 -- this is used to delete stuff based on retention time. Each segment is it's own RocksDB under the hood. So we can drop whole segments if all data in a segment did expire. Your "retention time" span is divided by the number of segments -- so less segments mean more coarse grained expiry, while more segments means more fined grained expiry. But of course, you also get more or less RocksDB instances. It's a pure tuning parameter, by default, we set it to 3 in DSL. --- About duplicates -- this would allow you to store a key multiple times, thus, false is correct. – Matthias J. Sax Jun 25 '17 at 19:29
  • About public/private: IMHO it's ok that it's private and I would recommend to c&p all code that you use from internal packages. Otherwise, on upgrading you might break your application... All classes of package `internal` are not part of public API can might change in a non-backwards compatible way anytime. – Matthias J. Sax Jun 25 '17 at 19:31
  • Understood - what I'm saying is that clearly these have utility for use in the Processor API. – Tim Stearn Jun 25 '17 at 22:32
  • 1
    OK - done;). I was using 3.2.1 before, so didn't know about the transformValues() method. I was able to roll back to using the DSL and call a ValueTransformer. Get exactly the output I need! I retract my comment about making some of those internal classes public - since I can do this through DSL, they're implementation details I don't need to know. THANKS FOR ALL YOUR HELP! – Tim Stearn Jun 26 '17 at 04:58