I'm just learning highland.js after being inspired by NoFlo.js. I want streams to be able to operate recursively. In this contrived example I provide a number that gets multiplied by two, and only results <= 512 are kept. Once a number has been multiplied, it is fed back into the system. The code below works, but if I take the doto function out of the pipeline it doesn't process any numbers. I suspect that I'm sending the data back into the returnPipe incorrectly. Is there a better way to pipe data back into a system? What am I missing?

###
  input>--m--->multiplyBy2>---+
          |                   |
          |                   |
          +---<returnPipe<----+
###

H = require('highland')

input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
 .map((v)-> return v * 2)
 .filter((v)-> return v <= 512)
 .pipe(returnPipe)
Michael Connor

2 Answers

From the documentation: doto spins off a stream while re-emitting the source stream. As far as the pipeline is concerned, it therefore still contains a function that passes the stream through. If you take doto out, the original stream doesn't make it back through the return stream on the next iteration.

If you are going to use pipeline, you have to pass it a function that takes a stream and emits a stream. For example, you could replace doto with something like H.map((v)=>{console.log(v); return v;}) in the call to H.pipeline. Because that function consumes a stream and emits a stream, values keep flowing when the stream is passed back into it via .pipe(returnPipe).

EDIT: To answer your question, when you declare let input = H([1]) you are actually creating a stream right there. You can remove any reference to the pipeline and returnPipe and produce the same output with the following code:

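const H = require('highland');

// H([1]) already creates a stream, so no separate pipeline is needed.
let input = H([1]);

input.map((v) => {
  return v * 2;
})
.filter((v) => {
  // Log only the values that pass the filter.
  if (v <= 512) {
    console.log(v);
  }
  return v <= 512;
})
// Feed the surviving values back into the same stream.
.pipe(input);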
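
To make the intended data flow concrete, here is a minimal sketch of the same loop written without highland, using a plain array as the "return pipe". It is only an illustration of what the map, filter and feedback steps are expected to produce; `runFeedbackLoop`, `seed` and `limit` are hypothetical names, and nothing here is highland API or a statement about how highland schedules values internally.

```javascript
// Dependency-free sketch of the feedback loop. This is NOT highland code;
// runFeedbackLoop, seed and limit are hypothetical names used for illustration.
function runFeedbackLoop(seed, limit) {
  const pending = [seed]; // plays the role of input merged with the return pipe
  const emitted = [];     // values that survived the filter

  while (pending.length > 0) {
    const doubled = pending.shift() * 2; // the map step
    if (doubled <= limit) {              // the filter step
      emitted.push(doubled);
      pending.push(doubled);             // feed the result back into the loop
    }
  }
  return emitted;
}

console.log(runFeedbackLoop(1, 512));
// prints [ 2, 4, 8, 16, 32, 64, 128, 256, 512 ]
```

The loop stops on its own because 1024 fails the filter and nothing is fed back, which is the same termination condition the stream version relies on.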
jaredkwright
  • That makes sense. Thank you. It just intuitively feels like I should be able to pipe the data without any functions whatsoever. Is there something like a pipeline that will just consume and re-emit? – Michael Connor Sep 03 '15 at 01:56
  • See the example in the edit I made. Since the whole idea of a stream is that it consumes and re-emits, and since `H(source)` actually creates a stream object, there is no need to create a separate "pipeline". I believe that the idea of the pipeline is just to provide a convenient wrapper for a series of functions that you want the stream to pass through. – jaredkwright Sep 03 '15 at 05:02
  • That would have never occurred to me. Thanks so much. Well worth the +50!! – Michael Connor Sep 04 '15 at 17:54
  • If you change the map to a flatMap((v)-> return H([v + 1, v + 2])) then only a single value is printed and the pipe seems to fail. Debugging highland is pretty rough. – Michael Connor Sep 08 '15 at 18:58

My original intent was to write a recursive file reader in highland.js. I posted to the highland.js GitHub issues list, and Victor Vu helped me put this together with a fantastic write-up.

H = require('highland')
fs = require('fs')
fsPath = require('path')

###
  directory >---m----------> dirFilesStream >-------------f----> out
                |                                         |
                |                                         |
                +-------------< returnPipe <--------------+

  legend: (m)erge  (f)ork

 + directory         has the initial file
 + dirFilesStream    does a directory listing
 + out               prints out the full path of the file
 + directoryFilter   runs stat and filters on directories
 + returnPipe        the only way i can

###

directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
  H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
    fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
  H.wrapCallback(fs.stat)(path).map (v) ->
    v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log
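
For comparison, the same merge/fork loop can be sketched without streams as a worklist over Node's synchronous fs calls. This is a swapped-in technique for illustration only, not the highland approach from the write-up; `listRecursively` and `demoRoot` are hypothetical names, and the temporary directory exists only so the sketch runs on its own.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper (not highland API): lists every entry under `root`,
// feeding directories back into the worklist like returnPipe does.
// Note: statSync follows symlinks, so a symlink cycle would loop forever.
function listRecursively(root) {
  const pending = [root]; // merge point: initial directory plus fed-back ones
  const out = [];         // full paths of everything found

  while (pending.length > 0) {
    const parent = pending.shift();
    for (const name of fs.readdirSync(parent)) {
      const full = path.join(parent, name);
      out.push(full);                        // the fork towards out
      if (fs.statSync(full).isDirectory()) { // the directory filter
        pending.push(full);                  // the return path
      }
    }
  }
  return out;
}

// Build a small throwaway tree so the sketch is runnable as-is.
const demoRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'walk-demo-'));
fs.mkdirSync(path.join(demoRoot, 'sub'));
fs.writeFileSync(path.join(demoRoot, 'a.txt'), '');
fs.writeFileSync(path.join(demoRoot, 'sub', 'b.txt'), '');

console.log(listRecursively(demoRoot));
```

The stream version has the advantage of being asynchronous and respecting backpressure; the worklist only shows which paths travel along which edge of the diagram.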
Michael Connor