NexFlow CAAS - Cron & Keep-Alive Oracle for Virtuals ACP
Install with npm install nexflow-caas.
FOR UPDATE SKIP LOCKED prevents duplicate processing across workers.
On the receiving side, use jobId to deduplicate if you receive the same webhook twice.
Architecture
`
┌─────────────────────────────────────────────────────────────┐
│                        NexFlow CAAS                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────┐         ┌─────────────────┐            │
│  │   API Server    │         │   Scheduler     │            │
│  │   (Fastify)     │         │   Worker        │            │
│  │                 │         │                 │            │
│  │  POST /acp/     │         │  Poll every 60s │            │
│  │  schedule-task  │         │  Fetch due jobs │            │
│  │                 │         │  Fire webhooks  │            │
│  └────────┬────────┘         └────────┬────────┘            │
│           │                           │                     │
│           └───────────┬───────────────┘                     │
│                       │                                     │
│             ┌─────────▼─────────┐                           │
│             │    PostgreSQL     │                           │
│             │  scheduled_jobs   │                           │
│             └───────────────────┘                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘
`
Kernel DTO
All job creation flows through a single protocol-neutral DTO:
`
┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│   REST API   │       │  ACP Client  │       │   Internal   │
│    /acp/*    │       │  (Virtuals)  │       │    (SMF)     │
└──────┬───────┘       └──────┬───────┘       └──────┬───────┘
       │                      │                      │
       │ mapLegacy...()       │ acpInput...()        │ (direct)
       ▼                      ▼                      ▼
┌────────────────────────────────────────────────────────────┐
│              ScheduleJobRequest (Kernel DTO)               │
│  tenantId | source | callbackUrl | triggerType | payload   │
└──────────────────────────┬─────────────────────────────────┘
                           │
                           ▼
                scheduleJobFromKernel()
                           │
                           ▼
                      PostgreSQL
`
| Field | Description |
|-------|-------------|
| tenantId | Generic identifier (ACP buyer agent, customer ID, SMF ID) |
| source | Origin: 'ACP', 'HTTP', or 'INTERNAL' |
| callbackUrl | HTTPS webhook URL |
| triggerType | 'AT_TIME' or 'IN_DELAY' |
| payload | JSON payload (max 16 KB) |
| metadata | Optional context (ACP task ID, correlation ID) |
This design enables:
- NexFlow public API → same kernel
- ACP integration → same kernel
- SMF agent → same kernel
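As a rough illustration of the kernel DTO described above, the sketch below types the documented fields and checks the two constraints the table states (HTTPS callback, 16 KB payload limit). The field names come from the table; the validator name, the helper, and the error strings are assumptions made for this example, and the project itself lists Zod for validation, so the real schema likely looks different. Trigger timing fields are omitted because the table does not list them.

```typescript
// Field names follow the DTO table; everything else here is illustrative.
type TriggerType = "AT_TIME" | "IN_DELAY";
type JobSource = "ACP" | "HTTP" | "INTERNAL";

interface ScheduleJobRequest {
  tenantId: string;
  source: JobSource;
  callbackUrl: string;
  triggerType: TriggerType;
  payload: Record<string, unknown>;
  metadata?: Record<string, unknown>;
}

const MAX_PAYLOAD_BYTES = 16 * 1024; // "max 16 KB" from the table

// Count UTF-8 bytes of a string without Node-specific APIs.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const code = text.charCodeAt(i);
    if (code < 0x80) bytes += 1;
    else if (code < 0x800) bytes += 2;
    else if (code >= 0xd800 && code <= 0xdbff) {
      bytes += 4; // surrogate pair encodes one 4-byte code point
      i++;
    } else bytes += 3;
  }
  return bytes;
}

// Hypothetical validator: returns a list of problems, empty when valid.
function validateScheduleJobRequest(req: ScheduleJobRequest): string[] {
  const problems: string[] = [];
  if (req.tenantId.trim() === "") problems.push("tenantId is required");
  if (!/^https:\/\//i.test(req.callbackUrl)) {
    problems.push("callbackUrl must use HTTPS");
  }
  if (utf8ByteLength(JSON.stringify(req.payload)) > MAX_PAYLOAD_BYTES) {
    problems.push("payload exceeds 16 KB");
  }
  return problems;
}
```

Returning a list of problems rather than throwing on the first one lets a caller report every issue in a single response.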
Tech Stack
- Runtime: Node.js 20+
- Language: TypeScript (ESM)
- Web Framework: Fastify
- Database: PostgreSQL
- Migrations: node-pg-migrate
- Process Manager: PM2
- Logging: Pino (JSON structured logs)
- Validation: Zod
Quick Start
Prerequisites
- Node.js 20+
- PostgreSQL 14+
- PM2 (for production)
Installation
`bash
# Clone and enter directory
cd nexflow-caas

# Install dependencies
npm install

# Copy environment file
cp env.example .env

# Edit .env with your configuration
`
Configuration
Edit .env with your settings:
`bash
# Required
DATABASE_URL=postgresql://user:password@localhost:5432/nexflow_caas
ACP_PRIVATE_KEY=your_wallet_private_key

# Optional (shown with defaults)
PORT=3100
LOG_LEVEL=info
SCHEDULER_POLL_INTERVAL_MS=60000
SCHEDULER_BATCH_SIZE=50
SCHEDULER_MAX_RETRIES=5
WEBHOOK_TIMEOUT_MS=30000
`
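To make the required/optional split concrete, here is a minimal sketch of how these variables could be read with the defaults shown above. The variable names and default values come from the block above; the function and interface names are hypothetical, and the project's actual src/config/env.ts may be structured differently (for instance, with Zod).

```typescript
// Hypothetical config shape; variable names and defaults mirror the .env example.
interface CaasConfig {
  databaseUrl: string;
  acpPrivateKey: string;
  port: number;
  logLevel: string;
  pollIntervalMs: number;
  batchSize: number;
  maxRetries: number;
  webhookTimeoutMs: number;
}

type Env = { [key: string]: string | undefined };

// Required variables fail fast with a clear message.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === "") {
    throw new Error("Missing required environment variable: " + name);
  }
  return value;
}

// Optional numeric variables fall back to the documented default.
function intVar(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === "") return fallback;
  const parsed = Number(raw);
  if (!isFinite(parsed) || Math.floor(parsed) !== parsed || parsed <= 0) {
    throw new Error(name + " must be a positive integer, got: " + raw);
  }
  return parsed;
}

// In a real process this would be called as loadConfig(process.env).
function loadConfig(env: Env): CaasConfig {
  return {
    databaseUrl: requireVar(env, "DATABASE_URL"),
    acpPrivateKey: requireVar(env, "ACP_PRIVATE_KEY"),
    port: intVar(env, "PORT", 3100),
    logLevel: env["LOG_LEVEL"] || "info",
    pollIntervalMs: intVar(env, "SCHEDULER_POLL_INTERVAL_MS", 60000),
    batchSize: intVar(env, "SCHEDULER_BATCH_SIZE", 50),
    maxRetries: intVar(env, "SCHEDULER_MAX_RETRIES", 5),
    webhookTimeoutMs: intVar(env, "WEBHOOK_TIMEOUT_MS", 30000),
  };
}
```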
Database Setup
`bash
# Create database (if needed)
createdb nexflow_caas

# Run migrations
npm run migrate
`
Running
`bash
# Build TypeScript
npm run build

# Start API server (terminal 1)
npm run start:api

# Start scheduler worker (terminal 2)
npm run start:scheduler
`
For development with auto-reload:
`bash
# API server with watch
npm run dev:api

# Scheduler with watch
npm run dev:scheduler
`
Production (PM2)
`bash
# Build first
npm run build

# Start both processes
npm run pm2:start

# View logs
npm run pm2:logs

# Stop all
npm run pm2:stop

# Restart
npm run pm2:restart
`
API Reference
Schedule a Task
`bash
# Schedule task to run at specific time
curl -X POST http://localhost:3100/acp/schedule-task \
  -H "Content-Type: application/json" \
  -d '{
    "buyer_agent_id": "my-agent-001",
    "callback_url": "https://example.com/webhook",
    "callback_auth": "secret-token",
    "trigger_type": "AT_TIME",
    "trigger_time": "2024-12-25T12:00:00Z",
    "payload": {
      "action": "send_reminder",
      "user_id": "user123"
    }
  }'

# Schedule task with delay (e.g., 5 minutes from now)
curl -X POST http://localhost:3100/acp/schedule-task \
  -H "Content-Type: application/json" \
  -d '{
    "buyer_agent_id": "my-agent-001",
    "callback_url": "https://example.com/webhook",
    "trigger_type": "IN_DELAY",
    "delay_seconds": 300,
    "payload": {
      "action": "check_status"
    }
  }'
`
Response:
`json
{
  "jobId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "scheduled",
  "trigger_time": "2024-12-25T12:00:00.000Z"
}
`
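The two trigger types differ only in how the fire time is derived. The sketch below shows one plausible way to turn a request body into an absolute trigger time; the field names match the curl examples, while the function name and error messages are assumptions for illustration. Whether the service rejects times in the past is not stated here, so the sketch does not check for that.

```typescript
// Subset of the request body relevant to timing (field names from the examples above).
interface ScheduleTaskTiming {
  trigger_type: "AT_TIME" | "IN_DELAY";
  trigger_time?: string; // ISO 8601, used with AT_TIME
  delay_seconds?: number; // used with IN_DELAY
}

// Hypothetical helper: resolve the absolute time at which the job should fire.
function resolveTriggerTime(body: ScheduleTaskTiming, now: Date): Date {
  if (body.trigger_type === "AT_TIME") {
    if (body.trigger_time === undefined) {
      throw new Error("trigger_time is required for AT_TIME");
    }
    const at = new Date(body.trigger_time);
    if (isNaN(at.getTime())) {
      throw new Error("trigger_time must be a valid ISO 8601 timestamp");
    }
    return at;
  }
  if (body.delay_seconds === undefined || !(body.delay_seconds >= 0)) {
    throw new Error("delay_seconds must be a non-negative number for IN_DELAY");
  }
  // IN_DELAY is relative to the moment the request is accepted.
  return new Date(now.getTime() + body.delay_seconds * 1000);
}
```

Passing `now` in as a parameter keeps the function deterministic, which makes it easy to test.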
Get Job Status
`bash
curl http://localhost:3100/acp/schedule-task/550e8400-e29b-41d4-a716-446655440000
`
Response:
`json
{
  "jobId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "PENDING",
  "triggerTime": "2024-12-25T12:00:00.000Z",
  "attemptCount": 0,
  "lastError": null,
  "createdAt": "2024-12-20T10:00:00.000Z",
  "updatedAt": "2024-12-20T10:00:00.000Z"
}
`
Cancel a Job
`bash
curl -X POST http://localhost:3100/acp/schedule-task/550e8400-e29b-41d4-a716-446655440000/cancel
`
Response:
`json
{
  "jobId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "CANCELLED",
  "message": "Job cancelled successfully"
}
`
Health Check
`bash
curl http://localhost:3100/health
`
Response:
`json
{
  "status": "healthy",
  "version": "1.0.0",
  "timestamp": "2024-12-20T10:00:00.000Z",
  "checks": {
    "database": {
      "status": "healthy"
    }
  },
  "responseTime": "5ms"
}
`
Metrics
`bash
curl http://localhost:3100/metrics
`
Response:
`json
{
  "timestamp": "2024-12-20T10:00:00.000Z",
  "jobs": {
    "total": 1500,
    "byStatus": {
      "pending": 50,
      "fired": 1400,
      "failed": 30,
      "cancelled": 20
    }
  },
  "lastFiredJobAt": "2024-12-20T09:59:00.000Z",
  "uptime": 86400,
  "memory": {
    "heapUsed": 45,
    "heapTotal": 65,
    "rss": 80
  }
}
`
Webhook Payload
When a job triggers, the callback URL receives:
`json
{
  "jobId": "550e8400-e29b-41d4-a716-446655440000",
  "triggered_at": "2024-12-25T12:00:00.000Z",
  "payload": {
    "action": "send_reminder",
    "user_id": "user123"
  },
  "provider_agent_id": "nexflow-caas",
  "status": "TRIGGERED"
}
`
Headers:
- Content-Type: application/json
- User-Agent: NexFlow-CAAS/1.0
- X-NexFlow-Job-Id: <jobId>
- X-NexFlow-Provider: nexflow-caas
- Authorization: Bearer <callback_auth> (if provided)
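On the receiving side, a handler would typically check the bearer token and use jobId to ignore repeated deliveries. The following is a minimal sketch of that idea, not part of NexFlow CAAS itself: the function name, the outcome labels, and the in-memory map are all assumptions, and a production receiver would persist seen job IDs in durable storage. Header names are assumed to arrive lower-cased, as Node.js HTTP servers present them.

```typescript
// Body shape taken from the webhook payload example above.
interface WebhookBody {
  jobId: string;
  triggered_at: string;
  payload: Record<string, unknown>;
  provider_agent_id: string;
  status: string;
}

type WebhookOutcome = "processed" | "duplicate" | "unauthorized";

// Hypothetical receiver-side handler.
// `seenJobIds` is an in-memory stand-in for a durable deduplication store.
function handleWebhook(
  headers: { [name: string]: string | undefined },
  body: WebhookBody,
  expectedToken: string,
  seenJobIds: { [jobId: string]: boolean }
): WebhookOutcome {
  // Reject calls that do not carry the token supplied as callback_auth.
  if (headers["authorization"] !== "Bearer " + expectedToken) {
    return "unauthorized";
  }
  // The same jobId may arrive more than once; act on it only the first time.
  if (seenJobIds[body.jobId] === true) {
    return "duplicate";
  }
  seenJobIds[body.jobId] = true;
  return "processed";
}
```

Returning "duplicate" as a successful outcome (for example, with a 2xx response) avoids triggering further retries for a job that was already handled.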
Retry Behavior
Failed webhook deliveries are retried with increasing backoff delays:
| Attempt | Delay |
|---------|-------|
| 1 | 30 seconds |
| 2 | 60 seconds |
| 3 | 5 minutes |
| 4 | 15 minutes |
| 5 | 1 hour |
After 5 failed attempts, the job is marked as FAILED.
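The schedule above is a fixed lookup table rather than a formula, which a short sketch makes explicit. The delays are taken from the table; the function names are hypothetical, and the table does not say whether each delay precedes or follows the numbered attempt, so the code only looks up the row for a given attempt number.

```typescript
// Delays in seconds for attempts 1..5, copied from the retry table.
const RETRY_DELAYS_SECONDS: number[] = [30, 60, 300, 900, 3600];

// Look up the delay associated with a 1-based attempt number.
// Attempts beyond the table reuse the last (longest) delay.
function retryDelaySeconds(attempt: number): number {
  if (attempt < 1 || Math.floor(attempt) !== attempt) {
    throw new Error("attempt must be a positive integer");
  }
  const index = Math.min(attempt, RETRY_DELAYS_SECONDS.length) - 1;
  return RETRY_DELAYS_SECONDS[index];
}

// Compute when the next delivery should be tried, given a reference time.
function nextRetryAt(attempt: number, from: Date): Date {
  return new Date(from.getTime() + retryDelaySeconds(attempt) * 1000);
}
```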
Database Schema
scheduled_jobs Table
| Column | Type | Description |
|--------|------|-------------|
| id | UUID | Primary key |
| buyer_agent_id | VARCHAR(255) | ACP agent identifier |
| callback_url | TEXT | Webhook URL |
| callback_auth | TEXT | Optional bearer token |
| trigger_type | ENUM | 'AT_TIME' or 'IN_DELAY' |
| trigger_time | TIMESTAMPTZ | When to fire |
| delay_seconds | INTEGER | Original delay (for IN_DELAY) |
| payload | JSONB | Custom payload data |
| status | ENUM | PENDING/FIRED/FAILED/CANCELLED |
| attempt_count | INTEGER | Number of delivery attempts |
| last_error | TEXT | Last error message |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |
Indexes:
- (status, trigger_time) - Partial index on PENDING jobs
- (buyer_agent_id) - For agent-specific queries
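The partial index on PENDING jobs pairs naturally with a claim query that uses FOR UPDATE SKIP LOCKED, so that concurrent workers never pick up the same row. The sketch below builds such a query as a `{ text, values }` object, the shape accepted by node-postgres; the exact SQL in jobService.ts is not shown in this README, so treat the statement and the function name as assumptions built from the column names above.

```typescript
// Parameterized query in the { text, values } shape used by node-postgres.
interface QueryConfig {
  text: string;
  values: unknown[];
}

// Hypothetical query builder for claiming due jobs.
// The statement must run inside a transaction: the row locks taken by
// FOR UPDATE are held until that transaction commits or rolls back.
function buildClaimDueJobsQuery(batchSize: number): QueryConfig {
  return {
    text: [
      "SELECT id, callback_url, callback_auth, payload, attempt_count",
      "FROM scheduled_jobs",
      "WHERE status = 'PENDING' AND trigger_time <= NOW()",
      "ORDER BY trigger_time",
      "LIMIT $1",
      "FOR UPDATE SKIP LOCKED",
    ].join("\n"),
    values: [batchSize],
  };
}
```

With SKIP LOCKED, rows already locked by another worker's transaction are skipped instead of waited on, so each worker receives a disjoint batch.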
Project Structure
`
nexflow-caas/
├── src/
│   ├── api/
│   │   ├── index.ts              # Fastify server entry
│   │   └── routes/
│   │       ├── acpSchedule.ts
│   │       ├── health.ts
│   │       └── metrics.ts
│   ├── scheduler/
│   │   ├── index.ts              # Scheduler process entry
│   │   └── worker.ts             # Job processing logic
│   ├── services/
│   │   ├── jobService.ts         # Job CRUD operations
│   │   └── webhookService.ts     # HTTP delivery
│   ├── config/
│   │   ├── env.ts                # Environment config
│   │   ├── db.ts                 # Database pool
│   │   └── acp.ts                # ACP configuration
│   ├── acp/
│   │   └── acpClient.ts          # ACP SDK integration
│   ├── types/
│   │   ├── job.ts                # Job types + schemas
│   │   └── acp.ts                # ACP types
│   └── utils/
│       ├── logger.ts             # Pino logger
│       └── error.ts              # Error classes
├── migrations/
│   ├── 1704067200000_create-scheduled-jobs-table.cjs
│   └── 1704067200001_add-job-metrics-table.cjs
├── ecosystem.config.cjs          # PM2 config
├── migrate.config.cjs            # Migration config
├── tsconfig.json
├── package.json
└── README.md
`
npm Scripts
| Script | Description |
|--------|-------------|
| build | Compile TypeScript to dist/ |
| dev:api | Run API with hot reload |
| dev:scheduler | Run scheduler with hot reload |
| start:api | Start API (production) |
| start:scheduler | Start scheduler (production) |
| migrate | Run pending migrations |
| migrate:down | Rollback last migration |
| migrate:create | Create new migration |
| pm2:start | Start via PM2 |
| pm2:stop | Stop PM2 processes |
| pm2:restart | Restart PM2 processes |
| pm2:logs | View PM2 logs |
AWS EC2 Deployment
For t4g.micro/t4g.medium instances:
`bash
# Install Node.js 20
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt-get install -y nodejs

# Install PM2 globally
sudo npm install -g pm2

# Clone and setup (replace <repo-url> with the repository URL)
git clone <repo-url> /opt/nexflow-caas
cd /opt/nexflow-caas
npm install
npm run build

# Configure environment
cp env.example .env
# Edit .env with production values

# Run migrations
npm run migrate

# Start with PM2
npm run pm2:start

# Save PM2 startup script
pm2 save
pm2 startup
`
Virtuals ACP Integration
NexFlow CAAS is a Virtuals Protocol ACP provider offering scheduled task execution services to buyer agents.
Integration Architecture
`
┌─────────────────┐     ┌──────────────────────────────────────────┐
│  Buyer Agent    │     │               NexFlow CAAS               │
│  (ACP Client)   │     │                                          │
│                 │     │  ┌─────────────┐   ┌──────────────────┐  │
│  Request Task   │────▶│  │ ACP Client  │──▶│  Kernel DTO      │  │
│                 │     │  │ (adapter)   │   │  (ScheduleJob    │  │
│                 │     │  └─────────────┘   │   Request)       │  │
│                 │     │                    └────────┬─────────┘  │
│                 │     │                             │            │
│                 │     │                    ┌────────▼─────────┐  │
│  ◀── Webhook ───│─────│────────────────────│  Scheduler       │  │
│                 │     │                    │  Worker          │  │
└─────────────────┘     │                    └──────────────────┘  │
                        └──────────────────────────────────────────┘
`
How It Works
1. ACP tasks are received by src/acp/acpClient.ts
2. Mapped to the protocol-neutral ScheduleJobRequest kernel DTO
3. Processed through scheduleJobFromKernel() — the single entry point
4. Persisted to PostgreSQL
5. Scheduler worker polls for due jobs and delivers webhooks
6. ACP completion is confirmed back to the protocol
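Steps 5 and 6 amount to a small state machine over each job's status. The sketch below models it in memory under stated assumptions: the status values and the attempt counter come from the schema, while the function names are hypothetical and the real worker operates on PostgreSQL rows, reschedules retries with a delay, and reports completion to ACP, none of which is modeled here.

```typescript
type JobStatus = "PENDING" | "FIRED" | "FAILED" | "CANCELLED";

interface JobState {
  id: string;
  status: JobStatus;
  triggerTime: Date;
  attemptCount: number;
  lastError: string | null;
}

// Pick PENDING jobs whose trigger time has passed, oldest first, up to batchSize.
function selectDueJobs(jobs: JobState[], now: Date, batchSize: number): JobState[] {
  return jobs
    .filter((job) => job.status === "PENDING" && job.triggerTime.getTime() <= now.getTime())
    .sort((a, b) => a.triggerTime.getTime() - b.triggerTime.getTime())
    .slice(0, batchSize);
}

// Apply the result of one delivery attempt; `error` is null on success.
function applyDeliveryResult(job: JobState, error: string | null, maxRetries: number): JobState {
  const attemptCount = job.attemptCount + 1;
  if (error === null) {
    return { ...job, status: "FIRED", attemptCount, lastError: null };
  }
  // Stay PENDING for another retry until the attempt budget is used up.
  const status: JobStatus = attemptCount >= maxRetries ? "FAILED" : "PENDING";
  return { ...job, status, attemptCount, lastError: error };
}
```

Both functions return new values instead of mutating their inputs, which keeps each transition easy to test in isolation.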
Service Offering
| Field | Value |
|-------|-------|
| Name | Schedule Task Execution (Cron & Keep-Alive) |
| Price | 0.1 $VIRTUAL per task |
| Capabilities | schedule_at_time, schedule_with_delay, webhook_callback, retry_on_failure, status_tracking, cancellation, idempotent_delivery |
Multi-Tenancy
CAAS is multi-tenant from day one:
- tenantId: Generic identifier (ACP buyer agent ID, external customer ID, SMF ID)
- source: Tracks origin (ACP, HTTP, INTERNAL)
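To show how tenantId and source generalize the ACP-specific fields, here is a sketch of an adapter from the REST request body to the kernel fields. It is only an illustration: the function and interface names are hypothetical, the real mapping functions are not shown in this README, and which source value the REST route records is not stated, so the caller supplies it.

```typescript
// Subset of the REST body (field names from the schedule-task example).
interface LegacyScheduleBody {
  buyer_agent_id: string;
  callback_url: string;
  trigger_type: "AT_TIME" | "IN_DELAY";
  payload: Record<string, unknown>;
}

// Kernel fields as listed in the DTO table.
interface KernelRequest {
  tenantId: string;
  source: "ACP" | "HTTP" | "INTERNAL";
  callbackUrl: string;
  triggerType: "AT_TIME" | "IN_DELAY";
  payload: Record<string, unknown>;
}

// Hypothetical adapter: the buyer agent ID becomes the generic tenantId,
// and the entry point is recorded in `source`.
function toKernelRequest(
  body: LegacyScheduleBody,
  source: KernelRequest["source"]
): KernelRequest {
  return {
    tenantId: body.buyer_agent_id,
    source: source,
    callbackUrl: body.callback_url,
    triggerType: body.trigger_type,
    payload: body.payload,
  };
}
```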