Go boilerplate library for building atproto apps
atproto go
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: firehose subscriber

authored by

Patrick Dewey and committed by tangled.org 2795bc15 413a41a1

+107
+65
testpds/testpds.go
··· 10 10 "crypto/rand" 11 11 "encoding/json" 12 12 "fmt" 13 + "log/slog" 13 14 "net" 14 15 "net/http" 15 16 "testing" ··· 21 22 22 23 "github.com/bluesky-social/indigo/api/atproto" 23 24 "github.com/bluesky-social/indigo/atproto/atcrypto" 25 + "github.com/bluesky-social/indigo/events" 26 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 24 27 "github.com/bluesky-social/indigo/xrpc" 28 + "github.com/gorilla/websocket" 25 29 cocoonplc "github.com/haileyok/cocoon/plc" 26 30 "github.com/haileyok/cocoon/server" 27 31 "github.com/lestrrat-go/jwx/v2/jwk" ··· 199 203 Did: out.Did, 200 204 Handle: out.Handle, 201 205 }) 206 + } 207 + 208 + // FirehoseEvent is a union of the event types delivered by subscribeRepos. 209 + // Exactly one field will be non-nil per event. 210 + type FirehoseEvent struct { 211 + Commit *atproto.SyncSubscribeRepos_Commit 212 + Identity *atproto.SyncSubscribeRepos_Identity 213 + Account *atproto.SyncSubscribeRepos_Account 214 + } 215 + 216 + // Subscribe connects to the PDS firehose (com.atproto.sync.subscribeRepos) and 217 + // delivers events on the returned channel. The connection is closed and the 218 + // channel drained when ctx is cancelled. An optional cursor resumes from a 219 + // specific sequence number (pass 0 to start from live). 220 + func (p *TestPDS) Subscribe(ctx context.Context, cursor int64) (<-chan FirehoseEvent, error) { 221 + wsURL := fmt.Sprintf("ws://%s/xrpc/com.atproto.sync.subscribeRepos", p.URL[len("http://"):]) 222 + if cursor > 0 { 223 + wsURL = fmt.Sprintf("%s?cursor=%d", wsURL, cursor) 224 + } 225 + 226 + conn, _, err := websocket.DefaultDialer.DialContext(ctx, wsURL, nil) 227 + if err != nil { 228 + return nil, fmt.Errorf("testpds: subscribe: %w", err) 229 + } 230 + 231 + ch := make(chan FirehoseEvent, 256) 232 + 233 + rsc := &events.RepoStreamCallbacks{ 234 + RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 235 + select { 236 + case ch <- FirehoseEvent{Commit: evt}: 237 + case <-ctx.Done(): 238 + } 239 + return nil 240 + }, 241 + RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error { 242 + select { 243 + case ch <- FirehoseEvent{Identity: evt}: 244 + case <-ctx.Done(): 245 + } 246 + return nil 247 + }, 248 + RepoAccount: func(evt *atproto.SyncSubscribeRepos_Account) error { 249 + select { 250 + case ch <- FirehoseEvent{Account: evt}: 251 + case <-ctx.Done(): 252 + } 253 + return nil 254 + }, 255 + } 256 + 257 + sched := sequential.NewScheduler("testpds", rsc.EventHandler) 258 + 259 + go func() { 260 + defer close(ch) 261 + defer conn.Close() 262 + defer sched.Shutdown() 263 + _ = events.HandleRepoStream(ctx, conn, sched, slog.Default()) 264 + }() 265 + 266 + return ch, nil 202 267 } 203 268 204 269 func freePort(t *testing.T) int {
+42
testpds/testpds_test.go
··· 96 96 } 97 97 } 98 98 99 + func TestSubscribe(t *testing.T) { 100 + pds := testpds.Start(t, nil) 101 + ctx, cancel := context.WithCancel(context.Background()) 102 + defer cancel() 103 + 104 + ch, err := pds.Subscribe(ctx, 0) 105 + if err != nil { 106 + t.Fatalf("subscribe: %v", err) 107 + } 108 + 109 + // Create an account — this should emit identity + commit events 110 + alice := pds.MustCreateAccount(t, "alice.test", "alice@test.com", "password123") 111 + 112 + // Create a record to generate a commit event 113 + _, err = atproto.RepoCreateRecord(ctx, alice, &atproto.RepoCreateRecord_Input{ 114 + Repo: alice.Auth.Did, 115 + Collection: "app.bsky.feed.post", 116 + Record: &lexutil.LexiconTypeDecoder{Val: &bsky.FeedPost{ 117 + Text: "hello firehose", 118 + CreatedAt: time.Now().Format(time.RFC3339), 119 + }}, 120 + }) 121 + if err != nil { 122 + t.Fatalf("create record: %v", err) 123 + } 124 + 125 + // We should receive at least one commit event for the post 126 + deadline := time.After(5 * time.Second) 127 + var gotCommit bool 128 + for !gotCommit { 129 + select { 130 + case evt := <-ch: 131 + if evt.Commit != nil && evt.Commit.Repo == alice.Auth.Did { 132 + gotCommit = true 133 + t.Logf("got commit event: repo=%s seq=%d ops=%d", evt.Commit.Repo, evt.Commit.Seq, len(evt.Commit.Ops)) 134 + } 135 + case <-deadline: 136 + t.Fatal("timed out waiting for commit event") 137 + } 138 + } 139 + } 140 + 99 141 func TestRecordCRUD(t *testing.T) { 100 142 pds := testpds.Start(t, nil) 101 143 ctx := context.Background()