
Basically, I am trying to get all columns when using a GROUP BY expression in my query.

Creating a stream from a topic:

CREATE STREAM events_stream \
      ( \
     account VARCHAR, \
     event_id VARCHAR, \
     user_name VARCHAR, \
     event_name VARCHAR, \
     source VARCHAR, \
     message VARCHAR, \
     timestamp STRUCT<iMillis INTEGER>) \
    WITH (KAFKA_TOPIC='console_failure', VALUE_FORMAT='JSON');

Creating a table from the above stream:

ksql> CREATE TABLE events_table AS \
      SELECT source, count(*) \
      FROM events_stream \
      WINDOW TUMBLING (SIZE 60 SECONDS) \
      WHERE account = '1111111111' \
                  GROUP BY source \
                  HAVING count(*) > 3;

Producing this message 4 times:

ip="10.10.10.10"

data = {
        "account": "1111111111",
        "event_id": "4cdabe46-690d-494a-a37e-6e455781d8b4",
        "user_name": "shakeel",
        "event_name": "some_event",
        "source": "127.0.0.1",
        "message": "message related to event",
        "timestamp": {
            "iMillis": 1547543309000
             }
        }

producer.send('console_failure', key='event_json', value=dict(data))

This works as expected! But how do I get the other fields (e.g. user_name, message, etc.) for the matched result?

ksql> select * from events_table;
1550495772262 | 10.10.10.10 : Window{start=1550495760000 end=-} | 10.10.10.10 | 4
ksql> 

From what I have read, my understanding is that we may not be able to get other columns when using a GROUP BY statement.

ksql> CREATE TABLE events_table1 AS \
>      SELECT source, event_id, \
>               count(*) \
>     FROM events_stream \
>     WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source \
>                  HAVING count(*) > 3;
Group by elements should match the SELECT expressions.
ksql>

Can we achieve this by rekeying the stream?

After reading this, I tried to rekey my stream with the event_id field, but I am not sure how to use the partition key in my GROUP BY statement.

Below is the error I get when trying with the rekeyed stream.

ksql> CREATE STREAM events_stream_rekey AS SELECT * FROM events_stream PARTITION BY event_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
ksql> SELECT ROWKEY, EVENT_ID FROM events_stream_rekey;
4cdabe46-690d-494a-a37e-6e455781d8b4 | 4cdabe46-690d-494a-a37e-6e455781d8b4
ksql>

ksql> CREATE TABLE  events_table2 AS \
>      SELECT source, \
>               count(*), \
>     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
>     WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source \
>                  HAVING count(*) > 3;
line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}


KSQL version details: CLI v5.1.0, Server v5.1.0

-------------------------- STEPS TO REPRODUCE --------------------------

Producer script: this script generates 4 messages within a 30-second window.

import time
import uuid
from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

for i in range(1, 5):
    time.sleep(1)
    data = {
        "account": "1111111111",
        "event_id": str(uuid.uuid4()),
        "user_name": "user_{0}".format(i),
        "event_name": "event_{0}".format(i),
        "source": "10.0.9.1",
        "message": "message related to event {0}".format(i),
        "timestamp": {
            "iMillis": int(round(time.time() * 1000))
        }
    }
    time.sleep(2)
    producer.send('testing_topic', value=data)

On consuming messages from testing_topic (using a normal consumer script):

{'account': '1111111111', 'event_id': 'c186ba8a-2402-428a-a5d8-c5b8279e14af', 'user_name': 'user_1', 'event_name': 'event_1', 'source': '10.0.9.1', 'message': 'message related to event 1', 'timestamp': {'iMillis': 1551296878444}}
{'account': '1111111111', 'event_id': '4c45bff7-eb40-48a8-9972-301ad24af9ca', 'user_name': 'user_2', 'event_name': 'event_2', 'source': '10.0.9.1', 'message': 'message related to event 2', 'timestamp': {'iMillis': 1551296881465}}
{'account': '1111111111', 'event_id': '4ee14303-e6d1-4847-ae3d-22b49b3ce6eb', 'user_name': 'user_3', 'event_name': 'event_3', 'source': '10.0.9.1', 'message': 'message related to event 3', 'timestamp': {'iMillis': 1551296884469}}
{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}
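For reference, the "normal consumer script" is not shown above; a minimal sketch with kafka-python might look like the following (the broker address and topic name mirror the producer script, and `deserialize` simply inverts the producer's `dumps(x).encode('utf-8')` serializer):

```python
import json


def deserialize(raw: bytes) -> dict:
    """Invert the producer's dumps(x).encode('utf-8') serialization."""
    return json.loads(raw.decode("utf-8"))


def consume(topic: str = "testing_topic"):
    """Print every message on the topic (assumes kafka-python is installed
    and a broker is running on localhost:9092)."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset="earliest",
        value_deserializer=deserialize,
    )
    for message in consumer:
        print(message.value)
```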

Expected result: if the messages contain the same source address within a 30-second window for the same account, then I want to get the next immediate complete message (the 4th message in my case, as shown below). Can this be achieved using KSQL?

{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}
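KSQL aside, the expected behavior can be sketched in plain Python to make the requirement concrete: count events per (tumbling window, source) for the account, and emit the complete event that brings the count up to the threshold. This is an illustrative helper, not KSQL; whether the four sample messages trigger it depends on where the window boundaries fall relative to their timestamps.

```python
from collections import defaultdict

WINDOW_MS = 30_000   # tumbling window size
THRESHOLD = 4        # corresponds to HAVING count(*) > 3


def alert_events(events, account="1111111111"):
    """Yield the complete event whose arrival brings a (window, source)
    pair up to THRESHOLD occurrences for the given account."""
    counts = defaultdict(int)
    for event in events:
        if event["account"] != account:
            continue
        window_start = event["timestamp"]["iMillis"] // WINDOW_MS * WINDOW_MS
        key = (window_start, event["source"])
        counts[key] += 1
        if counts[key] == THRESHOLD:
            yield event
```

If all four events land in the same 30-second window, only the fourth one is yielded, with all of its fields intact.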
Matthias J. Sax
Shakeel

2 Answers


The error message itself actually tells you the problem :)

Group by elements should match the SELECT expressions.

So here, you've got source in both SELECT and GROUP BY:

ksql> SELECT source, count(*) \
>      FROM events_stream \
>      WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source \
>                  HAVING count(*) > 3;
127.0.0.1 | 4
^CQuery terminated

To add other columns, make sure you add them to the SELECT also:

ksql> SELECT source, event_id, count(*) \
>      FROM events_stream \
>      WINDOW TUMBLING (SIZE 60 SECONDS) \
>      WHERE account = '1111111111' \
>                  GROUP BY source, event_id \
>                  HAVING count(*) > 3;
127.0.0.1 | 4cdabe46-690d-494a-a37e-6e455781d8b4 | 4

Edit to answer your updated question:

I don't think that can [easily] be done in SQL (or KSQL). You might be able to achieve something similar by including the timestamp in the aggregate operation, something like:

CREATE TABLE source_alert AS \
SELECT source, COUNT(*), MAX(timestamp) \
FROM events_stream WINDOW TUMBLING (SIZE 60 SECONDS) \
GROUP BY source \
HAVING COUNT(*) > 1;

and then take the resulting table and join to the event stream:

SELECT * \
 FROM events_stream e \
      INNER JOIN \
      source_alert a ON e.source = a.source \
WHERE e.timestamp = a.timestamp;

I've not tried this, but in principle it might get you where you want to be.
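The two-step idea above, aggregate with COUNT(*) and MAX(timestamp), then join the table back to the stream on source and timestamp, can be illustrated with a small Python sketch (helper names and thresholds are illustrative, not KSQL):

```python
def build_alert_table(events, window_ms=60_000, min_count=2):
    """Step 1: per (window, source), keep a count and the max timestamp,
    mimicking COUNT(*) and MAX(timestamp) in the KSQL aggregate."""
    table = {}
    for e in events:
        ts = e["timestamp"]["iMillis"]
        key = (ts // window_ms, e["source"])
        count, max_ts = table.get(key, (0, 0))
        table[key] = (count + 1, max(max_ts, ts))
    # HAVING-style filter: keep only windows that meet the threshold
    return {k: v for k, v in table.items() if v[0] >= min_count}


def join_back(events, alerts, window_ms=60_000):
    """Step 2: join the stream to the alert table on source and timestamp,
    recovering the full row for the last event in each alerting window."""
    for e in events:
        ts = e["timestamp"]["iMillis"]
        key = (ts // window_ms, e["source"])
        if key in alerts and alerts[key][1] == ts:
            yield e
```

The join on MAX(timestamp) is what recovers the non-grouped columns: the aggregate alone only knows (source, count, max timestamp), but matching that timestamp back against the stream surfaces the complete original event.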

Robin Moffatt
  • You are right, Robin, but in my case `event_id` will be distinct for every message, so I would not group by event_id. I need `event_id` as the key so that I can use it to join with another table. Can you share any better way to get all the columns of the 4th message, which will be the result of the above `group by` query? – Shakeel Feb 20 '19 at 20:00
  • If you're not including `event_id` in your `GROUP BY` then it makes no logical sense to have it as the message key. You can't aggregate but still include a lower grain of data. Perhaps you need to update your question to more clearly illustrate what you're trying to do. – Robin Moffatt Feb 20 '19 at 21:38
  • Thanks Robin for replying. I am just looking for this kind of query: "select * from table window tumbling (size 30 seconds) where account = '1111111111' group by source having count(*) > 3;". Let me know if it is still unclear; I'm happy to provide more details. – Shakeel Feb 21 '19 at 08:38
  • Yes, please update your question with an example of your input data and the expected output that you want from your query. – Robin Moffatt Feb 25 '19 at 10:04
  • Appreciate your prompt reply, I have updated my question with some steps to reproduce and expected result. Please assist me if that can be achieved using KSQL ? – Shakeel Feb 27 '19 at 20:08

In addition to Robin's answer, this error:

line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}

is referring to the fact that your WITH clause is in the wrong place. The correct pattern is:

CREATE TABLE <table name> WITH(...) AS SELECT ...

Which would make your statement:

ksql> CREATE TABLE events_table2
>     WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id') AS
>     SELECT source, count(*)
>     FROM events_stream_rekey
>     WINDOW TUMBLING (SIZE 60 SECONDS)
>     WHERE account = '1111111111'
>     GROUP BY source
>     HAVING count(*) > 3;
Andrew Coates
  • Yes, I had tried this but I get the error shown below. Is the above query working for you? Please let me know if any syntactical change is needed to execute this query. ` ksql> CREATE TABLE events_table3 > WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'), > AS > SELECT source, count(*), > WINDOW TUMBLING (SIZE 60 SECONDS) > WHERE account = '1111111111' > GROUP BY source > HAVING count(*) > 3; line 2:86: mismatched input ',' expecting ';' Caused by: org.antlr.v4.runtime.InputMismatchException ksql> ` – Shakeel Feb 20 '19 at 19:51