I am using spark-sql 2.3.1 and Kafka with Java 8 in my project. I am trying to convert the byte[] received from a topic into a Dataset on the Kafka consumer side.

Here are the details

I have

class Company{
    String companyName;
    Integer companyId;
}

whose schema I defined as

public static final StructType companySchema = new StructType()
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

But the message is defined as

class Message{
    private List<Company> companyList;
    private String messageId;
}

I tried to define its schema as

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

I send the Message to a Kafka topic as byte[] using serialization.

I successfully receive the message byte[] at the consumer, and I am trying to convert it to a Dataset. How can I do that?

   Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

  messagesDs.printSchema();

  root
         |-- companyList: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- companyName: string (nullable = true)
         |    |    |-- companyId: integer (nullable = true)
         |-- messageId: string (nullable = true)    

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

This fails with the error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];

How can I get the exploded records as a Dataset<Company>?

OneCricketeer
BdEngineer

1 Answer

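When you explode the array, the resulting struct column is named "col".

Your bean class has no "col" property, and the companyName and companyId fields are nested inside that struct rather than being top-level columns, so the conversion fails with the error you mention: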

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];

You can add a select that pulls the struct's fields out as plain top-level columns, something like this:

    Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")))
        .select(col("col.companyName").as("companyName"), col("col.companyId").as("companyId"));

I haven't tested the syntax, but your next step (the conversion with Encoders.bean) should work once each row exposes the struct's fields as plain columns.
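The name mismatch behind this error can be illustrated without Spark. The sketch below is a plain-Java model, not Spark's actual resolution logic: it lists the JavaBean properties of a bean-style Company and reports which of them have no top-level column of the same name. BeanColumnCheck, beanProperties and unresolved are hypothetical names introduced for illustration. It also assumes Company is written as a public bean with a getter and setter per field, which, as far as I know, Encoders.bean relies on; the class in the question shows only the fields.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BeanColumnCheck {

    // Assumed bean-style version of the question's Company class:
    // public, implicit no-arg constructor, getter/setter pair per field.
    public static class Company {
        private String companyName;
        private Integer companyId;

        public String getCompanyName() { return companyName; }
        public void setCompanyName(String companyName) { this.companyName = companyName; }
        public Integer getCompanyId() { return companyId; }
        public void setCompanyId(Integer companyId) { this.companyId = companyId; }
    }

    // Names of the properties that have both a getter and a setter.
    public static List<String> beanProperties(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        try {
            // Stopping at Object.class leaves out the inherited "class" property.
            for (PropertyDescriptor pd :
                    Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
        return names;
    }

    // Bean properties that have no top-level column with the same name.
    public static List<String> unresolved(Class<?> beanClass, List<String> columns) {
        List<String> missing = beanProperties(beanClass);
        missing.removeAll(columns);
        return missing;
    }

    public static void main(String[] args) {
        // Columns right after explode_outer: only the struct column "col".
        System.out.println(unresolved(Company.class, Arrays.asList("col")));
        // Columns after flattening the struct: nothing is left unresolved.
        System.out.println(unresolved(Company.class, Arrays.asList("companyName", "companyId")));
    }
}
```

With only "col" available, both bean properties are reported as unresolved, which mirrors the AnalysisException. After flattening, the list is empty.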

Ramdev Sharma
  • @BdEngineer, 'col' is the default name given to the output column when explode_outer is applied to an array. If it is a map, the default names are 'key' and 'value'. You can specify a different name using the as() function. – Ramdev Sharma Feb 10 '20 at 08:21
  • Sir, can you help and suggest how to handle this: https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename – BdEngineer May 27 '20 at 06:50
  • Sir, could you please advise what is wrong here in the reduce function: https://stackoverflow.com/questions/63843599/reduce-result-datasets-into-single-dataset – BdEngineer Sep 11 '20 at 08:50