I need to load HDFS files in parallel and process each file in parallel (read it and filter it based on some criteria). The following code loads the files serially. I'm running the Spark application with three workers (4 cores each). I even tried setting the partition parameter in the parallelize method (see the sketch after the code below), but there was no performance improvement. I'm sure my cluster has enough resources to run the jobs in parallel. What changes should I make to run this in parallel?
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> files = sparkContext.parallelize(fileList);
Iterator<String> localIterator = files.toLocalIterator();
while (localIterator.hasNext())
{
String hdfsPath = localIterator.next();
long startTime = DateUtil.getCurrentTimeMillis();
JavaPairRDD<IntWritable, BytesWritable> hdfsContent = sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
try
{
JavaRDD<Message> logs = hdfsContent.map(new Function<Tuple2<IntWritable, BytesWritable>, Message>()
{
public Message call(Tuple2<IntWritable, BytesWritable> tuple2) throws Exception
{
BytesWritable value = tuple2._2();
BytesWritable tmp = new BytesWritable();
tmp.setCapacity(value.getLength());
tmp.set(value);
return (Message) getProtos(logtype, tmp.getBytes());
}
});
final JavaRDD<Message> filteredLogs = logs.filter(new Function<Message, Boolean>()
{
public Boolean call(Message msg) throws Exception
{
FieldDescriptor fd = msg.getDescriptorForType().findFieldByName("method");
String value = (String) msg.getField(fd);
if (value.equals("POST"))
{
return true;
}
return false;
}
});
long timetaken = DateUtil.getCurrentTimeMillis() - startTime;
LOGGER.log(Level.INFO, "HDFS: {0} Total Log Count : {1} Filtered Log Count : {2} TimeTaken : {3}", new Object[] { hdfsPath, logs.count(), filteredLogs.count(), timetaken });
}
catch (Exception e)
{
LOGGER.log(Level.INFO, "Exception : ", e);
}
}
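
The parallelize attempt mentioned above looked roughly like this (a sketch; the partition count shown is just an illustration, not my exact value):

// Hypothetical sketch of the attempt: asking for one partition per file path
// via the numSlices argument of parallelize.
JavaRDD<String> files = sparkContext.parallelize(fileList, fileList.size());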
Instead of iterating over the files RDD, I also tried Spark functions like map and foreach (a rough sketch of that variant follows the stack trace below), but they throw the following Spark exception. No external variables are referenced inside the closure, and my class (OldLogAnalyzer) already implements the Serializable interface. KryoSerializer and JavaSerializer are also configured in SparkConf. I'm puzzled about what is not serializable in my code.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:81)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at com.test.logs.spark.OldLogAnalyzer.main(OldLogAnalyzer.java:423)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@68f277a2)
- field (class: com.test.logs.spark.OldLogAnalyzer$10, name: val$sparkContext, type: class org.apache.spark.api.java.JavaSparkContext)
- object (class com.test.logs.spark.OldLogAnalyzer$10, com.test.logs.spark.OldLogAnalyzer$10@2f80b005)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 15 more
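
For completeness, the map-based variant was roughly along these lines (a hypothetical sketch, not my exact code). Judging from the serialization stack, the anonymous function (OldLogAnalyzer$10) ends up holding the sparkContext local variable (the val$sparkContext field listed above), and JavaSparkContext itself is not serializable:

// Hypothetical sketch of the map-based attempt. Referencing sparkContext
// inside the anonymous Function pulls the non-serializable JavaSparkContext
// into the task closure, which would produce the "Task not serializable"
// error shown above.
JavaRDD<Long> counts = files.map(new Function<String, Long>()
{
    public Long call(String hdfsPath) throws Exception
    {
        JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
                sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
        return hdfsContent.count();
    }
});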