A Flowcore SDK module that provides a lightweight client for fetching events from the Flowcore platform.
Install with npm:

```bash
npm install @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
```
Or with yarn:

```bash
yarn add @flowcore/sdk-data-pump-client @flowcore/sdk-oidc-client
```
Create a new instance of the Data Pump client:

```typescript
import {DataPump} from "@flowcore/sdk-data-pump-client";
import {OidcClient} from "@flowcore/sdk-oidc-client";

// Authenticate with your OIDC client credentials
const client = new OidcClient("your client id", "your client secret", "well known endpoint");
// Point the Data Pump at the Flowcore GraphQL API
const dataPump = new DataPump("https://graph.api.flowcore.io/graphql", client);
```

> You can configure the page size in the last argument of the constructor; the default is 1000.
Then create an RxJS `Subject` to listen to the events:

```typescript
import {Subject} from "rxjs";
import {SourceEvent} from "@flowcore/sdk-data-pump-client";

const subject = new Subject<SourceEvent>();
subject.subscribe({
  // Called once for every event pushed by the data pump
  next: (event) => {
    console.log(event);
  },
  // Called when the subject is completed
  complete: () => {
    console.log("completed");
  },
});
```
Then you can fetch all events with the `fetchAllEvents` method:

```typescript
await dataPump.fetchAllEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
});
```
This will loop through all the events for the specified event types and push them to the observable.
> You can specify how many time buckets are processed in parallel with the `parallel` argument; the default is 1.
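The `parallel` option bounds how many time buckets are fetched at the same time. The sketch below illustrates that idea with a small worker pool. `processBuckets` and its `fetchBucket` callback are hypothetical names, not part of `@flowcore/sdk-data-pump-client`, and the client's actual scheduling may differ.

```typescript
// Hypothetical sketch: process time buckets with at most `parallel` in flight.
// `fetchBucket` stands in for the per-bucket work; it is not an SDK method.
async function processBuckets(
  timeBuckets: string[],
  parallel: number,
  fetchBucket: (timeBucket: string) => Promise<void>,
): Promise<void> {
  let next = 0;
  // Each worker keeps claiming the next unprocessed bucket until none remain.
  // JavaScript is single-threaded, so `next++` cannot race between workers.
  const worker = async (): Promise<void> => {
    while (next < timeBuckets.length) {
      const bucket = timeBuckets[next++];
      await fetchBucket(bucket);
    }
  };
  // Start between 1 and `parallel` workers and wait for all of them to finish.
  const workerCount = Math.max(1, Math.min(parallel, timeBuckets.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
}
```

With `parallel` set to 1 the buckets are handled strictly one after another; raising it trades ordering across buckets for throughput.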
To fetch events for a specific time bucket, use the `fetchEvents` method:

```typescript
await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
});
```
This will fetch all events for the specified time bucket and push them to the observable.
> To complete the observable after fetching, set the last argument of the `fetchEvents` method to `true`.
You can specify the from and to event IDs to fetch events within a specific range:

```typescript
await dataPump.fetchEvents(subject, {
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventTypes: ["your event type"],
  timeBucket: "your time bucket",
  afterEventId: "your from event id",
  beforeEventId: "your to event id",
});
```
Both bounds are exclusive: the events with the specified IDs are not included in the result. Either or both can be omitted.
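Because both bounds are exclusive, a range behaves like a slice that starts just after one event and stops just before another. A minimal sketch, assuming the events are already in order; `SketchEvent` and `sliceExclusive` are hypothetical illustrations, not SDK types or methods, and this is not how the client itself applies the bounds.

```typescript
// Hypothetical stand-in type; the SDK's own event type is SourceEvent.
interface SketchEvent {
  eventId: string;
}

// Returns the events strictly between afterEventId and beforeEventId.
// An omitted (or unknown) bound leaves that end of the list open.
function sliceExclusive<T extends SketchEvent>(
  events: T[],
  afterEventId?: string,
  beforeEventId?: string,
): T[] {
  let start = 0;
  if (afterEventId !== undefined) {
    const i = events.findIndex((e) => e.eventId === afterEventId);
    if (i !== -1) start = i + 1; // skip the "after" event itself
  }
  let end = events.length;
  if (beforeEventId !== undefined) {
    const j = events.findIndex((e) => e.eventId === beforeEventId);
    if (j !== -1) end = j; // stop before the "before" event itself
  }
  return events.slice(start, end);
}
```

For example, with events `a` to `e`, `sliceExclusive(events, "b", "e")` yields `c` and `d`.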
You can fetch the time buckets for a specific event type with the `fetchIndexes` method:

```typescript
const timeBuckets = await dataPump.fetchIndexes({
  dataCoreId: "your data core id",
  aggregator: "your aggregator",
  eventType: "your event type",
});
```
This will return a list of time buckets for the specified event type.
> You can also specify a from and a to time bucket with the `from` and `to` arguments; only the time buckets within that range are returned.
You can also pump events to a destination with the `pumpEvents` method:

```typescript
// Aborting this controller stops the pump
const abortController = new AbortController();

await dataPump.pumpEvents(
  "cache-key", // key under which the last processed event ID is stored
  subject,     // destination: the Subject created above
  {
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
  },
  abortController,
);
```
This pumps all existing events using backfilling, then switches to live mode and pumps new events as they arrive. The cache key is used to store the last event ID in the cache, so the client can resume from that event if it is restarted. The abort controller can be used to stop the pumping.
> Note: The default cache is in memory. You can implement your own cache by extending the `SimpleCache` class and passing it to the `DataPump` constructor via the options object.

> Note: You can also specify a starting time bucket with the `from` argument and control backfilling parallelism with the `parallel` argument.

> Note: If you do not pass an abort controller, the data pump runs once, until it has fetched all events currently present in the data container.
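The cache key works like a checkpoint: after an event is handled, its ID is stored under the key, and after a restart everything up to and including that ID is skipped. The sketch below shows that general pattern, with an `AbortSignal` as the stop condition. `MemoryCheckpointCache` and `processFromCheckpoint` are hypothetical; they do not reproduce the `SimpleCache` API or the data pump's internals.

```typescript
// Hypothetical in-memory checkpoint store (not the SDK's SimpleCache API).
class MemoryCheckpointCache {
  private readonly store = new Map<string, string>();

  get(key: string): string | undefined {
    return this.store.get(key);
  }

  set(key: string, value: string): void {
    this.store.set(key, value);
  }

  delete(key: string): void {
    this.store.delete(key);
  }
}

interface CheckpointEvent {
  eventId: string;
}

// Handles the events that come after the stored checkpoint and saves progress
// after each one. Stops early when the signal is aborted; a later call resumes
// after the last saved event. An unknown checkpoint restarts from the beginning.
function processFromCheckpoint(
  cache: MemoryCheckpointCache,
  cacheKey: string,
  events: CheckpointEvent[],
  handle: (event: CheckpointEvent) => void,
  signal?: AbortSignal,
): void {
  const lastEventId = cache.get(cacheKey);
  const startIndex =
    lastEventId === undefined ? 0 : events.findIndex((e) => e.eventId === lastEventId) + 1;
  for (const event of events.slice(startIndex)) {
    if (signal?.aborted) return;
    handle(event);
    cache.set(cacheKey, event.eventId);
  }
}
```

In this sketch, calling `cache.delete(cacheKey)` makes the next call start from the first event again.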
You can reset the data pump with the `reset` method:

```typescript
await dataPump.reset("cache-key");
```
> Note: It is only possible to reset the data pump when it is not running. You can check whether the data pump is running with the `isRunning` method.

> To stop the data pump, use the abort controller.
You can create your own pump by calling the `pumpPage` method manually:

```typescript
// Optional exclusive bounds; leave them undefined to fetch the whole time bucket
const afterEventId: string | undefined = undefined;
const beforeEventId: string | undefined = undefined;

let cursor: string | undefined = undefined;
do {
  const result = await dataPump.pumpPage({
    dataCoreId: "your data core id",
    aggregator: "your aggregator",
    eventTypes: ["your event type"],
    timeBucket: "your time bucket",
    afterEventId,
    beforeEventId,
  }, cursor);
  for (const event of result.events) {
    console.log(event);
  }
  // Keep going until no cursor is returned
  cursor = result.cursor;
} while (cursor);
```
This lets you control the flow of events and keeps the pump from fetching too many events at once.
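The `pumpPage` loop only requests the next page after the current one has been handled, which is where the flow control comes from. The following self-contained sketch runs the same do/while pattern against an in-memory stand-in. `Page`, `fakePumpPage`, and `drain` are hypothetical and only mimic the `events`/`cursor` shape used in the `pumpPage` example; encoding the cursor as a numeric offset string is an assumption made purely for illustration.

```typescript
// Hypothetical page shape mirroring `result.events` / `result.cursor`.
interface Page<T> {
  events: T[];
  cursor?: string;
}

// In-memory stand-in for pumpPage: returns up to `pageSize` items starting at
// `cursor`, plus the cursor for the next page (undefined when nothing is left).
function fakePumpPage<T>(all: T[], pageSize: number, cursor?: string): Page<T> {
  const start = cursor === undefined ? 0 : Number(cursor);
  const end = Math.min(start + pageSize, all.length);
  return {
    events: all.slice(start, end),
    cursor: end < all.length ? String(end) : undefined,
  };
}

// Drains every page, awaiting `handlePage` before asking for the next one.
// Returns the number of pages fetched.
async function drain<T>(
  getPage: (cursor?: string) => Promise<Page<T>>,
  handlePage: (events: T[]) => Promise<void>,
): Promise<number> {
  let cursor: string | undefined = undefined;
  let pages = 0;
  do {
    const result: Page<T> = await getPage(cursor);
    await handlePage(result.events);
    pages++;
    cursor = result.cursor;
  } while (cursor);
  return pages;
}
```

With seven items and a page size of three, `drain` fetches three pages (3 + 3 + 1 items) and hands each page to the handler in order.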
## Development

```bash
yarn install
```

or with npm:

```bash
npm install
```