Files
OpenNoodl/dev-docs/tasks/phase-6-uba-system/UBA-005-REFERENCE-BACKEND.md

55 KiB

Phase 6E: UBA Reference Backend

Erleah AI Agent - Full UBA Implementation

Phase: 6E of 6
Duration: 3 weeks (15 working days)
Priority: HIGH
Status: NOT STARTED
Depends On: Phase 6A-6D complete


Overview

Phase 6E implements a complete, production-ready UBA backend using the Erleah AI Agent. This serves as both a real product deliverable for Visual Hive and the definitive reference implementation for the UBA specification.

The Erleah backend will demonstrate:

  • Complete schema with all field types
  • Config endpoint with validation and hot-reload
  • Debug streaming for agent observability
  • Health monitoring
  • Best practices for UBA integration

Dual Purpose

Visual Hive CTO Hat Low Code Foundation Hat
Production AI agent for conferences Reference implementation for community
Real-world validation of UBA Documentation and examples
Solves client configuration needs Proves UBA works at scale

Goals

  1. Complete Erleah schema - Full-featured schema demonstrating all capabilities
  2. Config endpoint - Receive, validate, and apply configuration
  3. Debug streaming - Real-time agent execution visibility
  4. Schema endpoint - Serve schema with proper caching
  5. End-to-end testing - Full integration validation
  6. Documentation - Erleah-specific setup and usage guides

Prerequisites

  • Phase 6A-6D complete
  • Erleah Python backend repository
  • FastAPI server running
  • LangGraph agent functional

Task Breakdown

UBA-023: Erleah Schema

Effort: 2 days
Assignee: TBD
Branch: feature/uba-023-erleah-schema

Description

Create the complete Erleah schema file demonstrating all UBA capabilities relevant to an AI agent backend.

Schema Structure

# erleah/nodegx-schema.yaml

schema_version: "1.0"

backend:
  id: "erleah-ai-agent"
  name: "Erleah AI Agent"
  description: "Agentic AI assistant for conference attendees, exhibitors, and VIP guests"
  version: "1.0.0"
  icon: "https://erleah.com/assets/icon.png"
  homepage: "https://docs.erleah.com"
  
  endpoints:
    config: "/nodegx/config"
    health: "/health"
    debug_stream: "/nodegx/debug"
    
  auth:
    type: "bearer"
    header: "X-Nodegx-Token"
    
  capabilities:
    hot_reload: true
    debug: true
    batch_config: false

# =============================================================================
# SECTION 1: Data Sources
# =============================================================================
sections:
  - id: "data_sources"
    name: "Data Sources"
    description: "Configure where your conference data comes from"
    icon: "database"
    
    fields:
      # Primary data backend selection
      - id: "primary_backend"
        type: "backend_reference"
        name: "Primary Data Backend"
        description: "Select your Directus or Supabase instance containing conference data"
        required: true
        backend_types: ["directus", "supabase"]
        
      # Attendee configuration
      - id: "attendee_collection"
        type: "directus_collection"
        name: "Attendee Collection"
        description: "Collection containing attendee/participant data"
        backend_field: "primary_backend"
        required: true
        
      - id: "attendee_mappings"
        type: "field_mapping"
        name: "Attendee Field Mappings"
        description: "Map your attendee fields to Erleah's expected format"
        
        source:
          type: "directus_fields"
          backend_field: "primary_backend"
          collection_field: "attendee_collection"
          
        targets:
          - id: "name"
            name: "Full Name"
            description: "Attendee's display name"
            required: true
            
          - id: "email"
            name: "Email"
            description: "Contact email (used for identification)"
            required: true
            
          - id: "title"
            name: "Job Title"
            description: "Professional title or role"
            required: false
            
          - id: "company"
            name: "Company/Organization"
            description: "Employer or organization name"
            required: false
            
          - id: "bio"
            name: "Biography"
            description: "Description or bio text"
            required: true
            hint: "Used for semantic search - longer, descriptive text works best"
            
          - id: "interests"
            name: "Interests/Tags"
            description: "Topics, skills, or interests"
            required: false
            array: true
            hint: "Array of keywords or tags"
            
          - id: "photo_url"
            name: "Photo URL"
            description: "Profile photo URL"
            required: false
            
          - id: "location"
            name: "Current Location"
            description: "Real-time or default location"
            required: false
            hint: "Required for proximity features"
            
      # Session configuration
      - id: "session_collection"
        type: "directus_collection"
        name: "Session Collection"
        description: "Collection containing conference sessions/talks"
        backend_field: "primary_backend"
        
      - id: "session_mappings"
        type: "field_mapping"
        name: "Session Field Mappings"
        
        source:
          type: "directus_fields"
          backend_field: "primary_backend"
          collection_field: "session_collection"
          
        targets:
          - id: "title"
            name: "Session Title"
            required: true
            
          - id: "description"
            name: "Description"
            required: true
            hint: "Used for semantic search"
            
          - id: "speaker_name"
            name: "Speaker Name"
            required: false
            
          - id: "speaker_id"
            name: "Speaker ID"
            description: "Reference to attendee record"
            required: false
            
          - id: "start_time"
            name: "Start Time"
            required: true
            
          - id: "end_time"
            name: "End Time"
            required: true
            
          - id: "location"
            name: "Room/Location"
            required: false
            
          - id: "track"
            name: "Track/Category"
            required: false
            
          - id: "capacity"
            name: "Capacity"
            required: false
            
      # Exhibitor configuration
      - id: "exhibitor_collection"
        type: "directus_collection"
        name: "Exhibitor Collection"
        description: "Collection containing exhibitor/sponsor data"
        backend_field: "primary_backend"
        
      - id: "exhibitor_mappings"
        type: "field_mapping"
        name: "Exhibitor Field Mappings"
        visible_when:
          field: "data_sources.exhibitor_collection"
          is_not_empty: true
          
        source:
          type: "directus_fields"
          backend_field: "primary_backend"
          collection_field: "exhibitor_collection"
          
        targets:
          - id: "name"
            name: "Company Name"
            required: true
            
          - id: "description"
            name: "Description"
            required: true
            
          - id: "booth_location"
            name: "Booth Location"
            required: false
            
          - id: "website"
            name: "Website URL"
            required: false
            
          - id: "categories"
            name: "Categories/Tags"
            array: true
            required: false

# =============================================================================
# SECTION 2: Vector Database
# =============================================================================
  - id: "vector_db"
    name: "Vector Database"
    description: "Configure semantic search capabilities"
    icon: "search"
    
    fields:
      - id: "qdrant_url"
        type: "url"
        name: "Qdrant URL"
        description: "URL of your Qdrant vector database"
        required: true
        default: "http://localhost:6333"
        protocols: ["http", "https"]
        
      - id: "qdrant_api_key"
        type: "secret"
        name: "Qdrant API Key"
        description: "Leave empty for local development without auth"
        required: false
        
      - id: "embedding_provider"
        type: "select"
        name: "Embedding Provider"
        description: "Service to use for generating embeddings"
        options:
          - value: "openai"
            label: "OpenAI"
          - value: "voyage"
            label: "Voyage AI"
          - value: "cohere"
            label: "Cohere"
        default: "openai"
        
      - id: "embedding_model"
        type: "select"
        name: "Embedding Model"
        options_from:
          endpoint: "/nodegx/embedding-models"
          value_field: "id"
          label_field: "name"
          
      - id: "collection_prefix"
        type: "string"
        name: "Collection Prefix"
        description: "Prefix for Qdrant collection names (useful for multi-tenant)"
        default: "erleah_"
        validation:
          pattern: "^[a-z][a-z0-9_]*$"
          pattern_message: "Use lowercase letters, numbers, and underscores only"
          
      - id: "auto_index"
        type: "boolean"
        name: "Auto-Index on Startup"
        description: "Automatically index/update vectors when agent starts"
        default: true

# =============================================================================
# SECTION 3: Agent Tools
# =============================================================================
  - id: "tools"
    name: "Agent Tools"
    description: "Enable or disable agent capabilities"
    icon: "wrench"
    
    fields:
      - id: "attendee_search"
        type: "tool_toggle"
        name: "Attendee Search"
        description: "Find attendees by interests, role, company, or description"
        icon: "users"
        default: true
        
        config:
          - id: "default_limit"
            type: "number"
            name: "Default Result Limit"
            default: 10
            min: 1
            max: 50
            
          - id: "min_similarity"
            type: "slider"
            name: "Minimum Similarity Score"
            description: "Filter out results below this threshold"
            min: 0
            max: 1
            step: 0.05
            default: 0.65
            marks:
              - value: 0.5
                label: "Lenient"
              - value: 0.7
                label: "Balanced"
              - value: 0.9
                label: "Strict"
                
      - id: "session_search"
        type: "tool_toggle"
        name: "Session Search"
        description: "Find sessions by topic, speaker, or track"
        icon: "calendar"
        default: true
        
        config:
          - id: "include_past"
            type: "boolean"
            name: "Include Past Sessions"
            description: "Show sessions that have already ended"
            default: false
            
          - id: "default_limit"
            type: "number"
            name: "Default Result Limit"
            default: 10
            
      - id: "exhibitor_search"
        type: "tool_toggle"
        name: "Exhibitor Search"
        description: "Find exhibitors by product, service, or category"
        icon: "building"
        default: true
        
        depends_on:
          field: "data_sources.exhibitor_collection"
          condition: "is_not_empty"
          message: "Configure exhibitor collection first"
          
      - id: "proximity_search"
        type: "tool_toggle"
        name: "Proximity Search"
        description: "Find people and places near a location"
        icon: "map-pin"
        default: true
        
        depends_on:
          field: "data_sources.attendee_mappings.location"
          condition: "is_mapped"
          message: "Map the location field to enable proximity search"
          
        config:
          - id: "default_radius"
            type: "number"
            name: "Default Search Radius"
            default: 100
            min: 10
            max: 1000
            unit: "meters"
            
          - id: "location_update_interval"
            type: "number"
            name: "Location Update Interval"
            description: "How often to refresh location data"
            default: 60
            unit: "seconds"
            
      - id: "schedule_management"
        type: "tool_toggle"
        name: "Schedule Management"
        description: "Help users build and manage their conference agenda"
        icon: "clipboard-list"
        default: true
        
        config:
          - id: "conflict_detection"
            type: "boolean"
            name: "Detect Schedule Conflicts"
            description: "Warn when sessions overlap"
            default: true
            
          - id: "travel_time_buffer"
            type: "number"
            name: "Travel Time Buffer"
            description: "Minutes to allow between sessions in different locations"
            default: 10
            min: 0
            max: 30
            unit: "minutes"
            
          - id: "max_sessions_per_day"
            type: "number"
            name: "Max Sessions per Day"
            description: "Suggest breaks when schedule is too packed"
            default: 8
            
      - id: "connection_recommendations"
        type: "tool_toggle"
        name: "Connection Recommendations"
        description: "Suggest relevant people to meet based on interests and goals"
        icon: "user-plus"
        default: true
        
        config:
          - id: "recommendation_count"
            type: "number"
            name: "Recommendations per Request"
            default: 5
            min: 1
            max: 20
            
          - id: "factors"
            type: "multi_select"
            name: "Recommendation Factors"
            description: "What to consider when recommending connections"
            options:
              - value: "interests"
                label: "Shared Interests"
              - value: "industry"
                label: "Same Industry"
              - value: "company_size"
                label: "Similar Company Size"
              - value: "role"
                label: "Complementary Roles"
              - value: "location"
                label: "Proximity"
            default: ["interests", "industry", "role"]
            
      - id: "venue_navigation"
        type: "tool_toggle"
        name: "Venue Navigation"
        description: "Help users find their way around the venue"
        icon: "navigation"
        default: false
        
        config:
          - id: "floor_plans"
            type: "file_upload"
            name: "Floor Plan Images"
            description: "Upload venue maps for AI-powered navigation"
            accept: ["image/png", "image/jpeg", "application/pdf"]
            multiple: true
            max_size: "10MB"
            upload_endpoint: "/nodegx/upload/floor-plans"
            
          - id: "use_vision"
            type: "boolean"
            name: "Use Vision AI"
            description: "Analyze floor plans with computer vision"
            default: true

# =============================================================================
# SECTION 4: Language Model
# =============================================================================
  - id: "llm"
    name: "Language Model"
    description: "Configure the AI model powering the agent"
    icon: "cpu"
    
    fields:
      - id: "provider"
        type: "select"
        name: "LLM Provider"
        options:
          - value: "anthropic"
            label: "Anthropic (Claude)"
          - value: "openai"
            label: "OpenAI (GPT)"
        default: "anthropic"
        
      - id: "anthropic_api_key"
        type: "secret"
        name: "Anthropic API Key"
        required: true
        visible_when:
          field: "llm.provider"
          equals: "anthropic"
          
      - id: "anthropic_model"
        type: "select"
        name: "Claude Model"
        visible_when:
          field: "llm.provider"
          equals: "anthropic"
        options:
          - value: "claude-sonnet-4-20250514"
            label: "Claude Sonnet 4 (Recommended)"
          - value: "claude-haiku-4-20250514"
            label: "Claude Haiku 4 (Faster, Cheaper)"
          - value: "claude-opus-4-20250514"
            label: "Claude Opus 4 (Most Capable)"
        default: "claude-sonnet-4-20250514"
        
      - id: "openai_api_key"
        type: "secret"
        name: "OpenAI API Key"
        required: true
        visible_when:
          field: "llm.provider"
          equals: "openai"
          
      - id: "openai_model"
        type: "select"
        name: "OpenAI Model"
        visible_when:
          field: "llm.provider"
          equals: "openai"
        options:
          - value: "gpt-4o"
            label: "GPT-4o (Recommended)"
          - value: "gpt-4o-mini"
            label: "GPT-4o Mini (Cheaper)"
          - value: "gpt-4-turbo"
            label: "GPT-4 Turbo"
        default: "gpt-4o"
        
      - id: "temperature"
        type: "slider"
        name: "Temperature"
        description: "Higher = more creative, Lower = more focused"
        min: 0
        max: 1
        step: 0.1
        default: 0.7
        
      - id: "max_tokens"
        type: "number"
        name: "Max Response Tokens"
        description: "Maximum length of agent responses"
        default: 2048
        min: 256
        max: 8192

# =============================================================================
# SECTION 5: Prompts
# =============================================================================
  - id: "prompts"
    name: "Prompts"
    description: "Customize the agent's personality and behavior"
    icon: "message-square"
    
    fields:
      - id: "system_prompt"
        type: "prompt"
        name: "System Prompt"
        description: "Main instructions defining the agent's behavior"
        rows: 20
        
        variables:
          - name: "conference_name"
            description: "Name of the conference"
            source: "project.name"
            
          - name: "conference_dates"
            description: "Conference date range"
            source: "runtime"
            
          - name: "current_date"
            description: "Today's date"
            source: "system.date"
            
          - name: "current_time"
            description: "Current time"
            source: "system.time"
            
          - name: "attendee_count"
            description: "Number of registered attendees"
            source: "runtime"
            
          - name: "session_count"
            description: "Number of sessions"
            source: "runtime"
            
          - name: "exhibitor_count"
            description: "Number of exhibitors"
            source: "runtime"
            
        default: |
          You are Erleah, an AI conference assistant for {{conference_name}}.
          
          Conference dates: {{conference_dates}}
          Current date and time: {{current_date}} {{current_time}}
          
          Conference statistics:
          - {{attendee_count}} registered attendees
          - {{session_count}} sessions and talks
          - {{exhibitor_count}} exhibitors
          
          Your role is to help conference attendees:
          1. Find relevant sessions and talks matching their interests
          2. Connect with other attendees who share their professional goals
          3. Navigate the venue efficiently
          4. Build an optimized conference schedule
          5. Discover exhibitors relevant to their needs
          
          Guidelines:
          - Be helpful, concise, and proactive
          - When you can anticipate what the user might need next, suggest it
          - Use the user's interests and background to personalize recommendations
          - Consider time constraints and travel between locations
          - Be aware of session capacities and popularity
          
          Always maintain a friendly, professional tone appropriate for a business conference.
          
      - id: "greeting"
        type: "text"
        name: "Greeting Message"
        description: "First message shown to new users"
        rows: 3
        default: "Hi! I'm Erleah, your AI conference assistant. I can help you find sessions, connect with other attendees, and make the most of your conference experience. What would you like to explore?"
        
      - id: "fallback_message"
        type: "text"
        name: "Fallback Message"
        description: "Shown when the agent can't help with a request"
        rows: 2
        default: "I'm not sure I can help with that specific request, but I'd be happy to help you find sessions, connect with other attendees, or navigate the venue. What would be most useful?"

# =============================================================================
# SECTION 6: Advanced Settings
# =============================================================================
  - id: "advanced"
    name: "Advanced Settings"
    description: "Fine-tune agent behavior and performance"
    icon: "settings"
    collapsed: true
    
    fields:
      - id: "request_timeout"
        type: "number"
        name: "Request Timeout"
        description: "Maximum time for a single user request"
        default: 60
        min: 10
        max: 300
        unit: "seconds"
        
      - id: "max_iterations"
        type: "number"
        name: "Max Agent Iterations"
        description: "Maximum tool calls per request (prevents infinite loops)"
        default: 15
        min: 3
        max: 30
        
      - id: "parallel_tool_calls"
        type: "boolean"
        name: "Parallel Tool Calls"
        description: "Execute independent tool calls simultaneously"
        default: true
        
      - id: "cache_enabled"
        type: "boolean"
        name: "Enable Response Caching"
        description: "Cache frequent queries for faster responses"
        default: true
        
      - id: "cache_ttl"
        type: "number"
        name: "Cache TTL"
        description: "How long to cache responses"
        visible_when:
          field: "advanced.cache_enabled"
          equals: true
        default: 300
        unit: "seconds"
        
      - id: "rate_limit_per_user"
        type: "number"
        name: "Rate Limit per User"
        description: "Maximum requests per user per minute"
        default: 20
        min: 1
        max: 100
        
      - id: "logging_level"
        type: "select"
        name: "Logging Level"
        options:
          - value: "error"
            label: "Errors Only"
          - value: "warn"
            label: "Warnings & Errors"
          - value: "info"
            label: "Info (Recommended)"
          - value: "debug"
            label: "Debug (Verbose)"
        default: "info"
        
      - id: "metrics_enabled"
        type: "boolean"
        name: "Enable Metrics"
        description: "Collect usage and performance metrics"
        default: true

# =============================================================================
# DEBUG SCHEMA
# =============================================================================
debug:
  enabled: true
  
  event_types:
    - id: "request_start"
      name: "Request Started"
      fields:
        - id: "user_id"
          type: "string"
        - id: "message"
          type: "string"
        - id: "context"
          type: "object"
          expandable: true
          
    - id: "agent_step"
      name: "Agent Step"
      fields:
        - id: "step"
          type: "string"
          display: "badge"
          colors:
            understand: "#3B82F6"
            plan: "#8B5CF6"
            execute: "#F59E0B"
            reflect: "#06B6D4"
            respond: "#10B981"
            
        - id: "thought"
          type: "string"
          optional: true
          
        - id: "duration_ms"
          type: "number"
          format: "duration"
          
    - id: "tool_call"
      name: "Tool Execution"
      fields:
        - id: "tool"
          type: "string"
          display: "badge"
          color: "#F59E0B"
          
        - id: "args"
          type: "object"
          expandable: true
          
        - id: "result"
          type: "object"
          expandable: true
          
        - id: "result_count"
          type: "number"
          optional: true
          
        - id: "duration_ms"
          type: "number"
          format: "duration"
          
        - id: "cached"
          type: "boolean"
          optional: true
          
        - id: "error"
          type: "string"
          optional: true
          highlight: "error"
          
    - id: "llm_call"
      name: "LLM Call"
      fields:
        - id: "model"
          type: "string"
          
        - id: "prompt_tokens"
          type: "number"
          
        - id: "completion_tokens"
          type: "number"
          
        - id: "total_tokens"
          type: "number"
          
        - id: "cost_usd"
          type: "number"
          format: "currency"
          
        - id: "duration_ms"
          type: "number"
          format: "duration"
          
    - id: "request_end"
      name: "Request Completed"
      fields:
        - id: "status"
          type: "string"
          display: "badge"
          colors:
            success: "#10B981"
            error: "#EF4444"
            timeout: "#F59E0B"
            
        - id: "total_duration_ms"
          type: "number"
          format: "duration"
          
        - id: "tool_calls"
          type: "number"
          
        - id: "llm_calls"
          type: "number"
          
        - id: "total_tokens"
          type: "number"
          
        - id: "total_cost_usd"
          type: "number"
          format: "currency"
          
    - id: "error"
      name: "Error"
      fields:
        - id: "code"
          type: "string"
          highlight: "error"
          
        - id: "message"
          type: "string"
          highlight: "error"
          
        - id: "stack"
          type: "string"
          optional: true
          expandable: true
          
        - id: "recoverable"
          type: "boolean"

Acceptance Criteria

  • Schema validates against UBA specification
  • All 6 sections complete
  • All relevant field types demonstrated
  • Debug schema complete
  • Variables defined for prompts
  • Conditional visibility working

UBA-024: Erleah Config Endpoint

Effort: 3 days
Assignee: TBD
Branch: feature/uba-024-erleah-config

Description

Implement the /nodegx/config endpoint in the Erleah Python backend to receive, validate, and apply configuration from Nodegx.

Files to Create/Modify

erleah/
├── api/
│   └── nodegx/
│       ├── __init__.py
│       ├── router.py
│       ├── config.py
│       └── models.py
├── config/
│   ├── loader.py
│   ├── validator.py
│   └── watcher.py
└── core/
    └── settings.py (update)

Implementation

# erleah/api/nodegx/models.py
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional, List
from datetime import datetime

class ConfigMetadata(BaseModel):
    project_id: str
    project_name: str
    environment: str
    nodegx_version: str

class ConfigRequest(BaseModel):
    config: Dict[str, Any]
    metadata: ConfigMetadata

class ConfigWarning(BaseModel):
    field: str
    message: str
    code: str = "WARNING"

class ConfigError(BaseModel):
    field: str
    message: str
    code: str

class RuntimeValues(BaseModel):
    conference_dates: Optional[str] = None
    attendee_count: int = 0
    session_count: int = 0
    exhibitor_count: int = 0

class ConfigResponse(BaseModel):
    success: bool
    applied_at: str
    warnings: List[ConfigWarning] = []
    errors: List[ConfigError] = []
    runtime_values: Optional[RuntimeValues] = None


# erleah/api/nodegx/config.py
from fastapi import APIRouter, HTTPException, Depends
from .models import ConfigRequest, ConfigResponse, ConfigWarning, ConfigError, RuntimeValues
from erleah.config.loader import ConfigLoader
from erleah.config.validator import ConfigValidator
from erleah.core.settings import settings
from datetime import datetime
import asyncio

router = APIRouter(prefix="/nodegx", tags=["nodegx"])

config_loader = ConfigLoader()
config_validator = ConfigValidator()


@router.post("/config", response_model=ConfigResponse)
async def apply_config(request: ConfigRequest):
    """
    Receive and apply configuration from Nodegx.
    """
    config = request.config
    metadata = request.metadata
    
    warnings: List[ConfigWarning] = []
    errors: List[ConfigError] = []
    
    # Validate configuration
    validation_result = await config_validator.validate(config)
    
    if validation_result.errors:
        # Return validation errors without applying
        return ConfigResponse(
            success=False,
            applied_at=datetime.utcnow().isoformat() + "Z",
            errors=[
                ConfigError(field=e.field, message=e.message, code=e.code)
                for e in validation_result.errors
            ]
        )
    
    # Collect warnings
    for w in validation_result.warnings:
        warnings.append(ConfigWarning(field=w.field, message=w.message))
    
    # Apply configuration
    try:
        await config_loader.apply(config, metadata)
    except Exception as e:
        return ConfigResponse(
            success=False,
            applied_at=datetime.utcnow().isoformat() + "Z",
            errors=[ConfigError(field="", message=str(e), code="APPLY_FAILED")]
        )
    
    # Get runtime values
    runtime_values = await get_runtime_values(config)
    
    return ConfigResponse(
        success=True,
        applied_at=datetime.utcnow().isoformat() + "Z",
        warnings=warnings,
        runtime_values=runtime_values
    )


async def get_runtime_values(config: Dict[str, Any]) -> RuntimeValues:
    """
    Calculate runtime values based on current data.
    """
    from erleah.services.directus import DirectusService
    
    directus = DirectusService(config.get("data_sources", {}))
    
    # Get counts in parallel
    attendee_count, session_count, exhibitor_count = await asyncio.gather(
        directus.get_collection_count(config.get("data_sources", {}).get("attendee_collection")),
        directus.get_collection_count(config.get("data_sources", {}).get("session_collection")),
        directus.get_collection_count(config.get("data_sources", {}).get("exhibitor_collection")),
        return_exceptions=True
    )
    
    # Handle exceptions gracefully
    if isinstance(attendee_count, Exception):
        attendee_count = 0
    if isinstance(session_count, Exception):
        session_count = 0
    if isinstance(exhibitor_count, Exception):
        exhibitor_count = 0
    
    # Get conference dates from sessions
    conference_dates = await directus.get_conference_date_range()
    
    return RuntimeValues(
        conference_dates=conference_dates,
        attendee_count=attendee_count,
        session_count=session_count,
        exhibitor_count=exhibitor_count
    )


# erleah/config/loader.py
from typing import Dict, Any
import json
import asyncio
from pathlib import Path

class ConfigLoader:
    def __init__(self):
        self.config_path = Path("config/nodegx-config.json")
        self.current_config: Dict[str, Any] = {}
        self._subscribers: List[Callable] = []
    
    async def apply(self, config: Dict[str, Any], metadata: Dict[str, Any]):
        """
        Apply new configuration.
        """
        # Store config
        self.current_config = config
        
        # Persist to file (for restarts)
        self.config_path.parent.mkdir(exist_ok=True)
        with open(self.config_path, 'w') as f:
            json.dump({
                "config": config,
                "metadata": metadata,
                "applied_at": datetime.utcnow().isoformat()
            }, f, indent=2)
        
        # Notify subscribers (hot reload)
        await self._notify_subscribers()
    
    async def _notify_subscribers(self):
        """
        Notify all subscribers of config change.
        """
        for subscriber in self._subscribers:
            try:
                if asyncio.iscoroutinefunction(subscriber):
                    await subscriber(self.current_config)
                else:
                    subscriber(self.current_config)
            except Exception as e:
                logger.error(f"Config subscriber error: {e}")
    
    def subscribe(self, callback: Callable):
        """
        Subscribe to config changes.
        """
        self._subscribers.append(callback)
    
    def get(self, path: str, default: Any = None) -> Any:
        """
        Get config value by dot-notation path.
        """
        keys = path.split('.')
        value = self.current_config
        
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return default
        
        return value if value is not None else default


# erleah/config/validator.py
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class ValidationIssue:
    field: str
    message: str
    code: str = "VALIDATION_ERROR"

@dataclass
class ValidationResult:
    valid: bool
    errors: List[ValidationIssue]
    warnings: List[ValidationIssue]

class ConfigValidator:
    async def validate(self, config: Dict[str, Any]) -> ValidationResult:
        errors = []
        warnings = []
        
        # Validate data sources
        data_sources = config.get("data_sources", {})
        
        if not data_sources.get("primary_backend"):
            errors.append(ValidationIssue(
                field="data_sources.primary_backend",
                message="Primary backend is required",
                code="REQUIRED"
            ))
        
        if not data_sources.get("attendee_collection"):
            errors.append(ValidationIssue(
                field="data_sources.attendee_collection",
                message="Attendee collection is required",
                code="REQUIRED"
            ))
        
        # Validate field mappings
        attendee_mappings = data_sources.get("attendee_mappings", {})
        if not attendee_mappings.get("name"):
            errors.append(ValidationIssue(
                field="data_sources.attendee_mappings.name",
                message="Name field mapping is required",
                code="REQUIRED"
            ))
        
        if not attendee_mappings.get("bio"):
            warnings.append(ValidationIssue(
                field="data_sources.attendee_mappings.bio",
                message="Bio field not mapped - semantic search will be limited",
                code="RECOMMENDED"
            ))
        
        # Validate LLM config
        llm = config.get("llm", {})
        provider = llm.get("provider", "anthropic")
        
        if provider == "anthropic" and not llm.get("anthropic_api_key"):
            errors.append(ValidationIssue(
                field="llm.anthropic_api_key",
                message="Anthropic API key is required",
                code="REQUIRED"
            ))
        elif provider == "openai" and not llm.get("openai_api_key"):
            errors.append(ValidationIssue(
                field="llm.openai_api_key",
                message="OpenAI API key is required",
                code="REQUIRED"
            ))
        
        # Validate vector DB
        vector_db = config.get("vector_db", {})
        if not vector_db.get("qdrant_url"):
            errors.append(ValidationIssue(
                field="vector_db.qdrant_url",
                message="Qdrant URL is required",
                code="REQUIRED"
            ))
        
        # Check tool dependencies
        tools = config.get("tools", {})
        if tools.get("proximity_search", {}).get("enabled"):
            if not attendee_mappings.get("location"):
                warnings.append(ValidationIssue(
                    field="tools.proximity_search",
                    message="Proximity search enabled but location field not mapped",
                    code="DEPENDENCY"
                ))
        
        return ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings
        )

Acceptance Criteria

  • Config endpoint receives configuration
  • Validation returns errors/warnings
  • Config persisted to file
  • Hot reload notifies subscribers
  • Runtime values calculated
  • Error handling comprehensive

UBA-025: Erleah Debug Stream

Effort: 4 days
Assignee: TBD
Branch: feature/uba-025-erleah-debug

Description

Implement the debug streaming endpoint that sends real-time agent execution events to Nodegx.

Implementation

# erleah/api/nodegx/debug.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import asyncio
import json
from datetime import datetime
from erleah.debug.emitter import DebugEmitter, DebugEvent

router = APIRouter(prefix="/nodegx", tags=["nodegx"])

# Global debug emitter
debug_emitter = DebugEmitter()


@router.websocket("/debug")
async def debug_websocket(websocket: WebSocket):
    """
    WebSocket endpoint for debug events.
    """
    # Verify token from query params
    token = websocket.query_params.get("token")
    if not verify_nodegx_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return
    
    await websocket.accept()
    
    # Subscribe to debug events
    queue: asyncio.Queue[DebugEvent] = asyncio.Queue(maxsize=100)
    
    async def event_handler(event: DebugEvent):
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            # Drop oldest event if queue is full
            try:
                queue.get_nowait()
                queue.put_nowait(event)
            except asyncio.QueueEmpty:
                pass
    
    subscription_id = debug_emitter.subscribe(event_handler)
    
    try:
        while True:
            event = await queue.get()
            await websocket.send_json(event.to_dict())
    except WebSocketDisconnect:
        pass
    finally:
        debug_emitter.unsubscribe(subscription_id)


@router.get("/debug")
async def debug_sse():
    """
    SSE endpoint for debug events (fallback for browsers without WebSocket).
    """
    async def event_generator() -> AsyncGenerator[str, None]:
        queue: asyncio.Queue[DebugEvent] = asyncio.Queue(maxsize=100)
        
        async def event_handler(event: DebugEvent):
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                pass
        
        subscription_id = debug_emitter.subscribe(event_handler)
        
        try:
            while True:
                event = await queue.get()
                yield f"event: {event.type}\ndata: {json.dumps(event.to_dict())}\n\n"
        finally:
            debug_emitter.unsubscribe(subscription_id)
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


# erleah/debug/emitter.py
from typing import Dict, Any, Callable, List
from dataclasses import dataclass, field
from datetime import datetime
import uuid
import asyncio

@dataclass
class DebugEvent:
    type: str
    data: Dict[str, Any]
    request_id: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "request_id": self.request_id,
            "type": self.type,
            "data": self.data
        }


class DebugEmitter:
    def __init__(self):
        self._subscribers: Dict[str, Callable] = {}
        self._enabled = True
    
    def subscribe(self, handler: Callable[[DebugEvent], None]) -> str:
        subscription_id = str(uuid.uuid4())
        self._subscribers[subscription_id] = handler
        return subscription_id
    
    def unsubscribe(self, subscription_id: str):
        self._subscribers.pop(subscription_id, None)
    
    async def emit(self, event_type: str, data: Dict[str, Any], request_id: str):
        if not self._enabled or not self._subscribers:
            return
        
        event = DebugEvent(type=event_type, data=data, request_id=request_id)
        
        for handler in self._subscribers.values():
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(event)
                else:
                    handler(event)
            except Exception as e:
                # Don't let debug errors affect agent operation
                pass
    
    def enable(self):
        self._enabled = True
    
    def disable(self):
        self._enabled = False


# erleah/debug/context.py
from contextvars import ContextVar
from typing import Optional
import uuid

# Context variable for current request ID
current_request_id: ContextVar[Optional[str]] = ContextVar('request_id', default=None)

def get_request_id() -> str:
    return current_request_id.get() or str(uuid.uuid4())

def set_request_id(request_id: str):
    current_request_id.set(request_id)


# Integration with LangGraph agent
# erleah/agent/instrumented.py
from erleah.debug.emitter import debug_emitter
from erleah.debug.context import get_request_id
import time

class InstrumentedAgent:
    """
    Wrapper that adds debug instrumentation to the agent.
    """
    
    def __init__(self, agent):
        self.agent = agent
    
    async def run(self, message: str, user_context: Dict[str, Any]) -> str:
        request_id = get_request_id()
        start_time = time.time()
        
        # Emit request start
        await debug_emitter.emit("request_start", {
            "user_id": user_context.get("user_id"),
            "message": message,
            "context": user_context
        }, request_id)
        
        total_tokens = 0
        total_cost = 0
        tool_calls = 0
        llm_calls = 0
        
        try:
            # Run agent with step callbacks
            result = await self.agent.run(
                message,
                user_context,
                callbacks={
                    "on_step": lambda step, data: self._on_step(request_id, step, data),
                    "on_tool": lambda tool, args, result, duration: self._on_tool(
                        request_id, tool, args, result, duration
                    ),
                    "on_llm": lambda model, tokens, cost, duration: self._on_llm(
                        request_id, model, tokens, cost, duration
                    )
                }
            )
            
            # Emit request end
            await debug_emitter.emit("request_end", {
                "status": "success",
                "total_duration_ms": int((time.time() - start_time) * 1000),
                "tool_calls": tool_calls,
                "llm_calls": llm_calls,
                "total_tokens": total_tokens,
                "total_cost_usd": total_cost
            }, request_id)
            
            return result
            
        except Exception as e:
            await debug_emitter.emit("error", {
                "code": type(e).__name__,
                "message": str(e),
                "recoverable": False
            }, request_id)
            
            await debug_emitter.emit("request_end", {
                "status": "error",
                "total_duration_ms": int((time.time() - start_time) * 1000),
                "tool_calls": tool_calls,
                "llm_calls": llm_calls
            }, request_id)
            
            raise
    
    async def _on_step(self, request_id: str, step: str, data: Dict[str, Any]):
        await debug_emitter.emit("agent_step", {
            "step": step,
            "thought": data.get("thought"),
            "duration_ms": data.get("duration_ms", 0)
        }, request_id)
    
    async def _on_tool(self, request_id: str, tool: str, args: Dict, result: Any, duration_ms: int):
        nonlocal tool_calls
        tool_calls += 1
        
        await debug_emitter.emit("tool_call", {
            "tool": tool,
            "args": args,
            "result": self._summarize_result(result),
            "result_count": len(result) if isinstance(result, list) else None,
            "duration_ms": duration_ms,
            "cached": False
        }, request_id)
    
    async def _on_llm(self, request_id: str, model: str, tokens: Dict, cost: float, duration_ms: int):
        nonlocal llm_calls, total_tokens, total_cost
        llm_calls += 1
        total_tokens += tokens.get("total", 0)
        total_cost += cost
        
        await debug_emitter.emit("llm_call", {
            "model": model,
            "prompt_tokens": tokens.get("prompt", 0),
            "completion_tokens": tokens.get("completion", 0),
            "total_tokens": tokens.get("total", 0),
            "cost_usd": cost,
            "duration_ms": duration_ms
        }, request_id)
    
    def _summarize_result(self, result: Any) -> Any:
        """
        Summarize large results to avoid overwhelming the debug stream.
        """
        if isinstance(result, list) and len(result) > 5:
            return {
                "_summary": True,
                "count": len(result),
                "first_3": result[:3],
                "truncated": True
            }
        return result

Acceptance Criteria

  • WebSocket endpoint works
  • SSE fallback works
  • Events emitted for all agent steps
  • Tool calls tracked
  • LLM calls tracked
  • Request start/end events
  • Error events
  • No performance impact when no subscribers

UBA-026: Erleah Schema Endpoint

Effort: 1 day
Assignee: TBD
Branch: feature/uba-026-erleah-schema-endpoint

Description

Serve the Erleah schema at the well-known URL with proper caching headers.

Implementation

# erleah/api/nodegx/schema.py
from fastapi import APIRouter
from fastapi.responses import FileResponse, Response
from pathlib import Path
import hashlib

router = APIRouter(tags=["nodegx"])

SCHEMA_PATH = Path(__file__).parent.parent.parent / "nodegx-schema.yaml"


@router.get("/.well-known/nodegx-schema.yaml")
async def get_schema():
    """
    Serve the Nodegx configuration schema.
    """
    if not SCHEMA_PATH.exists():
        return Response(status_code=404, content="Schema not found")
    
    # Calculate ETag from file content
    content = SCHEMA_PATH.read_bytes()
    etag = hashlib.md5(content).hexdigest()
    
    return Response(
        content=content,
        media_type="application/x-yaml",
        headers={
            "Cache-Control": "public, max-age=3600",
            "ETag": f'"{etag}"'
        }
    )

UBA-027: End-to-End Testing

Effort: 3 days
Assignee: TBD
Branch: feature/uba-027-e2e-testing

Description

Create comprehensive integration tests for the full Erleah + Nodegx UBA flow.

Test Scenarios

# tests/integration/test_uba_integration.py
import pytest
from httpx import AsyncClient
from erleah.main import app

@pytest.fixture
async def client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client


class TestSchemaEndpoint:
    async def test_schema_returned(self, client):
        response = await client.get("/.well-known/nodegx-schema.yaml")
        assert response.status_code == 200
        assert "schema_version" in response.text
        assert response.headers["content-type"] == "application/x-yaml"
    
    async def test_schema_has_etag(self, client):
        response = await client.get("/.well-known/nodegx-schema.yaml")
        assert "etag" in response.headers
    
    async def test_schema_caching(self, client):
        response1 = await client.get("/.well-known/nodegx-schema.yaml")
        etag = response1.headers["etag"]
        
        response2 = await client.get(
            "/.well-known/nodegx-schema.yaml",
            headers={"If-None-Match": etag}
        )
        assert response2.status_code == 304


class TestConfigEndpoint:
    async def test_valid_config_applied(self, client):
        config = {
            "config": {
                "data_sources": {
                    "primary_backend": "backend_123",
                    "attendee_collection": "attendees",
                    "attendee_mappings": {
                        "name": "full_name",
                        "email": "email_address",
                        "bio": "biography"
                    }
                },
                "vector_db": {
                    "qdrant_url": "http://localhost:6333"
                },
                "llm": {
                    "provider": "anthropic",
                    "anthropic_api_key": "test-key"
                }
            },
            "metadata": {
                "project_id": "proj_123",
                "project_name": "Test Conference",
                "environment": "development",
                "nodegx_version": "2.0.0"
            }
        }
        
        response = await client.post("/nodegx/config", json=config)
        assert response.status_code == 200
        
        data = response.json()
        assert data["success"] is True
        assert "applied_at" in data
        assert "runtime_values" in data
    
    async def test_invalid_config_rejected(self, client):
        config = {
            "config": {
                "data_sources": {}  # Missing required fields
            },
            "metadata": {
                "project_id": "proj_123",
                "project_name": "Test",
                "environment": "development",
                "nodegx_version": "2.0.0"
            }
        }
        
        response = await client.post("/nodegx/config", json=config)
        data = response.json()
        
        assert data["success"] is False
        assert len(data["errors"]) > 0
    
    async def test_config_returns_warnings(self, client):
        config = {
            "config": {
                "data_sources": {
                    "primary_backend": "backend_123",
                    "attendee_collection": "attendees",
                    "attendee_mappings": {
                        "name": "full_name",
                        "email": "email_address"
                        # Missing bio - should warn
                    }
                },
                "vector_db": {
                    "qdrant_url": "http://localhost:6333"
                },
                "llm": {
                    "provider": "anthropic",
                    "anthropic_api_key": "test-key"
                }
            },
            "metadata": {
                "project_id": "proj_123",
                "project_name": "Test",
                "environment": "development",
                "nodegx_version": "2.0.0"
            }
        }
        
        response = await client.post("/nodegx/config", json=config)
        data = response.json()
        
        assert data["success"] is True
        assert len(data["warnings"]) > 0
        assert any("bio" in w["field"] for w in data["warnings"])


class TestHealthEndpoint:
    async def test_health_check(self, client):
        response = await client.get("/health")
        assert response.status_code == 200
        
        data = response.json()
        assert "status" in data
        assert "version" in data


class TestDebugStream:
    async def test_websocket_connects(self, client):
        # This would use websocket testing utilities
        pass
    
    async def test_events_streamed(self, client):
        # Test that agent events appear in stream
        pass

Acceptance Criteria

  • Schema endpoint tests pass
  • Config endpoint tests pass
  • Health endpoint tests pass
  • Debug stream tests pass
  • Error cases covered
  • Performance tests pass

UBA-028: Erleah Documentation

Effort: 2 days
Assignee: TBD
Branch: feature/uba-028-erleah-docs

Description

Create Erleah-specific documentation for setup and usage.

Documentation Structure

docs/erleah/
├── README.md              # Overview
├── setup.md               # Installation and setup
├── configuration.md       # All config options explained
├── tools.md               # Agent tools reference
├── debugging.md           # Using the debug panel
└── troubleshooting.md     # Common issues

Key Content

# Erleah AI Agent - Setup Guide

## Prerequisites

- Python 3.11+
- Qdrant (local or cloud)
- Directus or Supabase with conference data
- Anthropic or OpenAI API key

## Quick Start

### 1. Clone and Install

```bash
git clone https://github.com/visualhive/erleah.git
cd erleah
pip install -r requirements.txt

2. Start the Server

uvicorn erleah.main:app --host 0.0.0.0 --port 8000

3. Connect from Nodegx

  1. Open your Nodegx project
  2. Go to Backend Services
  3. Click "Add Backend" → "Schema-Configured Backend"
  4. Enter: http://localhost:8000
  5. Configure your data sources and API keys

Configuration Reference

Data Sources

Field Type Required Description
primary_backend backend_reference Yes Your Directus/Supabase instance
attendee_collection string Yes Collection containing attendees
attendee_mappings field_mapping Yes Map your fields to Erleah format

Full reference →

Troubleshooting

"Backend unreachable"

  1. Ensure the Erleah server is running
  2. Check the URL is correct
  3. Verify no firewall blocking

"Semantic search not working"

  1. Check Qdrant is running and accessible
  2. Verify embedding API key is set
  3. Check if indexing completed

---

## Phase 6E Checklist

### UBA-023: Erleah Schema
- [ ] Complete schema file
- [ ] All sections defined
- [ ] Debug schema complete
- [ ] Schema validates

### UBA-024: Config Endpoint
- [ ] Endpoint implemented
- [ ] Validation working
- [ ] Config persisted
- [ ] Hot reload working
- [ ] Runtime values returned

### UBA-025: Debug Stream
- [ ] WebSocket endpoint
- [ ] SSE fallback
- [ ] Agent instrumentation
- [ ] All event types

### UBA-026: Schema Endpoint
- [ ] Endpoint implemented
- [ ] ETag caching
- [ ] Proper headers

### UBA-027: E2E Testing
- [ ] Schema tests
- [ ] Config tests
- [ ] Health tests
- [ ] Debug tests

### UBA-028: Documentation
- [ ] Setup guide
- [ ] Configuration reference
- [ ] Troubleshooting

---

## Success Criteria

- [ ] Full UBA integration working
- [ ] Debug panel shows real agent execution
- [ ] Configuration changes apply in real-time
- [ ] Documentation enables self-service setup