···11+package main
22+33+// In-process event broker for the /events websocket fan-out.
44+//
55+// Lifecycle of an outbound event:
66+//
77+// publisher (e.g. Buildkite webhook handler)
88+// │
99+// ▼
1010+// broker.Publish ─── store.InsertEvent ──▶ events table (rowid = cursor)
1111+// │
1212+// └── notify() ──▶ each subscriber's signal channel
1313+// │
1414+// ▼
1515+// /events handler wakes,
1616+// calls store.EventsAfter(cursor),
1717+// writes envelope JSON to its websocket.
1818+//
1919+// Subscribers don't receive events through the channel directly — only
2020+// a "wake up" signal. They re-read from the store using the cursor they
2121+// last delivered. This means:
2222+//
2323+// - slow clients can't make us drop events (they just lag behind in
2424+// rowid space and catch up on their own pace),
2525+// - reconnecting clients can resume by passing ?cursor=N, hitting the
2626+// same EventsAfter path used for live deliveries,
2727+// - we never have to bound a per-subscriber buffer.
2828+//
2929+// This mirrors the upstream Tangled spindle's notifier+stream design
3030+// (see tangled.org/core/spindle/stream.go), which is the source of
3131+// truth for the wire format on /events.
3232+3333+import (
3434+ "context"
3535+ "encoding/json"
3636+ "sync"
3737+)
// eventsEnvelope is the wire shape we emit on /events frames. It must
// match the upstream Tangled spindle byte-for-byte so the appview's
// eventconsumer treats us as a drop-in source.
//
// Upstream defines this shape in two places that don't quite agree:
//
//   - The producer (tangled.org/core/spindle/stream.go, streamPipelines)
//     marshals an inline map[string]any with lowercase keys.
//   - The consumer (tangled.org/core/eventconsumer.Message) is exported
//     but its Rkey/Nsid fields are missing JSON tags, so reusing it for
//     marshalling here would emit "Rkey"/"Nsid" — accepted on read only
//     because Go's json package matches field names case-insensitively.
//
// Defining our own struct keeps the wire output identical to the
// upstream producer and lets every site that emits an event (the
// /events handler today, future re-publishers tomorrow) share one
// canonical type.
//
// Event is held as RawMessage so callers can splice a stored record
// body straight in without an unmarshal/remarshal round-trip.
type eventsEnvelope struct {
	Rkey    string          `json:"rkey"`    // atproto record key
	Nsid    string          `json:"nsid"`    // lexicon collection NSID
	Event   json.RawMessage `json:"event"`   // record body, spliced verbatim
	Created int64           `json:"created"` // rowid cursor assigned at insert
}
// broker fans out event-table writes to connected /events subscribers.
// Construct with newBroker; safe for concurrent use.
type broker struct {
	// st is the durable event log: Publish writes through it, and the
	// /events handler reads back from it by cursor.
	st *store

	// mu guards subs. Subscriber channels are the map keys so that
	// Unsubscribe can delete by channel identity.
	mu   sync.Mutex
	subs map[chan struct{}]struct{}
}
7474+7575+// newBroker returns a broker bound to st. The store is used both for
7676+// durable writes in Publish and for cursor-based reads in EventsAfter
7777+// from the /events handler.
7878+func newBroker(st *store) *broker {
7979+ return &broker{
8080+ st: st,
8181+ subs: make(map[chan struct{}]struct{}),
8282+ }
8383+}
8484+8585+// Subscribe registers a new subscriber and returns its signal channel.
8686+// The channel is buffered with a capacity of 1: notify() does a
8787+// non-blocking send, so a pending notification simply coalesces with
8888+// the next one rather than blocking the publisher. Subscribers must
8989+// call Unsubscribe when done to free the slot.
9090+func (b *broker) Subscribe() chan struct{} {
9191+ ch := make(chan struct{}, 1)
9292+ b.mu.Lock()
9393+ b.subs[ch] = struct{}{}
9494+ b.mu.Unlock()
9595+ return ch
9696+}
9797+9898+// Unsubscribe removes ch from the broker. Safe to call with a channel
9999+// that was never subscribed (no-op) or to call more than once.
100100+func (b *broker) Unsubscribe(ch chan struct{}) {
101101+ b.mu.Lock()
102102+ delete(b.subs, ch)
103103+ b.mu.Unlock()
104104+}
105105+106106+// Publish persists an event and wakes every subscriber. The returned
107107+// int64 is the assigned cursor (rowid) for the new row, useful in tests
108108+// and for any caller that wants to log "published as cursor=N".
109109+//
110110+// eventJSON is the record body — typically a marshalled
111111+// tangled.PipelineStatus. The caller is responsible for choosing rkey
112112+// (atproto record key) and nsid (collection NSID).
113113+func (b *broker) Publish(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
114114+ created, err := b.st.InsertEvent(ctx, rkey, nsid, eventJSON)
115115+ if err != nil {
116116+ return 0, err
117117+ }
118118+ b.notify()
119119+ return created, nil
120120+}
121121+122122+// notify sends a non-blocking signal to every subscriber. Held lock
123123+// covers iteration only — the send itself is O(1) and never blocks
124124+// because of the buffered channel + default case.
125125+func (b *broker) notify() {
126126+ b.mu.Lock()
127127+ defer b.mu.Unlock()
128128+ for ch := range b.subs {
129129+ select {
130130+ case ch <- struct{}{}:
131131+ default:
132132+ // A previous signal hasn't been drained yet; coalesce —
133133+ // the subscriber will catch up on the next read since it
134134+ // queries by cursor, not by message count.
135135+ }
136136+ }
137137+}
+335
events_test.go
···11+package main
22+33+// Tests for the /events fan-out: the store's event log methods, the
44+// in-process broker, and the eventsHandler's wire output. The handler
55+// test boots a real httptest server and a real gorilla websocket client
66+// so we exercise the upgrade + envelope codec end to end.
77+88+import (
99+ "context"
1010+ "encoding/json"
1111+ "io"
1212+ "log/slog"
1313+ "net/http"
1414+ "net/http/httptest"
1515+ "net/url"
1616+ "strconv"
1717+ "strings"
1818+ "testing"
1919+ "time"
2020+2121+ "github.com/gorilla/websocket"
2222+)
// TestEventsLogRoundtrip covers InsertEvent / EventsAfter together
// because they're a tightly-coupled pair: the cursor returned by Insert
// is the same value EventsAfter must accept to skip past that row.
func TestEventsLogRoundtrip(t *testing.T) {
	s := newTestStore(t)
	ctx := context.Background()

	// Empty log → empty slice (not nil), so callers can range freely.
	got, err := s.EventsAfter(ctx, 0)
	if err != nil {
		t.Fatalf("EventsAfter empty: %v", err)
	}
	if got == nil || len(got) != 0 {
		t.Fatalf("empty log: got %v, want empty non-nil slice", got)
	}

	c1, err := s.InsertEvent(ctx, "rk1", "sh.tangled.pipeline.status", []byte(`{"a":1}`))
	if err != nil {
		t.Fatalf("insert 1: %v", err)
	}
	c2, err := s.InsertEvent(ctx, "rk2", "sh.tangled.pipeline.status", []byte(`{"a":2}`))
	if err != nil {
		t.Fatalf("insert 2: %v", err)
	}
	// Monotonic cursors are what make resume-by-cursor sound.
	if c2 <= c1 {
		t.Fatalf("cursors must be monotonically increasing: c1=%d c2=%d", c1, c2)
	}

	// cursor=0 returns everything; cursor=c1 skips the first row.
	got, err = s.EventsAfter(ctx, 0)
	if err != nil {
		t.Fatalf("EventsAfter 0: %v", err)
	}
	if len(got) != 2 || got[0].Created != c1 || got[1].Created != c2 {
		t.Fatalf("EventsAfter(0) = %+v, want both rows in order", got)
	}
	if got[0].Rkey != "rk1" || got[1].Rkey != "rk2" {
		t.Fatalf("rkey order wrong: %q %q", got[0].Rkey, got[1].Rkey)
	}
	// json.RawMessage round-trip — matters because the /events handler
	// splices these straight into the envelope.
	if string(got[0].EventJSON) != `{"a":1}` {
		t.Fatalf("event_json round-trip = %q", got[0].EventJSON)
	}

	got, err = s.EventsAfter(ctx, c1)
	if err != nil {
		t.Fatalf("EventsAfter c1: %v", err)
	}
	if len(got) != 1 || got[0].Created != c2 {
		t.Fatalf("EventsAfter(c1) = %+v, want only row c2", got)
	}
}
7777+7878+// TestBrokerPublishWakesSubscribers asserts the core invariant: a
7979+// Publish causes Subscribe()'d channels to fire (at least once) and the
8080+// row is durably visible via EventsAfter so the subscriber can drain
8181+// it. Two subscribers cover the multi-fanout case.
8282+func TestBrokerPublishWakesSubscribers(t *testing.T) {
8383+ s := newTestStore(t)
8484+ ctx := context.Background()
8585+ br := newBroker(s)
8686+8787+ a := br.Subscribe()
8888+ b := br.Subscribe()
8989+ defer br.Unsubscribe(a)
9090+ defer br.Unsubscribe(b)
9191+9292+ cursor, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`))
9393+ if err != nil {
9494+ t.Fatalf("publish: %v", err)
9595+ }
9696+ if cursor <= 0 {
9797+ t.Fatalf("publish cursor = %d, want > 0", cursor)
9898+ }
9999+100100+ // Both subscribers must receive the wake-up promptly. Use a
101101+ // generous timeout so flaky CI doesn't false-alarm.
102102+ for name, ch := range map[string]chan struct{}{"a": a, "b": b} {
103103+ select {
104104+ case <-ch:
105105+ case <-time.After(time.Second):
106106+ t.Fatalf("subscriber %s did not receive signal", name)
107107+ }
108108+ }
109109+110110+ rows, err := s.EventsAfter(ctx, 0)
111111+ if err != nil {
112112+ t.Fatalf("EventsAfter: %v", err)
113113+ }
114114+ if len(rows) != 1 || rows[0].Created != cursor {
115115+ t.Fatalf("after publish, EventsAfter(0) = %+v, want one row with cursor=%d", rows, cursor)
116116+ }
117117+}
// TestBrokerCoalescesPendingSignal ensures Publish never blocks on a
// subscriber that hasn't drained its channel: a second Publish while
// the first signal is still pending must coalesce, not deadlock. This
// is the property that lets slow clients lag without backpressuring
// the rest of the system.
func TestBrokerCoalescesPendingSignal(t *testing.T) {
	s := newTestStore(t)
	ctx := context.Background()
	br := newBroker(s)

	ch := br.Subscribe()
	defer br.Unsubscribe(ch)

	// First publish lands a pending signal in ch (cap=1).
	if _, err := br.Publish(ctx, "rk1", "n", []byte(`{}`)); err != nil {
		t.Fatalf("publish 1: %v", err)
	}
	// Second publish must succeed immediately — without a default
	// branch in notify(), this would block forever waiting on the
	// unread ch. Run it in a goroutine so a regression shows up as a
	// timeout rather than hanging the whole test binary.
	done := make(chan error, 1)
	go func() {
		_, err := br.Publish(ctx, "rk2", "n", []byte(`{}`))
		done <- err
	}()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("publish 2: %v", err)
		}
	case <-time.After(time.Second):
		t.Fatal("Publish blocked on un-drained subscriber")
	}

	// One drain is enough — cursor-based catch-up will pick up *both*
	// rows in a single EventsAfter call.
	<-ch
	rows, err := s.EventsAfter(ctx, 0)
	if err != nil {
		t.Fatalf("EventsAfter: %v", err)
	}
	if len(rows) != 2 {
		t.Fatalf("expected 2 rows after 2 publishes, got %d", len(rows))
	}
}
164164+165165+// TestBrokerUnsubscribeStopsDelivery confirms an unsubscribed channel
166166+// no longer receives wake-ups. Without this we'd leak signals to dead
167167+// websockets and (worse) hold their channels in the broker map.
168168+func TestBrokerUnsubscribeStopsDelivery(t *testing.T) {
169169+ s := newTestStore(t)
170170+ ctx := context.Background()
171171+ br := newBroker(s)
172172+173173+ ch := br.Subscribe()
174174+ br.Unsubscribe(ch)
175175+176176+ if _, err := br.Publish(ctx, "rk", "n", []byte(`{}`)); err != nil {
177177+ t.Fatalf("publish: %v", err)
178178+ }
179179+ select {
180180+ case <-ch:
181181+ t.Fatal("unsubscribed channel still received signal")
182182+ case <-time.After(50 * time.Millisecond):
183183+ }
184184+}
// TestEventsHandlerStreamsLiveAndBackfill exercises the full HTTP
// surface: open a websocket, observe a backfill of pre-existing rows,
// publish a new row, observe it arrive live, then reconnect with a
// cursor and observe only events strictly after that cursor.
func TestEventsHandlerStreamsLiveAndBackfill(t *testing.T) {
	s := newTestStore(t)
	br := newBroker(s)
	ctx := context.Background()

	// Seed two rows so the first connection has something to backfill.
	pre1, err := br.Publish(ctx, "rk-pre1", "sh.tangled.pipeline.status", []byte(`{"i":1}`))
	if err != nil {
		t.Fatalf("seed 1: %v", err)
	}
	pre2, err := br.Publish(ctx, "rk-pre2", "sh.tangled.pipeline.status", []byte(`{"i":2}`))
	if err != nil {
		t.Fatalf("seed 2: %v", err)
	}

	// Boot the handler behind an httptest server. Using a discarding
	// logger keeps test output quiet.
	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
	srv := httptest.NewServer(eventsHandler(logger, br))
	t.Cleanup(srv.Close)

	// First connection: no cursor, expect both seeded rows plus a
	// freshly published live row.
	c1 := dialEvents(t, srv.URL, 0)
	defer c1.Close()

	got1 := readEnvelope(t, c1)
	if got1.Created != pre1 || got1.Rkey != "rk-pre1" {
		t.Fatalf("first frame = %+v, want pre1 (cursor=%d)", got1, pre1)
	}
	got2 := readEnvelope(t, c1)
	if got2.Created != pre2 || got2.Rkey != "rk-pre2" {
		t.Fatalf("second frame = %+v, want pre2 (cursor=%d)", got2, pre2)
	}
	// Verify the wire envelope's `event` field round-trips as the raw
	// record body, not a re-encoded blob.
	if strings.TrimSpace(string(got2.Event)) != `{"i":2}` {
		t.Fatalf("event body = %q, want %q", got2.Event, `{"i":2}`)
	}

	// Live publish — handler should wake on broker signal and emit it.
	live, err := br.Publish(ctx, "rk-live", "sh.tangled.pipeline.status", []byte(`{"i":3}`))
	if err != nil {
		t.Fatalf("live publish: %v", err)
	}
	got3 := readEnvelope(t, c1)
	if got3.Created != live || got3.Rkey != "rk-live" {
		t.Fatalf("live frame = %+v, want rk-live (cursor=%d)", got3, live)
	}

	// Second connection with cursor=pre2: must skip pre1 and pre2,
	// receive only the live row. No timeout is set; if the handler
	// over-delivers we'll fail in readEnvelope's assert below.
	c2 := dialEvents(t, srv.URL, pre2)
	defer c2.Close()
	got := readEnvelope(t, c2)
	if got.Created != live || got.Rkey != "rk-live" {
		t.Fatalf("cursor-resume frame = %+v, want rk-live (cursor=%d)", got, live)
	}
}
250250+251251+// TestEventsHandlerBadCursorStartsFromZero confirms a malformed cursor
252252+// query parameter doesn't 4xx the upgrade — the handler logs and falls
253253+// back to the full backfill, matching the upstream spindle's behaviour.
254254+func TestEventsHandlerBadCursorStartsFromZero(t *testing.T) {
255255+ s := newTestStore(t)
256256+ br := newBroker(s)
257257+ ctx := context.Background()
258258+259259+ if _, err := br.Publish(ctx, "rk", "sh.tangled.pipeline.status", []byte(`{}`)); err != nil {
260260+ t.Fatalf("publish: %v", err)
261261+ }
262262+263263+ logger := slog.New(slog.NewTextHandler(io.Discard, nil))
264264+ srv := httptest.NewServer(eventsHandler(logger, br))
265265+ t.Cleanup(srv.Close)
266266+267267+ // Build the URL by hand so we can inject a non-numeric cursor.
268268+ u, _ := url.Parse(srv.URL)
269269+ u.Scheme = "ws"
270270+ q := u.Query()
271271+ q.Set("cursor", "not-a-number")
272272+ u.RawQuery = q.Encode()
273273+274274+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
275275+ if err != nil {
276276+ t.Fatalf("dial: %v", err)
277277+ }
278278+ defer conn.Close()
279279+280280+ conn.SetReadDeadline(time.Now().Add(2 * time.Second))
281281+ _, msg, err := conn.ReadMessage()
282282+ if err != nil {
283283+ t.Fatalf("read: %v", err)
284284+ }
285285+ var env eventsEnvelope
286286+ if err := json.Unmarshal(msg, &env); err != nil {
287287+ t.Fatalf("unmarshal: %v", err)
288288+ }
289289+ if env.Rkey != "rk" {
290290+ t.Fatalf("expected backfill of seeded row, got envelope %+v", env)
291291+ }
292292+}
293293+294294+// dialEvents opens a websocket against an httptest server (which
295295+// returns http://) by rewriting the scheme to ws://. cursor=0 omits the
296296+// query parameter entirely so we exercise the "no cursor" code path.
297297+func dialEvents(t *testing.T, base string, cursor int64) *websocket.Conn {
298298+ t.Helper()
299299+ u, err := url.Parse(base)
300300+ if err != nil {
301301+ t.Fatalf("parse url: %v", err)
302302+ }
303303+ u.Scheme = "ws"
304304+ if cursor != 0 {
305305+ q := u.Query()
306306+ q.Set("cursor", strconv.FormatInt(cursor, 10))
307307+ u.RawQuery = q.Encode()
308308+ }
309309+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{})
310310+ if err != nil {
311311+ t.Fatalf("dial %s: %v", u, err)
312312+ }
313313+ return conn
314314+}
315315+316316+// readEnvelope reads one TextMessage frame, decodes it as the spindle
317317+// wire envelope, and returns it. It enforces a read deadline so a
318318+// handler bug that fails to flush events doesn't hang the test forever.
319319+func readEnvelope(t *testing.T, conn *websocket.Conn) eventsEnvelope {
320320+ t.Helper()
321321+ conn.SetReadDeadline(time.Now().Add(2 * time.Second))
322322+ mt, msg, err := conn.ReadMessage()
323323+ if err != nil {
324324+ t.Fatalf("read: %v", err)
325325+ }
326326+ if mt != websocket.TextMessage {
327327+ t.Fatalf("frame type = %d, want TextMessage", mt)
328328+ }
329329+ var env eventsEnvelope
330330+ if err := json.Unmarshal(msg, &env); err != nil {
331331+ t.Fatalf("decode envelope: %v (raw: %s)", err, msg)
332332+ }
333333+ return env
334334+}
335335+
+101-16
http.go
···2121 "fmt"
2222 "log/slog"
2323 "net/http"
2424+ "strconv"
2425 "time"
25262627 "github.com/gorilla/websocket"
···3132// cancelled or the listener returns a fatal error. On ctx cancellation it
3233// performs a graceful shutdown with a bounded timeout.
3334//
3434-// The logger is read from ctx via loggerFrom.
3535-func runHTTP(ctx context.Context, cfg config) error {
3535+// The logger is read from ctx via loggerFrom. The broker is the
3636+// in-process pub/sub used by /events to fan published records out to
3737+// connected websocket subscribers.
3838+func runHTTP(ctx context.Context, cfg config, br *broker) error {
3639 logger := loggerFrom(ctx)
37403841 mux := http.NewServeMux()
3942 mux.HandleFunc("GET /", rootHandler())
4040- mux.HandleFunc("GET /events", eventsHandler(logger))
4343+ mux.HandleFunc("GET /events", eventsHandler(logger, br))
4144 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
4245 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler())
4346···97100 }
98101}
99102100100-// eventsHandler upgrades to a WebSocket and emits no events yet. It exists
101101-// so the Tangled appview can connect; once we wire up Buildkite webhooks
102102-// this is where sh.tangled.pipeline.status frames will be sent.
103103+// eventsHandler upgrades to a WebSocket and streams persisted records
104104+// to the connected client. The wire protocol mirrors the upstream
105105+// Tangled spindle so the appview's eventconsumer treats us as a
106106+// drop-in source:
107107+//
108108+// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
109109+// means "from the beginning of our retained log".
110110+// - We do a backfill pass first (everything with created > cursor),
111111+// then loop: on each broker signal, drain new rows; on a 30s
112112+// timer, write a websocket ping so intermediaries don't idle the
113113+// connection out.
103114//
104104-// We send a periodic ping to keep intermediaries (load balancers, tunnels)
105105-// from idling the connection, and watch for client reads to detect a
106106-// disconnect.
107107-func eventsHandler(logger *slog.Logger) http.HandlerFunc {
115115+// We subscribe to the broker *before* the backfill so any Publish that
116116+// races between the cursor read and the loop entry is captured by the
117117+// pending channel signal — the loop will see it on its first iteration
118118+// and call streamEvents again, which is idempotent on the cursor.
119119+func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
108120 upgrader := websocket.Upgrader{
109121 ReadBufferSize: 1024,
110122 WriteBufferSize: 1024,
···116128 return
117129 }
118130 defer conn.Close()
119119- logger.Debug("events client connected", "remote", r.RemoteAddr)
131131+132132+ // Parse the resume cursor up front. An unparseable cursor is a
133133+ // client bug, but rather than 4xx the upgraded connection we
134134+ // log it and start from zero — same behaviour as the upstream
135135+ // spindle.
136136+ var cursor int64
137137+ if raw := r.URL.Query().Get("cursor"); raw != "" {
138138+ parsed, err := strconv.ParseInt(raw, 10, 64)
139139+ if err != nil {
140140+ logger.Warn("events: bad cursor, starting from 0",
141141+ "cursor", raw, "err", err,
142142+ )
143143+ } else {
144144+ cursor = parsed
145145+ }
146146+ }
147147+ logger.Debug("events client connected",
148148+ "remote", r.RemoteAddr, "cursor", cursor,
149149+ )
150150+151151+ // Subscribe before the backfill so a Publish that races between
152152+ // the EventsAfter read and our select loop is captured by the
153153+ // pending channel signal — we'll re-drain on the first wake-up.
154154+ sig := br.Subscribe()
155155+ defer br.Unsubscribe(sig)
120156121157 ctx, cancel := context.WithCancel(r.Context())
122158 defer cancel()
123159124124- // Detect client disconnect by trying to read; we don't expect any
125125- // payloads from the client, so any read result (including EOF)
126126- // signals the connection has gone away.
160160+ // Detect client disconnect by trying to read; we don't expect
161161+ // any payloads from the client, so any read outcome (including
162162+ // EOF) signals the connection has gone away.
127163 go func() {
128164 for {
129165 if _, _, err := conn.NextReader(); err != nil {
···133169 }
134170 }()
135171172172+ // Initial backfill. If this fails the connection is unusable
173173+ // (we can't promise ordering after a partial write) so just
174174+ // return and let the client reconnect with the same cursor.
175175+ if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
176176+ logger.Debug("events backfill ended", "err", err, "cursor", cursor)
177177+ return
178178+ }
179179+136180 ticker := time.NewTicker(30 * time.Second)
137181 defer ticker.Stop()
138182 for {
139183 select {
140184 case <-ctx.Done():
141141- logger.Debug("events client disconnected", "remote", r.RemoteAddr)
185185+ logger.Debug("events client disconnected",
186186+ "remote", r.RemoteAddr, "cursor", cursor,
187187+ )
142188 return
189189+ case <-sig:
190190+ if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
191191+ logger.Debug("events stream ended", "err", err, "cursor", cursor)
192192+ return
193193+ }
143194 case <-ticker.C:
144144- if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second)); err != nil {
195195+ if err := conn.WriteControl(
196196+ websocket.PingMessage, nil,
197197+ time.Now().Add(time.Second),
198198+ ); err != nil {
145199 logger.Debug("events ping failed", "err", err)
146200 return
147201 }
···149203 }
150204 }
151205}
206206+207207+// streamEvents drains every event row with `created > *cursor`, writes
208208+// each as a wire envelope frame, and advances *cursor in lockstep. The
209209+// cursor is updated *after* the write succeeds so a half-flushed batch
210210+// (interrupted by a websocket error) replays cleanly on the next
211211+// connection.
212212+//
213213+// It is safe to call repeatedly: when there are no new rows the query
214214+// returns an empty slice and we noop.
215215+func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
216216+ rows, err := st.EventsAfter(ctx, *cursor)
217217+ if err != nil {
218218+ return fmt.Errorf("read events: %w", err)
219219+ }
220220+ for _, row := range rows {
221221+ frame, err := json.Marshal(eventsEnvelope{
222222+ Rkey: row.Rkey,
223223+ Nsid: row.Nsid,
224224+ Event: row.EventJSON,
225225+ Created: row.Created,
226226+ })
227227+ if err != nil {
228228+ return fmt.Errorf("marshal envelope: %w", err)
229229+ }
230230+ if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
231231+ return fmt.Errorf("write frame: %w", err)
232232+ }
233233+ *cursor = row.Created
234234+ }
235235+ return nil
236236+}
+7-1
main.go
···108108 }()
109109 logger.Info("store open", "path", cfg.DBPath)
110110111111+ // In-process broker for the /events fan-out. Wraps the store so
112112+ // publishes are durable and reconnecting subscribers can resume by
113113+ // cursor. Constructed before the consumers in case we ever want
114114+ // them to publish synthetic status events at startup.
115115+ br := newBroker(st)
116116+111117 // Start the knot event-stream consumer first so the jetstream
112118 // loop has somewhere to register newly-observed knots into.
113119 knots, err := startKnotConsumer(ctx, cfg, st)
···127133128134 // Run the HTTP server. This blocks until ctx is cancelled or the
129135 // listener errors.
130130- if err := runHTTP(ctx, cfg); err != nil {
136136+ if err := runHTTP(ctx, cfg, br); err != nil {
131137 logger.Error("http server error", "err", err)
132138 os.Exit(1)
133139 }
+84
store.go
···2121import (
2222 "context"
2323 "database/sql"
2424+ "encoding/json"
2425 "errors"
2526 "fmt"
2627 "strconv"
2828+ "time"
27292830 _ "github.com/mattn/go-sqlite3"
2931)
···275277 }
276278 return nil
277279}
// EventRow is one row of the events table. It represents an outbound
// record we want to deliver to /events websocket subscribers, in the
// shape callers actually need (raw record JSON, not stringly-typed).
type EventRow struct {
	// Created is the assigned monotonic rowid; doubles as the cursor
	// value subscribers use to resume.
	Created int64
	// Rkey is the ATProto record key. For sh.tangled.pipeline.status
	// records this is the rkey we mint when publishing.
	Rkey string
	// Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status).
	Nsid string
	// EventJSON is the record body verbatim — held as RawMessage so
	// the /events handler can splice it into the wire envelope without
	// an unmarshal/remarshal round-trip.
	EventJSON json.RawMessage
}
298298+299299+// InsertEvent appends an event row and returns its assigned `created`
300300+// (rowid) cursor. Storage is the source of truth for fan-out, so we
301301+// write here even if zero subscribers are connected — a subscriber that
302302+// connects later (with an old cursor) will pick the row up via
303303+// EventsAfter.
304304+//
305305+// eventJSON must be a valid JSON object; we store it verbatim. Length
306306+// validation is intentionally absent — the schema accepts arbitrary
307307+// TEXT and SQLite handles huge blobs fine for our scale.
308308+func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
309309+ res, err := s.db.ExecContext(ctx,
310310+ `INSERT INTO events (rkey, nsid, event_json, inserted_at)
311311+ VALUES (?, ?, ?, ?)`,
312312+ rkey, nsid, string(eventJSON),
313313+ time.Now().UTC().Format(time.RFC3339Nano),
314314+ )
315315+ if err != nil {
316316+ return 0, fmt.Errorf("insert event: %w", err)
317317+ }
318318+ id, err := res.LastInsertId()
319319+ if err != nil {
320320+ return 0, fmt.Errorf("event last insert id: %w", err)
321321+ }
322322+ return id, nil
323323+}
324324+325325+// EventsAfter returns every event row with `created` strictly greater
326326+// than cursor, in cursor order. Used by /events to backfill a
327327+// reconnecting subscriber and to drain newly-published rows on each
328328+// broker notification.
329329+//
330330+// Pass cursor=0 to get the full log from the beginning, which is what
331331+// happens when a subscriber connects without a ?cursor= query param.
332332+func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) {
333333+ rows, err := s.db.QueryContext(ctx,
334334+ `SELECT created, rkey, nsid, event_json
335335+ FROM events
336336+ WHERE created > ?
337337+ ORDER BY created ASC`,
338338+ cursor,
339339+ )
340340+ if err != nil {
341341+ return nil, fmt.Errorf("query events: %w", err)
342342+ }
343343+ defer rows.Close()
344344+345345+ out := []EventRow{}
346346+ for rows.Next() {
347347+ var (
348348+ ev EventRow
349349+ raw string
350350+ )
351351+ if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil {
352352+ return nil, fmt.Errorf("scan event: %w", err)
353353+ }
354354+ ev.EventJSON = json.RawMessage(raw)
355355+ out = append(out, ev)
356356+ }
357357+ if err := rows.Err(); err != nil {
358358+ return nil, fmt.Errorf("iterate events: %w", err)
359359+ }
360360+ return out, nil
361361+}
+21
store_migrate.go
···5757 created_at TEXT NOT NULL,
5858 PRIMARY KEY (did, rkey)
5959);
6060+6161+-- Outbound event log. Each row is one record we want to fan out to
6262+-- connected /events websocket subscribers (typically the Tangled
6363+-- appview) — today only sh.tangled.pipeline.status.
6464+--
6565+-- We persist instead of pushing through an in-memory channel so that
6666+-- (a) a reconnecting subscriber can resume from a cursor without
6767+-- missing events that happened during the gap, and
6868+-- (b) slow subscribers can't make us drop events for fast ones — they
6969+-- simply lag behind in the rowid space.
7070+--
7171+-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
7272+-- strictly increase and never get reused if a row is ever deleted, so
7373+-- treating the created column as a monotonic cursor is safe forever.
7474+CREATE TABLE IF NOT EXISTS events (
7575+ created INTEGER PRIMARY KEY AUTOINCREMENT,
7676+ rkey TEXT NOT NULL,
7777+ nsid TEXT NOT NULL,
7878+ event_json TEXT NOT NULL,
7979+ inserted_at TEXT NOT NULL
8080+);
6081`
61826283// migrate applies the schema. Safe to call repeatedly.