
I like Java 8's Stream API. There are plenty of useful intermediate and terminal methods to transform and collect a stream: intermediate methods like distinct() and terminal methods like collect(). I find the Collector API especially useful for reducing a stream to deeply nested grouping maps.

What is the JavaScript equivalent of the Java Stream API? I know there are basic functions like map, filter and reduce, but I can't find any more general interfaces, native to JavaScript, for querying or grouping the data in a collection. Are there any production-ready libraries that match the Java Stream API?

Tuomas Toivonen
  • *"Questions asking us to recommend or find a book, tool, software library, tutorial or other off-site resource are off-topic for Stack Overflow as they tend to attract opinionated answers and spam."* – T.J. Crowder Apr 14 '17 at 08:39
  • Here is some information about a 2 year old stream.js https://www.reddit.com/r/programming/comments/2tp90e/streamjs_java_8_streams_api_ported_to_javascript/ – mplungjan Apr 14 '17 at 08:40
  • You can use [RxJS](https://github.com/Reactive-Extensions/RxJS), depending on what you want to do – Olivier Boissé Apr 14 '17 at 08:50
  • There is no equivalent of Java streaming API in JS – ZhekaKozlov Apr 14 '17 at 08:51
  • Lodash will cover most cases : https://lodash.com/docs/ – dashton Apr 28 '17 at 15:53

3 Answers


Java 8 `stream()` roughly corresponds to lodash `chain()`

Java 8 `collect()` roughly corresponds to lodash `value()`

Java 8 `distinct()` roughly corresponds to lodash `uniq()`

Java 8 `map()` roughly corresponds to lodash `map()`

Lodash is more comprehensive, since it has been around longer.
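For comparison, here is a dependency-free sketch of the same ideas using only native JavaScript: `Array#map` stands in for `map()`, a `Set` for `distinct()`, and `Array#reduce` into a `Map` for `Collectors.groupingBy`. The `words` data is made up for illustration. Note that, unlike Java streams, these array methods are eager and build an intermediate array at each step.

```javascript
// Made-up sample data, for illustration only.
const words = ["apple", "avocado", "banana", "blueberry", "cherry"];

// Java: words.stream().map(w -> w.charAt(0)).distinct().collect(Collectors.toList())
// A Set drops duplicates and keeps insertion order; spreading it gives an array back.
const initials = [...new Set(words.map(w => w[0]))];

// Java: words.stream().collect(Collectors.groupingBy(w -> w.charAt(0)))
// Fold the array into a Map of key -> list of matching words.
const byInitial = words.reduce((map, w) => {
  const key = w[0];
  if (!map.has(key)) map.set(key, []);
  map.get(key).push(w);
  return map;
}, new Map());

console.log(initials); // [ 'a', 'b', 'c' ]
console.log(byInitial);
```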

Eric Hartford

At the API level, lodash, RxJS or stream.js may meet the requirement, but the powerful thing about Java streams is that they can leverage modern multi-core CPUs to parallelize the work. None of these pure JS libraries solves that: at the end of the day, this JS still runs in a single-threaded runtime and uses one core at a time.

I guess the JS engine itself would need to provide support to reach that performance target.

Ming Zhu
  • The JVM has runtime optimizations that rewrite or restructure code at runtime. Sometimes this means that multiple functions get squeezed together because they are always executed together. That used to make Java code platform independent; well, that has been Java's key idea and design principle right from the start. However, now things get tricky, because a developer actually DOES have to consider which platform the code will run on. He has to know how many CPU cores the system will have. It matters, because if you use `parallel` on a single-core system you will actually lose performance. – bvdb Nov 26 '21 at 11:50
  • On top of that, in Java, threads aren't running in parallel as often as it seems. They are often blocked (network and disk I/O, or locks). In Node.js it's almost impossible to write code that will block. Whenever there's disk I/O you have to create a new task to handle the result. That means that tasks never get interrupted halfway. There are no waiting threads, just scheduled tasks, which are less expensive. Anyway, in the meantime Java (e.g. Spring WebFlux) also supports this concept. However, it never enforces it the way Node.js does, and because it's not a language feature it needs more code. – bvdb Nov 26 '21 at 12:42

JavaScript has no parallelism, so streams would always be sequential, and a collector would not need a combiner.
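To make the point about the combiner concrete, here is a minimal sketch of a collector as a plain object with only `supplier`, `accumulator` and `finisher`, plus a `collect` helper that folds any iterable into it. The helper and the `averaging` object are hypothetical names for illustration, not the snippet's own code. Because the fold runs strictly left to right, there is never a second partial result to merge, which is the only job Java's `combiner()` has.

```javascript
// Hypothetical helper: fold an iterable into a combiner-free collector.
// A Java Collector also has combiner(), used only to merge partial results
// from parallel sub-streams; a sequential fold never produces those.
function collect(iterable, { supplier, accumulator, finisher = x => x }) {
  let acc = supplier();
  for (const item of iterable) acc = accumulator(acc, item);
  return finisher(acc);
}

// Hypothetical averaging collector: accumulates [sum, count], then divides.
const averaging = {
  supplier: () => [0, 0],
  accumulator: ([sum, count], value) => [sum + value, count + 1],
  finisher: ([sum, count]) => sum / count,
};

console.log(collect([1, 2, 3, 4], averaging)); // 2.5
// Works on any iterable, e.g. a Set.
console.log(collect(new Set([10, 20]), averaging)); // 15
```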

Here I have tried to mimic the Stream API in JavaScript, though stripped of several features. Still, I think it has the key features.

As you focussed on Collectors, I added a Collector class with a constructor and static methods that roughly correspond to Java's Collectors (plural) utility class. The Stream class has a constructor that takes an iterable. It has most of the methods of Java's Stream, with some variation in reduce and iterator to align them more with JavaScript practice.
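Since the question is about reducing a stream to deep grouping maps, here is a small self-contained sketch of how a `groupingBy` with a downstream collector nests when collectors have the supplier/accumulator/finisher shape. The `groupingBy`, `counting` and `collect` functions and the sample records are illustrative assumptions, not the exact code of the snippet. Passing one `groupingBy` as the downstream of another yields a `Map` of `Map`s, much like `Collectors.groupingBy(f, Collectors.groupingBy(g, Collectors.counting()))` in Java.

```javascript
// Hypothetical collectors in supplier/accumulator/finisher form (no combiner).
const counting = () => ({
  supplier: () => 0,
  accumulator: (count) => count + 1,
  finisher: (count) => count,
});

// Group by classifier(item); each group is reduced by the downstream collector.
const groupingBy = (classifier, downstream) => ({
  supplier: () => new Map(),
  accumulator: (map, item) => {
    const key = classifier(item);
    const acc = map.has(key) ? map.get(key) : downstream.supplier();
    return map.set(key, downstream.accumulator(acc, item));
  },
  // Run the downstream finisher on every group once all items are in.
  finisher: (map) => {
    for (const [key, acc] of map) map.set(key, downstream.finisher(acc));
    return map;
  },
});

// Sequential fold over any iterable.
const collect = (iterable, { supplier, accumulator, finisher }) => {
  let acc = supplier();
  for (const item of iterable) acc = accumulator(acc, item);
  return finisher(acc);
};

// Made-up records, for illustration only.
const people = [
  { name: "John", department: "reception", city: "Oslo" },
  { name: "Mary", department: "stocks", city: "Oslo" },
  { name: "Bob", department: "stocks", city: "Bergen" },
  { name: "Ann", department: "stocks", city: "Oslo" },
];

// Java: collect(groupingBy(Person::getDepartment, groupingBy(Person::getCity, counting())))
const nested = collect(people, groupingBy(p => p.department, groupingBy(p => p.city, counting())));

console.log(nested.get("stocks")); // Map(2) { 'Oslo' => 2, 'Bergen' => 1 }
```

Overwriting an existing key while iterating a `Map` does not add entries, so the finisher can rewrite the groups in place.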

There's a Stream.Builder class included as well.

Finally, this snippet runs several examples. I think it looks quite familiar if you know the Java API:

class Collector {
    constructor(supplier, accumulator, finisher=a => a) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.finisher = finisher;
    }
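    static toArray() { // Cached on the class (this), like counting(), not on the global `self`
        return this._toArray ??= new Collector(Array, (acc, item) => (acc.push(item), acc));
    }
    static toSet() {
        return this._toSet ??= new Collector(() => new Set, (acc, item) => acc.add(item));
    }
    static toMap(keyMapper, valueMapper) {
        // Map has no add(); Map#set returns the map itself, so it serves as the new accumulator
        return new Collector(() => new Map, (acc, item) => acc.set(keyMapper.call(item, item), valueMapper.call(item, item)));
    }
    static toObject(keyMapper, valueMapper) {
        // Return the object, not the assigned value, so the next step receives the accumulator
        return new Collector(Object, (acc, item) => (acc[keyMapper.call(item, item)] = valueMapper.call(item, item), acc));
    }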
    static averaging(mapper=a => a) {
        return new Collector(
            () => [0, 0], 
            ([sum, count], value) => [sum + mapper.call(value, value), count + 1],
            ([sum, count]) => sum / count
        );
    }
    static collectingAndThen({supplier, accumulator, finisher}, nextFinisher) {
        return new Collector(
            supplier, 
            accumulator,
            value => (prev => nextFinisher.call(prev, prev))(finisher.call(value, value))
        );
    }
    static counting() {
        return this._counting ??= new Collector(Number, (count, value) => count + 1);
    }
    static summing(mapper=Number) {
        return new Collector(Number, (sum, value) => sum + mapper.call(value, value));
    }
    static joining(delimiter=",", prefix="", postfix="") {
        return this.collectingAndThen(Collector.toArray(), arr => prefix + arr.join(delimiter) + postfix);
    }
    // No implementation of partitioningBy, as it is the same as groupingBy with a boolean classifier
    static groupingBy(classifier, {supplier, accumulator, finisher} = Collector.toArray()) {
        return new Collector(
            () => new Map,
            (map, value) => {
                const key = classifier.call(value, value);
                let result = map.get(key) ?? supplier();
                return map.set(key, accumulator(result, value));
            },
            map => {
                map.forEach((value, key) => map.set(key, finisher.call(value, value)));
                return map;
            }
        );
    }
    static mapping(mapper, {supplier, accumulator, finisher}) {
        return new Collector(
            supplier,
            (acc, value) => accumulator(acc, mapper.call(value, value)),
            finisher
        );
    }
    static maxBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) < 0 ? value : acc
        );
    }
    static minBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) > 0 ? value : acc
        );
    }
    static reducing(binaryOperator, identity, mapper=a => a) {
        return new Collector(
            () => identity,
            (acc, value) => acc === undefined ? mapper.call(value, value) : binaryOperator(acc, mapper.call(value, value))
        );
    }
}

class Stream {
    static of(...args) {
        return new Stream(args);
    }
    static fromGenerator(generator, ...args) {
        return new Stream(generator.call(null, ...args));
    }
    static empty() {
        return this.of();
    }
    static Builder = class Builder {
        _items = [];
        // Chainable
        add(item) {
            this.accept(item);
            return this;
        }
        // Other
        accept(item) { // JavaScript has no ValueError; Java throws IllegalStateException here
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            this._items.push(item);
        }
        build() {
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            let {_items} = this;
            this._items = null;
            return new Stream(_items);
        }
    };
    static builder() {
        return new this.Builder();
    }
    static iterate(value, produceNextFromLast) {
        return this.fromGenerator(function* () {
            yield value;
            while (true) yield value = produceNextFromLast.call(value, value);
        });
    }
    static generate(produceNext) {
        return this.fromGenerator(function* () {
            while (true) yield produceNext();
        });
    }
    static concat(a, b) {
        return this.fromGenerator(function* () {
            yield* a;
            yield* b;
        });
    }
    static range(start, end) {
        return this.fromGenerator(function* () {
            while (start < end) yield start++;
        });
    }
    static rangeClosed(start, last) {
        return this.range(start, last + 1);
    }

    constructor(iterable) {
        this._iterable = iterable;
    }
    // Intermediate (Chainable, pipe) methods
    _chain(generator) { // Private helper method
        return Stream.fromGenerator(generator, this);
    }
    filter(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (predicate.call(item, item)) yield item;
            }
        });
    }
    distinct() {
        const set = new Set;
        return this.filter(item => !set.has(item) && set.add(item));
    }
    map(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield mapper.call(item, item);
        });
    }
    flatMap(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield* mapper.call(item, item);
        });
    }
    peek(action) { // Only for debugging
        return this._chain(function* (previous) {
            for (const item of previous) {
                action.call(item, item);
                yield item;
            }
        });
    }
    sorted(comparator=(a, b) => (a > b) - (a < b)) {
        return this._chain(function* (previous) {
            yield* [...previous].sort(comparator);
        });
    }
    dropWhile(predicate) {
        let active = false;
        return this.filter(item => active ||= !predicate.call(item, item));
    }
    skip(n) {
        return this.dropWhile(() => n > 0 && n--);
    }
    takeWhile(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (!predicate.call(item, item)) break;
                yield item;
            }
        });
    }
    limit(maxSize) {
        return this.takeWhile(() => maxSize > 0 && maxSize--);
    }
    // Terminal operations below: these do not return a Stream
    *[Symbol.iterator]() {  // Use JS symbol convention instead of "iterator" method
        const iterable = this._iterable;
        this.close();
        yield* iterable;
    }
    close() {
        if (!this._iterable) throw TypeError("stream has already been operated on or closed");
        this._iterable = null;
    }
    forEach(callback) {
        for (const item of this) callback.call(item, item);
    }
    toArray() {
        return [...this];
    }
    findFirst() {
        for (const item of this) return item;
    }
    allMatch(predicate) {
        for (const item of this) {
            if (!predicate.call(item, item)) return false;
        }
        return true;
    }
    anyMatch(predicate) {
        for (const item of this) {
            if (predicate.call(item, item)) return true;
        }
        return false;
    }
    noneMatch(predicate) {
        return !this.anyMatch(predicate);
    }
    collect(supplier, accumulator, finisher=a => a) {
        // Can be called with Collector instance as first argument
        if (arguments.length === 1) {
            ({supplier, accumulator, finisher} = supplier);
        }
        const reduced = this.reduce(accumulator, supplier());
        return finisher.call(reduced, reduced);
    }
    reduce(accumulator, initialValue) {  // interface like Array#reduce
        let done, result = initialValue;
        const iterator = this[Symbol.iterator]();
        if (arguments.length == 1) {
            ({done, value: result} = iterator.next());
            if (done) throw new TypeError("reduce of empty stream without initial value");
        }
        for (const item of iterator) {
            result = accumulator(result, item);
        }
        return result;
    }
    count() {
        return this.reduce(count => count + 1, 0);
    }
    max(comparator=(a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? b : a);
    }
    min(comparator=(a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? a : b);
    }
    sum() { // Sums numbers; an empty stream gives 0 (strings would get a leading "0")
        return this.reduce((a, b) => a + b, 0);
    }
    average() { // NaN for an empty stream
        const [count, sum] = this.reduce(([count, sum], b) => [count + 1, sum + b], [0, 0]);
        return sum / count;
    }
}

// Some example uses....

const first = Stream.iterate(1, i => i + 1)
                 .flatMap(i => Stream.iterate(i, j => j + 100).limit(2))
                 .limit(4);

const second  = Stream.builder().add(9).add(8).add(7).build().peek(console.log);

console.log("concat", Stream.concat(first, second).toArray());
console.log("average", Stream.range(1, 10).average());
console.log("sum", Stream.range(1, 10).sum());
console.log("random", Stream.generate(Math.random).limit(10).toArray());
console.log("skip & limit", Stream.range(1, 10).skip(4).limit(4).toArray());
console.log("first", Stream.range(1, 10).findFirst());
console.log("min, max", Stream.of(..."fabulous").min(), Stream.of(..."fabulous").max());
console.log("count", Stream.range(1, 10).count());
console.log("distinct and sorted", Stream.of(1, 5, 1, 2, 4, 2).distinct().sorted().toArray());

class Person {
    constructor(name, department, salary) {
        this.name = name;
        this.department = department;
        this.salary = salary;
    }
    getName() { return this.name }
    getDepartment() { return this.department }
    getSalary() { return this.salary }
    toString() { return `Hi ${this.name}!` }
}

let people = [
    new Person("John", "reception", 1000), 
    new Person("Mary", "stocks", 1500),
    new Person("Bob", "stocks", 1400),
];
console.log(Stream.of(...people)
                  .map(Person.prototype.getName)
                  .collect(Collector.toArray()));
console.log(Stream.of(...people)
                  .map(Person.prototype.getName)
                  .collect(Collector.toSet()));
console.log(Stream.of(...people)
                  .collect(Collector.joining(", ")));
console.log(Stream.of(...people)
                  .collect(Collector.summing(Person.prototype.getSalary)));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(Person.prototype.getDepartment)));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(Person.prototype.getDepartment,
                                                   Collector.averaging(Person.prototype.getSalary))));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(person => person.getSalary() >= 1300)));

// Fibonacci
console.log(Stream.iterate([0, 1], ([a, b]) => [b, a+b]) 
                  .map(([a]) => a)
                  .takeWhile(a => a < 30)
                  .dropWhile(a => a < 2)
                  .toArray());

// Accumulate object keys
console.log(Stream.range(0, 10).collect(Object, (acc, a) => Object.assign(acc, {[a]: 1}))); 

// Build complete binary tree in breadth-first order
console.log(Stream.iterate(0, i => i + 1)
    .limit(10)
    .collect(
        () => (root => [[root], root])({ left: "x" }), 
        (acc, value) => {
            let [level] = acc;
            level.push(level[0].left ? (level.shift().right = {value}) : (level[0].left = {value}))
            return acc;
        },
        acc => acc.pop().right
    )
);
trincot