Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/alicebob/miniredis/v2"
9 "github.com/redis/go-redis/v9"
10)
11
12func newTestRedisBackend(t *testing.T, bufferSize int) (*RedisBackend, *miniredis.Miniredis) {
13 t.Helper()
14 mr := miniredis.RunT(t)
15 client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
16 return newRedisBackendFromClient(client, bufferSize), mr
17}
18
19func TestRedisBackend_publishAndSubscribe(t *testing.T) {
20 backend, _ := newTestRedisBackend(t, 100)
21 defer backend.Close()
22
23 ctx, cancel := context.WithCancel(context.Background())
24 defer cancel()
25
26 ch := backend.Subscribe(ctx)
27 time.Sleep(50 * time.Millisecond)
28
29 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{"ok": true}})
30
31 select {
32 case got := <-ch:
33 if got.ID != "e1" {
34 t.Errorf("expected e1, got %s", got.ID)
35 }
36 if got.Path != "test" {
37 t.Errorf("expected path test, got %s", got.Path)
38 }
39 case <-time.After(2 * time.Second):
40 t.Fatal("timed out waiting for event")
41 }
42}
43
44func TestRedisBackend_since(t *testing.T) {
45 backend, _ := newTestRedisBackend(t, 100)
46 defer backend.Close()
47
48 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
49 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
50 backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}})
51
52 events := backend.Since("e1", "test")
53 if len(events) != 1 {
54 t.Fatalf("expected 1 event, got %d", len(events))
55 }
56 if events[0].ID != "e2" {
57 t.Errorf("expected e2, got %s", events[0].ID)
58 }
59}
60
61func TestRedisBackend_sinceExpired(t *testing.T) {
62 backend, _ := newTestRedisBackend(t, 3)
63 defer backend.Close()
64
65 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
66 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
67 backend.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}})
68 backend.Publish(&Event{ID: "e4", Path: "test", Payload: map[string]any{}})
69
70 events := backend.Since("e1", "test")
71 if len(events) != 3 {
72 t.Fatalf("expected 3 events (full buffer), got %d", len(events))
73 }
74 if events[0].ID != "e2" {
75 t.Errorf("expected e2, got %s", events[0].ID)
76 }
77}
78
79func TestRedisBackend_sincePathFiltering(t *testing.T) {
80 backend, _ := newTestRedisBackend(t, 100)
81 defer backend.Close()
82
83 backend.Publish(&Event{ID: "e1", Path: "a/b", Payload: map[string]any{}})
84 backend.Publish(&Event{ID: "e2", Path: "a/b/c", Payload: map[string]any{}})
85 backend.Publish(&Event{ID: "e3", Path: "x/y", Payload: map[string]any{}})
86
87 events := backend.Since("e1", "a/b")
88 if len(events) != 1 {
89 t.Fatalf("expected 1 event, got %d", len(events))
90 }
91 if events[0].ID != "e2" {
92 t.Errorf("expected e2, got %s", events[0].ID)
93 }
94}
95
96func TestRedisBackend_multiReplica(t *testing.T) {
97 mr := miniredis.RunT(t)
98
99 client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()})
100 backend1 := newRedisBackendFromClient(client1, 100)
101 defer backend1.Close()
102
103 client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()})
104 backend2 := newRedisBackendFromClient(client2, 100)
105 defer backend2.Close()
106
107 ctx, cancel := context.WithCancel(context.Background())
108 defer cancel()
109
110 ch2 := backend2.Subscribe(ctx)
111 time.Sleep(50 * time.Millisecond)
112
113 backend1.Publish(&Event{ID: "cross-replica", Path: "test", Payload: map[string]any{"from": "replica1"}})
114
115 select {
116 case got := <-ch2:
117 if got.ID != "cross-replica" {
118 t.Errorf("expected cross-replica, got %s", got.ID)
119 }
120 case <-time.After(2 * time.Second):
121 t.Fatal("timed out waiting for cross-replica event")
122 }
123}
124
125func TestRedisBackend_multiReplicaReplay(t *testing.T) {
126 mr := miniredis.RunT(t)
127
128 client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()})
129 backend1 := newRedisBackendFromClient(client1, 100)
130 defer backend1.Close()
131
132 backend1.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
133 backend1.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
134
135 client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()})
136 backend2 := newRedisBackendFromClient(client2, 100)
137 defer backend2.Close()
138
139 events := backend2.Since("e1", "test")
140 if len(events) != 1 {
141 t.Fatalf("expected 1 event, got %d", len(events))
142 }
143 if events[0].ID != "e2" {
144 t.Errorf("expected e2, got %s", events[0].ID)
145 }
146}
147
148func TestRedisBackend_closeIsIdempotent(t *testing.T) {
149 backend, _ := newTestRedisBackend(t, 100)
150 if err := backend.Close(); err != nil {
151 t.Errorf("first close: %v", err)
152 }
153 if err := backend.Close(); err != nil {
154 t.Errorf("second close: %v", err)
155 }
156}
157
158func TestRedisBackend_bufferTrims(t *testing.T) {
159 backend, _ := newTestRedisBackend(t, 3)
160 defer backend.Close()
161
162 for i := range 10 {
163 backend.Publish(&Event{ID: string(rune('a' + i)), Path: "test", Payload: map[string]any{}})
164 }
165
166 events := backend.Since("", "test")
167 if len(events) != 3 {
168 t.Fatalf("expected 3 events in trimmed buffer, got %d", len(events))
169 }
170}
171
172func TestRedisBackend_bufferHasTTL(t *testing.T) {
173 backend, mr := newTestRedisBackend(t, 100)
174 defer backend.Close()
175
176 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
177
178 ttl := mr.TTL(redisBufferKey)
179 if ttl <= 0 {
180 t.Fatalf("expected positive TTL on buffer key, got %v", ttl)
181 }
182 if ttl > 24*time.Hour {
183 t.Fatalf("expected TTL <= 24h, got %v", ttl)
184 }
185}
186
187func TestNewRedisBackend_success(t *testing.T) {
188 mr := miniredis.RunT(t)
189 backend, err := NewRedisBackend("redis://"+mr.Addr(), 100)
190 if err != nil {
191 t.Fatalf("expected no error, got %v", err)
192 }
193 defer backend.Close()
194}
195
196func TestNewRedisBackend_badURL(t *testing.T) {
197 _, err := NewRedisBackend("not-a-url", 100)
198 if err == nil {
199 t.Fatal("expected error for bad URL")
200 }
201}
202
203func TestNewRedisBackend_unreachable(t *testing.T) {
204 _, err := NewRedisBackend("redis://127.0.0.1:1", 100)
205 if err == nil {
206 t.Fatal("expected error for unreachable redis")
207 }
208}
209
210func TestRedisBackend_sinceWithCorruptData(t *testing.T) {
211 backend, mr := newTestRedisBackend(t, 100)
212 defer backend.Close()
213
214 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
215 mr.Lpush(redisBufferKey, "not-valid-json")
216 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}})
217
218 events := backend.Since("e1", "test")
219 if len(events) != 1 {
220 t.Fatalf("expected 1 event (corrupt data skipped), got %d", len(events))
221 }
222 if events[0].ID != "e2" {
223 t.Errorf("expected e2, got %s", events[0].ID)
224 }
225}
226
227func TestRedisBackend_subscribeContextCancel(t *testing.T) {
228 backend, _ := newTestRedisBackend(t, 100)
229 defer backend.Close()
230
231 ctx, cancel := context.WithCancel(context.Background())
232 ch := backend.Subscribe(ctx)
233 cancel()
234
235 // Channel should close after context cancel
236 select {
237 case _, ok := <-ch:
238 if ok {
239 t.Error("expected channel to be closed")
240 }
241 case <-time.After(2 * time.Second):
242 t.Fatal("timed out waiting for channel close")
243 }
244}
245
246func TestRedisBackend_subscribeSkipsBadJSON(t *testing.T) {
247 backend, _ := newTestRedisBackend(t, 100)
248 defer backend.Close()
249
250 ctx, cancel := context.WithCancel(context.Background())
251 defer cancel()
252
253 ch := backend.Subscribe(ctx)
254 time.Sleep(50 * time.Millisecond)
255
256 // Publish garbage directly to the Redis channel
257 backend.client.Publish(context.Background(), redisChannel, "not-json")
258 // Then a valid event
259 backend.Publish(&Event{ID: "valid", Path: "test", Payload: map[string]any{}})
260
261 select {
262 case got := <-ch:
263 if got.ID != "valid" {
264 t.Errorf("expected valid, got %s", got.ID)
265 }
266 case <-time.After(2 * time.Second):
267 t.Fatal("timed out")
268 }
269}
270
271func TestRedisBackend_sinceClosedClient(t *testing.T) {
272 backend, _ := newTestRedisBackend(t, 100)
273 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}})
274 backend.client.Close()
275 backend.closed = true
276
277 events := backend.Since("", "test")
278 if events != nil {
279 t.Errorf("expected nil, got %v", events)
280 }
281}
282
283func TestRedisBackend_subscribeNilMessage(t *testing.T) {
284 mr := miniredis.RunT(t)
285 client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
286 backend := newRedisBackendFromClient(client, 100)
287
288 ctx, cancel := context.WithCancel(context.Background())
289 defer cancel()
290
291 ch := backend.Subscribe(ctx)
292 time.Sleep(50 * time.Millisecond)
293
294 // Close the client, which causes the pubsub channel to close
295 client.Close()
296 mr.Close()
297
298 select {
299 case _, ok := <-ch:
300 if ok {
301 t.Error("expected channel to be closed after client close")
302 }
303 case <-time.After(5 * time.Second):
304 t.Fatal("timed out waiting for channel close")
305 }
306}
307
308func TestRedisBackend_publishMarshalError(t *testing.T) {
309 backend, _ := newTestRedisBackend(t, 100)
310 defer backend.Close()
311
312 err := backend.Publish(&Event{ID: "bad", Path: "test", Payload: make(chan int)})
313 if err == nil {
314 t.Fatal("expected marshal error")
315 }
316}