Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 422 lines 11 kB view raw
1package main 2 3import ( 4 "context" 5 "crypto/hmac" 6 "crypto/sha256" 7 "encoding/hex" 8 "fmt" 9 "net/http" 10 "net/http/httptest" 11 "strings" 12 "sync/atomic" 13 "testing" 14 "time" 15) 16 17func newTestServer(cfg *Configuration) (*httptest.Server, *Broker, context.CancelFunc) { 18 backend := NewMemoryBackend(100) 19 broker := NewBroker(backend) 20 ctx, cancel := context.WithCancel(context.Background()) 21 broker.Start(ctx) 22 var cfgPtr atomic.Pointer[Configuration] 23 if cfg != nil { 24 cfgPtr.Store(cfg) 25 } 26 handler := NewServer(broker, &cfgPtr) 27 return httptest.NewServer(handler), broker, cancel 28} 29 30func TestServer_rootBanner(t *testing.T) { 31 ts, _, cancel := newTestServer(nil) 32 defer cancel() 33 defer ts.Close() 34 35 resp, err := http.Get(ts.URL + "/") 36 if err != nil { 37 t.Fatalf("GET / failed: %v", err) 38 } 39 defer resp.Body.Close() 40 if resp.StatusCode != http.StatusOK { 41 t.Errorf("expected 200, got %d", resp.StatusCode) 42 } 43 if ct := resp.Header.Get("Content-Type"); ct != "text/plain" { 44 t.Errorf("expected text/plain, got %s", ct) 45 } 46 buf := make([]byte, 1024) 47 n, _ := resp.Body.Read(buf) 48 body := string(buf[:n]) 49 if !strings.Contains(body, "wicket") { 50 t.Errorf("expected body to contain 'wicket', got %q", body) 51 } 52} 53 54func TestServer_healthEndpoint(t *testing.T) { 55 ts, _, cancel := newTestServer(nil) 56 defer cancel() 57 defer ts.Close() 58 59 resp, err := http.Get(ts.URL + "/_health") 60 if err != nil { 61 t.Fatalf("GET /_health failed: %v", err) 62 } 63 defer resp.Body.Close() 64 if resp.StatusCode != http.StatusNoContent { 65 t.Errorf("expected 204, got %d", resp.StatusCode) 66 } 67 buf := make([]byte, 1) 68 n, _ := resp.Body.Read(buf) 69 if n != 0 { 70 t.Errorf("expected empty body, got %d bytes", n) 71 } 72} 73 74func TestServer_postPublishesEvent(t *testing.T) { 75 ts, _, cancel := newTestServer(nil) 76 defer cancel() 77 defer ts.Close() 78 79 resp, err := http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) 80 if err != nil { 81 t.Fatalf("POST failed: %v", err) 82 } 83 defer resp.Body.Close() 84 if resp.StatusCode != http.StatusAccepted { 85 t.Errorf("expected 202, got %d", resp.StatusCode) 86 } 87} 88 89func TestServer_postTrailingSlashNormalized(t *testing.T) { 90 ts, broker, cancel := newTestServer(nil) 91 defer cancel() 92 defer ts.Close() 93 94 ch, unsub := broker.Subscribe("test/topic", "") 95 defer unsub() 96 97 http.Post(ts.URL+"/test/topic/", "application/json", strings.NewReader(`{}`)) 98 99 select { 100 case event := <-ch: 101 if event.Path != "test/topic" { 102 t.Errorf("expected normalized path test/topic, got %s", event.Path) 103 } 104 case <-time.After(time.Second): 105 t.Fatal("timed out: POST with trailing slash should deliver to normalized path") 106 } 107} 108 109func TestServer_postWithValidHMAC(t *testing.T) { 110 cfg := &Configuration{ 111 Paths: map[string]PathConfiguration{ 112 "secure/path": { 113 Verify: "hmac-sha256", 114 Secret: "test-secret", 115 SignatureHeader: "X-Hub-Signature-256", 116 }, 117 }, 118 } 119 ts, _, cancel := newTestServer(cfg) 120 defer cancel() 121 defer ts.Close() 122 123 body := `{"action":"push"}` 124 mac := hmac.New(sha256.New, []byte("test-secret")) 125 mac.Write([]byte(body)) 126 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil)) 127 128 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(body)) 129 req.Header.Set("Content-Type", "application/json") 130 req.Header.Set("X-Hub-Signature-256", sig) 131 resp, err := http.DefaultClient.Do(req) 132 if err != nil { 133 t.Fatalf("POST failed: %v", err) 134 } 135 defer resp.Body.Close() 136 if resp.StatusCode != http.StatusAccepted { 137 t.Errorf("expected 202, got %d", resp.StatusCode) 138 } 139} 140 141func TestServer_postWithInvalidSignature(t *testing.T) { 142 cfg := &Configuration{ 143 Paths: map[string]PathConfiguration{ 144 "secure/path": { 145 Verify: "hmac-sha256", 146 Secret: "test-secret", 147 SignatureHeader: "X-Hub-Signature-256", 148 }, 149 }, 150 } 151 ts, _, cancel := newTestServer(cfg) 152 defer cancel() 153 defer ts.Close() 154 155 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(`{"bad":"data"}`)) 156 req.Header.Set("Content-Type", "application/json") 157 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef") 158 resp, err := http.DefaultClient.Do(req) 159 if err != nil { 160 t.Fatalf("POST failed: %v", err) 161 } 162 defer resp.Body.Close() 163 if resp.StatusCode != http.StatusForbidden { 164 t.Errorf("expected 403, got %d", resp.StatusCode) 165 } 166} 167 168func TestServer_postToUnconfiguredPath(t *testing.T) { 169 cfg := &Configuration{ 170 Paths: map[string]PathConfiguration{ 171 "secure/path": { 172 Verify: "hmac-sha256", 173 Secret: "test-secret", 174 SignatureHeader: "X-Hub-Signature-256", 175 }, 176 }, 177 } 178 ts, _, cancel := newTestServer(cfg) 179 defer cancel() 180 defer ts.Close() 181 182 resp, err := http.Post(ts.URL+"/open/path", "application/json", strings.NewReader(`{"ok":true}`)) 183 if err != nil { 184 t.Fatalf("POST failed: %v", err) 185 } 186 defer resp.Body.Close() 187 if resp.StatusCode != http.StatusAccepted { 188 t.Errorf("expected 202, got %d", resp.StatusCode) 189 } 190} 191 192func TestServer_getWithoutSSEAccept(t *testing.T) { 193 ts, _, cancel := newTestServer(nil) 194 defer cancel() 195 defer ts.Close() 196 197 resp, err := http.Get(ts.URL + "/test/topic") 198 if err != nil { 199 t.Fatalf("GET failed: %v", err) 200 } 201 defer resp.Body.Close() 202 if resp.StatusCode != http.StatusNotFound { 203 t.Errorf("expected 404, got %d", resp.StatusCode) 204 } 205} 206 207func TestServer_corsHeaders(t *testing.T) { 208 ts, _, cancel := newTestServer(nil) 209 defer cancel() 210 defer ts.Close() 211 212 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) 213 if err != nil { 214 t.Fatalf("POST failed: %v", err) 215 } 216 defer resp.Body.Close() 217 218 if resp.Header.Get("Access-Control-Allow-Origin") != "*" { 219 t.Error("missing CORS Allow-Origin header") 220 } 221} 222 223func TestServer_optionsPreflight(t *testing.T) { 224 ts, _, cancel := newTestServer(nil) 225 defer cancel() 226 defer ts.Close() 227 228 req, _ := http.NewRequest("OPTIONS", ts.URL+"/test", nil) 229 resp, err := http.DefaultClient.Do(req) 230 if err != nil { 231 t.Fatalf("OPTIONS failed: %v", err) 232 } 233 defer resp.Body.Close() 234 if resp.StatusCode != http.StatusNoContent { 235 t.Errorf("expected 204, got %d", resp.StatusCode) 236 } 237 if resp.Header.Get("Access-Control-Allow-Methods") == "" { 238 t.Error("missing CORS Allow-Methods header") 239 } 240} 241 242func TestServer_methodNotAllowed(t *testing.T) { 243 ts, _, cancel := newTestServer(nil) 244 defer cancel() 245 defer ts.Close() 246 247 req, _ := http.NewRequest("DELETE", ts.URL+"/test", nil) 248 resp, err := http.DefaultClient.Do(req) 249 if err != nil { 250 t.Fatalf("DELETE failed: %v", err) 251 } 252 defer resp.Body.Close() 253 if resp.StatusCode != http.StatusMethodNotAllowed { 254 t.Errorf("expected 405, got %d", resp.StatusCode) 255 } 256} 257 258func TestServer_postWithBadVerifierConfig(t *testing.T) { 259 cfg := &Configuration{ 260 Paths: map[string]PathConfiguration{ 261 "bad/path": { 262 Verify: "unknown-method", 263 Secret: "secret", 264 SignatureHeader: "X-Signature", 265 }, 266 }, 267 } 268 ts, _, cancel := newTestServer(cfg) 269 defer cancel() 270 defer ts.Close() 271 272 req, _ := http.NewRequest("POST", ts.URL+"/bad/path", strings.NewReader(`{}`)) 273 req.Header.Set("Content-Type", "application/json") 274 resp, err := http.DefaultClient.Do(req) 275 if err != nil { 276 t.Fatalf("POST failed: %v", err) 277 } 278 defer resp.Body.Close() 279 if resp.StatusCode != http.StatusInternalServerError { 280 t.Errorf("expected 500, got %d", resp.StatusCode) 281 } 282} 283 284func TestServer_postInvalidJSON(t *testing.T) { 285 ts, _, cancel := newTestServer(nil) 286 defer cancel() 287 defer ts.Close() 288 289 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{not json`)) 290 if err != nil { 291 t.Fatalf("POST failed: %v", err) 292 } 293 defer resp.Body.Close() 294 if resp.StatusCode != http.StatusBadRequest { 295 t.Errorf("expected 400, got %d", resp.StatusCode) 296 } 297} 298 299type bareResponseWriter struct { 300 code int 301 headers http.Header 302 body strings.Builder 303} 304 305func (w *bareResponseWriter) Header() http.Header { return w.headers } 306func (w *bareResponseWriter) Write(b []byte) (int, error) { return w.body.Write(b) } 307func (w *bareResponseWriter) WriteHeader(code int) { w.code = code } 308 309func TestServer_sseWithoutFlusher(t *testing.T) { 310 backend := NewMemoryBackend(100) 311 broker := NewBroker(backend) 312 ctx, cancel := context.WithCancel(context.Background()) 313 defer cancel() 314 broker.Start(ctx) 315 var cfgPtr atomic.Pointer[Configuration] 316 handler := NewServer(broker, &cfgPtr) 317 318 w := &bareResponseWriter{headers: make(http.Header)} 319 req := httptest.NewRequest("GET", "/test/topic", nil) 320 req.Header.Set("Accept", "text/event-stream") 321 322 handler.ServeHTTP(w, req) 323 324 if w.code != http.StatusInternalServerError { 325 t.Errorf("expected 500, got %d", w.code) 326 } 327} 328 329func TestServer_postInheritsVerificationFromParent(t *testing.T) { 330 cfg := &Configuration{ 331 Paths: map[string]PathConfiguration{ 332 "github.com": { 333 Verify: "hmac-sha256", 334 Secret: "parent-secret", 335 SignatureHeader: "X-Hub-Signature-256", 336 }, 337 }, 338 } 339 ts, _, cancel := newTestServer(cfg) 340 defer cancel() 341 defer ts.Close() 342 343 body := `{"action":"push"}` 344 mac := hmac.New(sha256.New, []byte("parent-secret")) 345 mac.Write([]byte(body)) 346 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil)) 347 348 req, _ := http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body)) 349 req.Header.Set("Content-Type", "application/json") 350 req.Header.Set("X-Hub-Signature-256", sig) 351 resp, err := http.DefaultClient.Do(req) 352 if err != nil { 353 t.Fatalf("POST failed: %v", err) 354 } 355 defer resp.Body.Close() 356 if resp.StatusCode != http.StatusAccepted { 357 t.Errorf("expected 202 with valid signature, got %d", resp.StatusCode) 358 } 359 360 req, _ = http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body)) 361 req.Header.Set("Content-Type", "application/json") 362 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef") 363 resp, err = http.DefaultClient.Do(req) 364 if err != nil { 365 t.Fatalf("POST failed: %v", err) 366 } 367 defer resp.Body.Close() 368 if resp.StatusCode != http.StatusForbidden { 369 t.Errorf("expected 403 with invalid signature, got %d", resp.StatusCode) 370 } 371} 372 373type failingBackend struct{ MemoryBackend } 374 375func (f *failingBackend) Publish(*Event) error { 376 return fmt.Errorf("backend unavailable") 377} 378 379func TestServer_postPublishError(t *testing.T) { 380 backend := &failingBackend{MemoryBackend: *NewMemoryBackend(100)} 381 broker := NewBroker(backend) 382 ctx, cancel := context.WithCancel(context.Background()) 383 defer cancel() 384 broker.Start(ctx) 385 var cfgPtr atomic.Pointer[Configuration] 386 handler := NewServer(broker, &cfgPtr) 387 ts := httptest.NewServer(handler) 388 defer ts.Close() 389 390 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) 391 if err != nil { 392 t.Fatalf("POST failed: %v", err) 393 } 394 defer resp.Body.Close() 395 if resp.StatusCode != http.StatusInternalServerError { 396 t.Errorf("expected 500, got %d", resp.StatusCode) 397 } 398} 399 400func TestServer_postWithMissingSignatureOnSecuredPath(t *testing.T) { 401 cfg := &Configuration{ 402 Paths: map[string]PathConfiguration{ 403 "secure/path": { 404 Verify: "hmac-sha256", 405 Secret: "test-secret", 406 SignatureHeader: "X-Hub-Signature-256", 407 }, 408 }, 409 } 410 ts, _, cancel := newTestServer(cfg) 411 defer cancel() 412 defer ts.Close() 413 414 resp, err := http.Post(ts.URL+"/secure/path", "application/json", strings.NewReader(`{}`)) 415 if err != nil { 416 t.Fatalf("POST failed: %v", err) 417 } 418 defer resp.Body.Close() 419 if resp.StatusCode != http.StatusForbidden { 420 t.Errorf("expected 403, got %d", resp.StatusCode) 421 } 422}