feat(local-backend): add WorkflowRunner for visual workflow execution

TASK-007C: Workflow Runtime Integration

- Add WorkflowRunner class to manage CloudRunner instances
- Integrate WorkflowRunner with LocalBackendServer
- Add workflow IPC handlers to BackendManager:
  - backend:update-workflow - Deploy/update a workflow
  - backend:reload-workflows - Hot reload all workflows
  - backend:workflow-status - Get workflow status
- LocalBackendServer now handles /functions/:name endpoints via WorkflowRunner
- WorkflowRunner loads .workflow.json files from backends/{id}/workflows/
This commit is contained in:
Richard Osborne
2026-01-15 17:44:11 +01:00
parent ddcb9cd02e
commit 98fa779548
4 changed files with 547 additions and 3 deletions

View File

@@ -100,6 +100,19 @@ class BackendManager {
return this.exportSchema(id, format);
});
// Workflow management
ipcMain.handle('backend:update-workflow', async (_, args) => {
return this.updateWorkflow(args.backendId, args.name, args.workflow);
});
ipcMain.handle('backend:reload-workflows', async (_, id) => {
return this.reloadWorkflows(id);
});
ipcMain.handle('backend:workflow-status', async (_, id) => {
return this.getWorkflowStatus(id);
});
this.ipcHandlersSetup = true;
}
@@ -234,6 +247,7 @@ class BackendManager {
id: config.id,
name: config.name,
dbPath: path.join(backendPath, 'data', 'local.db'),
workflowsPath: path.join(backendPath, 'workflows'),
port: config.port
});
@@ -346,6 +360,51 @@ class BackendManager {
this.runningBackends.clear();
}
// ==========================================================================
// WORKFLOW MANAGEMENT
// ==========================================================================
/**
* Update/deploy a workflow to a backend
* @param {string} backendId - Backend ID
* @param {string} name - Workflow name
* @param {Object} workflow - Workflow export data
*/
async updateWorkflow(backendId, name, workflow) {
const server = this.runningBackends.get(backendId);
if (!server) {
throw new Error('Backend must be running to update workflows');
}
return server.updateWorkflow(name, workflow);
}
/**
* Reload all workflows for a backend
* @param {string} backendId - Backend ID
*/
async reloadWorkflows(backendId) {
const server = this.runningBackends.get(backendId);
if (!server) {
throw new Error('Backend must be running to reload workflows');
}
return server.reloadWorkflows();
}
/**
* Get workflow status for a backend
* @param {string} backendId - Backend ID
*/
getWorkflowStatus(backendId) {
const server = this.runningBackends.get(backendId);
if (!server) {
return { initialized: false, workflowCount: 0, functions: [] };
}
return server.getWorkflowStatus();
}
}
// Singleton instance

View File

@@ -15,6 +15,8 @@ const http = require('http');
const path = require('path');
const EventEmitter = require('events');
const { WorkflowRunner } = require('./WorkflowRunner');
// Using native http.IncomingMessage handling instead of Express for lighter weight
// This keeps the main process simple and avoids additional dependencies
@@ -46,11 +48,13 @@ class LocalBackendServer {
* @param {string} config.name - Backend display name
* @param {string} config.dbPath - Path to SQLite database
* @param {number} config.port - Port to listen on
* @param {string} config.workflowsPath - Path to workflows directory
*/
constructor(config) {
this.config = config;
this.server = null;
this.adapter = null;
this.workflowRunner = null;
this.events = new EventEmitter();
this.wsClients = new Set();
}
@@ -412,12 +416,34 @@ class LocalBackendServer {
/**
* POST /functions/:name - Execute cloud function
* Placeholder - will be implemented with CloudRunner
* Executes a visual workflow via the WorkflowRunner
*/
async handleFunction(res, functionName, body, headers) {
// TODO: Integrate with CloudRunner when TASK-007C is complete
if (!this.workflowRunner) {
return this.sendError(res, 501, 'Workflows not initialized');
}
safeLog(`Cloud function called: ${functionName}`);
this.sendError(res, 501, 'Cloud functions not yet implemented');
// Build request object matching CloudRunner's expected format
const request = {
body: JSON.stringify(body),
headers: headers
};
try {
const response = await this.workflowRunner.run(functionName, request);
res.writeHead(response.statusCode, {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers': '*'
});
res.end(response.body);
} catch (error) {
safeLog('Function execution error:', error);
this.sendError(res, 500, error.message);
}
}
// ==========================================================================
@@ -455,6 +481,24 @@ class LocalBackendServer {
// Initialize adapter
await this.initAdapter();
// Initialize WorkflowRunner if workflows path is provided
if (this.config.workflowsPath) {
this.workflowRunner = new WorkflowRunner({
workflowsPath: this.config.workflowsPath,
adapter: this.adapter,
enableDebugInspectors: false
});
try {
await this.workflowRunner.initialize();
await this.workflowRunner.loadWorkflows();
safeLog(`WorkflowRunner initialized with ${this.workflowRunner.getAvailableFunctions().length} functions`);
} catch (e) {
safeLog('WorkflowRunner initialization failed (workflows disabled):', e.message);
// Don't fail server startup if workflows can't be initialized
}
}
// Create HTTP server
this.server = http.createServer((req, res) => {
this.handleRequest(req, res);
@@ -507,6 +551,45 @@ class LocalBackendServer {
getAdapter() {
return this.adapter;
}
/**
* Get WorkflowRunner for direct access
*/
getWorkflowRunner() {
return this.workflowRunner;
}
/**
* Update a workflow (hot reload)
* @param {string} name - Workflow name
* @param {Object} exportData - Workflow export data
*/
async updateWorkflow(name, exportData) {
if (!this.workflowRunner) {
return { success: false, error: 'Workflows not initialized' };
}
return this.workflowRunner.loadWorkflow(name, exportData);
}
/**
* Reload all workflows
*/
async reloadWorkflows() {
if (!this.workflowRunner) {
return { success: false, error: 'Workflows not initialized' };
}
return this.workflowRunner.reloadWorkflows();
}
/**
* Get workflow status
*/
getWorkflowStatus() {
if (!this.workflowRunner) {
return { initialized: false, workflowCount: 0, functions: [] };
}
return this.workflowRunner.getStatus();
}
}
module.exports = { LocalBackendServer, generateObjectId };

View File

@@ -0,0 +1,400 @@
/**
* WorkflowRunner
*
* Manages CloudRunner instances for executing visual workflows.
* This integrates the noodl-viewer-cloud CloudRunner with the LocalBackendServer,
* providing database access to workflow nodes via the LocalSQLAdapter.
*
* @module local-backend/WorkflowRunner
*/
const fs = require('fs').promises;
const path = require('path');
const EventEmitter = require('events');
/**
* Safe console.log wrapper to prevent EPIPE errors
*/
function safeLog(...args) {
try {
console.log('[WorkflowRunner]', ...args);
} catch (e) {
// Ignore EPIPE errors
}
}
/**
* WorkflowRunner class
*
* Loads and executes visual workflows using the CloudRunner from noodl-viewer-cloud.
* Workflows are JSON exports from the editor that contain cloud function components.
*/
class WorkflowRunner {
/**
* @param {Object} options
* @param {string} options.workflowsPath - Path to workflows directory
* @param {Object} options.adapter - LocalSQLAdapter instance for database access
* @param {boolean} [options.enableDebugInspectors=false] - Enable debug inspectors
*/
constructor(options) {
this.workflowsPath = options.workflowsPath;
this.adapter = options.adapter;
this.enableDebugInspectors = options.enableDebugInspectors || false;
this.cloudRunner = null;
this.loadedWorkflows = new Map(); // name -> export data
this.events = new EventEmitter();
this.isInitialized = false;
}
/**
* Initialize the CloudRunner instance
*/
async initialize() {
if (this.isInitialized) {
return;
}
try {
// Dynamically import CloudRunner from noodl-viewer-cloud
// This package may not be available in all builds, so we handle the error gracefully
const viewerCloudPath = this.findViewerCloudPath();
if (viewerCloudPath) {
const { CloudRunner } = require(viewerCloudPath);
this.cloudRunner = new CloudRunner({
enableDebugInspectors: this.enableDebugInspectors,
connectToEditor: false // We're running standalone
});
// Inject our adapter into the runtime context
// This allows workflow nodes to access the database
this.injectAdapterIntoContext();
safeLog('CloudRunner initialized');
this.isInitialized = true;
} else {
safeLog('noodl-viewer-cloud not found, workflows disabled');
}
} catch (error) {
safeLog('Failed to initialize CloudRunner:', error.message);
// Don't throw - workflow support is optional
}
}
/**
* Find the path to noodl-viewer-cloud package
*/
findViewerCloudPath() {
const possiblePaths = [
// Development: relative path from editor
path.resolve(__dirname, '..', '..', '..', '..', '..', 'noodl-viewer-cloud', 'src', 'index.ts'),
// Built: node_modules
'@noodl/viewer-cloud',
// Alternative built path
path.resolve(__dirname, '..', '..', '..', '..', '..', 'noodl-viewer-cloud', 'dist', 'index.js')
];
for (const p of possiblePaths) {
try {
require.resolve(p);
return p;
} catch (e) {
// Path not available, try next
}
}
return null;
}
/**
* Inject the LocalSQLAdapter into the CloudRunner context
* This allows workflow nodes to perform database operations
*/
injectAdapterIntoContext() {
if (!this.cloudRunner) return;
// Access the runtime context and inject our adapter
// The runtime has a context property that nodes can access
const runtime = this.cloudRunner.runtime;
if (runtime && runtime.context) {
// Add a method to get the local adapter
runtime.context.getLocalAdapter = () => this.adapter;
// Also inject into Services if needed
if (!runtime.Services) {
runtime.Services = {};
}
runtime.Services.LocalBackend = {
getAdapter: () => this.adapter
};
safeLog('Adapter injected into runtime context');
}
}
/**
* Load all workflows from the workflows directory
*/
async loadWorkflows() {
if (!this.cloudRunner) {
safeLog('CloudRunner not initialized, skipping workflow load');
return;
}
try {
// Ensure directory exists
await fs.mkdir(this.workflowsPath, { recursive: true });
const files = await fs.readdir(this.workflowsPath);
const workflowFiles = files.filter((f) => f.endsWith('.workflow.json'));
safeLog(`Found ${workflowFiles.length} workflow files`);
// Load each workflow
for (const file of workflowFiles) {
try {
const filePath = path.join(this.workflowsPath, file);
const content = await fs.readFile(filePath, 'utf-8');
const exportData = JSON.parse(content);
// Extract workflow name from file (without .workflow.json)
const workflowName = file.replace('.workflow.json', '');
// Store for later reference
this.loadedWorkflows.set(workflowName, exportData);
// Load into CloudRunner
await this.cloudRunner.load(exportData);
safeLog(`Loaded workflow: ${workflowName}`);
} catch (e) {
safeLog(`Failed to load workflow ${file}:`, e.message);
}
}
this.events.emit('workflowsLoaded', {
count: this.loadedWorkflows.size,
names: Array.from(this.loadedWorkflows.keys())
});
} catch (error) {
if (error.code === 'ENOENT') {
safeLog('Workflows directory does not exist yet');
} else {
safeLog('Error loading workflows:', error.message);
}
}
}
/**
* Load or update a single workflow
* @param {string} name - Workflow name (without .workflow.json extension)
* @param {Object} exportData - The workflow export data
*/
async loadWorkflow(name, exportData) {
if (!this.cloudRunner) {
return { success: false, error: 'CloudRunner not initialized' };
}
try {
// Save to file
const filePath = path.join(this.workflowsPath, `${name}.workflow.json`);
await fs.mkdir(this.workflowsPath, { recursive: true });
await fs.writeFile(filePath, JSON.stringify(exportData, null, 2));
// Update in-memory
this.loadedWorkflows.set(name, exportData);
// Reload into CloudRunner
await this.cloudRunner.load(exportData);
safeLog(`Workflow updated: ${name}`);
this.events.emit('workflowUpdated', { name });
return { success: true };
} catch (error) {
safeLog(`Failed to update workflow ${name}:`, error.message);
return { success: false, error: error.message };
}
}
/**
* Delete a workflow
* @param {string} name - Workflow name
*/
async deleteWorkflow(name) {
try {
const filePath = path.join(this.workflowsPath, `${name}.workflow.json`);
await fs.unlink(filePath);
this.loadedWorkflows.delete(name);
safeLog(`Workflow deleted: ${name}`);
this.events.emit('workflowDeleted', { name });
return { success: true };
} catch (error) {
safeLog(`Failed to delete workflow ${name}:`, error.message);
return { success: false, error: error.message };
}
}
/**
* Reload all workflows (hot reload)
*/
async reloadWorkflows() {
// Clear existing workflows
this.loadedWorkflows.clear();
// Re-initialize CloudRunner
if (this.cloudRunner) {
// Create new instance to clear state
const viewerCloudPath = this.findViewerCloudPath();
if (viewerCloudPath) {
const { CloudRunner } = require(viewerCloudPath);
this.cloudRunner = new CloudRunner({
enableDebugInspectors: this.enableDebugInspectors,
connectToEditor: false
});
this.injectAdapterIntoContext();
}
}
// Load all workflows again
await this.loadWorkflows();
safeLog('Workflows reloaded');
this.events.emit('workflowsReloaded', {
count: this.loadedWorkflows.size
});
return { success: true, count: this.loadedWorkflows.size };
}
/**
* Execute a workflow/cloud function
* @param {string} functionName - Name of the function to execute
* @param {Object} request - Request object with body and headers
* @returns {Promise<{statusCode: number, body: string}>}
*/
async run(functionName, request) {
if (!this.cloudRunner) {
return {
statusCode: 501,
body: JSON.stringify({ error: 'Workflows not enabled (CloudRunner not available)' })
};
}
// Check if we have this function loaded
if (!this.hasFunction(functionName)) {
return {
statusCode: 404,
body: JSON.stringify({ error: `Function '${functionName}' not found` })
};
}
const startTime = Date.now();
try {
safeLog(`Executing function: ${functionName}`);
// Execute via CloudRunner
const response = await this.cloudRunner.run(functionName, request);
const duration = Date.now() - startTime;
safeLog(`Function ${functionName} completed in ${duration}ms`);
// Emit execution event for logging/debugging
this.events.emit('functionExecuted', {
functionName,
duration,
statusCode: response.statusCode,
success: response.statusCode >= 200 && response.statusCode < 300
});
return response;
} catch (error) {
const duration = Date.now() - startTime;
safeLog(`Function ${functionName} failed after ${duration}ms:`, error.message);
this.events.emit('functionExecuted', {
functionName,
duration,
statusCode: 500,
success: false,
error: error.message
});
return {
statusCode: 500,
body: JSON.stringify({ error: error.message })
};
}
}
/**
* Check if a function is available
* @param {string} functionName
*/
hasFunction(functionName) {
// CloudRunner looks for components starting with /#__cloud__/
// Check if any loaded workflow has this component
for (const [, exportData] of this.loadedWorkflows) {
if (exportData.components) {
const fullName = `/#__cloud__/${functionName}`;
if (exportData.components.some((c) => c.name === fullName)) {
return true;
}
}
}
return false;
}
/**
* Get list of available functions
*/
getAvailableFunctions() {
const functions = [];
for (const [workflowName, exportData] of this.loadedWorkflows) {
if (exportData.components) {
for (const component of exportData.components) {
if (component.name.startsWith('/#__cloud__/')) {
functions.push({
name: component.name.replace('/#__cloud__/', ''),
workflow: workflowName
});
}
}
}
}
return functions;
}
/**
* Get status information
*/
getStatus() {
return {
initialized: this.isInitialized,
cloudRunnerAvailable: !!this.cloudRunner,
workflowCount: this.loadedWorkflows.size,
functions: this.getAvailableFunctions()
};
}
/**
* Subscribe to events
*/
on(event, handler) {
this.events.on(event, handler);
}
off(event, handler) {
this.events.off(event, handler);
}
}
module.exports = { WorkflowRunner };

View File

@@ -18,11 +18,13 @@
const { LocalBackendServer, generateObjectId } = require('./LocalBackendServer');
const { BackendManager, backendManager, setupBackendIPC } = require('./BackendManager');
const { WorkflowRunner } = require('./WorkflowRunner');
module.exports = {
// Classes
LocalBackendServer,
BackendManager,
WorkflowRunner,
// Singleton instance
backendManager,