I'm playing around with the stream interfaces discussed in this blog post. I've made what feels like a simple implementation to test out the interfaces, but it is hanging indefinitely when I run it and I'm stumped as to why. Here's the code:
import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.collection.immutable.ListMap
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
def copy(input: InputStream, output: OutputStream): Unit = {
val buffer = new Array[Byte](1024)
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
output.write(buffer, 0, bytesRead)
bytesRead = input.read(buffer)
}
}
def curthread(label: String): Unit = println(label, Thread.currentThread().getName())
trait Writable{
def writeBytesTo(out: OutputStream): Unit
}
trait Readable extends Writable{
def readBytesThrough[T](f: InputStream => T): T
def writeBytesTo(out: OutputStream): Unit = readBytesThrough(copy(_, out))
}
def convertToUpperCase(input: InputStream, output: OutputStream): Unit = {
println("convertToUpperCase")
curthread("convertToUpperCase")
val buffer = new Array[Byte](1024) // buffer size
println("convertToUpperCase: input.read")
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
println(s"convertToUpperCase read $bytesRead")
val upperBuffer = buffer.map(_.toChar.toUpper.toByte)
println("convertToUpperCase: output.write")
output.write(upperBuffer, 0, bytesRead)
println("convertToUpperCase: input.read")
bytesRead = input.read(buffer)
}
input.close()
output.close()
}
class Put extends Readable {
private val output = new PipedOutputStream
def readBytesThrough[T](f: InputStream => T): T = {
println("Put: readBytesThrough")
curthread("Put")
f(new PipedInputStream(output))
}
def receive(x: Any): Unit = {
println(s"Put: receive($x)")
curthread("Put")
output.write(x.toString.getBytes("utf-8"))
println("done receiving")
}
def receivedLast(): Unit = {
println("Put: receivedLast")
curthread("Put")
output.close()
}
}
case class Parsed(value: String) extends Writable {
def writeBytesTo(out: OutputStream): Unit = {
println("Parsed: writeBytesTo")
curthread("Parsed")
out.write(value.getBytes("utf-8"))
}
}
def uppercase(r: Readable): Readable = new Readable {
def readBytesThrough[T](f: InputStream => T): T = {
val output = new PipedOutputStream
Future {
r.readBytesThrough(input => {
convertToUpperCase(input, output)
})
}
f(new PipedInputStream(output))
}
}
def parse(r: Readable): Writable = {
curthread("parse")
val x = r.readBytesThrough(scala.io.Source.fromInputStream(_).mkString)
Parsed(x)
}
def display(w: Writable): Unit = {
curthread("display")
w.writeBytesTo(System.out)
}
val put = new Put
curthread("main")
Future {
put.receive("foobarbaz")
put.receivedLast()
}
display(parse(uppercase(put)))
This gives the output
(main,run-main-0)
Put: receive(foobarbaz)
(Put,pool-11-thread-1)
(parse,run-main-0)
Put: readBytesThrough
(Put,pool-11-thread-2)
convertToUpperCase
(convertToUpperCase,pool-11-thread-2)
convertToUpperCase: input.read
So it appears that reads and writes to each end of the piped stream are happening on different threads as they should. Why then is it hanging at convertToUpperCase
?
Scastie link: https://scastie.scala-lang.org/I9EIOs8NR2mBeLbcqDx2zA