Go boilerplate library for building atproto apps
atproto
go
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "testing"
7)
8
9func TestNew(t *testing.T) {
10 c := New(&Config{
11 WantedCollections: []string{"app.bsky.feed.post"},
12 }, func(ctx context.Context, evt *Event) error {
13 return nil
14 })
15 if c == nil {
16 t.Fatal("expected non-nil consumer")
17 }
18 if c.IsConnected() {
19 t.Fatal("should not be connected before Start")
20 }
21}
22
23func TestConfig_Defaults(t *testing.T) {
24 cfg := &Config{}
25 if eps := cfg.endpoints(); len(eps) == 0 {
26 t.Fatal("expected default endpoints")
27 }
28 if cfg.cursorPersistEvery() != 1000 {
29 t.Fatalf("expected 1000, got %d", cfg.cursorPersistEvery())
30 }
31}
32
33func TestConfig_CustomEndpoints(t *testing.T) {
34 cfg := &Config{Endpoints: []string{"wss://custom.example.com/subscribe"}}
35 eps := cfg.endpoints()
36 if len(eps) != 1 || eps[0] != "wss://custom.example.com/subscribe" {
37 t.Fatalf("unexpected endpoints: %v", eps)
38 }
39}
40
41func TestBuildURL_Collections(t *testing.T) {
42 c := New(&Config{
43 Endpoints: []string{"wss://jetstream1.us-east.bsky.network/subscribe"},
44 WantedCollections: []string{"app.bsky.feed.post", "app.bsky.feed.like"},
45 }, func(ctx context.Context, evt *Event) error { return nil })
46
47 u, err := c.buildURL("wss://jetstream1.us-east.bsky.network/subscribe")
48 if err != nil {
49 t.Fatal(err)
50 }
51 if u == "" {
52 t.Fatal("expected non-empty URL")
53 }
54 // Should contain wantedCollections params
55 if !contains(u, "wantedCollections=app.bsky.feed.post") {
56 t.Errorf("URL missing wantedCollections: %s", u)
57 }
58}
59
60func TestBuildURL_Cursor(t *testing.T) {
61 c := New(&Config{
62 Endpoints: []string{"wss://jetstream1.us-east.bsky.network/subscribe"},
63 }, func(ctx context.Context, evt *Event) error { return nil })
64
65 c.cursor.Store(1000000)
66 u, err := c.buildURL("wss://jetstream1.us-east.bsky.network/subscribe")
67 if err != nil {
68 t.Fatal(err)
69 }
70 if !contains(u, "cursor=") {
71 t.Errorf("URL missing cursor: %s", u)
72 }
73}
74
75func TestProcess_ValidEvent(t *testing.T) {
76 var received *Event
77 c := New(&Config{}, func(ctx context.Context, evt *Event) error {
78 received = evt
79 return nil
80 })
81
82 evt := Event{
83 DID: "did:plc:test",
84 TimeUS: 1234567890,
85 Kind: "commit",
86 Commit: &Commit{
87 Operation: "create",
88 Collection: "app.bsky.feed.post",
89 RKey: "abc123",
90 },
91 }
92 data, _ := json.Marshal(evt)
93
94 if err := c.process(context.Background(), data); err != nil {
95 t.Fatal(err)
96 }
97 if received == nil {
98 t.Fatal("expected handler to be called")
99 }
100 if received.DID != "did:plc:test" {
101 t.Fatalf("got DID %q", received.DID)
102 }
103}
104
105func TestProcess_UpdatesCursor(t *testing.T) {
106 c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })
107
108 data, _ := json.Marshal(Event{DID: "did:plc:test", TimeUS: 9999999})
109 c.process(context.Background(), data)
110
111 if c.cursor.Load() != 9999999 {
112 t.Fatalf("cursor not updated: %d", c.cursor.Load())
113 }
114}
115
116func TestProcess_InvalidJSON(t *testing.T) {
117 c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })
118 err := c.process(context.Background(), []byte("{bad json"))
119 if err == nil {
120 t.Fatal("expected error for invalid JSON")
121 }
122}
123
124func TestStats(t *testing.T) {
125 c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })
126 evts, bytes := c.Stats()
127 if evts != 0 || bytes != 0 {
128 t.Fatalf("expected zero stats, got events=%d bytes=%d", evts, bytes)
129 }
130}
131
132func contains(s, substr string) bool {
133 return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsStr(s, substr))
134}
135
136func containsStr(s, substr string) bool {
137 for i := 0; i <= len(s)-len(substr); i++ {
138 if s[i:i+len(substr)] == substr {
139 return true
140 }
141 }
142 return false
143}