I am writing a custom operator that loads a CSV file and emits each line as data. It is supposed to work like the of operator, which is a static function that creates an observable. I followed the instructions for operator creation and added the operator function directly to the Observable prototype.

All of the following code is written in JavaScript (ES6).

My source code is:

import { Observable } from 'rxjs';
import { createInterface } from 'readline';
import { createReadStream } from 'fs';

function fromCSV(path, options) {

  return Observable.create((subscriber) => {
    let readStream = createReadStream(path);
    let reader = createInterface({ input: readStream });
    let isHeader = true;
    let columns;

    reader.on('line', (line) => {
      if (isHeader) {
        columns = line.split(',');
        isHeader = false;
      } else {
        let values = line.split(',');
        let valueObject = {};

        for (let i = 0, n = columns.length; i < n; i++) {
          valueObject[columns[i]] = values[i] || undefined;
        }

        subscriber.next(valueObject);
      }
    });

    reader.on('close', () => subscriber.complete());
    readStream.on('error', (error) => subscriber.error(error));
  });
}

Observable.prototype.fromCSV = fromCSV;

The operator function looks correct to me, but when I try to use it like this:

import { Observable } from 'rxjs';
import './rx-from-csv';

Observable.fromCSV(testCSV)
  .subscribe((row) => {
    console.log(row);
  });

it throws the error "TypeError: _rxjs.Observable.fromCSV is not a function". So the function binding fails, and I have no idea why. Any help is appreciated.

This particularly confuses me because I have successfully done a similar operator binding for another custom csv operator.

Haoliang Yu

2 Answers

The problem is that TypeScript doesn't know about the operator because it can't find it in RxJS's *.d.ts files.

Have a look at how it's done by the default RxJS operators: https://github.com/ReactiveX/rxjs/blob/master/src/add/operator/bufferCount.ts

In your case you'll need just the declare module ... part, with the correct path to the Observable definition. For example:

function fromCSV(path, options) {
  ...
}

Observable.prototype.fromCSV = fromCSV;

declare module 'rxjs/Observable' {
  interface Observable<T> {
    fromCSV: typeof fromCSV;
  }
}
martin

It turns out that I was adding the static function the wrong way. See this post for more information.

To add a static function to the Observable class, the code needs to be:

Observable.fromCSV = fromCSV;

Adding the function to the class's prototype makes it available only on instances, that is, on objects created with new, not on the class itself.
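
To make the difference concrete, here is a minimal sketch that does not depend on RxJS. The Source class and the fromList function are hypothetical stand-ins (they are not part of RxJS or of the code above); the point is only to show where a function ends up when it is assigned to the prototype versus to the class itself.

```javascript
// Hypothetical stand-in for a class such as Observable; not the RxJS implementation.
class Source {}

// Hypothetical creation function, playing the role of fromCSV.
function fromList(items) {
  return items.slice();
}

// Assigned to the prototype: reachable only through instances.
Source.prototype.viaPrototype = fromList;

// Assigned to the constructor: reachable as a static function on the class.
Source.viaStatic = fromList;

const instance = new Source();

const prototypeOnClass = typeof Source.viaPrototype;      // not found on the class
const prototypeOnInstance = typeof instance.viaPrototype; // found on the instance
const staticOnClass = typeof Source.viaStatic;            // found on the class
const staticOnInstance = typeof instance.viaStatic;       // not found on the instance

console.log({ prototypeOnClass, prototypeOnInstance, staticOnClass, staticOnInstance });
```

Calling Source.viaPrototype(...) in this sketch would fail with the same kind of "is not a function" TypeError as in the question, because the lookup on the class never consults Source.prototype.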

Haoliang Yu