Streams are for everyone
2018, Feb 11
I have been fiddling with reactive programming for a while, mostly with RxJS, and I simply love the tools.
If you don’t know what an observable is, it’s basically a Promise
-style wrapper in which events can be composed. In other words, you can map over the output of functions that would otherwise use callbacks.
This is an example of an event emitter for tracking a user’s status:
function createOnlineEmitter() {
let cbs = []; //array of registered callbacks for the event
let unsub; //function for removing the main event listener
//this is the main event listener that gets registered with window.online/offline event
const mainListener = (isOnline) => {
//call all the subscribed callbacks
cbs.forEach(cb => cb(isOnline));
};
const registerMainListener = () => {
const boundOnline = mainListener.bind(null, true);
const boundOffline = mainListener.bind(null, false);
window.addEventListener('online', boundOnline);
window.addEventListener('offline', boundOffline);
//return unsubcribe functionality in a closure
return function unsubscribe() {
window.removeEventListener('online', boundOnline);
window.removeEventListener('offline', boundOffline);
};
};
const addCb = (cb) => {
cbs.push(cb);
//register main listener only once
//use existence of `unsub` as indicator if main event listener is added or not
if(!unsub) {
unsub = registerMainListener();
}
};
const removeCb = (cb) => {
const index = cbs.indexOf(cb);
if(index > -1) {
cbs.splice(index, 1);
}
//if no callbacks left, remove main event listener
if(cbs.length === 0 && unsub) {
unsub();
unsub = null;
}
};
return function initOnlineEmitter(cb) {
addCb(cb);
//call it async with the initial val
setTimeout(() => {
cb(navigator.onLine);
});
//return unsubscribe function to caller
return removeCb.bind(null, cb);
};
}
// implement it
const onlineEmitter = createOnlineEmitter();
let unsub = onlineEmitter(isOnline => console.log(isOnline));
unsub();
Not too bad, but this is what the implementation would look like with observables:
const { Observable } = require('rxjs/Observable');
require('rxjs/add/observable/fromEvent');
require('rxjs/add/operator/map');
require('rxjs/add/observable/merge');
function createOnline$() {
//merge several events into one
return Observable.merge(
//use .map() to transform the returned Event type into a true/false value
Observable.fromEvent(window, 'offline').map(() => false),
Observable.fromEvent(window, 'online').map(() => true),
//start the stream with the current online status
Observable.create(sub => {
sub.next(navigator.onLine);
sub.complete(); //this one only emits once, so now we end it
})
);
}
// implement it
const onlineSub = createOnline$().subscribe(isOnline => console.log(isOnline));
onlineSub.unsubscribe();
Simple example. Observables are awesome tools.