1

I'm playing around with the stream interfaces discussed in this blog post. I've made what feels like a simple implementation to test out the interfaces, but it is hanging indefinitely when I run it and I'm stumped as to why. Here's the code:

import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.collection.immutable.ListMap

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

def copy(input: InputStream, output: OutputStream): Unit = {
  val buffer = new Array[Byte](1024)
  var bytesRead = input.read(buffer)
  while (bytesRead != -1) {
    output.write(buffer, 0, bytesRead)
    bytesRead = input.read(buffer)
  }
}

def curthread(label: String): Unit = println(label, Thread.currentThread().getName())

trait Writable{
  def writeBytesTo(out: OutputStream): Unit
}

trait Readable extends Writable{
  def readBytesThrough[T](f: InputStream => T): T
  def writeBytesTo(out: OutputStream): Unit = readBytesThrough(copy(_, out))
}

def convertToUpperCase(input: InputStream, output: OutputStream): Unit = {
  println("convertToUpperCase")
  curthread("convertToUpperCase")
  val buffer = new Array[Byte](1024) // buffer size
  println("convertToUpperCase: input.read")
  var bytesRead = input.read(buffer)

  while (bytesRead != -1) {
    println(s"convertToUpperCase read $bytesRead")
    val upperBuffer = buffer.map(_.toChar.toUpper.toByte)
    println("convertToUpperCase: output.write")
    output.write(upperBuffer, 0, bytesRead)
    println("convertToUpperCase: input.read")
    bytesRead = input.read(buffer)
  }

  input.close()
  output.close()
}

class Put extends Readable {
  private val output = new PipedOutputStream

  def readBytesThrough[T](f: InputStream => T): T = {
    println("Put: readBytesThrough")
    curthread("Put")
    f(new PipedInputStream(output))
  }
  
  def receive(x: Any): Unit = {
    println(s"Put: receive($x)")
    curthread("Put")
    output.write(x.toString.getBytes("utf-8"))
    println("done receiving")
  }

  def receivedLast(): Unit = {
    println("Put: receivedLast")
    curthread("Put")
    output.close()
  }
}

case class Parsed(value: String) extends Writable {
  def writeBytesTo(out: OutputStream): Unit = {
    println("Parsed: writeBytesTo")
    curthread("Parsed")
    out.write(value.getBytes("utf-8"))
  }
}

def uppercase(r: Readable): Readable = new Readable {
  def readBytesThrough[T](f: InputStream => T): T = {
    val output = new PipedOutputStream
    Future {
      r.readBytesThrough(input => {
        convertToUpperCase(input, output)
      })
    }
    f(new PipedInputStream(output))
  }
}

def parse(r: Readable): Writable = {
  curthread("parse")
  val x = r.readBytesThrough(scala.io.Source.fromInputStream(_).mkString)
  Parsed(x)
}

def display(w: Writable): Unit = {
  curthread("display")
  w.writeBytesTo(System.out)
}

val put = new Put

curthread("main")

Future {
    put.receive("foobarbaz")
    put.receivedLast()
}

display(parse(uppercase(put)))

This gives the output

(main,run-main-0)
Put: receive(foobarbaz)
(Put,pool-11-thread-1)
(parse,run-main-0)
Put: readBytesThrough
(Put,pool-11-thread-2)
convertToUpperCase
(convertToUpperCase,pool-11-thread-2)
convertToUpperCase: input.read

So it appears that reads and writes to each end of the piped stream are happening on different threads as they should. Why then is it hanging at convertToUpperCase?

Scastie link: https://scastie.scala-lang.org/I9EIOs8NR2mBeLbcqDx2zA

alcorn
  • 1,268
  • 1
  • 8
  • 17
  • 1
    If you really need / want some efficient way to stream I/O data, I would recommend looking at **fs2** or **ZIO** and maybe **AkkaStreams**. But, note each of these has a learning curve and imply some new concepts like the `IO` data type or actors. – Luis Miguel Mejía Suárez Apr 01 '23 at 13:28
  • @GaëlJ looks like the problem in that question was that the input stream was incorrectly sending EOF. In this case the read is hanging entirely, nothing is coming out, so I think it is different. – alcorn Apr 01 '23 at 19:12
  • Voting to close this question in favor of a new question I asked which isolates the problem much more effectively: https://stackoverflow.com/questions/75908870/why-does-writing-a-value-larger-than-the-buffer-size-of-javas-pipedinputstream – alcorn Apr 01 '23 at 21:09

0 Answers0