# AGENT-007: Stream Parser Utilities
## Overview
Create utility nodes for parsing streaming data, particularly JSON streams, chunked responses, and incremental text accumulation. These utilities help process SSE and WebSocket messages that arrive in fragments.
**Phase:** 3.5 (Real-Time Agentic UI)
**Priority:** MEDIUM
**Effort:** 2-3 days
**Risk:** Low
---
## Problem Statement
### Current Limitation
Streaming data often arrives in fragments:
```
Chunk 1: {"type":"mes
Chunk 2: sage","conte
Chunk 3: nt":"Hello"}
```
Without parsing utilities, developers must manually:
1. Accumulate chunks
2. Detect message boundaries
3. Parse JSON safely
4. Handle malformed data
This is error-prone and repetitive.
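To make the pain concrete, here is a hedged sketch of what that manual handling looks like in plain JavaScript (`handleMessage` is a hypothetical callback, not part of any API above):

```javascript
// Manual handling without the utility nodes (illustrative sketch)
let buffer = '';

function onChunk(chunk) {
  buffer += chunk;                        // 1. accumulate
  const lines = buffer.split('\n');       // 2. detect message boundaries
  buffer = lines.pop();                   //    keep the incomplete tail
  for (const line of lines) {
    if (!line.trim()) continue;
    try {
      handleMessage(JSON.parse(line));    // 3. parse JSON safely
    } catch (e) {
      console.warn('Malformed line:', line); // 4. handle malformed data
    }
  }
}
```

Every streaming feature ends up re-implementing a variant of this loop, which is exactly what the nodes below package up.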
### Desired Pattern
```
[SSE] data → raw chunks
[Stream Parser] accumulate & parse
[Complete JSON Object] → use in app
```
### Real-World Use Cases (Erleah)
1. **AI Chat Streaming** - Accumulate tokens into messages
2. **JSON Streaming** - Parse newline-delimited JSON (NDJSON)
3. **Progress Updates** - Extract percentages from stream
4. **CSV Streaming** - Parse CSV row-by-row
5. **Log Streaming** - Parse structured logs
---
## Goals
1. ✅ Accumulate text chunks into complete messages
2. ✅ Parse NDJSON (newline-delimited JSON)
3. ✅ Parse JSON chunks safely (handle incomplete JSON)
4. ✅ Extract values from streaming text (regex patterns)
5. ✅ Detect message boundaries (delimiters)
6. ✅ Buffer and flush patterns
7. ✅ Handle encoding/decoding
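Goal 7 mostly comes down to decoding byte chunks that can split a multi-byte UTF-8 sequence at a chunk boundary. A minimal sketch using the standard `TextDecoder` streaming mode (not tied to any specific node above):

```javascript
// Streaming-safe UTF-8 decoding: a multi-byte character split across
// two chunks is held back until its remaining bytes arrive.
const decoder = new TextDecoder('utf-8');

function decodeChunk(bytes) {
  // { stream: true } keeps incomplete sequences buffered inside the decoder
  return decoder.decode(bytes, { stream: true });
}

// Example: "é" (0xC3 0xA9) split across two chunks
console.log(decodeChunk(new Uint8Array([0x68, 0x69, 0xc3]))); // "hi"
console.log(decodeChunk(new Uint8Array([0xa9])));             // "é"
```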
---
## Technical Design
### Node Specifications
We'll create FOUR utility nodes:
1. **Text Accumulator** - Accumulate chunks into complete text
2. **JSON Stream Parser** - Parse NDJSON or chunked JSON
3. **Pattern Extractor** - Extract values using regex
4. **Stream Buffer** - Buffer with custom flush logic
### Text Accumulator Node
```javascript
{
  name: 'net.noodl.TextAccumulator',
  displayNodeName: 'Text Accumulator',
  category: 'Data',
  color: 'green',
  docs: 'https://docs.noodl.net/nodes/data/text-accumulator'
}
```
#### Ports: Text Accumulator
| Port Name | Type | Group | Description |
|-----------|------|-------|-------------|
| **Inputs** |
| `chunk` | string | Data | Text chunk to add |
| `add` | signal | Actions | Add chunk to buffer |
| `clear` | signal | Actions | Clear accumulated text |
| `delimiter` | string | Config | Message delimiter (default: "\\n") |
| `maxLength` | number | Config | Max buffer size (default: 1MB) |
| **Outputs** |
| `accumulated` | string | Data | Current accumulated text |
| `messages` | array | Data | Complete messages (split by delimiter) |
| `messageCount` | number | Status | Number of complete messages |
| `messageReceived` | signal | Events | Fires when a complete message arrives |
| `bufferSize` | number | Status | Current buffer size (bytes) |
| `cleared` | signal | Events | Fires after clear |
### JSON Stream Parser Node
```javascript
{
  name: 'net.noodl.JSONStreamParser',
  displayNodeName: 'JSON Stream Parser',
  category: 'Data',
  color: 'green'
}
```
#### Ports: JSON Stream Parser
| Port Name | Type | Group | Description |
|-----------|------|-------|-------------|
| **Inputs** |
| `chunk` | string | Data | JSON chunk |
| `parse` | signal | Actions | Trigger parse |
| `clear` | signal | Actions | Clear buffer |
| `format` | enum | Config | 'ndjson', 'array', 'single' |
| **Outputs** |
| `parsed` | * | Data | Parsed object |
| `success` | signal | Events | Fires on successful parse |
| `error` | string | Events | Parse error message |
| `isComplete` | boolean | Status | Object is complete |
### Pattern Extractor Node
```javascript
{
  name: 'net.noodl.PatternExtractor',
  displayNodeName: 'Pattern Extractor',
  category: 'Data',
  color: 'green'
}
```
#### Ports: Pattern Extractor
| Port Name | Type | Group | Description |
|-----------|------|-------|-------------|
| **Inputs** |
| `text` | string | Data | Text to extract from |
| `pattern` | string | Pattern | Regex pattern |
| `extract` | signal | Actions | Trigger extraction |
| `extractAll` | boolean | Config | Extract all matches vs first |
| **Outputs** |
| `match` | string | Data | Matched text |
| `matches` | array | Data | All matches |
| `groups` | array | Data | Capture groups |
| `found` | signal | Events | Fires when match found |
| `notFound` | signal | Events | Fires when no match |
### Stream Buffer Node
```javascript
{
  name: 'net.noodl.StreamBuffer',
  displayNodeName: 'Stream Buffer',
  category: 'Data',
  color: 'green'
}
```
#### Ports: Stream Buffer
| Port Name | Type | Group | Description |
|-----------|------|-------|-------------|
| **Inputs** |
| `data` | * | Data | Data to buffer |
| `add` | signal | Actions | Add to buffer |
| `flush` | signal | Actions | Flush buffer |
| `clear` | signal | Actions | Clear buffer |
| `flushSize` | number | Config | Auto-flush after N items |
| `flushInterval` | number | Config | Auto-flush after ms |
| **Outputs** |
| `buffer` | array | Data | Current buffer contents |
| `bufferSize` | number | Status | Items in buffer |
| `flushed` | signal | Events | Fires after flush |
| `flushedData` | array | Data | Data that was flushed |
---
## Implementation Details
### File Structure
```
packages/noodl-runtime/src/nodes/std-library/data/
├── textaccumulatornode.js # Text accumulator
├── jsonstreamparsernode.js # JSON parser
├── patternextractornode.js # Regex extractor
├── streambuffernode.js # Generic buffer
└── streamutils.test.js # Tests
```
### Text Accumulator Implementation
```javascript
// textaccumulatornode.js
var TextAccumulatorNode = {
  name: 'net.noodl.TextAccumulator',
  displayNodeName: 'Text Accumulator',
  category: 'Data',
  color: 'green',
  initialize: function() {
    this._internal.buffer = '';
    this._internal.messages = [];
  },
  inputs: {
    chunk: {
      type: 'string',
      displayName: 'Chunk',
      group: 'Data',
      set: function(value) {
        this._internal.pendingChunk = value;
      }
    },
    add: {
      type: 'signal',
      displayName: 'Add',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.addChunk();
      }
    },
    clear: {
      type: 'signal',
      displayName: 'Clear',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.clearBuffer();
      }
    },
    delimiter: {
      type: 'string',
      displayName: 'Delimiter',
      group: 'Config',
      default: '\n',
      set: function(value) {
        this._internal.delimiter = value;
      }
    },
    maxLength: {
      type: 'number',
      displayName: 'Max Length (bytes)',
      group: 'Config',
      default: 1024 * 1024, // 1MB
      set: function(value) {
        this._internal.maxLength = value;
      }
    }
  },
  outputs: {
    accumulated: {
      type: 'string',
      displayName: 'Accumulated',
      group: 'Data',
      getter: function() {
        return this._internal.buffer;
      }
    },
    messages: {
      type: 'array',
      displayName: 'Messages',
      group: 'Data',
      getter: function() {
        return this._internal.messages;
      }
    },
    messageCount: {
      type: 'number',
      displayName: 'Message Count',
      group: 'Status',
      getter: function() {
        return this._internal.messages.length;
      }
    },
    messageReceived: {
      type: 'signal',
      displayName: 'Message Received',
      group: 'Events'
    },
    bufferSize: {
      type: 'number',
      displayName: 'Buffer Size',
      group: 'Status',
      getter: function() {
        // Byte length, not character count; Blob is available in the browser runtime
        return new Blob([this._internal.buffer]).size;
      }
    },
    cleared: {
      type: 'signal',
      displayName: 'Cleared',
      group: 'Events'
    }
  },
  methods: {
    addChunk: function() {
      const chunk = this._internal.pendingChunk;
      if (!chunk) return;
      // Add to buffer
      this._internal.buffer += chunk;
      // Enforce max length, keeping the most recent data
      const maxLength = this._internal.maxLength || 1024 * 1024;
      if (this._internal.buffer.length > maxLength) {
        console.warn('[TextAccumulator] Buffer overflow, truncating');
        this._internal.buffer = this._internal.buffer.slice(-maxLength);
      }
      // Split out complete messages. An explicitly empty delimiter means
      // "no message boundaries": accumulate everything (see usage example 1).
      const delimiter = this._internal.delimiter !== undefined ? this._internal.delimiter : '\n';
      if (delimiter !== '') {
        const parts = this._internal.buffer.split(delimiter);
        // Keep the last (possibly incomplete) part in the buffer
        this._internal.buffer = parts.pop();
        // Add complete messages
        if (parts.length > 0) {
          this._internal.messages = this._internal.messages.concat(parts);
          this.flagOutputDirty('messages');
          this.flagOutputDirty('messageCount');
          this.sendSignalOnOutput('messageReceived');
        }
      }
      this.flagOutputDirty('accumulated');
      this.flagOutputDirty('bufferSize');
    },
    clearBuffer: function() {
      this._internal.buffer = '';
      this._internal.messages = [];
      this.flagOutputDirty('accumulated');
      this.flagOutputDirty('messages');
      this.flagOutputDirty('messageCount');
      this.flagOutputDirty('bufferSize');
      this.sendSignalOnOutput('cleared');
    }
  },
  getInspectInfo: function() {
    return {
      type: 'value',
      value: {
        bufferSize: this._internal.buffer.length,
        messageCount: this._internal.messages.length,
        lastMessage: this._internal.messages[this._internal.messages.length - 1]
      }
    };
  }
};

module.exports = {
  node: TextAccumulatorNode
};
```
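The split-and-keep-remainder step is the heart of `addChunk`; a quick standalone illustration of its behavior:

```javascript
// The last element after split() is the (possibly empty) incomplete tail
const parts = 'first\nsecond\npartial'.split('\n');
const remainder = parts.pop();
console.log(parts);     // ['first', 'second'] → complete messages
console.log(remainder); // 'partial'           → stays in the buffer
```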
### JSON Stream Parser Implementation
```javascript
// jsonstreamparsernode.js
var JSONStreamParserNode = {
  name: 'net.noodl.JSONStreamParser',
  displayNodeName: 'JSON Stream Parser',
  category: 'Data',
  color: 'green',
  initialize: function() {
    this._internal.buffer = '';
  },
  inputs: {
    chunk: {
      type: 'string',
      displayName: 'Chunk',
      group: 'Data',
      set: function(value) {
        this._internal.chunk = value;
      }
    },
    parse: {
      type: 'signal',
      displayName: 'Parse',
      group: 'Actions',
      valueChangedToTrue: function() {
        this.doParse();
      }
    },
    clear: {
      type: 'signal',
      displayName: 'Clear',
      group: 'Actions',
      valueChangedToTrue: function() {
        this._internal.buffer = '';
      }
    },
    format: {
      type: {
        name: 'enum',
        enums: [
          { label: 'NDJSON', value: 'ndjson' },
          { label: 'JSON Array', value: 'array' },
          { label: 'Single Object', value: 'single' }
        ]
      },
      displayName: 'Format',
      group: 'Config',
      default: 'ndjson',
      set: function(value) {
        // Without this setter the selected format never reaches doParse()
        this._internal.format = value;
      }
    }
  },
  outputs: {
    parsed: {
      type: '*',
      displayName: 'Parsed',
      group: 'Data',
      getter: function() {
        return this._internal.parsed;
      }
    },
    success: {
      type: 'signal',
      displayName: 'Success',
      group: 'Events'
    },
    error: {
      type: 'string',
      displayName: 'Error',
      group: 'Events',
      getter: function() {
        return this._internal.error;
      }
    },
    isComplete: {
      type: 'boolean',
      displayName: 'Is Complete',
      group: 'Status',
      getter: function() {
        return this._internal.isComplete;
      }
    }
  },
  methods: {
    doParse: function() {
      const chunk = this._internal.chunk;
      if (!chunk) return;
      this._internal.buffer += chunk;
      const format = this._internal.format || 'ndjson';
      try {
        if (format === 'ndjson') {
          this.parseNDJSON();
        } else if (format === 'single') {
          this.parseSingleJSON();
        } else if (format === 'array') {
          this.parseJSONArray();
        }
      } catch (e) {
        this._internal.error = e.message;
        this._internal.isComplete = false;
        this.flagOutputDirty('error');
        this.flagOutputDirty('isComplete');
      }
    },
    parseNDJSON: function() {
      // NDJSON: one JSON object per line
      const lines = this._internal.buffer.split('\n');
      // Keep the last (possibly incomplete) line in the buffer
      this._internal.buffer = lines.pop();
      // Parse complete lines
      const parsed = [];
      for (const line of lines) {
        if (line.trim()) {
          try {
            parsed.push(JSON.parse(line));
          } catch (e) {
            console.warn('[JSONStreamParser] Failed to parse line:', line);
          }
        }
      }
      if (parsed.length > 0) {
        this._internal.parsed = parsed;
        this._internal.isComplete = true;
        this.flagOutputDirty('parsed');
        this.flagOutputDirty('isComplete');
        this.sendSignalOnOutput('success');
      }
    },
    parseSingleJSON: function() {
      // Try to parse one complete JSON object
      try {
        const parsed = JSON.parse(this._internal.buffer);
        this._internal.parsed = parsed;
        this._internal.isComplete = true;
        this._internal.buffer = '';
        this.flagOutputDirty('parsed');
        this.flagOutputDirty('isComplete');
        this.sendSignalOnOutput('success');
      } catch (e) {
        // Not complete yet, wait for more chunks
        this._internal.isComplete = false;
        this.flagOutputDirty('isComplete');
      }
    },
    parseJSONArray: function() {
      // JSON array, possibly incomplete: [{"a":1},{"b":2}...
      // Try to parse as-is, or close the array ourselves
      const buffer = this._internal.buffer.trim();
      // Try parsing the complete array
      try {
        const parsed = JSON.parse(buffer);
        if (Array.isArray(parsed)) {
          this._internal.parsed = parsed;
          this._internal.isComplete = true;
          this._internal.buffer = '';
          this.flagOutputDirty('parsed');
          this.flagOutputDirty('isComplete');
          this.sendSignalOnOutput('success');
          return;
        }
      } catch (e) {
        // Not complete: strip a trailing comma (e.g. '[{"a":1},') before
        // adding the closing bracket, otherwise JSON.parse still fails
        try {
          const parsed = JSON.parse(buffer.replace(/,\s*$/, '') + ']');
          if (Array.isArray(parsed)) {
            this._internal.parsed = parsed;
            this._internal.isComplete = false; // Partial result
            this.flagOutputDirty('parsed');
            this.flagOutputDirty('isComplete');
            this.sendSignalOnOutput('success');
          }
        } catch (e2) {
          // Still not parseable; wait for more chunks
          this._internal.isComplete = false;
          this.flagOutputDirty('isComplete');
        }
      }
    }
  }
};

module.exports = {
  node: JSONStreamParserNode
};
```
### Pattern Extractor Implementation (abbreviated)
```javascript
// patternextractornode.js
// Abbreviated: groups omitted and output getters follow the same pattern as above
var PatternExtractorNode = {
  name: 'net.noodl.PatternExtractor',
  displayNodeName: 'Pattern Extractor',
  category: 'Data',
  color: 'green',
  inputs: {
    text: {
      type: 'string',
      displayName: 'Text',
      set: function(value) { this._internal.text = value; }
    },
    pattern: {
      type: 'string',
      displayName: 'Pattern',
      set: function(value) { this._internal.pattern = value; }
    },
    extract: {
      type: 'signal',
      displayName: 'Extract',
      valueChangedToTrue: function() { this.doExtract(); }
    },
    extractAll: {
      type: 'boolean',
      displayName: 'Extract All',
      default: false,
      set: function(value) { this._internal.extractAll = value; }
    }
  },
  outputs: {
    match: { type: 'string', displayName: 'Match' },
    matches: { type: 'array', displayName: 'Matches' },
    groups: { type: 'array', displayName: 'Groups' },
    found: { type: 'signal', displayName: 'Found' },
    notFound: { type: 'signal', displayName: 'Not Found' }
  },
  methods: {
    doExtract: function() {
      const text = this._internal.text;
      const pattern = this._internal.pattern;
      if (!text || !pattern) return;
      try {
        // Extract All needs the 'g' flag for matchAll
        const regex = new RegExp(pattern, this._internal.extractAll ? 'g' : '');
        const matches = this._internal.extractAll
          ? [...text.matchAll(regex)]
          : [text.match(regex)];
        if (matches && matches[0]) {
          this._internal.match = matches[0][0];        // full match of first hit
          this._internal.matches = matches.map(m => m[0]);
          this._internal.groups = matches[0].slice(1); // capture groups of first hit
          this.flagOutputDirty('match');
          this.flagOutputDirty('matches');
          this.flagOutputDirty('groups');
          this.sendSignalOnOutput('found');
        } else {
          this.sendSignalOnOutput('notFound');
        }
      } catch (e) {
        console.error('[PatternExtractor] Invalid regex:', e);
      }
    }
  }
};

module.exports = {
  node: PatternExtractorNode
};
```
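### Stream Buffer Implementation (sketch)
The Stream Buffer node is not spelled out above; the following is a minimal sketch under the same node conventions as the other implementations. The one-shot `setTimeout` approach to interval flushing is an assumption, and port names match the table earlier:

```javascript
// streambuffernode.js (sketch: assumes the same runtime helpers used above)
var StreamBufferNode = {
  name: 'net.noodl.StreamBuffer',
  displayNodeName: 'Stream Buffer',
  category: 'Data',
  color: 'green',
  initialize: function() {
    this._internal.buffer = [];
    this._internal.timer = null;
  },
  inputs: {
    data: { type: '*', displayName: 'Data', group: 'Data', set: function(v) { this._internal.pending = v; } },
    add: { type: 'signal', displayName: 'Add', group: 'Actions', valueChangedToTrue: function() { this.addItem(); } },
    flush: { type: 'signal', displayName: 'Flush', group: 'Actions', valueChangedToTrue: function() { this.doFlush(); } },
    clear: { type: 'signal', displayName: 'Clear', group: 'Actions', valueChangedToTrue: function() { this.clearBuffer(); } },
    flushSize: { type: 'number', displayName: 'Flush Size', group: 'Config', set: function(v) { this._internal.flushSize = v; } },
    flushInterval: { type: 'number', displayName: 'Flush Interval (ms)', group: 'Config', set: function(v) { this._internal.flushInterval = v; } }
  },
  outputs: {
    buffer: { type: 'array', displayName: 'Buffer', group: 'Data', getter: function() { return this._internal.buffer; } },
    bufferSize: { type: 'number', displayName: 'Buffer Size', group: 'Status', getter: function() { return this._internal.buffer.length; } },
    flushed: { type: 'signal', displayName: 'Flushed', group: 'Events' },
    flushedData: { type: 'array', displayName: 'Flushed Data', group: 'Data', getter: function() { return this._internal.flushedData; } }
  },
  methods: {
    addItem: function() {
      this._internal.buffer.push(this._internal.pending);
      this.flagOutputDirty('buffer');
      this.flagOutputDirty('bufferSize');
      // Auto-flush when the size threshold is reached
      if (this._internal.flushSize && this._internal.buffer.length >= this._internal.flushSize) {
        this.doFlush();
        return;
      }
      // Arm a one-shot timer on the first buffered item; doFlush() clears it
      if (this._internal.flushInterval && !this._internal.timer) {
        this._internal.timer = setTimeout(this.doFlush.bind(this), this._internal.flushInterval);
      }
    },
    doFlush: function() {
      if (this._internal.timer) {
        clearTimeout(this._internal.timer);
        this._internal.timer = null;
      }
      if (this._internal.buffer.length === 0) return;
      this._internal.flushedData = this._internal.buffer;
      this._internal.buffer = [];
      this.flagOutputDirty('buffer');
      this.flagOutputDirty('bufferSize');
      this.flagOutputDirty('flushedData');
      this.sendSignalOnOutput('flushed');
    },
    clearBuffer: function() {
      if (this._internal.timer) {
        clearTimeout(this._internal.timer);
        this._internal.timer = null;
      }
      this._internal.buffer = [];
      this.flagOutputDirty('buffer');
      this.flagOutputDirty('bufferSize');
    }
  }
};

module.exports = {
  node: StreamBufferNode
};
```

A production version would also clear the timer when the node is destroyed, so a pending flush cannot fire against a deleted node.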
---
## Usage Examples
### Example 1: AI Chat Streaming (Erleah)
```
[SSE] data → text chunks
[Text Accumulator]
  delimiter: ""          // empty = no splitting, accumulate everything
  chunk: data
  add
[Text Accumulator] accumulated
  → [Text] display streaming message

// Real-time text appears as the AI types!
```
### Example 2: NDJSON Stream
```
[SSE] data → NDJSON chunks
[JSON Stream Parser]
  format: "ndjson"
  chunk: data
  parse
[JSON Stream Parser] parsed → array of objects
[For Each] object in array
  → [Process each object]
```
### Example 3: Extract Progress
```
[SSE] data → "Processing... 45% complete"
[Pattern Extractor]
text: data
pattern: "(\d+)%"
extract
[Pattern Extractor] groups → [0] = "45"
→ [Number] 45
→ [Progress Bar] value
```
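For reference, the same extraction in plain JavaScript; note that the full match is `"45%"` while capture group 1 is the bare number:

```javascript
const m = 'Processing... 45% complete'.match(/(\d+)%/);
console.log(m[0]); // "45%"  (full match → the Match output)
console.log(m[1]); // "45"   (capture group 1 → the Groups output)
```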
### Example 4: Buffered Updates
```
[SSE] data → frequent updates
[Stream Buffer]
  data: item
  add
  flushInterval: 1000    // Flush every second
[Stream Buffer] flushed
[Stream Buffer] flushedData → batched items
  → [Process batch at once]

// Reduces processing overhead
```
---
## Testing Checklist
### Functional Tests
- [ ] Text accumulator handles chunks correctly
- [ ] NDJSON parser splits on newlines
- [ ] Single JSON waits for complete object
- [ ] Array JSON handles incomplete arrays
- [ ] Pattern extractor finds matches
- [ ] Capture groups extracted correctly
- [ ] Buffer flushes on size/interval
- [ ] Clear operations work
### Edge Cases
- [ ] Empty chunks
- [ ] Very large chunks (>1MB)
- [ ] Malformed JSON
- [ ] Invalid regex patterns
- [ ] No matches found
- [ ] Buffer overflow
- [ ] Rapid chunks (stress test)
- [ ] Unicode/emoji handling
### Performance
- [ ] No memory leaks with long streams
- [ ] Regex doesn't cause ReDoS
- [ ] Large buffer doesn't freeze UI
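On the ReDoS item: user-supplied patterns like `(a+)+$` can backtrack catastrophically on long non-matching input. A cheap mitigation is to cap the text length before matching; the limit and helper below are illustrative, not project policy:

```javascript
// Backtracking cost grows with input length, so bounding the input
// keeps worst-case regex behavior tolerable for untrusted patterns.
const MAX_REGEX_INPUT = 64 * 1024; // illustrative limit

function safeMatch(text, pattern, flags) {
  const input = text.length > MAX_REGEX_INPUT ? text.slice(0, MAX_REGEX_INPUT) : text;
  return input.match(new RegExp(pattern, flags));
}
```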
---
## Documentation Requirements
### User-Facing Docs
Create: `docs/nodes/data/stream-utilities.md`
````markdown
# Stream Utilities
Tools for working with streaming data from SSE, WebSocket, or chunked HTTP responses.
## Text Accumulator
Collect text chunks into complete messages:
```
[SSE] → chunks
[Text Accumulator] → complete messages
```
Use cases:
- AI chat streaming
- Log streaming
- Progress messages
## JSON Stream Parser
Parse streaming JSON in various formats:
- **NDJSON**: One JSON object per line
- **Single**: Wait for complete object
- **Array**: Parse partial JSON arrays
## Pattern Extractor
Extract values using regex:
```
Text: "Status: 45% complete"
Pattern: "(\d+)%"
→ Match: "45"
```
Use cases:
- Extract progress percentages
- Parse structured logs
- Find specific values
## Stream Buffer
Batch frequent updates:
```
[Rapid Updates] → [Buffer] → [Batch Process]
```
Reduces processing overhead.
## Best Practices
1. **Set max lengths**: Prevent memory issues
2. **Handle parse errors**: JSON might be incomplete
3. **Use delimiters**: For message boundaries
4. **Batch when possible**: Reduce processing
````
---
## Success Criteria
1. ✅ Handles streaming data reliably
2. ✅ Parses NDJSON correctly
3. ✅ Regex extraction works
4. ✅ No memory leaks
5. ✅ Clear documentation
6. ✅ Works with AGENT-001 (SSE) for Erleah
---
## Future Enhancements
1. **XML Stream Parser** - Parse chunked XML
2. **CSV Stream Parser** - Parse CSV row-by-row
3. **Binary Parsers** - Protocol buffers, msgpack
4. **Compression** - Decompress gzip/deflate streams
5. **Encoding Detection** - Auto-detect UTF-8/UTF-16
---
## References
- [NDJSON Spec](http://ndjson.org/)
- [JSON Streaming](https://en.wikipedia.org/wiki/JSON_streaming)
---
## Dependencies
- None (pure utilities)
## Blocked By
- None (can be developed independently)
## Blocks
- None (but enhances AGENT-001, AGENT-002)
---
## Estimated Effort Breakdown
| Phase | Estimate | Description |
|-------|----------|-------------|
| Text Accumulator | 0.5 day | Basic chunking logic |
| JSON Parser | 1 day | NDJSON, single, array formats |
| Pattern Extractor | 0.5 day | Regex wrapper |
| Stream Buffer | 0.5 day | Time/size-based flushing |
| Testing | 0.5 day | Edge cases, performance |
| Documentation | 0.5 day | User docs, examples |
**Total: 3.5 days** (itemized)
Buffer: None needed
**Final: 2-3 days** (the four nodes share the same buffering and signal patterns, so much of the implementation and testing work overlaps)