Questions tagged [scala-streams]

37 questions
7
votes
1 answer

Spark streaming is not working in Standalone cluster deployed in VM

I have written Kafka stream program using Scala and executing in Spark standalone cluster. Code works fine in my local. I have done Kafka , Cassandra and Spark setup in Azure VM. I have opened all inbound and outbound ports to avoid port blocking.…
Gnana
  • 2,130
  • 5
  • 26
  • 57
6
votes
1 answer

Scala how to get last calculated value of stream?

According to scala docs stream implements lazy lists where elements are only evaluated when they are needed. Example; val fibs: Stream[BigInt] = BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map(n => { n._1 + n._2 }) After that in scala…
altayseyhan
  • 735
  • 1
  • 7
  • 18
5
votes
2 answers

How to abruptly stop an akka stream Runnable Graph?

I am not able to figure out how to stop akka stream Runnable Graph immediately ? How to use killswitch to achieve this? It has been just a few days that I started akka streams. In my case I am reading lines from a file and doing some operations in…
PainPoints
  • 461
  • 8
  • 20
4
votes
2 answers

What is the intuition behind recursive algorithms with Streams?

Like the title says what is the intuition behind recursive algos with streams like: val fibs: LazyList[Int] = (0 #:: fibs).scanLeft(1)(_ + _) and val fibs: LazyList[Int] = 0 #:: 1 #:: (fibs.zip(fibs.tail).map{ t => t._1 + t._2 }) How do they…
Sergey Bushmanov
  • 23,310
  • 7
  • 53
  • 72
4
votes
0 answers

Scala Stream.grouped buffers entire stream into memory

Calling grouped on a Scala Stream seems to buffer the entire stream into memory. I've dug in quite a bit here to determine which class is holding the reference to Stream's head. A simple example: lazy val stream1: Stream[Int] = { def loop(v: Int):…
Christian Benincasa
  • 1,215
  • 1
  • 21
  • 45
4
votes
1 answer

remove the filtered line from the file using scala fs2 file streaming

how to remove the filtered lines from the current streaming file using fs2 and get the count of filtered lines as the return type? ex: If the old.txt contains strings separated by a newline (\n): john sam chen yval .... and val myList =…
vkt
  • 1,401
  • 2
  • 20
  • 46
4
votes
2 answers

forEach in scala shows expected: Consumer[_ >:Path] actual: (Path) => Boolean

Wrong syntax problem in recursively deleting scala files Files.walk(path, FileVisitOption.FOLLOW_LINKS) .sorted(Comparator.reverseOrder()) .forEach(Files.deleteIfExists)
Tom George
  • 43
  • 1
  • 4
3
votes
2 answers

How does the lazy 'take' function compute the Scala stream further?

In Martin Odersky’s book “Programming in Scala” there is an example of computing the Fibonacci sequence starting with 2 numbers passed as arguments to the function fibFrom. def fibFrom(a: Int, b: Int): Stream[Int] = a #:: fibFrom(b, a +…
2
votes
1 answer

Scala Stream prepend returns List instead of Stream

I have a Seq, x, and a Stream, y, and I wish to prepend x to y to obtain a new Stream. However, the static type of y is causing the Stream to be evaluated immediately, and I am confused why this is the case. Here is an example: val x: Seq[Int] =…
DBear
  • 312
  • 2
  • 9
2
votes
1 answer

Scala recursive implementation with Stream type

I have started studying Scala on Coursera and have some question on squareRootGuess implementation as follow I am trying to implement criteria to filter the accurate guess inside the sqrtGuess definition as shown below, but it gives me stack…
2
votes
1 answer

What is the equivalent of F# seq monad in Scala

I'm trying to move from F# to Scala. In F#, we can easily create a seq with computation expression or monad. For example: let myseq = seq { let mutableList = List() for i = 0 to 100 do mutableList.append(i) yield…
Xiang Zhang
  • 2,831
  • 20
  • 40
2
votes
1 answer

Cannot resolve symbol #:: error for streams in Scala

I have been trying the Bloxorz assignment as part of the Functional Program Design in Scala course and have been trying to add an element to a Stream as below but I am getting: Cannot resolve symbol #::. There is some very small, obvious mistake…
Aditya Singh
  • 15,810
  • 15
  • 45
  • 67
1
vote
1 answer

Generate sentences usign streams using scala

I want to generate multiple sentences using Stream. What I have now is I can generate 1 Sentences. def main(args: Array[String]): Unit = { println((generateSentence take 1).mkString) } This is the result I have so far …
NoobZik
  • 87
  • 9
1
vote
1 answer

Lazy Pagination in Scala (Stream/Iterator of Iterators?)

I'm reading a very large number of records sequentially from database API one page at a time (with unknown number of records per page) via call to def readPage(pageNumber: Int): Iterator[Record] I'm trying to wrap this API in something like either…
alex
  • 1,757
  • 4
  • 21
  • 32
1
vote
1 answer

Create and stream a zip file as it is beeing created with the playframework and scala

My scala-play api provides endpoints to return a file as a stream via the Ok.chunked-function. I now want to be able to allow the download of multiple files as a zip archive. I want to create a zip file as a stream which play should directly return…
Linde_98
  • 418
  • 4
  • 10
1
2 3