···6464// UIConfig defines web UI settings
6565type UIConfig struct {
6666 // SQLite database path.
6767- DatabasePath string `yaml:"database_path" comment:"SQLite database for OAuth sessions, stars, pull counts, and device approvals."`
6767+ DatabasePath string `yaml:"database_path" comment:"SQLite/libSQL database for OAuth sessions, stars, pull counts, and device approvals."`
68686969 // Visual theme name (e.g. "seamark"). Empty string uses default atcr.io branding.
7070 Theme string `yaml:"theme" comment:"Visual theme name (e.g. \"seamark\"). Empty uses default atcr.io branding."`
7171+7272+ // libSQL sync URL for embedded replicas. Works with Turso cloud or self-hosted libsql-server.
7373+ // Leave empty for local-only SQLite mode (selfhost/dev).
7474+ LibsqlSyncURL string `yaml:"libsql_sync_url" comment:"libSQL sync URL (libsql://...). Works with Turso cloud or self-hosted libsql-server. Leave empty for local-only SQLite."`
7575+7676+ // Auth token for libSQL sync. Required if LibsqlSyncURL is set.
7777+ LibsqlAuthToken string `yaml:"libsql_auth_token" comment:"Auth token for libSQL sync. Required if libsql_sync_url is set."`
7878+7979+ // How often to sync with the remote libSQL server.
8080+ LibsqlSyncInterval time.Duration `yaml:"libsql_sync_interval" comment:"How often to sync with remote libSQL server. Default: 60s."`
7181}
72827383// HealthConfig defines health check and cache settings
···140150 // UI defaults
141151 v.SetDefault("ui.database_path", "/var/lib/atcr/ui.db")
142152 v.SetDefault("ui.theme", "")
153153+ v.SetDefault("ui.libsql_sync_url", "")
154154+ v.SetDefault("ui.libsql_auth_token", "")
155155+ v.SetDefault("ui.libsql_sync_interval", "60s")
143156144157 // Health defaults
145158 v.SetDefault("health.cache_ttl", "15m")
+1-1
pkg/appview/db/annotations_test.go
···2121func setupAnnotationsTestDB(t *testing.T) *sql.DB {
2222 t.Helper()
2323 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB
2424- db, err := InitDB("file::memory:?cache=shared")
2424+ db, err := InitDB("file::memory:?cache=shared", LibsqlConfig{})
2525 if err != nil {
2626 t.Fatalf("Failed to initialize test database: %v", err)
2727 }
···1414 t.Helper()
1515 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB
1616 // This prevents race conditions where different connections see different databases
1717- db, err := InitDB("file::memory:?cache=shared")
1717+ db, err := InitDB("file::memory:?cache=shared", LibsqlConfig{})
1818 if err != nil {
1919 t.Fatalf("Failed to initialize test database: %v", err)
2020 }
+13-2
pkg/appview/db/hold_store.go
···33import (
44 "database/sql"
55 "fmt"
66+ "strings"
67 "time"
78)
99+1010+// normalizeDateString strips time components from date strings.
1111+// libSQL normalizes date-like TEXT values to ISO 8601 (e.g., "2025-01-15" → "2025-01-15T00:00:00Z").
1212+// This preserves the original date-only format for display.
1313+func normalizeDateString(s string) string {
1414+ if i := strings.IndexByte(s, 'T'); i == 10 && strings.HasSuffix(s, "T00:00:00Z") {
1515+ return s[:10]
1616+ }
1717+ return s
1818+}
819920// HoldCaptainRecord represents a cached captain record from a hold's PDS
1021type HoldCaptainRecord struct {
···50615162 // Handle nullable fields
5263 if deployedAt.Valid {
5353- record.DeployedAt = deployedAt.String
6464+ record.DeployedAt = normalizeDateString(deployedAt.String)
5465 }
5566 if region.Valid {
5667 record.Region = region.String
···166177 }
167178168179 if deployedAt.Valid {
169169- record.DeployedAt = deployedAt.String
180180+ record.DeployedAt = normalizeDateString(deployedAt.String)
170181 }
171182 if region.Valid {
172183 record.Region = region.String
+1-1
pkg/appview/db/hold_store_test.go
···8181func setupHoldTestDB(t *testing.T) *sql.DB {
8282 t.Helper()
8383 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB
8484- db, err := InitDB("file::memory:?cache=shared")
8484+ db, err := InitDB("file::memory:?cache=shared", LibsqlConfig{})
8585 if err != nil {
8686 t.Fatalf("Failed to initialize test database: %v", err)
8787 }
···66 "log/slog"
77 "os"
88 "path/filepath"
99+ "strings"
910 "time"
1010-1111- sqlite3 "github.com/mattn/go-sqlite3"
1211)
13121414-const (
1515- // ReadOnlyDriverName is the name of the custom SQLite driver with table authorization
1616- ReadOnlyDriverName = "sqlite3_readonly_public"
1717-)
1818-1919-// sensitiveTables defines tables that should never be accessible from public queries
2020-var sensitiveTables = map[string]bool{
2121- "oauth_sessions": true, // OAuth tokens
2222- "ui_sessions": true, // Session IDs
2323- "oauth_auth_requests": true, // OAuth state
2424- "devices": true, // Device secret hashes
2525- "pending_device_auth": true, // Pending device secrets
2626-}
2727-2828-// readOnlyAuthorizerCallback blocks access to sensitive tables
2929-func readOnlyAuthorizerCallback(action int, arg1, arg2, dbName string) int {
3030- // arg1 contains the table name for most operations
3131- tableName := arg1
3232-3333- // Block any access to sensitive tables
3434- if action == sqlite3.SQLITE_READ || action == sqlite3.SQLITE_UPDATE ||
3535- action == sqlite3.SQLITE_INSERT || action == sqlite3.SQLITE_DELETE ||
3636- action == sqlite3.SQLITE_SELECT {
3737- if sensitiveTables[tableName] {
3838- slog.Warn("Blocked access to sensitive table", "component", "SECURITY", "table", tableName, "action", action)
3939- return sqlite3.SQLITE_DENY
4040- }
4141- }
4242-4343- // Allow everything else
4444- return sqlite3.SQLITE_OK
4545-}
4646-4747-func init() {
4848- // Register a custom SQLite driver with authorizer for read-only public queries
4949- sql.Register(ReadOnlyDriverName,
5050- &sqlite3.SQLiteDriver{
5151- ConnectHook: func(conn *sqlite3.SQLiteConn) error {
5252- conn.RegisterAuthorizer(readOnlyAuthorizerCallback)
5353- return nil
5454- },
5555- })
5656-}
5757-5858-// InitializeDatabase initializes the SQLite database and session store
1313+// InitializeDatabase initializes the libSQL database and session store.
5914// Returns: (read-write DB, read-only DB, session store)
6060-func InitializeDatabase(dbPath string) (*sql.DB, *sql.DB, *SessionStore) {
1515+func InitializeDatabase(dbPath string, cfg LibsqlConfig) (*sql.DB, *sql.DB, *SessionStore) {
6116 // Ensure directory exists
6217 dbDir := filepath.Dir(dbPath)
6318 if err := os.MkdirAll(dbDir, 0700); err != nil {
···6621 }
67226823 // Initialize read-write database (for writes and auth operations)
6969- database, err := InitDB(dbPath)
2424+ database, err := InitDB(dbPath, cfg)
7025 if err != nil {
7126 slog.Warn("Failed to initialize UI database", "error", err)
7227 return nil, nil, nil
7328 }
74297530 // Open read-only connection for public queries (search, user pages, etc.)
7676- // Uses custom driver with SQLite authorizer that blocks sensitive tables
7777- // This prevents accidental writes and blocks access to sensitive tables even if SQL injection occurs
7878- readOnlyDB, err := sql.Open(ReadOnlyDriverName, "file:"+dbPath+"?mode=ro")
3131+ // Uses ?mode=ro to prevent writes from public-facing handlers
3232+ roDSN := dbPath
3333+ if !strings.HasPrefix(dbPath, "file:") && !strings.HasPrefix(dbPath, ":memory:") {
3434+ roDSN = "file:" + dbPath
3535+ }
3636+ // Append ?mode=ro for read-only access
3737+ if strings.Contains(roDSN, "?") {
3838+ roDSN += "&mode=ro"
3939+ } else {
4040+ roDSN += "?mode=ro"
4141+ }
4242+ readOnlyDB, err := sql.Open("libsql", roDSN)
7943 if err != nil {
8044 slog.Warn("Failed to open read-only database connection", "error", err)
8145 return nil, nil, nil
8246 }
83474848+ // busy_timeout is per-connection — without this, reads return SQLITE_BUSY
4949+ // immediately when a write is in progress on the read-write connection.
5050+ var busyTimeout int
5151+ if err := readOnlyDB.QueryRow("PRAGMA busy_timeout = 5000").Scan(&busyTimeout); err != nil {
5252+ slog.Warn("Failed to set busy_timeout on read-only connection", "error", err)
5353+ }
5454+8455 slog.Info("UI database initialized", "mode", "readonly", "path", dbPath)
85568686- // Create SQLite-backed session store
5757+ // Create session store
8758 sessionStore := NewSessionStore(database)
88598989- // Start cleanup goroutines for all SQLite stores
6060+ // Start cleanup goroutines
9061 go func() {
9162 ticker := time.NewTicker(1 * time.Hour)
9263 defer ticker.Stop()
+18-41
pkg/appview/db/readonly_test.go
···77 "testing"
88)
991010-func TestAuthorizerBlocksSensitiveTables(t *testing.T) {
1010+func TestReadOnlyBlocksWrites(t *testing.T) {
1111 // Create temporary database
1212 tmpDir := t.TempDir()
1313 dbPath := filepath.Join(tmpDir, "test.db")
···1919 defer os.Unsetenv("ATCR_UI_DATABASE_PATH")
20202121 // Initialize database (creates schema)
2222- database, err := InitDB(dbPath)
2222+ database, err := InitDB(dbPath, LibsqlConfig{})
2323 if err != nil {
2424 t.Fatalf("Failed to initialize database: %v", err)
2525 }
2626 defer database.Close()
27272828- // Create some test data in sensitive tables
2929- _, err = database.Exec(`
3030- INSERT INTO oauth_sessions (session_key, account_did, session_id, session_data, created_at, updated_at)
3131- VALUES ('test-key', 'did:plc:test', 'test-session', 'secret-token-data', datetime('now'), datetime('now'))
3232- `)
3333- if err != nil {
3434- t.Fatalf("Failed to insert test data: %v", err)
3535- }
3636-2828+ // Create some test data
3729 _, err = database.Exec(`
3830 INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
3931 VALUES ('did:plc:test', 'test.user', 'https://pds.example.com', '', datetime('now'))
···4234 t.Fatalf("Failed to insert test user: %v", err)
4335 }
44364545- // Open read-only connection with authorizer (using our custom driver)
4646- readOnlyDB, err := sql.Open(ReadOnlyDriverName, "file:"+dbPath+"?mode=ro")
3737+ // Open read-only connection
3838+ readOnlyDB, err := sql.Open("libsql", "file:"+dbPath+"?mode=ro")
4739 if err != nil {
4840 t.Fatalf("Failed to open read-only database: %v", err)
4941 }
5042 defer readOnlyDB.Close()
51435252- // Test 1: Should be able to read from public tables (users)
4444+ // Test 1: Should be able to read from public tables
5345 t.Run("AllowPublicTableRead", func(t *testing.T) {
5446 var handle string
5547 err := readOnlyDB.QueryRow("SELECT handle FROM users WHERE did = ?", "did:plc:test").Scan(&handle)
···6153 }
6254 })
63556464- // Test 2: Should NOT be able to read from sensitive tables (oauth_sessions)
6565- t.Run("BlockSensitiveTableRead", func(t *testing.T) {
6666- var sessionData string
6767- err := readOnlyDB.QueryRow("SELECT session_data FROM oauth_sessions WHERE session_key = ?", "test-key").Scan(&sessionData)
5656+ // Test 2: Should NOT be able to write to any table (read-only mode)
5757+ t.Run("BlockInsert", func(t *testing.T) {
5858+ _, err := readOnlyDB.Exec("INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen) VALUES ('did:plc:test2', 'test2', 'https://pds.example.com', '', datetime('now'))")
6859 if err == nil {
6969- t.Errorf("Should NOT be able to read from sensitive table 'oauth_sessions', but got data: %s", sessionData)
7070- }
7171- // SQLite returns "not authorized" error when authorizer denies access
7272- if err != nil && err.Error() != "not authorized" {
7373- t.Logf("Got expected error (but different message): %v", err)
6060+ t.Error("Should NOT be able to INSERT in read-only mode")
7461 }
7562 })
76637777- // Test 3: Should NOT be able to read from ui_sessions
7878- t.Run("BlockUISessionsTableRead", func(t *testing.T) {
7979- rows, err := readOnlyDB.Query("SELECT * FROM ui_sessions LIMIT 1")
6464+ // Test 3: Should NOT be able to update in read-only mode
6565+ t.Run("BlockUpdate", func(t *testing.T) {
6666+ _, err := readOnlyDB.Exec("UPDATE users SET handle = 'changed' WHERE did = ?", "did:plc:test")
8067 if err == nil {
8181- rows.Close()
8282- t.Error("Should NOT be able to read from sensitive table 'ui_sessions'")
6868+ t.Error("Should NOT be able to UPDATE in read-only mode")
8369 }
8470 })
85718686- // Test 4: Should NOT be able to read from devices
8787- t.Run("BlockDevicesTableRead", func(t *testing.T) {
8888- rows, err := readOnlyDB.Query("SELECT * FROM devices LIMIT 1")
7272+ // Test 4: Should NOT be able to delete in read-only mode
7373+ t.Run("BlockDelete", func(t *testing.T) {
7474+ _, err := readOnlyDB.Exec("DELETE FROM users WHERE did = ?", "did:plc:test")
8975 if err == nil {
9090- rows.Close()
9191- t.Error("Should NOT be able to read from sensitive table 'devices'")
9292- }
9393- })
9494-9595- // Test 5: Should NOT be able to write to any table (read-only mode + authorizer)
9696- t.Run("BlockAllWrites", func(t *testing.T) {
9797- _, err := readOnlyDB.Exec("INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen) VALUES ('did:plc:test2', 'test2', 'https://pds.example.com', '', datetime('now'))")
9898- if err == nil {
9999- t.Error("Should NOT be able to write to any table in read-only mode")
7676+ t.Error("Should NOT be able to DELETE in read-only mode")
10077 }
10178 })
10279}
+61-6
pkg/appview/db/schema.go
···1414 "sort"
1515 "strconv"
1616 "strings"
1717+ "time"
17181818- _ "github.com/mattn/go-sqlite3"
1919+ "github.com/tursodatabase/go-libsql"
1920 "go.yaml.in/yaml/v4"
2021)
2122···2526//go:embed schema.sql
2627var schemaSQL string
27282828-// InitDB initializes the SQLite database with the schema
2929-func InitDB(path string) (*sql.DB, error) {
3030- db, err := sql.Open("sqlite3", path)
3131- if err != nil {
3232- return nil, err
2929+// LibsqlConfig holds optional libSQL sync settings for embedded replicas.
3030+// When SyncURL is empty, the database operates in local-only mode.
3131+type LibsqlConfig struct {
3232+ SyncURL string
3333+ AuthToken string
3434+ SyncInterval time.Duration
3535+}
3636+3737+// InitDB initializes the database with the schema.
3838+// Uses libSQL driver: local-only when cfg.SyncURL is empty,
3939+// embedded replica when cfg.SyncURL is set.
4040+func InitDB(path string, cfg LibsqlConfig) (*sql.DB, error) {
4141+ var db *sql.DB
4242+4343+ if cfg.SyncURL != "" {
4444+ // Embedded replica mode: local file + sync to remote
4545+ opts := []libsql.Option{
4646+ libsql.WithAuthToken(cfg.AuthToken),
4747+ }
4848+ if cfg.SyncInterval > 0 {
4949+ opts = append(opts, libsql.WithSyncInterval(cfg.SyncInterval))
5050+ }
5151+ connector, err := libsql.NewEmbeddedReplicaConnector(path, cfg.SyncURL, opts...)
5252+ if err != nil {
5353+ return nil, fmt.Errorf("failed to create libsql embedded replica connector: %w", err)
5454+ }
5555+ db = sql.OpenDB(connector)
5656+ slog.Info("Database opened in embedded replica mode", "path", path, "sync_url", cfg.SyncURL)
5757+ } else {
5858+ // Local-only mode: plain file via libsql driver
5959+ // Paths starting with "file:" or ":memory:" are already valid libsql URIs
6060+ dsn := path
6161+ if !strings.HasPrefix(path, "file:") && !strings.HasPrefix(path, ":memory:") {
6262+ dsn = "file:" + path
6363+ }
6464+ var err error
6565+ db, err = sql.Open("libsql", dsn)
6666+ if err != nil {
6767+ return nil, err
6868+ }
6969+ slog.Info("Database opened in local-only mode", "path", path)
7070+ }
7171+7272+ // In local-only mode, configure WAL and busy_timeout locally.
7373+ // In embedded replica mode, the remote server manages these settings
7474+ // and PRAGMA assignments are rejected as "unsupported statement"
7575+ // (observed with Bunny Database; Turso may behave similarly).
7676+ if cfg.SyncURL == "" {
7777+ // Enable WAL mode for concurrent read/write access
7878+ var journalMode string
7979+ if err := db.QueryRow("PRAGMA journal_mode = WAL").Scan(&journalMode); err != nil {
8080+ return nil, err
8181+ }
8282+8383+ // Retry on lock instead of failing immediately (5s timeout)
8484+ var busyTimeout int
8585+ if err := db.QueryRow("PRAGMA busy_timeout = 5000").Scan(&busyTimeout); err != nil {
8686+ return nil, err
8787+ }
3388 }
34893590 // Enable foreign keys
+1-1
pkg/appview/db/session_store_test.go
···1313func setupSessionTestDB(t *testing.T) *SessionStore {
1414 t.Helper()
1515 // Use file::memory: with cache=shared to ensure all connections share the same in-memory DB
1616- db, err := InitDB("file::memory:?cache=shared")
1616+ db, err := InitDB("file::memory:?cache=shared", LibsqlConfig{})
1717 if err != nil {
1818 t.Fatalf("Failed to initialize test database: %v", err)
1919 }
+1-1
pkg/appview/db/tag_delete_test.go
···1111// This simulates what Jetstream does: encode repo/tag to rkey, then decode and delete
1212func TestTagDeleteRoundTrip(t *testing.T) {
1313 // Create in-memory test database
1414- db, err := InitDB(":memory:")
1414+ db, err := InitDB(":memory:", LibsqlConfig{})
1515 if err != nil {
1616 t.Fatalf("Failed to init database: %v", err)
1717 }
···11-package auth
22-33-import (
44- "context"
55- "fmt"
66-77- "atcr.io/pkg/atproto"
88- "atcr.io/pkg/hold/pds"
99-)
1010-1111-// LocalHoldAuthorizer queries the hold's own embedded PDS directly
1212-// Used by hold service to authorize access to its own storage
1313-type LocalHoldAuthorizer struct {
1414- pds *pds.HoldPDS
1515-}
1616-1717-// NewLocalHoldAuthorizer creates a new local authorizer for hold service
1818-func NewLocalHoldAuthorizer(holdPDS *pds.HoldPDS) HoldAuthorizer {
1919- return &LocalHoldAuthorizer{
2020- pds: holdPDS,
2121- }
2222-}
2323-2424-// NewLocalHoldAuthorizerFromInterface creates a new local authorizer from an any
2525-// This is used to avoid import cycles - caller must pass a *pds.HoldPDS
2626-func NewLocalHoldAuthorizerFromInterface(holdPDS any) HoldAuthorizer {
2727- // Type assert to *pds.HoldPDS
2828- if pdsTyped, ok := holdPDS.(*pds.HoldPDS); ok {
2929- return &LocalHoldAuthorizer{
3030- pds: pdsTyped,
3131- }
3232- }
3333- // Return nil if type assertion fails - caller should check
3434- return nil
3535-}
3636-3737-// GetCaptainRecord retrieves the captain record from the hold's PDS
3838-func (a *LocalHoldAuthorizer) GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) {
3939- // Verify that the requested holdDID matches this hold
4040- if holdDID != a.pds.DID() {
4141- return nil, fmt.Errorf("holdDID mismatch: requested %s, this hold is %s", holdDID, a.pds.DID())
4242- }
4343-4444- // Query the PDS for captain record
4545- _, pdsCaptain, err := a.pds.GetCaptainRecord(ctx)
4646- if err != nil {
4747- return nil, fmt.Errorf("failed to get captain record: %w", err)
4848- }
4949-5050- // The PDS returns *atproto.CaptainRecord directly now (after we update pds to use atproto types)
5151- return pdsCaptain, nil
5252-}
5353-5454-// IsCrewMember checks if userDID is a crew member
5555-func (a *LocalHoldAuthorizer) IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) {
5656- // Verify that the requested holdDID matches this hold
5757- if holdDID != a.pds.DID() {
5858- return false, fmt.Errorf("holdDID mismatch: requested %s, this hold is %s", holdDID, a.pds.DID())
5959- }
6060-6161- // Query the PDS for crew list
6262- crewList, err := a.pds.ListCrewMembers(ctx)
6363- if err != nil {
6464- return false, fmt.Errorf("failed to list crew members: %w", err)
6565- }
6666-6767- // Check if userDID is in the crew list
6868- for _, member := range crewList {
6969- if member.Record.Member == userDID {
7070- // TODO: Check expiration if set
7171- return true, nil
7272- }
7373- }
7474-7575- return false, nil
7676-}
7777-7878-// CheckReadAccess implements read authorization using shared logic
7979-func (a *LocalHoldAuthorizer) CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) {
8080- captain, err := a.GetCaptainRecord(ctx, holdDID)
8181- if err != nil {
8282- return false, err
8383- }
8484-8585- return CheckReadAccessWithCaptain(captain, userDID), nil
8686-}
8787-8888-// CheckWriteAccess implements write authorization using shared logic
8989-func (a *LocalHoldAuthorizer) CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) {
9090- captain, err := a.GetCaptainRecord(ctx, holdDID)
9191- if err != nil {
9292- return false, err
9393- }
9494-9595- isCrew, err := a.IsCrewMember(ctx, holdDID, userDID)
9696- if err != nil {
9797- return false, err
9898- }
9999-100100- return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil
101101-}
102102-103103-// ClearCrewDenial is a no-op for LocalHoldAuthorizer
104104-// Local authorizer queries PDS directly without caching
105105-func (a *LocalHoldAuthorizer) ClearCrewDenial(ctx context.Context, holdDID, userDID string) error {
106106- return nil
107107-}
···11+// Package db contains a vendored from github.com/bluesky-social/indigo/carstore/sqlite_store.go
22+// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/
33+// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql
44+// (both bundle SQLite C libraries and cannot coexist in the same binary).
55+//
66+// This package replaces the mattn driver with go-libsql and removes Prometheus metrics.
77+// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed.
88+// Modifications:
99+// - Replaced mattn/go-sqlite3 driver with go-libsql
1010+// - Removed all Prometheus metric counters and .Inc() calls
1111+// - Changed package from 'carstore' to 'db'
1212+// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB
1313+// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN
1414+1515+package db
1616+1717+import (
1818+ "bytes"
1919+ "context"
2020+ "database/sql"
2121+ "errors"
2222+ "fmt"
2323+ "io"
2424+ "log/slog"
2525+ "os"
2626+ "path/filepath"
2727+ "strings"
2828+2929+ "go.opentelemetry.io/otel/attribute"
3030+3131+ "github.com/bluesky-social/indigo/models"
3232+ blockformat "github.com/ipfs/go-block-format"
3333+ "github.com/ipfs/go-cid"
3434+ "github.com/ipfs/go-libipfs/blocks"
3535+ "github.com/ipld/go-car"
3636+ _ "github.com/tursodatabase/go-libsql"
3737+ "go.opentelemetry.io/otel"
3838+)
3939+4040+// CarShard represents metadata about a stored shard.
4141+// Stripped of gorm tags since we don't use gorm in the SQLite store.
4242+type CarShard struct {
4343+ Root models.DbCID
4444+ DataStart int64
4545+ Seq int
4646+ Path string
4747+ Usr models.Uid
4848+ Rev string
4949+}
5050+5151+type SQLiteStore struct {
5252+ dbPath string
5353+ db *sql.DB
5454+5555+ log *slog.Logger
5656+5757+ lastShardCache lastShardCache
5858+}
5959+6060+func ensureDir(path string) error {
6161+ fi, err := os.Stat(path)
6262+ if err != nil {
6363+ if os.IsNotExist(err) {
6464+ return os.MkdirAll(path, 0755)
6565+ }
6666+ return err
6767+ }
6868+ if fi.IsDir() {
6969+ return nil
7070+ }
7171+ return fmt.Errorf("%s exists but is not a directory", path)
7272+}
7373+7474+func NewSqliteStore(csdir string) (*SQLiteStore, error) {
7575+ if err := ensureDir(csdir); err != nil {
7676+ return nil, err
7777+ }
7878+ dbpath := filepath.Join(csdir, "db.sqlite3")
7979+ out := new(SQLiteStore)
8080+ err := out.Open(dbpath)
8181+ if err != nil {
8282+ return nil, err
8383+ }
8484+ return out, nil
8585+}
8686+8787+// NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection.
8888+// This allows callers to configure the driver independently (e.g., using go-libsql
8989+// embedded replicas). The caller is responsible for the DB lifecycle.
9090+func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) {
9191+ sqs := &SQLiteStore{
9292+ dbPath: dbPath,
9393+ db: db,
9494+ log: slog.Default(),
9595+ }
9696+ if err := sqs.createTables(); err != nil {
9797+ return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err)
9898+ }
9999+ sqs.lastShardCache.source = sqs
100100+ sqs.lastShardCache.Init()
101101+ return sqs, nil
102102+}
103103+104104+func (sqs *SQLiteStore) Open(path string) error {
105105+ if sqs.log == nil {
106106+ sqs.log = slog.Default()
107107+ }
108108+ sqs.log.Debug("open db", "path", path)
109109+110110+ // Build DSN for go-libsql
111111+ dsn := path
112112+ if path == ":memory:" {
113113+ dsn = ":memory:"
114114+ } else if !strings.HasPrefix(path, "file:") {
115115+ dsn = "file:" + path
116116+ }
117117+118118+ db, err := sql.Open("libsql", dsn)
119119+ if err != nil {
120120+ return fmt.Errorf("%s: sqlite could not open, %w", path, err)
121121+ }
122122+ sqs.db = db
123123+ sqs.dbPath = path
124124+ err = sqs.createTables()
125125+ if err != nil {
126126+ return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
127127+ }
128128+ sqs.lastShardCache.source = sqs
129129+ sqs.lastShardCache.Init()
130130+ return nil
131131+}
132132+133133+func (sqs *SQLiteStore) createTables() error {
134134+ tx, err := sqs.db.Begin()
135135+ if err != nil {
136136+ return err
137137+ }
138138+ defer tx.Rollback()
139139+ _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
140140+ if err != nil {
141141+ return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
142142+ }
143143+ _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
144144+ if err != nil {
145145+ return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
146146+ }
147147+ return tx.Commit()
148148+}
149149+150150+// writeNewShard needed for DeltaSession.CloseWithRoot
151151+func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
152152+ sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
153153+ ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
154154+ defer span.End()
155155+156156+ buf := new(bytes.Buffer)
157157+ hnw, err := WriteCarHeader(buf, root)
158158+ if err != nil {
159159+ return nil, fmt.Errorf("failed to write car header: %w", err)
160160+ }
161161+ offset := hnw
162162+163163+ tx, err := sqs.db.BeginTx(ctx, nil)
164164+ if err != nil {
165165+ return nil, fmt.Errorf("bad block insert tx, %w", err)
166166+ }
167167+ defer tx.Rollback()
168168+ insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
169169+ if err != nil {
170170+ return nil, fmt.Errorf("bad block insert sql, %w", err)
171171+ }
172172+ defer insertStatement.Close()
173173+174174+ dbroot := models.DbCID{CID: root}
175175+176176+ span.SetAttributes(attribute.Int("blocks", len(blks)))
177177+178178+ for bcid, block := range blks {
179179+ nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
180180+ if err != nil {
181181+ return nil, fmt.Errorf("failed to write block: %w", err)
182182+ }
183183+ offset += nw
184184+185185+ dbcid := models.DbCID{CID: bcid}
186186+ blockbytes := block.RawData()
187187+ _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
188188+ if err != nil {
189189+ return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
190190+ }
191191+ sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
192192+ }
193193+ err = tx.Commit()
194194+ if err != nil {
195195+ return nil, fmt.Errorf("bad block insert commit, %w", err)
196196+ }
197197+198198+ shard := CarShard{
199199+ Root: models.DbCID{CID: root},
200200+ DataStart: hnw,
201201+ Seq: seq,
202202+ Usr: user,
203203+ Rev: rev,
204204+ }
205205+206206+ sqs.lastShardCache.put(&shard)
207207+208208+ return buf.Bytes(), nil
209209+}
210210+211211+var ErrNothingThere = errors.New("nothing to read)")
212212+213213+// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
214214+func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
215215+ tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
216216+ if err != nil {
217217+ return nil, fmt.Errorf("bad last shard tx, %w", err)
218218+ }
219219+ defer tx.Rollback()
220220+ qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
221221+ if err != nil {
222222+ return nil, fmt.Errorf("bad last shard sql, %w", err)
223223+ }
224224+ rows, err := qstmt.QueryContext(ctx, uid)
225225+ if err != nil {
226226+ return nil, fmt.Errorf("last shard err, %w", err)
227227+ }
228228+ if rows.Next() {
229229+ var rev string
230230+ var rootb models.DbCID
231231+ err = rows.Scan(&rev, &rootb)
232232+ if err != nil {
233233+ return nil, fmt.Errorf("last shard bad scan, %w", err)
234234+ }
235235+ return &CarShard{
236236+ Root: rootb,
237237+ Rev: rev,
238238+ }, nil
239239+ }
240240+ return nil, nil
241241+}
242242+243243+func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
244244+ sqs.log.Warn("TODO: don't call compaction")
245245+ return nil, nil
246246+}
247247+248248+func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
249249+ sqs.log.Warn("TODO: don't call compaction targets")
250250+ return nil, nil
251251+}
252252+253253+func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
254254+ lastShard, err := sqs.lastShardCache.get(ctx, user)
255255+ if err != nil {
256256+ return cid.Undef, err
257257+ }
258258+ if lastShard == nil {
259259+ return cid.Undef, nil
260260+ }
261261+262262+ return lastShard.Root.CID, nil
263263+}
264264+265265+func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
266266+ lastShard, err := sqs.lastShardCache.get(ctx, user)
267267+ if err != nil {
268268+ return "", err
269269+ }
270270+ if lastShard == nil {
271271+ return "", nil
272272+ }
273273+274274+ return lastShard.Rev, nil
275275+}
276276+277277+func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
278278+ ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
279279+ defer span.End()
280280+281281+ carr, err := car.NewCarReader(bytes.NewReader(carslice))
282282+ if err != nil {
283283+ return cid.Undef, nil, err
284284+ }
285285+286286+ if len(carr.Header.Roots) != 1 {
287287+ return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
288288+ }
289289+290290+ ds, err := sqs.NewDeltaSession(ctx, uid, since)
291291+ if err != nil {
292292+ return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
293293+ }
294294+295295+ for {
296296+ blk, err := carr.Next()
297297+ if err != nil {
298298+ if err == io.EOF {
299299+ break
300300+ }
301301+ return cid.Undef, nil, err
302302+ }
303303+304304+ if err := ds.Put(ctx, blk); err != nil {
305305+ return cid.Undef, nil, err
306306+ }
307307+ }
308308+309309+ return carr.Header.Roots[0], ds, nil
310310+}
311311+312312+var zeroShard CarShard
313313+314314+func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
315315+ ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
316316+ defer span.End()
317317+318318+ lastShard, err := sqs.lastShardCache.get(ctx, user)
319319+ if err != nil {
320320+ return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
321321+ }
322322+323323+ if lastShard == nil {
324324+ lastShard = &zeroShard
325325+ }
326326+327327+ if since != nil && *since != lastShard.Rev {
328328+ return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
329329+ }
330330+331331+ return &DeltaSession{
332332+ blks: make(map[cid.Cid]blockformat.Block),
333333+ base: &sqliteUserView{
334334+ uid: user,
335335+ sqs: sqs,
336336+ },
337337+ user: user,
338338+ baseCid: lastShard.Root.CID,
339339+ cs: sqs,
340340+ seq: lastShard.Seq + 1,
341341+ lastRev: lastShard.Rev,
342342+ }, nil
343343+}
344344+345345+func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
346346+ return &DeltaSession{
347347+ base: &sqliteUserView{
348348+ uid: user,
349349+ sqs: sqs,
350350+ },
351351+ readonly: true,
352352+ user: user,
353353+ cs: sqs,
354354+ }, nil
355355+}
356356+357357+// ReadUserCar writes a CAR file for the user's blocks since sinceRev.
358358+func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
359359+ ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
360360+ defer span.End()
361361+362362+ tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
363363+ if err != nil {
364364+ return fmt.Errorf("rcar tx, %w", err)
365365+ }
366366+ defer tx.Rollback()
367367+ qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
368368+ if err != nil {
369369+ return fmt.Errorf("rcar sql, %w", err)
370370+ }
371371+ defer qstmt.Close()
372372+ rows, err := qstmt.QueryContext(ctx, user, sinceRev)
373373+ if err != nil {
374374+ return fmt.Errorf("rcar err, %w", err)
375375+ }
376376+ nblocks := 0
377377+ first := true
378378+ for rows.Next() {
379379+ var xcid models.DbCID
380380+ var xrev string
381381+ var xroot models.DbCID
382382+ var xblock []byte
383383+ err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
384384+ if err != nil {
385385+ return fmt.Errorf("rcar bad scan, %w", err)
386386+ }
387387+ if first {
388388+ if err := car.WriteHeader(&car.CarHeader{
389389+ Roots: []cid.Cid{xroot.CID},
390390+ Version: 1,
391391+ }, shardOut); err != nil {
392392+ return fmt.Errorf("rcar bad header, %w", err)
393393+ }
394394+ first = false
395395+ }
396396+ nblocks++
397397+ _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
398398+ if err != nil {
399399+ return fmt.Errorf("rcar bad write, %w", err)
400400+ }
401401+ }
402402+ sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
403403+ return nil
404404+}
405405+406406+// Stat is only used in a debugging admin handler
407407+func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
408408+ sqs.log.Warn("Stat debugging method not implemented for sqlite store")
409409+ return nil, nil
410410+}
411411+412412+func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
413413+ ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
414414+ defer span.End()
415415+ tx, err := sqs.db.BeginTx(ctx, nil)
416416+ if err != nil {
417417+ return fmt.Errorf("wipe tx, %w", err)
418418+ }
419419+ defer tx.Rollback()
420420+ _, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
421421+ if err == nil {
422422+ err = tx.Commit()
423423+ }
424424+ return err
425425+}
426426+427427+// go-libsql does not support ReadOnly transactions, so we use default options.
428428+var txReadOnly = sql.TxOptions{}
429429+430430+// HasUIDCid needed for NewDeltaSession userView
431431+func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
432432+ tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
433433+ if err != nil {
434434+ return false, fmt.Errorf("hasUC tx, %w", err)
435435+ }
436436+ defer tx.Rollback()
437437+ qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
438438+ if err != nil {
439439+ return false, fmt.Errorf("hasUC sql, %w", err)
440440+ }
441441+ defer qstmt.Close()
442442+ rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
443443+ if err != nil {
444444+ return false, fmt.Errorf("hasUC err, %w", err)
445445+ }
446446+ if rows.Next() {
447447+ var rev string
448448+ var rootb models.DbCID
449449+ err = rows.Scan(&rev, &rootb)
450450+ if err != nil {
451451+ return false, fmt.Errorf("hasUC bad scan, %w", err)
452452+ }
453453+ return true, nil
454454+ }
455455+ return false, nil
456456+}
457457+458458+func (sqs *SQLiteStore) CarStore() CarStore {
459459+ return sqs
460460+}
461461+462462+func (sqs *SQLiteStore) Close() error {
463463+ return sqs.db.Close()
464464+}
465465+466466+func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
467467+ tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
468468+ if err != nil {
469469+ return nil, fmt.Errorf("getb tx, %w", err)
470470+ }
471471+ defer tx.Rollback()
472472+ qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
473473+ if err != nil {
474474+ return nil, fmt.Errorf("getb sql, %w", err)
475475+ }
476476+ defer qstmt.Close()
477477+ rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
478478+ if err != nil {
479479+ return nil, fmt.Errorf("getb err, %w", err)
480480+ }
481481+ if rows.Next() {
482482+ var blockb []byte
483483+ err = rows.Scan(&blockb)
484484+ if err != nil {
485485+ return nil, fmt.Errorf("getb bad scan, %w", err)
486486+ }
487487+ blk, err := blocks.NewBlockWithCid(blockb, bcid)
488488+ if err != nil {
489489+ return nil, fmt.Errorf("getb bad block, %w", err)
490490+ }
491491+ return blk, nil
492492+ }
493493+ return nil, ErrNothingThere
494494+}
495495+496496+func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
497497+ tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
498498+ if err != nil {
499499+ return 0, fmt.Errorf("getbs tx, %w", err)
500500+ }
501501+ defer tx.Rollback()
502502+ qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
503503+ if err != nil {
504504+ return 0, fmt.Errorf("getbs sql, %w", err)
505505+ }
506506+ defer qstmt.Close()
507507+ rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
508508+ if err != nil {
509509+ return 0, fmt.Errorf("getbs err, %w", err)
510510+ }
511511+ if rows.Next() {
512512+ var out int64
513513+ err = rows.Scan(&out)
514514+ if err != nil {
515515+ return 0, fmt.Errorf("getbs bad scan, %w", err)
516516+ }
517517+ return out, nil
518518+ }
519519+ return 0, nil
520520+}
521521+522522+type sqliteUserViewInner interface {
523523+ HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
524524+ getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
525525+ getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
526526+}
527527+528528+type sqliteUserView struct {
529529+ sqs sqliteUserViewInner
530530+ uid models.Uid
531531+}
532532+533533+func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
534534+ return s.sqs.HasUIDCid(ctx, s.uid, c)
535535+}
536536+537537+func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
538538+ return s.sqs.getBlock(ctx, s.uid, c)
539539+}
540540+541541+func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
542542+ bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
543543+ return int(bigsize), err
544544+}
545545+546546+// ensure we implement the interface
547547+var _ minBlockstore = (*sqliteUserView)(nil)
+32
pkg/hold/db/util.go
···11+package db
22+33+import (
44+ "encoding/binary"
55+ "io"
66+)
77+88+// LdWrite performs a length-delimited write.
99+// Writer stream gets Uvarint length then concatenated data.
1010+func LdWrite(w io.Writer, d ...[]byte) (int64, error) {
1111+ var sum uint64
1212+ for _, s := range d {
1313+ sum += uint64(len(s))
1414+ }
1515+1616+ buf := make([]byte, 8)
1717+ n := binary.PutUvarint(buf, sum)
1818+ nw, err := w.Write(buf[:n])
1919+ if err != nil {
2020+ return 0, err
2121+ }
2222+2323+ for _, s := range d {
2424+ onw, err := w.Write(s)
2525+ if err != nil {
2626+ return int64(nw), err
2727+ }
2828+ nw += onw
2929+ }
3030+3131+ return int64(nw), nil
3232+}
+25-18
pkg/hold/pds/events.go
···1919 "github.com/ipfs/go-cid"
2020 "github.com/ipld/go-car"
2121 carutil "github.com/ipld/go-car/util"
2222- _ "github.com/mattn/go-sqlite3"
2222+ _ "github.com/tursodatabase/go-libsql"
2323)
24242525// EventBroadcaster manages WebSocket connections and broadcasts repo events
···9090// initDatabase opens database connection, creates table, and loads last sequence
9191func (b *EventBroadcaster) initDatabase() error {
9292 // Open database connection
9393- db, err := sql.Open("sqlite3", b.dbPath)
9393+ dsn := b.dbPath
9494+ if b.dbPath != ":memory:" && !strings.HasPrefix(b.dbPath, "file:") {
9595+ dsn = "file:" + b.dbPath
9696+ }
9797+ db, err := sql.Open("libsql", dsn)
9498 if err != nil {
9599 return err
96100 }
···104108 b.db = db
105109106110 // Create events table if it doesn't exist
107107- schema := `
108108- CREATE TABLE IF NOT EXISTS firehose_events (
109109- seq INTEGER PRIMARY KEY,
110110- commit_cid TEXT NOT NULL,
111111- rev TEXT NOT NULL,
112112- since_rev TEXT,
113113- repo_slice BLOB NOT NULL,
114114- ops_json TEXT NOT NULL,
115115- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
116116- );
117117- CREATE INDEX IF NOT EXISTS idx_firehose_events_rev ON firehose_events(rev);
118118- `
111111+ // Execute statements individually for go-libsql compatibility
112112+ stmts := []string{
113113+ `CREATE TABLE IF NOT EXISTS firehose_events (
114114+ seq INTEGER PRIMARY KEY,
115115+ commit_cid TEXT NOT NULL,
116116+ rev TEXT NOT NULL,
117117+ since_rev TEXT,
118118+ repo_slice BLOB NOT NULL,
119119+ ops_json TEXT NOT NULL,
120120+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
121121+ )`,
122122+ `CREATE INDEX IF NOT EXISTS idx_firehose_events_rev ON firehose_events(rev)`,
123123+ }
119124120120- if _, err := db.Exec(schema); err != nil {
121121- db.Close()
122122- b.db = nil
123123- return err
125125+ for _, stmt := range stmts {
126126+ if _, err := db.Exec(stmt); err != nil {
127127+ db.Close()
128128+ b.db = nil
129129+ return err
130130+ }
124131 }
125132126133 // Load last sequence number from database
+26-7
pkg/hold/pds/records.go
···1111 "atcr.io/pkg/atproto"
1212 "github.com/bluesky-social/indigo/repo"
1313 "github.com/ipfs/go-cid"
1414- _ "github.com/mattn/go-sqlite3"
1414+ _ "github.com/tursodatabase/go-libsql"
1515)
16161717// RecordsIndex provides an efficient index for listing records
···4444// NewRecordsIndex creates or opens a records index
4545// If the schema is outdated (missing did column), drops and rebuilds the table
4646func NewRecordsIndex(dbPath string) (*RecordsIndex, error) {
4747- db, err := sql.Open("sqlite3", dbPath)
4747+ dsn := dbPath
4848+ if dbPath != ":memory:" && !strings.HasPrefix(dbPath, "file:") {
4949+ dsn = "file:" + dbPath
5050+ }
5151+ db, err := sql.Open("libsql", dsn)
4852 if err != nil {
4953 return nil, fmt.Errorf("failed to open records database: %w", err)
5054 }
···7276 }
7377 }
74787575- // Create schema
7676- _, err = db.Exec(recordsSchema)
7777- if err != nil {
7878- db.Close()
7979- return nil, fmt.Errorf("failed to create records schema: %w", err)
7979+ // Create schema (execute statements individually for go-libsql compatibility)
8080+ for _, stmt := range splitStatements(recordsSchema) {
8181+ if _, err = db.Exec(stmt); err != nil {
8282+ db.Close()
8383+ return nil, fmt.Errorf("failed to create records schema: %w", err)
8484+ }
8085 }
81868287 return &RecordsIndex{db: db}, nil
···338343339344 slog.Info("Backfill complete", "records", recordCount)
340345 return nil
346346+}
347347+348348+// splitStatements splits a multi-statement SQL string on semicolons,
349349+// returning only non-empty statements. go-libsql does not support
350350+// executing multiple statements in a single Exec call.
351351+func splitStatements(sql string) []string {
352352+ var out []string
353353+ for _, s := range strings.Split(sql, ";") {
354354+ s = strings.TrimSpace(s)
355355+ if s != "" {
356356+ out = append(out, s)
357357+ }
358358+ }
359359+ return out
341360}
342361343362// extractDIDFromRecord extracts the associated DID from a record based on its collection type
···88 "encoding/json"
99 "fmt"
1010 "log/slog"
1111+ "strings"
1112 "sync"
1213 "time"
1314···7980// NewScanBroadcaster creates a new scan job broadcaster
8081// dbPath should point to a SQLite database file (e.g., "/path/to/pds/db.sqlite3")
8182func NewScanBroadcaster(holdDID, holdEndpoint, secret, dbPath string, driver storagedriver.StorageDriver, holdPDS *HoldPDS) (*ScanBroadcaster, error) {
8282- db, err := sql.Open("sqlite3", dbPath)
8383+ dsn := dbPath
8484+ if dbPath != ":memory:" && !strings.HasPrefix(dbPath, "file:") {
8585+ dsn = "file:" + dbPath
8686+ }
8787+ db, err := sql.Open("libsql", dsn)
8388 if err != nil {
8489 return nil, fmt.Errorf("failed to open scan jobs database: %w", err)
8590 }
···112117113118// initSchema creates the scan_jobs table if it doesn't exist
114119func (sb *ScanBroadcaster) initSchema() error {
115115- schema := `
116116- CREATE TABLE IF NOT EXISTS scan_jobs (
117117- seq INTEGER PRIMARY KEY AUTOINCREMENT,
118118- manifest_digest TEXT NOT NULL,
119119- repository TEXT NOT NULL,
120120- tag TEXT,
121121- user_did TEXT NOT NULL,
122122- user_handle TEXT,
123123- hold_did TEXT NOT NULL,
124124- hold_endpoint TEXT NOT NULL,
125125- tier TEXT NOT NULL DEFAULT 'deckhand',
126126- config_json TEXT NOT NULL,
127127- layers_json TEXT NOT NULL,
128128- status TEXT NOT NULL DEFAULT 'pending',
129129- assigned_to TEXT,
130130- assigned_at TIMESTAMP,
131131- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
132132- completed_at TIMESTAMP
133133- );
134134- CREATE INDEX IF NOT EXISTS idx_scan_jobs_status ON scan_jobs(status);
135135- CREATE INDEX IF NOT EXISTS idx_scan_jobs_assigned ON scan_jobs(assigned_to, status);
136136- `
137137- _, err := sb.db.Exec(schema)
138138- return err
120120+ // Execute statements individually for go-libsql compatibility
121121+ stmts := []string{
122122+ `CREATE TABLE IF NOT EXISTS scan_jobs (
123123+ seq INTEGER PRIMARY KEY AUTOINCREMENT,
124124+ manifest_digest TEXT NOT NULL,
125125+ repository TEXT NOT NULL,
126126+ tag TEXT,
127127+ user_did TEXT NOT NULL,
128128+ user_handle TEXT,
129129+ hold_did TEXT NOT NULL,
130130+ hold_endpoint TEXT NOT NULL,
131131+ tier TEXT NOT NULL DEFAULT 'deckhand',
132132+ config_json TEXT NOT NULL,
133133+ layers_json TEXT NOT NULL,
134134+ status TEXT NOT NULL DEFAULT 'pending',
135135+ assigned_to TEXT,
136136+ assigned_at TIMESTAMP,
137137+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
138138+ completed_at TIMESTAMP
139139+ )`,
140140+ `CREATE INDEX IF NOT EXISTS idx_scan_jobs_status ON scan_jobs(status)`,
141141+ `CREATE INDEX IF NOT EXISTS idx_scan_jobs_assigned ON scan_jobs(assigned_to, status)`,
142142+ }
143143+ for _, stmt := range stmts {
144144+ if _, err := sb.db.Exec(stmt); err != nil {
145145+ return err
146146+ }
147147+ }
148148+ return nil
139149}
140150141151// Enqueue inserts a scan job into SQLite and dispatches to the next available scanner
+7-7
pkg/hold/pds/server.go
···10101111 "atcr.io/pkg/atproto"
1212 "atcr.io/pkg/auth/oauth"
1313+ holddb "atcr.io/pkg/hold/db"
1314 "github.com/bluesky-social/indigo/atproto/atcrypto"
1414- "github.com/bluesky-social/indigo/carstore"
1515 lexutil "github.com/bluesky-social/indigo/lex/util"
1616 "github.com/bluesky-social/indigo/models"
1717 "github.com/bluesky-social/indigo/repo"
···3636type HoldPDS struct {
3737 did string
3838 PublicURL string
3939- carstore carstore.CarStore
3939+ carstore holddb.CarStore
4040 repomgr *RepoManager
4141 dbPath string
4242 uid models.Uid
···5353 return nil, fmt.Errorf("failed to initialize signing key: %w", err)
5454 }
55555656- // Create SQLite-backed carstore
5757- var sqlStore *carstore.SQLiteStore
5656+ // Create SQLite-backed carstore (using vendored libsql-based store)
5757+ var sqlStore *holddb.SQLiteStore
58585959 if dbPath == ":memory:" {
6060 // In-memory mode for tests: create carstore manually and open with :memory:
6161- sqlStore = new(carstore.SQLiteStore)
6161+ sqlStore = new(holddb.SQLiteStore)
6262 if err := sqlStore.Open(":memory:"); err != nil {
6363 return nil, fmt.Errorf("failed to open in-memory sqlite store: %w", err)
6464 }
···7070 }
71717272 // dbPath is the directory, carstore creates and opens db.sqlite3 inside it
7373- sqlStore, err = carstore.NewSqliteStore(dbPath)
7373+ sqlStore, err = holddb.NewSqliteStore(dbPath)
7474 if err != nil {
7575 return nil, fmt.Errorf("failed to create sqlite store: %w", err)
7676 }
···144144}
145145146146// Carstore returns the carstore for repo operations
147147-func (p *HoldPDS) Carstore() carstore.CarStore {
147147+func (p *HoldPDS) Carstore() holddb.CarStore {
148148 return p.carstore
149149}
150150