
I need to use a non-serialisable 3rd party class in my functions on all executors in Spark, for example:

JavaRDD<String> resRdd = origRdd
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String t) throws Exception {
            // A DynamoDB mapper I don't want to initialise every time
            DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials));

            Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav();

            return userFav;
        }
    });

I would like to have a static DynamoDBMapper mapper that is initialised once per executor and can then be reused over and over again.

Since it's not serialisable, I can't initialise it once in the driver and broadcast it.

Note: there is an answer here (What is the right way to have a static object on all workers), but it only covers Scala.

Roee Gavirel

1 Answer


You can use mapPartitions or foreachPartition. Here is a snippet taken from Learning Spark:

By using partition-based operations, we can share a connection pool to this database to avoid setting up many connections, and reuse our JSON parser. As Examples 6-10 through 6-12 show, we use the mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results.

This allows us to initialize one connection per partition, rather than one per element, and then iterate over the elements in the partition however we like. This is very useful for saving data into some external database or for creating expensive reusable objects.

Here is a simple Scala example taken from the linked book. It can be translated to Java if needed; it is only here to show a simple use case of mapPartitions and foreachPartition.

ipAddressRequestCount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Open connection to storage system (e.g. a database connection)
    partition.foreach { item =>
      // Use connection to push item to system
    }
    // Close connection
  }
}

Here is a link to a Java example.
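For reference, the per-partition pattern can be sketched in plain Java without a Spark dependency. This is only an illustration: ExpensiveClient is a hypothetical stand-in for something like DynamoDBMapper, and processPartition is a made-up name. The body of processPartition is roughly what would go inside the call() method of the FlatMapFunction<Iterator<String>, String> that you pass to mapPartitions (depending on the Spark version, call() returns either an Iterable or an Iterator).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive, non-serialisable client such as DynamoDBMapper.
final class ExpensiveClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveClient() {
        CREATED.incrementAndGet(); // count constructions so the demo can check them
    }

    String lookup(String key) {
        return "fav-of-" + key; // placeholder for a real remote call
    }
}

final class PerPartitionSketch {
    // Per-partition function body: build the client once, reuse it for every element.
    static Iterator<String> processPartition(Iterator<String> partition) {
        ExpensiveClient client = new ExpensiveClient();
        List<String> results = new ArrayList<>();
        while (partition.hasNext()) {
            results.add(client.lookup(partition.next()));
        }
        return results.iterator();
    }

    public static void main(String[] args) {
        // Two lists simulate two partitions of an RDD.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("u1", "u2", "u3"),
                Arrays.asList("u4", "u5"));
        for (List<String> p : partitions) {
            processPartition(p.iterator()).forEachRemaining(System.out::println);
        }
        System.out.println("clients created: " + ExpensiveClient.CREATED.get());
    }
}
```

The client is constructed inside the function that runs on the executor, so it never has to be serialised; the cost is one construction per partition instead of one per element.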

Alex Naspo
  • There will be many partitions. Is it possible to have a connection pool per executor or JVM? – donald Nov 08 '17 at 08:37
  • @donald It is possible if you add the connection pool as a transient val to an object (singleton). The object will be initialized once per executor, and the connection pool will be created on each executor the first time it is referenced. You can then reuse that connection pool for each partition. – Alex Naspo Nov 11 '17 at 19:00
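A rough Java counterpart of the Scala singleton described in that comment is a static holder class. Static state lives in the JVM rather than in the serialised closure, so each executor JVM builds its own instance the first time the holder is touched. The sketch below is plain Java with hypothetical names (CostlyMapper stands in for DynamoDBMapper, MapperHolder is made up), and it assumes whatever the constructor needs, such as credentials, can be obtained on the executor itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a non-serialisable class such as DynamoDBMapper.
final class CostlyMapper {
    static final AtomicInteger CREATED = new AtomicInteger();

    CostlyMapper() {
        CREATED.incrementAndGet(); // count constructions for the demo
    }

    String load(String userId) {
        return "fav-of-" + userId; // placeholder for a real lookup
    }
}

final class MapperHolder {
    private MapperHolder() {}

    // The JVM initialises this nested class on first access, once per class loader,
    // and guarantees that the initialisation is thread-safe.
    private static final class Lazy {
        static final CostlyMapper INSTANCE = new CostlyMapper();
    }

    static CostlyMapper get() {
        return Lazy.INSTANCE;
    }

    public static void main(String[] args) {
        // Simulates many function calls running in the same executor JVM.
        for (String id : new String[] {"u1", "u2", "u3"}) {
            System.out.println(MapperHolder.get().load(id));
        }
        System.out.println("mappers created: " + CostlyMapper.CREATED.get());
    }
}
```

Inside a Spark function you would call MapperHolder.get() from call() instead of constructing the mapper there; nothing non-serialisable is captured by the closure because the function only references the holder class by name.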