this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: sync all changes while offline

this implementation is so nasty, it's entirely vibe coded. But it does
work!
the bug we were facing was due to lack of understanding that `colVersion` also needs to be tracked alongside `dbVersion`. we also made some updates to the way the handshake happens for the protocol, introducing sync_request and having the server request changes from the client when it is ahead. The solution is entierly unoptimal right now, possibly with many duplication bugs and request (state tracking for the lastest version on the client side is particularly garbage), but it does work.

I'm also certain that there's code smells surronding authentication, up
to and possibly including overwriting existing keys in the
authentication table.
The v2 spec included in this commit has been implemented up to Phase 1:
Core Protocol. We will still need to implement authorization.

+1386 -53
+16 -4
mast-react-vite/src/hooks/use-sync.ts
··· 5 5 room: string; 6 6 endpoint: string; 7 7 worker: Worker; 8 + publicKey?: string; 8 9 } 9 10 10 11 export function useCustomSync({ 11 12 dbname, 12 13 room, 13 14 endpoint, 14 - worker 15 + worker, 16 + publicKey 15 17 }: SyncConfig) { 16 18 const [status, setStatus] = useState<string>('disconnected'); 17 19 const [changesSent, setChangesSent] = useState<number>(0); 18 20 const [changesReceived, setChangesReceived] = useState<number>(0); 19 21 const [lastError, setLastError] = useState<string | null>(null); 20 22 const [lastSyncTime, setLastSyncTime] = useState<Date | null>(null); 23 + const [roomAccess, setRoomAccess] = useState<string | null>(null); 24 + const [needsPayment, setNeedsPayment] = useState<boolean>(false); 21 25 22 26 // Initialize sync once on component mount 23 27 useEffect(() => { ··· 31 35 config: { 32 36 room: room, 33 37 url: endpoint, 38 + publicKey, 34 39 requestUnsyncedChanges: true // Request server to send unsynced changes immediately 35 40 } 36 41 }); 37 42 38 43 // Set up message handlers 39 44 const handleMessage = (event: MessageEvent) => { 40 - const { type, dbname: eventDbname, count, error } = event.data; 45 + const { type, dbname: eventDbname, count, error, access, needsPayment: needsPaymentFlag } = event.data; 41 46 42 47 // Only process messages for our database 43 48 if (eventDbname && eventDbname !== dbname) return; ··· 53 58 setStatus('error'); 54 59 setLastError(error); 55 60 break; 61 + case 'room_status': 62 + setRoomAccess(access); 63 + setNeedsPayment(needsPaymentFlag || false); 64 + console.log(`Room access: ${access}, needs payment: ${needsPaymentFlag}`); 65 + break; 56 66 case 'CHANGES_SENT': 57 67 setChangesSent(prev => prev + (count || 0)); 58 68 setLastSyncTime(new Date()); ··· 75 85 dbname 76 86 }); 77 87 }; 78 - }, [dbname, room, endpoint, worker]); 88 + }, [dbname, room, endpoint, worker, publicKey]); 79 89 80 90 // Function to trigger sync manually 81 91 const syncChanges = useCallback(() => { ··· 93 103 changesReceived, 94 104 lastError, 95 105 lastSyncTime, 96 - syncChanges 106 + syncChanges, 107 + roomAccess, 108 + needsPayment 97 109 }; 98 110 } 99 111
+5 -4
mast-react-vite/src/main.tsx
··· 254 254 } 255 255 256 256 // Custom sync component using your own implementation 257 - function CustomSyncComponent({ dbname, roomId }: { dbname: string, roomId: string }) { 257 + function CustomSyncComponent({ dbname, roomId, publicKey }: { dbname: string, roomId: string, publicKey?: string }) { 258 258 const endpoint = process.env.NODE_ENV === 'production' 259 259 ? `wss://mast-server.fly.dev/sync` 260 260 : `ws://localhost:8080/sync`; ··· 263 263 dbname, 264 264 room: roomId, 265 265 endpoint, 266 - worker: syncWorker 266 + worker: syncWorker, 267 + publicKey 267 268 }); 268 269 269 270 return <SyncStatus {...syncStats} />; ··· 405 406 } 406 407 }); 407 408 408 - return { db, rx, roomId, dbname }; 409 + return { db, rx, roomId, dbname, publicKeyBase64 }; 409 410 }; 410 411 411 412 // Function to register a key with the server ··· 580 581 <SelectionProvider> 581 582 <FilterProvider> 582 583 {/* Use the custom sync component with the database name and room ID */} 583 - <CustomSyncComponent dbname={ctx.dbname} roomId={ctx.roomId} /> 584 + <CustomSyncComponent dbname={ctx.dbname} roomId={ctx.roomId} publicKey={ctx.publicKeyBase64} /> 584 585 <App ctx={ctx} syncWorker={syncWorker} dbname={ctx.dbname} roomId={ctx.roomId} /> 585 586 </FilterProvider> 586 587 </SelectionProvider>
+509 -32
mast-react-vite/src/worker/sync-worker.ts
··· 13 13 console.error(`[SyncWorker] ${message}`, ...data); 14 14 } 15 15 16 + // Version state for tracking missing changes (v2 protocol) 17 + interface VersionColPair { 18 + dbVersion: number; 19 + colVersion: number; 20 + } 21 + 22 + interface MissingVersionRange { 23 + dbVersion: number; 24 + colVersions: number[]; 25 + } 26 + 27 + interface VersionState { 28 + contiguousUpTo: VersionColPair; 29 + missingRanges: MissingVersionRange[]; 30 + maxVersionSeen: VersionColPair; 31 + } 32 + 16 33 // Store active connections - only one per database 17 34 const connections: Record<string, { 18 35 ws: WebSocket | null; ··· 23 40 url: string; 24 41 isConnecting: boolean; 25 42 requestUnsyncedChanges?: boolean; 43 + versionState?: VersionState; 44 + publicKey?: string; 45 + roomAccess?: string; 46 + shouldReconnect?: boolean; 47 + reconnectTimeoutId?: number; 26 48 }> = {}; 27 49 28 50 // Handle messages from the main thread ··· 103 125 }; 104 126 105 127 // Start syncing a database 106 - async function startSync(dbname: string, config: { room: string, url: string }) { 128 + async function startSync(dbname: string, config: { room: string, url: string, publicKey?: string }) { 107 129 try { 108 130 // Check if we already have a connection for this database 109 131 if (connections[dbname]) { ··· 136 158 const siteId = siteIdResult[0].site_id; 137 159 logDebug(`Site ID: ${Array.from(siteId)}`); 138 160 139 - // Get the highest db_version from crsql_changes table 161 + // Initialize version state for v2 protocol 140 162 let lastSyncVersion = 0; 163 + let versionState: VersionState = { 164 + contiguousUpTo: { dbVersion: 0, colVersion: 0 }, 165 + missingRanges: [], 166 + maxVersionSeen: { dbVersion: 0, colVersion: 0 } 167 + }; 168 + 141 169 try { 142 - const versionResult = await db.execO<{max_version: number}[]>( 170 + logDebug(`Querying version state for database ${dbname}`); 171 + 172 + // Get all (db_version, col_version) pairs we have (from all sites, not just our own) 173 + const allVersionsResult = await db.execO<{db_version: number; col_version: number}[]>( 174 + "SELECT db_version, col_version FROM crsql_changes ORDER BY db_version, col_version" 175 + ); 176 + 177 + logDebug(`Found ${allVersionsResult.length} version pairs in database`); 178 + 179 + if (allVersionsResult.length > 0) { 180 + const versionPairs = allVersionsResult.map(r => ({ dbVersion: r.db_version, colVersion: r.col_version })); 181 + logDebug(`All version pairs in database: ${versionPairs.slice(0, 10).map(v => `${v.dbVersion}.${v.colVersion}`).join(',')}${versionPairs.length > 10 ? '...' : ''}`); 182 + 183 + // Calculate actual contiguous versions with col_version tracking 184 + logDebug(`Calculating contiguous versions from ${versionPairs.length} pairs`); 185 + const contiguousUpTo = calculateContiguousVersionPairs(versionPairs); 186 + logDebug(`Calculated contiguous up to: ${contiguousUpTo.dbVersion}.${contiguousUpTo.colVersion}`); 187 + 188 + const maxVersionSeen = getMaxVersionPair(versionPairs); 189 + logDebug(`Calculated max version seen: ${maxVersionSeen.dbVersion}.${maxVersionSeen.colVersion}`); 190 + 191 + const missingRanges = calculateMissingVersionRanges(versionPairs, maxVersionSeen); 192 + logDebug(`Calculated ${missingRanges.length} missing ranges`); 193 + 194 + versionState.contiguousUpTo = contiguousUpTo; 195 + versionState.maxVersionSeen = maxVersionSeen; 196 + versionState.missingRanges = missingRanges; 197 + 198 + // For lastSyncVersion, get max from our own changes only 199 + const ownVersionResult = await db.execO<{max_version: number}[]>( 200 + "SELECT MAX(db_version) as max_version FROM crsql_changes WHERE site_id = crsql_site_id()" 201 + ); 202 + if (ownVersionResult.length > 0 && ownVersionResult[0].max_version !== null) { 203 + lastSyncVersion = ownVersionResult[0].max_version; 204 + } 205 + 206 + logDebug(`Calculated version state: contiguous_up_to=${contiguousUpTo.dbVersion}.${contiguousUpTo.colVersion}, max_version_seen=${maxVersionSeen.dbVersion}.${maxVersionSeen.colVersion}, missing_ranges=${missingRanges.length}, own_max=${lastSyncVersion}`); 207 + } 208 + 209 + // Also get the global max version for reference 210 + const globalVersionResult = await db.execO<{max_version: number}[]>( 143 211 "SELECT MAX(db_version) as max_version FROM crsql_changes" 144 212 ); 145 - if (versionResult.length > 0 && versionResult[0].max_version !== null) { 146 - lastSyncVersion = versionResult[0].max_version; 147 - logDebug(`Retrieved lastSyncVersion ${lastSyncVersion} from database`); 213 + if (globalVersionResult.length > 0 && globalVersionResult[0].max_version !== null) { 214 + logDebug(`Global max version in database: ${globalVersionResult[0].max_version}`); 148 215 } 149 216 } catch (error) { 150 - logError(`Error retrieving lastSyncVersion:`, error); 217 + logError(`Error retrieving version state:`, error); 151 218 } 152 219 153 220 // Store or update connection info ··· 159 226 siteId, 160 227 room: config.room, 161 228 url: config.url, 162 - isConnecting: true 229 + isConnecting: true, 230 + versionState, 231 + publicKey: config.publicKey, 232 + shouldReconnect: true 163 233 }; 164 234 } else { 165 235 connections[dbname].db = db; ··· 168 238 connections[dbname].room = config.room; 169 239 connections[dbname].url = config.url; 170 240 connections[dbname].isConnecting = true; 241 + connections[dbname].versionState = versionState; 242 + connections[dbname].publicKey = config.publicKey; 243 + connections[dbname].shouldReconnect = true; 171 244 } 172 245 173 - // Create WebSocket connection with room in query parameter 246 + // Create WebSocket connection with room and publicKey in query parameters 174 247 let wsUrl = config.url; 175 248 if (!wsUrl.includes('?room=')) { 176 249 const separator = wsUrl.includes('?') ? '&' : '?'; 177 250 wsUrl = `${wsUrl}${separator}room=${config.room}`; 178 251 } 179 252 253 + // Add publicKey parameter for v2 protocol 254 + if (config.publicKey) { 255 + const separator = wsUrl.includes('?') ? '&' : '?'; 256 + wsUrl = `${wsUrl}${separator}publicKey=${encodeURIComponent(config.publicKey)}`; 257 + } 258 + 180 259 logDebug(`Connecting to WebSocket at ${wsUrl}`); 181 260 182 261 const ws = new WebSocket(wsUrl); ··· 196 275 }, 100); 197 276 }); 198 277 199 - // Initial sync - request changes from server 200 - const connection = connections[dbname]; 201 - 202 - // Encode siteId to base64 for transmission 203 - const encodedSiteId = siteId instanceof Uint8Array 204 - ? btoa(String.fromCharCode.apply(null, siteId)) 205 - : btoa(String(siteId)); 206 - 207 - logDebug(`Requesting pull with siteId: ${Array.from(siteId)}, encoded: ${encodedSiteId}, version: ${connection.lastSyncVersion}`) 208 - ws.send(JSON.stringify({ 209 - type: "pull", 210 - room: config.room, 211 - site_id: encodedSiteId, 212 - version: connection.lastSyncVersion 213 - })); 278 + // Wait for room_status before sending sync_request 279 + // The sync_request will be sent in the room_status message handler 214 280 215 281 // Notify main thread of connection 216 282 self.postMessage({ type: 'SYNC_CONNECTED', dbname }); ··· 223 289 const message = JSON.parse(event.data); 224 290 225 291 switch (message.type) { 292 + case "room_status": 293 + logDebug(`Received room status: ${message.access}`); 294 + connections[dbname].roomAccess = message.access; 295 + self.postMessage({ 296 + type: 'room_status', 297 + dbname, 298 + access: message.access, 299 + needsPayment: message.access === 'no_room' 300 + }); 301 + 302 + // After receiving room_status, send sync_request if we have access 303 + if (message.access === 'write' || message.access === 'read') { 304 + const connection = connections[dbname]; 305 + 306 + // Encode siteId to base64 for transmission 307 + const encodedSiteId = connection.siteId instanceof Uint8Array 308 + ? btoa(String.fromCharCode.apply(null, connection.siteId)) 309 + : btoa(String(connection.siteId)); 310 + 311 + if (config.publicKey && connection.versionState) { 312 + // Use v2 sync_request protocol 313 + logDebug(`Sending sync_request with v2 protocol: contiguous_up_to=${connection.versionState.contiguousUpTo.dbVersion}.${connection.versionState.contiguousUpTo.colVersion}, max_version_seen=${connection.versionState.maxVersionSeen.dbVersion}.${connection.versionState.maxVersionSeen.colVersion}`) 314 + ws.send(JSON.stringify({ 315 + type: "sync_request", 316 + site_id: encodedSiteId, 317 + contiguous_up_to: { 318 + db_version: connection.versionState.contiguousUpTo.dbVersion, 319 + col_version: connection.versionState.contiguousUpTo.colVersion 320 + }, 321 + missing_ranges: connection.versionState.missingRanges.map(range => ({ 322 + db_version: range.dbVersion, 323 + col_versions: range.colVersions 324 + })), 325 + max_version_seen: { 326 + db_version: connection.versionState.maxVersionSeen.dbVersion, 327 + col_version: connection.versionState.maxVersionSeen.colVersion 328 + }, 329 + publicKey: config.publicKey 330 + })); 331 + } else { 332 + // Fall back to v1 pull protocol 333 + logDebug(`Sending pull with v1 protocol: siteId: ${Array.from(connection.siteId)}, encoded: ${encodedSiteId}, version: ${connection.lastSyncVersion}`) 334 + ws.send(JSON.stringify({ 335 + type: "pull", 336 + room: config.room, 337 + site_id: encodedSiteId, 338 + version: connection.lastSyncVersion 339 + })); 340 + } 341 + } 342 + break; 343 + 344 + case "sync_response": 345 + logDebug(`Received sync response with ${message.changes?.length || 0} changes, server max version: ${message.current_max_version}`); 346 + if (Array.isArray(message.changes)) { 347 + await applyChanges(dbname, message.changes); 348 + updateVersionState(dbname, message.changes); 349 + } 350 + break; 351 + 226 352 case "changes": 227 353 if (Array.isArray(message.data)) { 228 354 logDebug(`Received ${message.data.length} changes from server`); 229 355 await applyChanges(dbname, message.data); 356 + updateVersionState(dbname, message.data); 230 357 } 231 358 break; 232 359 233 360 case "request_changes": 234 - logDebug(`Server requested changes since version ${message.version}`); 361 + logDebug(`Server requested changes since version ${JSON.stringify(message.version)}`); 235 362 236 363 try { 237 - await sendChanges(dbname, message.version); 364 + // Extract db_version from the version pair object 365 + const versionNumber = typeof message.version === 'object' && message.version?.db_version !== undefined 366 + ? message.version.db_version 367 + : (typeof message.version === 'number' ? message.version : 0); 368 + logDebug(`Extracted version number: ${versionNumber}`); 369 + await sendChanges(dbname, versionNumber); 238 370 } catch (error) { 239 371 logError(`Error responding to request_changes:`, error); 240 372 } 241 373 break; 242 374 375 + case "error": 376 + logError(`Received server error: ${message.code} - ${message.message}`); 377 + self.postMessage({ 378 + type: 'SYNC_ERROR', 379 + dbname, 380 + error: `${message.code}: ${message.message}` 381 + }); 382 + break; 383 + 243 384 default: 244 385 logDebug(`Received unknown message type: ${message.type}`); 245 386 } ··· 255 396 self.postMessage({ type: 'SYNC_ERROR', dbname, error: 'WebSocket error' }); 256 397 }; 257 398 258 - ws.onclose = () => { 259 - logDebug(`WebSocket connection closed for ${dbname}`); 399 + ws.onclose = (event) => { 400 + logDebug(`WebSocket connection closed for ${dbname}`, { code: event.code, reason: event.reason }); 260 401 connections[dbname].isConnecting = false; 402 + 403 + // Check if this was an auth failure (server termination) 404 + // Code 1002 = protocol error, 1008 = policy violation, 4001-4999 = custom app codes 405 + const isAuthFailure = event.code === 1002 || event.code === 1008 || (event.code >= 4000 && event.code < 5000); 406 + 407 + if (isAuthFailure) { 408 + logDebug(`Connection closed due to auth failure (code: ${event.code}), not reconnecting`); 409 + connections[dbname].shouldReconnect = false; 410 + self.postMessage({ type: 'SYNC_AUTH_FAILED', dbname, code: event.code }); 411 + } else if (connections[dbname].shouldReconnect !== false) { 412 + // Normal disconnect - attempt reconnection with random delay 413 + logDebug(`Connection lost, scheduling reconnection for ${dbname}`); 414 + attemptReconnection(dbname); 415 + } 416 + 261 417 // Notify main thread of disconnection 262 418 self.postMessage({ type: 'SYNC_DISCONNECTED', dbname }); 263 419 }; ··· 274 430 } 275 431 } 276 432 433 + // Attempt reconnection with random delay (3-9 seconds per v2 spec) 434 + function attemptReconnection(dbname: string) { 435 + const connection = connections[dbname]; 436 + if (!connection || connection.shouldReconnect === false) { 437 + return; 438 + } 439 + 440 + // Clear any existing reconnection timeout 441 + if (connection.reconnectTimeoutId) { 442 + clearTimeout(connection.reconnectTimeoutId); 443 + } 444 + 445 + // Random delay between 3 and 9 seconds to prevent stampeding herd 446 + const delay = Math.floor(Math.random() * 6000) + 3000; // 3000-9000ms 447 + logDebug(`Reconnecting to ${dbname} in ${delay}ms`); 448 + 449 + connection.reconnectTimeoutId = setTimeout(async () => { 450 + if (connections[dbname] && connections[dbname].shouldReconnect !== false) { 451 + logDebug(`Attempting reconnection to ${dbname}`); 452 + try { 453 + // Reset the WebSocket and connection state for reconnection 454 + connections[dbname].ws = null; 455 + connections[dbname].isConnecting = false; 456 + 457 + await startSync(dbname, { 458 + room: connection.room, 459 + url: connection.url, 460 + publicKey: connection.publicKey 461 + }); 462 + } catch (error) { 463 + logError(`Reconnection failed for ${dbname}:`, error); 464 + // Try again after another random delay 465 + attemptReconnection(dbname); 466 + } 467 + } 468 + }, delay); 469 + } 470 + 277 471 // Stop syncing a database 278 472 function stopSync(dbname: string) { 279 473 logDebug(`Stopping sync for ${dbname}`); ··· 281 475 if (!connection) { 282 476 logDebug(`No connection found for ${dbname}`); 283 477 return; 478 + } 479 + 480 + // Disable reconnection 481 + connection.shouldReconnect = false; 482 + 483 + // Clear any pending reconnection timeout 484 + if (connection.reconnectTimeoutId) { 485 + clearTimeout(connection.reconnectTimeoutId); 486 + connection.reconnectTimeoutId = undefined; 284 487 } 285 488 286 489 // Close WebSocket ··· 321 524 322 525 try { 323 526 // Use provided version or fall back to connection's lastSyncVersion 324 - const syncVersion = version !== undefined ? version : connection.lastSyncVersion; 325 - logDebug(`Querying for changes since version ${syncVersion}`); 527 + let syncVersion = version !== undefined ? version : connection.lastSyncVersion; 528 + logDebug(`syncVersion: ${syncVersion}`) 529 + 530 + // Special case: if version is explicitly 0 or -1, send ALL changes 531 + if (version === 0 || version === -1) { 532 + syncVersion = -1; // This will get all changes since db_version > -1 means all changes 533 + logDebug(`Sending ALL changes for ${dbname} (version was ${version})`); 534 + } else { 535 + logDebug(`Querying for changes since version ${syncVersion}`); 536 + } 326 537 327 538 // Query for changes since specified version 328 539 const changes = await connection.db.execA( 329 - `SELECT * FROM crsql_changes WHERE db_version > ? AND site_id = crsql_site_id()`, 540 + `SELECT * FROM crsql_changes WHERE db_version >= ? AND site_id = crsql_site_id()`, 330 541 [syncVersion] 331 542 ); 332 543 544 + // Debug: also check what ALL changes exist 545 + const allChanges = await connection.db.execA( 546 + `SELECT db_version, site_id FROM crsql_changes ORDER BY db_version` 547 + ); 548 + logDebug(`Total changes in DB: ${allChanges.length}`); 549 + logDebug(`All versions: ${allChanges.map(c => c[0]).join(',')}`); 550 + logDebug(`Querying for: db_version > ${syncVersion} AND site_id = crsql_site_id()`); 551 + logDebug(`Found ${changes.length} matching changes to send`); 552 + 333 553 if (changes.length === 0) { 334 - logDebug(`No changes to send for ${dbname}`); 554 + logDebug(`No changes to send for ${dbname} (queried > ${syncVersion})`); 335 555 return; 336 556 } 337 557 ··· 421 641 } 422 642 } 423 643 644 + // Update version state based on received changes 645 + async function recalculateVersionState(dbname: string) { 646 + const connection = connections[dbname]; 647 + if (!connection || !connection.versionState) { 648 + return; 649 + } 650 + 651 + try { 652 + // Re-query all version pairs from database 653 + const allVersionsResult = await connection.db.execO<{db_version: number; col_version: number}[]>( 654 + "SELECT db_version, col_version FROM crsql_changes ORDER BY db_version, col_version" 655 + ); 656 + 657 + if (allVersionsResult.length === 0) { 658 + connection.versionState = { 659 + contiguousUpTo: { dbVersion: 0, colVersion: 0 }, 660 + missingRanges: [], 661 + maxVersionSeen: { dbVersion: 0, colVersion: 0 } 662 + }; 663 + return; 664 + } 665 + 666 + const versionPairs = allVersionsResult.map(r => ({ dbVersion: r.db_version, colVersion: r.col_version })); 667 + 668 + // Recalculate all version state 669 + const contiguousUpTo = calculateContiguousVersionPairs(versionPairs); 670 + const maxVersionSeen = getMaxVersionPair(versionPairs); 671 + const missingRanges = calculateMissingVersionRanges(versionPairs, maxVersionSeen); 672 + 673 + connection.versionState = { 674 + contiguousUpTo, 675 + maxVersionSeen, 676 + missingRanges 677 + }; 678 + 679 + logDebug(`Recalculated version state: contiguous_up_to=${contiguousUpTo.dbVersion}.${contiguousUpTo.colVersion}, max_version_seen=${maxVersionSeen.dbVersion}.${maxVersionSeen.colVersion}, missing_ranges=${missingRanges.length}`); 680 + } catch (error) { 681 + logError(`Error recalculating version state for ${dbname}:`, error); 682 + } 683 + } 684 + 685 + function updateVersionState(dbname: string, changes: any[]) { 686 + const connection = connections[dbname]; 687 + if (!connection || !connection.versionState) { 688 + return; 689 + } 690 + 691 + const versionState = connection.versionState; 692 + 693 + for (const change of changes) { 694 + const versionPair: VersionColPair = { 695 + dbVersion: change.DBVersion, 696 + colVersion: change.ColVersion 697 + }; 698 + 699 + // Update max seen 700 + if (compareVersionPairs(versionPair, versionState.maxVersionSeen) > 0) { 701 + versionState.maxVersionSeen = versionPair; 702 + } 703 + 704 + // Remove this specific (db_version, col_version) from missing ranges 705 + removeMissingVersionPair(versionState, versionPair); 706 + } 707 + 708 + logDebug(`Updated version state: contiguous_up_to=${versionState.contiguousUpTo.dbVersion}.${versionState.contiguousUpTo.colVersion}, max_version_seen=${versionState.maxVersionSeen.dbVersion}.${versionState.maxVersionSeen.colVersion}, missing_ranges=${versionState.missingRanges.length}`); 709 + } 710 + 711 + // Helper function to remove a specific (db_version, col_version) pair from missing ranges 712 + function removeMissingVersionPair(versionState: VersionState, versionPair: VersionColPair) { 713 + const newRanges: MissingVersionRange[] = []; 714 + 715 + for (const range of versionState.missingRanges) { 716 + if (range.dbVersion !== versionPair.dbVersion) { 717 + // Different db_version, keep the entire range 718 + newRanges.push(range); 719 + } else { 720 + // Same db_version, remove the specific col_version 721 + const remainingColVersions = range.colVersions.filter(cv => cv !== versionPair.colVersion); 722 + if (remainingColVersions.length > 0) { 723 + newRanges.push({ 724 + dbVersion: range.dbVersion, 725 + colVersions: remainingColVersions 726 + }); 727 + } 728 + } 729 + } 730 + 731 + versionState.missingRanges = newRanges; 732 + } 733 + 734 + // Helper function to recalculate contiguous version state after changes 735 + function recalculateContiguousVersionState(versionState: VersionState) { 736 + // This function is complex and error-prone. Let's use a simpler approach: 737 + // Re-query the database to get the current state and recalculate from scratch 738 + logDebug("Recalculating contiguous version state - this approach is flawed, should re-query database"); 739 + 740 + // For now, we'll use a conservative approach: don't change contiguous state 741 + // during incremental updates. The initial calculation should be correct. 742 + // This is a temporary fix - ideally we should re-query the database. 743 + 744 + // Only update if we were at 0,0 (initial state) 745 + if (versionState.contiguousUpTo.dbVersion === 0 && versionState.contiguousUpTo.colVersion === 0) { 746 + // Try to set a minimal contiguous state 747 + if (versionState.missingRanges.length === 0) { 748 + // No missing ranges, so contiguous up to max 749 + versionState.contiguousUpTo = versionState.maxVersionSeen; 750 + } else { 751 + // Find the lowest missing range and set contiguous just before it 752 + let minMissingDb = Math.min(...versionState.missingRanges.map(r => r.dbVersion)); 753 + if (minMissingDb > 1) { 754 + versionState.contiguousUpTo = { dbVersion: minMissingDb - 1, colVersion: 1 }; 755 + } else { 756 + // Missing something in db_version 1, so contiguous up to 0 757 + versionState.contiguousUpTo = { dbVersion: 0, colVersion: 0 }; 758 + } 759 + } 760 + } 761 + 762 + logDebug(`Recalculated contiguous up to: ${versionState.contiguousUpTo.dbVersion}.${versionState.contiguousUpTo.colVersion}`); 763 + } 764 + 765 + // Helper function to get the maximum version pair from an array 766 + function getMaxVersionPair(versionPairs: VersionColPair[]): VersionColPair { 767 + if (versionPairs.length === 0) return { dbVersion: 0, colVersion: 0 }; 768 + 769 + let maxDbVersion = 0; 770 + let maxColVersionForMaxDb = 0; 771 + 772 + for (const pair of versionPairs) { 773 + if (pair.dbVersion > maxDbVersion || (pair.dbVersion === maxDbVersion && pair.colVersion > maxColVersionForMaxDb)) { 774 + maxDbVersion = pair.dbVersion; 775 + maxColVersionForMaxDb = pair.colVersion; 776 + } 777 + } 778 + 779 + return { dbVersion: maxDbVersion, colVersion: maxColVersionForMaxDb }; 780 + } 781 + 782 + // Helper function to compare version pairs 783 + function compareVersionPairs(a: VersionColPair, b: VersionColPair): number { 784 + if (a.dbVersion !== b.dbVersion) { 785 + return a.dbVersion - b.dbVersion; 786 + } 787 + return a.colVersion - b.colVersion; 788 + } 789 + 790 + // Calculate the highest contiguous version pair from an array of version pairs 791 + function calculateContiguousVersionPairs(versionPairs: VersionColPair[]): VersionColPair { 792 + if (versionPairs.length === 0) return { dbVersion: 0, colVersion: 0 }; 793 + 794 + // Sort pairs by db_version, then col_version 795 + const sorted = [...versionPairs].sort(compareVersionPairs); 796 + 797 + // Group by db_version 798 + const versionGroups = new Map<number, number[]>(); 799 + for (const pair of sorted) { 800 + if (!versionGroups.has(pair.dbVersion)) { 801 + versionGroups.set(pair.dbVersion, []); 802 + } 803 + versionGroups.get(pair.dbVersion)!.push(pair.colVersion); 804 + } 805 + 806 + // Find the highest contiguous db_version where we have all col_versions from 1 to max 807 + let contiguousDbVersion = 0; 808 + let contiguousColVersion = 0; 809 + 810 + for (let dbVersion = 1; dbVersion <= Math.max(...Array.from(versionGroups.keys())); dbVersion++) { 811 + const colVersions = versionGroups.get(dbVersion); 812 + if (!colVersions) { 813 + // Missing this entire db_version 814 + break; 815 + } 816 + 817 + // Check if we have contiguous col_versions from 1 to max 818 + const sortedColVersions = [...colVersions].sort((a, b) => a - b); 819 + let expectedColVersion = 1; 820 + let lastContiguousCol = 0; 821 + 822 + for (const colVersion of sortedColVersions) { 823 + if (colVersion === expectedColVersion) { 824 + lastContiguousCol = colVersion; 825 + expectedColVersion++; 826 + } else if (colVersion > expectedColVersion) { 827 + break; 828 + } 829 + } 830 + 831 + if (lastContiguousCol > 0) { 832 + contiguousDbVersion = dbVersion; 833 + contiguousColVersion = lastContiguousCol; 834 + } else { 835 + break; 836 + } 837 + } 838 + 839 + return { dbVersion: contiguousDbVersion, colVersion: contiguousColVersion }; 840 + } 841 + 842 + // Calculate missing version ranges given version pairs and max version 843 + function calculateMissingVersionRanges(versionPairs: VersionColPair[], maxVersionSeen: VersionColPair): MissingVersionRange[] { 844 + const ranges: MissingVersionRange[] = []; 845 + 846 + if (versionPairs.length === 0) { 847 + if (maxVersionSeen.dbVersion > 0) { 848 + // Missing everything from 1 to max 849 + for (let dbVersion = 1; dbVersion <= maxVersionSeen.dbVersion; dbVersion++) { 850 + const maxColForThisDb = dbVersion === maxVersionSeen.dbVersion ? maxVersionSeen.colVersion : Number.MAX_SAFE_INTEGER; 851 + ranges.push({ 852 + dbVersion, 853 + colVersions: Array.from({ length: maxColForThisDb }, (_, i) => i + 1) 854 + }); 855 + } 856 + } 857 + return ranges; 858 + } 859 + 860 + // Group by db_version 861 + const versionGroups = new Map<number, Set<number>>(); 862 + for (const pair of versionPairs) { 863 + if (!versionGroups.has(pair.dbVersion)) { 864 + versionGroups.set(pair.dbVersion, new Set()); 865 + } 866 + versionGroups.get(pair.dbVersion)!.add(pair.colVersion); 867 + } 868 + 869 + // Find missing col_versions for each db_version from 1 to maxVersionSeen 870 + for (let dbVersion = 1; dbVersion <= maxVersionSeen.dbVersion; dbVersion++) { 871 + const existingColVersions = versionGroups.get(dbVersion) || new Set(); 872 + const maxColForThisDb = dbVersion === maxVersionSeen.dbVersion ? maxVersionSeen.colVersion : Math.max(...Array.from(existingColVersions), 0); 873 + 874 + if (maxColForThisDb === 0) { 875 + // No col_versions exist for this db_version, skip it for now 876 + // This might indicate we don't have any changes for this db_version yet 877 + continue; 878 + } 879 + 880 + const missingColVersions: number[] = []; 881 + for (let colVersion = 1; colVersion <= maxColForThisDb; colVersion++) { 882 + if (!existingColVersions.has(colVersion)) { 883 + missingColVersions.push(colVersion); 884 + } 885 + } 886 + 887 + if (missingColVersions.length > 0) { 888 + ranges.push({ 889 + dbVersion, 890 + colVersions: missingColVersions 891 + }); 892 + } 893 + } 894 + 895 + return ranges; 896 + } 897 + 424 898 // Also update the applyChanges function to handle decoding correctly 425 899 async function applyChanges(dbname: string, changes: any[]) { 426 900 const connection = connections[dbname]; ··· 498 972 connection.lastSyncVersion = maxVersion; 499 973 logDebug(`Updated lastSyncVersion to ${maxVersion}`); 500 974 } 975 + 976 + // Recalculate version state after applying changes 977 + await recalculateVersionState(dbname); 501 978 502 979 // Notify main thread that database was updated 503 980 logDebug(`🔄 DATABASE UPDATED: Notifying main thread for ${dbname} with ${changes.length} changes`);
+416
mast-sync-protocol-v3-spec.md
··· 1 + # Mast Sync Protocol v2.0 Specification 2 + 3 + ## Overview 4 + 5 + This is a clean v2 specification that starts fresh from v1, incorporating lessons learned to solve critical issues while maintaining simplicity. This replaces the previous over-engineered v2 implementation with a focused, production-ready protocol. 6 + 7 + ## Core Design Principles 8 + 9 + 1. **Secure by Default**: All operations require authentication, rooms are private by default 10 + 2. **Simple & Clean**: Remove unnecessary complexity while maintaining reliability 11 + 3. **Payment Ready**: Built-in support for paid room creation with backwards compatibility 12 + 4. **CRDT-Native**: Leverage CR-SQLite's conflict resolution, no complex coordination needed 13 + 5. **Immediate Feedback**: Clients know their room status immediately after connection 14 + 15 + ## Authentication & Authorization 16 + 17 + ### Key Management 18 + - **ECDSA P-256** public key cryptography 19 + - **Automatic registration** on first connection (when payment enforcement disabled) 20 + - **Room-scoped permissions**: read, write, invite capabilities 21 + - **Signature format**: `{type}:{data-json}` 22 + 23 + ### Environment-Based Enforcement 24 + ```bash 25 + # Development/Testing (default) 26 + REQUIRE_AUTH=false # Auto-grant invite permissions to any connecting user 27 + 28 + # Production (future) 29 + REQUIRE_AUTH=true # Only users in auth table can access rooms 30 + ``` 31 + 32 + ## Connection Lifecycle 33 + 34 + ### 1. WebSocket Connection 35 + ``` 36 + URL: wss://server/sync?room={roomId}&publicKey={base64PublicKey} 37 + ``` 38 + 39 + ### 2. Immediate Room Status 40 + Upon connection, server immediately sends room status: 41 + 42 + ```json 43 + { 44 + "type": "room_status", 45 + "access": "write|read|none|no_room" 46 + } 47 + ``` 48 + 49 + **Access Levels**: 50 + - `"write"`: Full read/write access to existing room 51 + - `"read"`: Read-only access to existing room 52 + - `"none"`: Room exists but no permissions granted (need to be invited -- later) 53 + - `"no_room"`: Room doesn't exist (payment may be required if they wish to create one) 54 + 55 + ### 3. Room Access Behavior 56 + 57 + **When `REQUIRE_AUTH=false` (Development)**: 58 + - Any room → Auto-grant invite permissions (read + write) 59 + - New rooms created automatically 60 + - User receives `"access": "write"` 61 + 62 + **When `REQUIRE_AUTH=true` (Production)**: 63 + - Only authenticated users can access 64 + - User must exist in room_keys table 65 + - User receives access level based on permissions 66 + 67 + ## Core Sync Protocol 68 + 69 + ### Missing Changes Problem Solution 70 + The critical flaw in v1 was using `MAX(db_version)` which caused missing intermediate versions. 71 + 72 + **Problem**: Client has versions [1,2,5,10] and requests `> 10`, never getting versions 3,4,6,7,8,9. 73 + 74 + **Solution**: Track highest contiguous version + explicit missing ranges. 75 + 76 + #### Version State Tracking 77 + ```javascript 78 + const versionState = { 79 + contiguousUpTo: 2, // Highest version with no gaps before it 80 + missingRanges: [ // Explicit gaps we know about 81 + {start: 3, end: 4}, // Missing versions 3-4 82 + {start: 6, end: 9} // Missing versions 6-9 83 + ], 84 + maxVersionSeen: 10 // Highest version ever seen 85 + }; 86 + ``` 87 + 88 + ### Change Synchronization 89 + 90 + #### Sync Request (Client → Server) 91 + ```json 92 + { 93 + "type": "sync_request", 94 + "site_id": "base64-site-id", 95 + "contiguous_up_to": 2, 96 + "missing_ranges": [ 97 + {"start": 3, "end": 4}, 98 + {"start": 6, "end": 9} 99 + ], 100 + "max_version_seen": 10, 101 + "publicKey": "base64-public-key" 102 + } 103 + ``` 104 + 105 + #### Sync Response (Server → Client) 106 + ```json 107 + { 108 + "type": "sync_response", 109 + "current_max_version": 15, 110 + "changes": [ 111 + // Missing ranges 3-4, 6-9 112 + // Plus new changes 11-15 113 + { 114 + "TableName": "todos", 115 + "PK": "base64-pk", 116 + "ColumnName": "description", 117 + "Value": "Task content", 118 + "ColVersion": 15, 119 + "DBVersion": 8, 120 + "SiteID": "base64-site-id", 121 + "CL": 1, 122 + "Seq": 1 123 + } 124 + ] 125 + } 126 + ``` 127 + 128 + #### Change Push (Write Operations) 129 + ```json 130 + { 131 + "type": "changes", 132 + "publicKey": "base64-public-key", 133 + "signature": "base64-signature", 134 + "data": [...changes...] 135 + } 136 + ``` 137 + 138 + **Signature payload**: `changes:{JSON.stringify(data)}` 139 + 140 + ### Real-time Broadcasting 141 + Server broadcasts changes to all authorized clients in room (excluding sender): 142 + 143 + ```json 144 + { 145 + "type": "changes", 146 + "data": [...changes...] 147 + } 148 + ``` 149 + 150 + ## Server Architecture 151 + 152 + ### Essential Components 153 + - ✅ ECDSA signature verification 154 + - ✅ Room-based isolation 155 + - ✅ Permission checking 156 + - ✅ Auto room creation (with environment flag) 157 + - ✅ Real-time broadcasting 158 + - ✅ Connection cleanup on auth failure 159 + 160 + ### Database Schema 161 + ```sql 162 + CREATE TABLE rooms ( 163 + room_id TEXT PRIMARY KEY, 164 + created_at INTEGER NOT NULL 165 + ); 166 + 167 + CREATE TABLE room_keys ( 168 + room_id TEXT NOT NULL, 169 + public_key TEXT NOT NULL, 170 + can_read BOOLEAN NOT NULL DEFAULT 1, 171 + can_write BOOLEAN NOT NULL DEFAULT 1, 172 + can_invite BOOLEAN NOT NULL DEFAULT 0, 173 + created_at INTEGER NOT NULL, 174 + PRIMARY KEY (room_id, public_key) 175 + ); 176 + ``` 177 + 178 + ### Server Version Query Logic 179 + 180 + #### Multi-Range SQL Query 181 + ```sql 182 + -- Get missing ranges + new changes 183 + SELECT * FROM crsql_changes 184 + WHERE site_id != ? 185 + AND ( 186 + -- Missing range 3-4 187 + (db_version >= 3 AND db_version <= 4) OR 188 + -- Missing range 6-9 189 + (db_version >= 6 AND db_version <= 9) OR 190 + -- New changes 11-15 191 + (db_version > 10) 192 + ) 193 + ORDER BY db_version ASC 194 + ``` 195 + 196 + #### Client Version State Updates 197 + ```javascript 198 + function updateVersionState(changes) { 199 + for (const change of changes) { 200 + const version = change.DBVersion; 201 + 202 + // Update max seen 203 + versionState.maxVersionSeen = Math.max(versionState.maxVersionSeen, version); 204 + 205 + // Fill gaps and update contiguous 206 + fillGapsAndUpdateContiguous(version); 207 + } 208 + } 209 + 210 + function fillGapsAndUpdateContiguous(version) { 211 + // Remove version from missing ranges 212 + removeMissingVersion(version); 213 + 214 + // Extend contiguous if possible 215 + while (versionState.contiguousUpTo + 1 <= versionState.maxVersionSeen && 216 + !isVersionMissing(versionState.contiguousUpTo + 1)) { 217 + versionState.contiguousUpTo++; 218 + } 219 + } 220 + ``` 221 + 222 + ## Auto-Registration Flow 223 + 224 + ### Connection Logic 225 + 1. Client connects with `publicKey` parameter 226 + 2. Server checks room + key permissions 227 + 3. **When `REQUIRE_AUTH=false`**: 228 + - Auto-grant invite permissions to any user 229 + - Create room if it doesn't exist 230 + 4. **When `REQUIRE_AUTH=true`**: 231 + - Only users in auth table can access 232 + - No auto-registration 233 + 234 + ### Permission Granting Logic 235 + ```go 236 + // Auto-grant invite permissions (read + write) 237 + func AutoGrantInvitePermissions(roomID, publicKey string) error { 238 + return GrantPermissions(roomID, publicKey, true, true, true) // read, write, invite 239 + } 240 + ``` 241 + 242 + ## Client Implementation 243 + 244 + ### Connection Management 245 + ```typescript 246 + // Simple reconnection (no exponential backoff) 247 + function attemptReconnection(room: string) { 248 + setTimeout(() => { 249 + connectWebSocket(room).catch(() => attemptReconnection(room)); 250 + }, math.Rand(3, 9); // Between 3 and 9 seconds retry, to stop stampeding herd 251 + } 252 + ``` 253 + 254 + ### Authentication Integration 255 + ```typescript 256 + // Include publicKey in all requests 257 + async function sendSyncRequest(connection) { 258 + const syncRequest = { 259 + type: "sync_request", 260 + site_id: connection.siteId, 261 + last_version: connection.lastSyncVersion, 262 + publicKey // Always include for all syncRequest 263 + }; 264 + 265 + ws.send(JSON.stringify(syncRequest)); 266 + } 267 + ``` 268 + 269 + ### Room Status Handling 270 + ```typescript 271 + // Handle immediate room status 272 + case 'room_status': 273 + self.postMessage({ 274 + type: 'room_status', 275 + dbname: dbname, 276 + access: msg.access, 277 + needsPayment: msg.access === 'no_room' 278 + }); 279 + break; 280 + ``` 281 + 282 + ## Server Implementation 283 + 284 + ### Core Logic 285 + ```go 286 + func handleWebSocket(w http.ResponseWriter, r *http.Request) { 287 + roomID := r.URL.Query().Get("room") 288 + publicKey := r.URL.Query().Get("publicKey") 289 + 290 + // Check authentication/auto-grant permissions 291 + access := determineAccess(roomID, publicKey) 292 + 293 + // Upgrade WebSocket 294 + conn, err := upgrader.Upgrade(w, r, nil) 295 + if err != nil { 296 + return 297 + } 298 + 299 + // Send immediate room status 300 + sendRoomStatus(conn, access) 301 + 302 + // Handle sync protocol 303 + handleSyncProtocol(conn, roomID, publicKey) 304 + } 305 + 306 + func determineAccess(roomID, publicKey string) string { 307 + if !requireAuth { 308 + // Development mode - auto-grant invite permissions 309 + autoGrantInvitePermissions(roomID, publicKey) 310 + return "write" 311 + } 312 + 313 + // Production mode - check existing permissions 314 + return checkUserPermissions(roomID, publicKey) 315 + } 316 + ``` 317 + 318 + ## Error Handling 319 + 320 + ### Structured Error Responses 321 + ```json 322 + { 323 + "type": "error", 324 + "code": "AUTH_FAILED|ROOM_NOT_FOUND|PAYMENT_REQUIRED|PERMISSION_DENIED", 325 + "message": "Human readable description" 326 + } 327 + ``` 328 + 329 + ### Authentication Failure Behavior 330 + - **Invalid signature**: Close connection immediately 331 + - **No read permission**: Close connection immediately 332 + - **No write permission**: Reject change, keep connection open 333 + - **No room**: Send `room_status` with `"access": "no_room"` 334 + 335 + ## Migration from v1 336 + 337 + ### Environment Variable Control 338 + ```bash 339 + # Start with development mode 340 + REQUIRE_AUTH=false 341 + 342 + # Switch to production when ready 343 + REQUIRE_AUTH=true 344 + ``` 345 + 346 + ### Protocol Changes from v1 347 + - Replace `"pull"` message with `"sync_request"` 348 + - Add version range tracking instead of simple `last_version` 349 + - Add immediate `"room_status"` response 350 + - Add `publicKey` to WebSocket URL and all requests 351 + 352 + ### Seamless Transition 353 + - v1 rooms continue working unchanged 354 + - Auto-registration ensures no user disruption 355 + - Environment variable provides clean cutoff point 356 + 357 + ## Security Model 358 + 359 + ### Transport Security 360 + - **WSS required** for production (TLS 1.3 minimum) 361 + - **Certificate validation** on client 362 + 363 + ### Message Security 364 + - **ECDSA P-256 signatures** for all write operations 365 + - **Public key authentication** for all read operations 366 + - **Room isolation** - no cross-room access 367 + 368 + ### Key Security 369 + - **Web Crypto API** for key generation 370 + - **Secure storage** (recommend upgrade from localStorage for production) 371 + - **No key transmission** (only public keys sent to server) 372 + 373 + ### CR-SQLite Benefits 374 + - **Offline-first**: Changes work immediately without server 375 + - **Conflict-free**: Automatic merge resolution 376 + - **Consistent**: Guaranteed eventual consistency 377 + - **Efficient**: Delta-only synchronization 378 + 379 + ## Implementation Priorities 380 + 381 + ### Phase 1: Core Protocol 382 + 1. Implement missing changes solution with version range tracking through sync_request messages 383 + - Version state tracking on client side 384 + - Version requests in sync_request messages 385 + - Response from server giving exactly the changes requested 386 + 2. Add immediate room_status messages 387 + 3. Simple reconnection (3-second retry) 388 + 4. Room status handling in UI (SyncStatus component) 389 + 390 + ### Phase 2: Authorization 391 + 1. Environment-based auth enforcement (`REQUIRE_AUTH` flag) 392 + 2. Auto-registration for development mode 393 + 3. ECDSA signature verification (real implementation) 394 + 4. publicKey authentication for all operations 395 + 5. Connection cleanup on auth failure 396 + 397 + ## Configuration 398 + 399 + ### Server Configuration 400 + ```go 401 + // Environment variables 402 + var ( 403 + REQUIRE_AUTH = os.Getenv("REQUIRE_AUTH") == "true" 404 + ) 405 + ``` 406 + 407 + ### Client Configuration 408 + ```typescript 409 + interface SyncConfig { 410 + room: string; 411 + endpoint: string; 412 + autoReconnect?: boolean; // Default: true 413 + reconnectDelay?: number; // Default: 3000ms 414 + } 415 + ``` 416 +
+24
server/auth.go
··· 276 276 return hasPerm, nil 277 277 } 278 278 279 + // AutoGrantInvitePermissions auto-grants invite permissions (read + write + invite) for development mode 280 + func AutoGrantInvitePermissions(roomID, publicKey string) error { 281 + logKey := publicKey 282 + if len(logKey) > 20 { 283 + logKey = publicKey[:20] + "..." 284 + } 285 + log.Printf("Auto-granting invite permissions for key %s in room %s", logKey, roomID) 286 + 287 + _, err := authDB.Exec( 288 + `INSERT OR REPLACE INTO room_keys 289 + (room_id, public_key, can_read, can_write, can_invite) 290 + VALUES (?, ?, ?, ?, ?)`, 291 + roomID, publicKey, true, true, true, 292 + ) 293 + 294 + if err != nil { 295 + log.Printf("Error auto-granting permissions: %v", err) 296 + return err 297 + } 298 + 299 + log.Printf("Successfully auto-granted invite permissions") 300 + return nil 301 + } 302 + 279 303 // VerifySignature verifies that a signature was made by the public key 280 304 func VerifySignature(publicKey string, data string, signature string) (bool, error) { 281 305 // This is a placeholder - the actual implementation will depend on how you're handling
+416 -13
server/main.go
··· 4 4 "database/sql" 5 5 "encoding/base64" 6 6 "encoding/json" 7 + "fmt" 7 8 "log" 8 9 "net/http" 9 10 "os" 11 + "strings" 10 12 "sync" 11 13 "time" 12 14 ··· 20 22 return true 21 23 }, 22 24 } 25 + 26 + // Environment configuration 27 + var requireAuth = os.Getenv("REQUIRE_AUTH") == "true" 23 28 24 29 // Room management 25 30 type Room struct { ··· 113 118 // Set CORS headers for the WebSocket handshake 114 119 w.Header().Set("Access-Control-Allow-Origin", "*") 115 120 116 - // Extract room ID from query parameters 121 + // Extract room ID and publicKey from query parameters 117 122 roomID := r.URL.Query().Get("room") 123 + publicKey := r.URL.Query().Get("publicKey") 124 + 118 125 if roomID == "" { 119 126 http.Error(w, "Missing room parameter", http.StatusBadRequest) 120 127 return 121 128 } 122 129 130 + if publicKey == "" { 131 + http.Error(w, "Missing publicKey parameter", http.StatusBadRequest) 132 + return 133 + } 134 + 123 135 // Get or create the room in the auth database 124 136 err := GetOrCreateRoom(roomID) 125 137 if err != nil { ··· 128 140 return 129 141 } 130 142 143 + // Determine access level 144 + access := determineAccess(roomID, publicKey) 145 + 146 + // Upgrade WebSocket 131 147 conn, err := upgrader.Upgrade(w, r, nil) 132 148 if err != nil { 133 149 log.Println("Error upgrading connection:", err) 134 150 return 135 151 } 136 152 137 - // Add client to room 153 + // Send immediate room status 154 + sendRoomStatus(conn, access) 155 + 156 + // Close connection if no read access 157 + if access == "none" || access == "no_room" { 158 + log.Printf("Closing connection for user with access: %s", access) 159 + conn.Close() 160 + return 161 + } 162 + 163 + // Add client to room only if authorized 138 164 addClientToRoom(roomID, conn) 139 165 140 166 // Create database connection for this room ··· 163 189 removeClientFromRoom(roomID, conn) 164 190 }() 165 191 166 - // We don't automatically send initial changes anymore 167 - // The client will request them with a pull message that includes requestUnsyncedChanges 168 - 169 192 // Handle incoming messages 170 193 for { 171 194 _, message, err := conn.ReadMessage() ··· 187 210 } 188 211 189 212 switch msgType { 213 + case "sync_request": 214 + log.Printf("Client in room %s requested sync", roomID) 215 + var syncMsg SyncRequestMessage 216 + syncData, _ := json.Marshal(msg) 217 + json.Unmarshal(syncData, &syncMsg) 218 + 219 + log.Printf("Sync request with site_id: %s, contiguous_up_to: %d.%d, max_version_seen: %d.%d", 220 + syncMsg.SiteID, syncMsg.ContiguousUpTo.DBVersion, syncMsg.ContiguousUpTo.ColVersion, 221 + syncMsg.MaxVersionSeen.DBVersion, syncMsg.MaxVersionSeen.ColVersion) 222 + 223 + // Verify publicKey matches 224 + if syncMsg.PublicKey != publicKey { 225 + log.Printf("PublicKey mismatch in sync request") 226 + sendError(conn, "AUTH_FAILED", "PublicKey mismatch") 227 + continue 228 + } 229 + 230 + // Get current server version pair 231 + serverMaxVersionPair, err := getLatestDBVersionCol(db) 232 + if err != nil { 233 + log.Printf("Error getting server's latest version pair: %v", err) 234 + serverMaxVersionPair = VersionColPair{DBVersion: 0, ColVersion: 0} 235 + } 236 + log.Printf("Server max version: %d.%d, client max version seen: %d.%d", 237 + serverMaxVersionPair.DBVersion, serverMaxVersionPair.ColVersion, 238 + syncMsg.MaxVersionSeen.DBVersion, syncMsg.MaxVersionSeen.ColVersion) 239 + 240 + // Check if client has newer data than server 241 + if compareVersionPairs(syncMsg.MaxVersionSeen, serverMaxVersionPair) > 0 { 242 + log.Printf("Client has higher version (%d.%d) than server (%d.%d). Requesting changes.", 243 + syncMsg.MaxVersionSeen.DBVersion, syncMsg.MaxVersionSeen.ColVersion, 244 + serverMaxVersionPair.DBVersion, serverMaxVersionPair.ColVersion) 245 + 246 + // Send a request_changes message to the client 247 + requestChangesMsg := RequestChangesMessage{ 248 + Type: "request_changes", 249 + RoomID: roomID, 250 + Version: serverMaxVersionPair, 251 + } 252 + 253 + requestJSON, _ := json.Marshal(requestChangesMsg) 254 + conn.WriteMessage(websocket.TextMessage, requestJSON) 255 + } 256 + 257 + // Always send sync_response with any changes the server has for the client 258 + changes := getChangesForSyncRequest(db, syncMsg) 259 + 260 + response := SyncResponseMessage{ 261 + Type: "sync_response", 262 + CurrentMaxVersion: serverMaxVersionPair, 263 + Changes: changes, 264 + } 265 + responseJSON, _ := json.Marshal(response) 266 + conn.WriteMessage(websocket.TextMessage, responseJSON) 267 + 190 268 case "pull": 191 - log.Printf("Client in room %s requested pull", roomID) 269 + // Legacy support for v1 protocol 270 + log.Printf("Client in room %s requested pull (legacy)", roomID) 192 271 var pullMsg PullMessage 193 272 pullData, _ := json.Marshal(msg) 194 273 json.Unmarshal(pullData, &pullMsg) ··· 211 290 requestChangesMsg := RequestChangesMessage{ 212 291 Type: "request_changes", 213 292 RoomID: roomID, 214 - Version: serverLatestVersion, 293 + Version: VersionColPair{DBVersion: serverLatestVersion, ColVersion: 0}, 215 294 } 216 295 217 296 requestJSON, _ := json.Marshal(requestChangesMsg) ··· 229 308 230 309 case "changes": 231 310 log.Printf("Received changes from client in room %s", roomID) 232 - if publicKey, hasKey := msg["publicKey"].(string); hasKey { 233 - log.Printf("Changes are authenticated with public key: %s...", publicKey[:20]) 311 + 312 + // Check write permission 313 + if access != "write" { 314 + log.Printf("User has no write permission for room %s", roomID) 315 + sendError(conn, "PERMISSION_DENIED", "No write permission") 316 + continue 317 + } 318 + 319 + // For signed changes, verify the signature 320 + if msgPublicKey, hasKey := msg["publicKey"].(string); hasKey { 321 + log.Printf("Changes are authenticated with public key: %s...", msgPublicKey[:20]) 322 + 323 + // Verify publicKey matches connection 324 + if msgPublicKey != publicKey { 325 + log.Printf("PublicKey mismatch in changes message") 326 + sendError(conn, "AUTH_FAILED", "PublicKey mismatch") 327 + continue 328 + } 329 + 330 + // TODO: Verify signature when ECDSA is implemented 331 + if signature, hasSig := msg["signature"].(string); hasSig && requireAuth { 332 + dataStr := "" 333 + if data, ok := msg["data"]; ok { 334 + dataBytes, _ := json.Marshal(data) 335 + dataStr = string(dataBytes) 336 + } 337 + 338 + isValid, err := VerifySignature(msgPublicKey, "changes:"+dataStr, signature) 339 + if err != nil || !isValid { 340 + log.Printf("Signature verification failed") 341 + sendError(conn, "AUTH_FAILED", "Invalid signature") 342 + continue 343 + } 344 + } 234 345 } 235 346 236 347 if data, ok := msg["data"].([]interface{}); ok { ··· 315 426 conn.WriteMessage(websocket.TextMessage, responseJSON) 316 427 } 317 428 318 - // getLatestDBVersion gets the latest db_version from the database 429 + // getLatestDBVersion gets the latest db_version from the database (legacy) 319 430 func getLatestDBVersion(db *sql.DB) (int, error) { 320 431 // Query the maximum db_version 321 432 row := db.QueryRow("SELECT MAX(db_version) FROM crsql_changes") ··· 333 444 return int(version.Int64), nil 334 445 } 335 446 447 + // getLatestDBVersionCol gets the latest (db_version, col_version) pair from the database 448 + func getLatestDBVersionCol(db *sql.DB) (VersionColPair, error) { 449 + // First check if there are any changes at all 450 + var count int 451 + err := db.QueryRow("SELECT COUNT(*) FROM crsql_changes").Scan(&count) 452 + if err != nil { 453 + return VersionColPair{DBVersion: 0, ColVersion: 0}, err 454 + } 455 + 456 + // If no changes exist, return 0,0 457 + if count == 0 { 458 + return VersionColPair{DBVersion: 0, ColVersion: 0}, nil 459 + } 460 + 461 + // Query the maximum db_version and its maximum col_version 462 + row := db.QueryRow(` 463 + SELECT db_version, col_version 464 + FROM crsql_changes 465 + ORDER BY db_version DESC, col_version DESC 466 + LIMIT 1 467 + `) 468 + 469 + var dbVersion, colVersion sql.NullInt64 470 + if err := row.Scan(&dbVersion, &colVersion); err != nil { 471 + if err == sql.ErrNoRows { 472 + return VersionColPair{DBVersion: 0, ColVersion: 0}, nil 473 + } 474 + return VersionColPair{DBVersion: 0, ColVersion: 0}, err 475 + } 476 + 477 + // If there are no rows or the values are null, return 0,0 478 + if !dbVersion.Valid || !colVersion.Valid { 479 + return VersionColPair{DBVersion: 0, ColVersion: 0}, nil 480 + } 481 + 482 + return VersionColPair{DBVersion: int(dbVersion.Int64), ColVersion: int(colVersion.Int64)}, nil 483 + } 484 + 485 + // compareVersionPairs compares two version pairs, returns -1, 0, or 1 486 + func compareVersionPairs(a, b VersionColPair) int { 487 + if a.DBVersion != b.DBVersion { 488 + if a.DBVersion < b.DBVersion { 489 + return -1 490 + } 491 + return 1 492 + } 493 + if a.ColVersion != b.ColVersion { 494 + if a.ColVersion < b.ColVersion { 495 + return -1 496 + } 497 + return 1 498 + } 499 + return 0 500 + } 501 + 336 502 func getChangesFromDB(db *sql.DB, siteID string, version int) []map[string]interface{} { 337 503 // Decode the site_id from base64 338 504 decodedSiteID, err := decodeBase64(siteID) ··· 444 610 } 445 611 446 612 447 - // PullMessage represents the structure of a 'pull' type message from clients 613 + // PullMessage represents the structure of a 'pull' type message from clients (legacy v1) 448 614 type PullMessage struct { 449 615 Type string `json:"type"` 450 616 SiteID string `json:"site_id"` ··· 453 619 454 620 // RequestChangesMessage represents a request from server to client to send their changes 455 621 type RequestChangesMessage struct { 622 + Type string `json:"type"` 623 + RoomID string `json:"room_id"` 624 + Version VersionColPair `json:"version"` 625 + } 626 + 627 + // VersionColPair represents a (db_version, col_version) pair 628 + type VersionColPair struct { 629 + DBVersion int `json:"db_version"` 630 + ColVersion int `json:"col_version"` 631 + } 632 + 633 + // MissingVersionRange represents missing col_versions for a specific db_version 634 + type MissingVersionRange struct { 635 + DBVersion int `json:"db_version"` 636 + ColVersions []int `json:"col_versions"` 637 + } 638 + 639 + // VersionRange represents a missing version range (legacy - kept for compatibility) 640 + type VersionRange struct { 641 + Start int `json:"start"` 642 + End int `json:"end"` 643 + } 644 + 645 + // SyncRequestMessage represents the v2 sync request with version ranges 646 + type SyncRequestMessage struct { 647 + Type string `json:"type"` 648 + SiteID string `json:"site_id"` 649 + ContiguousUpTo VersionColPair `json:"contiguous_up_to"` 650 + MissingRanges []MissingVersionRange `json:"missing_ranges"` 651 + MaxVersionSeen VersionColPair `json:"max_version_seen"` 652 + PublicKey string `json:"publicKey"` 653 + } 654 + 655 + // SyncResponseMessage represents the server response to sync request 656 + type SyncResponseMessage struct { 657 + Type string `json:"type"` 658 + CurrentMaxVersion VersionColPair `json:"current_max_version"` 659 + Changes []map[string]interface{} `json:"changes"` 660 + } 661 + 662 + // RoomStatusMessage represents immediate room status on connection 663 + type RoomStatusMessage struct { 664 + Type string `json:"type"` 665 + Access string `json:"access"` 666 + } 667 + 668 + // ErrorMessage represents structured error responses 669 + type ErrorMessage struct { 456 670 Type string `json:"type"` 457 - RoomID string `json:"room_id"` 458 - Version int `json:"version"` 671 + Code string `json:"code"` 672 + Message string `json:"message"` 459 673 } 460 674 461 675 // AuthRequest is the structure for authentication verification requests ··· 559 773 json.NewEncoder(w).Encode(response) 560 774 } 561 775 776 + // determineAccess determines the access level for a user connecting to a room 777 + func determineAccess(roomID, publicKey string) string { 778 + logKey := publicKey 779 + if len(logKey) > 20 { 780 + logKey = publicKey[:20] + "..." 781 + } 782 + log.Printf("Determining access for publicKey %s in room %s", logKey, roomID) 783 + 784 + // Check if room exists 785 + roomExists, err := CheckRoomExists(roomID) 786 + if err != nil { 787 + log.Printf("Error checking room existence: %v", err) 788 + return "none" 789 + } 790 + 791 + if !roomExists { 792 + log.Printf("Room %s does not exist", roomID) 793 + if !requireAuth { 794 + // Development mode - auto-create room and grant permissions 795 + log.Printf("Auto-creating room %s and granting permissions", roomID) 796 + if err := GetOrCreateRoom(roomID); err != nil { 797 + log.Printf("Error auto-creating room: %v", err) 798 + return "no_room" 799 + } 800 + if err := AutoGrantInvitePermissions(roomID, publicKey); err != nil { 801 + log.Printf("Error auto-granting permissions: %v", err) 802 + return "no_room" 803 + } 804 + return "write" 805 + } 806 + return "no_room" 807 + } 808 + 809 + // Room exists - check permissions 810 + if !requireAuth { 811 + // Development mode - auto-grant permissions for existing rooms 812 + log.Printf("Auto-granting permissions for existing room %s", roomID) 813 + if err := AutoGrantInvitePermissions(roomID, publicKey); err != nil { 814 + log.Printf("Error auto-granting permissions: %v", err) 815 + } 816 + return "write" 817 + } 818 + 819 + // Production mode - check actual permissions 820 + hasRead, err := CheckKeyPermission(roomID, publicKey, "read") 821 + if err != nil { 822 + log.Printf("Error checking read permission: %v", err) 823 + return "none" 824 + } 825 + 826 + if !hasRead { 827 + return "none" 828 + } 829 + 830 + hasWrite, err := CheckKeyPermission(roomID, publicKey, "write") 831 + if err != nil { 832 + log.Printf("Error checking write permission: %v", err) 833 + return "read" 834 + } 835 + 836 + if hasWrite { 837 + return "write" 838 + } 839 + 840 + return "read" 841 + } 842 + 843 + // sendRoomStatus sends immediate room status to client 844 + func sendRoomStatus(conn *websocket.Conn, access string) { 845 + status := RoomStatusMessage{ 846 + Type: "room_status", 847 + Access: access, 848 + } 849 + 850 + statusJSON, _ := json.Marshal(status) 851 + conn.WriteMessage(websocket.TextMessage, statusJSON) 852 + log.Printf("Sent room status: %s", access) 853 + } 854 + 855 + // sendError sends structured error response to client 856 + func sendError(conn *websocket.Conn, code, message string) { 857 + errorMsg := ErrorMessage{ 858 + Type: "error", 859 + Code: code, 860 + Message: message, 861 + } 862 + 863 + errorJSON, _ := json.Marshal(errorMsg) 864 + conn.WriteMessage(websocket.TextMessage, errorJSON) 865 + log.Printf("Sent error: %s - %s", code, message) 866 + } 867 + 868 + // getChangesForSyncRequest gets changes for missing ranges + new changes 869 + func getChangesForSyncRequest(db *sql.DB, syncMsg SyncRequestMessage) []map[string]interface{} { 870 + decodedSiteID, err := decodeBase64(syncMsg.SiteID) 871 + if err != nil { 872 + log.Printf("Error decoding site_id '%s': %v", syncMsg.SiteID, err) 873 + decodedSiteID = []byte{} 874 + } 875 + 876 + // Build query for missing (db_version, col_version) pairs + new changes 877 + var queryParts []string 878 + var args []interface{} 879 + 880 + // Add site_id filter 881 + baseCond := "site_id != ?" 882 + args = append(args, decodedSiteID) 883 + 884 + // Add missing ranges - for each db_version, get specific missing col_versions 885 + for _, vrange := range syncMsg.MissingRanges { 886 + if len(vrange.ColVersions) > 0 { 887 + // Create placeholders for the col_versions 888 + placeholders := make([]string, len(vrange.ColVersions)) 889 + for i := range placeholders { 890 + placeholders[i] = "?" 891 + args = append(args, vrange.ColVersions[i]) 892 + } 893 + 894 + // Add condition for this db_version with specific col_versions 895 + queryParts = append(queryParts, 896 + fmt.Sprintf("(db_version = ? AND col_version IN (%s))", 897 + strings.Join(placeholders, ","))) 898 + args = append(args, vrange.DBVersion) 899 + } 900 + } 901 + 902 + // Add new changes beyond max version seen 903 + // This includes: db_version > max_db_version OR (db_version = max_db_version AND col_version > max_col_version) 904 + queryParts = append(queryParts, 905 + "(db_version > ? OR (db_version = ? AND col_version > ?))") 906 + args = append(args, syncMsg.MaxVersionSeen.DBVersion, 907 + syncMsg.MaxVersionSeen.DBVersion, syncMsg.MaxVersionSeen.ColVersion) 908 + 909 + // Combine all conditions 910 + whereClause := baseCond 911 + if len(queryParts) > 0 { 912 + whereClause += " AND (" + strings.Join(queryParts, " OR ") + ")" 913 + } 914 + 915 + query := "SELECT * FROM crsql_changes WHERE " + whereClause + " ORDER BY db_version ASC, col_version ASC" 916 + 917 + log.Printf("Executing sync query with %d missing ranges and max_version %d.%d", 918 + len(syncMsg.MissingRanges), syncMsg.MaxVersionSeen.DBVersion, syncMsg.MaxVersionSeen.ColVersion) 919 + 920 + rows, err := db.Query(query, args...) 921 + if err != nil { 922 + log.Printf("Error querying changes for sync: %v", err) 923 + return nil 924 + } 925 + defer rows.Close() 926 + 927 + var changes []map[string]interface{} 928 + 929 + for rows.Next() { 930 + var tableName string 931 + var pk []byte 932 + var columnName string 933 + var value interface{} 934 + var colVersion, dbVersion int64 935 + var siteID []byte 936 + var cl, seq int64 937 + 938 + if err := rows.Scan(&tableName, &pk, &columnName, &value, &colVersion, &dbVersion, &siteID, &cl, &seq); err != nil { 939 + log.Printf("Error scanning row: %v", err) 940 + continue 941 + } 942 + 943 + change := map[string]interface{}{ 944 + "TableName": tableName, 945 + "PK": encodeToBase64(pk), 946 + "ColumnName": columnName, 947 + "Value": value, 948 + "ColVersion": colVersion, 949 + "DBVersion": dbVersion, 950 + "SiteID": encodeToBase64(siteID), 951 + "CL": cl, 952 + "Seq": seq, 953 + } 954 + 955 + changes = append(changes, change) 956 + } 957 + 958 + log.Printf("Returning %d changes for sync request", len(changes)) 959 + return changes 960 + } 961 + 562 962 func main() { 963 + // Log startup configuration 964 + log.Printf("Starting server with REQUIRE_AUTH=%v", requireAuth) 965 + 563 966 // Register SQLite with CR-SQLite extension 564 967 sql.Register("sqlite3_with_extensions", &sqlite3.SQLiteDriver{ 565 968 Extensions: []string{"../db/crsqlite"},