this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add SSE event streaming from appview to SDK

Appview: PubSub broadcast after indexer dispatch, token exchange endpoint,
SSE stream with chunked response, keepalive, dynamic keyring subscription,
per-DID connection cap, crash-safe cleanup.

Rust/WASM: request_sse_token binding for Ed25519-signed token exchange.

SDK: EventStream class with Zod validation, exponential backoff with jitter,
Opake.subscribe() and requestSseToken() methods.

Web: SSE in cabinet layout with targeted reloads, debounced proposal sync,
directory-sync daemon task replaced by SSE-driven sync.

+1205 -12
+3
apps/appview/config/config.exs
··· 16 16 config :opake_appview, OpakeAppviewWeb.Endpoint, 17 17 url: [host: "localhost"], 18 18 adapter: Bandit.PhoenixAdapter, 19 + # SSE connections are long-lived — raise the idle timeout so keepalive 20 + # chunks (every 15s) don't race the default limit. 21 + thousand_island_options: [read_timeout: 86_400_000], 19 22 render_errors: [ 20 23 formats: [json: OpakeAppviewWeb.ErrorJSON], 21 24 layout: false
+4
apps/appview/lib/opake_appview/application.ex
··· 16 16 def start(_type, _args) do 17 17 OpakeAppview.Indexer.init_state() 18 18 OpakeAppview.Jetstream.Compression.init() 19 + OpakeAppview.SSE.TokenStore.init_table() 20 + OpakeAppview.SSE.ConnectionTracker.init_table() 19 21 20 22 children = 21 23 [ 22 24 OpakeAppview.Repo, 25 + {Phoenix.PubSub, name: OpakeAppview.PubSub}, 26 + OpakeAppview.SSE.TokenStore, 23 27 OpakeAppview.Auth.KeyCache, 24 28 OpakeAppviewWeb.Endpoint 25 29 ] ++
+17
apps/appview/lib/opake_appview/indexer.ex
··· 53 53 KeyringQueries 54 54 } 55 55 56 + alias OpakeAppview.SSE.Broadcaster 57 + 56 58 @telemetry_prefix [:opake_appview, :indexer] 57 59 58 60 # -- State init -- ··· 133 135 134 136 log_query_error(result, "grant upsert", attrs.uri) 135 137 emit_event_telemetry("app.opake.grant", :upsert, status_of(result)) 138 + Broadcaster.broadcast_grant(attrs, :upsert) 136 139 end 137 140 138 141 defp dispatch({:delete_grant, %{uri: uri}}, _time_us, _now) do 139 142 Logger.info("[Indexer] grant delete: #{uri}") 140 143 GrantQueries.delete_grant(uri) 141 144 emit_event_telemetry("app.opake.grant", :delete, :ok) 145 + Broadcaster.broadcast_grant(%{uri: uri}, :delete) 142 146 end 143 147 144 148 defp dispatch({:upsert_keyring, attrs}, _time_us, _now) do ··· 159 163 :upsert, 160 164 worst_status([members_result, record_result]) 161 165 ) 166 + 167 + Broadcaster.broadcast_keyring(attrs, :upsert) 162 168 end 163 169 164 170 defp dispatch({:delete_keyring, %{uri: uri}}, _time_us, _now) do 165 171 Logger.info("[Indexer] keyring delete: #{uri}") 166 172 KeyringQueries.delete_keyring(uri) 167 173 emit_event_telemetry("app.opake.keyring", :delete, :ok) 174 + Broadcaster.broadcast_keyring(%{uri: uri, owner_did: nil}, :delete) 168 175 end 169 176 170 177 defp dispatch({:upsert_directory, attrs}, _time_us, now) do ··· 184 191 185 192 log_query_error(result, "directory upsert", attrs.directory_uri) 186 193 emit_event_telemetry("app.opake.directory", :upsert, status_of(result)) 194 + Broadcaster.broadcast_directory(attrs, :upsert) 187 195 end 188 196 189 197 defp dispatch({:delete_directory, %{directory_uri: directory_uri}}, _time_us, now) do 190 198 Logger.info("[Indexer] directory delete (soft): #{directory_uri}") 191 199 DirectoryQueries.soft_delete_directory(directory_uri, now) 192 200 emit_event_telemetry("app.opake.directory", :delete, :ok) 201 + Broadcaster.broadcast_directory(%{directory_uri: directory_uri, owner_did: nil}, :delete) 193 202 end 194 203 195 204 defp dispatch({:upsert_document, attrs}, _time_us, now) do ··· 210 219 211 220 log_query_error(result, "document upsert", attrs.document_uri) 212 221 emit_event_telemetry("app.opake.document", :upsert, status_of(result)) 222 + Broadcaster.broadcast_document(attrs, :upsert) 213 223 end 214 224 215 225 defp dispatch({:delete_document, %{document_uri: document_uri}}, _time_us, now) do 216 226 Logger.info("[Indexer] document delete (soft): #{document_uri}") 217 227 DocumentQueries.soft_delete_document(document_uri, now) 218 228 emit_event_telemetry("app.opake.document", :delete, :ok) 229 + Broadcaster.broadcast_document(%{document_uri: document_uri, owner_did: nil}, :delete) 219 230 end 220 231 221 232 defp dispatch({:upsert_document_update, attrs}, _time_us, now) do ··· 232 243 233 244 log_query_error(result, "document update upsert", attrs.uri) 234 245 emit_event_telemetry("app.opake.documentUpdate", :upsert, status_of(result)) 246 + Broadcaster.broadcast_document_update(attrs, :upsert) 235 247 end 236 248 237 249 defp dispatch({:delete_document_update, %{uri: uri}}, _time_us, _now) do 238 250 Logger.info("[Indexer] document update delete: #{uri}") 239 251 DocumentUpdateQueries.delete_document_update(uri) 240 252 emit_event_telemetry("app.opake.documentUpdate", :delete, :ok) 253 + Broadcaster.broadcast_document_update(%{uri: uri}, :delete) 241 254 end 242 255 243 256 defp dispatch({:upsert_directory_update, attrs}, _time_us, now) do ··· 262 275 263 276 log_query_error(result, "directory update upsert", attrs.uri) 264 277 emit_event_telemetry("app.opake.directoryUpdate", :upsert, status_of(result)) 278 + Broadcaster.broadcast_directory_update(attrs, :upsert) 265 279 end 266 280 267 281 defp dispatch({:delete_directory_update, %{uri: uri}}, _time_us, _now) do 268 282 Logger.info("[Indexer] directory update delete: #{uri}") 269 283 DirectoryQueries.delete_directory_update(uri) 270 284 emit_event_telemetry("app.opake.directoryUpdate", :delete, :ok) 285 + Broadcaster.broadcast_directory_update(%{uri: uri}, :delete) 271 286 end 272 287 273 288 defp dispatch({:upsert_keyring_update, attrs}, _time_us, now) do ··· 297 312 end 298 313 299 314 emit_event_telemetry("app.opake.keyringUpdate", :upsert, status_of(result)) 315 + Broadcaster.broadcast_keyring_update(attrs, :upsert) 300 316 end 301 317 302 318 defp dispatch({:delete_keyring_update, %{uri: uri}}, _time_us, _now) do 303 319 Logger.info("[Indexer] keyring update delete: #{uri}") 304 320 KeyringQueries.delete_keyring_update(uri) 305 321 emit_event_telemetry("app.opake.keyringUpdate", :delete, :ok) 322 + Broadcaster.broadcast_keyring_update(%{uri: uri}, :delete) 306 323 end 307 324 308 325 # Heartbeat-only path: log the account config write but don't persist.
+179
apps/appview/lib/opake_appview/sse/broadcaster.ex
··· 1 + defmodule OpakeAppview.SSE.Broadcaster do 2 + @moduledoc """ 3 + Broadcasts indexed events to SSE subscribers via Phoenix PubSub. 4 + 5 + Called from the indexer after each successful DB write. Fire-and-forget — 6 + broadcast failures are logged at warning level but never propagate. 7 + 8 + Formatting uses bracket-access (`attrs[:key]`) because the indexer passes 9 + raw event attrs (plain maps from the firehose parser), not Ecto schema 10 + structs. TreeHelpers formatters use dot access and require fields like 11 + `indexed_at` that only exist on DB records — they can't be reused here. 12 + """ 13 + 14 + require Logger 15 + 16 + alias OpakeAppview.SSE.Topics 17 + alias OpakeAppviewWeb.TreeHelpers 18 + 19 + @pubsub OpakeAppview.PubSub 20 + 21 + # -- Public API -- 22 + 23 + def broadcast_directory(attrs, action) do 24 + payload = if action == :upsert, do: format_directory(attrs), else: %{directory_uri: get(attrs, :directory_uri)} 25 + broadcast_owned(attrs, "directory", action, payload) 26 + rescue 27 + e -> Logger.warning("[Broadcaster] directory broadcast failed: #{inspect(e)}") 28 + end 29 + 30 + def broadcast_document(attrs, action) do 31 + payload = if action == :upsert, do: format_document(attrs), else: %{document_uri: get(attrs, :document_uri)} 32 + broadcast_owned(attrs, "document", action, payload) 33 + rescue 34 + e -> Logger.warning("[Broadcaster] document broadcast failed: #{inspect(e)}") 35 + end 36 + 37 + def broadcast_keyring(attrs, action) do 38 + uri = get(attrs, :uri) 39 + owner = get(attrs, :owner_did) 40 + payload = if action == :upsert, do: format_keyring(attrs), else: %{uri: uri} 41 + event_type = "keyring:#{action}" 42 + 43 + # Broadcast to the workspace topic (existing subscribers) 44 + if uri, do: broadcast(Topics.workspace(uri), event_type, payload) 45 + # Broadcast to the owner's personal topic 46 + if owner, do: broadcast(Topics.personal(owner), event_type, payload) 47 + 48 + # Broadcast to each member's personal topic so newly-added members 49 + # can discover the keyring and subscribe to its workspace topic. 50 + if action == :upsert do 51 + for entry <- get(attrs, :member_entries) || [] do 52 + did = entry[:did] || entry["did"] 53 + if did && did != owner, do: broadcast(Topics.personal(did), event_type, payload) 54 + end 55 + end 56 + rescue 57 + e -> Logger.warning("[Broadcaster] keyring broadcast failed: #{inspect(e)}") 58 + end 59 + 60 + def broadcast_grant(attrs, action) do 61 + payload = 62 + case action do 63 + :upsert -> TreeHelpers.format_grant(attrs) 64 + :delete -> %{uri: get(attrs, :uri)} 65 + end 66 + 67 + if owner = get(attrs, :owner_did), do: broadcast(Topics.personal(owner), "grant:#{action}", payload) 68 + if action == :upsert, do: maybe_broadcast_recipient(attrs, payload) 69 + rescue 70 + e -> Logger.warning("[Broadcaster] grant broadcast failed: #{inspect(e)}") 71 + end 72 + 73 + def broadcast_directory_update(attrs, action) do 74 + payload = 75 + case action do 76 + :upsert -> format_proposal(attrs) 77 + :delete -> %{uri: get(attrs, :uri)} 78 + end 79 + 80 + if kr = get(attrs, :keyring_uri), do: broadcast(Topics.workspace(kr), "directory_update:#{action}", payload) 81 + rescue 82 + e -> Logger.warning("[Broadcaster] directory_update broadcast failed: #{inspect(e)}") 83 + end 84 + 85 + def broadcast_keyring_update(attrs, action) do 86 + payload = 87 + case action do 88 + :upsert -> format_keyring_proposal(attrs) 89 + :delete -> %{uri: get(attrs, :uri)} 90 + end 91 + 92 + if kr = get(attrs, :keyring_uri), do: broadcast(Topics.workspace(kr), "keyring_update:#{action}", payload) 93 + rescue 94 + e -> Logger.warning("[Broadcaster] keyring_update broadcast failed: #{inspect(e)}") 95 + end 96 + 97 + def broadcast_document_update(attrs, action) do 98 + payload = 99 + case action do 100 + :upsert -> format_document_proposal(attrs) 101 + :delete -> %{uri: get(attrs, :uri)} 102 + end 103 + 104 + if did = get(attrs, :author_did), do: broadcast(Topics.personal(did), "document_update:#{action}", payload) 105 + rescue 106 + e -> Logger.warning("[Broadcaster] document_update broadcast failed: #{inspect(e)}") 107 + end 108 + 109 + # -- Internal -- 110 + 111 + defp broadcast_owned(attrs, prefix, action, payload) do 112 + event_type = "#{prefix}:#{action}" 113 + 114 + case get(attrs, :keyring_uri) do 115 + nil -> if did = get(attrs, :owner_did), do: broadcast(Topics.personal(did), event_type, payload) 116 + kr -> broadcast(Topics.workspace(kr), event_type, payload) 117 + end 118 + end 119 + 120 + defp broadcast(topic, event_type, payload) do 121 + Phoenix.PubSub.broadcast(@pubsub, topic, {:sse_event, event_type, payload}) 122 + end 123 + 124 + defp maybe_broadcast_recipient(attrs, payload) do 125 + if recipient = get(attrs, :recipient_did), do: broadcast(Topics.personal(recipient), "grant:upsert", payload) 126 + end 127 + 128 + defp get(attrs, key), do: attrs[key] 129 + 130 + # -- Formatters (bracket-access safe for raw indexer attrs) -- 131 + 132 + defp format_directory(attrs) do 133 + %{directory_uri: get(attrs, :directory_uri), owner_did: get(attrs, :owner_did), entries: get(attrs, :entries) || []} 134 + |> TreeHelpers.maybe_put(:keyring_uri, get(attrs, :keyring_uri)) 135 + |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) 136 + |> TreeHelpers.maybe_put(:key_wrapping, get(attrs, :key_wrapping)) 137 + end 138 + 139 + defp format_document(attrs) do 140 + %{document_uri: get(attrs, :document_uri), owner_did: get(attrs, :owner_did)} 141 + |> TreeHelpers.maybe_put(:keyring_uri, get(attrs, :keyring_uri)) 142 + |> TreeHelpers.maybe_put(:rotation, get(attrs, :rotation)) 143 + |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) 144 + |> TreeHelpers.maybe_put(:encryption, get(attrs, :encryption)) 145 + |> TreeHelpers.maybe_put(:blob_ref, get(attrs, :blob_ref)) 146 + end 147 + 148 + defp format_keyring(attrs) do 149 + %{uri: get(attrs, :uri), owner_did: get(attrs, :owner_did), rotation: get(attrs, :rotation), member_entries: get(attrs, :member_entries) || []} 150 + |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) 151 + |> TreeHelpers.maybe_put(:created_at, get(attrs, :created_at)) 152 + end 153 + 154 + # Proposal formatters — safe bracket-access versions of TreeHelpers formatters 155 + # which use dot access and require DB schema fields like indexed_at. 156 + 157 + defp format_proposal(attrs) do 158 + %{uri: get(attrs, :uri), author_did: get(attrs, :author_did), action_type: get(attrs, :action_type)} 159 + |> TreeHelpers.maybe_put(:directory_uri, get(attrs, :directory_uri)) 160 + |> TreeHelpers.maybe_put(:entry_uri, get(attrs, :entry_uri)) 161 + |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) 162 + |> TreeHelpers.maybe_put(:source_directory_uri, get(attrs, :source_directory_uri)) 163 + |> TreeHelpers.maybe_put(:target_directory_uri, get(attrs, :target_directory_uri)) 164 + |> TreeHelpers.maybe_put(:parent_directory_uri, get(attrs, :parent_directory_uri)) 165 + end 166 + 167 + defp format_keyring_proposal(attrs) do 168 + %{uri: get(attrs, :uri), author_did: get(attrs, :author_did), action_type: get(attrs, :action_type)} 169 + |> TreeHelpers.maybe_put(:member_did, get(attrs, :member_did)) 170 + |> TreeHelpers.maybe_put(:member_public_key, get(attrs, :member_public_key)) 171 + |> TreeHelpers.maybe_put(:role, get(attrs, :role)) 172 + |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) 173 + end 174 + 175 + defp format_document_proposal(attrs) do 176 + %{uri: get(attrs, :uri), document_uri: get(attrs, :document_uri), author_did: get(attrs, :author_did)} 177 + |> TreeHelpers.maybe_put(:supersedes_uri, get(attrs, :supersedes_uri)) 178 + end 179 + end
+33
apps/appview/lib/opake_appview/sse/connection_tracker.ex
··· 1 + defmodule OpakeAppview.SSE.ConnectionTracker do 2 + @moduledoc """ 3 + Tracks active SSE connections per DID to prevent runaway reconnection 4 + loops from exhausting BEAM processes. Simple ETS counter. 5 + """ 6 + 7 + @table :sse_connections 8 + @max_per_did 5 9 + 10 + @spec init_table() :: :ok 11 + def init_table do 12 + :ets.new(@table, [:named_table, :set, :public, write_concurrency: true]) 13 + :ok 14 + end 15 + 16 + @spec acquire(String.t()) :: :ok | {:error, :limit_reached} 17 + def acquire(did) do 18 + count = :ets.update_counter(@table, did, {2, 1}, {did, 0}) 19 + 20 + if count > @max_per_did do 21 + :ets.update_counter(@table, did, {2, -1}) 22 + {:error, :limit_reached} 23 + else 24 + :ok 25 + end 26 + end 27 + 28 + @spec release(String.t()) :: :ok 29 + def release(did) do 30 + :ets.update_counter(@table, did, {2, -1, 0, 0}, {did, 0}) 31 + :ok 32 + end 33 + end
+84
apps/appview/lib/opake_appview/sse/token_store.ex
··· 1 + defmodule OpakeAppview.SSE.TokenStore do 2 + @moduledoc """ 3 + ETS-backed single-use token store for SSE authentication. 4 + 5 + Tokens are short-lived (60s TTL) and consumed on first use. This 6 + sidesteps EventSource's inability to send custom headers — the client 7 + POSTs with Ed25519 auth to get a token, then passes it as a query 8 + parameter to the SSE endpoint. 9 + """ 10 + 11 + @table :sse_tokens 12 + @ttl_ms 60_000 13 + @cleanup_interval_ms 60_000 14 + 15 + use GenServer 16 + 17 + # -- Public API -- 18 + 19 + @spec init_table() :: :ok 20 + def init_table do 21 + :ets.new(@table, [:named_table, :set, :public, read_concurrency: true]) 22 + :ok 23 + end 24 + 25 + def start_link(_opts) do 26 + GenServer.start_link(__MODULE__, [], name: __MODULE__) 27 + end 28 + 29 + @spec create_token(String.t()) :: String.t() 30 + def create_token(did) when is_binary(did) do 31 + token = 32 + :crypto.strong_rand_bytes(32) 33 + |> Base.url_encode64(padding: false) 34 + 35 + expires_at = System.monotonic_time(:millisecond) + @ttl_ms 36 + :ets.insert(@table, {token, did, expires_at}) 37 + token 38 + end 39 + 40 + @spec consume_token(String.t()) :: {:ok, String.t()} | :error 41 + def consume_token(token) when is_binary(token) do 42 + case :ets.take(@table, token) do 43 + [{^token, did, expires_at}] -> 44 + if System.monotonic_time(:millisecond) <= expires_at do 45 + {:ok, did} 46 + else 47 + :error 48 + end 49 + 50 + [] -> 51 + :error 52 + end 53 + end 54 + 55 + @spec ttl_seconds() :: integer() 56 + def ttl_seconds, do: div(@ttl_ms, 1000) 57 + 58 + # -- GenServer (periodic cleanup) -- 59 + 60 + @impl true 61 + def init(_) do 62 + schedule_cleanup() 63 + {:ok, %{}} 64 + end 65 + 66 + @impl true 67 + def handle_info(:cleanup, state) do 68 + cleanup_expired() 69 + schedule_cleanup() 70 + {:noreply, state} 71 + end 72 + 73 + defp schedule_cleanup do 74 + Process.send_after(self(), :cleanup, @cleanup_interval_ms) 75 + end 76 + 77 + defp cleanup_expired do 78 + now = System.monotonic_time(:millisecond) 79 + 80 + :ets.select_delete(@table, [ 81 + {{:_, :_, :"$1"}, [{:<, :"$1", now}], [true]} 82 + ]) 83 + end 84 + end
+15
apps/appview/lib/opake_appview/sse/topics.ex
··· 1 + defmodule OpakeAppview.SSE.Topics do 2 + @moduledoc """ 3 + Canonical PubSub topic construction for SSE event routing. 4 + 5 + Two topic tiers: 6 + - `"did:<did>"` — personal events (cabinet, grants, owned keyrings) 7 + - `"keyring:<uri>"` — workspace-scoped events (dirs/docs/proposals) 8 + """ 9 + 10 + @spec personal(String.t()) :: String.t() 11 + def personal(did), do: "did:#{did}" 12 + 13 + @spec workspace(String.t()) :: String.t() 14 + def workspace(keyring_uri), do: "keyring:#{keyring_uri}" 15 + end
+203
apps/appview/lib/opake_appview_web/controllers/events_controller.ex
··· 1 + defmodule OpakeAppviewWeb.EventsController do 2 + @moduledoc """ 3 + SSE event streaming + token exchange endpoint. 4 + 5 + ## Token exchange 6 + 7 + `POST /api/events/token` (Ed25519-authenticated) returns a short-lived 8 + single-use opaque token. The client passes it as `?token=` on the SSE 9 + endpoint, sidestepping EventSource's lack of custom headers. 10 + 11 + ## SSE stream 12 + 13 + `GET /api/events?token=<token>` validates the token, subscribes to 14 + PubSub topics for the authenticated DID, and enters a chunked response 15 + loop. Each indexed event is forwarded as an SSE message. A keepalive 16 + comment is sent every 15 seconds to prevent proxy timeouts. 17 + """ 18 + 19 + use OpakeAppviewWeb, :controller 20 + 21 + require Logger 22 + 23 + alias OpakeAppview.SSE.{TokenStore, ConnectionTracker, Topics} 24 + alias OpakeAppview.Queries.KeyringQueries 25 + 26 + @keepalive_interval_ms 15_000 27 + @pubsub OpakeAppview.PubSub 28 + 29 + # -- Token exchange -- 30 + 31 + @spec create_token(Plug.Conn.t(), map()) :: Plug.Conn.t() 32 + def create_token(conn, _params) do 33 + did = conn.assigns.authenticated_did 34 + token = TokenStore.create_token(did) 35 + json(conn, %{token: token, ttl: TokenStore.ttl_seconds()}) 36 + end 37 + 38 + # -- SSE stream -- 39 + 40 + @spec stream(Plug.Conn.t(), map()) :: Plug.Conn.t() 41 + def stream(conn, %{"token" => token}) do 42 + case TokenStore.consume_token(token) do 43 + {:ok, did} -> 44 + case ConnectionTracker.acquire(did) do 45 + :ok -> 46 + conn 47 + |> put_resp_header("content-type", "text/event-stream") 48 + |> put_resp_header("cache-control", "no-cache") 49 + |> put_resp_header("connection", "keep-alive") 50 + |> put_resp_header("x-accel-buffering", "no") 51 + |> send_chunked(200) 52 + |> subscribe_and_loop(did) 53 + 54 + {:error, :limit_reached} -> 55 + conn 56 + |> put_status(429) 57 + |> json(%{error: "too many SSE connections for this account"}) 58 + end 59 + 60 + :error -> 61 + conn 62 + |> put_status(401) 63 + |> json(%{error: "invalid or expired token"}) 64 + end 65 + end 66 + 67 + def stream(conn, _params) do 68 + conn 69 + |> put_status(400) 70 + |> json(%{error: "token parameter required"}) 71 + end 72 + 73 + # -- SSE loop -- 74 + 75 + defp subscribe_and_loop(conn, did) do 76 + # Subscribe to personal topic 77 + Phoenix.PubSub.subscribe(@pubsub, Topics.personal(did)) 78 + 79 + # Subscribe to all workspace keyrings 80 + {keyrings, _cursor} = KeyringQueries.list_keyrings_for_member(did, limit: 1000) 81 + keyring_uris = MapSet.new(Enum.map(keyrings, & &1.uri)) 82 + 83 + if MapSet.size(keyring_uris) >= 1000 do 84 + Logger.warning("[SSE] #{did} has 1000+ keyrings — SSE subscriptions truncated") 85 + end 86 + 87 + Enum.each(keyring_uris, fn uri -> 88 + Phoenix.PubSub.subscribe(@pubsub, Topics.workspace(uri)) 89 + end) 90 + 91 + timer = schedule_keepalive() 92 + state = %{did: did, subscribed_keyrings: keyring_uris, keepalive_timer: timer} 93 + 94 + # try/after guarantees ConnectionTracker.release even on crash. 95 + # Phoenix requires the action to return a Plug.Conn, so we track it 96 + # through the loop and return it after cleanup. 97 + final_conn = 98 + try do 99 + sse_loop(conn, state) 100 + after 101 + cleanup(state) 102 + end 103 + 104 + final_conn 105 + end 106 + 107 + defp sse_loop(conn, state) do 108 + receive do 109 + {:sse_event, "keyring:upsert" = event_type, payload} -> 110 + # Dynamic membership: check if we need to subscribe/unsubscribe 111 + state = handle_keyring_membership(payload, state) 112 + 113 + case send_sse_event(conn, event_type, payload) do 114 + {:ok, conn} -> sse_loop(conn, state) 115 + {:error, _} -> conn 116 + end 117 + 118 + {:sse_event, "keyring:delete", payload} -> 119 + # Unsubscribe from deleted keyring 120 + uri = payload[:uri] || payload["uri"] 121 + state = unsubscribe_keyring(uri, state) 122 + 123 + case send_sse_event(conn, "keyring:delete", payload) do 124 + {:ok, conn} -> sse_loop(conn, state) 125 + {:error, _} -> conn 126 + end 127 + 128 + {:sse_event, event_type, payload} -> 129 + case send_sse_event(conn, event_type, payload) do 130 + {:ok, conn} -> sse_loop(conn, state) 131 + {:error, _} -> conn 132 + end 133 + 134 + :keepalive -> 135 + case Plug.Conn.chunk(conn, ": keepalive\n\n") do 136 + {:ok, conn} -> 137 + timer = schedule_keepalive() 138 + sse_loop(conn, %{state | keepalive_timer: timer}) 139 + 140 + {:error, _} -> 141 + :ok 142 + end 143 + end 144 + end 145 + 146 + # -- Dynamic membership management -- 147 + 148 + defp handle_keyring_membership(payload, state) do 149 + uri = payload[:uri] || payload["uri"] 150 + members = payload[:member_entries] || payload["member_entries"] || [] 151 + 152 + member_dids = 153 + members 154 + |> Enum.map(fn 155 + %{did: did} -> did 156 + %{"did" => did} -> did 157 + _ -> nil 158 + end) 159 + |> Enum.reject(&is_nil/1) 160 + |> MapSet.new() 161 + 162 + is_member = MapSet.member?(member_dids, state.did) 163 + was_subscribed = uri && MapSet.member?(state.subscribed_keyrings, uri) 164 + 165 + cond do 166 + # New membership — subscribe 167 + is_member and uri and not was_subscribed -> 168 + Phoenix.PubSub.subscribe(@pubsub, Topics.workspace(uri)) 169 + %{state | subscribed_keyrings: MapSet.put(state.subscribed_keyrings, uri)} 170 + 171 + # Removed from membership — unsubscribe 172 + not is_member and was_subscribed -> 173 + unsubscribe_keyring(uri, state) 174 + 175 + true -> 176 + state 177 + end 178 + end 179 + 180 + defp unsubscribe_keyring(nil, state), do: state 181 + 182 + defp unsubscribe_keyring(uri, state) do 183 + Phoenix.PubSub.unsubscribe(@pubsub, Topics.workspace(uri)) 184 + %{state | subscribed_keyrings: MapSet.delete(state.subscribed_keyrings, uri)} 185 + end 186 + 187 + # -- Helpers -- 188 + 189 + defp send_sse_event(conn, event_type, payload) do 190 + data = Jason.encode!(payload) 191 + Plug.Conn.chunk(conn, "event: #{event_type}\ndata: #{data}\n\n") 192 + end 193 + 194 + defp schedule_keepalive do 195 + Process.send_after(self(), :keepalive, @keepalive_interval_ms) 196 + end 197 + 198 + defp cleanup(state) do 199 + if timer = state[:keepalive_timer], do: Process.cancel_timer(timer) 200 + ConnectionTracker.release(state.did) 201 + :ok 202 + end 203 + end
+17 -7
apps/appview/lib/opake_appview_web/controllers/tree_helpers.ex
··· 29 29 30 30 def parse_since(_), do: {:error, "since parameter is required"} 31 31 32 - defp format_directory(dir) do 32 + def format_directory(dir) do 33 33 base = %{ 34 34 directory_uri: dir.directory_uri, 35 35 owner_did: dir.owner_did, ··· 44 44 |> maybe_put(:deleted_at, format_datetime(dir.deleted_at)) 45 45 end 46 46 47 - defp format_document(doc) do 47 + def format_document(doc) do 48 48 base = %{ 49 49 document_uri: doc.document_uri, 50 50 owner_did: doc.owner_did, ··· 60 60 |> maybe_put(:deleted_at, format_datetime(doc.deleted_at)) 61 61 end 62 62 63 - defp format_proposal(update) do 63 + def format_proposal(update) do 64 64 format_update_base(update) 65 65 |> maybe_put(:directory_uri, update.directory_uri) 66 66 |> maybe_put(:entry_uri, update.entry_uri) ··· 88 88 |> maybe_put(:supersedes_uri, update.supersedes_uri) 89 89 end 90 90 91 + def format_grant(grant) do 92 + %{ 93 + uri: grant[:uri] || grant.uri, 94 + owner_did: grant[:owner_did] || grant.owner_did, 95 + document_uri: grant[:document_uri] || grant.document_uri, 96 + created_at: grant[:created_at] || grant.created_at 97 + } 98 + |> maybe_put(:recipient_did, grant[:recipient_did]) 99 + end 100 + 91 101 defp format_update_base(update) do 92 102 %{ 93 103 uri: update.uri, ··· 100 110 defp encode_binary(nil), do: nil 101 111 defp encode_binary(bin) when is_binary(bin), do: Base.encode64(bin) 102 112 103 - defp format_datetime(nil), do: nil 104 - defp format_datetime(%DateTime{} = dt), do: DateTime.to_iso8601(dt) 113 + def format_datetime(nil), do: nil 114 + def format_datetime(%DateTime{} = dt), do: DateTime.to_iso8601(dt) 105 115 106 - defp maybe_put(map, _key, nil), do: map 107 - defp maybe_put(map, key, value), do: Map.put(map, key, value) 116 + def maybe_put(map, _key, nil), do: map 117 + def maybe_put(map, key, value), do: Map.put(map, key, value) 108 118 end
+1 -1
apps/appview/lib/opake_appview_web/plugs/cors.ex
··· 22 22 conn = 23 23 conn 24 24 |> put_resp_header("access-control-allow-origin", origin) 25 - |> put_resp_header("access-control-allow-methods", "GET, OPTIONS") 25 + |> put_resp_header("access-control-allow-methods", "GET, POST, OPTIONS") 26 26 |> put_resp_header("access-control-allow-headers", "authorization, content-type") 27 27 |> put_resp_header("access-control-max-age", "86400") 28 28
+7
apps/appview/lib/opake_appview_web/router.ex
··· 21 21 plug OpakeAppview.Auth.Plug 22 22 end 23 23 24 + # SSE stream — token-authenticated, outside the :authenticated pipeline. 25 + # Rate limiting excluded (long-lived connection, not a burst endpoint). 26 + scope "/api", OpakeAppviewWeb do 27 + get "/events", EventsController, :stream 28 + end 29 + 24 30 scope "/api", OpakeAppviewWeb do 25 31 pipe_through :api 26 32 ··· 28 34 29 35 pipe_through :authenticated 30 36 37 + post "/events/token", EventsController, :create_token 31 38 get "/inbox", InboxController, :index 32 39 get "/keyrings", KeyringsController, :index 33 40
+110
apps/appview/test/opake_appview/sse/broadcaster_test.exs
··· 1 + defmodule OpakeAppview.SSE.BroadcasterTest do 2 + use ExUnit.Case, async: true 3 + 4 + alias OpakeAppview.SSE.Broadcaster 5 + 6 + @pubsub OpakeAppview.PubSub 7 + 8 + setup do 9 + # Ensure PubSub is available (started by Application) 10 + :ok 11 + end 12 + 13 + describe "broadcast_directory/2" do 14 + test "routes cabinet directory to did: topic" do 15 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.personal("did:plc:owner")) 16 + 17 + Broadcaster.broadcast_directory( 18 + %{ 19 + directory_uri: "at://did:plc:owner/app.opake.directory/3dir", 20 + owner_did: "did:plc:owner", 21 + keyring_uri: nil, 22 + entries: ["at://did:plc:owner/app.opake.document/3a"], 23 + indexed_at: DateTime.utc_now() 24 + }, 25 + :upsert 26 + ) 27 + 28 + assert_receive {:sse_event, "directory:upsert", payload} 29 + assert payload.directory_uri == "at://did:plc:owner/app.opake.directory/3dir" 30 + end 31 + 32 + test "routes workspace directory to keyring: topic" do 33 + keyring = "at://did:plc:owner/app.opake.keyring/3kr" 34 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.workspace(keyring)) 35 + 36 + Broadcaster.broadcast_directory( 37 + %{ 38 + directory_uri: "at://did:plc:owner/app.opake.directory/3dir", 39 + owner_did: "did:plc:owner", 40 + keyring_uri: keyring, 41 + entries: [], 42 + indexed_at: DateTime.utc_now() 43 + }, 44 + :upsert 45 + ) 46 + 47 + assert_receive {:sse_event, "directory:upsert", payload} 48 + assert payload.directory_uri == "at://did:plc:owner/app.opake.directory/3dir" 49 + end 50 + 51 + test "delete broadcasts directory_uri" do 52 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.personal("did:plc:owner")) 53 + 54 + Broadcaster.broadcast_directory( 55 + %{ 56 + directory_uri: "at://did:plc:owner/app.opake.directory/3dir", 57 + owner_did: "did:plc:owner", 58 + keyring_uri: nil 59 + }, 60 + :delete 61 + ) 62 + 63 + assert_receive {:sse_event, "directory:delete", %{directory_uri: _}} 64 + end 65 + end 66 + 67 + describe "broadcast_keyring/2" do 68 + test "broadcasts to both keyring: and did: topics" do 69 + uri = "at://did:plc:owner/app.opake.keyring/3kr" 70 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.workspace(uri)) 71 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.personal("did:plc:owner")) 72 + 73 + Broadcaster.broadcast_keyring( 74 + %{ 75 + uri: uri, 76 + owner_did: "did:plc:owner", 77 + rotation: 0, 78 + member_entries: [], 79 + indexed_at: DateTime.utc_now() 80 + }, 81 + :upsert 82 + ) 83 + 84 + # Should receive on both topics 85 + assert_receive {:sse_event, "keyring:upsert", _} 86 + assert_receive {:sse_event, "keyring:upsert", _} 87 + end 88 + end 89 + 90 + describe "broadcast_grant/2" do 91 + test "broadcasts to owner and recipient" do 92 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.personal("did:plc:owner")) 93 + Phoenix.PubSub.subscribe(@pubsub, OpakeAppview.SSE.Topics.personal("did:plc:recipient")) 94 + 95 + Broadcaster.broadcast_grant( 96 + %{ 97 + uri: "at://did:plc:owner/app.opake.grant/3gr", 98 + owner_did: "did:plc:owner", 99 + recipient_did: "did:plc:recipient", 100 + document_uri: "at://did:plc:owner/app.opake.document/3doc", 101 + created_at: "2026-04-10T12:00:00Z" 102 + }, 103 + :upsert 104 + ) 105 + 106 + assert_receive {:sse_event, "grant:upsert", _} 107 + assert_receive {:sse_event, "grant:upsert", _} 108 + end 109 + end 110 + end
+34
apps/appview/test/opake_appview/sse/token_store_test.exs
··· 1 + defmodule OpakeAppview.SSE.TokenStoreTest do 2 + use ExUnit.Case, async: true 3 + 4 + alias OpakeAppview.SSE.TokenStore 5 + 6 + describe "create_token/1 and consume_token/1" do 7 + test "creates a token and consumes it" do 8 + token = TokenStore.create_token("did:plc:test") 9 + assert is_binary(token) 10 + assert byte_size(token) > 20 11 + 12 + assert {:ok, "did:plc:test"} = TokenStore.consume_token(token) 13 + end 14 + 15 + test "token is single-use" do 16 + token = TokenStore.create_token("did:plc:test") 17 + assert {:ok, _} = TokenStore.consume_token(token) 18 + assert :error = TokenStore.consume_token(token) 19 + end 20 + 21 + test "unknown token returns error" do 22 + assert :error = TokenStore.consume_token("nonexistent-token") 23 + end 24 + 25 + test "different DIDs get different tokens" do 26 + t1 = TokenStore.create_token("did:plc:alice") 27 + t2 = TokenStore.create_token("did:plc:bob") 28 + assert t1 != t2 29 + 30 + assert {:ok, "did:plc:alice"} = TokenStore.consume_token(t1) 31 + assert {:ok, "did:plc:bob"} = TokenStore.consume_token(t2) 32 + end 33 + end 34 + end
+72
apps/appview/test/opake_appview_web/controllers/events_controller_test.exs
··· 1 + defmodule OpakeAppviewWeb.EventsControllerTest do 2 + use OpakeAppviewWeb.ConnCase, async: false 3 + 4 + import Mox 5 + setup :set_mox_global 6 + setup :verify_on_exit! 7 + 8 + alias OpakeAppview.SSE.TokenStore 9 + 10 + @did "did:plc:test123" 11 + 12 + setup do 13 + :ets.delete_all_objects(:key_cache) 14 + :ok 15 + end 16 + 17 + describe "POST /api/events/token" do 18 + test "returns a token for authenticated user", %{conn: conn} do 19 + conn = 20 + conn 21 + |> authed_conn_post(@did, "/api/events/token") 22 + |> post("/api/events/token") 23 + 24 + assert %{"token" => token, "ttl" => ttl} = json_response(conn, 200) 25 + assert is_binary(token) 26 + assert ttl > 0 27 + end 28 + 29 + test "returns 401 without auth", %{conn: conn} do 30 + conn = post(conn, "/api/events/token") 31 + assert conn.status == 401 32 + end 33 + end 34 + 35 + describe "GET /api/events" do 36 + test "returns 400 without token", %{conn: conn} do 37 + conn = get(conn, "/api/events") 38 + assert json_response(conn, 400)["error"] =~ "token" 39 + end 40 + 41 + test "returns 401 with invalid token", %{conn: conn} do 42 + conn = get(conn, "/api/events?token=bogus") 43 + assert json_response(conn, 401)["error"] =~ "invalid" 44 + end 45 + 46 + test "returns 401 with consumed token", %{conn: conn} do 47 + token = TokenStore.create_token(@did) 48 + {:ok, _} = TokenStore.consume_token(token) 49 + 50 + conn = get(conn, "/api/events?token=#{token}") 51 + assert json_response(conn, 401) 52 + end 53 + end 54 + 55 + # Like authed_conn but for POST method 56 + defp authed_conn_post(conn, did, path) do 57 + {pubkey, privkey} = :crypto.generate_key(:eddsa, :ed25519) 58 + 59 + Mox.expect(OpakeAppview.Auth.KeyFetcherMock, :fetch_signing_key, fn ^did -> {:ok, pubkey} end) 60 + 61 + timestamp = System.system_time(:second) 62 + message = "POST:#{path}:#{timestamp}:#{did}" 63 + signature = :crypto.sign(:eddsa, :none, message, [privkey, :ed25519]) 64 + sig_b64 = Base.encode64(signature) 65 + 66 + Plug.Conn.put_req_header( 67 + conn, 68 + "authorization", 69 + "Opake-Ed25519 #{did}:#{timestamp}:#{sig_b64}" 70 + ) 71 + end 72 + end
+67 -4
apps/web/src/routes/cabinet/route.lazy.tsx
··· 8 8 } from "@/components/cabinet/CreateWorkspaceDialog"; 9 9 import { useWorkspaceStore } from "@/stores/workspace"; 10 10 import { getOpake, useAuthStore } from "@/stores/auth"; 11 + import { useDocumentsStore } from "@/stores/documents/store"; 11 12 import { taskStore } from "@/stores/tasks"; 12 13 import { startDaemon } from "@opake/daemon"; 13 - import { Opake } from "@opake/sdk"; 14 + import { Opake, type EventStream } from "@opake/sdk"; 14 15 import { toastError, toastSuccess } from "@/stores/toast"; 15 16 17 + /** Re-sync the currently viewed directory from the PDS. */ 18 + function reloadCurrentDirectory(): void { 19 + const { loaded, currentDirectoryUri } = useDocumentsStore.getState(); 20 + if (loaded) { 21 + void useDocumentsStore.getState().loadDirectory(currentDirectoryUri); 22 + } 23 + } 24 + 25 + /** Debounced proposal sync — multiple SSE events within the window collapse into one call. */ 26 + // eslint-disable-next-line functional/no-let -- mutable timer for debounce 27 + let proposalSyncTimer: ReturnType<typeof setTimeout> | null = null; 28 + const PROPOSAL_SYNC_DEBOUNCE_MS = 2000; 29 + 30 + function debouncedProposalSync(): void { 31 + if (proposalSyncTimer) clearTimeout(proposalSyncTimer); 32 + proposalSyncTimer = setTimeout(() => { 33 + proposalSyncTimer = null; 34 + void getOpake() 35 + .syncOwnedWorkspacesDetailed() 36 + .catch(() => { 37 + // Sync failure is non-fatal — next SSE event will retry 38 + }); 39 + }, PROPOSAL_SYNC_DEBOUNCE_MS); 40 + } 41 + 16 42 function CabinetLayout() { 17 43 const [sidebarOpen, setSidebarOpen] = useState(false); 18 44 const dialogRef = useRef<CreateWorkspaceDialogHandle>(null); ··· 22 48 void useWorkspaceStore.getState().loadWorkspaces(); 23 49 }, []); 24 50 25 - // Start background daemon — Web Locks ensures only one tab is leader 51 + // Background daemon — PDS maintenance writes (share retry, grant healing, pair cleanup). 52 + // directory-sync is SSE-driven via debouncedProposalSync above. 26 53 useEffect(() => { 27 54 // eslint-disable-next-line functional/no-let -- handle assigned inside async IIFE 28 55 let handle: ReturnType<typeof startDaemon> | null = null; 29 56 void Opake.taskDefs().then((defs) => { 30 - handle = startDaemon(getOpake(), defs, taskStore, { 31 - onWorkspaceUpdated: () => void useWorkspaceStore.getState().loadWorkspaces(), 57 + const filteredDefs = defs.filter((d) => d.name !== "directory-sync"); 58 + handle = startDaemon(getOpake(), filteredDefs, taskStore, { 32 59 onSessionExpired: () => { 33 60 handle?.stop(); 34 61 void useAuthStore.getState().logout(); ··· 36 63 }); 37 64 }); 38 65 return () => handle?.stop(); 66 + }, []); 67 + 68 + // Real-time event stream from the appview 69 + useEffect(() => { 70 + const appviewUrl = import.meta.env.VITE_APPVIEW_URL as string | undefined; 71 + if (!appviewUrl) return; 72 + 73 + // eslint-disable-next-line functional/no-let, functional/prefer-immutable-types -- mutable ref for cleanup 74 + let stream: EventStream | null = null; 75 + try { 76 + stream = getOpake().subscribe( 77 + { 78 + onDirectoryUpsert: () => { 79 + reloadCurrentDirectory(); 80 + debouncedProposalSync(); 81 + }, 82 + onDirectoryDelete: () => reloadCurrentDirectory(), 83 + onDocumentUpsert: () => reloadCurrentDirectory(), 84 + onDocumentDelete: () => reloadCurrentDirectory(), 85 + onKeyringUpsert: () => { 86 + void useWorkspaceStore.getState().loadWorkspaces(); 87 + debouncedProposalSync(); 88 + }, 89 + onKeyringDelete: () => void useWorkspaceStore.getState().loadWorkspaces(), 90 + onReconnect: () => { 91 + reloadCurrentDirectory(); 92 + void useWorkspaceStore.getState().loadWorkspaces(); 93 + debouncedProposalSync(); 94 + }, 95 + }, 96 + appviewUrl, 97 + ); 98 + } catch { 99 + // SSE is optional — don't block the UI if it fails to connect 100 + } 101 + return () => stream?.close(); 39 102 }, []); 40 103 41 104 // Reset workspace store only when session transitions away from active
+38
crates/opake-core/src/client/appview.rs
··· 331 331 }) 332 332 } 333 333 334 + /// Request a short-lived SSE token from the appview. 335 + /// 336 + /// The token authenticates the EventSource connection (which cannot carry 337 + /// custom headers). Valid for ~60 seconds, single-use. 338 + pub async fn request_sse_token( 339 + transport: &impl Transport, 340 + appview_url: &str, 341 + did: &str, 342 + signing_key: &[u8; 32], 343 + ) -> Result<String, Error> { 344 + let path = "/api/events/token"; 345 + let timestamp = super::time::unix_now() as u64; 346 + let auth = sign_appview_request("POST", path, did, signing_key, timestamp); 347 + 348 + let request = HttpRequest { 349 + method: HttpMethod::Post, 350 + url: format!("{appview_url}{path}"), 351 + headers: vec![("Authorization".into(), auth)], 352 + body: None, 353 + }; 354 + 355 + let response = transport.send(request).await?; 356 + check_appview_response(response.status, &response.body)?; 357 + 358 + #[derive(serde::Deserialize)] 359 + struct TokenResponse { 360 + token: String, 361 + } 362 + 363 + let parsed: TokenResponse = 364 + serde_json::from_slice(&response.body).map_err(|e| Error::Appview { 365 + status: response.status, 366 + message: format!("failed to parse SSE token response: {e}"), 367 + })?; 368 + 369 + Ok(parsed.token) 370 + } 371 + 334 372 #[cfg(test)] 335 373 mod tests { 336 374 use super::*;
+20
crates/opake-core/src/opake.rs
··· 1386 1386 }) 1387 1387 } 1388 1388 1389 + // -- SSE token (for EventSource auth) -- 1390 + 1391 + /// Request a short-lived SSE token from the AppView. 1392 + /// 1393 + /// The token is passed as a query parameter to the SSE endpoint, 1394 + /// sidestepping EventSource's inability to send custom headers. 1395 + pub async fn request_sse_token( 1396 + &mut self, 1397 + default_appview_url: Option<&str>, 1398 + ) -> Result<String, Error> { 1399 + let identity = self.require_identity()?; 1400 + let signing_key = identity 1401 + .signing_key_bytes()? 1402 + .ok_or_else(|| Error::Auth("no signing key for SSE token request".into()))?; 1403 + 1404 + let url = self.resolve_appview_url(default_appview_url)?; 1405 + crate::client::request_sse_token(self.client.transport(), &url, &self.did, &signing_key) 1406 + .await 1407 + } 1408 + 1389 1409 // -- Inbox (incoming grants via AppView) -- 1390 1410 1391 1411 /// Fetch all incoming grants from the AppView.
+10
crates/opake-wasm/src/opake_wasm.rs
··· 618 618 to_js(&keyrings) 619 619 } 620 620 621 + /// Request a short-lived SSE token from the AppView. 622 + #[wasm_bindgen(js_name = requestSseToken)] 623 + pub async fn request_sse_token(&self, appview_url: Option<String>) -> Result<String, JsError> { 624 + let mut opake = self.opake().await?; 625 + opake 626 + .request_sse_token(appview_url.as_deref()) 627 + .await 628 + .map_err(wasm_err) 629 + } 630 + 621 631 /// Fetch all incoming grants from the AppView. 622 632 #[wasm_bindgen(js_name = listInbox)] 623 633 pub async fn list_inbox(&self, appview_url: Option<String>) -> Result<JsValue, JsError> {
+230
packages/opake-sdk/src/event-stream.ts
··· 1 + // Real-time event streaming from the Opake appview via Server-Sent Events. 2 + // 3 + // Opens an SSE connection authenticated via a short-lived token (obtained 4 + // from the appview's Ed25519-signed token endpoint). Auto-reconnects with 5 + // exponential backoff; fires `onReconnect` so the consumer can full-sync 6 + // to cover the gap. 7 + 8 + import { z } from "zod"; 9 + 10 + // --------------------------------------------------------------------------- 11 + // SSE event schemas (validate appview payloads at the boundary) 12 + // --------------------------------------------------------------------------- 13 + 14 + export const sseDirectorySchema = z.object({ 15 + directory_uri: z.string(), 16 + owner_did: z.string(), 17 + entries: z.array(z.unknown()).default([]), 18 + encrypted_metadata: z.unknown().nullish(), 19 + key_wrapping: z.unknown().nullish(), 20 + keyring_uri: z.string().nullish(), 21 + deleted_at: z.string().nullish(), 22 + indexed_at: z.string().nullish(), 23 + }); 24 + 25 + export const sseDocumentSchema = z.object({ 26 + document_uri: z.string(), 27 + owner_did: z.string(), 28 + encrypted_metadata: z.unknown().nullish(), 29 + encryption: z.unknown().nullish(), 30 + blob_ref: z.unknown().nullish(), 31 + keyring_uri: z.string().nullish(), 32 + rotation: z.number().nullish(), 33 + deleted_at: z.string().nullish(), 34 + indexed_at: z.string().nullish(), 35 + }); 36 + 37 + export const sseKeyringSchema = z.object({ 38 + uri: z.string(), 39 + owner_did: z.string(), 40 + rotation: z.number().nullish(), 41 + member_entries: z.array(z.unknown()).default([]), 42 + encrypted_metadata: z.unknown().nullish(), 43 + created_at: z.string().nullish(), 44 + indexed_at: z.string().nullish(), 45 + }); 46 + 47 + export const sseGrantSchema = z.object({ 48 + uri: z.string(), 49 + owner_did: z.string(), 50 + recipient_did: z.string().nullish(), 51 + document_uri: z.string(), 52 + created_at: z.string().nullish(), 53 + }); 54 + 55 + export const sseDeleteSchema = z.object({ 56 + uri: z.string().optional(), 57 + directory_uri: z.string().optional(), 58 + document_uri: z.string().optional(), 59 + }); 60 + 61 + // --------------------------------------------------------------------------- 62 + // Types 63 + // --------------------------------------------------------------------------- 64 + 65 + export type SSEDirectory = z.output<typeof sseDirectorySchema>; 66 + export type SSEDocument = z.output<typeof sseDocumentSchema>; 67 + export type SSEKeyring = z.output<typeof sseKeyringSchema>; 68 + export type SSEGrant = z.output<typeof sseGrantSchema>; 69 + export type SSEDelete = z.output<typeof sseDeleteSchema>; 70 + 71 + /** Handlers for SSE events. All optional — subscribe to what you need. */ 72 + export interface EventStreamHandlers { 73 + readonly onDirectoryUpsert?: (data: SSEDirectory) => void; 74 + readonly onDirectoryDelete?: (data: SSEDelete) => void; 75 + readonly onDocumentUpsert?: (data: SSEDocument) => void; 76 + readonly onDocumentDelete?: (data: SSEDelete) => void; 77 + readonly onKeyringUpsert?: (data: SSEKeyring) => void; 78 + readonly onKeyringDelete?: (data: SSEDelete) => void; 79 + readonly onGrantUpsert?: (data: SSEGrant) => void; 80 + readonly onGrantDelete?: (data: SSEDelete) => void; 81 + /** Fired on reconnect — consumer should perform a full sync to cover the gap. */ 82 + readonly onReconnect?: () => void; 83 + readonly onError?: (error: Error) => void; 84 + readonly onOpen?: () => void; 85 + } 86 + 87 + /** Configuration for an EventStream. */ 88 + export interface EventStreamOptions { 89 + /** Appview base URL (e.g., "https://appview.opake.app"). */ 90 + readonly appviewUrl: string; 91 + /** Async function that returns a fresh single-use SSE token. Called on every connect/reconnect. */ 92 + readonly getToken: () => Promise<string>; 93 + /** Event handlers. */ 94 + readonly handlers: EventStreamHandlers; 95 + /** Max reconnect delay in ms (default: 30000). */ 96 + readonly maxReconnectDelay?: number; 97 + } 98 + 99 + // --------------------------------------------------------------------------- 100 + // EventStream 101 + // --------------------------------------------------------------------------- 102 + 103 + /** 104 + * Real-time event stream from the Opake appview. 105 + * 106 + * Connects via Server-Sent Events, authenticated with a short-lived token. 107 + * Auto-reconnects with exponential backoff on disconnect. 108 + * 109 + * @example 110 + * ```typescript 111 + * const stream = new EventStream({ 112 + * appviewUrl: "http://localhost:6100", 113 + * getToken: () => opake.requestSseToken(), 114 + * handlers: { 115 + * onDirectoryUpsert: (dir) => console.log("directory changed", dir), 116 + * onReconnect: () => store.fullSync(), 117 + * }, 118 + * }); 119 + * await stream.connect(); 120 + * // later: 121 + * stream.close(); 122 + * ``` 123 + */ 124 + export class EventStream { 125 + private eventSource: EventSource | null = null; 126 + private readonly appviewUrl: string; 127 + private readonly getToken: () => Promise<string>; 128 + private readonly handlers: EventStreamHandlers; 129 + private readonly maxReconnectDelay: number; 130 + private reconnectDelay = 1000; 131 + private reconnectTimer: ReturnType<typeof setTimeout> | null = null; 132 + private closed = false; 133 + private wasConnected = false; 134 + 135 + constructor(options: EventStreamOptions) { 136 + this.appviewUrl = options.appviewUrl; 137 + this.getToken = options.getToken; 138 + this.handlers = options.handlers; 139 + this.maxReconnectDelay = options.maxReconnectDelay ?? 30_000; 140 + } 141 + 142 + /** Open the SSE connection. Obtains a fresh token first. */ 143 + async connect(): Promise<void> { 144 + if (this.closed) return; 145 + 146 + try { 147 + const token = await this.getToken(); 148 + if (this.closed) return; // re-check after async gap (StrictMode cleanup race) 149 + 150 + const url = `${this.appviewUrl}/api/events?token=${encodeURIComponent(token)}`; 151 + const es = new EventSource(url); 152 + this.eventSource = es; 153 + 154 + es.onopen = () => { 155 + this.reconnectDelay = 1000; 156 + this.wasConnected = true; 157 + this.handlers.onOpen?.(); 158 + }; 159 + 160 + es.onerror = () => { 161 + es.close(); 162 + this.eventSource = null; 163 + this.scheduleReconnect(); 164 + }; 165 + 166 + // Register typed event listeners 167 + this.on(es, "directory:upsert", sseDirectorySchema, this.handlers.onDirectoryUpsert); 168 + this.on(es, "directory:delete", sseDeleteSchema, this.handlers.onDirectoryDelete); 169 + this.on(es, "document:upsert", sseDocumentSchema, this.handlers.onDocumentUpsert); 170 + this.on(es, "document:delete", sseDeleteSchema, this.handlers.onDocumentDelete); 171 + this.on(es, "keyring:upsert", sseKeyringSchema, this.handlers.onKeyringUpsert); 172 + this.on(es, "keyring:delete", sseDeleteSchema, this.handlers.onKeyringDelete); 173 + this.on(es, "grant:upsert", sseGrantSchema, this.handlers.onGrantUpsert); 174 + this.on(es, "grant:delete", sseDeleteSchema, this.handlers.onGrantDelete); 175 + } catch (e) { 176 + this.handlers.onError?.(e instanceof Error ? e : new Error(String(e))); 177 + this.scheduleReconnect(); 178 + } 179 + } 180 + 181 + /** Close the connection and stop reconnecting. */ 182 + close(): void { 183 + this.closed = true; 184 + if (this.reconnectTimer) { 185 + clearTimeout(this.reconnectTimer); 186 + this.reconnectTimer = null; 187 + } 188 + this.eventSource?.close(); 189 + this.eventSource = null; 190 + } 191 + 192 + /** Whether the connection is currently open. */ 193 + get connected(): boolean { 194 + return this.eventSource?.readyState === EventSource.OPEN; 195 + } 196 + 197 + // -- Internal -- 198 + 199 + private on<T>( 200 + es: EventSource, 201 + eventType: string, 202 + schema: z.ZodType<T>, 203 + handler?: (data: T) => void, 204 + ): void { 205 + if (!handler) return; 206 + es.addEventListener(eventType, ((e: MessageEvent) => { 207 + try { 208 + const parsed = schema.parse(JSON.parse(e.data as string)); 209 + handler(parsed); 210 + } catch (err) { 211 + this.handlers.onError?.( 212 + err instanceof Error ? err : new Error(`Failed to parse ${eventType} event`), 213 + ); 214 + } 215 + }) as EventListener); 216 + } 217 + 218 + private scheduleReconnect(): void { 219 + if (this.closed) return; 220 + // Only fire onReconnect after a previously-successful connection drops — 221 + // not on initial connection failure where there's no gap to sync. 222 + if (this.wasConnected) this.handlers.onReconnect?.(); 223 + // Jitter ±20% to prevent thundering herd on server restart 224 + const jitter = this.reconnectDelay * (0.8 + Math.random() * 0.4); 225 + this.reconnectTimer = setTimeout(() => { 226 + this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay); 227 + void this.connect(); 228 + }, jitter); 229 + } 230 + }
+12
packages/opake-sdk/src/index.ts
··· 54 54 type InvitationEntry, 55 55 type TaskDef, 56 56 } from "./types"; 57 + 58 + // Real-time event streaming 59 + export { 60 + EventStream, 61 + type EventStreamHandlers, 62 + type EventStreamOptions, 63 + type SSEDirectory, 64 + type SSEDocument, 65 + type SSEKeyring, 66 + type SSEGrant, 67 + type SSEDelete, 68 + } from "./event-stream";
+49
packages/opake-sdk/src/opake.ts
··· 32 32 import type { LoginOptions, StartLoginOptions, PendingLogin } from "./auth"; 33 33 import { createStorageAdapter } from "./storage-adapter"; 34 34 import { registerCleanup, unregisterCleanup } from "./finalizer"; 35 + import { EventStream, type EventStreamHandlers } from "./event-stream"; 35 36 import { 36 37 createPairRequest as pairingCreate, 37 38 listPairRequests as pairingList, ··· 616 617 @wrapWasmErrors @withTokenGuard 617 618 publishPublicKey(): Promise<string> { 618 619 return this.requireContext().publishPublicKey(); 620 + } 621 + 622 + // --------------------------------------------------------------------------- 623 + // Real-time event streaming (SSE) 624 + // --------------------------------------------------------------------------- 625 + 626 + /** 627 + * Request a short-lived SSE token from the appview. 628 + * 629 + * The token authenticates the EventSource connection (which cannot 630 + * carry custom headers). Valid for ~60 seconds, single-use. 631 + */ 632 + @wrapWasmErrors 633 + requestSseToken(appviewUrl?: string): Promise<string> { 634 + return this.requireContext().requestSseToken(appviewUrl ?? null); 635 + } 636 + 637 + /** 638 + * Subscribe to real-time events from the appview via SSE. 639 + * 640 + * Opens a persistent connection that receives full indexed records 641 + * as they're processed by the firehose indexer. Auto-reconnects with 642 + * exponential backoff; fires `onReconnect` so the consumer can 643 + * full-sync to cover the gap. 644 + * 645 + * @returns An `EventStream` — call `.close()` to disconnect. 646 + * 647 + * @example 648 + * ```typescript 649 + * const stream = opake.subscribe({ 650 + * onDirectoryUpsert: (dir) => console.log("changed:", dir.directory_uri), 651 + * onReconnect: () => store.fullSync(), 652 + * }); 653 + * // later: 654 + * stream.close(); 655 + * ``` 656 + */ 657 + subscribe( 658 + handlers: EventStreamHandlers, 659 + appviewUrl: string, 660 + ): EventStream { 661 + const stream = new EventStream({ 662 + appviewUrl, 663 + getToken: () => this.requestSseToken(appviewUrl), 664 + handlers, 665 + }); 666 + void stream.connect(); 667 + return stream; 619 668 } 620 669 621 670 // ---------------------------------------------------------------------------