Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 272 lines 7.2 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8) 9 10func newTestEvent(path string) *Event { 11 return &Event{ 12 ID: "test-" + path, 13 Timestamp: time.Now(), 14 Path: path, 15 Payload: map[string]any{"test": true}, 16 } 17} 18 19func mustReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) *Event { 20 t.Helper() 21 select { 22 case e := <-ch: 23 return e 24 case <-time.After(timeout): 25 t.Fatal("timed out waiting for event") 26 return nil 27 } 28} 29 30func mustNotReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) { 31 t.Helper() 32 select { 33 case e := <-ch: 34 t.Fatalf("expected no event, got %+v", e) 35 case <-time.After(timeout): 36 } 37} 38 39func newTestBroker(bufferSize int) (*Broker, context.CancelFunc) { 40 backend := NewMemoryBackend(bufferSize) 41 broker := NewBroker(backend) 42 ctx, cancel := context.WithCancel(context.Background()) 43 broker.Start(ctx) 44 return broker, cancel 45} 46 47func TestBroker_exactPathDelivery(t *testing.T) { 48 b, cancel := newTestBroker(100) 49 defer cancel() 50 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "") 51 defer unsub() 52 53 event := newTestEvent("github.com/chrisguidry/docketeer") 54 b.Publish(event) 55 56 got := mustReceive(t, ch, time.Second) 57 if got.ID != event.ID { 58 t.Errorf("expected event %s, got %s", event.ID, got.ID) 59 } 60} 61 62func TestBroker_parentReceivesChildEvents(t *testing.T) { 63 b, cancel := newTestBroker(100) 64 defer cancel() 65 ch, unsub := b.Subscribe("github.com/chrisguidry", "") 66 defer unsub() 67 68 event := newTestEvent("github.com/chrisguidry/docketeer") 69 b.Publish(event) 70 71 got := mustReceive(t, ch, time.Second) 72 if got.ID != event.ID { 73 t.Errorf("expected event %s, got %s", event.ID, got.ID) 74 } 75} 76 77func TestBroker_rootReceivesAll(t *testing.T) { 78 b, cancel := newTestBroker(100) 79 defer cancel() 80 ch, unsub := b.Subscribe("", "") 81 defer unsub() 82 83 event := newTestEvent("github.com/chrisguidry/docketeer") 84 b.Publish(event) 85 86 got := mustReceive(t, ch, time.Second) 87 if got.ID != event.ID { 88 t.Errorf("expected event %s, got %s", event.ID, got.ID) 89 } 90} 91 92func TestBroker_unrelatedSubscriberDoesNotReceive(t *testing.T) { 93 b, cancel := newTestBroker(100) 94 defer cancel() 95 ch, unsub := b.Subscribe("gitlab.com", "") 96 defer unsub() 97 98 b.Publish(newTestEvent("github.com/chrisguidry/docketeer")) 99 100 mustNotReceive(t, ch, 50*time.Millisecond) 101} 102 103func TestBroker_multipleSubscribersSamePath(t *testing.T) { 104 b, cancel := newTestBroker(100) 105 defer cancel() 106 ch1, unsub1 := b.Subscribe("github.com/chrisguidry/docketeer", "") 107 defer unsub1() 108 ch2, unsub2 := b.Subscribe("github.com/chrisguidry/docketeer", "") 109 defer unsub2() 110 111 b.Publish(newTestEvent("github.com/chrisguidry/docketeer")) 112 113 mustReceive(t, ch1, time.Second) 114 mustReceive(t, ch2, time.Second) 115} 116 117func TestBroker_unsubscribeStopsDelivery(t *testing.T) { 118 b, cancel := newTestBroker(100) 119 defer cancel() 120 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "") 121 unsub() 122 123 b.Publish(newTestEvent("github.com/chrisguidry/docketeer")) 124 125 select { 126 case _, ok := <-ch: 127 if ok { 128 t.Fatal("expected channel to be closed, but received an event") 129 } 130 case <-time.After(50 * time.Millisecond): 131 t.Fatal("expected channel to be closed") 132 } 133} 134 135func TestBroker_unsubscribeCleansUpEmptyPath(t *testing.T) { 136 b, cancel := newTestBroker(100) 137 defer cancel() 138 139 _, unsub := b.Subscribe("ephemeral/path", "") 140 141 b.mu.RLock() 142 _, exists := b.subscribers["ephemeral/path"] 143 b.mu.RUnlock() 144 if !exists { 145 t.Fatal("expected subscriber map entry to exist") 146 } 147 148 unsub() 149 150 b.mu.RLock() 151 _, exists = b.subscribers["ephemeral/path"] 152 b.mu.RUnlock() 153 if exists { 154 t.Fatal("expected subscriber map entry to be deleted after last unsub") 155 } 156} 157 158func TestBroker_ringBufferWraps(t *testing.T) { 159 b, cancel := newTestBroker(5) 160 defer cancel() 161 for i := range 10 { 162 b.Publish(&Event{ 163 ID: fmt.Sprintf("event-%d", i), 164 Path: "test", 165 Payload: map[string]any{}, 166 }) 167 } 168 169 ch, unsub := b.Subscribe("test", "event-4") 170 defer unsub() 171 172 for i := 5; i < 10; i++ { 173 got := mustReceive(t, ch, time.Second) 174 expected := fmt.Sprintf("event-%d", i) 175 if got.ID != expected { 176 t.Errorf("expected %s, got %s", expected, got.ID) 177 } 178 } 179} 180 181func TestBroker_lastEventIDReplay(t *testing.T) { 182 b, cancel := newTestBroker(100) 183 defer cancel() 184 b.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 185 b.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 186 b.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}}) 187 188 ch, unsub := b.Subscribe("test", "e1") 189 defer unsub() 190 191 got1 := mustReceive(t, ch, time.Second) 192 got2 := mustReceive(t, ch, time.Second) 193 if got1.ID != "e2" || got2.ID != "e3" { 194 t.Errorf("expected e2 and e3, got %s and %s", got1.ID, got2.ID) 195 } 196} 197 198func TestBroker_lastEventIDRespectsPathHierarchy(t *testing.T) { 199 b, cancel := newTestBroker(100) 200 defer cancel() 201 b.Publish(&Event{ID: "e1", Path: "github.com/chrisguidry/docketeer", Payload: map[string]any{}}) 202 b.Publish(&Event{ID: "e2", Path: "gitlab.com/other/project", Payload: map[string]any{}}) 203 b.Publish(&Event{ID: "e3", Path: "github.com/chrisguidry/other", Payload: map[string]any{}}) 204 205 ch, unsub := b.Subscribe("github.com/chrisguidry", "e1") 206 defer unsub() 207 208 got := mustReceive(t, ch, time.Second) 209 if got.ID != "e3" { 210 t.Errorf("expected e3 (same prefix), got %s", got.ID) 211 } 212 mustNotReceive(t, ch, 50*time.Millisecond) 213} 214 215func TestBroker_lastEventIDReplayExactPath(t *testing.T) { 216 b, cancel := newTestBroker(100) 217 defer cancel() 218 b.Publish(&Event{ID: "e1", Path: "exact/path", Payload: map[string]any{}}) 219 b.Publish(&Event{ID: "e2", Path: "exact/path/child", Payload: map[string]any{}}) 220 b.Publish(&Event{ID: "e3", Path: "exact/path", Payload: map[string]any{}}) 221 222 ch, unsub := b.Subscribe("exact/path", "e1") 223 defer unsub() 224 225 got1 := mustReceive(t, ch, time.Second) 226 got2 := mustReceive(t, ch, time.Second) 227 if got1.ID != "e2" { 228 t.Errorf("expected e2, got %s", got1.ID) 229 } 230 if got2.ID != "e3" { 231 t.Errorf("expected e3, got %s", got2.ID) 232 } 233} 234 235func TestPathMatches(t *testing.T) { 236 tests := []struct { 237 subscribePath string 238 eventPath string 239 want bool 240 }{ 241 {"", "anything", true}, 242 {"exact", "exact", true}, 243 {"parent", "parent/child", true}, 244 {"parent", "parentchild", false}, 245 {"parent", "other", false}, 246 } 247 for _, tt := range tests { 248 got := pathMatches(tt.subscribePath, tt.eventPath) 249 if got != tt.want { 250 t.Errorf("pathMatches(%q, %q) = %v, want %v", tt.subscribePath, tt.eventPath, got, tt.want) 251 } 252 } 253} 254 255func TestBroker_lastEventIDExpiredFromBuffer(t *testing.T) { 256 b, cancel := newTestBroker(3) 257 defer cancel() 258 b.Publish(&Event{ID: "old1", Path: "test", Payload: map[string]any{}}) 259 b.Publish(&Event{ID: "old2", Path: "test", Payload: map[string]any{}}) 260 b.Publish(&Event{ID: "old3", Path: "test", Payload: map[string]any{}}) 261 b.Publish(&Event{ID: "new1", Path: "test", Payload: map[string]any{}}) 262 b.Publish(&Event{ID: "new2", Path: "test", Payload: map[string]any{}}) 263 b.Publish(&Event{ID: "new3", Path: "test", Payload: map[string]any{}}) 264 265 ch, unsub := b.Subscribe("test", "old1") 266 defer unsub() 267 268 got := mustReceive(t, ch, time.Second) 269 if got.ID != "new1" { 270 t.Errorf("expected new1 (oldest in buffer), got %s", got.ID) 271 } 272}