package jetstream

import (
	"context"
	"encoding/json"
	"strings"
	"testing"
)

// TestNew checks that New returns a usable consumer that does not report
// itself as connected before it has been started.
func TestNew(t *testing.T) {
	c := New(&Config{
		WantedCollections: []string{"app.bsky.feed.post"},
	}, func(ctx context.Context, evt *Event) error { return nil })
	if c == nil {
		t.Fatal("expected non-nil consumer")
	}
	if c.IsConnected() {
		t.Fatal("should not be connected before Start")
	}
}

// TestConfig_Defaults checks that a zero-value Config yields at least one
// endpoint and a cursor persistence interval of 1000.
func TestConfig_Defaults(t *testing.T) {
	cfg := &Config{}
	if eps := cfg.endpoints(); len(eps) == 0 {
		t.Fatal("expected default endpoints")
	}
	if got := cfg.cursorPersistEvery(); got != 1000 {
		t.Fatalf("expected 1000, got %d", got)
	}
}

// TestConfig_CustomEndpoints checks that explicitly configured endpoints are
// returned unchanged by endpoints().
func TestConfig_CustomEndpoints(t *testing.T) {
	cfg := &Config{Endpoints: []string{"wss://custom.example.com/subscribe"}}
	eps := cfg.endpoints()
	if len(eps) != 1 || eps[0] != "wss://custom.example.com/subscribe" {
		t.Fatalf("unexpected endpoints: %v", eps)
	}
}

// TestBuildURL_Collections checks that buildURL emits a wantedCollections
// query parameter when collections are configured.
func TestBuildURL_Collections(t *testing.T) {
	c := New(&Config{
		Endpoints:         []string{"wss://jetstream1.us-east.bsky.network/subscribe"},
		WantedCollections: []string{"app.bsky.feed.post", "app.bsky.feed.like"},
	}, func(ctx context.Context, evt *Event) error { return nil })

	u, err := c.buildURL("wss://jetstream1.us-east.bsky.network/subscribe")
	if err != nil {
		t.Fatal(err)
	}
	if u == "" {
		t.Fatal("expected non-empty URL")
	}
	// Should contain wantedCollections params
	if !strings.Contains(u, "wantedCollections=app.bsky.feed.post") {
		t.Errorf("URL missing wantedCollections: %s", u)
	}
}

// TestBuildURL_Cursor checks that buildURL emits a cursor query parameter
// once a cursor value has been stored on the consumer.
func TestBuildURL_Cursor(t *testing.T) {
	c := New(&Config{
		Endpoints: []string{"wss://jetstream1.us-east.bsky.network/subscribe"},
	}, func(ctx context.Context, evt *Event) error { return nil })
	c.cursor.Store(1000000)

	u, err := c.buildURL("wss://jetstream1.us-east.bsky.network/subscribe")
	if err != nil {
		t.Fatal(err)
	}
	if !strings.Contains(u, "cursor=") {
		t.Errorf("URL missing cursor: %s", u)
	}
}

// TestProcess_ValidEvent checks that process decodes a well-formed event and
// hands it to the handler.
func TestProcess_ValidEvent(t *testing.T) {
	var received *Event
	c := New(&Config{}, func(ctx context.Context, evt *Event) error {
		received = evt
		return nil
	})

	evt := Event{
		DID:    "did:plc:test",
		TimeUS: 1234567890,
		Kind:   "commit",
		Commit: &Commit{
			Operation:  "create",
			Collection: "app.bsky.feed.post",
			RKey:       "abc123",
		},
	}
	data, err := json.Marshal(evt)
	if err != nil {
		t.Fatalf("marshal event: %v", err)
	}

	if err := c.process(context.Background(), data); err != nil {
		t.Fatal(err)
	}
	if received == nil {
		t.Fatal("expected handler to be called")
	}
	if received.DID != "did:plc:test" {
		t.Fatalf("got DID %q", received.DID)
	}
}

// TestProcess_UpdatesCursor checks that processing an event stores the
// event's TimeUS as the consumer cursor.
func TestProcess_UpdatesCursor(t *testing.T) {
	c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })

	data, err := json.Marshal(Event{DID: "did:plc:test", TimeUS: 9999999})
	if err != nil {
		t.Fatalf("marshal event: %v", err)
	}

	// Surface a processing failure directly rather than letting it show up
	// as a misleading "cursor not updated" message below.
	if err := c.process(context.Background(), data); err != nil {
		t.Fatalf("process: %v", err)
	}
	if got := c.cursor.Load(); got != 9999999 {
		t.Fatalf("cursor not updated: %d", got)
	}
}

// TestProcess_InvalidJSON checks that process reports an error for a payload
// that is not valid JSON.
func TestProcess_InvalidJSON(t *testing.T) {
	c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })
	if err := c.process(context.Background(), []byte("{bad json")); err == nil {
		t.Fatal("expected error for invalid JSON")
	}
}

// TestStats checks that a freshly constructed consumer reports zero events
// and zero bytes.
func TestStats(t *testing.T) {
	c := New(&Config{}, func(ctx context.Context, evt *Event) error { return nil })
	evts, bytes := c.Stats()
	if evts != 0 || bytes != 0 {
		t.Fatalf("expected zero stats, got events=%d bytes=%d", evts, bytes)
	}
}

// contains reports whether substr occurs within s. It is a thin wrapper over
// strings.Contains, kept so existing callers of the helper keep compiling.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}

// containsStr reports whether substr occurs within s. It is a thin wrapper
// over strings.Contains, kept so existing callers of the helper keep
// compiling.
func containsStr(s, substr string) bool {
	return strings.Contains(s, substr)
}