
I have a Kafka topic where Protobuf messages arrive. I want to store them in blob storage and then be able to query them. Due to the volume of data, I want to do this with Spark in order to have a scalable solution. The problem is that Spark does not have native support for Protobuf. I can think of two possible solutions:

  1. Store the raw data as a binary column in some format such as Parquet. When querying, use a UDF to parse the raw data (sketched below).
  2. Convert the Protobuf to Parquet (this should map 1:1) on write. Then reading/querying with Spark becomes trivial.

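For option 1, the query side could look roughly like the following sketch, assuming the raw bytes are stored in a binary column named payload and that MyProtobufClass is the generated Java class with a string field someField (the column name, field name, and path are just placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    SparkSession spark = SparkSession.builder().getOrCreate();

    // UDF that deserializes the raw bytes with the generated Protobuf class
    // and extracts a single field to query on
    spark.udf().register(
            "extractSomeField",
            (UDF1<byte[], String>) bytes -> MyProtobufClass.parseFrom(bytes).getSomeField(),
            DataTypes.StringType);

    // read the Parquet files containing the raw binary column and query through the UDF
    Dataset<Row> raw = spark.read().parquet("path/to/raw-parquet");
    raw.selectExpr("extractSomeField(payload) AS someField")
       .where("someField = 'some-value'")
       .show();

The trade-off is that every query pays the cost of deserializing each message inside the UDF, whereas option 2 stores the fields as real Parquet columns.
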
I have managed to implement (2) in plain Java using org.apache.parquet.proto.ProtoParquetWriter, i.e.

        // file is an org.apache.hadoop.fs.Path pointing at the target Parquet file
        ProtoParquetWriter<MyProtobufClass> w =
                new ProtoParquetWriter<>(file, MyProtobufClass.class);

        for (MyProtobufClass record : messages) {
            w.write(record);
        }

        w.close();

The problem is: how do I implement this using Spark?

There is the sparksql-protobuf project, but it hasn't been updated in years.

I have found this related question, but it is 3.5 years old and has no answer.

1 Answer


AFAIK there is no easy way to do it using the Dataset APIs. We (in our team/company) use the Spark Hadoop MapReduce APIs, which rely on parquet-protobuf, and use Spark RDDs to encode/decode Protobuf in Parquet format. I can show some examples if you have trouble following the parquet-protobuf docs.

Working sample

import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.proto.ProtoParquetOutputFormat;
import org.apache.parquet.proto.ProtoReadSupport;
import org.apache.parquet.proto.ProtoWriteSupport;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

... 

Job job = Job.getInstance();
ParquetInputFormat.setReadSupportClass(job, ProtoReadSupport.class);
ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class);
ProtoParquetOutputFormat.setProtobufClass(job, YourProtoMessage.class); // your proto class

// read
JavaSparkContext context = new JavaSparkContext(sparkContext);
JavaPairRDD<Void, YourProtoMessage.Builder> input =
                context.newAPIHadoopFile(
                        inputDir,
                        ParquetInputFormat.class,
                        Void.class,
                        YourProtoMessage.Builder.class,
                        job.getConfiguration());


// write

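// one way (just a sketch) to obtain the (Void, YourProtoMessage) pair RDD that the
// write step below expects, e.g. by building the messages from the Builder RDD read
// above; the Void key is ignored by the Parquet output format
JavaPairRDD<Void, YourProtoMessage> rdd =
        input.mapToPair(tuple -> new scala.Tuple2<>(null, tuple._2().build()));
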
rdd.saveAsNewAPIHadoopFile(outputDir,
                        Void.class,
                        YourProtoMessage.class,
                        ParquetOutputFormat.class,
                        job.getConfiguration());

  • If I understand correctly, you perform two steps: 1) convert the proto file to Parquet (this happens outside Spark; kindly correct me if this assumption is wrong), and 2) read the Parquet file in Spark. Did you face issues with self-referencing complex objects? This is failing for me with protobuf-parquet while generating the schema. – ReeniMathew Jun 03 '22 at 02:30
  • Re 1) The ProtoParquetWriter implementation in the parquet-mr library takes care of it; it happens inside Spark. Re 2) I am not sure which complex objects you are referring to, but in our use case we have a pretty big proto schema with lots of different types, and with parquet-mr it just works out of the box. Keep in mind you need to use the Spark RDD interface for this. – ashwin konale Jun 07 '22 at 10:01
  • Updated the answer with a working prototype. – ashwin konale Jun 07 '22 at 10:08
  • Concerning this code example for reading/writing Protobuf messages in Parquet files, is there any example of implementing a filter for accessing them (lookup of min/max values of row groups)? – Nicholas Kou Dec 13 '22 at 09:36