Phase 5: Embedded Real-Time Hub

Goal

Built-in SSE and WebSocket hub inside bext — plugins and apps can push events to connected browsers without separate infrastructure. Inspired by FrankenPHP's embedded Mercure hub.

Current State

  • Turbo.js intercepts navigations (HTML-over-the-wire)
  • Server Islands stream deferred components
  • No WebSocket support
  • No SSE endpoint
  • No mechanism for server → browser push
  • Real-time requires external infrastructure (Redis Pub/Sub, WebSocket server)

Why This Matters

| Use case                 | Without hub                              | With hub                                                              |
|--------------------------|------------------------------------------|-----------------------------------------------------------------------|
| ISR cache invalidation   | Browser polls or user manually refreshes | Server pushes "page updated" → Turbo.js auto-refetches                |
| Deploy notifications     | Nothing                                  | Server pushes "new version deployed" → banner                         |
| Plugin events            | Not possible                             | Plugin calls bext_publish("analytics", data) → dashboard live-updates |
| Dev HMR                  | File watcher + custom WS server          | Built-in WS on /__bext/hmr                                            |
| Multi-user collaboration | External WebSocket                       | Apps subscribe to topics, server relays                               |

Design

Two Protocols

Server-Sent Events (SSE) — server → browser only, uses EventSource API (built into all browsers, no library needed). Best for: cache invalidation, deploy notifications, live metrics.

WebSocket — bidirectional. Best for: dev HMR, collaborative editing, chat. WebSocket support is required to fully replace nginx (which proxies WS via Upgrade header).

Topic-Based Pub/Sub

Topics:
  system/deploy          — deployment events (all apps)
  system/cache           — cache invalidation events
  app/{app_id}/events    — per-app custom events
  plugin/{plugin_id}     — per-plugin events
  user/{user_id}         — per-user notifications
  custom/{topic}         — app-defined topics

Clients subscribe to topics. Server publishes to topics. Messages fan out to all subscribers of that topic.
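
The fan-out rule above can be sketched with standard-library channels only. This is a minimal illustration, not the planned implementation: the `Hub` type, its method signatures, and the use of `std::sync::mpsc` are assumptions made for the example (the Dependencies section names `tokio::sync::broadcast` for the real hub).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Minimal in-process hub: topic name -> senders of every subscriber.
/// (Hypothetical type for illustration; not the real BextHub.)
#[derive(Default)]
pub struct Hub {
    topics: HashMap<String, Vec<Sender<String>>>,
}

impl Hub {
    /// Register one subscriber on several topics.
    /// All of its events arrive on the single returned receiver.
    pub fn subscribe(&mut self, topics: &[&str]) -> Receiver<String> {
        let (tx, rx) = channel();
        for topic in topics {
            self.topics
                .entry(topic.to_string())
                .or_default()
                .push(tx.clone());
        }
        rx
    }

    /// Fan a message out to every live subscriber of `topic`.
    /// Returns how many subscribers received it.
    pub fn publish(&mut self, topic: &str, data: &str) -> usize {
        let Some(subs) = self.topics.get_mut(topic) else {
            return 0;
        };
        // A failed send means the receiver was dropped, so prune it.
        subs.retain(|tx| tx.send(data.to_string()).is_ok());
        subs.len()
    }
}
```

Pruning on a failed send keeps the registry from accumulating senders for subscribers that have gone away, which is the in-process half of the cleanup described under Connection Lifecycle.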

Architecture

                  ┌──────────────────────────────┐
                  │        BextHub               │
                  │                              │
  SSE clients ───▶│  TopicRegistry               │◀── Plugin publish
                  │    ├─ system/deploy          │◀── Cache invalidation
  WS clients  ───▶│    ├─ app/marketing/events   │◀── App publish
                  │    ├─ user/abc123            │◀── Deploy events
                  │    └─ custom/live-scores     │
                  │                              │
                  │  Cross-Instance (Redis PS)   │
                  └──────────────────────────────┘

Endpoints

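| Method | Path               | Protocol  | Purpose                                                                  |
|--------|--------------------|-----------|--------------------------------------------------------------------------|
| GET    | /__bext/events     | SSE       | Subscribe to topics (query: ?topics=system/deploy,app/marketing/events)  |
| GET    | /__bext/ws         | WebSocket | Bidirectional connection                                                 |
| POST   | /__bext/publish    | HTTP      | Publish event to topic (internal/plugin use)                             |
| GET    | /__bext/hub/stats  | HTTP      | Hub statistics (connections, topics, messages/sec)                       |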
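
As a rough sketch of how the `?topics=` query on `/__bext/events` might be split into topic names: the function name `parse_topics` is hypothetical, and percent-decoding is deliberately left out, so this assumes topics are sent unencoded as in the example above.

```rust
/// Extract the topic list from a raw query string such as
/// "topics=system/deploy,app/marketing/events".
/// Assumption: no percent-decoding is needed for this sketch.
pub fn parse_topics(query: &str) -> Vec<String> {
    query
        .split('&')
        // Keep only the `topics=` parameter(s).
        .filter_map(|pair| pair.strip_prefix("topics="))
        // Each parameter holds a comma-separated list.
        .flat_map(|list| list.split(','))
        .map(str::trim)
        // Ignore empty entries produced by trailing commas.
        .filter(|topic| !topic.is_empty())
        .map(String::from)
        .collect()
}
```

A real handler would presumably also validate each name against the topic namespaces and the authorization rules before subscribing.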

SSE Protocol

GET /__bext/events?topics=system/deploy,system/cache HTTP/2
Accept: text/event-stream

HTTP/2 200
Content-Type: text/event-stream
Cache-Control: no-cache

event: system/deploy
data: {"app":"marketing","version":"v1.2.3","status":"live"}
id: evt-001

event: system/cache
data: {"pattern":"/products/*","action":"purged","count":42}
id: evt-002

: heartbeat
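
The wire format shown above is simple enough to render by hand. The following sketch shows one way to produce a frame; `sse_frame` and `sse_heartbeat` are hypothetical helper names, not part of any existing bext API.

```rust
/// Render one event in the text/event-stream wire format.
pub fn sse_frame(event: &str, data: &str, id: &str) -> String {
    let mut out = format!("event: {event}\n");
    // Each line of the payload needs its own `data:` field,
    // otherwise an embedded newline would end the field early.
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push_str(&format!("id: {id}\n"));
    // A blank line terminates the event.
    out.push('\n');
    out
}

/// Comment line used as a keep-alive; EventSource clients ignore it.
pub fn sse_heartbeat() -> &'static str {
    ": heartbeat\n\n"
}
```

Because the payloads here are single-line JSON, the per-line `data:` handling only matters if a publisher ever sends pretty-printed or multi-line data.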

WebSocket Protocol

// Client → Server: Subscribe
{"type": "subscribe", "topics": ["system/deploy", "app/marketing/events"]}

// Client → Server: Unsubscribe
{"type": "unsubscribe", "topics": ["system/deploy"]}

// Client → Server: Publish (authorized only)
{"type": "publish", "topic": "custom/live-scores", "data": {"home": 2, "away": 1}}

// Server → Client: Event
{"type": "event", "topic": "system/deploy", "data": {...}, "id": "evt-001"}

// Server → Client: Heartbeat (every 30s)
{"type": "ping"}

// Client → Server: Pong
{"type": "pong"}
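
The ping/pong exchange above can drive dead-connection detection. A minimal sketch, assuming the server records the time of the last pong per connection; the `Liveness` type and the choice of timeout are illustrative assumptions, since the document only fixes the 30-second ping interval.

```rust
use std::time::{Duration, Instant};

/// Per-connection liveness tracker (hypothetical helper type).
pub struct Liveness {
    last_pong: Instant,
    timeout: Duration,
}

impl Liveness {
    pub fn new(now: Instant, timeout: Duration) -> Self {
        Self { last_pong: now, timeout }
    }

    /// Call when a {"type": "pong"} message arrives.
    pub fn on_pong(&mut self, now: Instant) {
        self.last_pong = now;
    }

    /// True once no pong has been seen for longer than the timeout.
    pub fn is_dead(&self, now: Instant) -> bool {
        now.duration_since(self.last_pong) > self.timeout
    }
}
```

Passing `now` in explicitly, rather than calling `Instant::now()` inside, keeps the logic testable without sleeping. A timeout of two heartbeat intervals (60 seconds) would tolerate one lost ping, though that value is a guess rather than a stated requirement.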

Implementation

New Crate: bext-realtime

bext-realtime/
  src/
    lib.rs           # Public API
    hub.rs           # BextHub: topic registry, subscriber management
    sse.rs           # SSE handler (actix-web streaming)
    ws.rs            # WebSocket handler (actix-web-actors)
    topic.rs         # Topic matching (exact, wildcard: app/*)
    message.rs       # Event types, serialization
    auth.rs          # Subscriber authorization (topic-level)
    redis.rs         # Cross-instance relay via Redis Pub/Sub
    stats.rs         # Connection/message counters
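
For `topic.rs`, the matching rule could be as small as the sketch below. It assumes a trailing `*` matches any remainder, including further `/` segments, which is consistent with the Testing Plan (`app/*` matches `app/marketing/events`) and with the `custom/live-*` pattern in the Config Reference. The function name `topic_matches` is hypothetical, and wildcards in other positions are not handled.

```rust
/// Returns true when `topic` matches `pattern`.
/// A trailing `*` matches any remainder (including further `/` segments);
/// every other pattern must match exactly.
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
    match pattern.strip_suffix('*') {
        // "app/*" becomes the prefix "app/".
        Some(prefix) => topic.starts_with(prefix),
        None => pattern == topic,
    }
}
```

Keeping the slash in the prefix means `app/*` does not accidentally match a topic such as `application/x`.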

BextHub Core

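use dashmap::DashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

pub struct BextHub {
    /// Topic → set of subscriber channels.
    /// `Subscriber` is an assumed name for the per-connection sender handle.
    topics: DashMap<String, Vec<Subscriber>>,
    /// Event ID counter (monotonic)
    next_id: AtomicU64,
    /// Optional Redis relay for multi-instance.
    /// `RedisRelay` is an assumed name for the relay type.
    redis: Option<Arc<RedisRelay>>,
    /// Stats
    stats: HubStats,
}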

impl BextHub {
    pub fn subscribe(&self, topics: &[String]) -> EventStream { ... }
    pub fn publish(&self, topic: &str, data: serde_json::Value) { ... }
    pub fn unsubscribe(&self, subscriber_id: u64) { ... }
    pub fn stats(&self) -> HubStatsSnapshot { ... }
}
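
The monotonic ID counter and the stats fields can share one lock-free pattern. The sketch below uses a hypothetical `Counters` type; zero-padding IDs to three digits is an assumption taken from the `evt-001` examples earlier in this document.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free counters shared by all publishers (hypothetical type).
#[derive(Default)]
pub struct Counters {
    next_id: AtomicU64,
    published: AtomicU64,
}

impl Counters {
    /// Hand out the next event ID; safe to call from many threads.
    pub fn next_event_id(&self) -> u64 {
        // fetch_add returns the previous value, so IDs start at 1.
        self.next_id.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Record that one message was published.
    pub fn record_publish(&self) {
        self.published.fetch_add(1, Ordering::Relaxed);
    }

    /// Total messages published so far.
    pub fn published(&self) -> u64 {
        self.published.load(Ordering::Relaxed)
    }
}

/// Format a numeric ID in the "evt-001" style used in the examples.
pub fn format_event_id(n: u64) -> String {
    format!("evt-{n:03}")
}
```

Keeping IDs numeric internally makes the Last-Event-ID comparison during replay a plain integer comparison. Note that a per-instance counter is only unique within one instance, so a multi-instance deployment would need an additional scheme.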

Turbo.js Integration

Extend the existing Turbo.js client script to auto-connect to the SSE hub and handle page refresh events:

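// In /__bext/turbo.js (existing ~1.5KB script)

// Convert a purge pattern such as "/products/*" into an anchored RegExp.
// Assumes "*" matches any run of characters.
function patternToRegex(pattern) {
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    return new RegExp('^' + escaped.replace(/\*/g, '.*') + '$');
}

const events = new EventSource('/__bext/events?topics=system/cache');
events.addEventListener('system/cache', (e) => {
    const data = JSON.parse(e.data);
    if (data.pattern && patternToRegex(data.pattern).test(location.pathname)) {
        // Page was invalidated: refetch via Turbo
        BextTurbo.navigate(location.href);
    }
});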

Plugin Host Function

Add bext_publish to the WASM/QuickJS host function API:

// WASM host function
fn bext_publish(topic: &str, data: &str) -> Result<()> {
    // Validate topic is within plugin's allowed namespace
    // plugin/{plugin_id}/** only, unless admin permission
    hub.publish(topic, serde_json::from_str(data)?);
    Ok(())
}
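
The namespace check described in the comments above might look like the following sketch. The function name `plugin_may_publish` and the boolean `is_admin` flag are assumptions for illustration; how admin permission is actually represented is not specified here.

```rust
/// Decide whether a plugin may publish to `topic`.
/// Non-admin plugins are limited to `plugin/{plugin_id}` and below.
pub fn plugin_may_publish(plugin_id: &str, topic: &str, is_admin: bool) -> bool {
    if is_admin {
        return true;
    }
    let own = format!("plugin/{plugin_id}");
    // Accept the plugin's root topic or anything beneath it, but not a
    // sibling whose ID merely shares the prefix (e.g. plugin/analytics-v2).
    match topic.strip_prefix(own.as_str()) {
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
```

The separator check is the part that is easy to miss: a plain `starts_with("plugin/analytics")` would also let the `analytics` plugin publish into `plugin/analytics-v2/...`.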

Cross-Instance via Redis Pub/Sub

When Redis is configured, the hub relays events through Redis Pub/Sub so all bext instances see all events:

Instance A publishes to "system/deploy"
  → Redis PUBLISH bext:events:system/deploy {data}
  → Instance B's Redis subscriber receives it
  → Instance B fans out to its local SSE/WS clients

This reuses the existing Redis infrastructure from the L2 cache backend.
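
The mapping between hub topics and Redis channels in the flow above can be sketched as a pair of helpers. The `bext:events:` prefix comes from the example; the function names are hypothetical, and no Redis client calls are shown.

```rust
/// Channel prefix taken from the relay flow above.
const CHANNEL_PREFIX: &str = "bext:events:";

/// Redis channel that carries events for `topic`.
pub fn channel_for(topic: &str) -> String {
    format!("{}{}", CHANNEL_PREFIX, topic)
}

/// Recover the hub topic from a Redis channel name.
/// Returns None for channels that do not belong to the hub.
pub fn topic_from_channel(channel: &str) -> Option<&str> {
    channel
        .strip_prefix(CHANNEL_PREFIX)
        .filter(|topic| !topic.is_empty())
}
```

With this naming, a single pattern subscription on `bext:events:*` would let each instance receive every topic. One point the design may need to settle is whether the publishing instance also delivers locally or waits for its own message to come back through Redis, since doing both would deliver twice.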

Authorization

Not all topics are public. Authorization rules:

| Topic pattern        | Who can subscribe     | Who can publish         |
|----------------------|-----------------------|-------------------------|
| system/*             | Authenticated users   | bext internals only     |
| app/{app_id}/*       | Users with app access | App code, plugins       |
| plugin/{plugin_id}/* | Anyone                | That plugin only        |
| user/{user_id}/*     | That user only        | bext internals, plugins |
| custom/*             | Configurable          | Configurable            |
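
The subscribe column of the table can be read as a single match on the topic's first segment. The sketch below is one possible encoding; the `Subscriber` fields are illustrative assumptions, and `custom/*` is denied by default here only because its rules come from configuration.

```rust
/// Who is asking to subscribe. Field names are illustrative.
pub struct Subscriber<'a> {
    /// None means an anonymous connection.
    pub user_id: Option<&'a str>,
    /// App IDs this user may access.
    pub apps: &'a [&'a str],
}

/// Apply the subscribe rules from the authorization table.
pub fn can_subscribe(sub: &Subscriber, topic: &str) -> bool {
    // Split into namespace, ID, and (ignored) remainder.
    let mut parts = topic.splitn(3, '/');
    let namespace = parts.next().unwrap_or("");
    let id = parts.next().unwrap_or("");
    match namespace {
        "system" => sub.user_id.is_some(),
        "app" => sub.apps.iter().any(|app| *app == id),
        "plugin" => true,
        "user" => sub.user_id == Some(id),
        // custom/* is configurable; deny by default in this sketch.
        _ => false,
    }
}
```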

Connection Lifecycle

  • Heartbeat: Server sends : heartbeat\n\n every 30 seconds (SSE) or {"type":"ping"} (WS)
  • Reconnect: SSE clients auto-reconnect (browser EventSource built-in). WS clients should implement exponential backoff.
  • Last-Event-ID: SSE supports Last-Event-ID header on reconnect. Hub replays missed events from a bounded buffer (last 100 events per topic).
  • Max connections: Configurable per-instance limit (default: 10,000 concurrent SSE/WS connections).
  • Cleanup: Dead connections detected via heartbeat timeout, removed from topic subscriptions.
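
The Last-Event-ID replay described above amounts to a bounded ring buffer per topic. A minimal sketch, assuming numeric event IDs and a hypothetical `ReplayBuffer` type:

```rust
use std::collections::VecDeque;

/// Bounded per-topic history used to replay missed events.
pub struct ReplayBuffer {
    capacity: usize,
    /// (event ID, payload) pairs, oldest first.
    events: VecDeque<(u64, String)>,
}

impl ReplayBuffer {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, events: VecDeque::with_capacity(capacity) }
    }

    /// Store an event, evicting the oldest once the buffer is full.
    pub fn push(&mut self, id: u64, payload: String) {
        if self.capacity == 0 {
            return;
        }
        if self.events.len() == self.capacity {
            self.events.pop_front();
        }
        self.events.push_back((id, payload));
    }

    /// Events newer than `last_event_id`, in publish order.
    pub fn since(&self, last_event_id: u64) -> Vec<&(u64, String)> {
        self.events
            .iter()
            .filter(|event| event.0 > last_event_id)
            .collect()
    }
}
```

A client that was disconnected for longer than the buffer covers receives only the events still held, so consumers should treat replay as best-effort rather than guaranteed delivery.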

Config Reference

[server.realtime]
enabled = true                        # Enable SSE/WS hub (default: true)
max_connections = 10000               # Max concurrent connections
heartbeat_interval_ms = 30000         # Heartbeat period
replay_buffer_size = 100              # Events to replay on reconnect
allowed_origins = ["*"]               # Allowed Origin values for WS upgrade (browsers do not apply CORS to WebSockets)

[server.realtime.redis]
enabled = true                        # Cross-instance relay (uses existing Redis config)

# Topic authorization (optional)
[[server.realtime.topics]]
pattern = "custom/live-*"
subscribe = "public"                  # public | authenticated | role:admin
publish = "authenticated"
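
The `subscribe` and `publish` values in the topic rules above (`public`, `authenticated`, `role:admin`) could be parsed into a small enum. This is a sketch under assumptions: the `Access` type and the way a caller's roles are passed in are hypothetical.

```rust
/// Parsed form of a `subscribe` / `publish` rule value.
#[derive(Debug, PartialEq)]
pub enum Access {
    Public,
    Authenticated,
    Role(String),
}

impl Access {
    /// Parse "public", "authenticated", or "role:<name>".
    pub fn parse(s: &str) -> Option<Access> {
        match s {
            "public" => Some(Access::Public),
            "authenticated" => Some(Access::Authenticated),
            _ => s
                .strip_prefix("role:")
                .filter(|role| !role.is_empty())
                .map(|role| Access::Role(role.to_string())),
        }
    }

    /// `roles` is None for anonymous callers, Some(list) once logged in.
    pub fn allows(&self, roles: Option<&[&str]>) -> bool {
        match (self, roles) {
            (Access::Public, _) => true,
            (Access::Authenticated, Some(_)) => true,
            (Access::Role(need), Some(have)) => {
                have.iter().any(|role| *role == need.as_str())
            }
            _ => false,
        }
    }
}
```

Returning `None` for unknown values lets config loading fail early on a typo instead of silently treating the rule as public.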

Testing Plan

| Test                        | Type        | What it validates                                          |
|-----------------------------|-------------|------------------------------------------------------------|
| Hub publish/subscribe       | Unit        | Message reaches subscriber                                 |
| Topic wildcard matching     | Unit        | app/* matches app/marketing/events                         |
| SSE stream format           | Unit        | Valid SSE syntax (event, data, id fields)                  |
| WS message protocol         | Unit        | JSON messages parsed and dispatched                        |
| Multi-subscriber fanout     | Unit        | 100 subscribers all receive event                          |
| Last-Event-ID replay        | Unit        | Reconnecting client gets missed events                     |
| Heartbeat                   | Integration | Client receives periodic heartbeats                        |
| Dead connection cleanup     | Unit        | Timed-out connections removed                              |
| Redis relay                 | Integration | Event published on instance A reaches clients on instance B |
| Auth enforcement            | Unit        | Unauthorized subscribe rejected                            |
| Plugin publish              | Integration | WASM plugin publishes, SSE client receives                 |
| Turbo.js cache invalidation | Integration | Cache purge triggers page reload via SSE                   |
| Connection limit            | Unit        | Excess connections rejected with 503                       |
| Hub stats endpoint          | Unit        | Returns connection count, topic count, msg/sec             |

Dependencies

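| Dependency                    | Purpose                                                                                      |
|-------------------------------|----------------------------------------------------------------------------------------------|
| actix-web-actors              | WebSocket actor support (a separate crate from actix-web, maintained in the same repository) |
| tokio::sync::broadcast        | In-process pub/sub channel                                                                   |
| Existing Redis infrastructure | Cross-instance relay                                                                         |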

Done Criteria

  • SSE endpoint at /__bext/events with topic subscription
  • WebSocket endpoint at /__bext/ws with bidirectional messaging
  • Topic-based pub/sub with wildcard matching
  • Plugin host function bext_publish(topic, data)
  • Turbo.js auto-refresh on cache invalidation events
  • Cross-instance relay via Redis Pub/Sub
  • Last-Event-ID replay on reconnect
  • Heartbeat and dead connection cleanup
  • Authorization rules per topic
  • Hub stats endpoint
  • All tests passing