19 KiB
AGENT-001: Server-Sent Events (SSE) Node
Overview
Create a new runtime node that establishes and manages Server-Sent Events (SSE) connections, enabling real-time streaming data from servers to Noodl applications. This is critical for agentic UI patterns where AI backends stream responses incrementally.
Phase: 3.5 (Real-Time Agentic UI)
Priority: CRITICAL (blocks Erleah development)
Effort: 3-5 days
Risk: Medium
Problem Statement
Current Limitation
The existing HTTP Request node only supports request-response patterns:
User Action → HTTP Request → Wait → Response → UI Update
This doesn't work for streaming use cases:
User Action → SSE Connect → Stream messages → Progressive UI Updates
↓
Message 1 → UI Update
Message 2 → UI Update
Message 3 → UI Update
...
Real-World Use Cases
- AI Chat Responses - Stream tokens as they generate (Erleah)
- Live Notifications - Server pushes updates without polling
- Real-Time Dashboards - Continuous metric updates
- Progress Updates - Long-running backend operations
- Event Feeds - News, social media, activity streams
Goals
- ✅ Establish and maintain SSE connections
- ✅ Parse incoming messages (text and JSON)
- ✅ Handle connection lifecycle (open, error, close)
- ✅ Auto-reconnect on connection loss
- ✅ Clean disconnection on node deletion
- ✅ Support custom event types (via addEventListener)
Technical Design
Node Specification
{
name: 'net.noodl.SSE',
displayNodeName: 'Server-Sent Events',
category: 'Data',
color: 'data',
docs: 'https://docs.noodl.net/nodes/data/sse',
searchTags: ['sse', 'stream', 'server-sent', 'events', 'realtime', 'websocket']
}
Port Schema
Inputs
| Port Name | Type | Group | Description |
|---|---|---|---|
url |
string | Connection | SSE endpoint URL |
connect |
signal | Actions | Establish connection |
disconnect |
signal | Actions | Close connection |
autoReconnect |
boolean | Connection | Auto-reconnect on disconnect (default: true) |
reconnectDelay |
number | Connection | Delay before reconnect (ms, default: 3000) |
withCredentials |
boolean | Connection | Include credentials (default: false) |
customHeaders |
object | Connection | Custom headers (EventSource doesn't support, document limitation) |
Outputs
| Port Name | Type | Group | Description |
|---|---|---|---|
message |
object | Data | Raw message event object |
data |
* | Data | Parsed message data (JSON or text) |
eventType |
string | Data | Event type (if custom events used) |
connected |
signal | Events | Fired when connection opens |
disconnected |
signal | Events | Fired when connection closes |
error |
string | Events | Error message |
isConnected |
boolean | Status | Current connection state |
lastMessageTime |
number | Status | Timestamp of last message (ms) |
State Machine
┌─────────────┐
START │ │
────┬─→│ DISCONNECTED│←──┐
│ │ │ │
│ └─────────────┘ │
│ │ │
│ [connect] │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ CONNECTING │ │
│ └─────────────┘ │
│ │ │
│ [onopen] │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
└──│ CONNECTED │ │
│ │ │
└─────────────┘ │
│ │ │
[onmessage] │
│ │ │
[onerror/close] │
│ │
└───────────┘
[autoReconnect
after delay]
Implementation Details
File Structure
packages/noodl-runtime/src/nodes/std-library/data/
├── ssenode.js # Main node implementation
└── ssenode.test.js # Unit tests
Core Implementation
var SSENode = {
name: 'net.noodl.SSE',
displayNodeName: 'Server-Sent Events',
category: 'Data',
color: 'data',
initialize: function() {
this._internal.eventSource = null;
this._internal.isConnected = false;
this._internal.messageBuffer = [];
this._internal.reconnectTimer = null;
this._internal.customEventListeners = new Map();
},
inputs: {
url: {
type: 'string',
displayName: 'URL',
group: 'Connection',
set: function(value) {
this._internal.url = value;
}
},
connect: {
type: 'signal',
displayName: 'Connect',
group: 'Actions',
valueChangedToTrue: function() {
this.doConnect();
}
},
disconnect: {
type: 'signal',
displayName: 'Disconnect',
group: 'Actions',
valueChangedToTrue: function() {
this.doDisconnect();
}
},
autoReconnect: {
type: 'boolean',
displayName: 'Auto Reconnect',
group: 'Connection',
default: true,
set: function(value) {
this._internal.autoReconnect = value;
}
},
reconnectDelay: {
type: 'number',
displayName: 'Reconnect Delay (ms)',
group: 'Connection',
default: 3000,
set: function(value) {
this._internal.reconnectDelay = value;
}
},
withCredentials: {
type: 'boolean',
displayName: 'With Credentials',
group: 'Connection',
default: false,
set: function(value) {
this._internal.withCredentials = value;
}
},
// For custom event types (beyond 'message')
eventType: {
type: 'string',
displayName: 'Listen for Event Type',
group: 'Connection',
set: function(value) {
this._internal.customEventType = value;
if (this._internal.eventSource && value) {
this.addCustomEventListener(value);
}
}
}
},
outputs: {
message: {
type: 'object',
displayName: 'Message',
group: 'Data',
getter: function() {
return this._internal.lastMessage;
}
},
data: {
type: '*',
displayName: 'Data',
group: 'Data',
getter: function() {
return this._internal.lastData;
}
},
eventType: {
type: 'string',
displayName: 'Event Type',
group: 'Data',
getter: function() {
return this._internal.lastEventType;
}
},
connected: {
type: 'signal',
displayName: 'Connected',
group: 'Events'
},
disconnected: {
type: 'signal',
displayName: 'Disconnected',
group: 'Events'
},
messageReceived: {
type: 'signal',
displayName: 'Message Received',
group: 'Events'
},
error: {
type: 'string',
displayName: 'Error',
group: 'Events',
getter: function() {
return this._internal.lastError;
}
},
isConnected: {
type: 'boolean',
displayName: 'Is Connected',
group: 'Status',
getter: function() {
return this._internal.isConnected;
}
},
lastMessageTime: {
type: 'number',
displayName: 'Last Message Time',
group: 'Status',
getter: function() {
return this._internal.lastMessageTime;
}
}
},
methods: {
doConnect: function() {
// Disconnect existing connection if any
if (this._internal.eventSource) {
this.doDisconnect();
}
const url = this._internal.url;
if (!url) {
this.setError('URL is required');
return;
}
try {
// Create EventSource with options
const options = {
withCredentials: this._internal.withCredentials || false
};
const eventSource = new EventSource(url, options);
this._internal.eventSource = eventSource;
// Connection opened
eventSource.onopen = () => {
this._internal.isConnected = true;
this._internal.lastError = null;
this.flagOutputDirty('isConnected');
this.flagOutputDirty('error');
this.sendSignalOnOutput('connected');
console.log('[SSE Node] Connected to', url);
};
// Message received (default event type)
eventSource.onmessage = (event) => {
this.handleMessage(event, 'message');
};
// Connection error/closed
eventSource.onerror = (error) => {
console.error('[SSE Node] Connection error:', error);
const wasConnected = this._internal.isConnected;
this._internal.isConnected = false;
this.flagOutputDirty('isConnected');
if (wasConnected) {
this.sendSignalOnOutput('disconnected');
}
// Check if connection is permanently closed
if (eventSource.readyState === EventSource.CLOSED) {
this.setError('Connection closed');
// Auto-reconnect if enabled
if (this._internal.autoReconnect) {
const delay = this._internal.reconnectDelay || 3000;
console.log(`[SSE Node] Reconnecting in ${delay}ms...`);
this._internal.reconnectTimer = setTimeout(() => {
this.doConnect();
}, delay);
}
}
};
// Add custom event listener if specified
if (this._internal.customEventType) {
this.addCustomEventListener(this._internal.customEventType);
}
} catch (e) {
this.setError(e.message);
console.error('[SSE Node] Failed to connect:', e);
}
},
doDisconnect: function() {
// Clear reconnect timer
if (this._internal.reconnectTimer) {
clearTimeout(this._internal.reconnectTimer);
this._internal.reconnectTimer = null;
}
// Close connection
if (this._internal.eventSource) {
this._internal.eventSource.close();
this._internal.eventSource = null;
this._internal.isConnected = false;
this.flagOutputDirty('isConnected');
this.sendSignalOnOutput('disconnected');
console.log('[SSE Node] Disconnected');
}
},
addCustomEventListener: function(eventType) {
if (!this._internal.eventSource || !eventType) return;
// Remove old listener if exists
const oldListener = this._internal.customEventListeners.get(eventType);
if (oldListener) {
this._internal.eventSource.removeEventListener(eventType, oldListener);
}
// Add new listener
const listener = (event) => {
this.handleMessage(event, eventType);
};
this._internal.eventSource.addEventListener(eventType, listener);
this._internal.customEventListeners.set(eventType, listener);
},
handleMessage: function(event, type) {
this._internal.lastMessageTime = Date.now();
this._internal.lastEventType = type || 'message';
this._internal.lastMessage = {
data: event.data,
lastEventId: event.lastEventId,
type: type || 'message'
};
// Try to parse as JSON
try {
this._internal.lastData = JSON.parse(event.data);
} catch (e) {
// Not JSON, use raw string
this._internal.lastData = event.data;
}
this.flagOutputDirty('message');
this.flagOutputDirty('data');
this.flagOutputDirty('eventType');
this.flagOutputDirty('lastMessageTime');
this.sendSignalOnOutput('messageReceived');
},
setError: function(message) {
this._internal.lastError = message;
this.flagOutputDirty('error');
},
_onNodeDeleted: function() {
this.doDisconnect();
}
},
getInspectInfo: function() {
if (this._internal.isConnected) {
return {
type: 'value',
value: {
status: 'Connected',
url: this._internal.url,
lastMessage: this._internal.lastData,
messageCount: this._internal.messageCount || 0
}
};
}
return {
type: 'text',
value: this._internal.isConnected ? 'Connected' : 'Disconnected'
};
}
};
module.exports = {
node: SSENode
};
Usage Examples
Example 1: AI Chat Streaming
[Text Input: "Hello AI"]
→ [Send] signal
→ [HTTP Request] POST /chat/start → returns chatId
→ [String Format] "/chat/{chatId}/stream"
→ [SSE Node] url
→ [SSE Node] connect
[SSE Node] data
→ [Array] accumulate messages
→ [Repeater] render messages
[SSE Node] messageReceived
→ [Scroll To Bottom] in chat container
Example 2: Live Notifications
[Component Mounted] signal
→ [SSE Node] connect to "/notifications/stream"
[SSE Node] data
→ [Show Toast] notification
[SSE Node] connected
→ [Variable] isLive = true → [Visual indicator]
[SSE Node] disconnected
→ [Variable] isLive = false → [Show offline banner]
Example 3: Progress Tracker
[Start Upload] signal
→ [HTTP Request] POST /upload → returns uploadId
→ [SSE Node] connect to "/upload/{uploadId}/progress"
[SSE Node] data → { percent: number }
→ [Progress Bar] value
[SSE Node] data → { status: "complete" }
→ [SSE Node] disconnect
→ [Navigate] to success page
Testing Checklist
Functional Tests
- Connection establishes to valid SSE endpoint
- Connection fails gracefully with invalid URL
- Messages are received and parsed correctly
- JSON messages parse to objects
- Plain text messages output as strings
connectedsignal fires on opendisconnectedsignal fires on closemessageReceivedsignal fires for each messageisConnectedreflects current state- Auto-reconnect works after disconnect
- Reconnect delay is respected
- Manual disconnect stops auto-reconnect
- Custom event types are received
- withCredentials flag works correctly
- Node cleanup works (no memory leaks)
Edge Cases
- Handles rapid connect/disconnect cycles
- Handles very large messages (>1MB)
- Handles malformed JSON gracefully
- Handles server sending error events
- Handles network going offline/online
- Handles multiple SSE nodes simultaneously
- Connection closes cleanly on component unmount
- Reconnect timer clears on manual disconnect
Performance
- Memory usage stable over 1000+ messages
- No visible UI lag during streaming
- Message parsing doesn't block main thread
- Multiple connections don't interfere
Browser Compatibility
EventSource (SSE) is supported in:
| Browser | Support |
|---|---|
| Chrome | ✅ Yes |
| Firefox | ✅ Yes |
| Safari | ✅ Yes |
| Edge | ✅ Yes |
| IE 11 | ❌ No (polyfill available) |
For IE 11 support, can use: event-source-polyfill
Limitations & Workarounds
Limitation 1: No Custom Headers
EventSource API doesn't support custom headers (like Authorization).
Workaround:
- Send auth token as query parameter:
/stream?token=abc123 - Use cookie-based authentication (withCredentials: true)
- Or use WebSocket instead (AGENT-002)
Limitation 2: No Request Body
SSE is GET-only, can't send request body.
Workaround:
- Create session/channel via POST first
- Connect SSE to session-specific URL:
/stream/{sessionId}
Limitation 3: One-Way Communication
Server → Client only. Client can't send messages over same connection.
Workaround:
- Use separate HTTP requests for client → server
- Or use WebSocket for bidirectional (AGENT-002)
Documentation Requirements
User-Facing Docs
Create: docs/nodes/data/sse.md
# Server-Sent Events
Stream real-time data from your server to your Noodl app using Server-Sent Events (SSE). Perfect for live notifications, AI chat responses, progress updates, and real-time dashboards.
## When to Use
- **AI Chat**: Stream tokens as they generate
- **Notifications**: Push updates without polling
- **Dashboards**: Continuous metric updates
- **Progress**: Long-running operation status
## Basic Usage
1. Add SSE node to your component
2. Set the `URL` to your SSE endpoint
3. Send the `Connect` signal
4. Use the `data` output to access messages
5. Listen to `messageReceived` to trigger actions
## Example: Live Chat
[Screenshot showing SSE node connected to chat UI]
## Authentication
Since EventSource doesn't support custom headers, use query parameters:
URL: https://api.example.com/stream?token={authToken}
Or enable `With Credentials` for cookie-based auth.
## Auto-Reconnect
By default, SSE nodes auto-reconnect if connection drops. Configure:
- `Auto Reconnect`: Enable/disable
- `Reconnect Delay`: Wait time in milliseconds
## Custom Events
Servers can send named events. Use the `Event Type` input to listen for specific events.
```javascript
// Server sends:
event: notification
data: {"message": "New user signed up"}
// Noodl receives on 'notification' event type
### Technical Docs
Add to: `dev-docs/reference/NODE-PATTERNS.md`
Section on streaming nodes.
---
## Success Criteria
1. ✅ SSE node successfully streams data in test app
2. ✅ Auto-reconnect works reliably
3. ✅ No memory leaks over extended usage
4. ✅ Clear documentation with examples
5. ✅ Works in Erleah prototype for AI chat streaming
---
## Future Enhancements
Post-MVP features to consider:
1. **Message Buffering** - Store last N messages for replays
2. **Rate Limiting** - Throttle message processing
3. **Event Filtering** - Filter messages by criteria before output
4. **Last Event ID** - Resume from last message on reconnect
5. **Connection Pooling** - Share connection across components
---
## References
- [MDN: EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
- [HTML Living Standard: SSE](https://html.spec.whatwg.org/multipage/server-sent-events.html)
- [SSE vs WebSocket](https://ably.com/topic/server-sent-events-vs-websockets)
---
## Dependencies
- None (uses native EventSource API)
## Blocked By
- None
## Blocks
- AGENT-007 (Stream Parser Utilities) - needs SSE for testing
- Erleah development - requires streaming AI responses
---
## Estimated Effort Breakdown
| Phase | Estimate | Description |
|-------|----------|-------------|
| Core Implementation | 1 day | Basic SSE node with connect/disconnect/message |
| Error Handling | 0.5 day | Graceful failures, reconnect logic |
| Testing | 1 day | Unit tests, integration tests, edge cases |
| Documentation | 0.5 day | User docs, technical docs, examples |
| Edge Cases & Polish | 0.5-1 day | Performance, memory, browser compat |
**Total: 3.5-4 days**
Buffer: +1 day for unexpected issues = **4-5 days total**