Data engineering pipeline library for Node.js

```sh
npm install workflow.io
```

- Use cases
- Getting started
  - Install the package
  - Composition of a pipeline
  - Create a pipeline
  - Run a pipeline
  - Use component adapters
  - Use Pipeline Factory
- Industrialize a pipeline
  - Orchestrator
  - Controller
## Use cases

This package is useful for data engineering tasks, such as:

* Extract data from an API, transform it and load it into a database, a file...
* Synchronize data between applications, databases, files...
* Migrate data and apply complex transformations
This package is designed to be simple to use and easy to extend. It provides predefined components to extract and load data, as well as adapters to change the flow of a pipeline.
## Getting started

### Install the package

```sh
npm install workflow.io
```

### Composition of a pipeline
* Pipeline: the main class, used to create and start the process. A pipeline is composed of several components.
* Component: a function or a class that performs an action: extract, transform, load data...
### Create a pipeline
Example of a pipeline:
```ts
// Pipeline, HttpExtractor and CsvLoader are assumed to be exported from the package root.
import { Pipeline, HttpExtractor, CsvLoader } from "workflow.io";

// Shape of the raw records returned by the API.
type RawProduct = {
  title: string;
  price: number;
  discountPercentage: number;
  rating: number;
};

const example = Pipeline.create([
  new HttpExtractor({
    url: "https://dummyjson.com/products",
    root: "products"
  }),
  // Transform: keep selected fields and compute a discounted price.
  (data: RawProduct[]) => data.map((d: RawProduct, i: number) => {
    return {
      id: i,
      title: d.title,
      price: d.price,
      realPrice: Math.floor(d.price * (1 - d.discountPercentage / 100)),
      rating: d.rating
    };
  }),
  new CsvLoader({
    path: "data/customers.csv",
    columns: [
      "id",
      "title",
      "price",
      "realPrice",
      "rating"
    ]
  }),
]);
```
The pipeline above fetches data from an API, transforms it to select fields and compute a new `realPrice` field, and finally loads it into a CSV file.

A pipeline is a succession of components that exchange data in a chain. A component must be standalone and stateless; it can be an object or a function.
This package provides several components to extract data from, and load data into, files or databases (e.g. `HttpExtractor` or `CsvLoader`). It is possible to create a component either as a class that extends `Component`, or as a function with the signature `(data: []) => []`.
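For example, a minimal function component and a minimal class component could look like this (a sketch: the exact hook that `Component` subclasses must implement is an assumption, as is the `Product` type):

```ts
import { Component } from "workflow.io";

type Product = { id: number; title: string; price: number; rating: number };

// Function component: filters out poorly rated products.
const keepWellRated = (data: Product[]) => data.filter((p) => p.rating >= 3);

// Class component: assumes subclasses override a process() hook.
class PriceRounder extends Component {
  process(data: Product[]): Product[] {
    return data.map((p) => ({ ...p, price: Math.round(p.price) }));
  }
}
```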
### Run a pipeline

A pipeline can easily be run by calling `process()` on the pipeline object. This function is asynchronous.

```ts
example.process().then(() => {
  console.log("Pipeline finished");
}).catch((error) => {
  console.error("Pipeline failed", error);
});
```

The `process()` function returns a promise of the output data: it resolves when the pipeline is finished and rejects if an error occurs. It can take an array of data as a parameter, which will be the initial data of the pipeline.
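For instance, a pipeline that starts with a plain transform function can be seeded entirely through `process()` (a minimal sketch using only the behavior described above):

```ts
// A pipeline with a single transform component, seeded via process().
const double = Pipeline.create([
  (data: number[]) => data.map((n) => n * 2),
]);

double.process([1, 2, 3]).then((result) => {
  console.log(result); // [2, 4, 6]
});
```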
### Use component adapters

To build more complex pipelines, some components called adapters help change the flow of the pipeline. For example, the `MergeAdapter` runs several components simultaneously and merges the returned data into a single list.

Example of a `MergeAdapter` applied to the previous pipeline:
```ts
const example = Pipeline.create([
  new MergeAdapter([
    new HttpExtractor({
      url: "https://dummyjson.com/products",
      root: "products",
      headers: {}
    }),
    new HttpExtractor({
      url: "https://dummyjson.com/products",
      root: "products",
      headers: {}
    }),
  ]),
  (data: RawProduct[]) => data.map((d: RawProduct, i: number) => {
    return {
      id: i,
      title: d.title,
      price: d.price,
      realPrice: Math.floor(d.price * (1 - d.discountPercentage / 100)),
      rating: d.rating
    };
  }),
  new CsvLoader({
    path: "data/customers.csv",
    columns: [
      "id",
      "title",
      "price",
      "realPrice",
      "rating"
    ]
  }),
]);
```
The difference here is in the extraction step: this pipeline fetches data from the API twice, simultaneously, and then merges the two results into one list.

### Use Pipeline Factory
The package provides a Pipeline Factory with some predefined pipelines:

* `PipelineFactory.createETLPipeline`: creates an Extract, Transform, Load pipeline with multiple extractors, transformers and loaders
* `PipelineFactory.createSimpleETLPipeline`: creates an Extract, Transform, Load pipeline with only one extractor and one loader

More predefined pipelines will be added in the future.
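A hypothetical use of the simple variant might look like this (the argument shape of `createSimpleETLPipeline` is an assumption, not a documented signature):

```ts
import { PipelineFactory, HttpExtractor, CsvLoader } from "workflow.io";

// Sketch only: assumes the factory takes one extractor and one loader.
const etl = PipelineFactory.createSimpleETLPipeline(
  new HttpExtractor({ url: "https://dummyjson.com/products", root: "products" }),
  new CsvLoader({ path: "data/products.csv", columns: ["id", "title", "price"] })
);

etl.process();
```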
## Industrialize a pipeline

This package provides some options to industrialize a pipeline. A pipeline can also be industrialized in a custom way.
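As a minimal custom approach, a pipeline can simply be re-run on a schedule with a plain Node.js timer (a sketch, not a feature of the package):

```ts
// Re-run the pipeline from the examples above every hour.
// The interval is arbitrary; any scheduler (cron, a job queue...) works too.
setInterval(() => {
  example.process()
    .then(() => console.log("Scheduled run finished"))
    .catch((error) => console.error("Scheduled run failed", error));
}, 60 * 60 * 1000);
```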