Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 316 lines 8.7 kB view raw
1package main 2 3import ( 4 "context" 5 "testing" 6 "time" 7 8 "github.com/alicebob/miniredis/v2" 9 "github.com/redis/go-redis/v9" 10) 11 12func newTestRedisBackend(t *testing.T, bufferSize int) (*RedisBackend, *miniredis.Miniredis) { 13 t.Helper() 14 mr := miniredis.RunT(t) 15 client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 16 return newRedisBackendFromClient(client, bufferSize), mr 17} 18 19func TestRedisBackend_publishAndSubscribe(t *testing.T) { 20 backend, _ := newTestRedisBackend(t, 100) 21 defer backend.Close() 22 23 ctx, cancel := context.WithCancel(context.Background()) 24 defer cancel() 25 26 ch := backend.Subscribe(ctx) 27 time.Sleep(50 * time.Millisecond) 28 29 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{"ok": true}}) 30 31 select { 32 case got := <-ch: 33 if got.ID != "e1" { 34 t.Errorf("expected e1, got %s", got.ID) 35 } 36 if got.Path != "test" { 37 t.Errorf("expected path test, got %s", got.Path) 38 } 39 case <-time.After(2 * time.Second): 40 t.Fatal("timed out waiting for event") 41 } 42} 43 44func TestRedisBackend_since(t *testing.T) { 45 backend, _ := newTestRedisBackend(t, 100) 46 defer backend.Close() 47 48 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 49 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 50 backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}}) 51 52 events := backend.Since("e1", "test") 53 if len(events) != 1 { 54 t.Fatalf("expected 1 event, got %d", len(events)) 55 } 56 if events[0].ID != "e2" { 57 t.Errorf("expected e2, got %s", events[0].ID) 58 } 59} 60 61func TestRedisBackend_sinceExpired(t *testing.T) { 62 backend, _ := newTestRedisBackend(t, 3) 63 defer backend.Close() 64 65 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 66 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 67 backend.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}}) 68 backend.Publish(&Event{ID: "e4", Path: "test", Payload: map[string]any{}}) 69 70 events := backend.Since("e1", "test") 71 if len(events) != 3 { 72 t.Fatalf("expected 3 events (full buffer), got %d", len(events)) 73 } 74 if events[0].ID != "e2" { 75 t.Errorf("expected e2, got %s", events[0].ID) 76 } 77} 78 79func TestRedisBackend_sincePathFiltering(t *testing.T) { 80 backend, _ := newTestRedisBackend(t, 100) 81 defer backend.Close() 82 83 backend.Publish(&Event{ID: "e1", Path: "a/b", Payload: map[string]any{}}) 84 backend.Publish(&Event{ID: "e2", Path: "a/b/c", Payload: map[string]any{}}) 85 backend.Publish(&Event{ID: "e3", Path: "x/y", Payload: map[string]any{}}) 86 87 events := backend.Since("e1", "a/b") 88 if len(events) != 1 { 89 t.Fatalf("expected 1 event, got %d", len(events)) 90 } 91 if events[0].ID != "e2" { 92 t.Errorf("expected e2, got %s", events[0].ID) 93 } 94} 95 96func TestRedisBackend_multiReplica(t *testing.T) { 97 mr := miniredis.RunT(t) 98 99 client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 100 backend1 := newRedisBackendFromClient(client1, 100) 101 defer backend1.Close() 102 103 client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 104 backend2 := newRedisBackendFromClient(client2, 100) 105 defer backend2.Close() 106 107 ctx, cancel := context.WithCancel(context.Background()) 108 defer cancel() 109 110 ch2 := backend2.Subscribe(ctx) 111 time.Sleep(50 * time.Millisecond) 112 113 backend1.Publish(&Event{ID: "cross-replica", Path: "test", Payload: map[string]any{"from": "replica1"}}) 114 115 select { 116 case got := <-ch2: 117 if got.ID != "cross-replica" { 118 t.Errorf("expected cross-replica, got %s", got.ID) 119 } 120 case <-time.After(2 * time.Second): 121 t.Fatal("timed out waiting for cross-replica event") 122 } 123} 124 125func TestRedisBackend_multiReplicaReplay(t *testing.T) { 126 mr := miniredis.RunT(t) 127 128 client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 129 backend1 := newRedisBackendFromClient(client1, 100) 130 defer backend1.Close() 131 132 backend1.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 133 backend1.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 134 135 client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 136 backend2 := newRedisBackendFromClient(client2, 100) 137 defer backend2.Close() 138 139 events := backend2.Since("e1", "test") 140 if len(events) != 1 { 141 t.Fatalf("expected 1 event, got %d", len(events)) 142 } 143 if events[0].ID != "e2" { 144 t.Errorf("expected e2, got %s", events[0].ID) 145 } 146} 147 148func TestRedisBackend_closeIsIdempotent(t *testing.T) { 149 backend, _ := newTestRedisBackend(t, 100) 150 if err := backend.Close(); err != nil { 151 t.Errorf("first close: %v", err) 152 } 153 if err := backend.Close(); err != nil { 154 t.Errorf("second close: %v", err) 155 } 156} 157 158func TestRedisBackend_bufferTrims(t *testing.T) { 159 backend, _ := newTestRedisBackend(t, 3) 160 defer backend.Close() 161 162 for i := range 10 { 163 backend.Publish(&Event{ID: string(rune('a' + i)), Path: "test", Payload: map[string]any{}}) 164 } 165 166 events := backend.Since("", "test") 167 if len(events) != 3 { 168 t.Fatalf("expected 3 events in trimmed buffer, got %d", len(events)) 169 } 170} 171 172func TestRedisBackend_bufferHasTTL(t *testing.T) { 173 backend, mr := newTestRedisBackend(t, 100) 174 defer backend.Close() 175 176 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 177 178 ttl := mr.TTL(redisBufferKey) 179 if ttl <= 0 { 180 t.Fatalf("expected positive TTL on buffer key, got %v", ttl) 181 } 182 if ttl > 24*time.Hour { 183 t.Fatalf("expected TTL <= 24h, got %v", ttl) 184 } 185} 186 187func TestNewRedisBackend_success(t *testing.T) { 188 mr := miniredis.RunT(t) 189 backend, err := NewRedisBackend("redis://"+mr.Addr(), 100) 190 if err != nil { 191 t.Fatalf("expected no error, got %v", err) 192 } 193 defer backend.Close() 194} 195 196func TestNewRedisBackend_badURL(t *testing.T) { 197 _, err := NewRedisBackend("not-a-url", 100) 198 if err == nil { 199 t.Fatal("expected error for bad URL") 200 } 201} 202 203func TestNewRedisBackend_unreachable(t *testing.T) { 204 _, err := NewRedisBackend("redis://127.0.0.1:1", 100) 205 if err == nil { 206 t.Fatal("expected error for unreachable redis") 207 } 208} 209 210func TestRedisBackend_sinceWithCorruptData(t *testing.T) { 211 backend, mr := newTestRedisBackend(t, 100) 212 defer backend.Close() 213 214 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 215 mr.Lpush(redisBufferKey, "not-valid-json") 216 backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 217 218 events := backend.Since("e1", "test") 219 if len(events) != 1 { 220 t.Fatalf("expected 1 event (corrupt data skipped), got %d", len(events)) 221 } 222 if events[0].ID != "e2" { 223 t.Errorf("expected e2, got %s", events[0].ID) 224 } 225} 226 227func TestRedisBackend_subscribeContextCancel(t *testing.T) { 228 backend, _ := newTestRedisBackend(t, 100) 229 defer backend.Close() 230 231 ctx, cancel := context.WithCancel(context.Background()) 232 ch := backend.Subscribe(ctx) 233 cancel() 234 235 // Channel should close after context cancel 236 select { 237 case _, ok := <-ch: 238 if ok { 239 t.Error("expected channel to be closed") 240 } 241 case <-time.After(2 * time.Second): 242 t.Fatal("timed out waiting for channel close") 243 } 244} 245 246func TestRedisBackend_subscribeSkipsBadJSON(t *testing.T) { 247 backend, _ := newTestRedisBackend(t, 100) 248 defer backend.Close() 249 250 ctx, cancel := context.WithCancel(context.Background()) 251 defer cancel() 252 253 ch := backend.Subscribe(ctx) 254 time.Sleep(50 * time.Millisecond) 255 256 // Publish garbage directly to the Redis channel 257 backend.client.Publish(context.Background(), redisChannel, "not-json") 258 // Then a valid event 259 backend.Publish(&Event{ID: "valid", Path: "test", Payload: map[string]any{}}) 260 261 select { 262 case got := <-ch: 263 if got.ID != "valid" { 264 t.Errorf("expected valid, got %s", got.ID) 265 } 266 case <-time.After(2 * time.Second): 267 t.Fatal("timed out") 268 } 269} 270 271func TestRedisBackend_sinceClosedClient(t *testing.T) { 272 backend, _ := newTestRedisBackend(t, 100) 273 backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 274 backend.client.Close() 275 backend.closed = true 276 277 events := backend.Since("", "test") 278 if events != nil { 279 t.Errorf("expected nil, got %v", events) 280 } 281} 282 283func TestRedisBackend_subscribeNilMessage(t *testing.T) { 284 mr := miniredis.RunT(t) 285 client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 286 backend := newRedisBackendFromClient(client, 100) 287 288 ctx, cancel := context.WithCancel(context.Background()) 289 defer cancel() 290 291 ch := backend.Subscribe(ctx) 292 time.Sleep(50 * time.Millisecond) 293 294 // Close the client, which causes the pubsub channel to close 295 client.Close() 296 mr.Close() 297 298 select { 299 case _, ok := <-ch: 300 if ok { 301 t.Error("expected channel to be closed after client close") 302 } 303 case <-time.After(5 * time.Second): 304 t.Fatal("timed out waiting for channel close") 305 } 306} 307 308func TestRedisBackend_publishMarshalError(t *testing.T) { 309 backend, _ := newTestRedisBackend(t, 100) 310 defer backend.Close() 311 312 err := backend.Publish(&Event{ID: "bad", Path: "test", Payload: make(chan int)}) 313 if err == nil { 314 t.Fatal("expected marshal error") 315 } 316}