this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: incremental and initial sync

+131 -33
+53 -13
mast-react-vite/src/worker/sync-worker.ts
··· 136 136 const siteId = siteIdResult[0].site_id; 137 137 logDebug(`Site ID: ${Array.from(siteId)}`); 138 138 139 + // Get the highest db_version from crsql_changes table 140 + let lastSyncVersion = 0; 141 + try { 142 + const versionResult = await db.execO<{max_version: number}[]>( 143 + "SELECT MAX(db_version) as max_version FROM crsql_changes" 144 + ); 145 + if (versionResult.length > 0 && versionResult[0].max_version !== null) { 146 + lastSyncVersion = versionResult[0].max_version; 147 + logDebug(`Retrieved lastSyncVersion ${lastSyncVersion} from database`); 148 + } 149 + } catch (error) { 150 + logError(`Error retrieving lastSyncVersion:`, error); 151 + } 152 + 139 153 // Store or update connection info 140 154 if (!connections[dbname]) { 141 155 connections[dbname] = { 142 156 ws: null, 143 157 db, 144 - lastSyncVersion: 0, 158 + lastSyncVersion, 145 159 siteId, 146 160 room: config.room, 147 161 url: config.url, ··· 150 164 } else { 151 165 connections[dbname].db = db; 152 166 connections[dbname].siteId = siteId; 167 + connections[dbname].lastSyncVersion = lastSyncVersion; 153 168 connections[dbname].room = config.room; 154 169 connections[dbname].url = config.url; 155 170 connections[dbname].isConnecting = true; ··· 162 177 wsUrl = `${wsUrl}${separator}room=${config.room}`; 163 178 } 164 179 165 - // Store the requestUnsyncedChanges flag in the connection 166 - connections[dbname].requestUnsyncedChanges = config.requestUnsyncedChanges; 167 180 logDebug(`Connecting to WebSocket at ${wsUrl}`); 168 181 169 182 const ws = new WebSocket(wsUrl); 170 183 171 - // Set up WebSocket event handlers 172 184 ws.onopen = () => { 173 185 logDebug(`WebSocket connected for ${dbname} (state: ${ws.readyState})`); 174 186 connections[dbname].isConnecting = false; ··· 185 197 }); 186 198 187 199 // Initial sync - request changes from server 188 - logDebug(`Sending initial pull request with requestUnsyncedChanges=${connections[dbname].requestUnsyncedChanges}`); 200 + const connection = connections[dbname]; 201 + 202 + // Encode siteId to base64 for transmission 203 + const encodedSiteId = siteId instanceof Uint8Array 204 + ? btoa(String.fromCharCode.apply(null, siteId)) 205 + : btoa(String(siteId)); 206 + 207 + logDebug(`Requesting pull with siteId: ${Array.from(siteId)}, encoded: ${encodedSiteId}, version: ${connection.lastSyncVersion}`) 189 208 ws.send(JSON.stringify({ 190 209 type: "pull", 191 210 room: config.room, 192 - requestUnsyncedChanges: connections[dbname].requestUnsyncedChanges // Request any unsynced changes from server if specified 211 + site_id: encodedSiteId, 212 + version: connection.lastSyncVersion 193 213 })); 194 214 195 215 // Notify main thread of connection ··· 202 222 logDebug(`WebSocket state when receiving: ${ws.readyState}`); 203 223 const message = JSON.parse(event.data); 204 224 205 - if (message.type === "changes" && Array.isArray(message.data)) { 206 - logDebug(`Received ${message.data.length} changes from server`); 207 - await applyChanges(dbname, message.data); 225 + switch (message.type) { 226 + case "changes": 227 + if (Array.isArray(message.data)) { 228 + logDebug(`Received ${message.data.length} changes from server`); 229 + await applyChanges(dbname, message.data); 230 + } 231 + break; 232 + 233 + case "request_changes": 234 + logDebug(`Server requested changes since version ${message.version}`); 235 + 236 + try { 237 + await sendChanges(dbname, message.version); 238 + } catch (error) { 239 + logError(`Error responding to request_changes:`, error); 240 + } 241 + break; 242 + 243 + default: 244 + logDebug(`Received unknown message type: ${message.type}`); 208 245 } 209 246 } catch (error) { 210 247 logError(`Error processing message:`, error); ··· 264 301 } 265 302 266 303 // Send changes to the server 267 - async function sendChanges(dbname: string) { 304 + // If version is provided, sends changes after that version, otherwise uses connection.lastSyncVersion 305 + async function sendChanges(dbname: string, version?: number) { 268 306 logDebug(`Attempting to send changes for ${dbname}`); 269 307 270 308 const connection = connections[dbname]; ··· 282 320 logDebug(`Connection and WebSocket OK - proceeding with changes`); 283 321 284 322 try { 285 - logDebug(`Querying for changes since version ${connection.lastSyncVersion}`); 323 + // Use provided version or fall back to connection's lastSyncVersion 324 + const syncVersion = version !== undefined ? version : connection.lastSyncVersion; 325 + logDebug(`Querying for changes since version ${syncVersion}`); 286 326 287 - // Query for changes since last sync 327 + // Query for changes since specified version 288 328 const changes = await connection.db.execA( 289 329 `SELECT * FROM crsql_changes WHERE db_version > ? AND site_id = crsql_site_id()`, 290 - [connection.lastSyncVersion] 330 + [syncVersion] 291 331 ); 292 332 293 333 if (changes.length === 0) {
+78 -20
server/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "encoding/base64" 5 4 "database/sql" 5 + "encoding/base64" 6 6 "encoding/json" 7 7 "log" 8 8 "net/http" ··· 188 188 189 189 switch msgType { 190 190 case "pull": 191 - // Client is requesting changes 192 191 log.Printf("Client in room %s requested pull", roomID) 193 - changes := getChangesFromDB(db, msg) 194 - response := map[string]interface{}{ 195 - "type": "changes", 196 - "data": changes, 192 + var pullMsg PullMessage 193 + pullData, _ := json.Marshal(msg) 194 + json.Unmarshal(pullData, &pullMsg) 195 + 196 + log.Printf("Pull request with site_id: %s, version: %d", pullMsg.SiteID, pullMsg.Version) 197 + 198 + // Get the server's latest db_version 199 + serverLatestVersion, err := getLatestDBVersion(db) 200 + if err != nil { 201 + log.Printf("Error getting server's latest db_version: %v", err) 202 + serverLatestVersion = 0 203 + } 204 + 205 + // Check if client has a higher version than the server 206 + if pullMsg.Version > serverLatestVersion { 207 + log.Printf("Client has higher version (%d) than server (%d). Requesting changes.", 208 + pullMsg.Version, serverLatestVersion) 209 + 210 + // Send a request_changes message to the client 211 + requestChangesMsg := RequestChangesMessage{ 212 + Type: "request_changes", 213 + RoomID: roomID, 214 + Version: serverLatestVersion, 215 + } 216 + 217 + requestJSON, _ := json.Marshal(requestChangesMsg) 218 + conn.WriteMessage(websocket.TextMessage, requestJSON) 219 + } else { 220 + // Only send changes if we're newer 221 + changes := getChangesFromDB(db, pullMsg.SiteID, pullMsg.Version) 222 + response := map[string]interface{}{ 223 + "type": "changes", 224 + "data": changes, 225 + } 226 + responseJSON, _ := json.Marshal(response) 227 + conn.WriteMessage(websocket.TextMessage, responseJSON) 197 228 } 198 - responseJSON, _ := json.Marshal(response) 199 - conn.WriteMessage(websocket.TextMessage, responseJSON) 200 229 201 230 case "changes": 202 - // Client is sending changes 203 231 log.Printf("Received changes from client in room %s", roomID) 204 232 if publicKey, hasKey := msg["publicKey"].(string); hasKey { 205 233 log.Printf("Changes are authenticated with public key: %s...", publicKey[:20]) ··· 209 237 log.Printf("Processing %d changes from client", len(data)) 210 238 applyChangesToDB(db, data) 211 239 212 - // Broadcast changes to other clients in the same room 213 240 log.Printf("Broadcasting changes to other clients in room %s", roomID) 214 241 broadcastToRoom(roomID, conn, message) 215 242 log.Printf("Broadcast completed for room %s", roomID) ··· 288 315 conn.WriteMessage(websocket.TextMessage, responseJSON) 289 316 } 290 317 291 - func getChangesFromDB(db *sql.DB, msg map[string]interface{}) []map[string]interface{} { 292 - // Check if client is requesting unsynced changes 293 - requestUnsyncedChanges, _ := msg["requestUnsyncedChanges"].(bool) 294 - log.Printf("Client requesting changes. Unsynced changes requested: %v", requestUnsyncedChanges) 318 + // getLatestDBVersion gets the latest db_version from the database 319 + func getLatestDBVersion(db *sql.DB) (int, error) { 320 + // Query the maximum db_version 321 + row := db.QueryRow("SELECT MAX(db_version) FROM crsql_changes") 295 322 296 - // Default query gets all changes 297 - query := "SELECT * FROM crsql_changes" 323 + var version sql.NullInt64 324 + if err := row.Scan(&version); err != nil { 325 + return 0, err 326 + } 327 + 328 + // If there are no rows or the value is null, return 0 329 + if !version.Valid { 330 + return 0, nil 331 + } 332 + 333 + return int(version.Int64), nil 334 + } 335 + 336 + func getChangesFromDB(db *sql.DB, siteID string, version int) []map[string]interface{} { 337 + // Decode the site_id from base64 338 + decodedSiteID, err := decodeBase64(siteID) 339 + if err != nil { 340 + log.Printf("Error decoding site_id '%s': %v", siteID, err) 341 + decodedSiteID = []byte{} 342 + } 298 343 299 - // If we need to send all unsynced changes, no need to filter 300 - // But in a more sophisticated implementation, we could filter based on timestamps 301 - // or some other mechanism to determine what's "unsynced" 344 + log.Printf("Querying for changes with site_id != %v AND db_version > %d", decodedSiteID, version) 345 + query := "SELECT * FROM crsql_changes WHERE site_id != ? AND db_version > ?" 302 346 303 - rows, err := db.Query(query) 347 + rows, err := db.Query(query, decodedSiteID, version) 304 348 if err != nil { 305 349 log.Println("Error querying changes:", err) 306 350 return nil ··· 399 443 return nil 400 444 } 401 445 446 + 447 + // PullMessage represents the structure of a 'pull' type message from clients 448 + type PullMessage struct { 449 + Type string `json:"type"` 450 + SiteID string `json:"site_id"` 451 + Version int `json:"version"` 452 + } 453 + 454 + // RequestChangesMessage represents a request from server to client to send their changes 455 + type RequestChangesMessage struct { 456 + Type string `json:"type"` 457 + RoomID string `json:"room_id"` 458 + Version int `json:"version"` 459 + } 402 460 403 461 // AuthRequest is the structure for authentication verification requests 404 462 type AuthRequest struct {