
I want to reduce a DataFrame by key. The reduce logic is pretty complex and requires updating about 10-15 fields. That's why I want to convert the DataFrame to a Dataset and reduce Java POJOs.

Problem

The problem is that after groupByKey/reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the data correctly. See the Code Example section.

Workarounds

Replacing Encoders.bean with Encoders.kryo does not work; it fails with the following exception:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
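
For reference, a minimal sketch of what that replacement looked like (assuming the same createDataFrame helper used in the Code Example below):

    // Attempted workaround: swap the bean encoder for a Kryo encoder.
    // This is what throws the exception above, since Encoders.kryo maps the
    // whole object to a single binary column instead of the existing fields.
    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.kryo(Entity.class));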

Also, I saw this workaround, but Encoders.product requires a TypeTag. I have no idea how to create a TypeTag in Java code.

Code Example

    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.bean(Entity.class));

    // shows correct numbers
    ds.show(10, false);

    // correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A1         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A1         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+


    // after the reduce, shows wrong numbers
    ds
        .groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
        .reduceGroups((e1, e2) -> e1)
        .map(tuple -> tuple._2, Encoders.bean(Entity.class))
        .show(10, false);

    // wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+

Entity.java

import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}

Storage.java

import java.io.Serializable;
import java.sql.Timestamp;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private Timestamp timestamp;
    private Double storage;
}

testData.json:

[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "storage": 13.2
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 2,
    "storages": [
      {
        "timestamp": "2019-11-02 10:00:03.000",
        "storage": 1001.0
      }
    ]
  }
]

testSchema.json:

{
  "type": "struct",
  "fields": [
    {
      "name": "broker_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "server_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "order",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "storages",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "storage",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}

1 Answer

That's because deserialization uses structural matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the inferred schema are ordered by name.

So for a bean class like your Storage, the schema inferred from the bean Encoder will be

Encoders.bean(Storage.class).schema().printTreeString();
root
 |-- storage: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

not

root
 |-- timestamp: timestamp (nullable = true)
 |-- storage: double (nullable = true)
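
The same alphabetical ordering applies to the whole Entity bean, including the nested storages element; printing its schema should give something like:

Encoders.bean(Entity.class).schema().printTreeString();
root
 |-- broker_name: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- server_name: string (nullable = true)
 |-- storages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- storage: double (nullable = true)
 |    |    |-- timestamp: timestamp (nullable = true)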

and that alphabetically ordered schema is the one the Dataset should use. In other words, a schema defined as:

StructType schema = Encoders.bean(Entity.class).schema();

or

StructType schema = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<storage: double, timestamp: timestamp>>" 
);

would be valid, and could be used to load testData directly:

Dataset<Entity> ds = spark.read()
  .option("multiline", "true")
  .schema(schema)
  .json("testData.json")
  .as(Encoders.bean(Entity.class));

while your current schema, which is equivalent to:

StructType current = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<timestamp: timestamp, storage: double>>" 
);

is not, despite the fact that it works with the JSON reader, which (in contrast to Encoders) matches data by name.
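
With a schema derived from the bean encoder, the groupByKey/reduceGroups pipeline from the question should round-trip the values correctly. A sketch, reusing the question's RowKey and Entity classes and the Dataset<Entity> ds loaded above:

// Same typed pipeline as in the question; only the underlying schema changed.
ds
  .groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
  .reduceGroups((e1, e2) -> e1)
  .map(tuple -> tuple._2, Encoders.bean(Entity.class))
  .show(10, false);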

Arguably this behavior should be reported as a bug: intuitively, there should be no case where an Encoder dumps data that is incompatible with its own loading logic.

Related JIRA ticket - SPARK-27050
