···19192020import (
2121 "context"
2222+ "encoding/json"
2223 "fmt"
2324 "time"
2425···2829 "tangled.org/core/api/tangled"
2930)
30313232+// jetstream operation strings. The jetstream protocol publishes these as
3333+// the Commit.Operation field; pulling them out as constants keeps the
3434+// switch in handleJetstreamEvent honest about typos.
3535+const (
3636+ jsOpCreate = "create"
3737+ jsOpUpdate = "update"
3838+ jsOpDelete = "delete"
3939+)
4040+3141// startJetstream dials the configured jetstream endpoint and spawns a
3242// background goroutine that consumes events for the lifetime of ctx. It
3343// returns once the client is constructed; connection errors surface in
3444// logs, not return values, because the read loop is expected to reconnect
3545// on its own.
3646//
4747+// The store is used for two things: loading the persisted cursor so we
4848+// resume from the last seen event after a restart, and persisting
4949+// observed records so the rest of tack can answer membership questions
5050+// without re-reading the firehose.
5151+//
3752// The logger is pulled from ctx (see log.go); falls back to slog.Default()
3853// if none is attached.
3939-func startJetstream(ctx context.Context, cfg config) error {
5454+func startJetstream(ctx context.Context, cfg config, st *store) error {
4055 logger := loggerFrom(ctx).With("component", "jetstream")
41564257 // `wantedCollections` is a server-side filter: jetstream will only send
···5469 clientCfg.WebsocketURL = cfg.JetstreamURL
5570 clientCfg.WantedCollections = collections
56715757- // Re-attach the component-scoped logger so handleJetstreamEvent — which
5858- // the scheduler invokes with the ctx we pass to ConnectAndRead — can
7272+ // The handler closes over `st` and `logger` so the scheduler signature
7373+ // stays plain `func(ctx, *Event) error` — no need for a method
7474+ // receiver or a global.
7575+ handler := func(ctx context.Context, evt *jsmodels.Event) error {
7676+ return handleJetstreamEvent(ctx, st, evt)
7777+ }
7878+7979+ // Re-attach the component-scoped logger so handler — which the
8080+ // scheduler invokes with the ctx we pass to ConnectAndRead — can
5981 // pull it back out via loggerFrom.
6082 ctx = loggerInto(ctx, logger)
6183···6688 c, err := client.NewClient(
6789 clientCfg,
6890 logger,
6969- sequential.NewScheduler(
7070- "tack",
7171- logger,
7272- handleJetstreamEvent),
9191+ sequential.NewScheduler("tack", logger, handler),
7392 )
7493 if err != nil {
7594 return fmt.Errorf("new jetstream client: %w", err)
7695 }
77967878- // Reconnect loop. ConnectAndRead blocks on the websocket and returns
7979- // either when the connection drops (transient network error, server
8080- // restart, etc.) or when ctx is cancelled. On error we sleep briefly
8181- // and reconnect; on ctx cancellation we exit cleanly.
8282- //
8383- // TODO: pass a *cursor here once we persist one, so we resume from the
8484- // last seen event instead of "now" after a restart.
8597 go func() {
8698 for {
8787- if err := c.ConnectAndRead(ctx, nil); err != nil {
9999+ // We re-read the cursor from the store at every (re)connect so we
100100+ // pick up any progress the previous connection persisted before
101101+ // dying. nil means "start from now", which is the right default on
102102+ // a brand-new install or after a corrupt cursor read.
103103+ cur, err := st.LoadCursor(ctx)
104104+ if err != nil {
105105+ logger.Warn("ignoring unreadable cursor; resuming from now", "err", err)
106106+ cur = nil
107107+ }
108108+ if cur != nil {
109109+ logger.Info("connecting to jetstream", "cursor_us", *cur)
110110+ } else {
111111+ logger.Info("connecting to jetstream from now (no cursor)")
112112+ }
113113+114114+ // Reconnect loop. ConnectAndRead blocks on the websocket and returns
115115+ // either when the connection drops (transient network error, server
116116+ // restart, etc.) or when ctx is cancelled. On error we sleep briefly
117117+ // and reconnect; on ctx cancellation we exit cleanly.
118118+ if err := c.ConnectAndRead(ctx, cur); err != nil {
88119 if ctx.Err() != nil {
89120 return
90121 }
···101132 return nil
102133}
103134104104-// handleJetstreamEvent is the per-event callback for the JetStream.
105105-func handleJetstreamEvent(ctx context.Context, evt *jsmodels.Event) error {
106106- // We only care about commits, which are the actual record CRUD operations
107107- // on a user's PDS.
135135+// handleJetstreamEvent is the per-event callback for the JetStream. It
136136+// applies the event to the store and advances the persisted cursor. Any
137137+// returned error is logged by the scheduler but does not tear down the
138138+// connection — the next event will retry the cursor write implicitly.
139139+func handleJetstreamEvent(ctx context.Context, st *store, evt *jsmodels.Event) error {
140140+ // We only care about commits, which are the actual record CRUD
141141+ // operations on a user's PDS. Account/identity events are ignored
142142+ // for now; if we ever care about handle changes we can add them.
108143 if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil {
109144 return nil
110145 }
146146+ logger := loggerFrom(ctx)
111147112112- loggerFrom(ctx).Debug("event",
113113- "did", evt.Did,
114114- "collection", evt.Commit.Collection,
115115- "op", evt.Commit.Operation,
116116- "rkey", evt.Commit.RKey,
117117- )
148148+ // Dispatch on collection. Unknown collections shouldn't happen given
149149+ // our wantedCollections filter, but be defensive — jetstream may
150150+ // send schema changes ahead of us updating the filter.
151151+ if err := applyCommit(ctx, st, evt); err != nil {
152152+ logger.Error("apply commit",
153153+ "err", err,
154154+ "did", evt.Did,
155155+ "collection", evt.Commit.Collection,
156156+ "op", evt.Commit.Operation,
157157+ "rkey", evt.Commit.RKey,
158158+ )
159159+160160+ // Fall through to cursor save: a single bad record shouldn't
161161+ // stall the cursor forever and force us to re-process every
162162+ // subsequent event after a restart.
163163+ }
164164+165165+ // Advance the cursor. TimeUS is the jetstream-assigned microsecond
166166+ // timestamp; saving it after-apply means a crash mid-batch will at
167167+ // worst replay the failing event, never skip past it.
168168+ if err := st.SaveCursor(ctx, evt.TimeUS); err != nil {
169169+ // Returning the error logs it; it doesn't kill the scheduler.
170170+ return fmt.Errorf("save cursor: %w", err)
171171+ }
172172+118173 return nil
119174}
175175+176176+// applyCommit routes a commit to the right store mutation based on its
177177+// collection NSID and operation.
178178+func applyCommit(ctx context.Context, st *store, evt *jsmodels.Event) error {
179179+ c := evt.Commit
180180+ switch c.Collection {
181181+ case tangled.SpindleMemberNSID:
182182+ return applySpindleMember(ctx, st, evt.Did, c)
183183+ case tangled.RepoNSID:
184184+ return applyRepo(ctx, st, evt.Did, c)
185185+ case tangled.RepoCollaboratorNSID:
186186+ return applyRepoCollaborator(ctx, st, evt.Did, c)
187187+ default:
188188+ // Server-side filter should prevent this, but log so we notice
189189+ // if jetstream ever changes behavior.
190190+ loggerFrom(ctx).Debug("ignoring unexpected collection",
191191+ "collection", c.Collection)
192192+ return nil
193193+ }
194194+}
195195+196196+func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
197197+ switch c.Operation {
198198+ case jsOpCreate, jsOpUpdate:
199199+ var rec tangled.SpindleMember
200200+ if err := json.Unmarshal(c.Record, &rec); err != nil {
201201+ return fmt.Errorf("decode spindle.member: %w", err)
202202+ }
203203+ return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt)
204204+ case jsOpDelete:
205205+ return st.DeleteSpindleMember(ctx, did, c.RKey)
206206+ }
207207+ return nil
208208+}
209209+210210+func applyRepo(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
211211+ switch c.Operation {
212212+ case jsOpCreate, jsOpUpdate:
213213+ var rec tangled.Repo
214214+ if err := json.Unmarshal(c.Record, &rec); err != nil {
215215+ return fmt.Errorf("decode repo: %w", err)
216216+ }
217217+ return st.UpsertRepo(ctx, did, c.RKey,
218218+ rec.Knot, rec.Name,
219219+ deref(rec.Spindle), deref(rec.RepoDid),
220220+ rec.CreatedAt,
221221+ )
222222+ case jsOpDelete:
223223+ return st.DeleteRepo(ctx, did, c.RKey)
224224+ }
225225+ return nil
226226+}
227227+228228+func applyRepoCollaborator(ctx context.Context, st *store, did string, c *jsmodels.Commit) error {
229229+ switch c.Operation {
230230+ case jsOpCreate, jsOpUpdate:
231231+ var rec tangled.RepoCollaborator
232232+ if err := json.Unmarshal(c.Record, &rec); err != nil {
233233+ return fmt.Errorf("decode repo.collaborator: %w", err)
234234+ }
235235+ return st.UpsertRepoCollaborator(ctx, did, c.RKey,
236236+ deref(rec.Repo), deref(rec.RepoDid),
237237+ rec.Subject, rec.CreatedAt,
238238+ )
239239+ case jsOpDelete:
240240+ return st.DeleteRepoCollaborator(ctx, did, c.RKey)
241241+ }
242242+ return nil
243243+}
244244+245245+// deref returns the pointed-to string, or "" for nil. The lexicon types
246246+// model optional fields as *string; the store schema treats absent and
247247+// empty the same, so collapsing the two here keeps callers tidy.
248248+func deref(s *string) string {
249249+ if s == nil {
250250+ return ""
251251+ }
252252+ return *s
253253+}
+258
jetstream_test.go
···11+package main
22+33+// Tests for handleJetstreamEvent. We exercise the full path — collection
44+// dispatch, record decoding, store mutation, and cursor advancement —
55+// using an in-memory store from store_test.go's newTestStore helper.
66+//
77+// Events are constructed by hand rather than recorded from a live
88+// jetstream so the tests don't need network access and stay fast.
99+1010+import (
1111+ "context"
1212+ "encoding/json"
1313+ "testing"
1414+1515+ jsmodels "github.com/bluesky-social/jetstream/pkg/models"
1616+ "tangled.org/core/api/tangled"
1717+)
1818+1919+// commitEvent builds a jetstream commit event for tests. timeUS is the
2020+// cursor value the handler should persist after applying. record can be
2121+// nil for delete operations, which carry no body.
2222+func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event {
2323+ var raw json.RawMessage
2424+ if record != nil {
2525+ b, err := json.Marshal(record)
2626+ if err != nil {
2727+ panic(err) // test helper — callers control the input
2828+ }
2929+ raw = b
3030+ }
3131+ return &jsmodels.Event{
3232+ Did: did,
3333+ TimeUS: timeUS,
3434+ Kind: jsmodels.EventKindCommit,
3535+ Commit: &jsmodels.Commit{
3636+ Operation: op,
3737+ Collection: collection,
3838+ RKey: rkey,
3939+ Record: raw,
4040+ },
4141+ }
4242+}
4343+4444+// requireCursor asserts the persisted cursor equals want. Pulled out
4545+// because almost every test in this file checks it.
4646+func requireCursor(t *testing.T, s *store, want int64) {
4747+ t.Helper()
4848+ got, err := s.LoadCursor(context.Background())
4949+ if err != nil {
5050+ t.Fatalf("load cursor: %v", err)
5151+ }
5252+ if got == nil || *got != want {
5353+ t.Fatalf("cursor = %v, want %d", got, want)
5454+ }
5555+}
5656+5757+// TestHandleNonCommitEvent confirms account/identity events are ignored
5858+// without error and without advancing the cursor — they don't have a
5959+// TimeUS we want to commit to.
6060+func TestHandleNonCommitEvent(t *testing.T) {
6161+ s := newTestStore(t)
6262+ ctx := context.Background()
6363+6464+ evt := &jsmodels.Event{
6565+ Did: "did:plc:foo",
6666+ TimeUS: 100,
6767+ Kind: jsmodels.EventKindAccount,
6868+ }
6969+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
7070+ t.Fatalf("handle: %v", err)
7171+ }
7272+ got, err := s.LoadCursor(ctx)
7373+ if err != nil {
7474+ t.Fatalf("load cursor: %v", err)
7575+ }
7676+ if got != nil {
7777+ t.Fatalf("expected no cursor for non-commit, got %d", *got)
7878+ }
7979+}
8080+8181+// TestHandleSpindleMemberCreate exercises the happy path: a create
8282+// commit lands a row in spindle_members and advances the cursor.
8383+func TestHandleSpindleMemberCreate(t *testing.T) {
8484+ s := newTestStore(t)
8585+ ctx := context.Background()
8686+8787+ rec := tangled.SpindleMember{
8888+ Instance: "https://spindle.example",
8989+ Subject: "did:plc:alice",
9090+ CreatedAt: "2026-01-01T00:00:00Z",
9191+ }
9292+ evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec)
9393+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
9494+ t.Fatalf("handle: %v", err)
9595+ }
9696+9797+ var instance, subject string
9898+ err := s.db.QueryRowContext(ctx,
9999+ `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`,
100100+ "did:plc:owner", "rk1",
101101+ ).Scan(&instance, &subject)
102102+ if err != nil {
103103+ t.Fatalf("query: %v", err)
104104+ }
105105+ if instance != "https://spindle.example" || subject != "did:plc:alice" {
106106+ t.Fatalf("got (%q,%q)", instance, subject)
107107+ }
108108+ requireCursor(t, s, 12345)
109109+}
110110+111111+// TestHandleSpindleMemberDelete confirms a delete commit removes the
112112+// previously-persisted row.
113113+func TestHandleSpindleMemberDelete(t *testing.T) {
114114+ s := newTestStore(t)
115115+ ctx := context.Background()
116116+117117+ if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil {
118118+ t.Fatalf("seed: %v", err)
119119+ }
120120+ evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil)
121121+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
122122+ t.Fatalf("handle: %v", err)
123123+ }
124124+ if n := countRows(t, s, "spindle_members"); n != 0 {
125125+ t.Fatalf("after delete: %d rows, want 0", n)
126126+ }
127127+ requireCursor(t, s, 99)
128128+}
129129+130130+// TestHandleRepoCreateOptionals verifies pointer-typed optional fields
131131+// on tangled.Repo are derefed correctly (and nils become empty strings)
132132+// when written to the store.
133133+func TestHandleRepoCreateOptionals(t *testing.T) {
134134+ s := newTestStore(t)
135135+ ctx := context.Background()
136136+137137+ spindle := "https://spindle.example"
138138+ repoDid := "did:plc:repo"
139139+ rec := tangled.Repo{
140140+ Knot: "knot.example",
141141+ Name: "myrepo",
142142+ Spindle: &spindle,
143143+ RepoDid: &repoDid,
144144+ CreatedAt: "2026-01-01T00:00:00Z",
145145+ }
146146+ evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec)
147147+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
148148+ t.Fatalf("handle: %v", err)
149149+ }
150150+151151+ var gotSpindle, gotRepoDid string
152152+ err := s.db.QueryRowContext(ctx,
153153+ `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
154154+ "did:plc:owner", "repo1",
155155+ ).Scan(&gotSpindle, &gotRepoDid)
156156+ if err != nil {
157157+ t.Fatalf("query: %v", err)
158158+ }
159159+ if gotSpindle != spindle || gotRepoDid != repoDid {
160160+ t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid)
161161+ }
162162+163163+ // Now a record with both optionals nil — should land as empty
164164+ // strings, not crash on a nil dereference in deref().
165165+ rec2 := tangled.Repo{
166166+ Knot: "knot.example",
167167+ Name: "other",
168168+ CreatedAt: "2026-01-01T00:00:00Z",
169169+ }
170170+ evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2)
171171+ if err := handleJetstreamEvent(ctx, s, evt2); err != nil {
172172+ t.Fatalf("handle nil-optionals: %v", err)
173173+ }
174174+ err = s.db.QueryRowContext(ctx,
175175+ `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
176176+ "did:plc:owner", "repo2",
177177+ ).Scan(&gotSpindle, &gotRepoDid)
178178+ if err != nil {
179179+ t.Fatalf("query nil-optionals: %v", err)
180180+ }
181181+ if gotSpindle != "" || gotRepoDid != "" {
182182+ t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid)
183183+ }
184184+ requireCursor(t, s, 8)
185185+}
186186+187187+// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each
188188+// collection has at least one apply test.
189189+func TestHandleRepoCollaboratorCreate(t *testing.T) {
190190+ s := newTestStore(t)
191191+ ctx := context.Background()
192192+193193+ repo := "myrepo"
194194+ rec := tangled.RepoCollaborator{
195195+ Repo: &repo,
196196+ Subject: "did:plc:carol",
197197+ CreatedAt: "2026-01-01T00:00:00Z",
198198+ }
199199+ evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec)
200200+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
201201+ t.Fatalf("handle: %v", err)
202202+ }
203203+204204+ var subject string
205205+ err := s.db.QueryRowContext(ctx,
206206+ `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
207207+ "did:plc:owner", "c1",
208208+ ).Scan(&subject)
209209+ if err != nil {
210210+ t.Fatalf("query: %v", err)
211211+ }
212212+ if subject != "did:plc:carol" {
213213+ t.Fatalf("subject = %q", subject)
214214+ }
215215+ requireCursor(t, s, 55)
216216+}
217217+218218+// TestHandleUnknownCollection makes sure a collection we didn't ask for
219219+// (jetstream filter changes, schema drift) is silently dropped — no
220220+// error, no row, but cursor still advances so we don't replay it.
221221+func TestHandleUnknownCollection(t *testing.T) {
222222+ s := newTestStore(t)
223223+ ctx := context.Background()
224224+225225+ evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"})
226226+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
227227+ t.Fatalf("handle: %v", err)
228228+ }
229229+ requireCursor(t, s, 42)
230230+}
231231+232232+// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to
233233+// the happy paths: a malformed record body must be logged-and-skipped
234234+// (not returned as an error that pauses the scheduler) and the cursor
235235+// must still advance so we don't loop on the same bad event forever.
236236+func TestHandleBadRecordAdvancesCursor(t *testing.T) {
237237+ s := newTestStore(t)
238238+ ctx := context.Background()
239239+240240+ evt := &jsmodels.Event{
241241+ Did: "did:plc:owner",
242242+ TimeUS: 1000,
243243+ Kind: jsmodels.EventKindCommit,
244244+ Commit: &jsmodels.Commit{
245245+ Operation: jsOpCreate,
246246+ Collection: tangled.SpindleMemberNSID,
247247+ RKey: "broken",
248248+ Record: json.RawMessage(`{not valid json`),
249249+ },
250250+ }
251251+ if err := handleJetstreamEvent(ctx, s, evt); err != nil {
252252+ t.Fatalf("handle should swallow decode error, got: %v", err)
253253+ }
254254+ if n := countRows(t, s, "spindle_members"); n != 0 {
255255+ t.Fatalf("bad record should not have inserted; got %d rows", n)
256256+ }
257257+ requireCursor(t, s, 1000)
258258+}
+18-1
main.go
···2323 Addr string
2424 OwnerDID string
2525 JetstreamURL string
2626+ DBPath string
2627}
27282829func loadConfig() (config, error) {
···3031 Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
3132 OwnerDID: os.Getenv("TACK_OWNER_DID"),
3233 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
3434+ DBPath: envOr("TACK_DB_PATH", "tack.db"),
3335 }
3436 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
3537 flag.Parse()
···7577 defer stop()
7678 ctx = loggerInto(ctx, logger)
77798080+ // Open (or create) the SQLite store. Holds jetstream cursor +
8181+ // observed Tangled membership records. Closed last during shutdown
8282+ // so anything writing to it during teardown still succeeds.
8383+ st, err := openStore(cfg.DBPath)
8484+ if err != nil {
8585+ logger.Error("failed to open store", "err", err, "path", cfg.DBPath)
8686+ os.Exit(1)
8787+ }
8888+ defer func() {
8989+ if err := st.Close(); err != nil {
9090+ logger.Error("close store", "err", err)
9191+ }
9292+ }()
9393+ logger.Info("store open", "path", cfg.DBPath)
9494+7895 // Start the JetStream listener in the background.
7979- if err := startJetstream(ctx, cfg); err != nil {
9696+ if err := startJetstream(ctx, cfg, st); err != nil {
8097 logger.Error("failed to start jetstream consumer", "err", err)
8198 os.Exit(1)
8299 }
+208
store.go
···11+package main
22+33+// SQLite-backed persistence for tack.
44+//
55+// Two responsibilities live here:
66+//
77+// 1. The jetstream cursor — a microsecond unix timestamp the AT Proto
88+// firehose uses to resume from a specific point. Without persistence
99+// every restart begins at "now" and we'd silently miss any record
1010+// published while we were down.
1111+//
1212+// 2. Tangled membership state derived from jetstream commits:
1313+// sh.tangled.spindle.member, sh.tangled.repo, and
1414+// sh.tangled.repo.collaborator. We need this to later answer
1515+// "is DID X allowed to trigger a build on this spindle for repo Y?"
1616+//
1717+// We use mattn/go-sqlite3, which is the most battle-tested SQLite driver
1818+// for Go. It requires CGo, which is fine for tack — the project already
1919+// builds under Nix where a C toolchain is readily available.
2020+2121+import (
2222+ "context"
2323+ "database/sql"
2424+ "errors"
2525+ "fmt"
2626+ "strconv"
2727+2828+ _ "github.com/mattn/go-sqlite3"
2929+)
3030+3131+// store wraps the SQLite handle and exposes the small set of operations
3232+// the rest of tack needs. Keeping the surface narrow lets the persistence
3333+// layer be swapped or mocked later without rewriting callers.
3434+type store struct {
3535+ db *sql.DB
3636+}
3737+3838+// openStore opens (or creates) the SQLite database at path and applies
3939+// the schema. It also flips on WAL + NORMAL synchronous, which is the
4040+// usual "this is a long-running server, not a one-shot script" config:
4141+// concurrent reads don't block the single writer, and we trade a little
4242+// crash-safety on power loss for a lot of write throughput.
4343+func openStore(path string) (*store, error) {
4444+ // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
4545+ // _synchronous, _foreign_keys) and applies them on each new connection.
4646+ // journal_mode=WAL persists in the database file but setting it here
4747+ // also covers the first-ever open.
4848+ dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
4949+ db, err := sql.Open("sqlite3", dsn)
5050+ if err != nil {
5151+ return nil, fmt.Errorf("open sqlite %q: %w", path, err)
5252+ }
5353+5454+ // SQLite only supports one writer at a time. Capping max open conns
5555+ // at 1 keeps database/sql from spinning up extra connections that
5656+ // will only ever serialize behind that writer.
5757+ db.SetMaxOpenConns(1)
5858+5959+ s := &store{db: db}
6060+ if err := s.migrate(context.Background()); err != nil {
6161+ _ = db.Close()
6262+ return nil, err
6363+ }
6464+ return s, nil
6565+}
6666+6767+// Close releases the underlying database handle.
6868+func (s *store) Close() error {
6969+ return s.db.Close()
7070+}
7171+7272+// metaCursorKey is the meta-table key under which we persist the
7373+// jetstream cursor. Pulled out as a constant to avoid drift between
7474+// load/save sites.
7575+const metaCursorKey = "jetstream_cursor"
7676+7777+// LoadCursor returns the persisted jetstream cursor, or nil if none has
7878+// been saved yet (signaling "start from now"). The cursor is a unix
7979+// microsecond timestamp — see jetstream.Event.TimeUS.
8080+func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
8181+ var raw string
8282+ err := s.db.QueryRowContext(ctx,
8383+ `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
8484+ ).Scan(&raw)
8585+ if errors.Is(err, sql.ErrNoRows) {
8686+ return nil, nil
8787+ }
8888+ if err != nil {
8989+ return nil, fmt.Errorf("load cursor: %w", err)
9090+ }
9191+ v, err := strconv.ParseInt(raw, 10, 64)
9292+ if err != nil {
9393+ // A malformed cursor shouldn't wedge startup — log-and-ignore
9494+ // (return nil) so we just resume from "now". The caller has the
9595+ // logger; we surface the parse error so it can decide.
9696+ return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
9797+ }
9898+ return &v, nil
9999+}
100100+101101+// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
102102+// row is created on first call and updated thereafter.
103103+func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
104104+ _, err := s.db.ExecContext(ctx,
105105+ `INSERT INTO meta (key, value) VALUES (?, ?)
106106+ ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
107107+ metaCursorKey, strconv.FormatInt(cursor, 10),
108108+ )
109109+ if err != nil {
110110+ return fmt.Errorf("save cursor: %w", err)
111111+ }
112112+ return nil
113113+}
114114+115115+// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
116116+// observation.
117117+func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
118118+ _, err := s.db.ExecContext(ctx,
119119+ `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
120120+ VALUES (?, ?, ?, ?, ?)
121121+ ON CONFLICT(did, rkey) DO UPDATE SET
122122+ instance = excluded.instance,
123123+ subject = excluded.subject,
124124+ created_at = excluded.created_at`,
125125+ did, rkey, instance, subject, createdAt,
126126+ )
127127+ if err != nil {
128128+ return fmt.Errorf("upsert spindle_member: %w", err)
129129+ }
130130+ return nil
131131+}
132132+133133+// DeleteSpindleMember removes a member record by its ATProto identity.
134134+func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
135135+ _, err := s.db.ExecContext(ctx,
136136+ `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
137137+ did, rkey,
138138+ )
139139+ if err != nil {
140140+ return fmt.Errorf("delete spindle_member: %w", err)
141141+ }
142142+ return nil
143143+}
144144+145145+// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
146146+// spindle and repoDid may be empty strings — we store them as such rather
147147+// than as SQL NULLs to keep the upsert path uniform.
148148+func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
149149+ _, err := s.db.ExecContext(ctx,
150150+ `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
151151+ VALUES (?, ?, ?, ?, ?, ?, ?)
152152+ ON CONFLICT(did, rkey) DO UPDATE SET
153153+ knot = excluded.knot,
154154+ name = excluded.name,
155155+ spindle = excluded.spindle,
156156+ repo_did = excluded.repo_did,
157157+ created_at = excluded.created_at`,
158158+ did, rkey, knot, name, spindle, repoDid, createdAt,
159159+ )
160160+ if err != nil {
161161+ return fmt.Errorf("upsert repo: %w", err)
162162+ }
163163+ return nil
164164+}
165165+166166+// DeleteRepo removes a repo record by its ATProto identity.
167167+func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
168168+ _, err := s.db.ExecContext(ctx,
169169+ `DELETE FROM repos WHERE did = ? AND rkey = ?`,
170170+ did, rkey,
171171+ )
172172+ if err != nil {
173173+ return fmt.Errorf("delete repo: %w", err)
174174+ }
175175+ return nil
176176+}
177177+178178+// UpsertRepoCollaborator records (or refreshes) a
179179+// sh.tangled.repo.collaborator observation.
180180+func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
181181+ _, err := s.db.ExecContext(ctx,
182182+ `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
183183+ VALUES (?, ?, ?, ?, ?, ?)
184184+ ON CONFLICT(did, rkey) DO UPDATE SET
185185+ repo = excluded.repo,
186186+ repo_did = excluded.repo_did,
187187+ subject = excluded.subject,
188188+ created_at = excluded.created_at`,
189189+ did, rkey, repo, repoDid, subject, createdAt,
190190+ )
191191+ if err != nil {
192192+ return fmt.Errorf("upsert repo_collaborator: %w", err)
193193+ }
194194+ return nil
195195+}
196196+197197+// DeleteRepoCollaborator removes a collaborator record by its ATProto
198198+// identity.
199199+func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
200200+ _, err := s.db.ExecContext(ctx,
201201+ `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
202202+ did, rkey,
203203+ )
204204+ if err != nil {
205205+ return fmt.Errorf("delete repo_collaborator: %w", err)
206206+ }
207207+ return nil
208208+}
+68
store_migrate.go
···11+package main
22+33+// Schema definition and migration logic for the SQLite store. Pulled out
44+// of store.go so the big SQL block doesn't sit in the middle of the
55+// runtime API surface.
66+77+import (
88+ "context"
99+ "fmt"
1010+)
1111+1212+// schema is the full set of CREATE statements applied at startup. It is
1313+// idempotent and additive only — no `DROP`s — so future changes can be
1414+// layered on as additional statements without needing a separate
1515+// migration tool until the project actually outgrows that.
1616+const schema = `
1717+CREATE TABLE IF NOT EXISTS meta (
1818+ key TEXT PRIMARY KEY,
1919+ value TEXT NOT NULL
2020+);
2121+2222+-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
2323+-- one of these per authorized member. (did, rkey) is the natural ATProto
2424+-- key — did identifies the publisher's PDS, rkey identifies the record
2525+-- within that PDS's collection.
2626+CREATE TABLE IF NOT EXISTS spindle_members (
2727+ did TEXT NOT NULL,
2828+ rkey TEXT NOT NULL,
2929+ instance TEXT NOT NULL,
3030+ subject TEXT NOT NULL,
3131+ created_at TEXT NOT NULL,
3232+ PRIMARY KEY (did, rkey)
3333+);
3434+3535+-- Records of sh.tangled.repo. We keep the full set so that when a
3636+-- pipeline trigger arrives we can look up which knot/spindle/repo_did
3737+-- it corresponds to without another round-trip.
3838+CREATE TABLE IF NOT EXISTS repos (
3939+ did TEXT NOT NULL,
4040+ rkey TEXT NOT NULL,
4141+ knot TEXT NOT NULL,
4242+ name TEXT NOT NULL,
4343+ spindle TEXT,
4444+ repo_did TEXT,
4545+ created_at TEXT NOT NULL,
4646+ PRIMARY KEY (did, rkey)
4747+);
4848+4949+-- Records of sh.tangled.repo.collaborator. Used together with repos to
5050+-- decide whether a triggering DID is allowed to push builds to us.
5151+CREATE TABLE IF NOT EXISTS repo_collaborators (
5252+ did TEXT NOT NULL,
5353+ rkey TEXT NOT NULL,
5454+ repo TEXT,
5555+ repo_did TEXT,
5656+ subject TEXT NOT NULL,
5757+ created_at TEXT NOT NULL,
5858+ PRIMARY KEY (did, rkey)
5959+);
6060+`
6161+6262+// migrate applies the schema. Safe to call repeatedly.
6363+func (s *store) migrate(ctx context.Context) error {
6464+ if _, err := s.db.ExecContext(ctx, schema); err != nil {
6565+ return fmt.Errorf("apply schema: %w", err)
6666+ }
6767+ return nil
6868+}
+253
store_test.go
···11+package main
22+33+// Tests for the SQLite store. We use t.TempDir() for an isolated database
44+// per test, which both keeps tests independent and exercises the open +
55+// migrate path on every run.
66+//
77+// Where the store doesn't expose a query method (it's intentionally
88+// write-mostly today) we drop down to raw SQL via s.db to verify the
99+// row is what we expect.
1010+1111+import (
1212+ "context"
1313+ "path/filepath"
1414+ "testing"
1515+)
1616+1717+// newTestStore opens a fresh store in a per-test temp dir and registers
1818+// cleanup. Centralized so test bodies stay focused on behavior.
1919+func newTestStore(t *testing.T) *store {
2020+ t.Helper()
2121+ path := filepath.Join(t.TempDir(), "tack.db")
2222+ s, err := openStore(path)
2323+ if err != nil {
2424+ t.Fatalf("openStore: %v", err)
2525+ }
2626+ t.Cleanup(func() {
2727+ if err := s.Close(); err != nil {
2828+ t.Errorf("close store: %v", err)
2929+ }
3030+ })
3131+ return s
3232+}
3333+3434+// TestOpenStoreIdempotent makes sure re-opening an existing database
3535+// (which is what happens on every restart) succeeds and leaves the
3636+// schema intact.
3737+func TestOpenStoreIdempotent(t *testing.T) {
3838+ path := filepath.Join(t.TempDir(), "tack.db")
3939+ s1, err := openStore(path)
4040+ if err != nil {
4141+ t.Fatalf("first open: %v", err)
4242+ }
4343+ if err := s1.Close(); err != nil {
4444+ t.Fatalf("first close: %v", err)
4545+ }
4646+ s2, err := openStore(path)
4747+ if err != nil {
4848+ t.Fatalf("second open: %v", err)
4949+ }
5050+ defer s2.Close()
5151+5252+ // Sanity check: the schema is in place by writing and reading back
5353+ // a cursor through the freshly re-opened handle.
5454+ ctx := context.Background()
5555+ if err := s2.SaveCursor(ctx, 42); err != nil {
5656+ t.Fatalf("save cursor: %v", err)
5757+ }
5858+ got, err := s2.LoadCursor(ctx)
5959+ if err != nil {
6060+ t.Fatalf("load cursor: %v", err)
6161+ }
6262+ if got == nil || *got != 42 {
6363+ t.Fatalf("cursor = %v, want 42", got)
6464+ }
6565+}
6666+6767+// TestCursorRoundtrip covers the three states a cursor can be in:
6868+// missing (nil), present, and overwritten. Together they exercise the
6969+// INSERT and the ON CONFLICT branch of SaveCursor.
7070+func TestCursorRoundtrip(t *testing.T) {
7171+ s := newTestStore(t)
7272+ ctx := context.Background()
7373+7474+ got, err := s.LoadCursor(ctx)
7575+ if err != nil {
7676+ t.Fatalf("load cursor (empty): %v", err)
7777+ }
7878+ if got != nil {
7979+ t.Fatalf("expected nil cursor on fresh store, got %d", *got)
8080+ }
8181+8282+ if err := s.SaveCursor(ctx, 1234567890); err != nil {
8383+ t.Fatalf("save cursor: %v", err)
8484+ }
8585+ got, err = s.LoadCursor(ctx)
8686+ if err != nil {
8787+ t.Fatalf("load cursor: %v", err)
8888+ }
8989+ if got == nil || *got != 1234567890 {
9090+ t.Fatalf("cursor after save = %v, want 1234567890", got)
9191+ }
9292+9393+ // Overwrite path — exercises ON CONFLICT DO UPDATE.
9494+ if err := s.SaveCursor(ctx, 9999); err != nil {
9595+ t.Fatalf("overwrite cursor: %v", err)
9696+ }
9797+ got, err = s.LoadCursor(ctx)
9898+ if err != nil {
9999+ t.Fatalf("load cursor (overwrite): %v", err)
100100+ }
101101+ if got == nil || *got != 9999 {
102102+ t.Fatalf("cursor after overwrite = %v, want 9999", got)
103103+ }
104104+}
105105+106106+// TestSpindleMemberLifecycle covers insert, update-on-conflict, and
107107+// delete for spindle.member rows. We read back via raw SQL since the
108108+// store doesn't expose a query helper.
109109+func TestSpindleMemberLifecycle(t *testing.T) {
110110+ s := newTestStore(t)
111111+ ctx := context.Background()
112112+113113+ const did, rkey = "did:plc:owner", "abc123"
114114+115115+ if err := s.UpsertSpindleMember(ctx, did, rkey, "https://spindle.example", "did:plc:alice", "2026-01-01T00:00:00Z"); err != nil {
116116+ t.Fatalf("insert: %v", err)
117117+ }
118118+119119+ var instance, subject, createdAt string
120120+ err := s.db.QueryRowContext(ctx,
121121+ `SELECT instance, subject, created_at FROM spindle_members WHERE did = ? AND rkey = ?`,
122122+ did, rkey,
123123+ ).Scan(&instance, &subject, &createdAt)
124124+ if err != nil {
125125+ t.Fatalf("query after insert: %v", err)
126126+ }
127127+ if instance != "https://spindle.example" || subject != "did:plc:alice" || createdAt != "2026-01-01T00:00:00Z" {
128128+ t.Fatalf("after insert got (%q,%q,%q)", instance, subject, createdAt)
129129+ }
130130+131131+ // Update path — same primary key, different fields.
132132+ if err := s.UpsertSpindleMember(ctx, did, rkey, "https://spindle.example", "did:plc:bob", "2026-02-02T00:00:00Z"); err != nil {
133133+ t.Fatalf("update: %v", err)
134134+ }
135135+ err = s.db.QueryRowContext(ctx,
136136+ `SELECT subject, created_at FROM spindle_members WHERE did = ? AND rkey = ?`,
137137+ did, rkey,
138138+ ).Scan(&subject, &createdAt)
139139+ if err != nil {
140140+ t.Fatalf("query after update: %v", err)
141141+ }
142142+ if subject != "did:plc:bob" || createdAt != "2026-02-02T00:00:00Z" {
143143+ t.Fatalf("after update got (%q,%q)", subject, createdAt)
144144+ }
145145+146146+ // Delete and verify the row is gone.
147147+ if err := s.DeleteSpindleMember(ctx, did, rkey); err != nil {
148148+ t.Fatalf("delete: %v", err)
149149+ }
150150+ if n := countRows(t, s, "spindle_members"); n != 0 {
151151+ t.Fatalf("after delete, spindle_members has %d rows, want 0", n)
152152+ }
153153+154154+ // Deleting a row that doesn't exist must succeed silently — the
155155+ // jetstream stream can replay deletes after a restart.
156156+ if err := s.DeleteSpindleMember(ctx, did, rkey); err != nil {
157157+ t.Fatalf("delete missing: %v", err)
158158+ }
159159+}
160160+161161+// TestRepoLifecycle exercises Repo upsert/delete and confirms that
162162+// optional fields (spindle, repo_did) round-trip as the empty string
163163+// rather than NULL — the store schema treats them uniformly.
164164+func TestRepoLifecycle(t *testing.T) {
165165+ s := newTestStore(t)
166166+ ctx := context.Background()
167167+168168+ const did, rkey = "did:plc:owner", "repo1"
169169+170170+ if err := s.UpsertRepo(ctx, did, rkey, "knot.example", "myrepo", "https://spindle.example", "did:plc:repo", "2026-01-01T00:00:00Z"); err != nil {
171171+ t.Fatalf("insert: %v", err)
172172+ }
173173+174174+ var knot, name, spindle, repoDid string
175175+ err := s.db.QueryRowContext(ctx,
176176+ `SELECT knot, name, spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
177177+ did, rkey,
178178+ ).Scan(&knot, &name, &spindle, &repoDid)
179179+ if err != nil {
180180+ t.Fatalf("query after insert: %v", err)
181181+ }
182182+ if knot != "knot.example" || name != "myrepo" || spindle != "https://spindle.example" || repoDid != "did:plc:repo" {
183183+ t.Fatalf("after insert got (%q,%q,%q,%q)", knot, name, spindle, repoDid)
184184+ }
185185+186186+ // Upsert with cleared optional fields — verifies UPDATE actually
187187+ // overwrites them rather than preserving the old non-empty values.
188188+ if err := s.UpsertRepo(ctx, did, rkey, "knot.example", "myrepo", "", "", "2026-01-01T00:00:00Z"); err != nil {
189189+ t.Fatalf("update: %v", err)
190190+ }
191191+ err = s.db.QueryRowContext(ctx,
192192+ `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
193193+ did, rkey,
194194+ ).Scan(&spindle, &repoDid)
195195+ if err != nil {
196196+ t.Fatalf("query after update: %v", err)
197197+ }
198198+ if spindle != "" || repoDid != "" {
199199+ t.Fatalf("after update got (%q,%q), want both empty", spindle, repoDid)
200200+ }
201201+202202+ if err := s.DeleteRepo(ctx, did, rkey); err != nil {
203203+ t.Fatalf("delete: %v", err)
204204+ }
205205+ if n := countRows(t, s, "repos"); n != 0 {
206206+ t.Fatalf("after delete, repos has %d rows, want 0", n)
207207+ }
208208+}
209209+210210+// TestRepoCollaboratorLifecycle mirrors the repo and member tests for
211211+// the collaborator table.
212212+func TestRepoCollaboratorLifecycle(t *testing.T) {
213213+ s := newTestStore(t)
214214+ ctx := context.Background()
215215+216216+ const did, rkey = "did:plc:owner", "collab1"
217217+218218+ if err := s.UpsertRepoCollaborator(ctx, did, rkey, "myrepo", "did:plc:repo", "did:plc:carol", "2026-01-01T00:00:00Z"); err != nil {
219219+ t.Fatalf("insert: %v", err)
220220+ }
221221+222222+ var repo, repoDid, subject string
223223+ err := s.db.QueryRowContext(ctx,
224224+ `SELECT repo, repo_did, subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
225225+ did, rkey,
226226+ ).Scan(&repo, &repoDid, &subject)
227227+ if err != nil {
228228+ t.Fatalf("query: %v", err)
229229+ }
230230+ if repo != "myrepo" || repoDid != "did:plc:repo" || subject != "did:plc:carol" {
231231+ t.Fatalf("got (%q,%q,%q)", repo, repoDid, subject)
232232+ }
233233+234234+ if err := s.DeleteRepoCollaborator(ctx, did, rkey); err != nil {
235235+ t.Fatalf("delete: %v", err)
236236+ }
237237+ if n := countRows(t, s, "repo_collaborators"); n != 0 {
238238+ t.Fatalf("after delete, repo_collaborators has %d rows, want 0", n)
239239+ }
240240+}
241241+242242+// countRows is a small SELECT COUNT(*) helper used by lifecycle tests
243243+// to verify deletes actually removed the row. Table name is interpolated
244244+// directly because callers pass a constant from the schema, not user
245245+// input — SQLite doesn't allow parameterized table names anyway.
246246+func countRows(t *testing.T, s *store, table string) int {
247247+ t.Helper()
248248+ var n int
249249+ if err := s.db.QueryRow(`SELECT COUNT(*) FROM ` + table).Scan(&n); err != nil {
250250+ t.Fatalf("count %s: %v", table, err)
251251+ }
252252+ return n
253253+}