Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: extract reusable consumer into internal/jetstream

The consumer code in jetstream.go mixed Tack-specific concerns
(collections, applyCommit, store cursor) with generic firehose
mechanics: configuring the upstream client, looping reconnects,
rewinding time-based cursors, persisting cursor progress, and
distinguishing permanent bad-record failures from transient
handler failures. None of those are specific to Tack and they are
the bits most likely to be reused for future jetstream consumers.

Move the generic mechanics into a new internal/jetstream package.

authored by

Mitchell Hashimoto and committed by
Tangled
b6e55404 c731f142

+639 -201
+195
internal/jetstream/consumer.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log/slog" 8 + "time" 9 + 10 + "github.com/bluesky-social/jetstream/pkg/client" 11 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12 + ) 13 + 14 + const ( 15 + // DefaultRewind is the cursor safety buffer used on reconnect. Jetstream 16 + // cursors are time-based, so reconnecting a few seconds before the saved 17 + // cursor avoids exact-boundary gaps at the cost of harmless duplicate 18 + // deliveries for idempotent handlers. 19 + DefaultRewind = 5 * time.Second 20 + 21 + // DefaultReconnectDelay is the pause between failed websocket reads. 22 + DefaultReconnectDelay = 2 * time.Second 23 + ) 24 + 25 + // Config configures a reusable jetstream consumer. Use Processor directly when 26 + // you already have events from another source and do not need websocket setup. 27 + type Config struct { 28 + // WebsocketURL is the jetstream endpoint used by Consumer. 29 + WebsocketURL string 30 + 31 + // Collections is the set of record collection NSIDs to request from 32 + // jetstream and process locally. An empty slice means "all collections", 33 + // matching the upstream client behavior. 34 + Collections []string 35 + 36 + CursorStore CursorStore 37 + Handler Handler 38 + Logger *slog.Logger 39 + 40 + // SchedulerIdent names the upstream sequential scheduler. It defaults to 41 + // "jetstream". 42 + SchedulerIdent string 43 + 44 + // Rewind controls how far before a saved cursor the Consumer reconnects. 45 + // Zero uses DefaultRewind. 46 + Rewind time.Duration 47 + 48 + // ReconnectDelay controls how long the Consumer waits before reconnecting 49 + // after a failed read loop. Zero uses DefaultReconnectDelay. 50 + ReconnectDelay time.Duration 51 + } 52 + 53 + // Consumer owns the upstream jetstream client and reconnect loop. 54 + type Consumer struct { 55 + client *client.Client 56 + processor *Processor 57 + cursorStore CursorStore 58 + logger *slog.Logger 59 + rewind time.Duration 60 + reconnectDelay time.Duration 61 + } 62 + 63 + // NewConsumer builds a Consumer. Call Run to enter the blocking reconnect loop, 64 + // or use Start to run it in a background goroutine. 65 + func NewConsumer(cfg Config) (*Consumer, error) { 66 + if cfg.WebsocketURL == "" { 67 + return nil, errors.New("websocket URL is required") 68 + } 69 + 70 + processor := &Processor{ 71 + Collections: cfg.Collections, 72 + CursorStore: cfg.CursorStore, 73 + Handler: cfg.Handler, 74 + Logger: cfg.Logger, 75 + } 76 + if err := processor.validate(); err != nil { 77 + return nil, err 78 + } 79 + 80 + cfg = withDefaults(cfg) 81 + clientCfg := client.DefaultClientConfig() 82 + clientCfg.WebsocketURL = cfg.WebsocketURL 83 + clientCfg.WantedCollections = append([]string(nil), cfg.Collections...) 84 + 85 + c, err := client.NewClient( 86 + clientCfg, 87 + cfg.Logger, 88 + sequential.NewScheduler(cfg.SchedulerIdent, cfg.Logger, processor.HandleEvent), 89 + ) 90 + if err != nil { 91 + return nil, fmt.Errorf("new jetstream client: %w", err) 92 + } 93 + 94 + return &Consumer{ 95 + client: c, 96 + processor: processor, 97 + cursorStore: cfg.CursorStore, 98 + logger: cfg.Logger, 99 + rewind: cfg.Rewind, 100 + reconnectDelay: cfg.ReconnectDelay, 101 + }, nil 102 + } 103 + 104 + // Start creates a Consumer and runs it in a background goroutine for the 105 + // lifetime of ctx. 106 + func Start(ctx context.Context, cfg Config) (*Consumer, error) { 107 + c, err := NewConsumer(cfg) 108 + if err != nil { 109 + return nil, err 110 + } 111 + go c.Run(ctx) 112 + return c, nil 113 + } 114 + 115 + // Run consumes jetstream until ctx is cancelled. Connection failures are logged 116 + // and retried after ReconnectDelay because the websocket is expected to be 117 + // long-lived but not permanent. 118 + func (c *Consumer) Run(ctx context.Context) { 119 + for { 120 + if ctx.Err() != nil { 121 + return 122 + } 123 + 124 + cur, err := c.cursorStore.LoadCursor(ctx) 125 + if err != nil { 126 + c.logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 127 + cur = nil 128 + } 129 + 130 + cursorForLog := cur 131 + cursorForRead := c.rewindCursor(cur) 132 + if cursorForRead != nil { 133 + c.logger.Info("connecting to jetstream", 134 + "cursor_us", *cursorForLog, 135 + "rewound_us", *cursorForRead, 136 + "rewind", c.rewind, 137 + ) 138 + } else { 139 + c.logger.Info("connecting to jetstream from now (no cursor)") 140 + } 141 + 142 + if err := c.client.ConnectAndRead(ctx, cursorForRead); err != nil { 143 + if ctx.Err() != nil { 144 + return 145 + } 146 + c.logger.Error("jetstream read loop", "err", err) 147 + select { 148 + case <-ctx.Done(): 149 + return 150 + case <-time.After(c.reconnectDelay): 151 + } 152 + continue 153 + } 154 + if ctx.Err() != nil { 155 + return 156 + } 157 + } 158 + } 159 + 160 + // rewindCursor returns the cursor value to hand to jetstream on reconnect. 161 + // Jetstream cursors are microsecond timestamps and exact-boundary replay is not 162 + // guaranteed, so we intentionally resume slightly before the last saved cursor. 163 + // The duplicate window is expected to be safe for idempotent handlers; clamping 164 + // at zero keeps brand-new or very-early cursors valid. 165 + func (c *Consumer) rewindCursor(cur *int64) *int64 { 166 + if cur == nil { 167 + return nil 168 + } 169 + rewound := *cur - int64(c.rewind/time.Microsecond) 170 + if rewound < 0 { 171 + rewound = 0 172 + } 173 + return &rewound 174 + } 175 + 176 + func withDefaults(cfg Config) Config { 177 + cfg.Logger = loggerOrDefault(cfg.Logger) 178 + if cfg.SchedulerIdent == "" { 179 + cfg.SchedulerIdent = "jetstream" 180 + } 181 + if cfg.Rewind == 0 { 182 + cfg.Rewind = DefaultRewind 183 + } 184 + if cfg.ReconnectDelay == 0 { 185 + cfg.ReconnectDelay = DefaultReconnectDelay 186 + } 187 + return cfg 188 + } 189 + 190 + func loggerOrDefault(logger *slog.Logger) *slog.Logger { 191 + if logger != nil { 192 + return logger 193 + } 194 + return slog.Default() 195 + }
+22
internal/jetstream/consumer_test.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "testing" 5 + "time" 6 + ) 7 + 8 + func TestConsumerRewindCursor(t *testing.T) { 9 + c := &Consumer{rewind: 5 * time.Second} 10 + 11 + cursor := int64(10_000_000) 12 + rewound := c.rewindCursor(&cursor) 13 + if rewound == nil || *rewound != 5_000_000 { 14 + t.Fatalf("rewound = %v, want 5000000", rewound) 15 + } 16 + 17 + cursor = 2_000_000 18 + rewound = c.rewindCursor(&cursor) 19 + if rewound == nil || *rewound != 0 { 20 + t.Fatalf("rewound = %v, want 0", rewound) 21 + } 22 + }
+61
internal/jetstream/cursor.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + ) 7 + 8 + // CursorStore persists the jetstream cursor. Implementations commonly back this 9 + // with a database row so process restarts can resume from the last applied 10 + // event. 11 + type CursorStore interface { 12 + LoadCursor(context.Context) (*int64, error) 13 + SaveCursor(context.Context, int64) error 14 + } 15 + 16 + // MemoryCursorStore is an in-memory CursorStore implementation. It is useful 17 + // for tests and for consumers that intentionally do not need cursor persistence 18 + // across process restarts. 19 + // 20 + // The zero value is ready to use and starts with no cursor, which makes the 21 + // consumer start from "now" until SaveCursor is called. 22 + type MemoryCursorStore struct { 23 + mu sync.Mutex 24 + cursor *int64 25 + } 26 + 27 + var _ CursorStore = (*MemoryCursorStore)(nil) 28 + 29 + // NewMemoryCursorStore returns a MemoryCursorStore initialized with cursor. A 30 + // nil cursor means no cursor has been saved yet. 31 + func NewMemoryCursorStore(cursor *int64) *MemoryCursorStore { 32 + s := &MemoryCursorStore{} 33 + if cursor != nil { 34 + v := *cursor 35 + s.cursor = &v 36 + } 37 + return s 38 + } 39 + 40 + // LoadCursor returns the last cursor saved in memory, or nil if none has been 41 + // saved. The returned pointer is a copy so callers cannot mutate store state 42 + // without going through SaveCursor. 43 + func (s *MemoryCursorStore) LoadCursor(context.Context) (*int64, error) { 44 + s.mu.Lock() 45 + defer s.mu.Unlock() 46 + 47 + if s.cursor == nil { 48 + return nil, nil 49 + } 50 + cursor := *s.cursor 51 + return &cursor, nil 52 + } 53 + 54 + // SaveCursor stores cursor in memory. 55 + func (s *MemoryCursorStore) SaveCursor(_ context.Context, cursor int64) error { 56 + s.mu.Lock() 57 + defer s.mu.Unlock() 58 + 59 + s.cursor = &cursor 60 + return nil 61 + }
+16
internal/jetstream/doc.go
··· 1 + // Package jetstream provides a reusable Bluesky Jetstream consumer. 2 + // 3 + // The package owns the mechanics that are common to firehose consumers: 4 + // configuring the upstream websocket client, filtering by record collection, 5 + // reconnecting after dropped reads, rewinding time-based cursors on reconnect, 6 + // persisting cursor progress, and distinguishing permanent bad-record failures 7 + // from transient handler failures. 8 + // 9 + // Callers provide a CursorStore and Handler, either through Config for a full 10 + // Consumer or directly on Processor when they already have events from another 11 + // source. The handler receives commit events for the configured collections and 12 + // should apply domain-specific mutations. If a record is permanently unusable, 13 + // the handler should return BadRecord(err) so the processor can advance the 14 + // cursor and avoid replaying the same broken event forever. Any other error 15 + // leaves the cursor unchanged so a later delivery can retry the event. 16 + package jetstream
+146
internal/jetstream/processor.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log/slog" 8 + 9 + jsmodels "github.com/bluesky-social/jetstream/pkg/models" 10 + ) 11 + 12 + // Handler applies one commit event. Returning BadRecord(err) tells the 13 + // Processor that the event is permanently unusable and that the cursor should 14 + // still advance; any other error is treated as transient and leaves the cursor 15 + // unchanged for a later retry. 16 + type Handler interface { 17 + HandleJetstreamEvent(context.Context, *jsmodels.Event) error 18 + } 19 + 20 + // HandlerFunc adapts a function to Handler. 21 + type HandlerFunc func(context.Context, *jsmodels.Event) error 22 + 23 + // HandleJetstreamEvent calls f(ctx, event). 24 + func (f HandlerFunc) HandleJetstreamEvent(ctx context.Context, event *jsmodels.Event) error { 25 + return f(ctx, event) 26 + } 27 + 28 + // Processor handles per-event policy that is independent of websocket IO. 29 + type Processor struct { 30 + // Collections is the set of record collection NSIDs to process locally. An 31 + // empty slice means "all collections". 32 + Collections []string 33 + 34 + // CursorStore persists progress after events are handled. 35 + CursorStore CursorStore 36 + 37 + // Handler applies commit events that pass the collection filter. 38 + Handler Handler 39 + 40 + // Logger receives apply errors and ignored-collection diagnostics. A nil 41 + // Logger uses slog.Default(). 42 + Logger *slog.Logger 43 + } 44 + 45 + // HandleEvent applies a single jetstream event and advances the cursor when it 46 + // is safe to do so. 47 + func (p *Processor) HandleEvent(ctx context.Context, event *jsmodels.Event) error { 48 + if event == nil || event.Kind != jsmodels.EventKindCommit || event.Commit == nil { 49 + return nil 50 + } 51 + 52 + if err := p.validate(); err != nil { 53 + return err 54 + } 55 + 56 + logger := loggerOrDefault(p.Logger) 57 + wanted, err := p.wantsCollection(event.Commit.Collection) 58 + if err != nil { 59 + return err 60 + } 61 + if !wanted { 62 + logger.Debug("ignoring unexpected collection", 63 + "collection", event.Commit.Collection) 64 + return p.saveCursor(ctx, event.TimeUS) 65 + } 66 + 67 + applyErr := p.Handler.HandleJetstreamEvent(ctx, event) 68 + if applyErr != nil { 69 + logger.Error("apply commit", 70 + "err", applyErr, 71 + "did", event.Did, 72 + "collection", event.Commit.Collection, 73 + "op", event.Commit.Operation, 74 + "rkey", event.Commit.RKey, 75 + "transient", !IsBadRecord(applyErr), 76 + ) 77 + 78 + if !IsBadRecord(applyErr) { 79 + return applyErr 80 + } 81 + } 82 + 83 + return p.saveCursor(ctx, event.TimeUS) 84 + } 85 + 86 + func (p *Processor) saveCursor(ctx context.Context, cursor int64) error { 87 + if err := p.CursorStore.SaveCursor(ctx, cursor); err != nil { 88 + return fmt.Errorf("save cursor: %w", err) 89 + } 90 + return nil 91 + } 92 + 93 + func (p *Processor) validate() error { 94 + if p.CursorStore == nil { 95 + return errors.New("cursor store is required") 96 + } 97 + if p.Handler == nil { 98 + return errors.New("handler is required") 99 + } 100 + for _, collection := range p.Collections { 101 + if collection == "" { 102 + return errors.New("collection must not be empty") 103 + } 104 + } 105 + return nil 106 + } 107 + 108 + func (p *Processor) wantsCollection(collection string) (bool, error) { 109 + if len(p.Collections) == 0 { 110 + return true, nil 111 + } 112 + for _, wanted := range p.Collections { 113 + if wanted == "" { 114 + return false, errors.New("collection must not be empty") 115 + } 116 + if wanted == collection { 117 + return true, nil 118 + } 119 + } 120 + return false, nil 121 + } 122 + 123 + // badRecordError marks a handler failure as caused by the record itself being 124 + // permanently unusable, e.g. malformed JSON or an unrecoverable schema 125 + // violation. Processors advance the cursor past these errors so one bad event 126 + // cannot stall every later event on restart. 127 + type badRecordError struct{ err error } 128 + 129 + func (e *badRecordError) Error() string { return e.err.Error() } 130 + func (e *badRecordError) Unwrap() error { return e.err } 131 + 132 + // BadRecord wraps err so Processor recognizes it as a permanent, 133 + // cursor-advancing failure. Do not use this for storage, network, or other 134 + // transient infrastructure failures. 135 + func BadRecord(err error) error { 136 + if err == nil { 137 + return nil 138 + } 139 + return &badRecordError{err: err} 140 + } 141 + 142 + // IsBadRecord reports whether err, or anything it wraps, came from BadRecord. 143 + func IsBadRecord(err error) bool { 144 + var b *badRecordError 145 + return errors.As(err, &b) 146 + }
+149
internal/jetstream/processor_test.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "io" 7 + "log/slog" 8 + "testing" 9 + 10 + jsmodels "github.com/bluesky-social/jetstream/pkg/models" 11 + ) 12 + 13 + func testLogger() *slog.Logger { 14 + return slog.New(slog.NewTextHandler(io.Discard, nil)) 15 + } 16 + 17 + func requireMemoryCursor(t *testing.T, store CursorStore, want *int64) { 18 + t.Helper() 19 + 20 + got, err := store.LoadCursor(context.Background()) 21 + if err != nil { 22 + t.Fatalf("load cursor: %v", err) 23 + } 24 + if want == nil { 25 + if got != nil { 26 + t.Fatalf("cursor = %v, want nil", *got) 27 + } 28 + return 29 + } 30 + if got == nil || *got != *want { 31 + t.Fatalf("cursor = %v, want %d", got, *want) 32 + } 33 + } 34 + 35 + func testCommit(timeUS int64, collection string) *jsmodels.Event { 36 + return &jsmodels.Event{ 37 + Did: "did:plc:test", 38 + TimeUS: timeUS, 39 + Kind: jsmodels.EventKindCommit, 40 + Commit: &jsmodels.Commit{ 41 + Operation: "create", 42 + Collection: collection, 43 + RKey: "rk", 44 + }, 45 + } 46 + } 47 + 48 + func TestProcessorCommitAdvancesCursor(t *testing.T) { 49 + store := NewMemoryCursorStore(nil) 50 + called := false 51 + 52 + processor := &Processor{ 53 + Collections: []string{"example.collection"}, 54 + CursorStore: store, 55 + Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 56 + called = true 57 + return nil 58 + }), 59 + Logger: testLogger(), 60 + } 61 + 62 + if err := processor.HandleEvent(context.Background(), testCommit(123, "example.collection")); err != nil { 63 + t.Fatalf("handle: %v", err) 64 + } 65 + if !called { 66 + t.Fatalf("handler was not called") 67 + } 68 + want := int64(123) 69 + requireMemoryCursor(t, store, &want) 70 + } 71 + 72 + func TestProcessorIgnoresNonCommit(t *testing.T) { 73 + store := NewMemoryCursorStore(nil) 74 + processor := &Processor{ 75 + CursorStore: store, 76 + Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 77 + t.Fatalf("handler should not be called") 78 + return nil 79 + }), 80 + Logger: testLogger(), 81 + } 82 + 83 + event := &jsmodels.Event{ 84 + Did: "did:plc:test", 85 + TimeUS: 123, 86 + Kind: jsmodels.EventKindAccount, 87 + } 88 + if err := processor.HandleEvent(context.Background(), event); err != nil { 89 + t.Fatalf("handle: %v", err) 90 + } 91 + requireMemoryCursor(t, store, nil) 92 + } 93 + 94 + func TestProcessorUnexpectedCollectionAdvancesCursor(t *testing.T) { 95 + store := NewMemoryCursorStore(nil) 96 + processor := &Processor{ 97 + Collections: []string{"wanted.collection"}, 98 + CursorStore: store, 99 + Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 100 + t.Fatalf("handler should not be called") 101 + return nil 102 + }), 103 + Logger: testLogger(), 104 + } 105 + 106 + if err := processor.HandleEvent(context.Background(), testCommit(456, "other.collection")); err != nil { 107 + t.Fatalf("handle: %v", err) 108 + } 109 + want := int64(456) 110 + requireMemoryCursor(t, store, &want) 111 + } 112 + 113 + func TestProcessorBadRecordAdvancesCursor(t *testing.T) { 114 + store := NewMemoryCursorStore(nil) 115 + processor := &Processor{ 116 + Collections: []string{"example.collection"}, 117 + CursorStore: store, 118 + Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 119 + return BadRecord(errors.New("decode failed")) 120 + }), 121 + Logger: testLogger(), 122 + } 123 + 124 + if err := processor.HandleEvent(context.Background(), testCommit(789, "example.collection")); err != nil { 125 + t.Fatalf("handle: %v", err) 126 + } 127 + want := int64(789) 128 + requireMemoryCursor(t, store, &want) 129 + } 130 + 131 + func TestProcessorTransientErrorDoesNotAdvanceCursor(t *testing.T) { 132 + cursor := int64(100) 133 + store := NewMemoryCursorStore(&cursor) 134 + transientErr := errors.New("database busy") 135 + processor := &Processor{ 136 + Collections: []string{"example.collection"}, 137 + CursorStore: store, 138 + Handler: HandlerFunc(func(ctx context.Context, event *jsmodels.Event) error { 139 + return transientErr 140 + }), 141 + Logger: testLogger(), 142 + } 143 + 144 + err := processor.HandleEvent(context.Background(), testCommit(200, "example.collection")) 145 + if !errors.Is(err, transientErr) { 146 + t.Fatalf("handle error = %v, want %v", err, transientErr) 147 + } 148 + requireMemoryCursor(t, store, &cursor) 149 + }
+28 -201
jetstream.go
··· 20 20 import ( 21 21 "context" 22 22 "encoding/json" 23 - "errors" 24 23 "fmt" 25 - "time" 26 24 27 - "github.com/bluesky-social/jetstream/pkg/client" 28 - "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 29 25 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 26 + js "go.mitchellh.com/tack/internal/jetstream" 30 27 "tangled.org/core/api/tangled" 31 28 ) 32 29 33 30 // jetstream operation strings. The jetstream protocol publishes these as 34 31 // the Commit.Operation field; pulling them out as constants keeps the 35 - // switch in handleJetstreamEvent honest about typos. 32 + // switch in applyCommit honest about typos. 36 33 const ( 37 34 jsOpCreate = "create" 38 35 jsOpUpdate = "update" 39 36 jsOpDelete = "delete" 40 37 ) 41 38 42 - // jetstreamRewind is how far before the persisted cursor we resume on 43 - // reconnect. Jetstream's docs recommend rewinding a few seconds because 44 - // the cursor is a time-based filter and exact-boundary replay is not 45 - // guaranteed gapless across disconnects. The handler's mutations are 46 - // idempotent (UPSERTs and DELETEs keyed on (did, rkey)) so the small 47 - // amount of duplicate replay this introduces is harmless. 48 - const jetstreamRewind = 5 * time.Second 39 + var _ js.CursorStore = (*store)(nil) 40 + 41 + // jetstreamCollections is the server-side and local filter for the Tangled 42 + // records tack mirrors out of jetstream. 43 + var jetstreamCollections = []string{ 44 + tangled.SpindleMemberNSID, 45 + tangled.RepoNSID, 46 + tangled.RepoCollaboratorNSID, 47 + } 49 48 50 49 // startJetstream dials the configured jetstream endpoint and spawns a 51 50 // background goroutine that consumes events for the lifetime of ctx. It ··· 63 62 func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 64 63 logger := loggerFrom(ctx).With("component", "jetstream") 65 64 66 - // `wantedCollections` is a server-side filter: jetstream will only send 67 - // us commit events whose record collection (NSID) is in this list. The 68 - // NSIDs come from tangled-core's generated lexicon types so they stay 69 - // in sync with whatever the appview/knots are publishing. 70 - collections := []string{ 71 - tangled.SpindleMemberNSID, 72 - tangled.RepoNSID, 73 - tangled.RepoCollaboratorNSID, 74 - } 75 - 76 - // Configure our JetStream client. 77 - clientCfg := client.DefaultClientConfig() 78 - clientCfg.WebsocketURL = cfg.JetstreamURL 79 - clientCfg.WantedCollections = collections 80 - 81 65 // The handler closes over `st`, `knots`, the spindle hostname and 82 66 // the owner DID so the scheduler signature stays plain 83 67 // `func(ctx, *Event) error` and applyCommit can hand the knot 84 68 // consumer new sources as soon as matching repo records arrive 85 69 // (gated on the publisher being an authorized actor). 86 - handler := func(ctx context.Context, evt *jsmodels.Event) error { 87 - return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, cfg.OwnerDID, evt) 88 - } 70 + handler := js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error { 71 + return applyCommit(ctx, st, knots, cfg.Hostname, cfg.OwnerDID, evt) 72 + }) 89 73 90 74 // Re-attach the component-scoped logger so handler — which the 91 - // scheduler invokes with the ctx we pass to ConnectAndRead — can 92 - // pull it back out via loggerFrom. 75 + // consumer invokes with the ctx we pass to ConnectAndRead — can pull 76 + // it back out via loggerFrom. 93 77 ctx = loggerInto(ctx, logger) 94 78 95 - // The sequential scheduler processes events one-at-a-time in arrival 96 - // order. That's the right default for a spindle: ordering matters 97 - // (e.g. a member-added event must apply before any record from that 98 - // member is processed), and our event volume is tiny. 99 - c, err := client.NewClient( 100 - clientCfg, 101 - logger, 102 - sequential.NewScheduler("tack", logger, handler), 103 - ) 104 - if err != nil { 105 - return fmt.Errorf("new jetstream client: %w", err) 106 - } 107 - 108 - go func() { 109 - for { 110 - // We re-read the cursor from the store at every (re)connect so we 111 - // pick up any progress the previous connection persisted before 112 - // dying. nil means "start from now", which is the right default on 113 - // a brand-new install or after a corrupt cursor read. 114 - cur, err := st.LoadCursor(ctx) 115 - if err != nil { 116 - logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 117 - cur = nil 118 - } 119 - // Rewind a few seconds before the saved cursor on reconnect. 120 - // Jetstream cursors are time-based and the docs explicitly note 121 - // that exact-boundary replay is not guaranteed gapless, so a 122 - // small negative buffer protects against missing events that 123 - // straddle the disconnect. Duplicates the rewind produces are 124 - // safe: every applyCommit path is an idempotent upsert/delete 125 - // keyed on (did, rkey), and SaveCursor only moves forward in 126 - // practice because TimeUS is monotonic. 127 - if cur != nil { 128 - rewound := *cur - int64(jetstreamRewind/time.Microsecond) 129 - if rewound < 0 { 130 - rewound = 0 131 - } 132 - logger.Info("connecting to jetstream", 133 - "cursor_us", *cur, 134 - "rewound_us", rewound, 135 - "rewind", jetstreamRewind, 136 - ) 137 - cur = &rewound 138 - } else { 139 - logger.Info("connecting to jetstream from now (no cursor)") 140 - } 141 - 142 - // Reconnect loop. ConnectAndRead blocks on the websocket and returns 143 - // either when the connection drops (transient network error, server 144 - // restart, etc.) or when ctx is cancelled. On error we sleep briefly 145 - // and reconnect; on ctx cancellation we exit cleanly. 146 - if err := c.ConnectAndRead(ctx, cur); err != nil { 147 - if ctx.Err() != nil { 148 - return 149 - } 150 - logger.Error("jetstream read loop", "err", err) 151 - time.Sleep(2 * time.Second) 152 - continue 153 - } 154 - if ctx.Err() != nil { 155 - return 156 - } 157 - } 158 - }() 159 - 160 - return nil 161 - } 162 - 163 - // handleJetstreamEvent is the per-event callback for the JetStream. It 164 - // applies the event to the store and, when appropriate, advances the 165 - // persisted cursor. Any returned error is logged by the scheduler but 166 - // does not tear down the connection. 167 - // 168 - // Cursor-advancement policy: 169 - // 170 - // - Apply succeeded: advance the cursor. 171 - // - Apply failed with a badRecordError (malformed record / unusable 172 - // input): advance anyway, since replaying a permanently-broken 173 - // event on every reconnect would stall the firehose forever. 174 - // - Apply failed with anything else (treated as transient: store 175 - // hiccup, SQLite busy, disk full, shutdown race, ...): do NOT 176 - // advance. Saving the cursor here would permanently skip the 177 - // record, which for membership/repo state means we'd silently 178 - // lose a row that the rest of tack relies on. Leaving the cursor 179 - // in place gives the next reconnect (which rewinds by 180 - // jetstreamRewind) a chance to re-deliver and re-apply it. 181 - func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID string, evt *jsmodels.Event) error { 182 - // We only care about commits, which are the actual record CRUD 183 - // operations on a user's PDS. Account/identity events are ignored 184 - // for now; if we ever care about handle changes we can add them. 185 - if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil { 186 - return nil 187 - } 188 - logger := loggerFrom(ctx) 189 - 190 - // Dispatch on collection. Unknown collections shouldn't happen given 191 - // our wantedCollections filter, but be defensive — jetstream may 192 - // send schema changes ahead of us updating the filter. 193 - applyErr := applyCommit(ctx, st, knots, hostname, ownerDID, evt) 194 - if applyErr != nil { 195 - logger.Error("apply commit", 196 - "err", applyErr, 197 - "did", evt.Did, 198 - "collection", evt.Commit.Collection, 199 - "op", evt.Commit.Operation, 200 - "rkey", evt.Commit.RKey, 201 - "transient", !isBadRecord(applyErr), 202 - ) 203 - 204 - // Transient failure: bail without advancing the cursor so the 205 - // next delivery can retry this event. Returning the error 206 - // surfaces it in the scheduler's logs too. 207 - if !isBadRecord(applyErr) { 208 - return applyErr 209 - } 210 - // Otherwise (badRecordError) fall through to cursor save: a 211 - // single bad record shouldn't stall the cursor forever and 212 - // force us to re-process every subsequent event after a 213 - // restart. 214 - } 215 - 216 - // Advance the cursor. TimeUS is the jetstream-assigned microsecond 217 - // timestamp; saving it after-apply means a crash mid-batch will at 218 - // worst replay the failing event, never skip past it. 219 - if err := st.SaveCursor(ctx, evt.TimeUS); err != nil { 220 - // Returning the error logs it; it doesn't kill the scheduler. 221 - return fmt.Errorf("save cursor: %w", err) 222 - } 223 - 224 - return nil 225 - } 226 - 227 - // badRecordError marks an applyCommit failure as caused by the *record 228 - // itself* being permanently unusable, e.g. a malformed JSON body, or a 229 - // field that violates an invariant we can't recover from on retry. 230 - // 231 - // We need this distinction because the jetstream cursor is time-based 232 - // and once advanced past an event we will never see it again. So: 233 - // 234 - // - Permanent bad-input failures should still advance the cursor: 235 - // replaying the same broken record on every restart accomplishes 236 - // nothing and would stall progress on every later event behind it. 237 - // - Transient infrastructure failures (SQLite busy, disk full, store 238 - // closed mid-shutdown, etc.) must NOT advance the cursor: the 239 - // record is fine and the next attempt, on reconnect or after the 240 - // transient condition clears, should reapply it. Skipping past 241 - // such a failure can permanently lose membership/repo state. 242 - // 243 - // Anything returned from applyCommit that isn't a badRecordError is 244 - // treated as transient by handleJetstreamEvent. 245 - type badRecordError struct{ err error } 246 - 247 - func (e *badRecordError) Error() string { return e.err.Error() } 248 - func (e *badRecordError) Unwrap() error { return e.err } 249 - 250 - // badRecord wraps err so handleJetstreamEvent recognizes it as a 251 - // permanent, cursor-advancing failure. Use this for any error caused by 252 - // the contents of the record (decode errors, schema violations); never 253 - // for store/IO errors. 254 - func badRecord(err error) error { return &badRecordError{err: err} } 255 - 256 - // isBadRecord reports whether err (or anything it wraps) is a 257 - // badRecordError. 258 - func isBadRecord(err error) bool { 259 - var b *badRecordError 260 - return errors.As(err, &b) 79 + _, err := js.Start(ctx, js.Config{ 80 + WebsocketURL: cfg.JetstreamURL, 81 + Collections: jetstreamCollections, 82 + CursorStore: st, 83 + Handler: handler, 84 + Logger: logger, 85 + SchedulerIdent: "tack", 86 + }) 87 + return err 261 88 } 262 89 263 90 // applyCommit routes a commit to the right store mutation based on its ··· 296 123 if err := json.Unmarshal(c.Record, &rec); err != nil { 297 124 // Decode failures are a permanent property of the record's 298 125 // bytes; mark as bad so the cursor can advance past it. 299 - return badRecord(fmt.Errorf("decode spindle.member: %w", err)) 126 + return js.BadRecord(fmt.Errorf("decode spindle.member: %w", err)) 300 127 } 301 128 if err := st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt); err != nil { 302 129 return err ··· 340 167 var rec tangled.Repo 341 168 if err := json.Unmarshal(c.Record, &rec); err != nil { 342 169 // See applySpindleMember: decode errors are permanent. 343 - return badRecord(fmt.Errorf("decode repo: %w", err)) 170 + return js.BadRecord(fmt.Errorf("decode repo: %w", err)) 344 171 } 345 172 346 173 // Capture the prior (knot, spindle) before the upsert so the ··· 507 334 var rec tangled.RepoCollaborator 508 335 if err := json.Unmarshal(c.Record, &rec); err != nil { 509 336 // See applySpindleMember: decode errors are permanent. 510 - return badRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 337 + return js.BadRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 511 338 } 512 339 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 513 340 deref(rec.Repo), deref(rec.RepoDid),
+22
jetstream_test.go
··· 14 14 "testing" 15 15 16 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 17 + js "go.mitchellh.com/tack/internal/jetstream" 17 18 "tangled.org/core/api/tangled" 18 19 ) 19 20 ··· 53 54 if got == nil || *got != want { 54 55 t.Fatalf("cursor = %v, want %d", got, want) 55 56 } 57 + } 58 + 59 + // handleJetstreamEvent is the testable per-event entry point. The generic 60 + // jetstream processor owns cursor advancement; this wrapper only supplies 61 + // Tack's collection list and commit application callback. 62 + func handleJetstreamEvent( 63 + ctx context.Context, 64 + st *store, 65 + knots KnotConsumer, 66 + hostname, ownerDID string, 67 + evt *jsmodels.Event, 68 + ) error { 69 + processor := &js.Processor{ 70 + Collections: jetstreamCollections, 71 + CursorStore: st, 72 + Handler: js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error { 73 + return applyCommit(ctx, st, knots, hostname, ownerDID, evt) 74 + }), 75 + Logger: loggerFrom(ctx), 76 + } 77 + return processor.HandleEvent(ctx, evt) 56 78 } 57 79 58 80 // TestHandleNonCommitEvent confirms account/identity events are ignored