RxJS-based client for real-time data streaming using @leancodepl/pipe
npm install @leancodepl/rx-pipe-clientRxJS-based topic functions for real-time data subscriptions using @leancodepl/pipe.
``bash`
npm install @leancodepl/rx-pipe-clientor
yarn add @leancodepl/rx-pipe-client
Creates topic functions for data subscriptions using @leancodepl/pipe.
Parameters:
- pipe: Pipe - Pipe instance from @leancodepl/pipe
Returns: Object containing createTopic method for creating typed observables
`typescript
import { mkPipeClient } from "@leancodepl/rx-pipe-client"
import { createPipe } from "@leancodepl/pipe"
const pipe = createPipe({
url: "wss://api.example.com/pipe",
getAccessToken: () => localStorage.getItem("token"),
})
const pipeClient = mkPipeClient({ pipe })
`
`typescript
import { mkPipeClient } from "@leancodepl/rx-pipe-client"
import { createPipe } from "@leancodepl/pipe"
interface ChatTopic {
roomId: string
}
interface ChatNotifications {
MessageReceived: {
id: string
content: string
authorId: string
}
}
const pipe = createPipe({
url: "wss://api.example.com/pipe",
getAccessToken: () => localStorage.getItem("token"),
})
const pipeClient = mkPipeClient({ pipe })
const chatTopic = pipeClient.createTopic
chatTopic({ roomId: "room1" }).subscribe(notification => {
if (notification.type === "MessageReceived") {
console.log(New message: ${notification.data.content})`
}
})
`typescript
import { filter, map } from "rxjs/operators"
import { mkPipeClient } from "@leancodepl/rx-pipe-client"
import { createPipe } from "@leancodepl/pipe"
interface MetricsTopic {
dashboardId: string
}
interface MetricsNotifications {
CpuUpdate: { value: number }
MemoryUpdate: { value: number }
}
const pipe = createPipe({
url: "wss://api.example.com/pipe",
getAccessToken: () => localStorage.getItem("token"),
})
const pipeClient = mkPipeClient({ pipe })
const metricsTopic = pipeClient.createTopic
const cpuUpdates$ = metricsTopic({ dashboardId: "main" }).pipe(
filter(notification => notification.type === "CpuUpdate"),
map(notification => notification.data.value),
)
cpuUpdates$.subscribe(value => {
console.log(CPU: ${value}%)``
})