npm install simple-scuttle
Replicate state across a network with the scuttlebutt protocol.
I recommend you at least skim the [paper][] which describes the
protocol before continuing too much further.
Simple-Scuttle exports a class, call it `Gossip`, whose instances are transform
streams in objectMode. Gossip instances maintain several data structures with
which they manage state (see `Gossip.state`) and the
propagation of state changes (see `Gossip.history`).

Rather than implementing several parallel streams, Gossip instances choose
logical branches based on the semantics of the objects written to them: the
shape of each input to the stream determines the resulting action. These shapes
are documented in the Expected Objects section.
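
To illustrate how a single stream can branch on the shape of its input, here is a minimal sketch of such a dispatch. The `classify` helper and the labels it returns are hypothetical names for illustration and are not part of simple-scuttle; the fields it inspects are the ones described in the Expected Objects section.

```js
// Hypothetical helper: decide which logical branch an incoming object belongs to.
function classify(obj) {
  if (obj === null || typeof obj !== 'object') return 'unknown'

  // Digests are flagged explicitly with `digest: true`.
  if (obj.digest === true) return 'digest'

  // Updates carry a key, a source_id, and a numeric version.
  if ('key' in obj && 'source_id' in obj && typeof obj.version === 'number') {
    return 'update'
  }

  return 'unknown'
}
```

A transform stream in objectMode could call a function like this at the top of its `_transform` and route the object accordingly.
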
```js
var deserializer = require('deserializer')
  , scuttle = require('simple-scuttle')
  , serializer = require('serializer')
  , fs = require('fs')

// Append every update to a log file ('a' opens the file for appending).
var persist = fs.createWriteStream('/path/to/logs', {flags: 'a'})

var peer_io = null // replace with a stream which sends and receives messages from the peer.
  , config = scuttle.base.config

config.resolve = scuttle.base.resolution.strictly_order_values

var gossip = new scuttle.Gossip('id', config)

gossip.pipe(serializer).pipe(peer_io).pipe(deserializer).pipe(gossip)

gossip.history.on('update', function(update) {
  persist.write(serializer.serialize(update))
})

gossip.history.on('compaction', compact)

function compact(memory, history_instance) {
  /* Resolve history to be more compact somehow */
}
```
`scuttle.base` is a module, `lib/base.js`, with some sample defaults:

- `base.config`: The default config object, described in detail below.
- `base.resolution`: Some sample conflict resolution functions.
`Gossip(String id, Object config, Object state) -> gossip`

- `id`: The unique identifier for each Gossip instance.
- `config`: An Object which must have the following properties:
  - `config.mtu`: Stands for Maximum Transmission Unit. Determines how many
    messages the network can handle at once. This is used to set
    `opts.highWaterMark`.
  - `config.max_history`: How many updates to store before we begin to forget
    old ones. Such concerns are absent from the paper, but they seem important
    to me. Defaults to 10 if falsey.
  - `config.resolve(gossip, update) -> Boolean`: A function which
    determines whether or not a given update should be applied.
  - `config.sort`: A function which describes how to order updates. Has the
    same signature as the compare function accepted by JavaScript's
    `Array.sort`, and will be called by `Array.sort` under the hood. This
    function is used to order updates when another Gossip instance requests
    updates more recent than a given version number.
- `state`: An optional object for the Gossip to use as its key-value store,
  for example a proxy object which you have overridden to be backed by a
  database.
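
As an illustration, a config object with the properties listed above might look like the following sketch. The property names come from the list; the concrete numbers and the bodies of `resolve` and `sort` are assumptions chosen for the example (a newest-version-wins policy), not the library's defaults. The `resolve` body relies only on `gossip.state[key]` having the shape `{version, value}`.

```js
// Illustrative config; values and policies are assumptions, not library defaults.
var config = {
  mtu: 10,
  max_history: 100,

  // Accept an update only if it is newer than what we hold for that key.
  resolve: function(gossip, update) {
    var known = Object.prototype.hasOwnProperty.call(gossip.state, update.key)
    return !known || update.version > gossip.state[update.key].version
  },

  // Array.sort-style comparator: lowest version first.
  sort: function(a, b) {
    return a.version - b.version
  }
}
```

A policy like this silently discards concurrent writes that carry a lower version number, which is exactly the kind of tradeoff worth examining before relying on it.
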
The config.resolve function is one of the most consequential decisions you will
make when constructing your distributed system. Please make an informed
decision. Investigate:
- http://aphyr.com/posts/299-the-trouble-with-timestamps,
- http://aphyr.com/posts/286-call-me-maybe-final-thoughts,
- http://pagesperso-systeme.lip6.fr/Marc.Shapiro/papers/RR-6956.pdf.
You will encounter concurrent updates across your system, and how you manage
them will determine the reliability and persistence of your data.
Note that regardless of this function, every valid update (updates to any value
that is not on Object.prototype) will be written to history.
Gossip instances expect objects written to them (with `gossip.write`) to be
either `digest`s or `update`s.
#### digests ####
```js
var digest

if(more_digest) {
  digest = {
      digest: true
    , source_id: source_id
    , version: last_seen_version_for_source_id
  }
} else {
  digest = {
      digest: true
    , done: true
  }
}
```
The assumption is that a digest object is sent from one node to another, and
specifies what information the receiver should send back to the sender (all
updates the receiver has seen for the specified source node, with version
number greater than digest.version). Upon receiving a digest, the
receiver queues all such updates into its Readable buffer.
If `!!digest.done` is true, then the receiver will also send back updates on any
peers it knows about that have version number greater than the version in the
digest. They will be ordered according to `config.sort` (updated each time
history is updated).
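
The response to a digest that names a source can be sketched as a filter followed by a sort. This is a simplified illustration, not the library's code: `respond` is a hypothetical name, `memory` stands in for the receiver's recorded updates, and the `done` case is left out.

```js
// Sketch: collect the updates a receiver would queue in response to one digest.
// `memory` is an array of update objects; `sort` is an Array.sort comparator.
function respond(digest, memory, sort) {
  var wanted = memory.filter(function(update) {
    return update.source_id === digest.source_id &&
      update.version > digest.version
  })

  // filter() returned a fresh array, so sorting it leaves `memory` untouched.
  return wanted.sort(sort)
}
```
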
#### updates ####
The other kind of object, the update, is an object that appears like the
following:
```js
var update = {
    key: 'age'
  , value: 100
  , source_id: '#A'
  , version: 10
}
```
This says: "source `update.source_id` thought `update.key` mapped to
`update.value` at version `update.version`." `config.resolve` is a function
that takes the gossip instance and an update and determines whether the gossip
instance should include the update.
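
Putting the pieces together, applying an update can be sketched as below. This is an illustration under stated assumptions rather than the library's implementation: `apply` is a hypothetical name, `gossip` is a plain stand-in object, history is simplified to an array, and "not on Object.prototype" is read as a check on the update's key.

```js
// Sketch of applying one update to a stand-in object {state: {}, history: []}.
// Returns true when the update was written into `state`.
function apply(gossip, update, resolve) {
  // Keys that collide with Object.prototype are treated as invalid.
  if (update.key in Object.prototype) return false

  // Every valid update is recorded, whatever `resolve` decides.
  gossip.history.push(update)

  if (!resolve(gossip, update)) return false

  gossip.state[update.key] = {version: update.version, value: update.value}
  return true
}
```
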
Gossip instances are Transform Streams, so they implement all of the methods
and events described in the node core documentation. In addition, there are a
few methods specific to this purpose:
This method applies a local update, simply setting the given key to the given
value in the local instance, and tacking on the appropriate version number and
`source_id`. Its return value indicates whether the underlying stream has hit
its high water mark. If the return value is `false`, do not write until
Gossip has emitted a `"drain"` event.
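
The back-pressure contract above can be honored with a small loop that pauses on `false` and resumes on `"drain"`. In this sketch `setAll` is a hypothetical helper, and it assumes the method is exposed as `set(key, value)`; adjust the call to match the actual method name.

```js
// Hypothetical helper: apply many local updates, pausing whenever the
// gossip stream reports that it has hit its high water mark.
function setAll(gossip, pairs, done) {
  var i = 0

  function next() {
    while (i < pairs.length) {
      var pair = pairs[i++]

      // Assumed method name: set(key, value) -> Boolean
      if (gossip.set(pair[0], pair[1]) === false) {
        // Buffer is full: resume only after "drain".
        gossip.once('drain', next)
        return
      }
    }

    if (done) done()
  }

  next()
}
```
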
A method which provides convenient lookup for arbitrary keys. If
`Gossip.state` has no entry for a given key, this method returns
`{version: -Infinity, value: null}`. Otherwise it returns
`{version: version, value: value}`.
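
The lookup behavior described above amounts to a guarded read with a sentinel default. The sketch below works over a plain state object; `lookup` is a hypothetical stand-in, not the library's method.

```js
// Sketch of the lookup over a plain state object.
function lookup(state, key) {
  if (Object.prototype.hasOwnProperty.call(state, key)) {
    return {version: state[key].version, value: state[key].value}
  }

  // Unknown keys compare as older than any real version.
  return {version: -Infinity, value: null}
}
```

Returning `-Infinity` for the version means any real update for an unseen key compares as newer, so callers do not need a separate "missing" branch.
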
Causes Gossip to queue a randomly sorted set of `digest` objects into its
Readable buffer. If another `Gossip` stream reads these, it will respond
with a series of `update` objects. See Expected Objects for
information on the shape of the objects.

`.gossip` will not write to the underlying stream past the highWaterMark, i.e.
after `gossip.push` returns false.
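
Building the randomly ordered digests can be sketched as follows. `buildDigests` and its `versions` argument are hypothetical; the sketch also assumes a single `done` digest closes the batch, and it ignores the highWaterMark check for brevity.

```js
// Sketch: build the digests one gossip round would queue.
// `versions` maps each known source_id to the last version seen from it.
function buildDigests(versions) {
  var digests = Object.keys(versions).map(function(source_id) {
    return {digest: true, source_id: source_id, version: versions[source_id]}
  })

  // Fisher-Yates shuffle to produce the random ordering.
  for (var i = digests.length - 1; i > 0; i--) {
    var j = Math.floor(Math.random() * (i + 1))
    var tmp = digests[i]
    digests[i] = digests[j]
    digests[j] = tmp
  }

  // Assumption: a final `done` digest marks the end of the batch.
  digests.push({digest: true, done: true})
  return digests
}
```
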
`Gossip.state` is an attribute of a Gossip instance. Each key maps to a value
and a version number, so `state[key] -> {version: version, value: value}`.

The highest version number the Gossip instance has seen (both locally and
from other instances).
`Gossip.history` is an object for keeping track of updates, and replaying
updates from a given peer on demand.

`update` objects are transmitted individually via the Gossip's
streaming methods.

#### Events ####

`Gossip.history` is an event emitter, which emits two events:

- `"update"`: Any time an update is applied, the `"update"` event is emitted,
  with the update to be applied.
- `"compaction"`: If the number of updates recorded in the history exceeds the
  `max_history` parameter, the `"compaction"` event is emitted prior to
  removing old updates from the history. This way the client can implement
  more dramatic compaction, making their own tradeoffs between performance,
  replayability, and speed.

#### Gossip.history.write(key, value, source_id, version) ####

Write a new update to the history. The update is recorded into
`Gossip.history.memory`, an array of updates, which is then sorted via the
`config.sort` function given to the Gossip constructor. Next, `Gossip.history`
emits an `"update"` event with the update as its argument. This event is
emitted to allow the client to take action prior to pruning the memory array to
`max_history`'s length.
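
The write path described above (record, sort, emit, then prune) can be sketched with a small event emitter. This is an illustration, not the library's implementation: the `History` constructor here is hypothetical, and it assumes the entries that sort first are the "old" ones to drop.

```js
var EventEmitter = require('events').EventEmitter

// Sketch of a history object built on EventEmitter.
function History(sort, max_history) {
  EventEmitter.call(this)
  this.sort = sort
  this.max_history = max_history || 10
  this.memory = []
}

History.prototype = Object.create(EventEmitter.prototype)
History.prototype.constructor = History

History.prototype.write = function(key, value, source_id, version) {
  var update = {key: key, value: value, source_id: source_id, version: version}

  this.memory.push(update)
  this.memory.sort(this.sort)
  this.emit('update', update)

  if (this.memory.length > this.max_history) {
    // Give listeners a chance to compact before anything is dropped.
    this.emit('compaction', this.memory, this)

    // Assumption: entries that sort first are the oldest, so drop from the front.
    this.memory.splice(0, this.memory.length - this.max_history)
  }
}
```

Because listeners run synchronously, a `"compaction"` handler can shrink `memory` itself; the prune step then removes only whatever excess remains.
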
#### Gossip.history.news(id, version) -> Array updates ####

Returns an array of updates which came from a source with unique identifier
matching `id`, and which occurred after `version`.

# TODO: #
- Investigate whether history's memory attribute should be an array that is
  `.sort(fn)`-ed, or a custom implementation, such as [this

[npm.im/scuttlebutt]: https://npmjs.org/package/scuttlebutt
[paper]: http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
[vector-clocks-hard]: http://basho.com/why-vector-clocks-are-hard/
[cross-filter-sort]: https://github.com/square/crossfilter/blob/master/src/quicksort.js