Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8)
9
10func newTestEvent(path string) *Event {
11 return &Event{
12 ID: "test-" + path,
13 Timestamp: time.Now(),
14 Path: path,
15 Payload: map[string]any{"test": true},
16 }
17}
18
19func mustReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) *Event {
20 t.Helper()
21 select {
22 case e := <-ch:
23 return e
24 case <-time.After(timeout):
25 t.Fatal("timed out waiting for event")
26 return nil
27 }
28}
29
30func mustNotReceive(t *testing.T, ch <-chan *Event, timeout time.Duration) {
31 t.Helper()
32 select {
33 case e := <-ch:
34 t.Fatalf("expected no event, got %+v", e)
35 case <-time.After(timeout):
36 }
37}
38
39func newTestBroker(bufferSize int) (*Broker, context.CancelFunc) {
40 backend := NewMemoryBackend(bufferSize)
41 broker := NewBroker(backend)
42 ctx, cancel := context.WithCancel(context.Background())
43 broker.Start(ctx)
44 return broker, cancel
45}
46
47func TestBroker_exactPathDelivery(t *testing.T) {
48 b, cancel := newTestBroker(100)
49 defer cancel()
50 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "")
51 defer unsub()
52
53 event := newTestEvent("github.com/chrisguidry/docketeer")
54 b.Publish(event)
55
56 got := mustReceive(t, ch, time.Second)
57 if got.ID != event.ID {
58 t.Errorf("expected event %s, got %s", event.ID, got.ID)
59 }
60}
61
62func TestBroker_parentReceivesChildEvents(t *testing.T) {
63 b, cancel := newTestBroker(100)
64 defer cancel()
65 ch, unsub := b.Subscribe("github.com/chrisguidry", "")
66 defer unsub()
67
68 event := newTestEvent("github.com/chrisguidry/docketeer")
69 b.Publish(event)
70
71 got := mustReceive(t, ch, time.Second)
72 if got.ID != event.ID {
73 t.Errorf("expected event %s, got %s", event.ID, got.ID)
74 }
75}
76
77func TestBroker_rootReceivesAll(t *testing.T) {
78 b, cancel := newTestBroker(100)
79 defer cancel()
80 ch, unsub := b.Subscribe("", "")
81 defer unsub()
82
83 event := newTestEvent("github.com/chrisguidry/docketeer")
84 b.Publish(event)
85
86 got := mustReceive(t, ch, time.Second)
87 if got.ID != event.ID {
88 t.Errorf("expected event %s, got %s", event.ID, got.ID)
89 }
90}
91
92func TestBroker_unrelatedSubscriberDoesNotReceive(t *testing.T) {
93 b, cancel := newTestBroker(100)
94 defer cancel()
95 ch, unsub := b.Subscribe("gitlab.com", "")
96 defer unsub()
97
98 b.Publish(newTestEvent("github.com/chrisguidry/docketeer"))
99
100 mustNotReceive(t, ch, 50*time.Millisecond)
101}
102
103func TestBroker_multipleSubscribersSamePath(t *testing.T) {
104 b, cancel := newTestBroker(100)
105 defer cancel()
106 ch1, unsub1 := b.Subscribe("github.com/chrisguidry/docketeer", "")
107 defer unsub1()
108 ch2, unsub2 := b.Subscribe("github.com/chrisguidry/docketeer", "")
109 defer unsub2()
110
111 b.Publish(newTestEvent("github.com/chrisguidry/docketeer"))
112
113 mustReceive(t, ch1, time.Second)
114 mustReceive(t, ch2, time.Second)
115}
116
117func TestBroker_unsubscribeStopsDelivery(t *testing.T) {
118 b, cancel := newTestBroker(100)
119 defer cancel()
120 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "")
121 unsub()
122
123 b.Publish(newTestEvent("github.com/chrisguidry/docketeer"))
124
125 select {
126 case _, ok := <-ch:
127 if ok {
128 t.Fatal("expected channel to be closed, but received an event")
129 }
130 case <-time.After(50 * time.Millisecond):
131 t.Fatal("expected channel to be closed")
132 }
133}
134
135func TestBroker_unsubscribeCleansUpEmptyPath(t *testing.T) {
136 b, cancel := newTestBroker(100)
137 defer cancel()
138
139 _, unsub := b.Subscribe("ephemeral/path", "")
140
141 b.mu.RLock()
142 _, exists := b.subscribers["ephemeral/path"]
143 b.mu.RUnlock()
144 if !exists {
145 t.Fatal("expected subscriber map entry to exist")
146 }
147
148 unsub()
149
150 b.mu.RLock()
151 _, exists = b.subscribers["ephemeral/path"]
152 b.mu.RUnlock()
153 if exists {
154 t.Fatal("expected subscriber map entry to be deleted after last unsub")
155 }
156}
157
158func TestBroker_ringBufferWraps(t *testing.T) {
159 b, cancel := newTestBroker(5)
160 defer cancel()
161 for i := range 10 {
162 b.Publish(&Event{
163 ID: fmt.Sprintf("event-%d", i),
164 Path: "test",
165 Payload: map[string]any{},
166 })
167 }
168
169 ch, unsub := b.Subscribe("test", "event-4")
170 defer unsub()
171
172 for i := 5; i < 10; i++ {
173 got := mustReceive(t, ch, time.Second)
174 expected := fmt.Sprintf("event-%d", i)
175 if got.ID != expected {
176 t.Errorf("expected %s, got %s", expected, got.ID)
177 }
178 }
179}
180
181func TestBroker_lastEventIDReplay(t *testing.T) {
182 b, cancel := newTestBroker(100)
183 defer cancel()
184 b.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
185 b.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
186 b.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}})
187
188 ch, unsub := b.Subscribe("test", "e1")
189 defer unsub()
190
191 got1 := mustReceive(t, ch, time.Second)
192 got2 := mustReceive(t, ch, time.Second)
193 if got1.ID != "e2" || got2.ID != "e3" {
194 t.Errorf("expected e2 and e3, got %s and %s", got1.ID, got2.ID)
195 }
196}
197
198func TestBroker_lastEventIDRespectsPathHierarchy(t *testing.T) {
199 b, cancel := newTestBroker(100)
200 defer cancel()
201 b.Publish(&Event{ID: "e1", Path: "github.com/chrisguidry/docketeer", Payload: map[string]any{}})
202 b.Publish(&Event{ID: "e2", Path: "gitlab.com/other/project", Payload: map[string]any{}})
203 b.Publish(&Event{ID: "e3", Path: "github.com/chrisguidry/other", Payload: map[string]any{}})
204
205 ch, unsub := b.Subscribe("github.com/chrisguidry", "e1")
206 defer unsub()
207
208 got := mustReceive(t, ch, time.Second)
209 if got.ID != "e3" {
210 t.Errorf("expected e3 (same prefix), got %s", got.ID)
211 }
212 mustNotReceive(t, ch, 50*time.Millisecond)
213}
214
215func TestBroker_lastEventIDReplayExactPath(t *testing.T) {
216 b, cancel := newTestBroker(100)
217 defer cancel()
218 b.Publish(&Event{ID: "e1", Path: "exact/path", Payload: map[string]any{}})
219 b.Publish(&Event{ID: "e2", Path: "exact/path/child", Payload: map[string]any{}})
220 b.Publish(&Event{ID: "e3", Path: "exact/path", Payload: map[string]any{}})
221
222 ch, unsub := b.Subscribe("exact/path", "e1")
223 defer unsub()
224
225 got1 := mustReceive(t, ch, time.Second)
226 got2 := mustReceive(t, ch, time.Second)
227 if got1.ID != "e2" {
228 t.Errorf("expected e2, got %s", got1.ID)
229 }
230 if got2.ID != "e3" {
231 t.Errorf("expected e3, got %s", got2.ID)
232 }
233}
234
235func TestPathMatches(t *testing.T) {
236 tests := []struct {
237 subscribePath string
238 eventPath string
239 want bool
240 }{
241 {"", "anything", true},
242 {"exact", "exact", true},
243 {"parent", "parent/child", true},
244 {"parent", "parentchild", false},
245 {"parent", "other", false},
246 }
247 for _, tt := range tests {
248 got := pathMatches(tt.subscribePath, tt.eventPath)
249 if got != tt.want {
250 t.Errorf("pathMatches(%q, %q) = %v, want %v", tt.subscribePath, tt.eventPath, got, tt.want)
251 }
252 }
253}
254
255func TestBroker_lastEventIDExpiredFromBuffer(t *testing.T) {
256 b, cancel := newTestBroker(3)
257 defer cancel()
258 b.Publish(&Event{ID: "old1", Path: "test", Payload: map[string]any{}})
259 b.Publish(&Event{ID: "old2", Path: "test", Payload: map[string]any{}})
260 b.Publish(&Event{ID: "old3", Path: "test", Payload: map[string]any{}})
261 b.Publish(&Event{ID: "new1", Path: "test", Payload: map[string]any{}})
262 b.Publish(&Event{ID: "new2", Path: "test", Payload: map[string]any{}})
263 b.Publish(&Event{ID: "new3", Path: "test", Payload: map[string]any{}})
264
265 ch, unsub := b.Subscribe("test", "old1")
266 defer unsub()
267
268 got := mustReceive(t, ch, time.Second)
269 if got.ID != "new1" {
270 t.Errorf("expected new1 (oldest in buffer), got %s", got.ID)
271 }
272}