Files
OpenNoodl/dev-docs/tasks/phase-3.5-realtime-agentic-ui/AGENT-002-websocket-node-task.md
2025-12-30 11:55:30 +01:00

24 KiB

AGENT-002: WebSocket Node

Overview

Create a new runtime node that establishes and manages WebSocket connections, enabling bidirectional real-time communication between Noodl applications and servers. This complements SSE (AGENT-001) by supporting two-way messaging patterns.

Phase: 3.5 (Real-Time Agentic UI)
Priority: HIGH
Effort: 3-5 days
Risk: Medium


Problem Statement

Current Limitation

The existing HTTP Request node and SSE node (AGENT-001) only support one-way communication:

HTTP:  Client → Server → Response (one-shot)
SSE:   Server → Client (one-way stream)

WebSocket enables true bidirectional communication:

WebSocket: Client ⇄ Server (continuous two-way)

Real-World Use Cases

  1. Collaborative Editing - Multiple users editing same document
  2. Gaming - Real-time multiplayer interactions
  3. Chat Applications - Send and receive messages
  4. Live Cursors - Show where other users are pointing
  5. Device Control - Send commands, receive telemetry
  6. Trading Platforms - Real-time price updates + order placement

Goals

  1. Establish and maintain WebSocket connections
  2. Send messages (text and binary)
  3. Receive messages (text and binary)
  4. Handle connection lifecycle (open, error, close)
  5. Auto-reconnect with exponential backoff
  6. Ping/pong heartbeat for connection health
  7. Queue messages when disconnected

Technical Design

Node Specification

{
  name: 'net.noodl.WebSocket',
  displayNodeName: 'WebSocket',
  category: 'Data',
  color: 'data',
  docs: 'https://docs.noodl.net/nodes/data/websocket',
  searchTags: ['websocket', 'ws', 'realtime', 'bidirectional', 'socket']
}

Port Schema

Inputs

Port Name Type Group Description
url string Connection WebSocket URL (ws:// or wss://)
connect signal Actions Establish connection
disconnect signal Actions Close connection
send signal Actions Send message
message * Message Data to send (JSON serialized if object)
messageType enum Message 'text' or 'binary' (default: text)
autoReconnect boolean Connection Auto-reconnect on disconnect (default: true)
reconnectDelay number Connection Initial delay (ms, default: 1000)
maxReconnectDelay number Connection Max delay with backoff (ms, default: 30000)
protocols string Connection Comma-separated subprotocols
queueWhenDisconnected boolean Message Queue messages while offline (default: true)
heartbeatInterval number Connection Ping interval (ms, 0=disabled, default: 30000)

Outputs

Port Name Type Group Description
received * Data Received message data
receivedRaw string Data Raw message string
messageReceived signal Events Fired when message arrives
messageSent signal Events Fired after send succeeds
connected signal Events Fired when connection opens
disconnected signal Events Fired when connection closes
error string Events Error message
isConnected boolean Status Current connection state
queueSize number Status Messages waiting to send
latency number Status Round-trip time (ms)

State Machine

          ┌─────────────┐
   START  │             │
   ────┬─→│ DISCONNECTED│←──┐
       │  │             │   │
       │  └─────────────┘   │
       │         │          │
       │    [connect]       │
       │         │          │
       │         ▼          │
       │  ┌─────────────┐   │
       │  │ CONNECTING  │   │
       │  └─────────────┘   │
       │         │          │
       │    [onopen]        │
       │         │          │
       │         ▼          │
       │  ┌─────────────┐   │
       └──│  CONNECTED  │   │
          │             │   │
          │  [send/recv]│   │
          │             │   │
          └─────────────┘   │
                │ │         │
          [onclose/error]   │
                │           │
                └───────────┘
             [autoReconnect
              with backoff]

Implementation Details

File Structure

packages/noodl-runtime/src/nodes/std-library/data/
├── websocketnode.js         # Main node implementation
└── websocketnode.test.js    # Unit tests

Core Implementation

var WebSocketNode = {
  name: 'net.noodl.WebSocket',
  displayNodeName: 'WebSocket',
  category: 'Data',
  color: 'data',
  
  initialize: function() {
    this._internal.socket = null;
    this._internal.isConnected = false;
    this._internal.messageQueue = [];
    this._internal.reconnectAttempts = 0;
    this._internal.reconnectTimer = null;
    this._internal.heartbeatTimer = null;
    this._internal.lastPingTime = 0;
  },
  
  inputs: {
    url: {
      type: 'string',
      displayName: 'URL',
      group: 'Connection',
      set: function(value) {
        this._internal.url = value;
      }
    },
    
    connect: {
      type: 'signal',
      displayName: 'Connect',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.doConnect();
      }
    },
    
    disconnect: {
      type: 'signal',
      displayName: 'Disconnect',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.doDisconnect();
      }
    },
    
    send: {
      type: 'signal',
      displayName: 'Send',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.doSend();
      }
    },
    
    message: {
      type: '*',
      displayName: 'Message',
      group: 'Message',
      set: function(value) {
        this._internal.messageToSend = value;
      }
    },
    
    messageType: {
      type: {
        name: 'enum',
        enums: [
          { label: 'Text', value: 'text' },
          { label: 'Binary', value: 'binary' }
        ]
      },
      displayName: 'Message Type',
      group: 'Message',
      default: 'text',
      set: function(value) {
        this._internal.messageType = value;
      }
    },
    
    autoReconnect: {
      type: 'boolean',
      displayName: 'Auto Reconnect',
      group: 'Connection',
      default: true,
      set: function(value) {
        this._internal.autoReconnect = value;
      }
    },
    
    reconnectDelay: {
      type: 'number',
      displayName: 'Reconnect Delay (ms)',
      group: 'Connection',
      default: 1000,
      set: function(value) {
        this._internal.reconnectDelay = value;
      }
    },
    
    maxReconnectDelay: {
      type: 'number',
      displayName: 'Max Reconnect Delay (ms)',
      group: 'Connection',
      default: 30000,
      set: function(value) {
        this._internal.maxReconnectDelay = value;
      }
    },
    
    protocols: {
      type: 'string',
      displayName: 'Protocols',
      group: 'Connection',
      set: function(value) {
        this._internal.protocols = value;
      }
    },
    
    queueWhenDisconnected: {
      type: 'boolean',
      displayName: 'Queue When Disconnected',
      group: 'Message',
      default: true,
      set: function(value) {
        this._internal.queueWhenDisconnected = value;
      }
    },
    
    heartbeatInterval: {
      type: 'number',
      displayName: 'Heartbeat Interval (ms)',
      group: 'Connection',
      default: 30000,
      set: function(value) {
        this._internal.heartbeatInterval = value;
        if (this._internal.isConnected) {
          this.startHeartbeat();
        }
      }
    }
  },
  
  outputs: {
    received: {
      type: '*',
      displayName: 'Received',
      group: 'Data',
      getter: function() {
        return this._internal.lastReceived;
      }
    },
    
    receivedRaw: {
      type: 'string',
      displayName: 'Received Raw',
      group: 'Data',
      getter: function() {
        return this._internal.lastReceivedRaw;
      }
    },
    
    messageReceived: {
      type: 'signal',
      displayName: 'Message Received',
      group: 'Events'
    },
    
    messageSent: {
      type: 'signal',
      displayName: 'Message Sent',
      group: 'Events'
    },
    
    connected: {
      type: 'signal',
      displayName: 'Connected',
      group: 'Events'
    },
    
    disconnected: {
      type: 'signal',
      displayName: 'Disconnected',
      group: 'Events'
    },
    
    error: {
      type: 'string',
      displayName: 'Error',
      group: 'Events',
      getter: function() {
        return this._internal.lastError;
      }
    },
    
    isConnected: {
      type: 'boolean',
      displayName: 'Is Connected',
      group: 'Status',
      getter: function() {
        return this._internal.isConnected;
      }
    },
    
    queueSize: {
      type: 'number',
      displayName: 'Queue Size',
      group: 'Status',
      getter: function() {
        return this._internal.messageQueue.length;
      }
    },
    
    latency: {
      type: 'number',
      displayName: 'Latency (ms)',
      group: 'Status',
      getter: function() {
        return this._internal.latency || 0;
      }
    }
  },
  
  methods: {
    doConnect: function() {
      // Disconnect existing
      if (this._internal.socket) {
        this.doDisconnect();
      }
      
      const url = this._internal.url;
      if (!url) {
        this.setError('URL is required');
        return;
      }
      
      if (!url.startsWith('ws://') && !url.startsWith('wss://')) {
        this.setError('URL must start with ws:// or wss://');
        return;
      }
      
      try {
        // Parse protocols
        const protocols = this._internal.protocols
          ? this._internal.protocols.split(',').map(p => p.trim())
          : undefined;
        
        const socket = new WebSocket(url, protocols);
        this._internal.socket = socket;
        
        // Connection opened
        socket.onopen = () => {
          this._internal.isConnected = true;
          this._internal.reconnectAttempts = 0;
          this._internal.lastError = null;
          
          this.flagOutputDirty('isConnected');
          this.flagOutputDirty('error');
          this.sendSignalOnOutput('connected');
          
          // Start heartbeat
          this.startHeartbeat();
          
          // Flush queued messages
          this.flushMessageQueue();
          
          console.log('[WebSocket] Connected to', url);
        };
        
        // Message received
        socket.onmessage = (event) => {
          this.handleMessage(event.data);
        };
        
        // Connection closed
        socket.onclose = (event) => {
          console.log('[WebSocket] Closed:', event.code, event.reason);
          
          this._internal.isConnected = false;
          this.flagOutputDirty('isConnected');
          this.sendSignalOnOutput('disconnected');
          
          this.stopHeartbeat();
          
          // Auto-reconnect
          if (this._internal.autoReconnect && !event.wasClean) {
            this.scheduleReconnect();
          }
        };
        
        // Connection error
        socket.onerror = (error) => {
          console.error('[WebSocket] Error:', error);
          this.setError('Connection error');
        };
        
      } catch (e) {
        this.setError(e.message);
        console.error('[WebSocket] Failed to connect:', e);
      }
    },
    
    doDisconnect: function() {
      // Clear timers
      if (this._internal.reconnectTimer) {
        clearTimeout(this._internal.reconnectTimer);
        this._internal.reconnectTimer = null;
      }
      this.stopHeartbeat();
      
      // Close socket
      if (this._internal.socket) {
        this._internal.socket.close(1000, 'Client disconnect');
        this._internal.socket = null;
        this._internal.isConnected = false;
        this.flagOutputDirty('isConnected');
        
        console.log('[WebSocket] Disconnected');
      }
    },
    
    doSend: function() {
      const message = this._internal.messageToSend;
      if (message === undefined || message === null) {
        return;
      }
      
      // If not connected, queue or drop
      if (!this._internal.isConnected) {
        if (this._internal.queueWhenDisconnected) {
          this._internal.messageQueue.push(message);
          this.flagOutputDirty('queueSize');
          console.log('[WebSocket] Message queued (disconnected)');
        } else {
          this.setError('Cannot send: not connected');
        }
        return;
      }
      
      try {
        const socket = this._internal.socket;
        const messageType = this._internal.messageType || 'text';
        
        // Serialize based on type
        let data;
        if (messageType === 'binary') {
          // Convert to ArrayBuffer or Blob
          if (typeof message === 'string') {
            data = new TextEncoder().encode(message);
          } else {
            data = message;
          }
        } else {
          // Text mode - serialize objects as JSON
          if (typeof message === 'object') {
            data = JSON.stringify(message);
          } else {
            data = String(message);
          }
        }
        
        socket.send(data);
        this.sendSignalOnOutput('messageSent');
        
      } catch (e) {
        this.setError('Send failed: ' + e.message);
      }
    },
    
    handleMessage: function(data) {
      this._internal.lastReceivedRaw = data;
      
      // Try to parse as JSON
      try {
        this._internal.lastReceived = JSON.parse(data);
      } catch (e) {
        // Not JSON, use raw
        this._internal.lastReceived = data;
      }
      
      // Check if it's a pong response
      if (data === 'pong' && this._internal.lastPingTime) {
        this._internal.latency = Date.now() - this._internal.lastPingTime;
        this.flagOutputDirty('latency');
        return; // Don't emit messageReceived for pong
      }
      
      this.flagOutputDirty('received');
      this.flagOutputDirty('receivedRaw');
      this.sendSignalOnOutput('messageReceived');
    },
    
    flushMessageQueue: function() {
      const queue = this._internal.messageQueue;
      if (queue.length === 0) return;
      
      console.log(`[WebSocket] Flushing ${queue.length} queued messages`);
      
      while (queue.length > 0) {
        this._internal.messageToSend = queue.shift();
        this.doSend();
      }
      
      this.flagOutputDirty('queueSize');
    },
    
    scheduleReconnect: function() {
      const baseDelay = this._internal.reconnectDelay || 1000;
      const maxDelay = this._internal.maxReconnectDelay || 30000;
      const attempts = this._internal.reconnectAttempts;
      
      // Exponential backoff: baseDelay * 2^attempts
      const delay = Math.min(baseDelay * Math.pow(2, attempts), maxDelay);
      
      console.log(`[WebSocket] Reconnecting in ${delay}ms (attempt ${attempts + 1})`);
      
      this._internal.reconnectTimer = setTimeout(() => {
        this._internal.reconnectAttempts++;
        this.doConnect();
      }, delay);
    },
    
    startHeartbeat: function() {
      this.stopHeartbeat();
      
      const interval = this._internal.heartbeatInterval;
      if (!interval || interval <= 0) return;
      
      this._internal.heartbeatTimer = setInterval(() => {
        if (this._internal.isConnected) {
          this._internal.lastPingTime = Date.now();
          try {
            this._internal.socket.send('ping');
          } catch (e) {
            console.error('[WebSocket] Heartbeat failed:', e);
          }
        }
      }, interval);
    },
    
    stopHeartbeat: function() {
      if (this._internal.heartbeatTimer) {
        clearInterval(this._internal.heartbeatTimer);
        this._internal.heartbeatTimer = null;
      }
    },
    
    setError: function(message) {
      this._internal.lastError = message;
      this.flagOutputDirty('error');
    },
    
    _onNodeDeleted: function() {
      this.doDisconnect();
    }
  },
  
  getInspectInfo: function() {
    if (this._internal.isConnected) {
      return {
        type: 'value',
        value: {
          status: 'Connected',
          url: this._internal.url,
          latency: this._internal.latency + 'ms',
          queueSize: this._internal.messageQueue.length
        }
      };
    }
    return {
      type: 'text',
      value: 'Disconnected'
    };
  }
};

module.exports = {
  node: WebSocketNode
};

Usage Examples

Example 1: Chat Application

[Text Input: message]
    → [WebSocket] message
    
[Send Button] clicked
    → [WebSocket] send signal

[WebSocket] connected
    → [Variable] isOnline = true

[WebSocket] received
    → [Array] add to messages
    → [Repeater] render chat
    
[WebSocket] disconnected
    → [Show Toast] "Connection lost, reconnecting..."

Example 2: Collaborative Cursors

[Mouse Move Event]
    → [Debounce] 100ms
    → [Object] { x, y, userId }
    → [WebSocket] message
    → [WebSocket] send

[WebSocket] received → { x, y, userId }
    → [Array Filter] exclude own cursor
    → [Repeater] render other cursors

Example 3: Real-Time Game

// Send player action
[Keyboard Event: space]
    → [Object] { action: "jump", timestamp: Date.now() }
    → [WebSocket] message
    → [WebSocket] send

// Receive game state
[WebSocket] received → { players: [], score: 100 }
    → [For Each] in players
    → [Sprite] update positions

Example 4: IoT Device Control

// Send command
[Toggle Switch] changed
    → [Object] { device: "light-1", state: value }
    → [WebSocket] send

// Receive telemetry
[WebSocket] received → { temperature: 72, humidity: 45 }
    → [Number] temperature
    → [Gauge] display

Testing Checklist

Functional Tests

  • Connection establishes to valid WebSocket server
  • Connection fails gracefully with invalid URL
  • Can send text messages
  • Can send JSON objects (auto-serialized)
  • Can receive text messages
  • Can receive JSON messages (auto-parsed)
  • connected signal fires on open
  • disconnected signal fires on close
  • messageSent signal fires after send
  • messageReceived signal fires on receive
  • isConnected reflects current state
  • Auto-reconnect works after disconnect
  • Exponential backoff increases delay
  • Messages queue when disconnected
  • Queued messages flush on reconnect
  • Manual disconnect stops auto-reconnect
  • Heartbeat sends ping messages
  • Latency calculated from ping/pong
  • Subprotocols negotiated correctly
  • Node cleanup works (no memory leaks)

Edge Cases

  • Handles rapid connect/disconnect cycles
  • Handles very large messages (>1MB)
  • Handles binary data correctly
  • Handles server closing connection unexpectedly
  • Handles network going offline/online
  • Queue doesn't grow unbounded
  • Multiple WebSocket nodes don't interfere
  • Connection closes cleanly on component unmount

Performance

  • Memory usage stable over 1000+ messages
  • No visible UI lag during high-frequency messages
  • Queue processing doesn't block main thread
  • Heartbeat doesn't impact performance

WebSocket vs SSE Decision Guide

Help users choose between WebSocket (AGENT-002) and SSE (AGENT-001):

Feature SSE WebSocket
Direction Server → Client only Bidirectional
Protocol HTTP/1.1, HTTP/2 WebSocket protocol
Auto-reconnect Native browser behavior Manual implementation
Message format Text (typically JSON) Text or Binary
Firewall friendly Yes (uses HTTP) ⚠️ Sometimes blocked
Complexity Simpler More complex
Use when Server pushes updates Client needs to send data

Rule of thumb:

  • Need to receive updates → SSE
  • Need to send and receive → WebSocket

Browser Compatibility

WebSocket is supported in all modern browsers:

Browser Support
Chrome Yes
Firefox Yes
Safari Yes
Edge Yes
IE 11 ⚠️ Partial (no binary frames)

Security Considerations

1. Use WSS (WebSocket Secure)

Always use wss:// in production, not ws://. This encrypts traffic.

2. Authentication

WebSocket doesn't support custom headers in browser. Options:

  • Send auth token as first message after connect
  • Include token in URL query: wss://example.com/ws?token=abc123
  • Use cookie-based authentication

3. Message Validation

Always validate received messages server-side. Don't trust client data.

4. Rate Limiting

Implement server-side rate limiting to prevent abuse.


Documentation Requirements

User-Facing Docs

Create: docs/nodes/data/websocket.md

# WebSocket

Enable real-time bidirectional communication with WebSocket servers. Perfect for chat, collaborative editing, gaming, and live dashboards.

## When to Use

- **Chat**: Send and receive messages
- **Collaboration**: Real-time multi-user editing
- **Gaming**: Multiplayer interactions
- **IoT**: Device control and telemetry
- **Trading**: Price updates + order placement

## WebSocket vs Server-Sent Events

- Use **SSE** when server pushes updates, client doesn't send
- Use **WebSocket** when client needs to send data to server

## Basic Usage

1. Add WebSocket node
2. Set `URL` to your WebSocket endpoint (wss://...)
3. Send `Connect` signal
4. To send: Set `message`, trigger `send` signal
5. To receive: Listen to `messageReceived`, read `received`

## Example: Chat

[Screenshot showing WebSocket in chat application]

## Queuing

When disconnected, messages can queue automatically:
- Enable `Queue When Disconnected`
- Messages send when reconnected

## Heartbeat

Keep connection alive with automatic ping/pong:
- Set `Heartbeat Interval` (milliseconds)
- Monitor `Latency` for connection health

## Security

Always use `wss://` (secure) in production, not `ws://`.

For authentication:
```javascript
// Send auth on connect
[WebSocket] connected
  → [Object] { type: "auth", token: authToken }
  → [WebSocket] message
  → [WebSocket] send

---

## Success Criteria

1. ✅ WebSocket node successfully connects and exchanges messages
2. ✅ Auto-reconnect works reliably with exponential backoff
3. ✅ Message queuing prevents data loss during disconnects
4. ✅ Heartbeat detects dead connections
5. ✅ No memory leaks over extended usage
6. ✅ Clear documentation with examples
7. ✅ Works in Erleah for real-time backend communication

---

## Future Enhancements

Post-MVP features to consider:

1. **Compression** - Enable permessage-deflate extension
2. **Binary Frames** - Better binary data support
3. **Subprotocol Handling** - React to negotiated protocol
4. **Message Buffering** - Batch sends for performance
5. **Connection Pooling** - Share socket across components
6. **Custom Heartbeat Messages** - Beyond ping/pong

---

## References

- [MDN: WebSocket API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
- [WebSocket Protocol RFC 6455](https://tools.ietf.org/html/rfc6455)
- [WebSocket vs SSE](https://ably.com/topic/websockets-vs-sse)

---

## Dependencies

- None (uses native WebSocket API)

## Blocked By

- None

## Blocks

- Erleah development - requires bidirectional communication

---

## Estimated Effort Breakdown

| Phase | Estimate | Description |
|-------|----------|-------------|
| Core Implementation | 1.5 days | Basic WebSocket with send/receive |
| Auto-reconnect & Queue | 1 day | Exponential backoff, message queuing |
| Heartbeat | 0.5 day | Ping/pong latency tracking |
| Testing | 1 day | Unit tests, integration tests, edge cases |
| Documentation | 0.5 day | User docs, technical docs, examples |
| Polish | 0.5 day | Error handling, performance, cleanup |

**Total: 5 days**

Buffer: None needed (straightforward implementation)

**Final: 3-5 days** (depending on testing depth)