JavaScript has multiple APIs that use callback functions, and they all do nearly the same thing with slight variations. Event listeners, array methods such as `.forEach`, promises, and Node.js streams are all very similar in the way they are written.
In RxJS, you instead unify all of these APIs under one abstraction: the Observable.
The normal RxJS API looks like this:
import { from } from "rxjs";
import { map, filter } from "rxjs/operators";

// Transformation applied to every emitted number.
const double = (x) => x * 2;
// Only values that satisfy this predicate reach the subscriber.
const isBelowFive = (x) => x < 5;

// Turn the array into an observable, double each value, keep the ones
// below five, and log whatever is left.
from([1, 2, 3, 4])
  .pipe(map(double))
  .pipe(filter(isBelowFive))
  .subscribe((val) => console.log(val));
// Logs:
// 2
// 4
We can build our own minimal version of an RxJS Observable and its operators.
An Observable has the following API:
{
// subscribe(observer): run the observable, delivering its notifications to `observer`.
subscribe() {}
// pipe(operator): pass this observable to `operator` and return whatever the operator returns.
pipe() {}
}
We can create a function called `createObservable(subscribe)` that takes a subscribe function and returns an object with `subscribe` and `pipe` methods:
/**
 * Creates a minimal Observable around a `subscribe` function.
 *
 * @param {Function} subscribe - Function that receives an observer and pushes
 *   notifications to it. It is exposed unchanged as the `subscribe` method.
 * @returns {{subscribe: Function, pipe: Function}} The observable object.
 */
function createObservable(subscribe) {
  return {
    subscribe,
    /**
     * Applies zero or more operators from left to right.
     *
     * `pipe(a)` returns `a(this)`; `pipe(a, b)` returns `b(a(this))`;
     * `pipe()` returns this observable unchanged.
     *
     * @param {...Function} operators - Functions that take an observable and
     *   return an observable.
     * @returns {*} The result of the last operator, or this observable when
     *   no operator is given.
     */
    pipe(...operators) {
      // Each operator receives the observable produced by the previous one,
      // starting with this observable.
      return operators.reduce((source, operator) => operator(source), this);
    },
  };
}
We can use it to create Observables:
// Emits 10, 20, 30 and 40 synchronously, then signals completion.
const numberObservable = createObservable((observer) => {
  for (const value of [10, 20, 30, 40]) {
    observer.next(value);
  }
  observer.complete();
});

// Emits every click event that reaches the document. It never calls
// `complete`, so the stream stays open.
const clickObservable = createObservable((observer) => {
  const forwardClick = (ev) => {
    observer.next(ev);
  };
  document.addEventListener("click", forwardClick);
});
An Observer is simple: it is an object that contains `next`, `error`, and `complete` functions:
// Observer that reports every notification on the console.
const observer = {
  // Value notification: print the emitted value.
  next: (x) => {
    console.log(x);
  },
  // Error notification: print the error on the error stream.
  error: (err) => {
    console.error(err);
  },
  // Completion notification: mark the end of the stream.
  complete: () => {
    console.log("DONE");
  },
};
// Call shape of an operator: map(fn) returns a function that takes an observable and returns a new observable.
map(fn)(observable)
// filter has the same shape; predFn decides which values are passed on.
filter(predFn)(observable)
It is important to know that operators such as `map` and `filter` take an inputObservable and return an outputObservable.
Inside the outputObservable we subscribe to the inputObservable with an inputObserver; inside that inputObserver we call the outputObserver, which is passed in by the consumer.
// map(fn): operator that emits fn(x) for every value x emitted by the input
// observable. Error and completion notifications are passed through as-is.
const map = (fn) => (inputObservable) =>
  createObservable((outputObserver) => {
    // Sits between the input observable and the consumer's observer.
    const inputObserver = {
      next: (x) => {
        const mapped = fn(x);
        outputObserver.next(mapped);
      },
      error: (err) => {
        outputObserver.error(err);
      },
      complete: () => {
        outputObserver.complete();
      },
    };
    inputObservable.subscribe(inputObserver);
  });

// filter(fn): operator that only passes on the values for which fn(x) is
// truthy. Error and completion notifications are passed through as-is.
const filter = (fn) => (inputObservable) =>
  createObservable((outputObserver) => {
    const inputObserver = {
      next: (x) => {
        if (!fn(x)) {
          return;
        }
        outputObserver.next(x);
      },
      error: (err) => {
        outputObserver.error(err);
      },
      complete: () => {
        outputObserver.complete();
      },
    };
    inputObservable.subscribe(inputObserver);
  });
--
Full Code:
/**
 * Creates a minimal Observable around a `subscribe` function.
 *
 * @param {Function} subscribe - Function that receives an observer and pushes
 *   notifications to it. It is exposed unchanged as the `subscribe` method.
 * @returns {{subscribe: Function, pipe: Function}} The observable object.
 */
function createObservable(subscribe) {
  return {
    subscribe,
    // Applies the given operators from left to right: pipe(a, b) is the same
    // as pipe(a).pipe(b), and pipe() returns this observable unchanged.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this);
    },
  };
}

// Emits 10, 20, 30 and 40 synchronously, then signals completion.
const numberObservable = createObservable(function(observer) {
  [10, 20, 30, 40].forEach(x => {
    observer.next(x);
  });
  observer.complete();
});

// Emits every click event that reaches the document; it never completes.
const clickObservable = createObservable(function(observer) {
  document.addEventListener("click", function(ev) {
    observer.next(ev);
  });
});

// map(fn): operator that emits fn(x) for every value x of the input
// observable. Error and completion notifications are passed through.
const map = fn => inputObservable => {
  const outputObservable = createObservable(function(outputObserver) {
    const observer = {
      next(x) {
        const res = fn(x);
        outputObserver.next(res);
      },
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      }
    };
    inputObservable.subscribe(observer);
  });
  return outputObservable;
};

// filter(fn): operator that only passes on values for which fn(x) is truthy.
// Error and completion notifications are passed through.
const filter = fn => inputObservable => {
  const outputObservable = createObservable(function(outputObserver) {
    const observer = {
      next(x) {
        if (fn(x)) {
          outputObserver.next(x);
        }
      },
      error(err) {
        outputObserver.error(err);
      },
      complete() {
        outputObserver.complete();
      }
    };
    inputObservable.subscribe(observer);
  });
  return outputObservable;
};

// Observer that reports every notification on the console.
const observer = {
  next(x) {
    console.log(x);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.log("DONE");
  }
};

// Logs 21, 51, 81, 111 and then "DONE".
numberObservable
  .pipe(map(x => x * 3))
  .pipe(map(x => x - 9))
  .subscribe(observer);

// The click demo needs a DOM. Subscribing calls document.addEventListener,
// so it is skipped where `document` does not exist (for example in Node.js)
// instead of throwing a ReferenceError.
if (typeof document !== "undefined") {
  // Several operators can be handed to a single pipe call.
  clickObservable
    .pipe(
      map(ev => [ev.clientX, ev.clientY]),
      filter(([x, y]) => x < 200 && y < 200)
    )
    .subscribe(observer);
}
原文:https://www.cnblogs.com/Answer1215/p/10662844.html