this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: web sync working client->server

+1056 -299
+72 -137
mast-react-vite/src/App.tsx
··· 1 - import { useState, useEffect, useMemo } from "react"; 1 + import { useState, useEffect, useMemo, useRef } from "react"; 2 2 import { useQuery } from "@vlcn.io/react"; 3 - import { ColumnDef } from "@tanstack/react-table"; 4 - import { DataTable } from "@/components/ui/data-table"; 5 3 import * as commandParser from "@/lib/command_js.js"; 6 4 import { Checkbox } from "@/components/ui/checkbox"; 7 5 import { ActionParser } from "@/components/ui/action-parser"; 8 6 import { useSelection } from "@/contexts/selection-context"; 9 7 import { SidebarProvider, SidebarTrigger } from "@/components/ui/sidebar"; 10 8 import { AppSidebar } from "@/components/ui/app-sidebar"; 9 + import { DataTable } from "@/components/ui/data-table"; 11 10 import { Badge } from "@/components/ui/badge"; 12 11 13 12 type Todo = { ··· 18 17 project: string; 19 18 }; 20 19 21 - export const columns: ColumnDef<Todo>[] = [ 22 - { 23 - id: "select", 24 - header: ({ table }) => ( 25 - <div className="flex items-center gap-2"> 26 - <p>0</p> 27 - <Checkbox 28 - checked={ 29 - table.getIsAllPageRowsSelected() || 30 - (table.getIsSomePageRowsSelected() && "indeterminate") 31 - } 32 - onCheckedChange={(value) => table.toggleAllPageRowsSelected(!!value)} 33 - aria-label="Select all" 34 - className="translate-y-[2px]" 35 - /> 36 - </div> 37 - ), 38 - cell: ({ row, table }) => ( 39 - <div className="flex items-center gap-2"> 40 - <p>{row.index + 1}</p> 41 - <Checkbox 42 - checked={row.getIsSelected()} 43 - onCheckedChange={(value) => { 44 - row.toggleSelected(!!value); 45 - // Get setNewText from table meta 46 - const setNewText = table.options.meta?.setNewText; 47 - const NewText = table.options.meta?.newText; 48 - if (setNewText && value === true) { 49 - const calculatedRowId = row.index + 1; 50 - setNewText(NewText + calculatedRowId + ","); 51 - } else { 52 - // TODO: 53 - // We need to _remove_ the value here 54 - // That probably means taking the NewText 55 - // Removing the first instance of "row.index," 56 - // And then setting that with setNewText 57 - } 58 - }} 59 - aria-label="Select row" 60 - className="translate-y-[2px]" 61 - /> 62 - </div> 63 - ), 64 - enableSorting: false, 65 - enableHiding: false, 66 - size: 20, 67 - }, 68 - { 69 - accessorKey: "description", 70 - header: "Description", 71 - }, 72 - { 73 - accessorKey: "project", 74 - header: "Project", 75 - }, 76 - { 77 - accessorKey: "tags", 78 - header: "Tags", 79 - cell: ({ row }) => { 80 - const formatted = JSON.parse(row.getValue("tags")); 81 - return <div className="font-medium">{formatted.join(", ")}</div>; 82 - }, 83 - }, 84 - ]; 85 - 86 - function App({ ctx }) { 20 + function App({ ctx, syncWorker, dbname }) { 87 21 const [newText, setNewText] = useState(""); 88 22 const { selectedItems, clearSelection, getSelectionString } = useSelection(); 89 23 const [currentAction, setCurrentAction] = useState("add"); 90 24 const [filterContext, setFilterContext] = useState({}); 91 - 92 25 const newConditions = []; 93 26 const newParams = []; 94 27 ··· 99 32 newConditions.push("description LIKE ?"); 100 33 newParams.push(`%${filterContext.filterDescription}%`); 101 34 } 35 + 102 36 if (filterContext.filterProject && filterContext.filterProject.length > 0) { 103 37 newConditions.push("project = ?"); 104 38 newParams.push(filterContext.filterProject); 105 39 } 40 + 106 41 if (filterContext.filterTags && filterContext.filterTags.length > 0) { 107 42 newConditions.push(`EXISTS ( 108 43 SELECT 1 FROM json_each(tags) ··· 126 61 `SELECT * FROM active_todos ${whereClause ? "WHERE " + whereClause : ""}`, 127 62 newParams, 128 63 ).data; 64 + 65 + // Trigger sync when todos change 66 + useEffect(() => { 67 + if (syncWorker && todos.length > 0) { 68 + syncWorker.postMessage({ 69 + type: 'SYNC_CHANGES', 70 + dbname 71 + }); 72 + } 73 + }, [todos, syncWorker]); 129 74 130 75 useEffect(() => { 131 76 if ( ··· 145 90 ); 146 91 } catch (error) { 147 92 console.log("Could not parse command"); 148 - console.log( getSelectionString() + " " + currentAction + " " + newText) 93 + console.log(getSelectionString() + " " + currentAction + " " + newText); 149 94 return {}; 150 95 } 151 96 }, [getSelectionString(), currentAction, newText]); ··· 164 109 working_id: 0, 165 110 }; 166 111 return [newTodo, ...todos]; 167 - 168 112 case "done": 169 113 return todos.map((todo) => { 170 114 if (selectedItems.has(todo.working_id)) { ··· 172 116 } 173 117 return todo; 174 118 }); 175 - 176 - case "modify": 177 - return todos.map((todo) => { 178 - if (selectedItems.has(todo.working_id)) { 179 - // Create a preview object that contains the original todo and the changes 180 - const previewChanges = {}; 181 - 182 - // Only add fields that are being changed to the preview 183 - if (parsedCommand.description) { 184 - previewChanges.description = parsedCommand.description; 185 - } 186 - 187 - if (parsedCommand.project) { 188 - previewChanges.project = parsedCommand.project; 189 - } 190 - 191 - if (parsedCommand.tags && parsedCommand.tags.length > 0) { 192 - // For tags, we want to show both existing and new tags in preview 193 - previewChanges.tags = JSON.stringify([ 194 - ...parsedCommand.tags 195 - ]); 196 - } 197 - 198 - // Return the original todo with preview changes attached 199 - return { 200 - ...todo, 201 - preview: previewChanges, 202 - previewMode: true 203 - }; 204 - } 205 - return todo; 119 + case "modify": 120 + return todos.map((todo) => { 121 + if (selectedItems.has(todo.working_id)) { 122 + // Create a preview object that contains the original todo and the changes 123 + const previewChanges = {}; 124 + 125 + // Only add fields that are being changed to the preview 126 + if (parsedCommand.description) { 127 + previewChanges.description = parsedCommand.description; 128 + } 129 + 130 + if (parsedCommand.project) { 131 + previewChanges.project = parsedCommand.project; 132 + } 133 + 134 + if (parsedCommand.tags && parsedCommand.tags.length > 0) { 135 + // For tags, we want to show both existing and new tags in preview 136 + previewChanges.tags = JSON.stringify([ 137 + ...parsedCommand.tags 138 + ]); 139 + } 140 + 141 + // Return the original todo with preview changes attached 142 + return { 143 + ...todo, 144 + preview: previewChanges, 145 + previewMode: true 146 + }; 147 + } 148 + return todo; 206 149 }); 207 - 208 - 209 150 case "filter": 210 151 // For filter, we don't need preview todos 211 152 return []; 212 - 213 153 default: 214 154 return []; 215 155 } ··· 259 199 selection.forEach((sel) => { 260 200 if (sel.type === "id") { 261 201 conditions.push(`id IN ( 262 - SELECT id 263 - FROM active_todos 264 - WHERE working_id IN (${sel.ids.map(() => "?").join(",")}) 265 - )`); 202 + SELECT id 203 + FROM active_todos 204 + WHERE working_id IN (${sel.ids.map(() => "?").join(",")}) 205 + )`); 266 206 params.push(...sel.ids); 267 207 } else if (sel.type === "tag") { 268 208 // TODO this is busted ··· 291 231 conditions: [], 292 232 params: [], 293 233 }); 294 - 295 234 if (conditions.length > 0) { 296 235 const sqlQuery = ` 297 - UPDATE todos 298 - SET completed = 1 299 - WHERE ${conditions.join(" OR ")} 300 - `; 301 - 236 + UPDATE todos 237 + SET completed = 1 238 + WHERE ${conditions.join(" OR ")} 239 + `; 302 240 ctx.db.exec(sqlQuery, params); 303 241 clearSelection(); 304 242 } ··· 313 251 case "modify": 314 252 const editParams = []; 315 253 const updates = []; 316 - 317 254 // Handle new values for SET clause 318 255 if (parsed.description && parsed.description.length > 0) { 319 256 updates.push("description = ?"); ··· 325 262 } 326 263 if (parsed.tags && parsed.tags.length > 0) { 327 264 updates.push(`tags = ( 328 - SELECT json_group_array(value) 329 - FROM ( 330 - SELECT DISTINCT value 331 - FROM ( 332 - SELECT value FROM json_each(tags) 333 - UNION 334 - SELECT value FROM json_each(json(?)) 335 - ) 336 - ) 337 - )`); 265 + SELECT json_group_array(value) 266 + FROM ( 267 + SELECT DISTINCT value 268 + FROM ( 269 + SELECT value FROM json_each(tags) 270 + UNION 271 + SELECT value FROM json_each(json(?)) 272 + ) 273 + ) 274 + )`); 338 275 editParams.push(JSON.stringify(parsed.tags)); 339 276 } 340 - 341 277 // Build WHERE clause using only selection conditions 342 - 343 278 const { conditions: editConditions, params: selectionParams } = 344 279 parseSelection({ 345 280 selection: parsed.selection, 346 281 conditions: [], 347 282 params: [], 348 283 }); 349 - 350 284 if (editConditions.length > 0 && updates.length > 0) { 351 285 const sqlQuery = ` 352 286 UPDATE todos ··· 371 305 break; 372 306 } 373 307 setNewText(""); 308 + 374 309 } catch (error) { 375 310 // TODO: 376 311 // This is actually bad ··· 386 321 <SidebarProvider defaultOpen={false} className="h-screen dark"> 387 322 <div className="hidden md:flex flex-col w-full h-svh"> 388 323 <AppSidebar ctx={ctx} 389 - filterContext={filterContext} 390 - setFilterContext={setFilterContext} /> 324 + filterContext={filterContext} 325 + setFilterContext={setFilterContext} /> 391 326 <div className="bg-muted h-14 w-full absolute top-0 " /> 392 327 <section className="flex-1 container py-12 h-[calc(100vh-theme(spacing.4))] overflow-hidden relative"> 393 328 <SidebarTrigger className="absolute top-4 left-8 border border-foreground" /> ··· 400 335 {todos.length} items found 401 336 <button 402 337 onClick={() => setFilterContext({})} 403 - className="hover:bg-muted rounded-full p-1" 404 - > 338 + className="hover:bg-muted rounded-full p-1"> 405 339 406 340 </button> 407 341 </Badge> ··· 423 357 </div> 424 358 </section> 425 359 </div> 426 - 427 360 {/* Mobile view layout - shown only on mobile */} 428 361 <div className="md:hidden flex left-0 w-full h-full flex flex-col items-center pb-2"> 429 362 <AppSidebar ctx={ctx} 430 - filterContext={filterContext} 431 - setFilterContext={setFilterContext} /> 363 + filterContext={filterContext} 364 + setFilterContext={setFilterContext} /> 432 365 <section className="flex-1 py-12 h-[calc(100vh-theme(spacing.4))] w-full overflow-hidden relative"> 433 366 <div className="flex-1 bg-muted border-b border-foreground h-14 absolute inset-x-0 top-0 " /> 434 367 <SidebarTrigger className="fixed top-4 border border-foreground left-8" /> ··· 463 396 </div> 464 397 </section> 465 398 </div> 399 + 466 400 </SidebarProvider> 467 401 </> 468 402 ); 469 403 } 470 404 471 405 export default App; 406 +
+59
mast-react-vite/src/components/ui/sync-status.tsx
··· 1 + type SyncStatusProps = { 2 + status: 'connecting' | 'connected' | 'disconnected' | 'error'; 3 + changesSent: number; 4 + changesReceived: number; 5 + lastError?: string; 6 + lastSyncTime?: Date; 7 + }; 8 + 9 + export function SyncStatus({ 10 + status, 11 + changesSent, 12 + changesReceived, 13 + lastError, 14 + lastSyncTime 15 + }: SyncStatusProps) { 16 + // Define status colors 17 + const statusColors = { 18 + connecting: 'bg-yellow-500', 19 + connected: 'bg-green-500', 20 + disconnected: 'bg-gray-500', 21 + error: 'bg-red-500' 22 + }; 23 + 24 + // Format last sync time 25 + const formattedTime = lastSyncTime 26 + ? new Intl.DateTimeFormat('en-US', { 27 + hour: '2-digit', 28 + minute: '2-digit', 29 + second: '2-digit' 30 + }).format(lastSyncTime) 31 + : 'Never'; 32 + 33 + return ( 34 + <div className="fixed bottom-4 right-4 p-3 bg-background border border-border rounded-lg shadow-lg z-50 text-sm"> 35 + <div className="flex items-center gap-2 mb-1"> 36 + <div className={`w-3 h-3 rounded-full ${statusColors[status]}`}></div> 37 + <span className="font-medium">Sync: {status}</span> 38 + </div> 39 + 40 + <div className="grid grid-cols-2 gap-x-4 gap-y-1 mt-2 text-xs text-muted-foreground"> 41 + <div>Changes sent:</div> 42 + <div>{changesSent}</div> 43 + 44 + <div>Changes received:</div> 45 + <div>{changesReceived}</div> 46 + 47 + <div>Last sync:</div> 48 + <div>{formattedTime}</div> 49 + 50 + {lastError && ( 51 + <> 52 + <div className="col-span-2 text-red-500 mt-1">Error: {lastError}</div> 53 + </> 54 + )} 55 + </div> 56 + </div> 57 + ); 58 + } 59 +
+96
mast-react-vite/src/hooks/use-sync.ts
··· 1 + import { useState, useEffect, useCallback } from 'react'; 2 + 3 + interface SyncConfig { 4 + dbname: string; 5 + room: string; 6 + endpoint: string; 7 + worker: Worker; 8 + } 9 + 10 + export function useCustomSync({ 11 + dbname, 12 + room, 13 + endpoint, 14 + worker 15 + }: SyncConfig) { 16 + const [status, setStatus] = useState<string>('disconnected'); 17 + const [changesSent, setChangesSent] = useState<number>(0); 18 + const [changesReceived, setChangesReceived] = useState<number>(0); 19 + const [lastError, setLastError] = useState<string | null>(null); 20 + const [lastSyncTime, setLastSyncTime] = useState<Date | null>(null); 21 + 22 + // Initialize sync once on component mount 23 + useEffect(() => { 24 + if (!worker) return; 25 + 26 + // Start sync 27 + worker.postMessage({ 28 + type: 'START_SYNC', 29 + dbname, 30 + config: { 31 + room: room, 32 + url: endpoint 33 + } 34 + }); 35 + 36 + // Set up message handlers 37 + const handleMessage = (event: MessageEvent) => { 38 + const { type, dbname: eventDbname, count, error } = event.data; 39 + 40 + // Only process messages for our database 41 + if (eventDbname && eventDbname !== dbname) return; 42 + 43 + switch (type) { 44 + case 'SYNC_CONNECTED': 45 + setStatus('connected'); 46 + break; 47 + case 'SYNC_DISCONNECTED': 48 + setStatus('disconnected'); 49 + break; 50 + case 'SYNC_ERROR': 51 + setStatus('error'); 52 + setLastError(error); 53 + break; 54 + case 'CHANGES_SENT': 55 + setChangesSent(prev => prev + (count || 0)); 56 + setLastSyncTime(new Date()); 57 + break; 58 + case 'CHANGES_APPLIED': 59 + setChangesReceived(prev => prev + (count || 0)); 60 + setLastSyncTime(new Date()); 61 + break; 62 + } 63 + }; 64 + 65 + worker.addEventListener('message', handleMessage); 66 + 67 + // Clean up when component unmounts 68 + return () => { 69 + worker.removeEventListener('message', handleMessage); 70 + worker.postMessage({ 71 + type: 'STOP_SYNC', 72 + dbname 73 + }); 74 + }; 75 + }, [dbname, room, endpoint, worker]); 76 + 77 + // Function to trigger sync manually 78 + const syncChanges = useCallback(() => { 79 + if (worker) { 80 + worker.postMessage({ 81 + type: 'SYNC_CHANGES', 82 + dbname 83 + }); 84 + } 85 + }, [worker, dbname]); 86 + 87 + return { 88 + status, 89 + changesSent, 90 + changesReceived, 91 + lastError, 92 + lastSyncTime, 93 + syncChanges 94 + }; 95 + } 96 +
+91 -25
mast-react-vite/src/main.tsx
··· 1 1 import { createRoot } from "react-dom/client"; 2 2 import "./index.css"; 3 3 import App from "./App.tsx"; 4 - 5 4 import { Helmet } from "react-helmet"; 6 5 import { DBAsync } from "@vlcn.io/xplat-api"; 7 6 import initWasm from "@vlcn.io/crsqlite-wasm"; 8 7 import wasmUrl from "@vlcn.io/crsqlite-wasm/crsqlite.wasm?url"; 9 8 import tblrx from "@vlcn.io/rx-tbl"; 10 - 11 9 import { SelectionProvider } from "@/contexts/selection-context"; 10 + import { useCustomSync } from "@/hooks/use-sync"; 11 + import { SyncStatus } from "@/components/ui/sync-status"; 12 + 13 + // Create a worker instance only once 14 + import SyncWorker from './worker/sync-worker.ts?worker'; 15 + const syncWorker = new SyncWorker(); 16 + 17 + // Function to get room ID from URL hash or generate a new one 18 + function getRoomId(): string { 19 + // Check URL hash for room parameter 20 + const hash = window.location.hash.substring(1); 21 + const params = new URLSearchParams(hash); 22 + const roomFromHash = params.get('room'); 23 + 24 + if (roomFromHash) { 25 + return roomFromHash; 26 + } 27 + 28 + // Check localStorage 29 + const storedRoom = localStorage.getItem("room"); 30 + if (storedRoom) { 31 + return storedRoom; 32 + } 33 + 34 + // Generate a new room ID if none exists 35 + const newRoomId = crypto.randomUUID().replaceAll("-", ""); 36 + localStorage.setItem("room", newRoomId); 37 + 38 + // Update URL with the new room ID 39 + updateUrlWithRoom(newRoomId); 40 + 41 + return newRoomId; 42 + } 43 + 44 + // Function to update URL with room ID 45 + function updateUrlWithRoom(roomId: string) { 46 + const hash = window.location.hash.substring(1); 47 + const params = new URLSearchParams(hash); 48 + params.set('room', roomId); 49 + window.location.hash = params.toString(); 50 + } 51 + 52 + // Custom sync component using your own implementation 53 + function CustomSyncComponent({ dbname, roomId }: { dbname: string, roomId: string }) { 54 + const endpoint = `ws://localhost:8080/sync`; 55 + 56 + const syncStats = useCustomSync({ 57 + dbname, 58 + room: roomId, 59 + endpoint, 60 + worker: syncWorker 61 + }); 62 + 63 + return <SyncStatus {...syncStats} />; 64 + } 12 65 13 66 const initDb = async () => { 14 67 const sqlite = await initWasm(() => wasmUrl); 15 - const db: DBAsync = await sqlite.open("todo.db"); 68 + 69 + // Get or create room ID 70 + const roomId = getRoomId(); 71 + 72 + // Use room ID in database name to isolate data per room 73 + const dbname = `todo-${roomId}.db`; 74 + const db: DBAsync = await sqlite.open(dbname); 16 75 await db.exec(` 17 - CREATE TABLE IF NOT EXISTS todos ( 18 - id BLOB PRIMARY KEY NOT NULL, 19 - description TEXT, 76 + CREATE TABLE IF NOT EXISTS todos ( 77 + id BLOB PRIMARY KEY NOT NULL, 78 + description TEXT, 20 79 project text, 21 80 tags text, 22 81 due text, 23 82 wait text, 24 83 priority text, 25 84 urgency real, 26 - completed INTEGER NOT NULL DEFAULT 0 27 - ); 28 - SELECT crsql_as_crr('todos'); 29 - CREATE VIEW IF NOT EXISTS active_todos AS 30 - SELECT 31 - id, 32 - ROW_NUMBER() OVER (ORDER BY id) as working_id, 33 - description, 34 - project, 35 - tags, 36 - due, 37 - wait, 38 - priority, 39 - urgency 40 - FROM todos 41 - WHERE completed = 0; 42 - `); 85 + completed INTEGER NOT NULL DEFAULT 0 86 + ); 87 + SELECT crsql_as_crr('todos'); 88 + CREATE VIEW IF NOT EXISTS active_todos AS 89 + SELECT 90 + id, 91 + ROW_NUMBER() OVER (ORDER BY id) as working_id, 92 + description, 93 + project, 94 + tags, 95 + due, 96 + wait, 97 + priority, 98 + urgency 99 + FROM todos 100 + WHERE completed = 0; 101 + `); 43 102 const rx = tblrx(db); 44 - return { db, rx }; 103 + return { db, rx, roomId, dbname }; 45 104 }; 46 105 47 106 const init = async () => { 48 107 const ctx = await initDb(); 108 + 109 + // Make the worker available globally for debugging 110 + (window as any).syncWorker = syncWorker; 111 + 49 112 createRoot(document.getElementById("root") as HTMLElement).render( 50 113 <> 51 114 <Helmet> ··· 56 119 </Helmet> 57 120 <main className="h-screen flex flex-col bg-background text-gray-200"> 58 121 <SelectionProvider> 59 - <App ctx={ctx} /> 122 + {/* Use the custom sync component with the database name and room ID */} 123 + <CustomSyncComponent dbname={ctx.dbname} roomId={ctx.roomId} /> 124 + <App ctx={ctx} syncWorker={syncWorker} dbname={ctx.dbname} roomId={ctx.roomId} /> 60 125 </SelectionProvider> 61 126 </main> 62 127 </>, ··· 65 130 66 131 document.getElementById("root")!.classList.add("dark"); 67 132 init(); 133 +
+361
mast-react-vite/src/worker/sync-worker.ts
··· 1 + import initWasm from "@vlcn.io/crsqlite-wasm"; 2 + import wasmUrl from "@vlcn.io/crsqlite-wasm/crsqlite.wasm?url"; 3 + 4 + const DEBUG = true; 5 + 6 + function logDebug(message: string, ...data: any[]) { 7 + if (DEBUG) { 8 + console.log(`[SyncWorker] ${message}`, ...data); 9 + } 10 + } 11 + 12 + function logError(message: string, ...data: any[]) { 13 + console.error(`[SyncWorker] ${message}`, ...data); 14 + } 15 + 16 + // Store active connections - only one per database 17 + const connections: Record<string, { 18 + ws: WebSocket | null; 19 + db: any; 20 + lastSyncVersion: number; 21 + siteId?: Uint8Array; 22 + room: string; 23 + url: string; 24 + isConnecting: boolean; 25 + }> = {}; 26 + 27 + // Handle messages from the main thread 28 + self.onmessage = async (event) => { 29 + const { type, dbname, config } = event.data; 30 + logDebug(`Received message: ${type}`, { dbname, config }); 31 + 32 + switch (type) { 33 + case 'START_SYNC': 34 + await startSync(dbname, config); 35 + break; 36 + case 'STOP_SYNC': 37 + stopSync(dbname); 38 + break; 39 + case 'SYNC_CHANGES': 40 + // This is triggered when React tells us changes happened 41 + await sendChanges(dbname); 42 + break; 43 + } 44 + }; 45 + 46 + // Start syncing a database 47 + async function startSync(dbname: string, config: { room: string, url: string }) { 48 + try { 49 + // Check if we already have a connection for this database 50 + if (connections[dbname]) { 51 + logDebug(`Connection already exists for ${dbname}`); 52 + 53 + // If the connection is already established or in progress, do nothing 54 + if (connections[dbname].ws && 55 + (connections[dbname].ws.readyState === WebSocket.OPEN || 56 + connections[dbname].ws.readyState === WebSocket.CONNECTING) || 57 + connections[dbname].isConnecting) { 58 + logDebug(`WebSocket already connected or connecting for ${dbname}`); 59 + return; 60 + } 61 + 62 + // If the connection exists but WebSocket is closed, we'll reconnect below 63 + } 64 + 65 + logDebug(`Starting sync for ${dbname} with room ${config.room}`); 66 + 67 + // Initialize SQLite if needed 68 + const sqlite = await initWasm(() => wasmUrl); 69 + logDebug(`SQLite initialized`); 70 + 71 + // Open the database 72 + const db = await sqlite.open(dbname); 73 + logDebug(`Database opened: ${dbname}`); 74 + 75 + // Get site ID 76 + const siteIdResult = await db.execO<{site_id: Uint8Array}[]>("SELECT crsql_site_id() as site_id"); 77 + const siteId = siteIdResult[0].site_id; 78 + logDebug(`Site ID: ${Array.from(siteId)}`); 79 + 80 + // Store or update connection info 81 + if (!connections[dbname]) { 82 + connections[dbname] = { 83 + ws: null, 84 + db, 85 + lastSyncVersion: 0, 86 + siteId, 87 + room: config.room, 88 + url: config.url, 89 + isConnecting: true 90 + }; 91 + } else { 92 + connections[dbname].db = db; 93 + connections[dbname].siteId = siteId; 94 + connections[dbname].room = config.room; 95 + connections[dbname].url = config.url; 96 + connections[dbname].isConnecting = true; 97 + } 98 + 99 + // Create WebSocket connection with room in query parameter 100 + let wsUrl = config.url; 101 + if (!wsUrl.includes('?room=')) { 102 + const separator = wsUrl.includes('?') ? '&' : '?'; 103 + wsUrl = `${wsUrl}${separator}room=${config.room}`; 104 + } 105 + logDebug(`Connecting to WebSocket at ${wsUrl}`); 106 + 107 + const ws = new WebSocket(wsUrl); 108 + 109 + // Set up WebSocket event handlers 110 + ws.onopen = () => { 111 + logDebug(`WebSocket connected for ${dbname}`); 112 + connections[dbname].isConnecting = false; 113 + 114 + // Set up change listener using onUpdate 115 + db.onUpdate(async () => { 116 + logDebug(`Database update detected for ${dbname}`); 117 + await sendChanges(dbname); 118 + }); 119 + 120 + // Initial sync - request changes from server 121 + logDebug(`Sending initial pull request`); 122 + ws.send(JSON.stringify({ 123 + type: "pull", 124 + room: config.room 125 + })); 126 + 127 + // Notify main thread of connection 128 + self.postMessage({ type: 'SYNC_CONNECTED', dbname }); 129 + }; 130 + 131 + ws.onmessage = async (event) => { 132 + try { 133 + logDebug(`Received WebSocket message: ${event.data.substring(0, 100)}...`); 134 + const message = JSON.parse(event.data); 135 + 136 + if (message.type === "changes" && Array.isArray(message.data)) { 137 + logDebug(`Received ${message.data.length} changes from server`); 138 + await applyChanges(dbname, message.data); 139 + } 140 + } catch (error) { 141 + logError(`Error processing message:`, error); 142 + } 143 + }; 144 + 145 + ws.onerror = (error) => { 146 + logError(`WebSocket error:`, error); 147 + connections[dbname].isConnecting = false; 148 + // Notify main thread of error 149 + self.postMessage({ type: 'SYNC_ERROR', dbname, error: 'WebSocket error' }); 150 + }; 151 + 152 + ws.onclose = () => { 153 + logDebug(`WebSocket connection closed for ${dbname}`); 154 + connections[dbname].isConnecting = false; 155 + // Notify main thread of disconnection 156 + self.postMessage({ type: 'SYNC_DISCONNECTED', dbname }); 157 + }; 158 + 159 + // Store the WebSocket 160 + connections[dbname].ws = ws; 161 + 162 + } catch (error) { 163 + logError(`Error starting sync:`, error); 164 + if (connections[dbname]) { 165 + connections[dbname].isConnecting = false; 166 + } 167 + self.postMessage({ type: 'SYNC_ERROR', dbname, error: 'Failed to start sync' }); 168 + } 169 + } 170 + 171 + // Stop syncing a database 172 + function stopSync(dbname: string) { 173 + logDebug(`Stopping sync for ${dbname}`); 174 + const connection = connections[dbname]; 175 + if (!connection) { 176 + logDebug(`No connection found for ${dbname}`); 177 + return; 178 + } 179 + 180 + // Close WebSocket 181 + if (connection.ws && 182 + (connection.ws.readyState === WebSocket.OPEN || 183 + connection.ws.readyState === WebSocket.CONNECTING)) { 184 + logDebug(`Closing WebSocket for ${dbname}`); 185 + connection.ws.close(); 186 + connection.ws = null; 187 + } 188 + 189 + // Clean up 190 + delete connections[dbname]; 191 + logDebug(`Sync stopped for ${dbname}`); 192 + 193 + // Notify main thread 194 + self.postMessage({ type: 'SYNC_STOPPED', dbname }); 195 + } 196 + 197 + // Send changes to the server 198 + async function sendChanges(dbname: string) { 199 + const connection = connections[dbname]; 200 + if (!connection) { 201 + logDebug(`Cannot send changes - no connection for ${dbname}`); 202 + return; 203 + } 204 + 205 + if (!connection.ws || connection.ws.readyState !== WebSocket.OPEN) { 206 + logDebug(`Cannot send changes - WebSocket not open for ${dbname}`); 207 + return; 208 + } 209 + 210 + try { 211 + logDebug(`Querying for changes since version ${connection.lastSyncVersion}`); 212 + 213 + // Query for changes since last sync 214 + const changes = await connection.db.execA( 215 + `SELECT * FROM crsql_changes WHERE db_version > ? AND site_id = crsql_site_id()`, 216 + [connection.lastSyncVersion] 217 + ); 218 + 219 + if (changes.length === 0) { 220 + logDebug(`No changes to send for ${dbname}`); 221 + return; 222 + } 223 + 224 + logDebug(`Found ${changes.length} changes to send for ${dbname}`); 225 + 226 + // Format changes - with fixed site ID encoding 227 + const formattedChanges = changes.map(change => { 228 + // Properly encode binary data to base64 229 + const encodePK = change[1] instanceof Uint8Array 230 + ? btoa(String.fromCharCode.apply(null, change[1])) 231 + : btoa(String(change[1])); 232 + 233 + const encodeSiteID = change[6] instanceof Uint8Array 234 + ? btoa(String.fromCharCode.apply(null, change[6])) 235 + : btoa(String(change[6])); 236 + 237 + return { 238 + TableName: change[0], 239 + PK: encodePK, 240 + ColumnName: change[2], 241 + Value: change[3], 242 + ColVersion: Number(change[4]), 243 + DBVersion: Number(change[5]), 244 + SiteID: encodeSiteID, 245 + CL: Number(change[7]), 246 + Seq: Number(change[8]) 247 + }; 248 + }); 249 + 250 + // Send changes to server 251 + logDebug(`Sending ${changes.length} changes to server`); 252 + connection.ws.send(JSON.stringify({ 253 + type: "changes", 254 + data: formattedChanges 255 + })); 256 + 257 + // Update last sync version 258 + const maxVersion = Math.max(...changes.map(c => Number(c[5]))); 259 + connection.lastSyncVersion = maxVersion; 260 + logDebug(`Updated lastSyncVersion to ${maxVersion}`); 261 + 262 + // Notify main thread that changes were sent 263 + self.postMessage({ type: 'CHANGES_SENT', dbname, count: changes.length }); 264 + 265 + } catch (error) { 266 + logError(`Error sending changes:`, error); 267 + self.postMessage({ type: 'SYNC_ERROR', dbname, error: 'Failed to send changes' }); 268 + } 269 + } 270 + 271 + // Also update the applyChanges function to handle decoding correctly 272 + async function applyChanges(dbname: string, changes: any[]) { 273 + const connection = connections[dbname]; 274 + if (!connection) { 275 + logDebug(`Cannot apply changes - no connection for ${dbname}`); 276 + return; 277 + } 278 + 279 + if (changes.length === 0) { 280 + logDebug(`No changes to apply for ${dbname}`); 281 + return; 282 + } 283 + 284 + try { 285 + logDebug(`Applying ${changes.length} changes to ${dbname}`); 286 + 287 + // Use transaction for applying changes 288 + await connection.db.tx(async (tx: any) => { 289 + for (const change of changes) { 290 + logDebug(`Applying change: ${change.TableName}.${change.ColumnName}`); 291 + 292 + try { 293 + // Safely decode base64 strings to Uint8Array 294 + const decodePK = (str: string): Uint8Array => { 295 + try { 296 + return new Uint8Array( 297 + atob(str).split('').map(c => c.charCodeAt(0)) 298 + ); 299 + } catch (e) { 300 + logError(`Error decoding PK: ${e}`); 301 + return new Uint8Array(); 302 + } 303 + }; 304 + 305 + const decodeSiteID = (str: string): Uint8Array => { 306 + try { 307 + return new Uint8Array( 308 + atob(str).split('').map(c => c.charCodeAt(0)) 309 + ); 310 + } catch (e) { 311 + logError(`Error decoding SiteID: ${e}`); 312 + return new Uint8Array(); 313 + } 314 + }; 315 + 316 + // Convert the change format back to what CR-SQLite expects 317 + const decodedChange = [ 318 + change.TableName, 319 + decodePK(change.PK), 320 + change.ColumnName, 321 + change.Value, 322 + change.ColVersion, 323 + change.DBVersion, 324 + decodeSiteID(change.SiteID), 325 + change.CL, 326 + change.Seq 327 + ]; 328 + 329 + await tx.exec( 330 + `INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, 331 + decodedChange 332 + ); 333 + } catch (error) { 334 + logError(`Error applying individual change:`, error); 335 + // Continue with other changes 336 + } 337 + } 338 + }); 339 + 340 + logDebug(`Changes applied successfully`); 341 + 342 + // Update last sync version 343 + const maxVersion = Math.max(...changes.map(c => c.DBVersion)); 344 + if (maxVersion > connection.lastSyncVersion) { 345 + connection.lastSyncVersion = maxVersion; 346 + logDebug(`Updated lastSyncVersion to ${maxVersion}`); 347 + } 348 + 349 + // Notify main thread that changes were applied 350 + self.postMessage({ type: 'CHANGES_APPLIED', dbname, count: changes.length }); 351 + 352 + } catch (error) { 353 + logError(`Error applying changes:`, error); 354 + self.postMessage({ type: 'SYNC_ERROR', dbname, error: 'Failed to apply changes' }); 355 + } 356 + } 357 + 358 + 359 + // Export an empty default to satisfy TypeScript 360 + export default {}; 361 +
+75
mast-react-vite/src/worker/worker-interface.ts
··· 1 + export class CustomWorkerInterface { 2 + private worker: Worker | null; 3 + private messageHandlers: Record<string, Array<(data: any) => void>> = {}; 4 + 5 + constructor(worker: Worker | null) { 6 + this.worker = worker; 7 + 8 + if (this.worker) { 9 + // Set up message handler 10 + this.worker.onmessage = (event) => { 11 + const { type, ...data } = event.data; 12 + 13 + // Call all registered handlers for this message type 14 + const handlers = this.messageHandlers[type] || []; 15 + for (const handler of handlers) { 16 + handler(data); 17 + } 18 + }; 19 + } 20 + } 21 + 22 + // Start syncing a database 23 + startSync(dbname: string, config: { room: string, url: string }) { 24 + if (!this.worker) { 25 + console.warn('Worker not initialized'); 26 + return; 27 + } 28 + 29 + this.worker.postMessage({ 30 + type: 'START_SYNC', 31 + dbname, 32 + config 33 + }); 34 + } 35 + 36 + // Stop syncing a database 37 + stopSync(dbname: string) { 38 + if (!this.worker) { 39 + console.warn('Worker not initialized'); 40 + return; 41 + } 42 + 43 + this.worker.postMessage({ 44 + type: 'STOP_SYNC', 45 + dbname 46 + }); 47 + } 48 + 49 + // Send changes to be synced 50 + syncChanges(dbname: string) { 51 + if (!this.worker) { 52 + console.warn('Worker not initialized'); 53 + return; 54 + } 55 + 56 + this.worker.postMessage({ 57 + type: 'SYNC_CHANGES', 58 + dbname 59 + }); 60 + } 61 + 62 + // Register a handler for a specific message type from the worker 63 + onMessage(type: string, handler: (data: any) => void) { 64 + if (!this.messageHandlers[type]) { 65 + this.messageHandlers[type] = []; 66 + } 67 + this.messageHandlers[type].push(handler); 68 + 69 + // Return unsubscribe function 70 + return () => { 71 + this.messageHandlers[type] = this.messageHandlers[type].filter(h => h !== handler); 72 + }; 73 + } 74 + } 75 +
+3
mast-react-vite/vite.config.ts
··· 13 13 "@": path.resolve(__dirname, "./src"), 14 14 }, 15 15 }, 16 + worker: { 17 + format: 'es', // Use ES modules in workers 18 + }, 16 19 })
+299 -137
server/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "database/sql" 5 4 "encoding/base64" 5 + "database/sql" 6 6 "encoding/json" 7 - "fmt" 8 7 "log" 9 8 "net/http" 10 - "strconv" 11 - "strings" 12 - 13 - "mast/db" 9 + "sync" 10 + "time" 14 11 15 12 "github.com/gorilla/websocket" 16 13 sqlite3 "github.com/mattn/go-sqlite3" ··· 22 19 }, 23 20 } 24 21 25 - var db *sql.DB 26 - var clients = make(map[*websocket.Conn]bool) 22 + // Room management 23 + type Room struct { 24 + clients map[*websocket.Conn]bool 25 + mu sync.Mutex 26 + } 27 + 28 + var rooms = make(map[string]*Room) 29 + var roomsMu sync.Mutex 30 + 31 + // Get or create a room 32 + func getRoom(roomID string) *Room { 33 + roomsMu.Lock() 34 + defer roomsMu.Unlock() 35 + 36 + if room, exists := rooms[roomID]; exists { 37 + return room 38 + } 39 + 40 + room := &Room{ 41 + clients: make(map[*websocket.Conn]bool), 42 + } 43 + rooms[roomID] = room 44 + return room 45 + } 46 + 47 + // Add client to room 48 + func addClientToRoom(roomID string, conn *websocket.Conn) { 49 + room := getRoom(roomID) 50 + 51 + room.mu.Lock() 52 + defer room.mu.Unlock() 53 + 54 + room.clients[conn] = true 55 + log.Printf("Client added to room %s. Total clients: %d", roomID, len(room.clients)) 56 + } 57 + 58 + // Remove client from room 59 + func removeClientFromRoom(roomID string, conn *websocket.Conn) { 60 + roomsMu.Lock() 61 + room, exists := rooms[roomID] 62 + roomsMu.Unlock() 63 + 64 + if !exists { 65 + return 66 + } 67 + 68 + room.mu.Lock() 69 + defer room.mu.Unlock() 70 + 71 + delete(room.clients, conn) 72 + log.Printf("Client removed from room %s. Remaining clients: %d", roomID, len(room.clients)) 73 + 74 + // Clean up empty rooms 75 + if len(room.clients) == 0 { 76 + roomsMu.Lock() 77 + delete(rooms, roomID) 78 + roomsMu.Unlock() 79 + log.Printf("Room %s removed (empty)", roomID) 80 + } 81 + } 82 + 83 + // Broadcast changes to all clients in a room except sender 84 + func broadcastToRoom(roomID string, sender *websocket.Conn, message []byte) { 85 + roomsMu.Lock() 86 + room, exists := rooms[roomID] 87 + roomsMu.Unlock() 88 + 89 + if !exists { 90 + return 91 + } 92 + 93 + room.mu.Lock() 94 + defer room.mu.Unlock() 95 + 96 + for client := range room.clients { 97 + if client != sender { 98 + client.WriteMessage(websocket.TextMessage, message) 99 + } 100 + } 101 + } 27 102 28 103 func handleWebSocket(w http.ResponseWriter, r *http.Request) { 104 + // Extract room ID from query parameters 105 + roomID := r.URL.Query().Get("room") 106 + if roomID == "" { 107 + http.Error(w, "Missing room parameter", http.StatusBadRequest) 108 + return 109 + } 110 + 29 111 conn, err := upgrader.Upgrade(w, r, nil) 30 112 if err != nil { 31 - log.Println(err) 113 + log.Println("Error upgrading connection:", err) 32 114 return 33 115 } 34 - defer conn.Close() 35 - 116 + 117 + // Add client to room 118 + addClientToRoom(roomID, conn) 119 + 120 + // Create database connection for this room 121 + dbPath := "./rooms/" + roomID + ".db" 122 + db, err := sql.Open("sqlite3_with_extensions", dbPath) 123 + if err != nil { 124 + log.Println("Error opening database:", err) 125 + conn.Close() 126 + removeClientFromRoom(roomID, conn) 127 + return 128 + } 129 + defer db.Close() 130 + 131 + // Ensure the database has the necessary tables 132 + setupDatabase(db) 133 + 134 + // Set up ping/pong to keep connection alive 135 + conn.SetPingHandler(func(string) error { 136 + conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second)) 137 + return nil 138 + }) 139 + 140 + // Clean up when connection closes 141 + defer func() { 142 + conn.Close() 143 + removeClientFromRoom(roomID, conn) 144 + }() 145 + 146 + // Send initial changes to client 147 + sendInitialChanges(conn, db) 148 + 149 + // Handle incoming messages 36 150 for { 37 151 _, message, err := conn.ReadMessage() 38 152 if err != nil { 39 - log.Println(err) 40 - return 41 - } 42 - 43 - var msg struct { 44 - Type string `json:"type"` 45 - Data []interface{} `json:"data"` 153 + log.Println("Read error:", err) 154 + break 46 155 } 156 + 157 + var msg map[string]interface{} 47 158 if err := json.Unmarshal(message, &msg); err != nil { 48 - log.Println("Error unmarshaling message:", err) 159 + log.Println("Error parsing message:", err) 49 160 continue 50 161 } 51 - 52 - if msg.Type == "changes" { 53 - fmt.Println("We're applying changes!!") 54 - applyChanges(msg.Data) 162 + 163 + msgType, ok := msg["type"].(string) 164 + if !ok { 165 + log.Println("Message missing 'type' field") 166 + continue 55 167 } 56 - } 57 - } 58 - 59 - func printChange(change interface{}) { 60 - // Attempt to type assert and print more details 61 - if c, ok := change.(map[string]interface{}); ok { 62 - for key, value := range c { 63 - fmt.Printf(" %s: %v\n", key, value) 168 + 169 + switch msgType { 170 + case "pull": 171 + // Client is requesting changes 172 + changes := getChangesFromDB(db, msg) 173 + response := map[string]interface{}{ 174 + "type": "changes", 175 + "data": changes, 176 + } 177 + responseJSON, _ := json.Marshal(response) 178 + conn.WriteMessage(websocket.TextMessage, responseJSON) 179 + 180 + case "changes": 181 + // Client is sending changes 182 + if data, ok := msg["data"].([]interface{}); ok { 183 + applyChangesToDB(db, data) 184 + 185 + // Broadcast changes to other clients in the same room 186 + broadcastToRoom(roomID, conn, message) 187 + } 64 188 } 65 - } else { 66 - fmt.Printf(" Not a map: %T\n", change) 67 189 } 68 190 } 69 191 70 - func printChanges(changes []interface{}) { 71 - for i, change := range changes { 72 - fmt.Printf("Change %d: %+v\n", i, change) 73 - printChange(change) 192 + func setupDatabase(db *sql.DB) { 193 + // Create todos table if it doesn't exist 194 + _, err := db.Exec(` 195 + CREATE TABLE IF NOT EXISTS todos ( 196 + id BLOB PRIMARY KEY NOT NULL, 197 + description TEXT, 198 + project text, 199 + tags text, 200 + due text, 201 + wait text, 202 + priority text, 203 + urgency real, 204 + completed INTEGER NOT NULL DEFAULT 0 205 + ); 206 + SELECT crsql_as_crr('todos'); 207 + `) 208 + if err != nil { 209 + log.Println("Error setting up database:", err) 74 210 } 75 211 } 76 212 77 - type CRSQLChange struct { 78 - TableName string 79 - PK []byte 80 - ColumnName string 81 - Value any 82 - ColVersion int64 83 - DBVersion int64 84 - SiteID []byte 85 - CL int64 86 - Seq int64 87 - } 88 - 89 - func mapToCRSQLChange(m map[string]interface{}) (CRSQLChange, error) { 90 - var err error 91 - change := CRSQLChange{} 92 - 93 - change.TableName = m["TableName"].(string) 94 - change.ColumnName = m["ColumnName"].(string) 95 - change.ColVersion = int64(m["ColVersion"].(float64)) 96 - change.DBVersion = int64(m["DBVersion"].(float64)) 97 - change.CL = int64(m["CL"].(float64)) 98 - change.Seq = int64(m["Seq"].(float64)) 99 - 100 - // Still unsure about this, it can encode anything 101 - if m["Value"] != nil { 102 - change.Value = m["Value"] 213 + func sendInitialChanges(conn *websocket.Conn, db *sql.DB) { 214 + // Query all changes 215 + rows, err := db.Query("SELECT * FROM crsql_changes") 216 + if err != nil { 217 + log.Println("Error querying changes:", err) 218 + return 103 219 } 104 - // Handle []byte fields 105 - change.PK, err = decodeBase64ToByteArray(m["PK"].(string)) 106 - if err != nil { 107 - return change, err 220 + defer rows.Close() 221 + 222 + var changes []map[string]interface{} 223 + 224 + for rows.Next() { 225 + var tableName string 226 + var pk []byte 227 + var columnName string 228 + var value interface{} 229 + var colVersion, dbVersion int64 230 + var siteID []byte 231 + var cl, seq int64 232 + 233 + if err := rows.Scan(&tableName, &pk, &columnName, &value, &colVersion, &dbVersion, &siteID, &cl, &seq); err != nil { 234 + log.Println("Error scanning row:", err) 235 + continue 236 + } 237 + 238 + change := map[string]interface{}{ 239 + "TableName": tableName, 240 + "PK": encodeToBase64(pk), 241 + "ColumnName": columnName, 242 + "Value": value, 243 + "ColVersion": colVersion, 244 + "DBVersion": dbVersion, 245 + "SiteID": encodeToBase64(siteID), 246 + "CL": cl, 247 + "Seq": seq, 248 + } 249 + 250 + changes = append(changes, change) 108 251 } 109 - change.SiteID, err = decodeBase64ToByteArray(m["SiteID"].(string)) 110 - fmt.Println(change.SiteID) 111 - if err != nil { 112 - return change, err 252 + 253 + response := map[string]interface{}{ 254 + "type": "changes", 255 + "data": changes, 113 256 } 114 - 115 - return change, nil 257 + 258 + responseJSON, _ := json.Marshal(response) 259 + conn.WriteMessage(websocket.TextMessage, responseJSON) 116 260 } 117 261 118 - func decodeBase64ToByteArray(encodedString string) ([]uint8, error) { 119 - // Decode the base64 string 120 - decodedBytes, err := base64.StdEncoding.DecodeString(encodedString) 262 + func getChangesFromDB(db *sql.DB, msg map[string]interface{}) []map[string]interface{} { 263 + // Implementation to get changes based on client's request 264 + // This would typically filter based on site_id and version 265 + 266 + rows, err := db.Query("SELECT * FROM crsql_changes") 121 267 if err != nil { 122 - return nil, fmt.Errorf("error decoding base64: %w", err) 268 + log.Println("Error querying changes:", err) 269 + return nil 123 270 } 124 - 125 - // Convert bytes to string 126 - decodedString := string(decodedBytes) 127 - 128 - // Split the string by commas 129 - stringSlice := strings.Split(decodedString, ",") 130 - 131 - // Convert each string to an integer 132 - intSlice := make([]uint8, len(stringSlice)) 133 - for i, s := range stringSlice { 134 - num, err := strconv.Atoi(s) 135 - if err != nil { 136 - return nil, fmt.Errorf("error converting string to int: %w", err) 271 + defer rows.Close() 272 + 273 + var changes []map[string]interface{} 274 + 275 + for rows.Next() { 276 + var tableName string 277 + var pk []byte 278 + var columnName string 279 + var value interface{} 280 + var colVersion, dbVersion int64 281 + var siteID []byte 282 + var cl, seq int64 283 + 284 + if err := rows.Scan(&tableName, &pk, &columnName, &value, &colVersion, &dbVersion, &siteID, &cl, &seq); err != nil { 285 + log.Println("Error scanning row:", err) 286 + continue 137 287 } 138 - intSlice[i] = byte(num) 288 + 289 + change := map[string]interface{}{ 290 + "TableName": tableName, 291 + "PK": encodeToBase64(pk), 292 + "ColumnName": columnName, 293 + "Value": value, 294 + "ColVersion": colVersion, 295 + "DBVersion": dbVersion, 296 + "SiteID": encodeToBase64(siteID), 297 + "CL": cl, 298 + "Seq": seq, 299 + } 300 + 301 + changes = append(changes, change) 139 302 } 140 - 141 - return intSlice, nil 303 + 304 + return changes 142 305 } 143 306 144 - func applyChanges(changes []interface{}) { 145 - tx, _ := db.Begin() 146 - for i, change := range changes { 147 - fmt.Println("Change number: ", i) 148 - fmt.Println(change) 307 + func applyChangesToDB(db *sql.DB, changes []interface{}) { 308 + tx, err := db.Begin() 309 + if err != nil { 310 + log.Println("Error starting transaction:", err) 311 + return 312 + } 313 + 314 + for _, change := range changes { 149 315 if changeMap, ok := change.(map[string]interface{}); ok { 150 - c, err := mapToCRSQLChange(changeMap) 151 - if err != nil { 152 - fmt.Println("Error converting to CRSQLChange:", err) 153 - continue 154 - } 155 - _, err = tx.Exec(`INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, 156 - c.TableName, c.PK, c.ColumnName, c.Value, c.ColVersion, c.DBVersion, c.SiteID, c.CL, c.Seq) 316 + tableName := changeMap["TableName"].(string) 317 + pk, _ := decodeBase64(changeMap["PK"].(string)) 318 + columnName := changeMap["ColumnName"].(string) 319 + value := changeMap["Value"] 320 + colVersion := int64(changeMap["ColVersion"].(float64)) 321 + dbVersion := int64(changeMap["DBVersion"].(float64)) 322 + siteID, _ := decodeBase64(changeMap["SiteID"].(string)) 323 + cl := int64(changeMap["CL"].(float64)) 324 + seq := int64(changeMap["Seq"].(float64)) 325 + 326 + _, err := tx.Exec( 327 + `INSERT INTO crsql_changes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, 328 + tableName, pk, columnName, value, colVersion, dbVersion, siteID, cl, seq, 329 + ) 157 330 if err != nil { 158 331 log.Println("Error inserting change:", err) 159 332 } 160 - log.Println("We inserted data successfully??") 161 - fmt.Println(c) 162 - } else { 163 - fmt.Println("Not OK to convert change") 164 - fmt.Println("Change look like:") 165 - printChange(change) 166 - continue 167 333 } 168 334 } 169 - tx.Commit() 335 + 336 + if err := tx.Commit(); err != nil { 337 + log.Println("Error committing transaction:", err) 338 + tx.Rollback() 339 + } 170 340 } 171 341 172 - func broadcastChanges(sender *websocket.Conn, changes interface{}) { 173 - for client := range clients { 174 - if client != sender { 175 - client.WriteJSON(map[string]interface{}{"type": "changes", "data": changes}) 176 - } 177 - } 342 + func encodeToBase64(data []byte) string { 343 + return base64.StdEncoding.EncodeToString(data) 178 344 } 179 - func main() { 180 - var err error 181 - if err != nil { 182 - log.Fatal("error:", err) 183 - } 184 - sql.Register("sqlite3_with_extensions", 185 - &sqlite3.SQLiteDriver{ 186 - Extensions: []string{ 187 - "../db/crsqlite.so", 188 - }, 189 - }) 190 - db, err = sql.Open("sqlite3_with_extensions", "./todo.sqlite") 191 - if err != nil { 192 - log.Fatal(err) 193 - } 194 - err = mast.RunMigrations(db) 195 - defer db.Close() 345 + 346 + func decodeBase64(encoded string) ([]byte, error) { 347 + return base64.StdEncoding.DecodeString(encoded) 348 + } 196 349 197 - // Initialize database and tables... 350 + func main() { 351 + // Register SQLite with CR-SQLite extension 352 + sql.Register("sqlite3_with_extensions", &sqlite3.SQLiteDriver{ 353 + Extensions: []string{"../db/crsqlite"}, 354 + }) 355 + 356 + // Create directory for room databases 198 357 http.HandleFunc("/sync", handleWebSocket) 358 + 359 + log.Println("WebSocket server started on :8080") 199 360 log.Fatal(http.ListenAndServe(":8080", nil)) 200 361 } 362 +