
I am facing an issue where, when I collect objects from the Flink flatMap Collector, the values are not collected correctly: downstream I get object references instead of the actual values.

dataStream.filter(new FilterFunction<GenericRecord>() {
      @Override
      public boolean filter(GenericRecord record) throws Exception {
        return record.get("user_id") != null;
      }
    }).flatMap(new ProfileEventAggregateFlatMapFunction(aggConfig))
        .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>() {
          @Override
          public String map(
              ProfileEventAggregateEmittedTuple profileEventAggregateEmittedTupleNew)
              throws Exception {
            String res = null;
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
              res = mapper.writeValueAsString(profileEventAggregateEmittedTupleNew);
            } catch (Exception e) {
              e.printStackTrace();
            }
            return res;
          }
        }).print();




public class ProfileEventAggregateFlatMapFunction extends
    RichFlatMapFunction<GenericRecord, ProfileEventAggregateEmittedTuple> {

  private final ProfileEventAggregateTupleEmitter aggregator;
  ObjectMapper mapper = ObjectMapperPool.getInstance().get();

  public ProfileEventAggregateFlatMapFunction(String config) throws IOException {
    this.aggregator = new ProfileEventAggregateTupleEmitter(config);
  }

  @Override
  public void flatMap(GenericRecord event,
      Collector<ProfileEventAggregateEmittedTuple> collector) throws Exception {
    List<ProfileEventAggregateEmittedTuple> aggregateTuples = aggregator.runAggregates(event);
    for (ProfileEventAggregateEmittedTuple tuple : aggregateTuples) {
      collector.collect(tuple);
    }
  }
}

Debug results: this is the tuple I collect into the collector:

tuple = {ProfileEventAggregateEmittedTuple@7880} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {ArrayList@7886}  size = 1
  0 = {ProfileEventAggregate@7888} "geo_id {geo_id=1} {keyless_select_destination_cnt=1, total_estimated_distance=12.5}"
   entityType = "geo_id"
   dimension = {LinkedHashMap@7891}  size = 1
    "geo_id" -> {Integer@7897} 1
     key = "geo_id"
     value = {Integer@7897} 1
   metrics = {LinkedHashMap@7892}  size = 2
    "keyless_select_destination_cnt" -> {Long@7773} 1
     key = "keyless_select_destination_cnt"
     value = {Long@7773} 1
    "total_estimated_distance" -> {Double@7904} 12.5
     key = "total_estimated_distance"
     value = {Double@7904} 12.5

This is what I get in my map function .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>()):

 profileEventAggregateEmittedTuple = {ProfileEventAggregateEmittedTuple@7935} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {GenericData$Array@7948}  size = 1
  0 = {ProfileEventAggregate@7950} "geo_id {geo_id=java.lang.Object@863dce2} {keyless_select_destination_cnt=java.lang.Object@7cdb4bfc, total_estimated_distance=java.lang.Object@52e81f57}"
   entityType = "geo_id"
   dimension = {HashMap@7952}  size = 1
    "geo_id" -> {Object@7957} 
     key = "geo_id"
     value = {Object@7957} 
      Class has no fields
   metrics = {HashMap@7953}  size = 2
    "keyless_select_destination_cnt" -> {Object@7962} 
     key = "keyless_select_destination_cnt"
     value = {Object@7962} 
      Class has no fields
    "total_estimated_distance" -> {Object@7963} 

Please help me understand what is happening and why I am not getting the correct data.

public class ProfileEventAggregateEmittedTuple implements Cloneable, Serializable {
  private String profileType;
  private String key;
  private String businessType;
  private String name;
  private List<ProfileEventAggregate> aggregates = new ArrayList<ProfileEventAggregate>();
  private long startTime;
  private long endTime;

  public String getProfileType() {
    return profileType;
  }

  public void setProfileType(String profileType) {
    this.profileType = profileType;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getBusinessType() {
    return businessType;
  }

  public void setBusinessType(String businessType) {
    this.businessType = businessType;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public List<ProfileEventAggregate> getAggregates() {
    return aggregates;
  }

  public void addAggregate(ProfileEventAggregate aggregate) {
    this.aggregates.add(aggregate);
  }

  public void setAggregates(List<ProfileEventAggregate> aggregates) {
    this.aggregates = aggregates;
  }

  public long getStartTime() {
    return startTime;
  }

  public void setStartTime(long startTime) {
    this.startTime = startTime;
  }

  public long getEndTime() {
    return endTime;
  }

  public void setEndTime(long endTime) {
    this.endTime = endTime;
  }

 @Override
  public ProfileEventAggregateEmittedTuple clone() {
    ProfileEventAggregateEmittedTuple clone = new ProfileEventAggregateEmittedTuple();

    clone.setProfileType(this.profileType);
    clone.setKey(this.key);
    clone.setBusinessType(this.businessType);
    clone.setName(this.name);

    for (ProfileEventAggregate aggregate : this.aggregates) {
      clone.addAggregate(aggregate.clone());
    }
    return clone;
  }
}

public class ProfileEventAggregate  implements Cloneable, Serializable {

  private String entityType;
  private Map<String, Object> dimension =new LinkedHashMap<String, Object>();
  private Map<String, Object> metrics = new LinkedHashMap<String, Object>();

  public Map<String, Object> getDimension() {
    return dimension;
  }

  public void setDimension(Map<String, Object> dimension) {
    this.dimension.putAll(dimension);
  }

  public void addDimension(String dimensionKey, Object dimensionValue) {
    this.dimension.put(dimensionKey, dimensionValue);
  }

  public Map<String, Object> getMetrics() {
    return metrics;
  }
  public void addMetric(String metricKey, Object metricValue) {
    this.metrics.put(metricKey, metricValue);
  }
  public void setMetrics(Map<String, Object> metrics) {
    this.metrics.putAll(metrics);
  }
  public String getEntityType() {
    return entityType;
  }
  public void setEntityType(String entityType) {
    this.entityType = entityType;
  }

  @Override
  public ProfileEventAggregate clone()  {
    ProfileEventAggregate clone = new ProfileEventAggregate();

    clone.setEntityType(this.entityType);
    clone.getDimension().putAll(this.getDimension());
    clone.getMetrics().putAll(this.metrics);
    return clone;
  }
}
Anuj jain

1 Answer


When you don't enableObjectReuse, objects passed between chained operators are copied with your configured serializer (which seems to be Avro here?).

In your case, you use Map<String, Object>, for which no plausible Avro schema can be inferred.

The easiest fix would be to enableObjectReuse. Otherwise, make sure your serializer matches your data: you could add a unit test that uses AvroSerializer#copy, make sure your POJO is properly annotated if you want to stick with Avro reflect, or, even better, go schema-first and generate your Java POJO from an Avro schema so you can use specific Avro records.
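For reference, a minimal sketch of both suggestions, assuming the POJO classes from the question are on the classpath and flink-avro is a dependency (the test values are copied from your debug output; the class and method names here are made up):

import org.apache.flink.formats.avro.typeutils.AvroSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerCheck {

  // Option 1: enableObjectReuse skips the defensive copies between chained
  // operators, so the exact objects emitted by the flatMap reach the map.
  static StreamExecutionEnvironment envWithObjectReuse() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();
    return env;
  }

  // Option 2: a unit-test-style check that the serializer can copy the POJO
  // without degrading the map values to plain Objects.
  static void assertCopySurvives() {
    ProfileEventAggregate aggregate = new ProfileEventAggregate();
    aggregate.setEntityType("geo_id");
    aggregate.addDimension("geo_id", 1);
    aggregate.addMetric("total_estimated_distance", 12.5);

    ProfileEventAggregateEmittedTuple tuple = new ProfileEventAggregateEmittedTuple();
    tuple.setKey("1152473");
    tuple.addAggregate(aggregate);

    AvroSerializer<ProfileEventAggregateEmittedTuple> serializer =
        new AvroSerializer<>(ProfileEventAggregateEmittedTuple.class);
    ProfileEventAggregateEmittedTuple copy = serializer.copy(tuple);

    // With Map<String, Object>, this is exactly where the values get lost.
    if (!copy.getAggregates().get(0).getMetrics().equals(aggregate.getMetrics())) {
      throw new AssertionError("serializer copy lost the metric values");
    }
  }
}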

Let's discuss some alternatives:

  • Use GenericRecord. Instead of converting it to a Java type, directly access GenericRecord. This is usually the only way when the full record is flexible (e.g. your job takes any input and writes it out to S3).
  • Denormalize schema. Instead of having some class Event { int id; Map<String, Object> data; } you would use class EventInformation { int id; String predicate; Object value; }. You would need to group all information for processing. However, you will run into the same type issues with Avro.
  • Use wide-schema. Looking at the previous approach, if the different predicates are known beforehand, then you can use that to craft a wide-schema class Event { int id; Long predicate1; Integer predicate2; ... String predicateN; } where all of the entries are nullable and most of them are indeed null; encoding null is very cheap (see the sketch after this list).
  • Ditch Avro. Avro is fully typed. You may want to use something more dynamic. Protobuf has Any to support arbitrary submessages.
  • Use Kryo. Kryo can serialize arbitrary object trees at the cost of being slower and having more overhead.
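To make the wide-schema bullet concrete, here is a hypothetical variant of the aggregate from the question, with the dimension and metrics from the debug output promoted to nullable fields (all field names invented for illustration):

import java.io.Serializable;

// Hypothetical wide-schema aggregate: Avro can infer a precise schema because
// every field has a concrete, nullable type; absent entries simply stay null.
public class WideProfileEventAggregate implements Serializable {
  private String entityType;

  // dimensions
  private Integer geoId;

  // metrics
  private Long keylessSelectDestinationCnt;
  private Double totalEstimatedDistance;

  // getters and setters omitted for brevity
}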

If you want to write the data, you also need to think about a solution where the type information is added for proper deserialization. For an example, check out this JSON question. But there are more ways to implement it.
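As one possible sketch (Jackson 2.10+, and not necessarily what the linked question does), default typing embeds the concrete class next to each Object-declared value, so Long/Double/Integer survive a JSON round trip:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;

public class TypedJsonMapper {

  // The returned mapper writes the concrete class next to every Object-declared
  // value, e.g. ["java.lang.Long", 1] instead of a bare 1, so map values can be
  // deserialized back to their original types.
  static ObjectMapper create() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.activateDefaultTyping(
        BasicPolymorphicTypeValidator.builder()
            .allowIfBaseType(Object.class) // permissive for the sketch; restrict in real code
            .build(),
        ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT);
    return mapper;
  }
}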

Arvid Heise
  • Thanks. I have added the POJO class; let me know what is wrong there. – Anuj jain Sep 28 '20 at 16:25
  • It's like I feared: you use `Map<String, Object>` where you cannot infer a plausible schema. For metrics, you probably want to go `Map<String, Long>` or `Map<String, Double>`. If `dimension` captures an arbitrary value, you probably have to resort to `Map<String, String>`. Note that in general these kinds of schemas are completely inefficient in terms of serialization cost and computation. It's usually much, much better to use a wide-schema with nullable types. – Arvid Heise Sep 28 '20 at 19:02
  • I want to keep it generic. I am not using a specific class for each event because there are lots of events, and every time a new event arrives I would have to make code changes. I kept metrics as a Map because the values can be Long, Int, or Double; likewise, dimension values can be String, int, long, etc. What is a better design to keep it generic? Please suggest. Can you elaborate on wide-schema with nullable types? – Anuj jain Sep 29 '20 at 09:41
  • Thanks, Arvid, for the details about the approaches. I am not great at serialization; can you also suggest some good material to read to get deeper knowledge of this? Also, I am still not clear on how enableObjectReuse will make this work. – Anuj jain Oct 05 '20 at 18:09
  • Here you can read more about [enableObjectReuse](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/#operating-on-data-objects-in-functions) and [serialization in Flink](https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html). – Arvid Heise Oct 06 '20 at 12:21