this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Wire SSE consumer into daemon, add per-workspace proposal sync

Implements 4 of 5 SSE improvements: proposal event types in the SDK,
per-workspace targeted sync via syncWorkspaceByUri, daemon as the SSE
consumer with self-event filtering, and auth-error handling.

Daemon now owns the SSE subscription when options.sse is provided.
Proposal events trigger debounced per-workspace syncs instead of full
polls. Directory-sync downgrades to a 60s fallback timer to cover
document_update events that can't route to workspace topics. Record
events fire onRecordChanged (debounced 500ms) for non-self events only,
using markWrite() timestamps to suppress local echoes.

+485 -94
+3
apps/appview/lib/opake_appview/sse/broadcaster.ex
··· 156 156 157 157 defp format_proposal(attrs) do 158 158 %{uri: get(attrs, :uri), author_did: get(attrs, :author_did), action_type: get(attrs, :action_type)} 159 + |> TreeHelpers.maybe_put(:keyring_uri, get(attrs, :keyring_uri)) 159 160 |> TreeHelpers.maybe_put(:directory_uri, get(attrs, :directory_uri)) 160 161 |> TreeHelpers.maybe_put(:entry_uri, get(attrs, :entry_uri)) 161 162 |> TreeHelpers.maybe_put(:encrypted_metadata, get(attrs, :encrypted_metadata)) ··· 166 167 167 168 defp format_keyring_proposal(attrs) do 168 169 %{uri: get(attrs, :uri), author_did: get(attrs, :author_did), action_type: get(attrs, :action_type)} 170 + |> TreeHelpers.maybe_put(:keyring_uri, get(attrs, :keyring_uri)) 169 171 |> TreeHelpers.maybe_put(:member_did, get(attrs, :member_did)) 170 172 |> TreeHelpers.maybe_put(:member_public_key, get(attrs, :member_public_key)) 171 173 |> TreeHelpers.maybe_put(:role, get(attrs, :role)) ··· 174 176 175 177 defp format_document_proposal(attrs) do 176 178 %{uri: get(attrs, :uri), document_uri: get(attrs, :document_uri), author_did: get(attrs, :author_did)} 179 + |> TreeHelpers.maybe_put(:keyring_uri, get(attrs, :keyring_uri)) 177 180 |> TreeHelpers.maybe_put(:supersedes_uri, get(attrs, :supersedes_uri)) 178 181 end 179 182 end
+8 -9
apps/web/src/components/cabinet/InviteDialog.tsx
··· 1 1 import { forwardRef, useCallback, useEffect, useImperativeHandle, useRef, useState } from "react"; 2 2 import { LinkIcon, CopyIcon, TrashIcon, CheckIcon, ShareNetworkIcon } from "@phosphor-icons/react"; 3 3 import { MODAL_TRANSITION_MS } from "@/components/ConfirmDialog"; 4 - import { getOpakeWorker } from "@/lib/worker"; 4 + import { getOpake } from "@/stores/auth"; 5 5 import { toastSuccess, toastError } from "@/stores/toast"; 6 - import type { InvitationEntry } from "@/workers/api/workspace"; 7 - import type { WorkspaceRole } from "@/lib/workspaceSchemas"; 6 + import type { InvitationEntry, WorkspaceRole } from "@opake/sdk"; 8 7 9 8 export interface InviteDialogHandle { 10 9 readonly show: (keyringUri: string) => void; ··· 41 40 const loadInvitations = useCallback(async () => { 42 41 setLoading(true); 43 42 try { 44 - const worker = getOpakeWorker(); 45 - const all = await worker.listInvitations(); 43 + const opake = getOpake(); 44 + const all = await opake.listInvitations(); 46 45 // Filter to invitations for this workspace 47 46 setInvitations(keyringUri ? all.filter((inv) => inv.target === keyringUri) : []); 48 47 } catch { ··· 59 58 const handleCreate = useCallback(async () => { 60 59 if (!keyringUri) return; 61 60 try { 62 - const worker = getOpakeWorker(); 63 - await worker.createInvitation(keyringUri, role); 61 + const opake = getOpake(); 62 + await opake.createInvitation(keyringUri, role); 64 63 toastSuccess("Invitation created"); 65 64 await loadInvitations(); 66 65 } catch (err) { ··· 73 72 const handleRevoke = useCallback( 74 73 async (invitationUri: string) => { 75 74 try { 76 - const worker = getOpakeWorker(); 77 - await worker.revokeInvitation(invitationUri); 75 + const opake = getOpake(); 76 + await opake.revokeInvitation(invitationUri); 78 77 toastSuccess("Invitation revoked"); 79 78 await loadInvitations(); 80 79 } catch (err) {
+8 -3
apps/web/src/components/cabinet/ShareDialog.tsx
··· 2 2 import { ShareNetworkIcon } from "@phosphor-icons/react"; 3 3 import { resolveRecipient, RecipientNotReadyError } from "@/lib/sharing"; 4 4 import { useAuthStore } from "@/stores/auth"; 5 - import { getOpakeWorker } from "@/lib/worker"; 5 + import { getActiveFileManager } from "@/stores/documents/store"; 6 6 import { toastSuccess, toastError } from "@/stores/toast"; 7 7 import { useDocumentsStore } from "@/stores/documents/store"; 8 8 import { MODAL_TRANSITION_MS } from "@/components/ConfirmDialog"; ··· 67 67 setStatus("sharing"); 68 68 69 69 // Core handles: fetch document → unwrap key → wrap to recipient → create grant 70 - const worker = getOpakeWorker(); 71 - await worker.cabinetShare(documentUri, recipient.did, recipient.publicKey, "read", null); 70 + await getActiveFileManager().share( 71 + documentUri, 72 + recipient.did, 73 + recipient.publicKey, 74 + "read", 75 + null, 76 + ); 72 77 } catch (resolveError) { 73 78 if (resolveError instanceof RecipientNotReadyError) { 74 79 // REMOVE: pending share needs core domain method (Opake::create_pending_share)
+15 -58
apps/web/src/routes/cabinet/route.lazy.tsx
··· 11 11 import { useDocumentsStore } from "@/stores/documents/store"; 12 12 import { taskStore } from "@/stores/tasks"; 13 13 import { startDaemon } from "@opake/daemon"; 14 - import { Opake, type EventStream } from "@opake/sdk"; 14 + import { Opake } from "@opake/sdk"; 15 15 import { toastError, toastSuccess } from "@/stores/toast"; 16 16 17 17 /** Re-sync the currently viewed directory from the PDS. */ ··· 20 20 if (loaded) { 21 21 void useDocumentsStore.getState().loadDirectory(currentDirectoryUri); 22 22 } 23 - } 24 - 25 - /** Debounced proposal sync — multiple SSE events within the window collapse into one call. */ 26 - // eslint-disable-next-line functional/no-let -- mutable timer for debounce 27 - let proposalSyncTimer: ReturnType<typeof setTimeout> | null = null; 28 - const PROPOSAL_SYNC_DEBOUNCE_MS = 2000; 29 - 30 - function debouncedProposalSync(): void { 31 - if (proposalSyncTimer) clearTimeout(proposalSyncTimer); 32 - proposalSyncTimer = setTimeout(() => { 33 - proposalSyncTimer = null; 34 - void getOpake() 35 - .syncOwnedWorkspacesDetailed() 36 - .catch(() => { 37 - // Sync failure is non-fatal — next SSE event will retry 38 - }); 39 - }, PROPOSAL_SYNC_DEBOUNCE_MS); 40 23 } 41 24 42 25 function CabinetLayout() { ··· 48 31 void useWorkspaceStore.getState().loadWorkspaces(); 49 32 }, []); 50 33 51 - // Background daemon — PDS maintenance writes (share retry, grant healing, pair cleanup). 52 - // directory-sync is SSE-driven via debouncedProposalSync above. 34 + // Background daemon — PDS maintenance writes + SSE-driven proposal sync. 35 + // When `sse` is provided, directory-sync downgrades to a low-frequency 36 + // fallback and SSE events drive targeted per-workspace sync. Other tasks 37 + // (pair-cleanup, grant-healing, share-retry) always run on intervals. 53 38 useEffect(() => { 39 + const appviewUrl = import.meta.env.VITE_APPVIEW_URL as string | undefined; 54 40 // eslint-disable-next-line functional/no-let -- handle assigned inside async IIFE 55 41 let handle: ReturnType<typeof startDaemon> | null = null; 56 42 void Opake.taskDefs().then((defs) => { 57 - const filteredDefs = defs.filter((d) => d.name !== "directory-sync"); 58 - handle = startDaemon(getOpake(), filteredDefs, taskStore, { 43 + handle = startDaemon(getOpake(), defs, taskStore, { 44 + sse: appviewUrl ? { 45 + appviewUrl, 46 + onRecordChanged: () => reloadCurrentDirectory(), 47 + } : undefined, 48 + onWorkspaceUpdated: () => { 49 + reloadCurrentDirectory(); 50 + void useWorkspaceStore.getState().loadWorkspaces(); 51 + }, 59 52 onSessionExpired: () => { 60 53 handle?.stop(); 61 54 void useAuthStore.getState().logout(); ··· 63 56 }); 64 57 }); 65 58 return () => handle?.stop(); 66 - }, []); 67 - 68 - // Real-time event stream from the appview 69 - useEffect(() => { 70 - const appviewUrl = import.meta.env.VITE_APPVIEW_URL as string | undefined; 71 - if (!appviewUrl) return; 72 - 73 - // eslint-disable-next-line functional/no-let, functional/prefer-immutable-types -- mutable ref for cleanup 74 - let stream: EventStream | null = null; 75 - try { 76 - stream = getOpake().subscribe( 77 - { 78 - onDirectoryUpsert: () => { 79 - reloadCurrentDirectory(); 80 - debouncedProposalSync(); 81 - }, 82 - onDirectoryDelete: () => reloadCurrentDirectory(), 83 - onDocumentUpsert: () => reloadCurrentDirectory(), 84 - onDocumentDelete: () => reloadCurrentDirectory(), 85 - onKeyringUpsert: () => { 86 - void useWorkspaceStore.getState().loadWorkspaces(); 87 - debouncedProposalSync(); 88 - }, 89 - onKeyringDelete: () => void useWorkspaceStore.getState().loadWorkspaces(), 90 - onReconnect: () => { 91 - reloadCurrentDirectory(); 92 - void useWorkspaceStore.getState().loadWorkspaces(); 93 - debouncedProposalSync(); 94 - }, 95 - }, 96 - appviewUrl, 97 - ); 98 - } catch { 99 - // SSE is optional — don't block the UI if it fails to connect 100 - } 101 - return () => stream?.close(); 102 59 }, []); 103 60 104 61 // Reset workspace store only when session transitions away from active
+3
apps/web/src/stores/documents/store.ts
··· 223 223 try { 224 224 await enqueueMutation(async () => { 225 225 if (gen !== generation) return; 226 + // Open the suppression window BEFORE the write — SSE echo can arrive 227 + // as early as the appview indexes the firehose frame, before fn resolves. 228 + getOpake().markWrite(); 226 229 await fn(getActiveFileManager()); 227 230 }); 228 231 if (gen !== generation) return;
+36 -1
apps/web/src/stores/workspace.ts
··· 16 16 // eslint-disable-next-line functional/no-let 17 17 let loadPromise: Promise<void> | null = null; 18 18 19 + /** 20 + * Shallow change detection over the workspace record. Rotation bumps on 21 + * every keyring mutation so it's a reliable signal — combined with name, 22 + * description, and member count, it catches everything a user cares about 23 + * without a full deep-equal. 24 + */ 25 + function workspacesChanged( 26 + prev: Readonly<Record<string, WorkspaceEntry>>, 27 + next: Readonly<Record<string, WorkspaceEntry>>, 28 + ): boolean { 29 + const prevKeys = Object.keys(prev); 30 + const nextKeys = Object.keys(next); 31 + if (prevKeys.length !== nextKeys.length) return true; 32 + for (const key of nextKeys) { 33 + const a = prev[key]; 34 + const b = next[key]; 35 + if (!a) return true; 36 + if (a.rotation !== b.rotation) return true; 37 + if (a.memberCount !== b.memberCount) return true; 38 + if (a.name !== b.name) return true; 39 + if (a.description !== b.description) return true; 40 + } 41 + return false; 42 + } 43 + 19 44 // --------------------------------------------------------------------------- 20 45 // Types 21 46 // --------------------------------------------------------------------------- ··· 58 83 const record = Object.fromEntries(entries.map((e) => [e.uri, e])); 59 84 60 85 set((draft) => { 86 + // Skip the write if the record is shallow-equal to the current 87 + // state — avoids spurious Zustand notifications on no-op reloads 88 + // (common under SSE event bursts). 89 + if (!workspacesChanged(draft.workspaces, record)) { 90 + draft.loaded = true; 91 + draft.error = null; 92 + return; 93 + } 61 94 draft.workspaces = record; 62 95 draft.loaded = true; 63 96 draft.error = null; ··· 81 114 // Dialog disables its button during the async call. 82 115 const done = loading("create-workspace"); 83 116 try { 84 - const result = await getOpake().createWorkspace(name, description ?? ""); 117 + const opake = getOpake(); 118 + opake.markWrite(); 119 + const result = await opake.createWorkspace(name, description ?? ""); 85 120 loadPromise = null; 86 121 await useWorkspaceStore.getState().loadWorkspaces(); 87 122 return result.keyringUri;
+20
crates/opake-core/src/opake.rs
··· 461 461 Ok(results) 462 462 } 463 463 464 + /// Sync a single workspace identified by its keyring URI. 465 + /// 466 + /// Fetches all member keyrings from the appview (same as the full sync), 467 + /// finds the target, and syncs only that one. Returns `None` if the 468 + /// keyring URI wasn't found in the member list. 469 + pub async fn sync_workspace_by_uri( 470 + &mut self, 471 + keyring_uri: &str, 472 + ) -> Result<Option<crate::daemon::WorkspaceSyncResult>, Error> { 473 + let appview_keyrings = self.discover_member_keyrings(None).await?; 474 + let target = appview_keyrings.iter().find(|kr| kr.uri == keyring_uri); 475 + let Some(kr) = target else { return Ok(None) }; 476 + 477 + let identity = self.require_identity()?; 478 + let private_key = identity.private_key_bytes()?; 479 + let result = self.sync_single_workspace(kr, &private_key).await; 480 + self.auto_persist_session().await?; 481 + Ok(Some(result)) 482 + } 483 + 464 484 /// Sync a single workspace: cleanup + apply proposals. 465 485 /// 466 486 /// Captures all errors into `WorkspaceSyncResult::error` instead of
+28
crates/opake-wasm/src/opake_wasm.rs
··· 532 532 to_js(&results) 533 533 } 534 534 535 + /// Sync a single workspace by keyring URI. Returns null if the URI is not 536 + /// in the member list, or the sync result otherwise. 537 + #[wasm_bindgen(js_name = syncWorkspaceByUri)] 538 + pub async fn sync_workspace_by_uri(&self, keyring_uri: &str) -> Result<JsValue, JsError> { 539 + let mut opake = self.opake().await?; 540 + let result = opake 541 + .sync_workspace_by_uri(keyring_uri) 542 + .await 543 + .map_err(wasm_err)?; 544 + to_js(&result) 545 + } 546 + 535 547 /// Retry all pending shares (resolve recipients, create grants). 536 548 #[wasm_bindgen(js_name = retryPendingSharesViaOpake)] 537 549 pub async fn retry_pending_shares_via_opake(&self) -> Result<JsValue, JsError> { ··· 851 863 .ok_or_else(|| JsError::new("already consumed"))?; 852 864 let session = opake.session().ok_or_else(|| JsError::new("no session"))?; 853 865 serde_wasm_bindgen::to_value(session).map_err(|e| JsError::new(&e.to_string())) 866 + } 867 + 868 + /// Get the authenticated DID without exposing the full session. 869 + /// 870 + /// Returns the DID string, or an error if the context is busy or has 871 + /// no session. Used for self-event filtering in the SSE consumer. 872 + #[wasm_bindgen(js_name = getDid)] 873 + pub fn get_did(&self) -> Result<String, JsError> { 874 + let guard = self 875 + .inner 876 + .try_lock() 877 + .ok_or_else(|| JsError::new("Opake is busy"))?; 878 + let opake = guard 879 + .as_ref() 880 + .ok_or_else(|| JsError::new("already consumed"))?; 881 + Ok(opake.did().to_string()) 854 882 } 855 883 856 884 /// Get the token expiry timestamp without exposing the full session.
+2 -1
packages/opake-daemon/src/index.ts
··· 1 1 // @opake/daemon — background task scheduling for browser environments 2 2 3 3 export { startDaemon, type DaemonHandle } from "./scheduler"; 4 - export type { DaemonOptions, TaskDef, TaskRecord, TaskStatus, TaskStore } from "./types"; 4 + export type { DaemonOptions, SSEConfig, TaskDef, TaskRecord, TaskStatus, TaskStore } from "./types"; 5 + export type { SSEConsumerHandle } from "./sse-consumer";
+39 -3
packages/opake-daemon/src/scheduler.ts
··· 1 1 // Daemon scheduler — Web Locks leader election + setInterval orchestration. 2 2 // 3 + // When `options.sse` is provided, SSE-driven proposal sync replaces the 4 + // `directory-sync` timer. Other tasks (pair-cleanup, grant-healing, 5 + // share-retry) always run on intervals. 6 + // 3 7 // Returns a DaemonHandle that the caller uses to stop the daemon. 4 8 // No module-level state — multiple handles can coexist (though only one 5 9 // leader runs per Web Locks scope). 6 10 7 11 import type { Opake } from "@opake/sdk"; 8 - import type { DaemonOptions, TaskDef, TaskStore } from "./types"; 12 + import type { DaemonOptions, SSEConfig, TaskDef, TaskStore } from "./types"; 9 13 import { runTasks } from "./tasks"; 14 + import { startSSEConsumer, type SSEConsumerHandle } from "./sse-consumer"; 10 15 11 16 const DEFAULT_INITIAL_DELAY_MS = 5_000; 12 17 const DEFAULT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days 18 + // Reduced interval for directory-sync when SSE is active. SSE handles most 19 + // sync, but document_update proposals can't route to workspace topics (the 20 + // lexicon has no keyring field) so this timer catches what SSE misses. 21 + const DIRECTORY_SYNC_FALLBACK_MS = 60_000; 13 22 14 23 /** Handle returned by `startDaemon` — call `stop()` to shut down. */ 15 24 export interface DaemonHandle { 16 - /** Stop all intervals and release the leader lock. */ 25 + /** Stop all intervals, SSE subscription, and release the leader lock. */ 17 26 stop(): void; 18 27 } 19 28 ··· 24 33 * background tasks. Schedules all tasks from the core registry at their 25 34 * configured intervals. 26 35 * 36 + * When `options.sse` is provided, SSE-driven proposal sync replaces the 37 + * normal `directory-sync` interval — proposal events trigger immediate 38 + * targeted syncs, and the `directory-sync` timer downgrades to a 39 + * low-frequency fallback for events SSE can't route. 40 + * 27 41 * @returns A handle to stop the daemon. 28 42 * 29 43 * @example ··· 33 47 * 34 48 * const opake = await Opake.init(); 35 49 * const daemon = startDaemon(opake, taskDefs, taskStore, { 50 + * sse: { 51 + * appviewUrl: "https://appview.opake.app", 52 + * onRecordChanged: () => reloadCurrentView(), 53 + * }, 36 54 * onWorkspaceUpdated: (uris) => reloadWorkspace(uris), 37 55 * }); 38 56 * ··· 47 65 options?: DaemonOptions, 48 66 ): DaemonHandle { 49 67 const intervalIds: ReturnType<typeof setInterval>[] = []; 68 + // eslint-disable-next-line functional/no-let -- mutable ref for cleanup 50 69 let releaseLock: (() => void) | null = null; 70 + // eslint-disable-next-line functional/no-let -- mutable ref for cleanup 71 + let sseConsumer: SSEConsumerHandle | null = null; 72 + 73 + const sse: SSEConfig | undefined = options?.sse; 51 74 52 75 function stop(): void { 76 + sseConsumer?.close(); 77 + sseConsumer = null; 53 78 for (const id of intervalIds) { 54 79 clearInterval(id); 55 80 } ··· 68 93 const handler = handlers[task.name]; 69 94 if (!handler) continue; 70 95 71 - const intervalMs = task.intervalSeconds * 1000; 96 + // When SSE is active, directory-sync becomes a low-frequency fallback 97 + // (60s) instead of the normal interval. SSE handles most proposal sync, 98 + // but document_update events can't be workspace-routed (no keyring_uri 99 + // in the lexicon) so the timer catches what SSE misses. 100 + const intervalMs = (sse && task.name === "directory-sync") 101 + ? DIRECTORY_SYNC_FALLBACK_MS 102 + : task.intervalSeconds * 1000; 103 + 72 104 setTimeout(() => void handler(), initialDelay); 73 105 intervalIds.push(setInterval(() => void handler(), intervalMs)); 106 + } 107 + 108 + if (sse) { 109 + sseConsumer = startSSEConsumer(opake, taskStore, options ?? {}, sse); 74 110 } 75 111 } 76 112
+177
packages/opake-daemon/src/sse-consumer.ts
··· 1 + // SSE-driven proposal sync. Reacts to appview events instead of polling, 2 + // debouncing per-keyring so rapid proposal bursts collapse into one sync. 3 + // Events from our own DID within a short window after a local write are 4 + // suppressed to avoid redundant reloads. 5 + 6 + import type { Opake, EventStream, SSEKeyring } from "@opake/sdk"; 7 + import type { DaemonOptions, SSEConfig, TaskStore } from "./types"; 8 + import { persistSyncResult, handleDaemonError } from "./tasks"; 9 + 10 + const PROPOSAL_DEBOUNCE_MS = 2_000; 11 + const RECORD_CHANGED_DEBOUNCE_MS = 500; 12 + const SELF_EVENT_SUPPRESS_MS = 3_000; 13 + 14 + /** Shape common to all record events — what we need for self-event filtering. */ 15 + interface RecordEvent { 16 + readonly owner_did: string; 17 + } 18 + 19 + /** Shape common to all proposal events — what we need for routing. */ 20 + interface ProposalEvent { 21 + readonly author_did: string; 22 + readonly keyring_uri?: string | null; 23 + } 24 + 25 + export interface SSEConsumerHandle { 26 + close(): void; 27 + } 28 + 29 + /** 30 + * Subscribe to SSE and sync workspaces when proposal events arrive. 31 + * 32 + * Record events fire `sse.onRecordChanged` (unless suppressed as self-events). 33 + * Proposal events trigger debounced per-workspace sync, which fires 34 + * `options.onWorkspaceUpdated` on proposal application. 35 + */ 36 + export function startSSEConsumer( 37 + opake: Opake, 38 + taskStore: TaskStore, 39 + options: DaemonOptions, 40 + sse: SSEConfig, 41 + ): SSEConsumerHandle { 42 + const proposalTimers = new Map<string, ReturnType<typeof setTimeout>>(); 43 + // Reserved key for full-sync debounce — cannot collide with a real AT URI 44 + // since those always start with "at://". 45 + const FULL_SYNC_KEY = "__full__"; 46 + 47 + // eslint-disable-next-line functional/no-let -- mutable ref for cleanup 48 + let stream: EventStream | null = null; 49 + // eslint-disable-next-line functional/no-let -- mutable ref for debounce 50 + let recordChangedTimer: ReturnType<typeof setTimeout> | null = null; 51 + 52 + // DID resolved once at startup. The daemon is torn down on account switch, 53 + // so staleness isn't a concern. 54 + const selfDid = opake.getDid(); 55 + 56 + function isSelfEvent(did: string | null | undefined): boolean { 57 + if (!selfDid || !did || did !== selfDid) return false; 58 + return (Date.now() - opake.lastWriteAt) < SELF_EVENT_SUPPRESS_MS; 59 + } 60 + 61 + /** Leading-edge debounce: schedule one reload per burst window. */ 62 + function notifyRecordChanged(did?: string | null): void { 63 + if (isSelfEvent(did)) return; 64 + if (recordChangedTimer) return; 65 + recordChangedTimer = setTimeout(() => { 66 + recordChangedTimer = null; 67 + sse.onRecordChanged?.(); 68 + }, RECORD_CHANGED_DEBOUNCE_MS); 69 + } 70 + 71 + /** Trailing-edge debounce per keyring. Rapid bursts collapse into one sync. */ 72 + function debouncedSync(keyringUri: string | null): void { 73 + const key = keyringUri ?? FULL_SYNC_KEY; 74 + const existing = proposalTimers.get(key); 75 + if (existing) clearTimeout(existing); 76 + 77 + proposalTimers.set( 78 + key, 79 + setTimeout(() => { 80 + proposalTimers.delete(key); 81 + void (keyringUri ? syncSingleWorkspace(keyringUri) : syncAllWorkspaces()); 82 + }, PROPOSAL_DEBOUNCE_MS), 83 + ); 84 + } 85 + 86 + async function syncSingleWorkspace(keyringUri: string): Promise<void> { 87 + try { 88 + const result = await opake.syncWorkspaceByUri(keyringUri); 89 + if (result && (result.proposalsApplied > 0 || result.error)) { 90 + await persistSyncResult(taskStore, result); 91 + if (result.proposalsApplied > 0) { 92 + options.onWorkspaceUpdated?.([keyringUri]); 93 + } 94 + } 95 + } catch (err) { 96 + handleDaemonError(err, options); 97 + } 98 + } 99 + 100 + async function syncAllWorkspaces(): Promise<void> { 101 + try { 102 + const results = await opake.syncOwnedWorkspacesDetailed(); 103 + const updated = results.filter((r) => r.proposalsApplied > 0); 104 + if (updated.length > 0) { 105 + await Promise.all(updated.map((r) => persistSyncResult(taskStore, r))); 106 + options.onWorkspaceUpdated?.(updated.map((r) => r.keyringUri)); 107 + } 108 + } catch (err) { 109 + handleDaemonError(err, options); 110 + } 111 + } 112 + 113 + // Generic handlers — all record events share the same "notify on non-self" 114 + // behavior; all proposal events share the same self-filter + per-keyring debounce. 115 + 116 + const handleRecordEvent = (data: RecordEvent): void => notifyRecordChanged(data.owner_did); 117 + const handleDelete = (): void => notifyRecordChanged(); 118 + 119 + function handleProposal(data: ProposalEvent): void { 120 + if (isSelfEvent(data.author_did)) return; 121 + debouncedSync(data.keyring_uri ?? null); 122 + } 123 + 124 + function handleKeyringUpsert(data: SSEKeyring): void { 125 + if (isSelfEvent(data.owner_did) || !data.uri) return; 126 + // Two paths: direct mutation (add member, rename) → sidebar needs refresh 127 + // immediately; proposal application → debouncedSync fires onWorkspaceUpdated 128 + // again later. Both call onWorkspaceUpdated. The host app's loadWorkspaces 129 + // dedupes in-flight calls and skips no-op updates, so the duplicate is free. 130 + options.onWorkspaceUpdated?.([data.uri]); 131 + debouncedSync(data.uri); 132 + } 133 + 134 + try { 135 + stream = opake.subscribe( 136 + { 137 + onDirectoryUpsert: handleRecordEvent, 138 + onDirectoryDelete: handleDelete, 139 + onDocumentUpsert: handleRecordEvent, 140 + onDocumentDelete: handleDelete, 141 + onKeyringUpsert: handleKeyringUpsert, 142 + onKeyringDelete: handleDelete, 143 + onGrantUpsert: handleRecordEvent, 144 + onGrantDelete: handleDelete, 145 + onDirectoryUpdateUpsert: handleProposal, 146 + onDirectoryUpdateDelete: handleDelete, 147 + onKeyringUpdateUpsert: handleProposal, 148 + onKeyringUpdateDelete: handleDelete, 149 + onDocumentUpdateUpsert: handleProposal, 150 + onDocumentUpdateDelete: handleDelete, 151 + onReconnect: () => { 152 + // syncAllWorkspaces fires onWorkspaceUpdated, which the host app 153 + // routes back to reloadCurrentDirectory. No separate notify needed. 154 + void syncAllWorkspaces(); 155 + }, 156 + }, 157 + sse.appviewUrl, 158 + ); 159 + } catch { 160 + // SSE connection failure is non-fatal — timer fallback still runs 161 + } 162 + 163 + function close(): void { 164 + stream?.close(); 165 + stream = null; 166 + if (recordChangedTimer) { 167 + clearTimeout(recordChangedTimer); 168 + recordChangedTimer = null; 169 + } 170 + for (const timer of proposalTimers.values()) { 171 + clearTimeout(timer); 172 + } 173 + proposalTimers.clear(); 174 + } 175 + 176 + return { close }; 177 + }
+28 -16
packages/opake-daemon/src/tasks.ts
··· 45 45 0, 46 46 ); 47 47 48 - // Per-workspace task entries 49 48 await Promise.all( 50 49 results 51 50 .filter((r: WorkspaceSyncResult) => r.proposalsApplied > 0 || r.error) 52 - .map((r: WorkspaceSyncResult) => 53 - persistTask(taskStore, `sync-${r.keyringUri}`, { 54 - type: "proposalSync", 55 - keyringUri: r.keyringUri, 56 - proposalsApplied: r.proposalsApplied, 57 - }, r.error ? { failed: r.error } : "completed"), 58 - ), 51 + .map((r: WorkspaceSyncResult) => persistSyncResult(taskStore, r)), 59 52 ); 60 53 61 - // Notify host app of updated workspaces 62 54 if (totalApplied > 0 && options?.onWorkspaceUpdated) { 63 55 const updatedUris = results 64 56 .filter((r: WorkspaceSyncResult) => r.proposalsApplied > 0) ··· 100 92 } 101 93 // No work done — don't persist at all (avoids write amplification) 102 94 } catch (err) { 103 - // Auth errors are transient — the token may have been rotated by a 104 - // concurrent refresh. Skip this cycle silently. 105 - if (err instanceof OpakeError && err.kind === "Auth") { 106 - options?.onSessionExpired?.(); 107 - return; 108 - } 109 - 95 + if (handleDaemonError(err, options)) return; 110 96 await persistTask(taskStore, id, initialKind, { failed: String(err) }, createdAt); 111 97 } 98 + } 99 + 100 + /** 101 + * Shared error branch for daemon operations. Returns true if the error was 102 + * handled (caller should return early), false if it should be persisted or 103 + * propagated. Auth errors fire `onSessionExpired` and are considered handled. 104 + */ 105 + export function handleDaemonError(err: unknown, options?: DaemonOptions): boolean { 106 + if (err instanceof OpakeError && err.kind === "Auth") { 107 + options?.onSessionExpired?.(); 108 + return true; 109 + } 110 + return false; 111 + } 112 + 113 + /** Persist a single workspace sync result as a task record. */ 114 + export async function persistSyncResult( 115 + taskStore: TaskStore, 116 + result: WorkspaceSyncResult, 117 + ): Promise<void> { 118 + await persistTask( 119 + taskStore, 120 + `sync-${result.keyringUri}`, 121 + { type: "proposalSync", keyringUri: result.keyringUri, proposalsApplied: result.proposalsApplied }, 122 + result.error ? { failed: result.error } : "completed", 123 + ); 112 124 } 113 125 114 126 async function persistTask(
+27 -3
packages/opake-daemon/src/types.ts
··· 20 20 readonly updatedAt: string; 21 21 } 22 22 23 + /** 24 + * SSE-specific daemon configuration. Presence of this field switches the 25 + * daemon into event-driven mode: `directory-sync` becomes a low-frequency 26 + * fallback timer while SSE events drive targeted per-workspace sync. 27 + * Absence keeps the daemon in classic timer-polling mode. 28 + */ 29 + export interface SSEConfig { 30 + /** Appview base URL for the SSE stream + token exchange. */ 31 + readonly appviewUrl: string; 32 + 33 + /** 34 + * Called when any SSE event arrives that affects directory/document 35 + * state (excluding self-events). Unlike `onWorkspaceUpdated`, this fires 36 + * for every non-self record mutation so the host app can reload the 37 + * current view. 38 + */ 39 + readonly onRecordChanged?: () => void; 40 + } 41 + 23 42 /** Configuration for the daemon scheduler. */ 24 43 export interface DaemonOptions { 25 44 /** ··· 36 55 readonly pruneAgeMs?: number; 37 56 38 57 /** 58 + * SSE streaming config. When present, the daemon subscribes to the 59 + * appview's event stream and reacts to proposal events with targeted 60 + * per-workspace sync. When absent, all sync happens via interval polling. 61 + */ 62 + readonly sse?: SSEConfig; 63 + 64 + /** 39 65 * Called when a workspace's directory tree is updated by proposal 40 66 * application. The daemon can't update UI state directly — this 41 67 * callback lets the host app trigger a reload. 42 68 */ 43 69 readonly onWorkspaceUpdated?: (keyringUris: readonly string[]) => void; 44 70 45 - /** 46 - * Called when the daemon detects an expired session. 47 - */ 71 + /** Called when the daemon detects an expired session. */ 48 72 readonly onSessionExpired?: () => void; 49 73 } 50 74
+49
packages/opake-sdk/src/event-stream.ts
··· 58 58 document_uri: z.string().optional(), 59 59 }); 60 60 61 + // Proposal schemas — workspace change proposals pending owner approval. 62 + // All proposals carry the same identity fields (uri + author + keyring); 63 + // individual schemas extend the base with their type-specific payload. 64 + 65 + const sseProposalBaseSchema = z.object({ 66 + uri: z.string(), 67 + author_did: z.string(), 68 + keyring_uri: z.string().nullish(), 69 + }); 70 + 71 + export const sseDirectoryUpdateSchema = sseProposalBaseSchema.extend({ 72 + action_type: z.string(), 73 + directory_uri: z.string().nullish(), 74 + entry_uri: z.string().nullish(), 75 + encrypted_metadata: z.unknown().nullish(), 76 + source_directory_uri: z.string().nullish(), 77 + target_directory_uri: z.string().nullish(), 78 + parent_directory_uri: z.string().nullish(), 79 + }); 80 + 81 + export const sseKeyringUpdateSchema = sseProposalBaseSchema.extend({ 82 + action_type: z.string(), 83 + member_did: z.string().nullish(), 84 + member_public_key: z.string().nullish(), 85 + role: z.string().nullish(), 86 + encrypted_metadata: z.unknown().nullish(), 87 + }); 88 + 89 + export const sseDocumentUpdateSchema = sseProposalBaseSchema.extend({ 90 + document_uri: z.string(), 91 + supersedes_uri: z.string().nullish(), 92 + }); 93 + 61 94 // --------------------------------------------------------------------------- 62 95 // Types 63 96 // --------------------------------------------------------------------------- ··· 67 100 export type SSEKeyring = z.output<typeof sseKeyringSchema>; 68 101 export type SSEGrant = z.output<typeof sseGrantSchema>; 69 102 export type SSEDelete = z.output<typeof sseDeleteSchema>; 103 + export type SSEDirectoryUpdate = z.output<typeof sseDirectoryUpdateSchema>; 104 + export type SSEKeyringUpdate = z.output<typeof sseKeyringUpdateSchema>; 105 + export type SSEDocumentUpdate = z.output<typeof sseDocumentUpdateSchema>; 70 106 71 107 /** Handlers for SSE events. All optional — subscribe to what you need. */ 72 108 export interface EventStreamHandlers { ··· 78 114 readonly onKeyringDelete?: (data: SSEDelete) => void; 79 115 readonly onGrantUpsert?: (data: SSEGrant) => void; 80 116 readonly onGrantDelete?: (data: SSEDelete) => void; 117 + readonly onDirectoryUpdateUpsert?: (data: SSEDirectoryUpdate) => void; 118 + readonly onDirectoryUpdateDelete?: (data: SSEDelete) => void; 119 + readonly onKeyringUpdateUpsert?: (data: SSEKeyringUpdate) => void; 120 + readonly onKeyringUpdateDelete?: (data: SSEDelete) => void; 121 + readonly onDocumentUpdateUpsert?: (data: SSEDocumentUpdate) => void; 122 + readonly onDocumentUpdateDelete?: (data: SSEDelete) => void; 81 123 /** Fired on reconnect — consumer should perform a full sync to cover the gap. */ 82 124 readonly onReconnect?: () => void; 83 125 readonly onError?: (error: Error) => void; ··· 172 214 this.on(es, "keyring:delete", sseDeleteSchema, this.handlers.onKeyringDelete); 173 215 this.on(es, "grant:upsert", sseGrantSchema, this.handlers.onGrantUpsert); 174 216 this.on(es, "grant:delete", sseDeleteSchema, this.handlers.onGrantDelete); 217 + // Proposal events (workspace change proposals pending owner approval) 218 + this.on(es, "directory_update:upsert", sseDirectoryUpdateSchema, this.handlers.onDirectoryUpdateUpsert); 219 + this.on(es, "directory_update:delete", sseDeleteSchema, this.handlers.onDirectoryUpdateDelete); 220 + this.on(es, "keyring_update:upsert", sseKeyringUpdateSchema, this.handlers.onKeyringUpdateUpsert); 221 + this.on(es, "keyring_update:delete", sseDeleteSchema, this.handlers.onKeyringUpdateDelete); 222 + this.on(es, "document_update:upsert", sseDocumentUpdateSchema, this.handlers.onDocumentUpdateUpsert); 223 + this.on(es, "document_update:delete", sseDeleteSchema, this.handlers.onDocumentUpdateDelete); 175 224 } catch (e) { 176 225 this.handlers.onError?.(e instanceof Error ? e : new Error(String(e))); 177 226 this.scheduleReconnect();
+3
packages/opake-sdk/src/index.ts
··· 65 65 type SSEKeyring, 66 66 type SSEGrant, 67 67 type SSEDelete, 68 + type SSEDirectoryUpdate, 69 + type SSEKeyringUpdate, 70 + type SSEDocumentUpdate, 68 71 } from "./event-stream";
+38
packages/opake-sdk/src/opake.ts
··· 26 26 createWorkspaceResultSchema, 27 27 listWorkspacesResultSchema, 28 28 syncDetailedResultSchema, 29 + syncSingleResultSchema, 29 30 } from "./schemas"; 30 31 import { initWasm } from "./wasm"; 31 32 import { FileManager } from "./file-manager"; ··· 402 403 } 403 404 404 405 // --------------------------------------------------------------------------- 406 + // Identity 407 + // --------------------------------------------------------------------------- 408 + 409 + /** The authenticated DID, or null if the context is busy/unavailable. */ 410 + getDid(): string | null { 411 + try { 412 + return this.requireContext().getDid(); 413 + } catch { 414 + return null; 415 + } 416 + } 417 + 418 + // --------------------------------------------------------------------------- 419 + // Write tracking (for self-event filtering in SSE consumers) 420 + // --------------------------------------------------------------------------- 421 + 422 + /** 423 + * Timestamp of the most recent local write. SSE consumers read this to 424 + * suppress echo events — if an SSE event from our own DID arrives within 425 + * the suppression window, it's assumed to be our own write echoing back. 426 + * 427 + * Callers should invoke `markWrite()` BEFORE starting a mutation so the 428 + * window opens at the earliest point the echo can arrive. 429 + */ 430 + lastWriteAt = 0; 431 + 432 + markWrite(): void { 433 + this.lastWriteAt = Date.now(); 434 + } 435 + 436 + // --------------------------------------------------------------------------- 405 437 // Token lifecycle (called by @withTokenGuard decorator) 406 438 // --------------------------------------------------------------------------- 407 439 ··· 681 713 @wrapWasmErrors @withTokenGuard 682 714 syncOwnedWorkspacesDetailed(): Promise<readonly WorkspaceSyncResult[]> { 683 715 return this.requireContext().syncOwnedWorkspacesDetailed().then(syncDetailedResultSchema.parse); 716 + } 717 + 718 + /** Sync a single workspace by keyring URI. Returns null if not a member. */ 719 + @wrapWasmErrors @withTokenGuard 720 + syncWorkspaceByUri(keyringUri: string): Promise<WorkspaceSyncResult | null> { 721 + return this.requireContext().syncWorkspaceByUri(keyringUri).then(syncSingleResultSchema.parse); 684 722 } 685 723 686 724 /** Retry pending shares — resolve recipients and create grants. */
+1
packages/opake-sdk/src/schemas.ts
··· 94 94 export type WorkspaceSyncResult = z.output<typeof workspaceSyncResultSchema>; 95 95 96 96 export const syncDetailedResultSchema = z.array(workspaceSyncResultSchema); 97 + export const syncSingleResultSchema = workspaceSyncResultSchema.nullable(); 97 98 98 99 // --------------------------------------------------------------------------- 99 100 // File operations