/**
 * A reusable reduction recipe, modelled on Java's `Collector`.
 *
 * A collector bundles three functions:
 *  - `supplier()` creates a fresh accumulation container,
 *  - `accumulator(container, item)` folds one item in and returns the
 *    container to use for the next item,
 *  - `finisher(container)` turns the final container into the result.
 *
 * User-supplied mappers/classifiers are invoked as `fn.call(item, item)`, so
 * both arrow functions and unbound prototype methods can be passed.
 */
class Collector {
  /**
   * @param {Function} supplier - Zero-argument factory for the container.
   * @param {Function} accumulator - `(container, item) => container`.
   * @param {Function} [finisher] - `(container) => result`; identity by default.
   */
  constructor(supplier, accumulator, finisher = a => a) {
    this.supplier = supplier;
    this.accumulator = accumulator;
    this.finisher = finisher;
  }

  /**
   * Collects items into an array.
   * The collector itself is stateless, so one shared instance is cached on the class.
   * @returns {Collector}
   */
  static toArray() {
    return Collector._toArray ??= new Collector(
      () => [],
      (acc, item) => {
        acc.push(item);
        return acc;
      }
    );
  }

  /**
   * Collects items into a Set (cached shared instance, see `toArray`).
   * @returns {Collector}
   */
  static toSet() {
    return Collector._toSet ??= new Collector(
      () => new Set(),
      (acc, item) => acc.add(item) // Set#add returns the set itself
    );
  }

  /**
   * Collects items into a Map; a later item with the same key overwrites the earlier one.
   * @param {Function} keyMapper - Produces the key for an item.
   * @param {Function} valueMapper - Produces the value for an item.
   * @returns {Collector}
   */
  static toMap(keyMapper, valueMapper) {
    return new Collector(
      () => new Map(),
      // Map#set returns the map itself
      (acc, item) => acc.set(keyMapper.call(item, item), valueMapper.call(item, item))
    );
  }

  /**
   * Collects items into a plain object; a later item with the same key overwrites the earlier one.
   * @param {Function} keyMapper - Produces the property name for an item.
   * @param {Function} valueMapper - Produces the property value for an item.
   * @returns {Collector}
   */
  static toObject(keyMapper, valueMapper) {
    return new Collector(
      () => ({}),
      (acc, item) => {
        acc[keyMapper.call(item, item)] = valueMapper.call(item, item);
        return acc; // the container, not the assigned value, must be carried forward
      }
    );
  }

  /**
   * Computes the arithmetic mean of the mapped values.
   * The result is NaN when no items were collected (0 / 0).
   * @param {Function} [mapper] - Extracts the number from an item; identity by default.
   * @returns {Collector}
   */
  static averaging(mapper = a => a) {
    return new Collector(
      () => [0, 0],
      ([sum, count], value) => [sum + mapper.call(value, value), count + 1],
      ([sum, count]) => sum / count
    );
  }

  /**
   * Wraps a collector so that `nextFinisher` is applied to its finished result.
   * @param {Collector} collector - The collector to adapt (destructured).
   * @param {Function} nextFinisher - Applied to the adapted collector's result.
   * @returns {Collector}
   */
  static collectingAndThen({supplier, accumulator, finisher}, nextFinisher) {
    return new Collector(supplier, accumulator, container => {
      const finished = finisher.call(container, container);
      return nextFinisher.call(finished, finished);
    });
  }

  /**
   * Counts the collected items (cached shared instance).
   * @returns {Collector}
   */
  static counting() {
    return Collector._counting ??= new Collector(() => 0, count => count + 1);
  }

  /**
   * Sums the mapped values, starting from 0.
   * @param {Function} [mapper] - Extracts the number from an item; `Number` by default.
   * @returns {Collector}
   */
  static summing(mapper = Number) {
    return new Collector(() => 0, (sum, value) => sum + mapper.call(value, value));
  }

  /**
   * Joins the items into one string using `Array#join`.
   * @param {string} [delimiter] - Placed between items.
   * @param {string} [prefix] - Placed before the first item.
   * @param {string} [postfix] - Placed after the last item.
   * @returns {Collector}
   */
  static joining(delimiter = ",", prefix = "", postfix = "") {
    return Collector.collectingAndThen(
      Collector.toArray(),
      arr => `${prefix}${arr.join(delimiter)}${postfix}`
    );
  }

  // No implementation of partitioningBy, as it is the same as groupingBy with a boolean classifier

  /**
   * Groups items in a Map keyed by `classifier(item)`; every group is reduced
   * with the downstream collector.
   * @param {Function} classifier - Produces the group key for an item.
   * @param {Collector} [downstream] - Reduces each group; `toArray()` by default.
   * @returns {Collector}
   */
  static groupingBy(classifier, {supplier, accumulator, finisher} = Collector.toArray()) {
    return new Collector(
      () => new Map(),
      (map, value) => {
        const key = classifier.call(value, value);
        // `has` rather than a nullish check: a group's accumulated value may
        // itself be null/undefined and must not be re-created.
        const container = map.has(key) ? map.get(key) : supplier();
        return map.set(key, accumulator(container, value));
      },
      map => {
        // Overwriting values of existing keys while iterating a Map is safe.
        for (const [key, container] of map) {
          map.set(key, finisher.call(container, container));
        }
        return map;
      }
    );
  }

  /**
   * Adapts a collector so every item is transformed by `mapper` before accumulation.
   * @param {Function} mapper - Transforms each item.
   * @param {Collector} downstream - Receives the transformed items (destructured).
   * @returns {Collector}
   */
  static mapping(mapper, {supplier, accumulator, finisher}) {
    return new Collector(
      supplier,
      (acc, value) => accumulator(acc, mapper.call(value, value)),
      finisher
    );
  }

  /**
   * Keeps the greatest item according to `comparator`.
   * `undefined` doubles as the "nothing seen yet" marker, so it is also the
   * result for an empty input.
   * @param {Function} comparator - `(a, b) => number`, negative when a < b.
   * @returns {Collector}
   */
  static maxBy(comparator) {
    return new Collector(
      () => undefined,
      (acc, value) => (acc === undefined || comparator(acc, value) < 0 ? value : acc)
    );
  }

  /**
   * Keeps the smallest item according to `comparator`.
   * `undefined` doubles as the "nothing seen yet" marker, so it is also the
   * result for an empty input.
   * @param {Function} comparator - `(a, b) => number`, positive when a > b.
   * @returns {Collector}
   */
  static minBy(comparator) {
    return new Collector(
      () => undefined,
      (acc, value) => (acc === undefined || comparator(acc, value) > 0 ? value : acc)
    );
  }

  /**
   * General fold of the mapped values with `binaryOperator`.
   * When the running value is `undefined` (e.g. no identity was given), the
   * next mapped value is taken as is instead of being combined.
   * @param {Function} binaryOperator - `(acc, mapped) => acc`.
   * @param {*} [identity] - Starting value.
   * @param {Function} [mapper] - Transforms each item first; identity by default.
   * @returns {Collector}
   */
  static reducing(binaryOperator, identity, mapper = a => a) {
    return new Collector(
      () => identity,
      (acc, value) => {
        const mapped = mapper.call(value, value);
        return acc === undefined ? mapped : binaryOperator(acc, mapped);
      }
    );
  }
}
/**
 * A lazy, single-use sequence of values, modelled on Java's `Stream`.
 *
 * A stream wraps an iterable. Intermediate operations return a new stream that
 * pulls from the previous one on demand; terminal operations consume it.
 * A stream can be iterated only once: starting a second iteration throws a
 * TypeError.
 *
 * User callbacks (predicates, mappers, actions) are invoked as
 * `fn.call(item, item)`, so unbound prototype methods can be passed.
 */
class Stream {
  // Default ordering shared by sorted(), max() and min().
  static #naturalOrder = (a, b) => (a > b) - (a < b);

  /** Creates a stream of the given arguments. */
  static of(...args) {
    return new Stream(args);
  }

  /**
   * Creates a stream from the iterator obtained by calling `generator(...args)`.
   * The generator function is called immediately, with `this` set to null.
   */
  static fromGenerator(generator, ...args) {
    return new Stream(generator.call(null, ...args));
  }

  /** Creates a stream without elements. */
  static empty() {
    return this.of();
  }

  /**
   * Mutable builder that collects items and turns them into a stream once.
   * After `build()` any further `add`/`accept`/`build` throws an Error.
   */
  static Builder = class Builder {
    _items = [];

    // Chainable
    add(item) {
      this.accept(item);
      return this;
    }

    // Other
    accept(item) {
      if (!this._items) throw new Error("The builder has already transitioned to the built state");
      this._items.push(item);
    }

    build() {
      if (!this._items) throw new Error("The builder has already transitioned to the built state");
      const items = this._items;
      this._items = null; // marks the builder as built
      return new Stream(items);
    }
  };

  /** Returns a new, empty Builder. */
  static builder() {
    return new this.Builder();
  }

  /**
   * Infinite stream: `value`, then `produceNextFromLast(previous)` repeatedly.
   */
  static iterate(value, produceNextFromLast) {
    return this.fromGenerator(function* () {
      let current = value;
      yield current;
      while (true) {
        current = produceNextFromLast.call(current, current);
        yield current;
      }
    });
  }

  /** Infinite stream whose every element is the result of `produceNext()`. */
  static generate(produceNext) {
    return this.fromGenerator(function* () {
      while (true) yield produceNext();
    });
  }

  /** Stream of all elements of iterable `a` followed by those of iterable `b`. */
  static concat(a, b) {
    return this.fromGenerator(function* () {
      yield* a;
      yield* b;
    });
  }

  /** Stream of `start`, `start + 1`, ... while the value is below `end` (exclusive). */
  static range(start, end) {
    return this.fromGenerator(function* () {
      for (let current = start; current < end; current++) yield current;
    });
  }

  /** Like `range`, but `last` is included. */
  static rangeClosed(start, last) {
    return this.range(start, last + 1);
  }

  /**
   * @param {Iterable} iterable - Source of the stream's elements.
   */
  constructor(iterable) {
    this._iterable = iterable;
  }

  // Intermediate (Chainable, pipe) methods

  // Private helper method: new stream from `generator(thisStream)`
  _chain(generator) {
    return Stream.fromGenerator(generator, this);
  }

  /** Keeps only the elements for which `predicate` is truthy. */
  filter(predicate) {
    return this._chain(function* (previous) {
      for (const item of previous) {
        if (predicate.call(item, item)) yield item;
      }
    });
  }

  /** Drops elements already seen earlier in the stream (Set membership). */
  distinct() {
    const seen = new Set();
    return this.filter(item => {
      if (seen.has(item)) return false;
      seen.add(item);
      return true;
    });
  }

  /** Replaces each element by `mapper(element)`. */
  map(mapper) {
    return this._chain(function* (previous) {
      for (const item of previous) yield mapper.call(item, item);
    });
  }

  /** Replaces each element by all elements of the iterable `mapper(element)`. */
  flatMap(mapper) {
    return this._chain(function* (previous) {
      for (const item of previous) yield* mapper.call(item, item);
    });
  }

  /** Calls `action` on each element as it passes through. Only for debugging. */
  peek(action) {
    return this._chain(function* (previous) {
      for (const item of previous) {
        action.call(item, item);
        yield item;
      }
    });
  }

  /**
   * Sorts the elements; the whole upstream is read into memory once the
   * first element is requested.
   */
  sorted(comparator = Stream.#naturalOrder) {
    return this._chain(function* (previous) {
      yield* [...previous].sort(comparator);
    });
  }

  /** Skips leading elements while `predicate` is truthy, then passes everything. */
  dropWhile(predicate) {
    let active = false; // becomes true at the first element failing the predicate
    return this.filter(item => active ||= !predicate.call(item, item));
  }

  /** Skips the first `n` elements. */
  skip(n) {
    let remaining = n;
    return this.dropWhile(() => remaining > 0 && remaining--);
  }

  /** Passes elements until `predicate` is falsy for one; that element and the rest are dropped. */
  takeWhile(predicate) {
    return this._chain(function* (previous) {
      for (const item of previous) {
        if (!predicate.call(item, item)) break;
        yield item;
      }
    });
  }

  /**
   * Passes at most `maxSize` elements.
   * Never pulls more than `maxSize` elements from upstream, so upstream side
   * effects and expensive producers are not triggered for an element that
   * would be thrown away.
   */
  limit(maxSize) {
    return this._chain(function* (previous) {
      let remaining = maxSize;
      if (!(remaining > 0)) return;
      for (const item of previous) {
        yield item;
        // Checked after the yield: leaving the loop here closes the upstream
        // iterator without asking it for another element.
        if (--remaining <= 0) return;
      }
    });
  }

  // Terminal operations below: these do not return a Stream

  // Use JS symbol convention instead of "iterator" method.
  // Being a generator, the single-use check runs when iteration actually starts.
  *[Symbol.iterator]() {
    const iterable = this._iterable;
    this.close();
    yield* iterable;
  }

  /**
   * Marks the stream as used.
   * @throws {TypeError} when the stream was already iterated or closed.
   */
  close() {
    if (!this._iterable) throw new TypeError("stream has already been operated on or closed");
    this._iterable = null;
  }

  /** Calls `callback` for every element. */
  forEach(callback) {
    for (const item of this) callback.call(item, item);
  }

  /** Returns all elements as an array. */
  toArray() {
    return [...this];
  }

  /** Returns the first element, or undefined for an empty stream. */
  findFirst() {
    for (const item of this) return item;
  }

  /** True when `predicate` is truthy for every element (true for an empty stream). */
  allMatch(predicate) {
    for (const item of this) {
      if (!predicate.call(item, item)) return false;
    }
    return true;
  }

  /** True when `predicate` is truthy for at least one element. */
  anyMatch(predicate) {
    for (const item of this) {
      if (predicate.call(item, item)) return true;
    }
    return false;
  }

  /** True when `predicate` is truthy for no element. */
  noneMatch(predicate) {
    return !this.anyMatch(predicate);
  }

  /**
   * Reduces the stream with a supplier/accumulator/finisher triple.
   * Can be called with a single argument that has those three properties
   * (e.g. a Collector instance).
   */
  collect(supplier, accumulator, finisher = a => a) {
    if (arguments.length === 1) {
      ({supplier, accumulator, finisher} = supplier);
    }
    const reduced = this.reduce(accumulator, supplier());
    return finisher.call(reduced, reduced);
  }

  /**
   * Folds the stream; interface like Array#reduce.
   * When called with one argument the first element is the initial value.
   * @throws {TypeError} for an empty stream when no initial value is passed.
   */
  reduce(accumulator, initialValue) {
    let result = initialValue;
    const iterator = this[Symbol.iterator]();
    // `arguments.length` distinguishes "no initial value" from an explicit undefined.
    if (arguments.length === 1) {
      const first = iterator.next();
      if (first.done) throw new TypeError("reduce of empty stream without initial value");
      result = first.value;
    }
    for (const item of iterator) {
      result = accumulator(result, item);
    }
    return result;
  }

  /** Number of elements. */
  count() {
    return this.reduce(count => count + 1, 0);
  }

  /**
   * Greatest element according to `comparator`.
   * @throws {TypeError} for an empty stream.
   */
  max(comparator = Stream.#naturalOrder) {
    return this.reduce((a, b) => (comparator(a, b) < 0 ? b : a));
  }

  /**
   * Smallest element according to `comparator`.
   * @throws {TypeError} for an empty stream.
   */
  min(comparator = Stream.#naturalOrder) {
    return this.reduce((a, b) => (comparator(a, b) < 0 ? a : b));
  }

  /**
   * Will sum numbers or concatenate strings; 0 for an empty stream.
   * The seed is chosen from the first element ("" for a string, 0 otherwise)
   * so that strings are not prefixed with "0".
   */
  sum() {
    let total = 0;
    let started = false;
    for (const item of this) {
      if (!started) {
        started = true;
        if (typeof item === "string") total = "";
      }
      total += item;
    }
    return total;
  }

  /** Arithmetic mean of the elements; NaN for an empty stream (0 / 0). */
  average() {
    const [count, sum] = this.reduce(
      ([n, total], value) => [n + 1, total + value],
      [0, 0]
    );
    return sum / count;
  }
}
// Example usage of the Stream factories, pipes and terminal operations.
// Each result is computed first and logged afterwards; block scopes keep the
// helper names out of the top-level namespace.
const first = Stream
  .iterate(1, i => i + 1)
  .flatMap(i => Stream.iterate(i, j => j + 100).limit(2))
  .limit(4);
const second = Stream.builder()
  .add(9)
  .add(8)
  .add(7)
  .build()
  .peek(console.log);
{
  // `second` logs each of its elements (via peek) while the array is built
  const concatenated = Stream.concat(first, second).toArray();
  console.log("concat", concatenated);
}
{
  const mean = Stream.range(1, 10).average();
  console.log("average", mean);
}
{
  const total = Stream.range(1, 10).sum();
  console.log("sum", total);
}
{
  const randoms = Stream.generate(Math.random).limit(10).toArray();
  console.log("random", randoms);
}
{
  const window = Stream.range(1, 10).skip(4).limit(4).toArray();
  console.log("skip & limit", window);
}
{
  const head = Stream.range(1, 10).findFirst();
  console.log("first", head);
}
{
  const smallest = Stream.of(..."fabulous").min();
  const greatest = Stream.of(..."fabulous").max();
  console.log("min, max", smallest, greatest);
}
{
  const size = Stream.range(1, 10).count();
  console.log("count", size);
}
{
  const uniqueSorted = Stream.of(1, 5, 1, 2, 4, 2).distinct().sorted().toArray();
  console.log("distinct and sorted", uniqueSorted);
}
/**
 * Simple record used by the collector examples: a named employee with a
 * department and a salary, exposed through Java-style getters.
 */
class Person {
  /**
   * @param {*} name - Stored as `name`.
   * @param {*} department - Stored as `department`.
   * @param {*} salary - Stored as `salary`.
   */
  constructor(name, department, salary) {
    Object.assign(this, { name, department, salary });
  }

  /** @returns the stored name. */
  getName() {
    const { name } = this;
    return name;
  }

  /** @returns the stored department. */
  getDepartment() {
    const { department } = this;
    return department;
  }

  /** @returns the stored salary. */
  getSalary() {
    const { salary } = this;
    return salary;
  }

  /** Greeting used when the person is converted to a string. */
  toString() {
    return "Hi ".concat(this.name, "!");
  }
}
// Sample data for the collector examples
let people = [
  ["John", "reception", 1000],
  ["Mary", "stocks", 1500],
  ["Bob", "stocks", 1400],
].map(([name, department, salary]) => new Person(name, department, salary));

// Names as an array
{
  const names = Stream.of(...people).map(Person.prototype.getName);
  console.log(names.collect(Collector.toArray()));
}
// Names as a set
{
  const names = Stream.of(...people).map(Person.prototype.getName);
  console.log(names.collect(Collector.toSet()));
}
// People joined into one string (joining relies on Person#toString)
{
  const joined = Stream.of(...people).collect(Collector.joining(", "));
  console.log(joined);
}
// Total salary
{
  const totalSalary = Stream.of(...people).collect(Collector.summing(Person.prototype.getSalary));
  console.log(totalSalary);
}
// People per department; the Map is spread so each [key, value] entry is logged as an argument
{
  const byDepartment = Stream.of(...people).collect(Collector.groupingBy(Person.prototype.getDepartment));
  console.log(...byDepartment);
}
// Average salary per department
{
  const averageByDepartment = Stream.of(...people).collect(
    Collector.groupingBy(
      Person.prototype.getDepartment,
      Collector.averaging(Person.prototype.getSalary)
    )
  );
  console.log(...averageByDepartment);
}
// Partition on a boolean classifier
{
  const bySalaryBand = Stream.of(...people).collect(
    Collector.groupingBy(person => person.getSalary() >= 1300)
  );
  console.log(...bySalaryBand);
}
// Fibonacci
{
  const fibonacci = Stream.iterate([0, 1], ([a, b]) => [b, a + b])
    .map(([a]) => a)
    .takeWhile(a => a < 30)
    .dropWhile(a => a < 2)
    .toArray();
  console.log(fibonacci);
}
// Accumulate object keys
{
  const keyed = Stream.range(0, 10).collect(
    () => ({}),
    (acc, a) => {
      acc[a] = 1;
      return acc;
    }
  );
  console.log(keyed);
}
// Build complete binary tree in breadth-first order
{
  const tree = Stream.iterate(0, i => i + 1)
    .limit(10)
    .collect(
      // Container: [queue of nodes still missing a child, dummy root].
      // The dummy already "has" a left child, so the first value becomes its right child.
      () => {
        const root = { left: "x" };
        return [[root], root];
      },
      (acc, value) => {
        const [level] = acc;
        const node = { value };
        if (level[0].left) {
          // Front node gets its second child and leaves the queue
          level.shift().right = node;
        } else {
          level[0].left = node;
        }
        level.push(node);
        return acc;
      },
      // The real tree hangs off the dummy root's right side
      acc => acc.pop().right
    );
  console.log(tree);
}