I'm trying to build content-addressable file storage. The process is straightforward: take a stream of bytes and write it to a temp location while at the same time calculating a hash of the stream contents, then once the stream is done move the fully written temp object to its final location based on that hash.

Basically like this Conduit example, but with something a little more robust than the filesystem backing it:

storeObject dataDir srcStream = 
  let
     sinks = liftA2 (,)
       (ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
       (ZipSink sinkHash)
  in do
      (tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
      renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
      return (convert (hash :: Digest SHA256))

For fs2, the best answer I could find on forking streams (How do I "split" a stream in fs2?) leads me to something like:

  def zipPipes[F[_]: Functor: Concurrent, A, B, C](
      p1: Pipe[F, A, B],
      p2: Pipe[F, A, C]
  ): Pipe[F, A, (B, C)] = stream =>
    Stream.eval(for {
      q <- Queue.noneTerminated[F, A]
    } yield {
      stream
        .evalTap(a => q.enqueue1(Some(a))) // copy each element into the queue
        .onFinalize(q.enqueue1(None))      // terminate the queue when the source ends
        .through(p1)
        .zip(q.dequeue.through(p2))        // the second pipe reads from the queue
    }).flatten[F, (B, C)]

(Disclaimer: I have not verified the above code does anything more than compile)

But I don't know, this stack of plumbing seems janky enough that I feel I'm missing an obvious alternative?

Charles Miller

1 Answer

You aren't really missing anything. You can share streams with Topic, e.g.:

def shareN[F[_]: Concurrent, A](n: Int): fs2.Pipe[F, A, List[fs2.Stream[F, A]]] = { src =>
  fs2.Stream.eval(Topic[F, A]).flatMap { topic =>
    fs2.Stream(List.fill(n)(topic.subscribe(1))).concurrently(
      topic.subscribers.find(_ == n) >> topic.publish(src)
    )
  }
}

This gives you a fixed-size list of streams, with the restriction that you must consume all of them in parallel or you'll deadlock. It could be made more type-safe with shapeless, but that's a different question.

It's also quite likely you can hammer the hashing into fs2.Stream#mapAccumulate and get a tuple out as a result.
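
A minimal sketch of that mapAccumulate idea, assuming SHA-256 via the JDK's MessageDigest. The names HashWhileStreaming and withRunningHash are made up for illustration and are not part of fs2. The digest is carried as the accumulator state and every chunk is re-emitted unchanged, so whatever writes the temp file can sit downstream:

```scala
import java.security.MessageDigest
import fs2.{Chunk, Pure, Stream}

object HashWhileStreaming {
  // Hypothetical helper (not an fs2 API): carries a running SHA-256 digest
  // as the mapAccumulate state and passes each chunk through untouched.
  // MessageDigest is mutable, so every emitted tuple holds the same instance;
  // only read it after the stream has finished.
  def withRunningHash[F[_]](src: Stream[F, Byte]): Stream[F, (MessageDigest, Chunk[Byte])] =
    src.chunks.mapAccumulate(MessageDigest.getInstance("SHA-256")) { (md, chunk) =>
      md.update(chunk.toArray)
      (md, chunk)
    }

  // Render a digest as lowercase hex, e.g. for use as a file name.
  def hex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString

  def main(args: Array[String]): Unit = {
    val whole = "hello world".getBytes("UTF-8")
    // Two separate emits so the source arrives as more than one chunk.
    val src: Stream[Pure, Byte] =
      Stream.emits(whole.take(6).toSeq) ++ Stream.emits(whole.drop(6).toSeq)

    val steps = withRunningHash[Pure](src).toList
    // The state attached to the last element has seen every chunk.
    println(hex(steps.last._1.digest()))
  }
}
```

In an effectful stream you would presumably evalMap over the chunks to write them out and keep only the last state as the final hash. An empty source emits nothing at all here, so that case needs separate handling.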

Oleg Pyzhcov