I want to retrieve rows from Apache Hive via Apache Spark and put each row into an Aerospike cache.

Here is a simple case.

var dataset = session.sql("select * from employee");
final var aerospikeClient = aerospike;  // to remove binding between lambda and the service class itself
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    aerospikeClient.add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

I get an error:

Caused by: java.io.NotSerializableException: com.aerospike.client.reactor.AerospikeReactorClient

Obviously I can't make AerospikeReactorClient serializable. I tried adding dataset.collectAsList(), and that did work. But as far as I understand, this method loads all of the content onto one node (the driver). There might be an enormous amount of data, so it's not an option.

What are the best practices to deal with such problems?

Semyon Kirekov

2 Answers

You can write directly from a data frame. No need to loop through the dataset.

Launch the spark shell and import the com.aerospike.spark.sql._ package:

$ spark-shell
scala> import com.aerospike.spark.sql._
import com.aerospike.spark.sql._

Example of writing data into Aerospike

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val TEST_COUNT = 100  // number of sample rows to generate
val simpleSchema: StructType = new StructType(
  Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

// Build a small DataFrame of sample rows matching simpleSchema
val simpleDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to TEST_COUNT) {
    val one = i
    val two = "two:" + i
    val three = i.toDouble
    val r = Row(one, two, three)
    inputBuf.append(r)
  }
  val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
  spark.createDataFrame(inputRDD, simpleSchema)
}

//Write the Sample Data to Aerospike
simpleDF.write
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "spark-test") //write to this set
.option("aerospike.updateByKey", "one")//indicates which columns should be used for construction of primary key
.option("aerospike.write.mode","update")
.save()
Matt Andruff
  • If you want a different way that lets you manage the process with a more fine-grained approach, read this post on writing data by partition: https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions – Matt Andruff Feb 01 '22 at 15:27
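
To illustrate the per-partition idea from the comment above, here is a minimal sketch in plain Java. It does not use Spark or Aerospike: FakeClient and writePartition are hypothetical stand-ins. The point is only the shape of the code: open one client per partition, push every row of that partition through it, then close it. In Spark, a body like writePartition would typically be what you pass to Dataset.foreachPartition, which hands you an Iterator over the partition's rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration only: no Spark or Aerospike types are used here.
class PerPartitionWriteSketch {

    /** Hypothetical stand-in for a cache client; counts how many were opened. */
    static class FakeClient implements AutoCloseable {
        static int opened = 0;
        private final List<String> sink;

        FakeClient(List<String> sink) {
            this.sink = sink;
            opened++;
        }

        void put(String value) {
            sink.add(value);
        }

        @Override
        public void close() {
            // A real client would release its connections here.
        }
    }

    /** Handles one partition: one client, all rows, then close. Returns rows written. */
    static int writePartition(Iterator<String> rows, List<String> sink) {
        int count = 0;
        try (FakeClient client = new FakeClient(sink)) {
            while (rows.hasNext()) {
                client.put(rows.next());
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // Two "partitions" of rows, each processed independently.
        writePartition(Arrays.asList("a", "b", "c").iterator(), sink);
        writePartition(Arrays.asList("d", "e").iterator(), sink);
        System.out.println(sink.size() + " rows written, "
                + FakeClient.opened + " clients opened");
    }
}
```

Compared with creating a client for every row, this keeps the number of connections proportional to the number of partitions rather than the number of rows.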

I managed to overcome this issue by creating the AerospikeClient manually inside the foreach lambda.

var dataset = session.sql("select * from employee");
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    newAerospikeClient(aerospikeProperties).add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

Now I only have to declare AerospikeProperties as Serializable.
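
For anyone wondering why this works, here is a small plain-Java sketch of the underlying rule, with no Spark or Aerospike involved. FakeClient, FakeProperties and SerializableConsumer are hypothetical stand-ins. A lambda that has to be serialized drags along everything it captures, so capturing a non-serializable client fails, while capturing only serializable settings and building the client inside the lambda is fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

// Illustration only: none of these are Spark or Aerospike types.
class ClosureSerializationDemo {

    /** A lambda type that must be serializable, like functions shipped to executors. */
    interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

    /** Hypothetical client holding sockets/threads: deliberately NOT Serializable. */
    static class FakeClient {
        void put(String value) { /* would write to the cache */ }
    }

    /** Hypothetical connection settings: plain data, so Serializable. */
    static class FakeProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        final String host;
        final int port;

        FakeProperties(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Returns true if the object and everything it captured can be Java-serialized. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Captures the client itself, so the closure carries a non-serializable object. */
    static SerializableConsumer<String> capturingClient() {
        FakeClient client = new FakeClient();
        return row -> client.put(row);
    }

    /** Captures only the settings and builds the client where the lambda runs. */
    static SerializableConsumer<String> capturingProperties() {
        FakeProperties props = new FakeProperties("localhost", 3000);
        return row -> new FakeClient().put(row + "@" + props.host + ":" + props.port);
    }

    public static void main(String[] args) {
        System.out.println("captures client:     " + canSerialize(capturingClient()));
        System.out.println("captures properties: " + canSerialize(capturingProperties()));
    }
}
```

The first lambda mirrors the failing code in the question, and the second mirrors the approach in this answer. Note that the second one builds a new client for every row, which is simple but can be costly with a real client.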

Semyon Kirekov