Questions tagged [zio-streams]
17 questions
4
votes
0 answers
How can i build batch the request and understand the response with help ZStream (ZIO)?
I have api that gets such request:
case class UsersRequest(ids: List[Long])
and returns such response:
case class UsersInfoResponse(info: List[Info])
case class Info(userId: Long, info: String)
also, i have methods that send this request and…

Vlad
- 41
- 1
3
votes
0 answers
How to merge two stream and sort them in zio streams
Like akka-stream's mergeSorted.
It's very useful when process history data and keep the time order.
What's the replacement in zio-streams?

Lin Lee
- 213
- 2
- 8
3
votes
1 answer
ZIO Streams: Which is the difference between a ZSink and a ZTransducer?
I'm studying ZIO Streams, using version 1.0.9 of the library zio-streams.
I cannot find any reference that shows me the difference between a ZSink and a ZTransducer.
What is the difference?

riccardo.cardin
- 7,971
- 5
- 57
- 106
2
votes
0 answers
Equivalent looking ZIO future interop code with different results
Working with ZIO for the first time and wrote some code that boiled down to
val x = ZStream.fromIterable(Iterable.empty).runDrain
await(zio.Runtime.default.unsafeRun(x.toFuture))
it compiles but fails at runtime with
[info] …

Sign
- 1,919
- 18
- 33
2
votes
0 answers
Data manipulation with ZIO Stream -- Compiles and runs but does not finish
I created 3 different versions of a data processing mini pipeline. One with Scala Views, one with FS2 and the one with ZIO Streams. The View and FS2 implementation both run and finishing pretty quickly (FS2 being much faster). However, my ZIO…

Lukas Tycho
- 25
- 3
2
votes
1 answer
Write output of ZIO Stream to file
I am trying to write the results of a ZIO Stream to a file. The following example app takes a sequence of integers, converts them to bytes, compresses them with the gzip transducer, but I cannot figure out how to write them to a file.
I think I need…

RandomBits
- 4,194
- 1
- 17
- 30
1
vote
1 answer
How to create ZStream of String from ZStream of Byte
I need to read file from http. I'm using sttp with ZioBackend like this:
val sttpBackend: SttpBackend[Task, ZioStreams] = ???
val request =
basicRequest
.post(uri"...")
.response(asStreamUnsafe(ZioStreams))
…

Izbassar Tolegen
- 1,990
- 2
- 20
- 37
1
vote
0 answers
Can I use a ZSink to commit offsets in Zio-Kafka?
I am learning ZIO integration with Apache Kafka, using the library zio-kafka. In the example on the Github main project page, they use a mapM function to commit the offset of a chunk:
Consumer.subscribeAnd(Subscription.topics("topic150"))
…

riccardo.cardin
- 7,971
- 5
- 57
- 106
0
votes
1 answer
Correct way to convert queue emitting Take[_, A] to A?
I don't quite understand the user-level design of ZIO's ZStream when writing to a queue:
val queue: RIO[Scope, Dequeue[Take[Nothing, Int]]] =
ZStream(1, 2).toQueue()
... emitting (for example) Take(Success(Chunk(1)))
I understand that, at a…

Tim W
- 1
- 1
0
votes
0 answers
Scala, ZIO, ZStream - how to stream custom data object to endpoint?
I want to stream data from Zstream with some repeat time. I have my main function which returns ZIO:
def processData(request: MyRequest): Task[Seq[SomePayload]]
I also call this method for every element in my list requests.
Now I would like to take…

Developus
- 1,400
- 2
- 14
- 50
0
votes
0 answers
Scala ZIOStream with Future
I'm trying to compose ZIO.fromFuture with ZStream.async following this example but I have a type issue:
ZIO.fromFuture gives me Task[+A] = ZIO[Any, Throwable, A]
but Zstream.emit needs ZIO[R, Option[E], Chunk[A]]
[
How could I turn a Future into…

Bill'o
- 514
- 1
- 5
- 19
0
votes
1 answer
zio-grpc bi-stream NOT close in server-side after closing grpcurl by `Ctrl-C`
Consulting about zio-grpc bi-stream closing invoke: When it will be closed? I'm use grpcurl to test bistream, but zio-grpc server side not close immidently(it will closed after some time).
I'm watching grpc server-side stream close event by…

LoranceChen
- 2,453
- 2
- 22
- 48
0
votes
0 answers
How to use zio kafka with google protobuf when you need to read data from topic and get it as Java proto class?
I need to get data from Kafka topic as a Zio Stream, data there is in the google protobuf format, also i need to check schema
I use the following sample protobuf file which generates proto.Data Java class for me:
syntax = "proto3";
package…

eprst2019
- 31
- 3
0
votes
1 answer
Interrupt ZStream mapMPar processing
I have the following code which, because of Excel max row limitations, is restricted to ~1million rows:
ZStream.unwrap(generateStreamData).mapMPar(32) {m =>
streamDataToCsvExcel
}
All fairly straightforward and it works perfectly. I keep track of…

user1128482
- 185
- 1
- 1
- 11
0
votes
1 answer
Streaming a ByteArrayOutputStream to an akka http response
I'm creating a ByteArrayOutputStream using ZIO Streams i.e.:
lazy val byteArrayOutputStream = new ByteArrayOutputStream()
val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunks[String](_.flatMap(_.getBytes)
val data =…

user1128482
- 185
- 1
- 1
- 11