An incredibly lightweight version of TC39 Observable
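```
npm install mini-observable
```

```js
import {Observable, fromEvent, map, toggle, merge} from 'mini-observable'

// fromEvent takes the element first; using `document` here is an assumption
const mouseUps = fromEvent(document, 'mouseup')
const mouseDowns = fromEvent(document, 'mousedown')
const mouseMoves = fromEvent(document, 'mousemove')
// true while the mouse button is held down, false once it is released
const isMousingDown = merge(map(mouseDowns, () => true), map(mouseUps, () => false))
// only let mousemove events through while the button is held down
const mouseDrags = toggle(mouseMoves, isMousingDown)
const unsubscribe = mouseDrags.subscribe(event => event.preventDefault())
```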
Observable is a type - similar to, say, Promise - that allows us to emit pieces of data to parts of the code that wish to receive them.
Conceptually, Observable has a few key features which give it a great deal of power, and which are difficult to replicate without the Observable type. Chiefly:
- Observable itself is not async, but can easily be used in an async way
- Observable puts composability above all else
- Observable is not evaluated until someone subscribes
- Observable is no longer evaluated after someone unsubscribes
To create an Observable, one passes a subscribe callback. The subscribe callback is called whenever some code subscribes to the Observable. It receives `next`, `error` and `complete` functions, which tell subscribers what is happening with the stream. `error` and `complete` will fire at most once, but `next` can be fired many times (or not at all!). The subscribe callback can also manage what happens when some code _unsubscribes_ from a subscription, so it can clean up event listeners, null out references, etc.
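The lifecycle described above can be sketched as follows. This is a minimal illustration, not this library's source: `SketchObservable` is a hypothetical stand-in, and both the `subscribe(next, error, complete)` argument order and the "return a cleanup function" convention are assumptions made for the example.

```js
// Hypothetical stand-in for the library's Observable (assumption, not its source).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    // The subscribe callback runs once per subscriber, not at construction time.
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

const log = []
const numbers = new SketchObservable((next, error, complete) => {
  log.push('subscribed')
  next(1)
  next(2)
  complete()
  // The returned function runs when the subscriber unsubscribes.
  return () => log.push('cleaned up')
})

// Nothing has been evaluated yet, so the log is still empty at this point.
const logLengthBeforeSubscribe = log.length

const unsubscribe = numbers.subscribe(
  value => log.push(value),
  err => log.push(err),
  () => log.push('complete')
)
unsubscribe()
// log is now ['subscribed', 1, 2, 'complete', 'cleaned up']
```

Because the subscribe callback is stored rather than run, constructing the Observable has no side effects until something subscribes.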
The intent here is to keep the Observable _minimally maximal_. The TC39 Observable polyfill is not an ideal _minimally maximal_ implementation because it sticks closely to the spec and so carries a lot of code for error and argument checking, which we can forgo by keeping ourselves disciplined. most.js/RxJS/bacon/etc. either have features and capabilities which we might not make use of, or are missing features that we need (in the case of xstream, which lacks utils like debounce). In addition, many of these are much larger: a full most.js import is 10kb, RxJS is 30kb. By keeping our Observable very small (just subscribe and of methods) and the utils to just what we need, we can keep the implementation very lightweight.
One of the reasons all of the utils are in individual files, rather than on the Observable prototype, is to ensure that you can more easily swap observable.js for a compatible (hopefully one day native) implementation and keep using the operator helpers from this repo.
This library comes with a set of operators that operate over one or more Observables and always return one Observable. Here are some brief descriptions for each of these:
```
of([1,2,3]): |-1-2-3-|
```

```js
of(items: Array
```
of creates an Observable from a list of items in an Array or Array-like.
```
transform t:      (a, b) => a + b
source a:         |--1--2--1--2--|
source b:         |--1--1--2--2--|
combine(t, a, b): |--2--3--3--4--|

transform t:      (a, b) => a + b
source a:         |--1-----2-----3--|
source b:         |--1--2--3--4--5--|
combine(t, a, b): |--2--3--5--6--8--|
```

```js
combine
```
combine takes two Observables, sourceA and sourceB, and once it has received values from both will call transform(T, U). combine itself returns an Observable which emits next(V) for every return value of the called transform(T, U). transform can be called with stale values: if, for example, sourceB emits after sourceA completes, then transform will be called with the last value from sourceA. The output Observable will only complete() when both sources complete(). Unsubscribing from the output Observable will unsubscribe from all sources.
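The stale-value and completion rules can be sketched like this. It is a rough illustration under stated assumptions, not the library's code: `SketchObservable` and `manualSource` are hypothetical helpers written only for this example.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: a source we can push values into by hand.
function manualSource() {
  const subscribers = new Set()
  const observable = new SketchObservable((next, error, complete) => {
    const subscriber = {next, complete}
    subscribers.add(subscriber)
    return () => subscribers.delete(subscriber)
  })
  return {
    observable,
    emit: value => subscribers.forEach(s => s.next(value)),
    complete: () => subscribers.forEach(s => s.complete()),
  }
}

function combine(transform, sourceA, sourceB) {
  return new SketchObservable((next, error, complete) => {
    let a, b
    let hasA = false
    let hasB = false
    let remaining = 2
    // Only call transform once both sources have produced a value.
    const update = () => { if (hasA && hasB) next(transform(a, b)) }
    // Complete only after both sources have completed.
    const done = () => { if (--remaining === 0) complete() }
    const unsubA = sourceA.subscribe(v => { a = v; hasA = true; update() }, error, done)
    const unsubB = sourceB.subscribe(v => { b = v; hasB = true; update() }, error, done)
    return () => { unsubA(); unsubB() }
  })
}

const a = manualSource()
const b = manualSource()
const seen = []
let completed = false
combine((x, y) => x + y, a.observable, b.observable)
  .subscribe(v => seen.push(v), () => {}, () => { completed = true })

a.emit(1)    // nothing yet: b has not emitted
b.emit(1)    // emits 1 + 1
a.emit(2)    // emits 2 + 1
a.complete() // output stays open because b is still live
b.emit(5)    // emits 2 + 5, using the stale value from a
b.complete() // both sources are done, so the output completes
// seen is now [2, 3, 7]
```

Unlike the idealised diagrams, this sketch emits once per source emission and makes no attempt to coalesce values that arrive at the same moment.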
```
sources o.a:      |------1----------2----------1----------2------|
sources o.b:      |------1----------1----------2----------2------|
combineObject(o): |--{a:1,b:1}--{a:2,b:1}--{a:1,b:2}--{a:2,b:2}--|
```

```js
combineObject(sources: {[name: string]: Observable<*>}): Observable<{[name: string]: *}>
```
combineObject takes an Object of Observables and returns an Observable of objects, the keys of which match the keys of the sources object and the values of which are the latest values emitted by the corresponding Observables in the sources object. The returned Observable may emit stale values: if, for example, one of the sources completes, then subsequent objects will include the last value of that Observable. The output Observable will only complete() when all sources complete(). Unsubscribing from the output Observable will unsubscribe from all sources.
```
source:        |--1-2-3-----4-5------6-------7----|
debounce(200): |---------3-------5-----6-------7--|
```

```js
debounce
```
debounce takes a duration in milliseconds and an Observable. Any values the source Observable emits will not be emitted on the output Observable until the duration has passed. If the source Observable emits multiple values during one duration, then older values are discarded - in other words the output Observable will only emit at most once per duration, with the latest value from the source Observable.
```
predicate p:  x => x % 2 == 0
source s:     |--1--2--3--4--5--6--|
filter(p, s): |-----2-----4-----6--|
```

```js
filter
```
filter will execute predicate for every value T emitted from the source Observable. If predicate returns false, then the output Observable _will not_ emit that T. If predicate returns true, then the output Observable _will_ emit that T.
```
transform t:   x => Observable.of(x+1, x+2)
source s:      |-1-------2-------3-------7-----|
source t(1):   |-2--3--|
source t(2):           |-3--4--|
source t(3):                   |-4--5--|
source t(7):                           |-8--9--|
flatMap(t, s): |-2--3----3--4----4--5----8--9--|
```

```js
flatMap
```
flatMap takes a source Observable and passes each value emitted to transform(T). The output Observable of transform(T) is then immediately subscribed to and any values are emitted on the output Observable. Sources from transform(T) may overlap. The output Observable will only complete() when all sources from transform(T) also complete(). Unsubscribing from the output Observable will unsubscribe from all sources, including the source Observable, and transform(T) will no longer be called.
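A rough sketch of that behaviour follows. It is not the library's implementation: `SketchObservable` and the array-taking `of` helper are hypothetical stand-ins, and the `(transform, source)` argument order is taken from the `flatMap(t, s)` diagram.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: emits every item of an array, then completes.
const of = items => new SketchObservable((next, error, complete) => {
  items.forEach(value => next(value))
  complete()
})

function flatMap(transform, source) {
  return new SketchObservable((next, error, complete) => {
    const unsubscribers = new Set()
    let active = 1 // the outer source counts as one live subscription
    const done = () => { if (--active === 0) complete() }
    const unsubSource = source.subscribe(value => {
      active++
      // Subscribe to each inner Observable straight away; they may overlap.
      unsubscribers.add(transform(value).subscribe(next, error, done))
    }, error, done)
    unsubscribers.add(unsubSource)
    // Unsubscribing tears down the outer source and every inner source.
    return () => unsubscribers.forEach(unsub => unsub())
  })
}

const seen = []
let completed = false
flatMap(x => of([x + 1, x + 2]), of([1, 2, 3]))
  .subscribe(v => seen.push(v), () => {}, () => { completed = true })
// seen is now [2, 3, 3, 4, 4, 5]
```

Counting the outer source as one live subscription keeps the output from completing early when an inner Observable finishes before the outer one does.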
```js
fromCallback
```
fromCallback takes a callback-taking function and returns a function which no longer takes that callback, instead returning an Observable that, when subscribed to, will call the original function with the given arguments and emit next events any time the callback is called. In other words, if you have a function like readFile(name: string, callback: (contents: Buffer) => void) then you can call fromCallback(readFile) to get a function of the form readFile(name: string) => Observable.
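The wrapping can be sketched as below. This is a guess at the shape rather than the library's code: `SketchObservable` is a hypothetical stand-in, `readGreeting` is a made-up callback-taking function, and the sketch assumes the callback is always the last argument.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

function fromCallback(fn) {
  // The wrapped function takes the same arguments, minus the trailing callback.
  return (...args) => new SketchObservable(next => {
    // The original function is only called when someone subscribes.
    fn(...args, next)
  })
}

// Made-up callback-taking function, used only for illustration.
let calls = 0
function readGreeting(name, callback) {
  calls++
  callback(`hello ${name}`)
}

const readGreetingObservable = fromCallback(readGreeting)
const greeting = readGreetingObservable('world')
const callsBeforeSubscribe = calls // still 0: nothing has subscribed yet

const seen = []
greeting.subscribe(value => seen.push(value))
// seen is now ['hello world'] and readGreeting has been called once
```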
```js
fromEvent(element: HTMLElementLike, name: string, options?: EventListenerOptionsOrUseCapture): Observable
```
fromEvent takes an element, a name and an optional options object. The output Observable will call element.addEventListener(name, next, options), thereby emitting any events from the listener to the Observable. When unsubscribed, the Observable will clean up the event listener.
```js
fromPromise
```
fromPromise takes a Promise and will emit its value T when the Promise resolves. If the Promise rejects, then the Observable will error(). complete() is called after the Promise resolves and the Observable emits, so the Observable will only ever emit one value.
```
transform t: x => x * 2
source s:    |--1--2--3--4--|
map(s, t):   |--2--4--6--8--|
```

```js
map
```
map takes a source Observable and calls transform(T) for every emitted value. The returned Observable will emit the returned values from transform. If transform is not callable and is instead a plain value U, then that raw value is emitted instead.
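Both forms of transform can be sketched as follows. As before this is only an illustration: `SketchObservable` and the array-taking `of` helper are hypothetical stand-ins, and the `(source, transform)` order follows the `map(s, t)` diagram.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: emits every item of an array, then completes.
const of = items => new SketchObservable((next, error, complete) => {
  items.forEach(value => next(value))
  complete()
})

function map(source, transform) {
  return new SketchObservable((next, error, complete) =>
    // Returning the source's unsubscribe function makes it the cleanup.
    source.subscribe(
      // A non-callable transform is emitted as-is for every source value.
      value => next(typeof transform === 'function' ? transform(value) : transform),
      error,
      complete
    )
  )
}

const doubled = []
map(of([1, 2, 3, 4]), x => x * 2).subscribe(v => doubled.push(v))
// doubled is now [2, 4, 6, 8]

const constants = []
map(of([1, 2]), true).subscribe(v => constants.push(v))
// constants is now [true, true]
```

The constant form is what makes something like map(mouseDowns, true) possible as shorthand for map(mouseDowns, () => true).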
```
source a:    |--1-----2-----3-----4-----|
source b:    |--9-----8--------7-----6--|
merge(a, b): |--1-9---2-8---3--7--4--6--|
```

```js
merge(...sources: Array
```
merge takes an arbitrary number of source Observable<*>s and emits any value from any of those sources in the returned Observable<*>. The output Observable<*> will only complete() when all sources complete(). Calling unsubscribe() on the output Observable<*> will call unsubscribe() on all source Observable<*>s.
```
source s:   |--1--2--3--4--5--6--7--|
skip(3, s): |-----------4--5--6--7--|
```

```js
skip
```
skip takes a count, and a source Observable. The returned Observable will only emit Ts after the source Observable has emitted count times. Any values the source Observable emits before count is reached will be discarded.
```
source s:       |--1--1--2--3--4--4--4--4--5-|
skipRepeats(s): |--1-----2--3--4-----------5-|
```

```js
skipRepeats
```
skipRepeats takes a source Observable and returns an output Observable which will ignore repeat emissions from the source. Any value emitted several times in a row by the source Observable will be discarded after its first emission. In some libraries this is called distinctUntilChanged.
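The diagram above can be reproduced with a sketch like this one. It is illustrative only: `SketchObservable` and the array-taking `of` helper are hypothetical stand-ins, and comparing with strict equality (`===`) is an assumption, since the text does not say how repeats are detected.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: emits every item of an array, then completes.
const of = items => new SketchObservable((next, error, complete) => {
  items.forEach(value => next(value))
  complete()
})

function skipRepeats(source) {
  return new SketchObservable((next, error, complete) => {
    let hasLast = false
    let last
    return source.subscribe(value => {
      // Drop a value only when it equals the one emitted immediately before it.
      if (hasLast && value === last) return
      hasLast = true
      last = value
      next(value)
    }, error, complete)
  })
}

const seen = []
skipRepeats(of([1, 1, 2, 3, 4, 4, 4, 4, 5])).subscribe(v => seen.push(v))
// seen is now [1, 2, 3, 4, 5]
```

The `hasLast` flag is there so that a first value of `undefined` is still emitted rather than being mistaken for a repeat.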
```
source:       |-----1--2--3--4--5--6--|
startWith(1): |--1--1--2--3--4--5--6--|
```

```js
startWith
```
startWith takes an initial value T and a source Observable and returns an output Observable, which immediately emits T and subsequently emits any Us coming from the source Observable.
```
transform t:  x => Observable.of(x+1, x+1, x+1)
source:       |--1-----2--3--4--------|
source t(1):  |--2--2--2--------------|
source t(2):  |--------3--3--3--------|
source t(3):  |-----------4--4--4-----|
source t(4):  |--------------5--5--5--|
switchLatest: |--2--2--3--4--5--5--5--|
```

```js
switchLatest
```
switchLatest takes a source Observable and calls transform(T) for each emitted value. Like flatMap, switchLatest will immediately subscribe to any Observable coming from transform(T), but _in addition_ to this, will unsubscribe() from any prior Observables - so that there is only ever one Observable subscribed at any one time. In some libraries this is called switchMap.
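The "only one inner subscription at a time" rule can be sketched as follows. This is not the library's code: `SketchObservable` and `manualSource` are hypothetical helpers, the `(transform, source)` argument order is assumed by analogy with flatMap, and because the text does not describe completion, the sketch simply completes when the outer source does.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: a source we can push values into by hand.
function manualSource() {
  const subscribers = new Set()
  const observable = new SketchObservable((next, error, complete) => {
    const subscriber = {next, complete}
    subscribers.add(subscriber)
    return () => subscribers.delete(subscriber)
  })
  return {
    observable,
    emit: value => subscribers.forEach(s => s.next(value)),
    complete: () => subscribers.forEach(s => s.complete()),
  }
}

function switchLatest(transform, source) {
  return new SketchObservable((next, error, complete) => {
    let unsubInner = () => {}
    const unsubSource = source.subscribe(value => {
      // Drop the previous inner Observable before subscribing to the new one.
      unsubInner()
      unsubInner = transform(value).subscribe(next, error)
    }, error, complete) // assumption: complete with the outer source
    return () => { unsubInner(); unsubSource() }
  })
}

const outer = manualSource()
const inner = {first: manualSource(), second: manualSource()}
const seen = []
switchLatest(key => inner[key].observable, outer.observable)
  .subscribe(v => seen.push(v))

outer.emit('first')
inner.first.emit('a')  // forwarded
outer.emit('second')   // unsubscribes from inner.first
inner.first.emit('b')  // dropped: no longer subscribed
inner.second.emit('c') // forwarded
// seen is now ['a', 'c']
```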
```
source s:     |--1--2--3--4--5--6--7--8--|
source t:     |-----T-----F-----T--F-----|
toggle(s, t): |-----2--3--4-----6--7-----|
```

```js
toggle
```
toggle takes a source Observable and a toggler Observable. The output Observable will only emit Ts while the toggler's last emitted value was true. If the toggler last emitted false, then any Ts up until the toggler next emits true will be discarded. In other words, the toggler Observable controls whether or not the output Observable emits values. If the source Observable complete()s then the toggler Observable will be unsubscribe()d; however, complete() from the toggler Observable is ignored. Calling unsubscribe() on the output Observable will call unsubscribe() on both source and toggler.
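The gating behaviour can be sketched like this. It is an illustration under assumptions rather than the library's implementation: `SketchObservable` and `manualSource` are hypothetical helpers, the gate is assumed to start closed (as the diagram suggests), and the output is assumed to complete when the source completes.

```js
// Hypothetical stand-in for the library's Observable (assumption).
class SketchObservable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(next, error = () => {}, complete = () => {}) {
    const cleanup = this._subscribe(next, error, complete)
    return typeof cleanup === 'function' ? cleanup : () => {}
  }
}

// Hypothetical helper: a source we can push values into by hand.
function manualSource() {
  const subscribers = new Set()
  const observable = new SketchObservable((next, error, complete) => {
    const subscriber = {next, complete}
    subscribers.add(subscriber)
    return () => subscribers.delete(subscriber)
  })
  return {
    observable,
    emit: value => subscribers.forEach(s => s.next(value)),
    complete: () => subscribers.forEach(s => s.complete()),
  }
}

function toggle(source, toggler) {
  return new SketchObservable((next, error, complete) => {
    let open = false // assumption: closed until the toggler emits true
    // complete() from the toggler is ignored, so no complete handler is passed.
    const unsubToggler = toggler.subscribe(value => { open = value === true }, error)
    const unsubSource = source.subscribe(
      value => { if (open) next(value) },
      error,
      () => { unsubToggler(); complete() }
    )
    return () => { unsubSource(); unsubToggler() }
  })
}

const s = manualSource()
const t = manualSource()
const seen = []
let completed = false
toggle(s.observable, t.observable)
  .subscribe(v => seen.push(v), () => {}, () => { completed = true })

s.emit(1)     // dropped: the toggler has not emitted true yet
t.emit(true)
s.emit(2)     // forwarded
s.emit(3)     // forwarded
t.emit(false)
s.emit(4)     // dropped
t.emit(true)
s.emit(5)     // forwarded
s.complete()  // the toggler is unsubscribed and the output completes
// seen is now [2, 3, 5]
```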