
I'm using Flink to process data coming from a data source (such as Kafka, Pravega, etc.).

In my case, the data source is Pravega, which provides a Flink connector.

My data source sends me JSON data like this:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

Here is my piece of code:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

As you can see, I use FlinkPravegaReader and a proper deserializer to get the JSON stream coming from Pravega.

Then I try to transform each JSON record into a String, key the stream with keyBy, and count the records.

However, I get an error:

 The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

It seems that keyBy threw this exception.

Well, I'm not a Flink expert, so I don't know why. I've read the source code of the official WordCount example. In that example, there is a custom splitter, which is used to split the String data into words.

So do I need some kind of splitter in this case too? If so, what kind of splitter should I use, and could you show me an example? If not, why do I get this error, and how can I fix it?

Yves

1 Answer


I guess you have read the documentation about how to specify keys:

Specify keys

The example code uses keyBy("word") because word is a field of the POJO type WC.

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

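In your case, you put a map operator before keyBy, and the output of this map operator is a String. A String is not a composite type, so it has no word field, which is what "Field expression must be equal to '*' or '_' for non-composite types" is telling you. (sum("count") has the same problem, since a String has no count field either.) If you actually want to group this String stream, pass a key selector instead of a field name, like this: .keyBy(String::toString)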
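A minimal sketch of counting identical JSON strings, assuming the Flink 1.10/1.11 DataStream API. `env.fromElements(...)` stands in for the Pravega source so the job runs locally, and the names `CountJsonStrings` and `ToCount` are hypothetical. Each string is paired with a count of 1 (the WordCount idiom) so that `sum` has a numeric field to add, and the key is taken with a KeySelector lambda rather than a field expression:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountJsonStrings {

    // Hypothetical helper: pairs every incoming string with a count of 1
    // so that sum(1) can add the counts up per key.
    public static class ToCount implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) {
            return Tuple2.of(value, 1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Pravega source: JSON records already turned into strings.
        DataStream<String> strings = env.fromElements(
                "{\"key\": \"value\"}",
                "{\"key\": \"value2\"}",
                "{\"key\": \"value\"}");

        strings
                .map(new ToCount())
                // KeySelector instead of a field expression: the whole string is the key.
                .keyBy(pair -> pair.f0)
                // Rolling count per distinct string. Insert a window here
                // (e.g. 10-second tumbling windows) for per-window counts.
                .sum(1)
                .print();

        env.execute("Count JSON strings");
    }
}
```

Keying by `pair.f0` plays the same role as `.keyBy(String::toString)`, just on the `Tuple2` stream that carries the count.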

Alternatively, you can implement a custom KeySelector to generate your own key.
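A sketch of such a KeySelector, assuming the stream carries the raw JSON text as a String (as produced by the question's `node.toString()` map) and that Jackson (`com.fasterxml.jackson.databind`) is on the classpath. `JsonFieldKeySelector` is a hypothetical name, and the field name `"key"` comes from the sample data in the question:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical KeySelector: parses each JSON string and uses one field's value as the key.
public class JsonFieldKeySelector implements KeySelector<String, String> {

    // Created lazily on the task and then reused for every record.
    private transient ObjectMapper mapper;
    private final String field;

    public JsonFieldKeySelector(String field) {
        this.field = field;
    }

    @Override
    public String getKey(String json) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            JsonNode value = mapper.readTree(json).get(field);
            // Records without the field all share the empty-string key.
            return value == null ? "" : value.asText();
        } catch (IOException e) {
            throw new IllegalArgumentException("Not valid JSON: " + json, e);
        }
    }
}
```

In the question's job it would replace the field expression, e.g. `.keyBy(new JsonFieldKeySelector("key"))` right after the `map` that produces strings. Note that the JSON is parsed again every time the key is needed.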

Customized Key Selector

iluvex
  • As your input is JSON, you can parse the JSON and extract the field you want in a KeySelector. – iluvex Jun 03 '20 at 12:45
  • You may also define a POJO type and parse the JSON into it. Then you can keyBy based on the field names. – iluvex Jun 03 '20 at 12:48
  • Well, my goal is to process the JSON data one by one. So if I understand your answer correctly, I should use a customized keySelector. – Yves Jun 03 '20 at 13:21
  • If you just want to group the stream by word (extracted from the original JSON string) and count the occurrences of each word, a custom KeySelector is fine. But in many real-world scenarios you will also want other fields from the original JSON. A custom KeySelector parses the JSON string and extracts only the word field, so if later operators need other information from the same JSON, you have to parse it again and again. It is better to extract all the necessary information at the beginning and define a POJO type for it. – iluvex Jun 04 '20 at 03:21
  • Could you please show me an example of POJO type? It would be appreciated. If possible, please use this example: https://stackoverflow.com/questions/62175431/apache-flink-could-not-extract-key-from-objectnodeget – Yves Jun 04 '20 at 03:42
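A minimal sketch of the POJO approach from the comments, assuming the Flink 1.10/1.11 DataStream API and Jackson on the classpath. `PojoKeyByExample`, `KeyCount`, and `ParseJson` are hypothetical names, and `env.fromElements` again stands in for the Pravega source. Each JSON record is parsed once into a POJO, so `keyBy("key")` and `sum("count")` can refer to its fields by name:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PojoKeyByExample {

    // Flink POJO rules: public class, public no-arg constructor, public fields.
    public static class KeyCount {
        public String key;  // value of the JSON "key" field
        public int count;   // 1 per record, summed per key

        public KeyCount() {}

        public KeyCount(String key, int count) {
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return key + "=" + count;
        }
    }

    // Parses each JSON record once, up front, into the POJO.
    public static class ParseJson implements MapFunction<String, KeyCount> {
        private transient ObjectMapper mapper;

        @Override
        public KeyCount map(String json) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            try {
                JsonNode key = mapper.readTree(json).get("key");
                // A record without a "key" field is grouped under "".
                return new KeyCount(key == null ? "" : key.asText(), 1);
            } catch (IOException e) {
                throw new IllegalArgumentException("Not valid JSON: " + json, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Pravega source, carrying the raw JSON text.
        DataStream<String> json = env.fromElements(
                "{\"key\": \"value\"}",
                "{\"key\": \"value2\"}",
                "{\"key\": \"value\"}");

        json.map(new ParseJson())
                // Field expressions work because KeyCount is a POJO with these fields.
                .keyBy("key")
                .sum("count")
                .print();

        env.execute("POJO keyBy example");
    }
}
```

Any other JSON fields that later operators need can be added to `KeyCount` and filled in inside `ParseJson`, so the JSON is parsed only once. Newer Flink versions deprecate the string-based `keyBy` in favor of a KeySelector such as `keyBy(kc -> kc.key)`.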