Reactive data flow processing library
`npm install rx4u`

A lightweight reactive data stream processing library with a clean functional programming API.
- ⚡ High Performance: Optimized reactive data stream processing
- 🚀 Lightweight: Zero dependencies, small bundle size
- 🔧 Clean API: Intuitive functional programming interface
- 📦 Tree Shaking: Full ES6 modules and Tree Shaking support
- 💪 TypeScript: Complete TypeScript type support
- 🌐 Multi-environment: Supports ESM, CommonJS, and direct CDN usage
```bash
# pnpm
pnpm add rx4u
```
## 🚀 Quick Start

### ES Modules

```javascript
import {pipeFrom, of, map} from 'rx4u';

// Create data stream
const number$ = of(1, 2, 3, 4, 5);

// Transform each value
const double$ = pipeFrom(
  number$,
  map((x) => x * 2)
);

// Subscribe with next, error, and complete callbacks
const unsub = double$(
  (value) => console.log(value), // Output: 2, 4, 6, 8, 10
  (error) => console.error(error),
  () => console.log('Complete')
);

// Unsubscribe
unsub();
```
### CommonJS

```javascript
const {pipeFrom, of, map, filter} = require('rx4u');

// Same usage
```

### CDN

```html
```

## 📚 Core API
### Creating Data Streams

#### from(source)

Create a data stream from a creator function, such as one that returns an array or a Promise.

```javascript
import {from} from 'rx4u';

// Create from array
from(() => [1, 2, 3])(console.log);

// Create from Promise
from(() => fetch('https://example.org/products.json'))(console.log, console.error, console.warn);
```
#### of(...values)

Create a data stream from the given values.

```javascript
import {of} from 'rx4u';

of(1, 2, 3)(console.log); // Output: 1, 2, 3
```
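To make the calling convention concrete, here is a minimal sketch of how a stream like `of` could be written as a plain function. It assumes, based on the Quick Start example, that a stream is called with `next`, `error`, and `complete` callbacks and returns an unsubscribe function. `ofSketch` is a hypothetical name used only for illustration and is not rx4u's actual source.

```javascript
// Hypothetical re-implementation for illustration only; not rx4u's source.
function ofSketch(...values) {
  // A stream is a function: calling it with callbacks subscribes to it.
  return (next, error, complete) => {
    values.forEach((value) => next(value));
    if (complete) complete();
    // Everything was delivered synchronously, so there is nothing left to cancel.
    return () => {};
  };
}

const received = [];
ofSketch(1, 2, 3)(
  (value) => received.push(value),
  undefined,
  () => received.push('done')
);
console.log(received); // [ 1, 2, 3, 'done' ]
```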
#### interval(ms)

Create a timer data stream.

```javascript
import {interval} from 'rx4u';

const unsub = interval(1000)(console.log); // Output incremental numbers every second
```
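A timer stream makes the role of the returned unsubscribe function easy to see. The sketch below assumes the stream counts up from 0 and is built on `setInterval`; both are assumptions, and `intervalSketch` is a hypothetical name, not the library's implementation.

```javascript
// Hypothetical sketch, not rx4u's source: a timer stream built on setInterval.
function intervalSketch(ms) {
  return (next) => {
    let count = 0; // assumption: counting starts at 0
    const timer = setInterval(() => next(count++), ms);
    // The returned function stops the timer, like `unsub` in the example above.
    return () => clearInterval(timer);
  };
}

const ticks = [];
const stop = intervalSketch(10)((n) => ticks.push(n));

// Stop after a short while; no further values arrive once `stop` is called.
setTimeout(() => {
  stop();
  console.log('ticks received before stopping:', ticks.length);
}, 55);
```

Without the call to `stop()`, the timer would keep the process alive and keep emitting.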
### Pipeline Processing

#### pipeFrom(source, ...operators)

Process a data stream by passing it through a pipeline of operators, applied in order.

```javascript
import {pipeFrom, of, map, filter} from 'rx4u';

const result$ = pipeFrom(
  of(1, 2, 3, 4, 5),
  filter((x) => x > 2),
  map((x) => x * 2)
);

result$(console.log); // Output: 6, 8, 10
```
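One way to picture the pipeline: each operator is a function that takes a source stream and returns a new stream, and the pipeline feeds the source through them from left to right. The sketch below illustrates that idea under the same callback convention as the examples above. Every name ending in `Sketch` is hypothetical, and the real library may be implemented differently.

```javascript
// Hypothetical sketches for illustration; names ending in "Sketch" are not rx4u APIs.
const ofSketch = (...values) => (next, error, complete) => {
  values.forEach((value) => next(value));
  if (complete) complete();
  return () => {};
};

// An operator takes a source stream and returns a new stream.
const mapSketch = (fn) => (source) => (next, error, complete) =>
  source((value) => next(fn(value)), error, complete);

const filterSketch = (predicate) => (source) => (next, error, complete) =>
  source(
    (value) => {
      if (predicate(value)) next(value);
    },
    error,
    complete
  );

// Feeding the source through each operator in order yields the final stream.
const pipeFromSketch = (source, ...operators) =>
  operators.reduce((stream, operator) => operator(stream), source);

const out = [];
pipeFromSketch(
  ofSketch(1, 2, 3, 4, 5),
  filterSketch((x) => x > 2),
  mapSketch((x) => x * 2)
)((value) => out.push(value));
console.log(out); // [ 6, 8, 10 ]
```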
### State Management

#### createState(initialValue)

Create reactive state. Returns a tuple of the state stream, a setter, and a getter.

```javascript
import {createState} from 'rx4u';

const [state$, setState, getState] = createState(0);

// Subscribe to state changes
state$((value) => console.log('State:', value));

// Update state
setState(1); // Output: State: 1
console.log('Current State1:', getState()); // Output: Current State1: 1

// Update state from the previous value
setState((prev) => prev + 1); // Output: State: 2
console.log('Current State2:', getState()); // Output: Current State2: 2
```
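The tuple above can be understood as a value plus a set of listeners. The following sketch shows that shape. It assumes subscribers are notified only on updates and do not receive the initial value on subscribe, which matches the output shown above but is not confirmed library behavior. `createStateSketch` is a hypothetical name.

```javascript
// Hypothetical sketch; not rx4u's source.
function createStateSketch(initialValue) {
  let current = initialValue;
  const listeners = new Set();

  // Subscribing registers a listener and returns an unsubscribe function.
  const state$ = (next) => {
    listeners.add(next);
    return () => listeners.delete(next);
  };

  // Accept either a new value or an updater function of the previous value.
  const setState = (update) => {
    current = typeof update === 'function' ? update(current) : update;
    listeners.forEach((listener) => listener(current));
  };

  const getState = () => current;
  return [state$, setState, getState];
}

const log = [];
const [count$, setCount, getCount] = createStateSketch(0);
const unsubscribe = count$((value) => log.push(value));

setCount(1);
setCount((prev) => prev + 1);
unsubscribe();
setCount(10); // no longer observed by the listener

console.log(log, getCount()); // [ 1, 2 ] 10
```

Note that treating any function argument as an updater means a function cannot itself be stored as the state value in this sketch.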
### Utilities

#### firstValueFrom(source)

Get the first value from a data stream as a Promise.

```javascript
import {firstValueFrom, of} from 'rx4u';

const firstValue = await firstValueFrom(of(1, 2, 3));
console.log(firstValue); // Output: 1
```

## 🔧 Operators
### Transformation

- `map(fn)` - Transform each value
- `scan(fn, seed)` - Accumulate values, starting from `seed`
- `switchMap(fn)` - Switch to a new data stream

### Filtering

- `filter(predicate)` - Filter values
- `take(count)` - Take the first N values
- `skip(count)` - Skip the first N values
- `distinct()` - Remove duplicates

### Combination

- `merge(...sources)` - Merge multiple data streams
- `combineLatest(...sources)` - Combine latest values

### Error Handling

- `catchError(handler)` - Catch errors
- `retry(count)` - Retry

### Utility

- `tap(fn)` - Side effect operations
- `delay(ms)` - Delay
- `debounce(ms)` - Debounce
- `throttle(ms)` - Throttle

## 💡 Usage Examples
### Basic Data Processing

```javascript
import {pipeFrom, of, map, filter, take} from 'rx4u';

// Process user list
const users = [
  {id: 1, name: 'Alice', age: 25},
  {id: 2, name: 'Bob', age: 30},
  {id: 3, name: 'Charlie', age: 35},
];

const user$ = pipeFrom(
  of(...users),
  filter((user) => user.age > 25),
  map((user) => user.name),
  take(2)
);

const unsub = user$(console.log); // Output: Bob, Charlie
```
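An operator such as `take` has to do more than pass values through: once the limit is reached it should complete and stop listening to the source. The sketch below shows one way that could work, assuming `count` is at least 1 and the same callback convention as above. `takeSketch` and `ofSketch` are hypothetical names, not rx4u's implementation.

```javascript
// Hypothetical sketches for illustration; not rx4u's source.
const ofSketch = (...values) => (next, error, complete) => {
  values.forEach((value) => next(value));
  if (complete) complete();
  return () => {};
};

// Assumes count >= 1.
const takeSketch = (count) => (source) => (next, error, complete) => {
  let taken = 0;
  let done = false;
  let unsub = () => {}; // placeholder until the source returns its unsubscribe

  unsub = source(
    (value) => {
      if (done) return; // ignore values that arrive after the limit
      next(value);
      taken += 1;
      if (taken >= count) {
        done = true;
        if (complete) complete();
        unsub();
      }
    },
    error,
    () => {
      // The source ended before the limit was reached.
      if (!done) {
        done = true;
        if (complete) complete();
      }
    }
  );

  // A synchronous source may hit the limit before `unsub` is assigned.
  if (done) unsub();
  return unsub;
};

const taken = [];
takeSketch(2)(ofSketch(10, 20, 30, 40))(
  (value) => taken.push(value),
  undefined,
  () => taken.push('(complete)')
);
console.log(taken); // [ 10, 20, '(complete)' ]
```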
### API Call Handling

```javascript
import {pipeFrom, from, switchMap, of, catchError} from 'rx4u';

// API call handling
const users$ = pipeFrom(
  from(() => fetch('https://example.com/api/users')),
  switchMap((response) => response.json()),
  catchError((error) => {
    console.error('API Error:', error);
    return of([]); // Return empty array as default value
  })
);

const unsub = users$((users) => {
  console.log('User Data:', users);
});
```
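The `catchError` handler above returns a replacement stream. A rough sketch of that mechanism follows: when the source reports an error, the operator subscribes the downstream callbacks to whatever stream the handler returns. This assumes the callback convention used throughout this README. `catchErrorSketch`, `ofSketch`, and `failingSource` are hypothetical names for illustration only.

```javascript
// Hypothetical sketches for illustration; not rx4u's source.
const ofSketch = (...values) => (next, error, complete) => {
  values.forEach((value) => next(value));
  if (complete) complete();
  return () => {};
};

// A source that emits one value and then fails.
const failingSource = (next, error) => {
  next('partial');
  if (error) error(new Error('boom'));
  return () => {};
};

const catchErrorSketch = (handler) => (source) => (next, error, complete) => {
  let innerUnsub = () => {};
  const outerUnsub = source(
    next,
    (err) => {
      // Replace the failed source with the stream the handler returns.
      innerUnsub = handler(err)(next, error, complete);
    },
    complete
  );
  // Unsubscribing cancels both the original source and the fallback.
  return () => {
    outerUnsub();
    innerUnsub();
  };
};

const results = [];
catchErrorSketch((err) => ofSketch(`fallback after: ${err.message}`))(failingSource)(
  (value) => results.push(value),
  () => results.push('unhandled'),
  () => results.push('(complete)')
);
console.log(results);
// [ 'partial', 'fallback after: boom', '(complete)' ]
```

Values emitted before the failure still reach the subscriber; only the error itself is intercepted.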
### Retry and Error Handling

```javascript
import {pipeFrom, from, switchMap, of, retry, catchError} from 'rx4u';

// API call with retry and error handling
const data$ = pipeFrom(
  from(() => fetch('https://example.com/api/data')),
  switchMap((response) => {
    if (!response.ok) {
      throw new Error('Network error');
    }
    return response.json();
  }),
  retry(3), // Retry 3 times
  catchError((error) => {
    console.error('Request failed:', error);
    return of({error: 'Data loading failed'});
  })
);

const unsub = data$((data) => {
  console.log('Data:', data);
});
```

## 🔗 TypeScript Support
rx4u provides complete TypeScript type definitions:
```typescript
import {pipeFrom, of, map, filter, Sub} from 'rx4u';

// Type-safe data stream processing
interface User {
  id: number;
  name: string;
  age: number;
}

const users: User[] = [
  {id: 1, name: 'Alice', age: 18},
  {id: 2, name: 'Bob', age: 30},
];

// TypeScript will automatically infer types
const user$: Sub<User> = of(...users);

const adultName$ = pipeFrom(
  user$,
  filter((user: User) => user.age > 18), // User type
  map((user: User) => user.name) // string type
);

const unsub = adultName$((name: string) => {
  // string type
  console.log(name);
});
```

MIT License
Issues and Pull Requests are welcome!
If you encounter any problems during usage, please:
1. Check the documentation and examples
2. Search existing Issues
3. Create a new Issue describing the problem
---
rx4u - Making reactive programming simpler! 🚀