Phase 5: Embedded Real-Time Hub
Goal
Provide a built-in SSE and WebSocket hub inside bext so that plugins and apps can push events to connected browsers without separate infrastructure. Inspired by FrankenPHP's embedded Mercure hub.
Current State
- Turbo.js intercepts navigations (HTML-over-the-wire)
- Server Islands stream deferred components
- No WebSocket support
- No SSE endpoint
- No mechanism for server → browser push
- Real-time requires external infrastructure (Redis Pub/Sub, WebSocket server)
Why This Matters
| Use case | Without hub | With hub |
|---|---|---|
| ISR cache invalidation | Browser polls or user manually refreshes | Server pushes "page updated" → Turbo.js auto-refetches |
| Deploy notifications | Nothing | Server pushes "new version deployed" → banner |
| Plugin events | Not possible | Plugin calls bext_publish("analytics", data) → dashboard live-updates |
| Dev HMR | File watcher + custom WS server | Built-in WS on /__bext/hmr |
| Multi-user collaboration | External WebSocket | Apps subscribe to topics, server relays |
Design
Two Protocols
Server-Sent Events (SSE) — server → browser only, uses EventSource API (built into all browsers, no library needed). Best for: cache invalidation, deploy notifications, live metrics.
WebSocket — bidirectional. Best for: dev HMR, collaborative editing, chat. WebSocket support is required to fully replace nginx (which proxies WS via Upgrade header).
Topic-Based Pub/Sub
Topics:
system/deploy — deployment events (all apps)
system/cache — cache invalidation events
app/{app_id}/events — per-app custom events
plugin/{plugin_id} — per-plugin events
user/{user_id} — per-user notifications
custom/{topic} — app-defined topics
Clients subscribe to topics. Server publishes to topics. Messages fan out to all subscribers of that topic.
Architecture
                ┌──────────────────────────────┐
                │           BextHub            │
                │                              │
SSE clients ───▶│ TopicRegistry                │◀── Plugin publish
                │  ├─ system/deploy            │◀── Cache invalidation
WS clients  ───▶│  ├─ app/marketing/events     │◀── App publish
                │  ├─ user/abc123              │◀── Deploy events
                │  └─ custom/live-scores       │
                │                              │
                │ Cross-Instance (Redis PS)    │
                └──────────────────────────────┘
Endpoints
| Method | Path | Protocol | Purpose |
|---|---|---|---|
| GET | /__bext/events | SSE | Subscribe to topics (query: ?topics=system/deploy,app/marketing/events) |
| GET | /__bext/ws | WebSocket | Bidirectional connection |
| POST | /__bext/publish | HTTP | Publish event to topic (internal/plugin use) |
| GET | /__bext/hub/stats | HTTP | Hub statistics (connections, topics, messages/sec) |
SSE Protocol
GET /__bext/events?topics=system/deploy,system/cache HTTP/2
Accept: text/event-stream

HTTP/2 200
Content-Type: text/event-stream
Cache-Control: no-cache

event: system/deploy
data: {"app":"marketing","version":"v1.2.3","status":"live"}
id: evt-001

event: system/cache
data: {"pattern":"/products/*","action":"purged","count":42}
id: evt-002

: heartbeat
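The wire format above can be produced with a small formatter. The following is a minimal sketch, not the planned sse.rs implementation; the function names `format_sse_event` and `format_sse_heartbeat` are assumptions made for illustration. It writes one `data:` line per payload line and ends each frame with a blank line, which is what tells the browser to dispatch the event.

```rust
/// Formats one Server-Sent Events frame with `event`, `data`, and `id` fields.
/// Multi-line payloads become one `data:` line per line of input.
/// The trailing blank line terminates the frame.
/// (Hypothetical helper name; not part of the bext API described above.)
pub fn format_sse_event(event: &str, data: &str, id: &str) -> String {
    let mut frame = String::new();
    frame.push_str("event: ");
    frame.push_str(event);
    frame.push('\n');
    for line in data.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push_str("id: ");
    frame.push_str(id);
    frame.push_str("\n\n");
    frame
}

/// A comment line (leading colon) that EventSource clients ignore.
/// It keeps intermediaries from closing an idle connection.
pub fn format_sse_heartbeat() -> String {
    ": heartbeat\n\n".to_string()
}
```

Because the heartbeat is a comment rather than an event, it never reaches `addEventListener` handlers on the client.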
WebSocket Protocol
// Client → Server: Subscribe
{"type": "subscribe", "topics": ["system/deploy", "app/marketing/events"]}
// Client → Server: Unsubscribe
{"type": "unsubscribe", "topics": ["system/deploy"]}
// Client → Server: Publish (authorized only)
{"type": "publish", "topic": "custom/live-scores", "data": {"home": 2, "away": 1}}
// Server → Client: Event
{"type": "event", "topic": "system/deploy", "data": {...}, "id": "evt-001"}
// Server → Client: Heartbeat (every 30s)
{"type": "ping"}
// Client → Server: Pong
{"type": "pong"}
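The client-to-server half of this protocol amounts to a small state machine per connection. The sketch below assumes the JSON has already been decoded into an enum (a real handler would probably use serde for that step); `ClientMessage`, `Action`, and `Connection` are hypothetical names, and the `can_publish` flag stands in for whatever the authorization layer decides.

```rust
use std::collections::HashSet;

/// Client → server messages from the protocol above, after JSON decoding.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientMessage {
    Subscribe(Vec<String>),
    Unsubscribe(Vec<String>),
    Publish { topic: String, data: String },
    Pong,
}

/// What the connection handler should do after processing a message.
#[derive(Debug, PartialEq)]
pub enum Action {
    /// State was updated (or nothing needed to change).
    Nothing,
    /// Forward this event to the hub.
    Publish { topic: String, data: String },
    /// The client is not authorized to publish; drop the message.
    Rejected,
}

/// Per-connection state (hypothetical type for this sketch).
#[derive(Default)]
pub struct Connection {
    pub topics: HashSet<String>,
    pub can_publish: bool,
}

impl Connection {
    pub fn handle(&mut self, msg: ClientMessage) -> Action {
        match msg {
            ClientMessage::Subscribe(topics) => {
                self.topics.extend(topics);
                Action::Nothing
            }
            ClientMessage::Unsubscribe(topics) => {
                for topic in &topics {
                    self.topics.remove(topic);
                }
                Action::Nothing
            }
            ClientMessage::Publish { topic, data } => {
                if self.can_publish {
                    Action::Publish { topic, data }
                } else {
                    Action::Rejected
                }
            }
            // A pong only proves liveness; subscriptions are unchanged.
            ClientMessage::Pong => Action::Nothing,
        }
    }
}
```

Returning an `Action` instead of calling the hub directly keeps the message handling testable without a live socket.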
Implementation
New Crate: bext-realtime
bext-realtime/
  src/
    lib.rs       # Public API
    hub.rs       # BextHub: topic registry, subscriber management
    sse.rs       # SSE handler (actix-web streaming)
    ws.rs        # WebSocket handler (actix-web-actors)
    topic.rs     # Topic matching (exact, wildcard: app/*)
    message.rs   # Event types, serialization
    auth.rs      # Subscriber authorization (topic-level)
    redis.rs     # Cross-instance relay via Redis Pub/Sub
    stats.rs     # Connection/message counters
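The wildcard matching planned for topic.rs is not specified in detail here. One simple reading, sketched below under that assumption, treats a trailing `*` as a prefix wildcard, so `app/*` matches `app/marketing/events` and `custom/live-*` matches `custom/live-scores`. The function name `topic_matches` is hypothetical.

```rust
/// Returns true if `topic` matches `pattern`.
///
/// Assumed semantics for this sketch: a pattern ending in `*` matches any
/// topic that starts with the text before the `*`; every other pattern must
/// match the topic exactly.
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
    match pattern.strip_suffix('*') {
        Some(prefix) => topic.starts_with(prefix),
        None => pattern == topic,
    }
}
```

Because the prefix for `app/*` includes the slash, a topic such as `application/x` does not match. Single-segment wildcards in the middle of a pattern would need a segment-by-segment comparison instead.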
BextHub Core
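use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use dashmap::DashMap;

pub struct BextHub {
    /// Topic → set of subscriber channels.
    /// The element type is missing in the source; `Subscriber` is a placeholder name.
    topics: DashMap<String, Vec<Subscriber>>,
    /// Event ID counter (monotonic)
    next_id: AtomicU64,
    /// Optional Redis relay for multi-instance.
    /// The inner type is missing in the source; `RedisRelay` is a placeholder name.
    redis: Option<Arc<RedisRelay>>,
    /// Stats
    stats: HubStats,
}

impl BextHub {
    pub fn subscribe(&self, topics: &[String]) -> EventStream { ... }
    pub fn publish(&self, topic: &str, data: serde_json::Value) { ... }
    pub fn unsubscribe(&self, subscriber_id: u64) { ... }
    pub fn stats(&self) -> HubStatsSnapshot { ... }
}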
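To make the fan-out behavior concrete, here is a std-only stand-in for the hub. It is a sketch under stated assumptions, not the BextHub implementation: it uses `Mutex<HashMap>` in place of DashMap, `std::sync::mpsc` channels in place of an async `EventStream`, and a JSON string in place of `serde_json::Value`. The names `MiniHub` and `Event` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub id: u64,
    pub topic: String,
    pub data: String, // JSON text in this sketch
}

/// Simplified hub: topic → list of (subscriber id, channel sender).
#[derive(Default)]
pub struct MiniHub {
    topics: Mutex<HashMap<String, Vec<(u64, Sender<Event>)>>>,
    next_id: AtomicU64,
    next_subscriber: AtomicU64,
}

impl MiniHub {
    /// Registers one channel under every requested topic.
    pub fn subscribe(&self, topics: &[String]) -> (u64, Receiver<Event>) {
        let subscriber_id = self.next_subscriber.fetch_add(1, Ordering::Relaxed);
        let (tx, rx) = channel();
        let mut map = self.topics.lock().unwrap();
        for topic in topics {
            map.entry(topic.clone())
                .or_default()
                .push((subscriber_id, tx.clone()));
        }
        (subscriber_id, rx)
    }

    /// Sends the event to every subscriber of `topic` and returns how many
    /// received it. Subscribers whose receiver was dropped are pruned.
    pub fn publish(&self, topic: &str, data: &str) -> usize {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed) + 1;
        let event = Event { id, topic: topic.to_string(), data: data.to_string() };
        let mut map = self.topics.lock().unwrap();
        match map.get_mut(topic) {
            Some(subs) => {
                subs.retain(|(_, tx)| tx.send(event.clone()).is_ok());
                subs.len()
            }
            None => 0,
        }
    }

    /// Removes the subscriber from every topic and drops empty topics.
    pub fn unsubscribe(&self, subscriber_id: u64) {
        let mut map = self.topics.lock().unwrap();
        for subs in map.values_mut() {
            subs.retain(|(id, _)| *id != subscriber_id);
        }
        map.retain(|_, subs| !subs.is_empty());
    }
}
```

A single global lock is the simplest correct choice for a sketch. A sharded map such as DashMap, as the struct above suggests, would reduce contention when many topics are published to concurrently.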
Turbo.js Integration
Extend the existing Turbo.js client script to auto-connect to the SSE hub and handle page refresh events:
// In /__bext/turbo.js (existing ~1.5KB script)
const events = new EventSource('/__bext/events?topics=system/cache');
events.addEventListener('system/cache', (e) => {
  const data = JSON.parse(e.data);
  // patternToRegex is a helper that is not shown in this document
  if (data.pattern && location.pathname.match(patternToRegex(data.pattern))) {
    // Page was invalidated; refetch via Turbo
    BextTurbo.navigate(location.href);
  }
});
Plugin Host Function
Add bext_publish to the WASM/QuickJS host function API:
// WASM host function
fn bext_publish(topic: &str, data: &str) -> Result<()> {
    // Validate topic is within plugin's allowed namespace
    // plugin/{plugin_id}/** only, unless admin permission
    hub.publish(topic, serde_json::from_str(data)?);
    Ok(())
}
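The namespace validation mentioned in the comments could look roughly like the following. This is a sketch of one possible rule, not the actual host function: `plugin_may_publish` is a hypothetical name, and it assumes a plugin may publish to `plugin/{plugin_id}` itself and to anything below it, with an admin permission lifting the restriction.

```rust
/// Returns true if the plugin may publish to `topic`.
///
/// Assumed rule for this sketch: the topic must be `plugin/{plugin_id}` or a
/// non-empty path below it, unless `is_admin` is set.
pub fn plugin_may_publish(plugin_id: &str, topic: &str, is_admin: bool) -> bool {
    if is_admin {
        return true;
    }
    let root = format!("plugin/{plugin_id}");
    match topic.strip_prefix(root.as_str()) {
        // Exactly the plugin's own root topic.
        Some("") => true,
        // Below the root: require a `/` separator and at least one more character,
        // so that `plugin/analytics-evil` does not pass for plugin `analytics`.
        Some(rest) => rest.starts_with('/') && rest.len() > 1,
        None => false,
    }
}
```

Checking for the separator rather than a bare prefix keeps one plugin from publishing into the namespace of another plugin whose ID merely starts with the same characters.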
Cross-Instance via Redis Pub/Sub
When Redis is configured, the hub relays events through Redis Pub/Sub so all bext instances see all events:
Instance A publishes to "system/deploy"
→ Redis PUBLISH bext:events:system/deploy {data}
→ Instance B's Redis subscriber receives it
→ Instance B fans out to its local SSE/WS clients
This reuses the existing Redis infrastructure from the L2 cache backend.
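The channel naming in the flow above maps directly onto two small helpers. The sketch below also adds an origin check, which is an assumption not stated in this document: if every instance subscribes to the same Redis channels, the publishing instance receives its own message back and would otherwise deliver it twice. `RelayEnvelope`, `should_fan_out`, and the `origin` field are hypothetical.

```rust
/// Prefix taken from the flow above: `bext:events:{topic}`.
const CHANNEL_PREFIX: &str = "bext:events:";

/// Redis channel name for a hub topic.
pub fn redis_channel_for(topic: &str) -> String {
    format!("{CHANNEL_PREFIX}{topic}")
}

/// Recovers the hub topic from a Redis channel name, if it is one of ours.
pub fn topic_from_channel(channel: &str) -> Option<&str> {
    channel.strip_prefix(CHANNEL_PREFIX)
}

/// Message shape carried over Redis (hypothetical for this sketch).
pub struct RelayEnvelope {
    /// ID of the instance that published the event.
    pub origin: String,
    pub topic: String,
    pub data: String,
}

/// An instance skips envelopes it published itself, because it has already
/// fanned those out to its local clients.
pub fn should_fan_out(envelope: &RelayEnvelope, local_instance: &str) -> bool {
    envelope.origin != local_instance
}
```

An alternative design is to publish only to Redis and let every instance, including the origin, deliver on receipt. That avoids the origin check at the cost of a Redis round trip for local subscribers.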
Authorization
Not all topics are public. Authorization rules:
| Topic pattern | Who can subscribe | Who can publish |
|---|---|---|
| system/* | Authenticated users | bext internals only |
| app/{app_id}/* | Users with app access | App code, plugins |
| plugin/{plugin_id}/* | Anyone | That plugin only |
| user/{user_id}/* | That user only | bext internals, plugins |
| custom/* | Configurable | Configurable |
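The subscribe column of this table can be expressed as a single match on the first two topic segments. The following is a minimal sketch under stated assumptions: `Principal` and `can_subscribe` are hypothetical names, and `custom/*` is denied by default here because its rules come from configuration that this sketch does not model.

```rust
/// Who is asking to subscribe (hypothetical type for this sketch).
pub struct Principal<'a> {
    /// `None` means an anonymous, unauthenticated client.
    pub user_id: Option<&'a str>,
    /// IDs of the apps this user can access.
    pub apps: &'a [&'a str],
}

/// Applies the subscribe rules from the table above.
pub fn can_subscribe(p: &Principal<'_>, topic: &str) -> bool {
    // Split into namespace, first identifier, and an optional remainder.
    let mut parts = topic.splitn(3, '/');
    match (parts.next(), parts.next()) {
        // system/*: any authenticated user.
        (Some("system"), Some(_)) => p.user_id.is_some(),
        // app/{app_id}/*: authenticated users with access to that app.
        (Some("app"), Some(app_id)) => {
            p.user_id.is_some() && p.apps.iter().any(|a| *a == app_id)
        }
        // plugin/{plugin_id}/*: anyone.
        (Some("plugin"), Some(_)) => true,
        // user/{user_id}/*: that user only.
        (Some("user"), Some(uid)) => p.user_id.map_or(false, |id| id == uid),
        // custom/* and anything unrecognized: denied in this sketch.
        _ => false,
    }
}
```

Defaulting to deny means a new topic namespace stays private until a rule is written for it.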
Connection Lifecycle
- Heartbeat: Server sends : heartbeat\n\n every 30 seconds (SSE) or {"type":"ping"} (WS)
- Reconnect: SSE clients auto-reconnect (browser EventSource built-in). WS clients should implement exponential backoff.
- Last-Event-ID: SSE supports Last-Event-ID header on reconnect. Hub replays missed events from a bounded buffer (last 100 events per topic).
- Max connections: Configurable per-instance limit (default: 10,000 concurrent SSE/WS connections).
- Cleanup: Dead connections detected via heartbeat timeout, removed from topic subscriptions.
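The bounded replay buffer behind Last-Event-ID can be sketched as a ring of recent events per topic. This is an illustration under stated assumptions rather than the planned implementation: `ReplayBuffer` is a hypothetical name, and event IDs are treated as plain integers from the hub's monotonic counter, whereas the protocol examples show string IDs such as evt-001.

```rust
use std::collections::VecDeque;

/// Keeps the most recent `capacity` events for one topic.
pub struct ReplayBuffer {
    capacity: usize,
    /// (event id, payload), oldest first.
    events: VecDeque<(u64, String)>,
}

impl ReplayBuffer {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, events: VecDeque::with_capacity(capacity) }
    }

    /// Appends an event, evicting the oldest one once the buffer is full.
    pub fn push(&mut self, id: u64, payload: String) {
        if self.capacity == 0 {
            return;
        }
        if self.events.len() == self.capacity {
            self.events.pop_front();
        }
        self.events.push_back((id, payload));
    }

    /// Events with an id greater than `last_event_id`, oldest first.
    /// Events that were already evicted cannot be replayed.
    pub fn replay_after(&self, last_event_id: u64) -> Vec<(u64, String)> {
        self.events
            .iter()
            .filter(|(id, _)| *id > last_event_id)
            .cloned()
            .collect()
    }
}
```

A client that was disconnected for longer than the buffer covers receives only the events still held, so handlers for events like cache invalidation should tolerate gaps.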
Config Reference
[server.realtime]
enabled = true # Enable SSE/WS hub (default: true)
max_connections = 10000 # Max concurrent connections
heartbeat_interval_ms = 30000 # Heartbeat period
replay_buffer_size = 100 # Events to replay on reconnect
allowed_origins = ["*"] # Allowed origins (CORS for SSE, Origin check on WS upgrade)
[server.realtime.redis]
enabled = true # Cross-instance relay (uses existing Redis config)
# Topic authorization (optional)
[[server.realtime.topics]]
pattern = "custom/live-*"
subscribe = "public" # public | authenticated | role:admin
publish = "authenticated"
Testing Plan
| Test | Type | What it validates |
|---|---|---|
| Hub publish/subscribe | Unit | Message reaches subscriber |
| Topic wildcard matching | Unit | app/* matches app/marketing/events |
| SSE stream format | Unit | Valid SSE syntax (event, data, id fields) |
| WS message protocol | Unit | JSON messages parsed and dispatched |
| Multi-subscriber fanout | Unit | 100 subscribers all receive event |
| Last-Event-ID replay | Unit | Reconnecting client gets missed events |
| Heartbeat | Integration | Client receives periodic heartbeats |
| Dead connection cleanup | Unit | Timed-out connections removed |
| Redis relay | Integration | Event published on instance A reaches clients on instance B |
| Auth enforcement | Unit | Unauthorized subscribe rejected |
| Plugin publish | Integration | WASM plugin publishes, SSE client receives |
| Turbo.js cache invalidation | Integration | Cache purge triggers page reload via SSE |
| Connection limit | Unit | Excess connections rejected with 503 |
| Hub stats endpoint | Unit | Returns connection count, topic count, msg/sec |
Dependencies
| Crate | Purpose |
|---|---|
| actix-web-actors | WebSocket actor support (companion crate to actix-web) |
| tokio::sync::broadcast | In-process pub/sub channel |
| Existing Redis infrastructure | Cross-instance relay |
Done Criteria
- SSE endpoint at /__bext/events with topic subscription
- WebSocket endpoint at /__bext/ws with bidirectional messaging
- Topic-based pub/sub with wildcard matching
- Plugin host function bext_publish(topic, data)
- Turbo.js auto-refresh on cache invalidation events
- Cross-instance relay via Redis Pub/Sub
- Last-Event-ID replay on reconnect
- Heartbeat and dead connection cleanup
- Authorization rules per topic
- Hub stats endpoint
- All tests passing