# AI Agent Node.js Library for Multiplayer AI Agents

A Node.js library for building AI agent backends with support for multi-provider AI models, real-time communication, distributed processing, and chat management.
- Multi-Provider AI Support: OpenAI, Anthropic, Google, and OpenRouter
- Streaming Chat Processing: Real-time message streaming with tool calling support
- Distributed Architecture: Kafka and Redis integration for scalable agent processing
- Real-time Communication: Socket.io integration for live updates
- Agent Process Management: Lifecycle management with event-driven architecture
- Artifact Storage: S3 integration for storing generated artifacts
- Context Management: Intelligent context limiting and attachment handling
- Built-in Tools: Form value proposal and chart generation
```bash
npm install @multiplayer-app/ai-agent-node ai
```

Requirements:
- Node.js >= 18
- npm >= 8
The library requires several infrastructure services to function properly. These services can be started using Docker Compose or configured separately.
Use the startServices() function to initialize all required services:
```typescript
import { startServices } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
// Initialize repositories (example with MongoDB)
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
// Start all services
await startServices(chatRepository, messageRepository);
```
This function initializes:
- Kafka Service: Connects to Kafka brokers and subscribes to topics for chat title generation and background processing
- Agent Store: Initializes the in-memory agent process store for managing active agent conversations
- Model Store: Fetches and caches available AI models from configured providers
- S3 Bucket: Ensures the configured S3 bucket exists for artifact storage
Use the stopServices() function to gracefully shut down connections:
```typescript
import { stopServices } from '@multiplayer-app/ai-agent-node';
// Stop all services
await stopServices();
```
This disconnects:
- Redis connections (including pub/sub clients)
- Kafka consumer and producer connections
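In a long-running service it is common to pair startServices() with stopServices() in a signal handler so the Kafka and Redis connections are closed cleanly on shutdown. A minimal sketch (the signal wiring is ordinary Node.js, not part of this library):

```typescript
import { startServices, stopServices } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';

await startServices(new MongoAgentChatRepository(), new MongoAgentMessageRepository());

// Close Kafka and Redis connections before the process exits
const shutdown = async (signal: string) => {
  console.log(`Received ${signal}, shutting down`);
  await stopServices();
  process.exit(0);
};

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));
```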
#### Kafka
Purpose: Kafka is used for asynchronous, distributed processing of agent tasks:
- Chat Title Generation: Generates chat titles asynchronously after the first message to avoid blocking the main request
- Background Processing: Handles long-running agent tasks that don't need immediate response
- Scalability: Allows multiple service instances to process tasks in parallel
Topics:
- chat-title-generation (default): Processes chat title generation requests
- background-chat-processing (default): Handles background agent processing tasks
#### Redis
Purpose: Redis provides:
- Pub/Sub for Socket.IO: Enables real-time message broadcasting across multiple service instances
- Agent State Management: Stores temporary agent process state and event listeners
- Caching: Can be used for caching model information and other frequently accessed data
Connection: Redis is used for both direct key-value operations and pub/sub channels for Socket.IO adapter.
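The Socket.IO wiring is handled inside the library, but for reference, a typical Redis adapter setup for a Socket.IO server looks like the sketch below. It uses the standard redis and @socket.io/redis-adapter packages and is illustrative only, not this library's API; the port is arbitrary.

```typescript
import { Server } from 'socket.io';
import { createClient } from 'redis';
import { createAdapter } from '@socket.io/redis-adapter';

const pubClient = createClient({ url: process.env.REDIS_URL ?? 'redis://localhost:6379' });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);

// Events emitted on one instance are relayed to all instances via Redis pub/sub
const io = new Server({ adapter: createAdapter(pubClient, subClient) });
io.listen(3001);
```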
The project includes a docker-compose.yml file that sets up all required services:
```bash
# Start all services
docker-compose up -d
```
This starts:
- MongoDB (port 27017): Database for chat and message storage
- Redis (port 6379): Pub/sub and caching
- Zookeeper (port 2181): Required for Kafka
- Kafka (port 9092): Message broker
- MinIO (ports 9000, 9001): S3-compatible object storage for artifacts
## Environment Variables
Configure the library using the following environment variables:
### Redis
```bash
# Redis connection URL (alternative to host/port)
REDIS_URL=redis://localhost:6379

# Or use individual settings
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password # Optional
REDIS_DATABASE=0 # Optional, defaults to 0
```

### AI Providers
At least one provider API key is required:
```bash
# OpenAI
OPENAI_API_KEY=sk-...

# Anthropic
ANTHROPIC_API_KEY=sk-ant-...

# Google
GOOGLE_GENERATIVE_AI_API_KEY=...

# OpenRouter (supports multiple providers)
OPENROUTER_API_KEY=sk-or-...
```

### Chat Configuration
```bash
# Maximum number of messages to include in context (default: 10)
MAX_CONTEXT_MESSAGES=10

# Default model to use if none specified
DEFAULT_MODEL=openai/gpt-4o
```

### S3 Storage
```bash
# S3 bucket name for storing attachments and artifacts
S3_ATTACHMENTS_BUCKET=ai-agent-attachments
```

### Kafka
```bash
# Kafka consumer group ID
KAFKA_GROUP_ID=ai-agent-node

# Kafka topics
KAFKA_CHAT_TITLE_GENERATION_TOPIC=chat-title-generation
KAFKA_BACKGROUND_CHAT_PROCESSING_TOPIC=background-chat-processing
```

### Text Extraction
```bash
# Enable text extraction from documents (default: true)
ENABLE_TEXT_EXTRACTION=true

# Maximum size for extracted text in bytes (default: 51200)
MAX_EXTRACTED_TEXT_SIZE=51200
```
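These variables can be set in the process environment or loaded from a .env file before the services start, for example with the dotenv package (not a dependency of this library):

```typescript
// Load .env before anything reads process.env
import 'dotenv/config';

import { startServices } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';

// REDIS_URL, DEFAULT_MODEL, S3_ATTACHMENTS_BUCKET, etc. are now visible to the library
await startServices(new MongoAgentChatRepository(), new MongoAgentMessageRepository());
```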
## Usage Examples

### Creating a Streaming Message
The ChatProcessor class handles all chat operations. The most common use case is creating a streaming message:

```typescript
import { ChatProcessor } from '@multiplayer-app/ai-agent-node';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';

// Initialize repositories
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
const artifactStore = new ArtifactStore();
// Create processor
const chatProcessor = new ChatProcessor(
chatRepository,
messageRepository,
artifactStore
);
// Create a streaming message
const stream = await chatProcessor.createMessageStream({
content: 'Hello, how can you help me?',
contextKey: 'support',
userId: 'user-123'
});
// Stream is a Node.js PassThrough stream that emits SSE-formatted events
stream.on('data', (chunk) => {
// Process SSE chunk: "data: {...}\n\n"
const data = chunk.toString();
// Parse and handle stream chunks
});
```

### Stream Chunk Types
The stream emits Server-Sent Events (SSE) with the following chunk types:
```typescript
// Chat metadata (first chunk for new chats)
{
type: 'chat',
chatId: 'chat-123',
chat: { /* AgentChat object */ }
}

// Message updates
{
type: 'message',
message: { /* AgentMessage object */ }
}
// Errors
{
type: 'error',
error: 'Error message'
}
// Stream end marker
[DONE]
```
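Since the stream already emits SSE-formatted data, it can be piped straight to an HTTP response and the chunk types above parsed on the client. A rough Express sketch, assuming the chatProcessor instance from the earlier example (Express and the route path are illustrative, not part of this library):

```typescript
import express from 'express';

const app = express();
app.use(express.json());

app.post('/chats/messages', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await chatProcessor.createMessageStream({
    content: req.body.content,
    contextKey: req.body.contextKey,
    userId: req.body.userId
  });

  // Chunks are already "data: {...}\n\n" frames terminated by [DONE], so pipe them through as-is
  stream.pipe(res);
});

app.listen(3000);
```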
### Managing Chats

```typescript
// List chats with filters
const chats = await chatProcessor.listChats({
contextKey: 'support',
userId: 'user-123',
limit: 20,
sortField: 'updatedAt',
sortOrder: SortOrder.Desc
});

// Get a specific chat with messages
const chat = await chatProcessor.getChat('chat-id');
// Delete a chat
await chatProcessor.deleteChat('chat-id');
```

## Agent Setup
Agents are configured using a JSON configuration file that defines agents, their tools, and which contexts they're available in.
### Example Configuration
```json
{
"models": ["gpt-4o", "claude-3-5-sonnet", "openai/gpt-4"],
"agents": [
{
"name": "support-agent",
"description": "Customer support agent",
"systemPrompt": "You are a helpful customer support agent...",
"defaultModel": "gpt-4o",
"temperature": 0.7,
"maxTokens": 2000,
"contextKeys": ["support", "general"],
"tools": [
{
"type": "api-tool",
"data": {
"title": "Get Order Status",
"description": "Retrieves the status of a customer order",
"method": "GET",
"url": "https://api.example.com/orders/{orderId}",
"headersToPass": ["Authorization"],
"needsApproval": false
}
},
{
"type": "web-search",
"data": {
"title": "web-search"
}
}
]
}
]
}
```

### Loading the Configuration
```typescript
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import fs from 'fs';

const configStore = ConfigStore.getInstance();
const rawConfig = JSON.parse(fs.readFileSync('agent-config.json', 'utf-8'));
configStore.loadConfig(rawConfig);
```

### API Tools
API tools allow agents to make HTTP requests to external services:
```json
{
"type": "api-tool",
"data": {
"title": "Tool Name",
"description": "What this tool does",
"method": "GET|POST|PUT|DELETE|PATCH",
"url": "https://api.example.com/endpoint",
"headersToPass": ["Authorization", "X-Custom-Header"],
"body": {
"type": "object",
"properties": {
"param1": { "type": "string" },
"param2": { "type": "number" }
},
"required": ["param1"]
},
"queryParams": {
"type": "object",
"properties": {
"filter": { "type": "string" }
}
},
"needsApproval": true
}
}
```

Key Fields:

- needsApproval: If true, the tool execution requires user approval before running
- headersToPass: List of request headers to forward from the original request
- body/queryParams: JSON Schema definitions for request parameters (converted to Zod schemas); see the sketch below for how such a schema maps to a Zod shape
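As a rough illustration of that conversion (this is not the library's internal code), the body and queryParams schemas from the example above correspond to Zod shapes like:

```typescript
import { z } from 'zod';

// body: param1 is listed in "required", param2 is not
const bodySchema = z.object({
  param1: z.string(),
  param2: z.number().optional()
});

// queryParams: no "required" list, so filter is optional
const querySchema = z.object({
  filter: z.string().optional()
});
```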
### Web Search Tools

Web search tools enable agents to search the internet:
```json
{
"type": "web-search",
"data": {
"title": "web-search"
}
}
```

### Programmatic Configuration
You can also add agents programmatically:
```typescript
import { ConfigStore } from '@multiplayer-app/ai-agent-node';
import { AgentToolType } from '@multiplayer-app/ai-agent-types';

const configStore = ConfigStore.getInstance();
// Add an agent
configStore.addAgent(['support', 'sales'], {
name: 'custom-agent',
description: 'Custom agent added in code',
systemPrompt: 'You are a helpful assistant...',
defaultModel: 'gpt-4o',
temperature: 0.7,
tools: [
{
type: AgentToolType.WEB_SEARCH,
data: { title: 'web-search' }
}
]
});
// Add a tool to all existing agents
configStore.addToolToAllAgents({
type: AgentToolType.LOCAL_FUNCTION,
data: {
title: 'custom-tool',
needsApproval: false,
// ... tool implementation
}
});
```

### Tool Approval Flow
When a tool has needsApproval: true, the agent will request approval before execution:

1. Agent generates a tool call with requiresConfirmation: true and an approvalId
2. Chat status changes to AgentStatus.WaitingForUserAction
3. User can approve or deny the tool call
4. If approved, the tool executes; if denied, execution is skipped

```typescript
// Process approval using createMessageStream (handles chat lookup internally)
const stream = await chatProcessor.createMessageStream({
chatId: 'chat-id',
messageId: 'message-id',
approvalId: 'approval-id',
approved: true,
reason: 'User approved'
});

// Or use streamMessage directly if you already have the chat object
const chat = await chatProcessor.getChat('chat-id');
await chatProcessor.streamMessage(chat, {
chatId: 'chat-id',
messageId: 'message-id',
approvalId: 'approval-id',
approved: true,
reason: 'User approved'
});
```

### Message Attachments
Messages can include attachments for context:
```typescript
import { AgentAttachmentType } from '@multiplayer-app/ai-agent-types';

const stream = await chatProcessor.createMessageStream({
content: 'Analyze this document',
contextKey: 'support',
attachments: [
{
id: 'att-1',
type: AgentAttachmentType.File,
name: 'document.pdf',
url: 'https://s3.../document.pdf',
mimeType: 'application/pdf',
size: 1024000
},
{
id: 'att-2',
type: AgentAttachmentType.Context,
name: 'Page Context',
metadata: {
kind: 'webSnippet',
source: { url: 'https://example.com' },
selectedText: 'Selected text from page'
}
}
]
});
```

## Database Access
The library uses repository interfaces from @multiplayer-app/ai-agent-db for database operations. You can use any implementation that conforms to these interfaces.

### MongoDB
```typescript
import mongo from '@multiplayer-app/ai-agent-mongo';
import { MongoAgentChatRepository, MongoAgentMessageRepository } from '@multiplayer-app/ai-agent-mongo';

// Connect to MongoDB
await mongo.connect();
// Create repositories
const chatRepository = new MongoAgentChatRepository();
const messageRepository = new MongoAgentMessageRepository();
// Use with ChatProcessor
const chatProcessor = new ChatProcessor(
chatRepository,
messageRepository,
artifactStore
);
```

### Custom Repository Implementations
The library works with any repository implementation that matches the interfaces:
```typescript
import type {
AgentChatRepository,
AgentMessageRepository
} from '@multiplayer-app/ai-agent-db';

// Your custom implementation
class CustomChatRepository implements AgentChatRepository {
// Implement all required methods
}
class CustomMessageRepository implements AgentMessageRepository {
// Implement all required methods
}
// Use with ChatProcessor
const chatProcessor = new ChatProcessor(
new CustomChatRepository(),
new CustomMessageRepository(),
artifactStore
);
```

### Repository Interface Methods
AgentChatRepository:

- create(data): Create a new chat
- findById(id): Find chat by ID
- update(id, data): Update chat
- delete(id): Delete chat
- findWithMessages(filter, options): Find chats with aggregated messages

AgentMessageRepository:

- create(data): Create a new message
- findById(id): Find message by ID
- findByChatId(chatId): Find all messages for a chat
- update(id, data): Update message
- delete(id): Delete message
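For tests or quick prototypes these interfaces can be backed by an in-memory store. A minimal sketch of the chat repository side, with types loosened to any; a real implementation should match the exact signatures exported by @multiplayer-app/ai-agent-db:

```typescript
import { randomUUID } from 'node:crypto';

// Loosely-typed in-memory stand-in for AgentChatRepository; not production-ready
class InMemoryChatRepository {
  private chats = new Map<string, any>();

  async create(data: any) {
    const chat = { id: randomUUID(), ...data, createdAt: new Date(), updatedAt: new Date() };
    this.chats.set(chat.id, chat);
    return chat;
  }

  async findById(id: string) {
    return this.chats.get(id) ?? null;
  }

  async update(id: string, data: any) {
    const updated = { ...this.chats.get(id), ...data, updatedAt: new Date() };
    this.chats.set(id, updated);
    return updated;
  }

  async delete(id: string) {
    this.chats.delete(id);
  }

  async findWithMessages(filter: any, options: any) {
    // A real implementation would filter, sort, and join in the related messages
    return [...this.chats.values()];
  }
}
```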
## Advanced Usage

### Agent Process Management
Monitor and control agent processes:
```typescript
import { agentStore, AgentProcessEventType } from '@multiplayer-app/ai-agent-node';

// Listen to agent process events
agentStore.addListener('chat-id', (event) => {
switch (event.type) {
case AgentProcessEventType.Update:
console.log('Agent updated:', event.data);
break;
case AgentProcessEventType.Finished:
console.log('Agent finished:', event.data);
break;
case AgentProcessEventType.Error:
console.error('Agent error:', event.data);
break;
}
});
// Stop an agent process
agentStore.stopAgentProcess('chat-id');
```

### Built-in Tools
The library includes built-in tools:
```typescript
import {
createProposeFormValuesTool,
createGenerateChartTool
} from '@multiplayer-app/ai-agent-node';

// Add form proposal tool to all agents
configStore.addToolToAllAgents(createProposeFormValuesTool());
// Add chart generation tool to all agents
configStore.addToolToAllAgents(createGenerateChartTool());
```

### Context Limiting
The library automatically limits context to prevent token limit issues:
```typescript
import { helpers } from '@multiplayer-app/ai-agent-node';

const limitedMessages = helpers.ContextLimiter.limitContext(messages, {
maxMessages: 10,
keepFirstUserMessage: true,
keepSystemMessages: true
});
```

### Artifact Storage
Store and retrieve artifacts generated by agents:
```typescript
// List artifacts for a chat
const artifacts = chatProcessor.listArtifacts('chat-id');

// Artifacts are automatically stored when generated by tools
// Access via ArtifactStore if needed
import { ArtifactStore } from '@multiplayer-app/ai-agent-node';
const artifactStore = new ArtifactStore();
const storedArtifacts = artifactStore.listArtifacts('chat-id');
```