3

I have a rather heavy metal library which is accessed via a Java/JNI connector and which I would like to use via Spark in a cluster. However, the library needs to be initialized before first usage and this takes about 30 seconds. So my question is whether Spark has some mechanism to preinitialize such libraries at the beginning of a job so that this init overhead is not necessary for the actual usage?

Klingon
  • 186
  • 5
  • currently how are you initializing (i mean which was taking 30s)? can you paste the procedure to do that pls? – Ram Ghadiyaram Feb 10 '17 at 19:56
  • The whole question deals with a currently fictitious use case, the question is if the Spark framework is powerful enough designed to deal with external libraries which need an explicit init step which can take some significant amount of time, e.g. 30 seconds. Thinks of libray exposing 3 functions: init, deinit and dosomething. init needs to be called before dosomething can be called. The question now is if Spark has any built-in mechanism to make sure that init is called only once and that the external library is kept in memory in this inited state until the whole job is done. – Klingon Feb 11 '17 at 22:28
  • Klingon: please check my answer hope that helps! – Ram Ghadiyaram May 09 '17 at 11:09

1 Answers1

1

question is whether Spark has some mechanism to pre-initialize such libraries at the beginning of a job so that this init overhead is not necessary for the actual usage?

AFAIK & as of now ... there is no such facility given by Spark see SPARK-650 like sc.init...


However if you want to pre-initialize with RDD tranformation then taken a empty RDD before running job and and you can reset and/or initialize the cluster...

map-reduce has setup and clean methods to initialize and cleanup... you can use the same way of converting a map-reduce style code to spark for example:

Note : empty RDD can be re-partitioned. So I am thinking below is the way if its transformation if you are using action then you can use foreachPartition

mapPartitions example :

val rdd = sc.emptyRDD.repartition(sc.defaultParallelism)    
rdd.mapPartitions(partition -> 
        setup() //library initialization like map-reduce setup method
        partition.map( item => 
            val output = process(item) // your logic here.
            if (!partition.hasNext) {
               // cleanup here
            }
        )
    )

foreachPartition example :

 if (rdd.isEmpty) {
      rdd.repartition(sc.defaultParallelism).foreachPartition(_ => yourUtils.initOnce())
    }

NOTE: mapartitions(transformation) and foreachPartition(action) could be useful in the above explained example way.

Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121