A library providing an expression language to compose and evaluate stream processing.

The real-value library is an expression language for composing flows of data, control and presentation events.

The motivations are to:
1. allow end-to-end data flows to be expressed in a form that can be rendered, so the end-to-end logic is easy to understand
2. express end-to-end data flows such that value chains can be tested end to end inside unit tests
3. separate infrastructure complexity from end-to-end data flows, so that applying infrastructure is a separate concern
4. allow processing of infinite streams of data within finite computational resources
5. allow data flow processing logic to be expressed as functional composition

The expected value is:
1. faster and higher-quality solution development
2. contribution to development by a larger pool of developers

The value of this language will come from applying it across multiple problem domains, reducing the time needed to understand, pick up and modify a solution developed by others.
## Flow Model

A real-value flow model is a graph of data stream nodes with data/events flowing between them.
A flow model should be renderable into a graphical expression language as a way to view the data flow at a macro level.

There will be different kinds of flow model nodes. The set of nodes aspires to provide functions similar to [EAI Integration Patterns]() nodes.

The flow model is a factory used to set up data streams and the subsequent flow processing.
```
let model = Model()
//logical stream of values
,1,,,,2,,3
```

It would be convenient to have a `from` stream node that can reify a stream.

```
model.from(',1,,,,2,,3')
```

Stream data can come from csv content. The following would produce a stream of a single value with AssetId=1 and SerialNo=A.

```
from('AssetId,SerialNo\n1,A')
```

Streams should be constructable from csv strings, arrays, sql tables, iterators and generators.

```
from([{key:1, value:1},{key:2, value:2}])
from(someGeneratorFunction)
```
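As an illustration only (not the library's implementation), a `from` factory might normalize these source kinds into a single async iterable; `normalize` below is a hypothetical helper:

```
// Hypothetical sketch: normalize the supported source kinds into one async
// iterable. Empty slots in the csv-style string become undefined values.
async function* normalize(source) {
  if (typeof source === 'string') {
    for (const slot of source.split(',')) yield slot === '' ? undefined : slot
  } else if (Array.isArray(source)) {
    yield* source
  } else if (typeof source === 'function') {
    yield* source() // a generator function
  } else {
    yield* source // any other (async) iterable
  }
}
```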
### Outputting Streams

Data streams can be written to files using `to` stream nodes.

```
from('1,2,3').to('test.log')
from([{key:1,value:1},{key:2,value:2}]).toCSV('test.csv')
```

It is likely we will want to output a stream to other data sinks, including queues or databases; syntax to be decided.
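A minimal sketch of what a file sink could look like, assuming Node.js and streams exposed as async iterables (the `to` shown here is illustrative, not the library's implementation):

```
import { createWriteStream } from 'node:fs'

// Hypothetical sketch: drain a stream into a file, one value per line.
async function to(stream, path) {
  const out = createWriteStream(path)
  for await (const value of stream) {
    if (value !== undefined) out.write(String(value) + '\n')
  }
  out.end()
}
```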
### Stream Propagation

A data stream typically represents a data set arriving over time.
For the purposes of this readme we can picture data transformation through a flow by considering an input and an output stream, represented as below.

```
,1,,,,2,,3 -=> ,1,,,,2,,3
```
### Stream Inspection

A `log` operator can be used to view a stream.

```
from(',1,,,,2,,3').log()
```

Motivation: To interrogate what the stream looks like at nodes in a data flow model.

### Stream Propagation Delay
Values may be delayed as they pass through a stream.

```
,1,,,,2,,3,, -=> ,,,1,,,,2,,3
```

A `delay` operator would achieve the above translation.

```
from(',1,,,,2,,3').delay(2)
```

Motivation: This could represent the activity of a truck moving a payload from the mine face to a processing conveyor belt.

```
from(',1,,,,2,,3').delay(transportTime)
```
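Over the slot-based notation above, a delay can be pictured as prepending empty slots; a hypothetical sketch:

```
// Hypothetical sketch: delay a slot-based stream by n ticks by emitting
// n empty slots before relaying the source.
async function* delay(source, n) {
  for (let i = 0; i < n; i++) yield undefined // empty slots first
  yield* source
}
```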
### Stream Filtering

It will be necessary to select content from a stream.

```
from('1,2,3,4').filter(x=>x%2)
```
### Streams May Diverge/Split

Streams may need to diverge.

```
b:1,a:1,a:2:b:2,,,b:3,a:3 -=> ,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3,
```

Streams can be piped into any number of downstream stream operators.

```
let stream = from('1,2,3,4,5')
stream.filter(selectOdd).to('odd.log')
stream.filter(selectEven).to('even.log')
```

Motivation: In a mine a digger may split material into waste and product stockpiles.

```
let stream = from('waste:1,product:1,product:2,,,waste:3,product:3')
stream.filter(filterProduct).tap(...)
stream.filter(filterWaste).tap(...)
```
### Streams May Merge

Streams may need to flow together.

```
,a:1,a:2,,,,,a:3, ; b:1,,b:2,,b:3, -=> b:1,a:1,a:2:b:2,,,b:3,a:3
```

Note that the `a:1` syntax indicates a key `a` with a value `1`. A `merge` operator could merge two streams.

```
from(',a:1,a:2,,,,,a:3,').merge(from('b:1,,b:2,,b:3,'))
```

Motivation: In an IoT solution there are streams of asset data from different channels, but the source channel is irrelevant to the UI.

```
from(',Asset111:on,Asset222:off,,').merge(from('Asset333:on,,Asset333:off,'))
```

As an alternative, a `from` operation applied to a stream should create a merged stream.

```
from('1,3',{column:'odd'}).from('2,4',{column:'even'}) //{odd:1,even:2},{odd:3,even:4},
```
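The column-merge behaviour above amounts to zipping the two sources tick by tick; a hypothetical sketch (`zipColumns` is not a library operator):

```
// Hypothetical sketch: combine two sources tick by tick into one object per tick.
async function* zipColumns(a, b, aKey, bKey) {
  const ia = a[Symbol.asyncIterator]()
  const ib = b[Symbol.asyncIterator]()
  while (true) {
    const [ra, rb] = await Promise.all([ia.next(), ib.next()])
    if (ra.done || rb.done) return
    yield { [aKey]: ra.value, [bKey]: rb.value }
  }
}
// zipColumns(from('1,3'), from('2,4'), 'odd', 'even') -> {odd:'1',even:'2'},{odd:'3',even:'4'}
```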
### Stream Joining

Streams can be joined using inner, outer, left, right and xor semantics.

```
{key:1,b:1},{key:2,b:2} : {key:1,c:1},{key:3,c:3} -=> {key:1,b:1,c:1},{key:2,b:2},{key:3,c:3}
```

Implemented as:

```
let s1 = model.from([{ 'key': 1, 'b': 1 }, { 'key': 2, 'b': 2 }])
let s2 = model.from([{ 'key': 1, 'c': 1 }, { 'key': 3, 'c': 3 }])
s1.join(s2,{type:'outer'})
```
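For intuition, an outer join over two finite streams could be sketched as below (a simplification: it materializes both sides, which a real streaming join would avoid):

```
// Hypothetical sketch: outer-join two finite streams on `key`.
async function outerJoin(a, b) {
  const byKey = new Map()
  for await (const row of a) byKey.set(row.key, { ...row })
  for await (const row of b) byKey.set(row.key, { ...(byKey.get(row.key) || {}), ...row })
  return [...byKey.values()]
}
```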
### Stream Batching

This functionality is useful when there is a process that is slow for individual events but relatively fast for a set of events (such as DB connections).
It lets you take a number of separate events and group them into batches up to a maximum batch size.

```
from('data.xlsx').batch({
  typeFn: x=>x.type,
  addToBatchFn: x=>x,
  delay: 1000, /*batch for up to 1 second*/
  maxBatch: 100 /*max batch size*/
}).tap(someFunction)
```

Motivation: In a mine a truck can accumulate some load from a digger before moving it to a processing stockpile.
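A hypothetical sketch of the size/time batching behaviour (simplified: the time-based flush is only checked when a new event arrives):

```
// Hypothetical sketch: group events into batches, flushing when the batch is
// full or when `delay` ms have passed since the first event in the batch.
async function* batch(source, { maxBatch = 100, delay = 1000 } = {}) {
  let bucket = []
  let deadline = Infinity
  for await (const event of source) {
    bucket.push(event)
    if (bucket.length === 1) deadline = Date.now() + delay
    if (bucket.length >= maxBatch || Date.now() >= deadline) {
      yield bucket
      bucket = []
      deadline = Infinity
    }
  }
  if (bucket.length) yield bucket // flush the remainder
}
```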
### Stream Reduction

Reduce the events in a stream. The stream values may accumulate before propagating.

```
1,1,1 -=> ,,3
```

A `reducer` operator could achieve this.

```
from(',1,,,,2,,3').reducer(truckCapacityReducer)
```
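A hypothetical `truckCapacityReducer` consistent with the illustration: accumulate values and only propagate once a capacity is reached:

```
// Hypothetical sketch: accumulate incoming values into a load and emit the
// load only once it reaches capacity (e.g. a truck is full).
function truckCapacityReducer(capacity) {
  let load = 0
  return async function* (source) {
    for await (const v of source) {
      load += Number(v) || 0
      if (load >= capacity) { yield load; load = 0 }
      else yield undefined // empty slot while accumulating
    }
  }
}
```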
### Stream Propagation Limiting

TODO

Values may need to be limited before propagating.

```
,1,,,,2,,3,, -=> ,1,,,,1,1,1,1,1
```

A `limitTo` operator could facilitate splitting values into defined chunks.

```
from(',1,,,,2,,3').limitTo(1).log()
```

Motivation: A mine conveyor system has a defined throughput.

```
from(stockpile).limitTo(1)
```
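A hypothetical sketch matching the illustration: emit at most `limit` per tick and carry any excess over into later ticks:

```
// Hypothetical sketch: cap the amount emitted per tick, carrying any excess
// forward, and keep draining the backlog after the source ends.
async function* limitTo(source, limit) {
  let backlog = 0
  for await (const v of source) {
    backlog += Number(v) || 0
    const out = Math.min(limit, backlog)
    backlog -= out
    yield out || undefined // empty slot when there is nothing to emit
  }
  while (backlog > 0) {
    const out = Math.min(limit, backlog)
    backlog -= out
    yield out
  }
}
```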
### Stream Processing Composition

It should be possible to compose stream operations such that specific processing logic can be built up from smaller testable units.

Motivation: In a mine it would take a truck some time to return from the stockpile before it can be reloaded.

```
from(',1,,,,2,,3').compose(accumulateTo(truckCapacity),delay(truckReturnTrip))
```
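If each operator is a function from one stream to another, the composition itself can be sketched as plain function composition (illustrative, not the library's API):

```
// Hypothetical sketch: compose stream operators left to right, where each
// operator maps an async iterable to an async iterable.
const compose = (...ops) => source => ops.reduce((stream, op) => op(stream), source)
```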
### Streams May Be Accumulated into Categories

Streams may need to accumulate into categories.

```
,a:1,a:2,,c:2,,b:3 -=> { a: 3, b: 3, c: 2 }
```

A `table` operator would achieve this.

```
from(',a:1,a:2,,c:2,,b:3').table(accumulate)
```

For example, in one production solution utilization was accumulated per asset.

```
from(',E16:10,D10:20,,E15:10,,,E16:10,').table(accumulate)
```
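A hypothetical sketch of a `table` that folds keyed values and emits each change (the default combiner here sums values; it is illustrative only):

```
// Hypothetical sketch: fold key:value events into a table, emitting the
// changed entry on every update.
async function* table(source, combine = (prev = 0, v) => prev + Number(v)) {
  const t = {}
  for await (const kv of source) {
    if (kv === undefined) { yield undefined; continue } // empty slot
    const [key, value] = String(kv).split(':')
    t[key] = combine(t[key], value)
    yield { [key]: t[key] } // emit the change
  }
}
```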
The aggregation into a table should be capable of updating statistics.

```
,a:1,a:3,,b:2,,b:4 -=> average:{ a: 2, b: 3 }
```

For an IoT solution we may want to capture statistics across the type/class of asset.

```
from(',Asset1:10,Asset11:20,,Asset12:10,,,Asset10:20,').map(mapToAssetType).table(movingAverage,'averageUtilizationByType')
```
Note that `table` should accumulate entries but also emit changes to the table.

```
from('A:1,A:10,,B:2,B:2,C:3,').table().log() //should generate A:1,A:10,B:2,/*no change*/,C:3
```
### Enrichment

One reason to accumulate into categories is to be able to enrich event streams with metadata.

```
let latest = from([{id: 'a', value: 1},{id: 'b', value: 1},{id: 'a', value: 2}]).table()
from('a,a,a,a,').join(latest).log()
```

Motivation: For a set of timesheet entries it may be necessary to augment records with the employee details.

```
let employeeData = from([{id: '123', name: 'Jim Smith'},{id: '234', name: 'Sally Smith'}]).table()
from([{id: '234', time: 6}]).join(employeeData).log()
```
### Change Propagation

When there is a change to a stream it may be desirable to reprocess the content of a table stream. For instance, getting new information about depreciation rates could require reprocessing a table stream representing the value of some assets.

```
from(somestream).table({propagate:'change'}) //only table changes are propagated
from(somestream).table({propagate:'all'}) //any table change generates a stream of the entire table
```
### Streams May Need to Accumulate Between Values

Streams may need to accumulate between values.

```
,a:1,b:1,,b:0,,a:0 -=> { a: 5, b: 2 }
```

A `delta` operation might allow this to be expressed.

```
from(',a:1,b:1,,b:0,,a:0').table(compose(delta,accumulate))
```

Motivation: In an IoT solution, utilization relates to the time periods between on/off events.

```
from(',Asset111:on,Asset222:on,,Asset222:off,,Asset111:off').table(compose(delta,accumulate))
```
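A hypothetical `delta` consistent with the illustration: remember when each key switched on and emit the elapsed ticks when it switches off:

```
// Hypothetical sketch: convert on/off events into elapsed on-durations per key,
// counting ticks between the on and off slots.
function delta() {
  const onSince = {}
  let tick = 0
  return async function* (source) {
    for await (const kv of source) {
      tick++
      if (kv === undefined) { yield undefined; continue } // empty slot
      const [key, state] = String(kv).split(':')
      if (state === '1' || state === 'on') {
        onSince[key] = tick
        yield undefined
      } else {
        yield `${key}:${tick - onSince[key]}` // elapsed on-time
      }
    }
  }
}
```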
### Streams May Need to Be Compared Against Targets

Motivation: In a mine there was a need to show up-to-date production versus target production over the course of an interval.

```
let targets = from('targets.csv')
let actuals = from(productionGenerator).join(targets,{joinFn: (actual,target)=>{
  actual.percentage = actual.value/target.value
}})
```
### Stream Re-use

It should be possible to define a stream without a source so that it can be re-used multiple times.

```
let identityStream = map(x=>x) //the identity processing stream
from(testStream).thru(identityStream) //used to test the stream definition
from(productionStream2).thru(identityStream) //used to process production
```
### Streams Updating Functions

Streams should be usable to update functions used in other streams.
Here a stream of market data is used to update a function which can then be used to value an individual item.

```
let valuer = (() => {
  let valuationModel = ... //the model
  return {
    train: (historicalSale)=>{ /*update the model*/ },
    value: (equipment)=>valuationModel.value(equipment)
  }
})()
from(historicalSaleData).map(x=>valuer.train(x)) //we need to update the valuation model
from(onSaleData).map(x=>valuer.value(x)).table() //then we want to use the model
```
### Stream Aggregation

It will be necessary to apply aggregate statistics across the values in a stream. For example, given a set of asset values we may want to normalize those values. Aggregation is a difficult concept for stream processing, as you never expect to have the full set of data. To support this, a `buffer` node is used to hold a set of events (so that aggregates can be computed) before emitting all those events, which can then use those aggregates.

```
let stats = {}
let calcStats = (x) => { /*update stats from x, pass x through*/ }
let useStats = (x) => { /*apply stats to x*/ }
from('data.csv').map(calcStats).buffer(1000).map(useStats)
```
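A hypothetical sketch of `buffer(n)`: hold up to n events, then release them in order:

```
// Hypothetical sketch: hold up to n events so aggregates can be computed over
// the set, then re-emit the events in their original order.
async function* buffer(source, n) {
  let held = []
  for await (const event of source) {
    held.push(event)
    if (held.length >= n) { yield* held; held = [] }
  }
  yield* held // flush the remainder
}
```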
## Cognitive Load

### Stream Introspection

When developing a flow model there will be a need to introspect the stream of data at any one node.
Specifically, when unit testing a model provided by some library, it is useful to be able to directly address a stream node.
There are a couple of ways to achieve this.
All stream nodes have names, which default to the type of the node.

```
let filterNode = model.from('1,2,3').filter(x=>x<2)
filterNode.getName() //'filter' by default
```
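In a unit test, a node could then be looked up by name; `getNode` below is a hypothetical lookup, not a confirmed API:

```
// Hypothetical sketch: address a node inside a model built elsewhere.
let model = buildModel() //a model provided by some library (assumed)
let node = model.getNode('filter') //look a node up by its (default) name
node.log() //inspect the values flowing through that node
```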
## Architectural Concerns

### Distribution

While we should be able to test an entire stream process in a single simple runtime, we might want to deploy that stream across multiple processes, for scale or to satisfy other architectural constraints.
A `pub` node is used to allow streams to support both outcomes.

```
let streamUrl = from('data1.csv').map(tx).pub(options) //deploy to one node
from(streamUrl).map(tx2) //deploy to another node
```
## Examples

### Shipping Containers

There can be solutions scheduling resources to empty and load shipping containers.
- There is a stream of containers to be unloaded `from(port)`
- Containers accumulate contents `.limitTo()`
- Containers are transported `delay()`
- Containers are unloaded `splitTo()`

### Mining

- There are streams of mine faces `from(mineplan)`
- Faces need to be cleared `splitTo(diggerLoad).accumulateTo(truckCapacity)`
- Trucks transport waste or mineral `delay(travelTime)`
### Asset Depreciation

- `Purchase = from('100,,,,,')`
- `PlannedMaintenance = from(',-10,-10,-10,-10')`
- `UnplannedMaintenance = from(',,-5,-7,')`
- `PlannedMaintenance.depreciate(depreciationRate).table(accumulate)`
- `UnplannedMaintenance.depreciate(depreciationRate).table(accumulate)`
- `merge(Purchase,PlannedMaintenance,UnplannedMaintenance).depreciate(depreciationRate).table(accumulate)`
### Asset Valuation

Assets need to be valued by composing asset information with depreciation information related to age, utilization, location and market supply.

```
function ageValuer({assetType, age}) { /*build a linear interpolation from asset type and age to a value*/ }
from('...',{columns:['assetType','assetAge','value']}).map(ageValuer)

function smuValuer({assetType, smu}) { /*calculate variance of value based on utilization at age, then use deviation from mean utilization to adjust the value*/ }
from('...',{columns:['assetType','assetSMU','value']}).map(smuValuer)

function locationValuer({assetType, location}) { /*derive a location-based depreciation*/ }
from('...',{columns:['assetType','location','value']}).map(locationValuer)

from('assets.csv').
```
## Install

```
npm install real-value-lang
yarn add real-value-lang
```
## Usage

See the unit tests.

## Related

- Kafka Streams
- Most.js reactive programming library
- RxJS
- ScramJet

## Future Ideas

- View of a flow using a Sankey diagram
- It might be appropriate to consider how to use real-value-lang to perform simulations such as those possible with SLX