1

I have the following code running in Java Spark:

ZipFile zipFile = new ZipFile(zipFilePath);
Enumeration<? extends ZipEntry> entries = zipFile.entries();
while(entries.hasMoreElements()) {
    ZipEntry entry = entries.nextElement();
    //my logic...
}

I want to run the loop above in parallel, either with Spark or with plain Java concurrency. How can I do it?

Thanks

user51
Ya Ko

2 Answers

0

The code below runs the logic concurrently for each entry in the enumeration, in Java and Scala respectively.

In Java

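List<? extends ZipEntry> entriesList = Collections.list(entries); // entries: the Enumeration from the question
// Start one asynchronous task per entry on the common ForkJoinPool
List<CompletableFuture<ZipEntry>> futureList = entriesList.stream()
    .map(x -> CompletableFuture.<ZipEntry>supplyAsync(() -> {
        // logic
        return x;
    }))
    .collect(Collectors.toList());
// Block until every task has finished
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();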

In Scala

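    import scala.collection.JavaConverters._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // entries: the java.util.Enumeration from the question
    val entriesList = entries.asScala.toList

    val futureList: List[Future[ZipEntry]] = entriesList.map(x => Future[ZipEntry] {
        // logic
        x
    })

    Future.sequence(futureList)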
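
For reference, the Java variant could be fleshed out roughly as follows. This is only a sketch: the class name `ParallelZipEntries`, the helper `countBytes` (standing in for "my logic") and the method `entrySizes` are made up for illustration, and it assumes that reading different entries of one shared `ZipFile` from several threads is acceptable in your setup. If in doubt, open a separate `ZipFile` inside each task instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class ParallelZipEntries {

    // Hypothetical per-entry logic: count the uncompressed bytes of one entry.
    static long countBytes(ZipFile zipFile, ZipEntry entry) {
        try (InputStream in = zipFile.getInputStream(entry)) {
            long total = 0;
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                total += read;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Processes every non-directory entry in its own asynchronous task
    // and returns a map from entry name to uncompressed size.
    static Map<String, Long> entrySizes(Path zipPath) throws IOException {
        try (ZipFile zipFile = new ZipFile(zipPath.toFile())) {
            List<? extends ZipEntry> entries = Collections.list(zipFile.entries());
            Map<String, Long> sizes = new ConcurrentHashMap<>();

            List<CompletableFuture<Void>> futures = entries.stream()
                .filter(e -> !e.isDirectory())
                .map(e -> CompletableFuture.runAsync(
                    () -> sizes.put(e.getName(), countBytes(zipFile, e))))
                .collect(Collectors.toList());

            // Wait for all tasks before the ZipFile is closed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return sizes;
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a small throwaway archive so the example runs on its own.
        Path zip = Files.createTempFile("demo", ".zip");
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zip))) {
            out.putNextEntry(new ZipEntry("a.txt"));
            out.write("hello".getBytes(StandardCharsets.UTF_8));
            out.closeEntry();
            out.putNextEntry(new ZipEntry("b.txt"));
            out.write("hello world".getBytes(StandardCharsets.UTF_8));
            out.closeEntry();
        }
        System.out.println(entrySizes(zip));
        Files.delete(zip);
    }
}
```

The `join()` call sits inside the try-with-resources block on purpose, so the archive stays open until every task has finished reading from it.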

Hope it helps.

user51
  • What should I write after entriesList.stream(). ? entriesList.stream().allMatch? – Ya Ko Mar 07 '19 at 17:18
  • No, you don't need allMatch there at all. You only need to replace the //logic placeholder with your own logic. I was just trying to convert the list of `ZipEntry` into a list of `CompletableFuture<ZipEntry>`. – user51 Mar 07 '19 at 17:21
0
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ParallelEnumeration {
    public static void main(String[] args) {
        String zipFilePath = "/ZipDir/";
        File zipFiles = new File(zipFilePath);
        final List<File> files = Arrays.asList(Objects.requireNonNull(zipFiles.listFiles()));
        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Print Elements of RDD")
                .setMaster("local[*]");
        // start a spark context
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // parallelize the file collection to two partitions
        jsc.parallelize(files, 2)
                .filter(file -> { // This filter is optional if the directory contains only zip files
                    // https://stackoverflow.com/questions/33934178/how-to-identify-a-zip-file-in-java
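                    // try-with-resources closes the stream even if readInt() throws
                    try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
                        // Compare the first four bytes with the zip local file header signature
                        return in.readInt() == 0x504b0304;
                    }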
                }).foreach((VoidFunction<File>) file -> System.out.println(file.getName()));

        // stop the spark context once the job has finished
        jsc.close();
    }
}
QuickSilver