6

I need to write a job that reads a Dataset[Row] and converts it to a Dataset[CustomClass], where CustomClass is a protobuf class.

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
      .build()
}(protoEncoder)

However, it looks like Protobuf classes are not really Java Beans, and I get an NPE on the following:

val x =  Encoders.bean(classOf[CustomClass])

How does one go about ensuring that the job can emit a dataset of type Dataset[CustomClass], where CustomClass is the protobuf class? Any pointers or examples on writing a custom encoder for the class?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

The bean encoder internally uses:

JavaTypeInference.serializerFor(protoClass)

If I try to do the same in my custom encoder, I get a more descriptive error message:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
zero323
Apurva

5 Answers

3

My experience with Encoders has not been very promising, and at this point I would recommend not spending more time on this.

I'd rather think about alternatives: work with Spark its own way, and map the result of the Spark computation to the protobuf-generated class at the very last step.
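
One way to read "map to the protobuf class at the very last step" is sketched below. It is only an illustration under assumptions: `Event` is a hypothetical case class, and `com.google.protobuf.Timestamp` (shipped with protobuf-java) stands in for the question's generated CustomClass so that the snippet does not depend on a private .proto file. The Dataset stays in types Spark can encode natively, and the message is built and serialized to bytes only at the end, since Array[Byte] has a built-in encoder.

```scala
import com.google.protobuf.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type that Spark can encode without any custom encoder.
final case class Event(seconds: Long, nanos: Int)

object LastStepProto {
  // Build the protobuf message from the Spark-friendly record.
  // Timestamp is a stand-in for the question's CustomClass (an assumption).
  def toProto(e: Event): Timestamp =
    Timestamp.newBuilder().setSeconds(e.seconds).setNanos(e.nanos).build()

  // Serialize as the final step; no protobuf encoder is required because
  // the resulting Dataset only holds byte arrays.
  def toBytes(spark: SparkSession, events: Dataset[Event]): Dataset[Array[Byte]] = {
    import spark.implicits._
    events.map(e => toProto(e).toByteArray)
  }
}
```

A consumer can turn each element back into a message with `Timestamp.parseFrom(bytes)`. The trade-off is that the serialized column is opaque to Spark, so any filtering or joining has to happen before this step.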

Jacek Laskowski
  • Thanks @JacekLaskowski, your guide is very helpful in general. I was hoping to be able to write these in a distributed manner to a key-value store using the map operation. My attempts so far look very similar to what you mention, but without luck. Will post an update if I get this to work. – Apurva Jun 27 '17 at 01:42
2

To convert a Row to a Protobuf class, you can use sparksql-protobuf.

This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet file written by SparkSQL back as an RDD of the compatible protobuf object. It can also convert RDD of protobuf objects into DataFrame.

Add a dependency to your build.sbt file:

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)

You can follow some examples from the library to get started:

Example 1

Example 2

I hope this helps!

koiralo
  • Thanks, I took a look at this, "It provides a way to read parquet file written by SparkSQL back as an RDD of compatible protobuf object" - this assumption is not necessarily true in my case - the underlying representation is not in parquet. – Apurva Jun 26 '17 at 22:35
  • I haven't worked with Spark and protobuf, but this should help you – koiralo Jun 26 '17 at 22:36
  • Some more context, I tried writing my own encoder. val serializer = JavaTypeInference.serializerFor(protoClass) This is what fails as suspected: Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430) – Apurva Jun 26 '17 at 23:19
0

While not a strict answer, I did get a workaround. The encoders are not needed if we use RDDs.

// .rdd on the DataFrame yields an RDD[Row], so the Row pattern below matches
val rows = spark.sql("select * from tablename").rdd
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
      .build()
}

This gives me an RDD of the Protobuf Class that I can work with.

Apurva
  • Why don't you directly build the RDD[Proto] you want with sparksql-protobuf (saurfang's github)? – belka Feb 15 '18 at 09:07
0

The way I did it: I used saurfang's sparksql-protobuf library (code available on GitHub). You directly get an RDD[ProtoSchema], but it's difficult to convert to a Dataset[ProtoSchema]. I mainly used it to fetch information to append to another RDD with user-defined functions.

1: Import the library

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.saurfang</groupId>
        <artifactId>sparksql-protobuf_2.10</artifactId>
        <version>0.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>
...

<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>bintray-saurfang-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/saurfang/maven</url>
    </repository>
</repositories>

2: Read data as an RDD[ProtoSchema]

val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])

(Optional) Add a PathFilter (Hadoop API)

If you'd like to add a PathFilter class (like you used to with Hadoop), or activate other options you had with Hadoop, you can do:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])

But do not forget to clear your Hadoop configuration, in case you want to use your SparkSession to read other things:

sess.sparkContext.hadoopConfiguration.clear()
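
For reference, a filter class such as the MyPathFiltering mentioned above could look roughly like the sketch below. The filtering rule (skip names starting with "_" or ".") is only an assumption for illustration; the one fixed part is the Hadoop PathFilter interface with its accept(Path) method.

```scala
import org.apache.hadoop.fs.{Path, PathFilter}

// Hypothetical filter: skips marker and hidden entries such as
// _SUCCESS or .part-00000.crc, and accepts everything else.
// Hadoop looks the class up by name from the configuration, so a
// public no-argument constructor is kept here to be safe.
class MyPathFiltering extends PathFilter {
  override def accept(path: Path): Boolean = {
    val name = path.getName
    !name.startsWith("_") && !name.startsWith(".")
  }
}
```

Because the filter is applied to every path the input format lists, directories must also pass it when recursive listing is enabled, which is why this sketch does not filter on a file extension.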
belka
0

The default serialization doesn't work for my protobuf objects either.

However, it turns out Spark ships a Kryo-based encoder, which stores each object as a single binary column. So if you do

Encoders.kryo(ProtoBuffObject.class)

it works.
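
In Scala, the same idea applied to the question's Row-to-protobuf mapping might look like the sketch below. It is not a definitive implementation: `com.google.protobuf.Timestamp` (from protobuf-java) stands in for the generated class, and the two-column input layout is assumed. Whether Kryo can round-trip a particular message class may depend on the protobuf and Spark versions in use.

```scala
import com.google.protobuf.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders}

object KryoProtoSketch {
  // Kryo-backed encoder: Spark treats each message as one opaque binary value.
  // Timestamp is a stand-in for the generated protobuf class (an assumption).
  val protoEncoder: Encoder[Timestamp] = Encoders.kryo(classOf[Timestamp])

  // Assumes the input has columns (seconds: Long, nanos: Int),
  // mirroring the (f1, f2) pair in the question.
  def toMessages(rows: DataFrame): Dataset[Timestamp] =
    rows.map { row =>
      Timestamp.newBuilder()
        .setSeconds(row.getLong(0))
        .setNanos(row.getInt(1))
        .build()
    }(protoEncoder)
}
```

The resulting Dataset has a single binary column, so Spark cannot filter or join on individual message fields without deserializing each object first.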

Jackie