package main

import (
	"context"
	"fmt"
	"testing"
	"time"
)

// newTestEvent builds an Event for the given path. The ID is "test-" followed
// by the path, the timestamp is the current time, and the payload is a fixed
// {"test": true} map.
func newTestEvent(path string) *Event {
	return &Event{
		ID:        "test-" + path,
		Timestamp: time.Now(),
		Path:      path,
		Payload:   map[string]any{"test": true},
	}
}

// mustReceive waits up to timeout for an event on ch and returns it.
//
// The test is failed immediately (t.Fatal) if the timeout elapses, if ch is
// closed before an event arrives, or if a nil event is delivered. Callers
// dereference the result right away, so a nil must never be returned to them:
// a receive on a closed channel yields the zero value, which would otherwise
// surface as a nil-pointer panic instead of a readable test failure.
func mustReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) *Event {
	t.Helper()
	select {
	case e, ok := <-ch:
		if !ok {
			t.Fatal("channel closed while waiting for event")
		}
		if e == nil {
			t.Fatal("received nil event")
		}
		return e
	case <-time.After(timeout):
		t.Fatal("timed out waiting for event")
	}
	// Unreachable: t.Fatal stops the test goroutine, but the compiler still
	// requires a terminating return.
	return nil
}

// mustNotReceive fails the test if anything is received on ch before timeout
// elapses. A closed channel is also reported as a failure, with its own
// message so it is not mistaken for a delivered event.
func mustNotReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) {
	t.Helper()
	select {
	case e, ok := <-ch:
		if !ok {
			t.Fatal("expected no event, but channel was closed")
		}
		t.Fatalf("expected no event, got %+v", e)
	case <-time.After(timeout):
		// Nothing arrived within the window: success.
	}
}

// newTestBroker creates a Broker on top of NewMemoryBackend(bufferSize) and
// starts it with a cancellable background context. The returned CancelFunc
// cancels that context; callers are expected to invoke it when the test ends.
func newTestBroker(bufferSize int) (*Broker, context.CancelFunc) {
	backend := NewMemoryBackend(bufferSize)
	broker := NewBroker(backend)
	ctx, cancel := context.WithCancel(context.Background())
	broker.Start(ctx)
	return broker, cancel
}

// TestBroker_exactPathDelivery checks that a subscriber receives an event
// published on exactly the path it subscribed to.
func TestBroker_exactPathDelivery(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "")
	defer unsub()

	event := newTestEvent("github.com/chrisguidry/docketeer")
	b.Publish(event)

	got := mustReceive(t, ch, time.Second)
	if got.ID != event.ID {
		t.Errorf("expected event %s, got %s", event.ID, got.ID)
	}
}

// TestBroker_parentReceivesChildEvents checks that a subscriber on
// "github.com/chrisguidry" receives an event published on the child path
// "github.com/chrisguidry/docketeer".
func TestBroker_parentReceivesChildEvents(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch, unsub := b.Subscribe("github.com/chrisguidry", "")
	defer unsub()

	event := newTestEvent("github.com/chrisguidry/docketeer")
	b.Publish(event)

	got := mustReceive(t, ch, time.Second)
	if got.ID != event.ID {
		t.Errorf("expected event %s, got %s", event.ID, got.ID)
	}
}

// TestBroker_rootReceivesAll checks that a subscriber on the empty path
// receives an event published on a nested path.
func TestBroker_rootReceivesAll(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch, unsub := b.Subscribe("", "")
	defer unsub()

	event := newTestEvent("github.com/chrisguidry/docketeer")
	b.Publish(event)

	got := mustReceive(t, ch, time.Second)
	if got.ID != event.ID {
		t.Errorf("expected event %s, got %s", event.ID, got.ID)
	}
}

// TestBroker_unrelatedSubscriberDoesNotReceive checks that a subscriber on
// "gitlab.com" sees nothing when an event is published under "github.com/...".
func TestBroker_unrelatedSubscriberDoesNotReceive(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch, unsub := b.Subscribe("gitlab.com", "")
	defer unsub()

	b.Publish(newTestEvent("github.com/chrisguidry/docketeer"))

	mustNotReceive(t, ch, 50*time.Millisecond)
}

// TestBroker_multipleSubscribersSamePath checks that two subscribers on the
// same path each receive the published event.
func TestBroker_multipleSubscribersSamePath(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch1, unsub1 := b.Subscribe("github.com/chrisguidry/docketeer", "")
	defer unsub1()
	ch2, unsub2 := b.Subscribe("github.com/chrisguidry/docketeer", "")
	defer unsub2()

	event := newTestEvent("github.com/chrisguidry/docketeer")
	b.Publish(event)

	// Verify the identity of what each subscriber got, consistent with the
	// other delivery tests, rather than only that something arrived.
	if got := mustReceive(t, ch1, time.Second); got.ID != event.ID {
		t.Errorf("subscriber 1: expected event %s, got %s", event.ID, got.ID)
	}
	if got := mustReceive(t, ch2, time.Second); got.ID != event.ID {
		t.Errorf("subscriber 2: expected event %s, got %s", event.ID, got.ID)
	}
}

// TestBroker_unsubscribeStopsDelivery checks that after unsub is called the
// subscription channel is closed and no event published afterwards is
// delivered on it.
func TestBroker_unsubscribeStopsDelivery(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "")
	unsub()

	b.Publish(newTestEvent("github.com/chrisguidry/docketeer"))

	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("expected channel to be closed, but received an event")
		}
	case <-time.After(50 * time.Millisecond):
		t.Fatal("expected channel to be closed")
	}
}

// TestBroker_unsubscribeCleansUpEmptyPath inspects the broker's subscribers
// map directly (under b.mu) to check that the entry for a path exists while
// subscribed and is removed once its only subscriber unsubscribes.
func TestBroker_unsubscribeCleansUpEmptyPath(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	_, unsub := b.Subscribe("ephemeral/path", "")

	b.mu.RLock()
	_, exists := b.subscribers["ephemeral/path"]
	b.mu.RUnlock()
	if !exists {
		t.Fatal("expected subscriber map entry to exist")
	}

	unsub()

	b.mu.RLock()
	_, exists = b.subscribers["ephemeral/path"]
	b.mu.RUnlock()
	if exists {
		t.Fatal("expected subscriber map entry to be deleted after last unsub")
	}
}

// TestBroker_ringBufferWraps publishes ten events through a broker created
// with a buffer size of five, then subscribes with last-event ID "event-4"
// and expects event-5 through event-9 to be replayed in order.
func TestBroker_ringBufferWraps(t *testing.T) {
	b, cancel := newTestBroker(5)
	defer cancel()

	for i := range 10 {
		b.Publish(&Event{
			ID:      fmt.Sprintf("event-%d", i),
			Path:    "test",
			Payload: map[string]any{},
		})
	}

	ch, unsub := b.Subscribe("test", "event-4")
	defer unsub()

	for i := 5; i < 10; i++ {
		got := mustReceive(t, ch, time.Second)
		expected := fmt.Sprintf("event-%d", i)
		if got.ID != expected {
			t.Errorf("expected %s, got %s", expected, got.ID)
		}
	}
}

// TestBroker_lastEventIDReplay checks that subscribing with last-event ID
// "e1" replays the events published after it (e2, then e3).
func TestBroker_lastEventIDReplay(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	b.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}})

	ch, unsub := b.Subscribe("test", "e1")
	defer unsub()

	got1 := mustReceive(t, ch, time.Second)
	got2 := mustReceive(t, ch, time.Second)

	if got1.ID != "e2" || got2.ID != "e3" {
		t.Errorf("expected e2 and e3, got %s and %s", got1.ID, got2.ID)
	}
}

// TestBroker_lastEventIDRespectsPathHierarchy checks that replay after "e1"
// for a "github.com/chrisguidry" subscriber yields only e3 (published under
// that prefix) and skips e2 (published under "gitlab.com/...").
func TestBroker_lastEventIDRespectsPathHierarchy(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	b.Publish(&Event{ID: "e1", Path: "github.com/chrisguidry/docketeer", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e2", Path: "gitlab.com/other/project", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e3", Path: "github.com/chrisguidry/other", Payload: map[string]any{}})

	ch, unsub := b.Subscribe("github.com/chrisguidry", "e1")
	defer unsub()

	got := mustReceive(t, ch, time.Second)
	if got.ID != "e3" {
		t.Errorf("expected e3 (same prefix), got %s", got.ID)
	}

	mustNotReceive(t, ch, 50*time.Millisecond)
}

// TestBroker_lastEventIDReplayExactPath checks that replay after "e1" for an
// "exact/path" subscriber yields e2 (published on "exact/path/child") and
// then e3 (published on "exact/path").
func TestBroker_lastEventIDReplayExactPath(t *testing.T) {
	b, cancel := newTestBroker(100)
	defer cancel()

	b.Publish(&Event{ID: "e1", Path: "exact/path", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e2", Path: "exact/path/child", Payload: map[string]any{}})
	b.Publish(&Event{ID: "e3", Path: "exact/path", Payload: map[string]any{}})

	ch, unsub := b.Subscribe("exact/path", "e1")
	defer unsub()

	got1 := mustReceive(t, ch, time.Second)
	got2 := mustReceive(t, ch, time.Second)

	if got1.ID != "e2" {
		t.Errorf("expected e2, got %s", got1.ID)
	}
	if got2.ID != "e3" {
		t.Errorf("expected e3, got %s", got2.ID)
	}
}

// TestPathMatches is a table-driven check of pathMatches(subscribePath,
// eventPath): the empty subscribe path matches anything, an identical path
// matches, a "parent/child" event matches "parent", and a path that merely
// shares a string prefix ("parentchild") or is unrelated does not.
func TestPathMatches(t *testing.T) {
	tests := []struct {
		subscribePath string
		eventPath     string
		want          bool
	}{
		{"", "anything", true},
		{"exact", "exact", true},
		{"parent", "parent/child", true},
		{"parent", "parentchild", false},
		{"parent", "other", false},
	}

	for _, tt := range tests {
		got := pathMatches(tt.subscribePath, tt.eventPath)
		if got != tt.want {
			t.Errorf("pathMatches(%q, %q) = %v, want %v", tt.subscribePath, tt.eventPath, got, tt.want)
		}
	}
}

// TestBroker_lastEventIDExpiredFromBuffer publishes six events through a
// broker created with a buffer size of three, then subscribes with last-event
// ID "old1" and expects the first replayed event to be "new1".
func TestBroker_lastEventIDExpiredFromBuffer(t *testing.T) {
	b, cancel := newTestBroker(3)
	defer cancel()

	b.Publish(&Event{ID: "old1", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "old2", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "old3", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "new1", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "new2", Path: "test", Payload: map[string]any{}})
	b.Publish(&Event{ID: "new3", Path: "test", Payload: map[string]any{}})

	ch, unsub := b.Subscribe("test", "old1")
	defer unsub()

	got := mustReceive(t, ch, time.Second)
	if got.ID != "new1" {
		t.Errorf("expected new1 (oldest in buffer), got %s", got.ID)
	}
}