Some time ago I was facing a problem reading zip files in Apache Spark, and I shared my answer on StackOverflow.

As @Programmer aptly noted, I was not closing the opened streams. I tried to achieve it with a partial function in takeWhile (inspiration):

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

// Iterate over the zip entries and close the ZipInputStream
// once the last entry has been consumed.
Stream.continually(zis.getNextEntry)
  .takeWhile {
    case null => zis.close(); false
    case _    => true
  }
  .flatMap { _ =>
    // Read the current entry line by line and close the reader
    // when the entry is exhausted.
    val br = new BufferedReader(new InputStreamReader(zis))
    Stream.continually(br.readLine())
      .takeWhile {
        case null => br.close(); false
        case _    => true
      }
  }

But it does not work!

While reading the zip files, I get this error now:

Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 20, localhost): java.io.IOException: Stream closed
    at java.util.zip.ZipInputStream.ensureOpen(ZipInputStream.java:67)
    at java.util.zip.ZipInputStream.getNextEntry(ZipInputStream.java:116)

And simply by leaving the stream open, it works OK.

So it seems that I close the stream and then try to read from it again, but I do not know why this happens or how to fix it.
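
For reference, here is a minimal, self-contained reproduction outside Spark (the in-memory zip helper and the object name are mine). It suggests the close is propagating from the inner reader: closing the BufferedReader closes the InputStreamReader and, with it, the wrapped ZipInputStream, so the next getNextEntry call fails.

import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, InputStreamReader}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object StreamClosedDemo extends App {
  // Build a tiny two-entry zip in memory so the demo is self-contained.
  val zipBytes: Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val zos  = new ZipOutputStream(baos)
    Seq("a.txt" -> "first", "b.txt" -> "second").foreach { case (name, text) =>
      zos.putNextEntry(new ZipEntry(name))
      zos.write(text.getBytes("UTF-8"))
      zos.closeEntry()
    }
    zos.close()
    baos.toByteArray
  }

  val zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))
  zis.getNextEntry()                                      // position on the first entry

  val br = new BufferedReader(new InputStreamReader(zis))
  println(br.readLine())                                  // prints "first"

  // Closing the reader closes the InputStreamReader and, with it,
  // the underlying ZipInputStream.
  br.close()

  zis.getNextEntry()                                      // java.io.IOException: Stream closed
}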

Atais

1 Answer

Following @alexandre-dupriez's comment, I have closed only the outer Stream and this helped:

Stream.continually(zis.getNextEntry)
  .takeWhile {
    case null => zis.close(); false
    case _    => true
  }
  .flatMap { _ =>
    // No br.close() here: closing the BufferedReader would also close
    // the wrapped ZipInputStream, and the next getNextEntry call would
    // then fail with "Stream closed".
    val br = new BufferedReader(new InputStreamReader(zis))
    Stream.continually(br.readLine()).takeWhile(_ != null)
  }

I will need some time to confirm that it works as it should.
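
One caveat worth noting: because Stream is lazy, the zis.close() inside takeWhile only runs if the stream is consumed all the way to the terminating null. If that is a concern, here is a sketch of an eager alternative (the helper name is mine, and it assumes holding all lines in memory is acceptable) that closes the stream deterministically in a finally block:

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import scala.collection.mutable.ListBuffer

// Hypothetical helper: read every line of every entry eagerly and
// close the ZipInputStream exactly once, whatever happens.
def readAllLines(zis: ZipInputStream): List[String] = {
  val lines = ListBuffer.empty[String]
  try {
    var entry = zis.getNextEntry()
    while (entry != null) {
      // ZipInputStream#read returns -1 at the end of the current entry,
      // so the reader only ever sees that entry's bytes.
      val br = new BufferedReader(new InputStreamReader(zis))
      var line = br.readLine()
      while (line != null) {
        lines += line
        line = br.readLine()
      }
      entry = zis.getNextEntry()
    }
  } finally {
    zis.close() // runs even on failure or partial consumption
  }
  lines.toList
}

The trade-off is losing laziness, which may matter for large archives.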

Atais