Some time ago I faced the problem of reading zip files in Apache Spark, and I shared my answer on StackOverflow.
As @Programmer aptly noted, I was not closing the opened streams. I tried to fix that with a partial function inside takeWhile
(inspiration)
Stream.continually(zis.getNextEntry)
  .takeWhile {
    case null => zis.close(); false
    case _    => true
  }
  .flatMap { _ =>
    val br = new BufferedReader(new InputStreamReader(zis))
    Stream.continually(br.readLine())
      .takeWhile {
        case null => br.close(); false
        case _    => true
      }
  }
But it does not work!
While reading the zip files, I get this error now:
Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 20, localhost): java.io.IOException: Stream closed
at java.util.zip.ZipInputStream.ensureOpen(ZipInputStream.java:67)
at java.util.zip.ZipInputStream.getNextEntry(ZipInputStream.java:116)
And simply by leaving the stream open, it works OK.
So it seems I am closing the stream and then trying to read from it again. But I do not know why, or how to solve it.
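For what it's worth, the "Stream closed" part can be reproduced outside Spark. Closing a BufferedReader also closes every stream it wraps, so it seems plausible that the br.close() in the inner takeWhile closes zis as well, and the next getNextEntry call then fails. Here is a minimal sketch in plain Java (the classes involved are all java.io and java.util.zip; the in-memory two-entry zip is throwaway demo data, not my actual input):

```java
import java.io.*;
import java.util.zip.*;

public class CloseDemo {
    // Returns the first line read from the first entry, and the message of the
    // exception raised by getNextEntry after the reader has been closed.
    public static String[] demo() throws IOException {
        // Build a tiny two-entry zip in memory (throwaway data, just for this demo)
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(buf)) {
            zos.putNextEntry(new ZipEntry("a.txt"));
            zos.write("hello".getBytes("UTF-8"));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("b.txt"));
            zos.write("world".getBytes("UTF-8"));
            zos.closeEntry();
        }

        ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(buf.toByteArray()));
        zis.getNextEntry();                          // position on the first entry
        BufferedReader br = new BufferedReader(new InputStreamReader(zis));
        String line = br.readLine();                 // reads the first entry's content
        br.close();                                  // also closes the wrapped zis!
        String error;
        try {
            zis.getNextEntry();                      // the same call that fails in Spark
            error = "no error";
        } catch (IOException e) {
            error = e.getMessage();                  // "Stream closed"
        }
        return new String[] { line, error };
    }

    public static void main(String[] args) throws IOException {
        String[] r = demo();
        System.out.println(r[0] + " / " + r[1]);
    }
}
```

The second getNextEntry fails with exactly the IOException from the stack trace above, even though only the reader was closed explicitly.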