package main import ( "bufio" "bytes" "context" "encoding/json" "math" "net/http" "net/http/httptest" "strings" "sync/atomic" "testing" "time" ) func sseSubscribe(ctx context.Context, url string, headers map[string]string) <-chan *Event { events := make(chan *Event, 10) go func() { defer close(events) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req.Header.Set("Accept", "text/event-stream") for k, v := range headers { req.Header.Set(k, v) } resp, err := http.DefaultClient.Do(req) if err != nil { return } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "data: ") { var event Event json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &event) select { case events <- &event: case <-ctx.Done(): return } } } }() return events } func TestServer_postAndSSEReceive(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) time.Sleep(50 * time.Millisecond) http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) select { case event := <-events: if event.Path != "test/topic" { t.Errorf("expected path test/topic, got %s", event.Path) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for SSE event") } } func TestServer_sseWithValidBearerToken(t *testing.T) { cfg := &Configuration{ Paths: map[string]PathConfiguration{ "private/topic": {SubscribeSecret: "my-token"}, }, } ts, _, cancel := newTestServer(cfg) defer cancel() defer ts.Close() req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) req.Header.Set("Accept", "text/event-stream") req.Header.Set("Authorization", "Bearer my-token") client := &http.Client{Timeout: 500 * time.Millisecond} resp, err := client.Do(req) if err != nil { t.Fatalf("GET failed: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Errorf("expected 200, got %d", resp.StatusCode) } } func TestServer_sseWithWrongToken(t *testing.T) { cfg := &Configuration{ Paths: map[string]PathConfiguration{ "private/topic": {SubscribeSecret: "my-token"}, }, } ts, _, cancel := newTestServer(cfg) defer cancel() defer ts.Close() req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) req.Header.Set("Accept", "text/event-stream") req.Header.Set("Authorization", "Bearer wrong-token") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("GET failed: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusUnauthorized { t.Errorf("expected 401, got %d", resp.StatusCode) } } func TestServer_sseToOpenPath(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() req, _ := http.NewRequest("GET", ts.URL+"/open/topic", nil) req.Header.Set("Accept", "text/event-stream") client := &http.Client{Timeout: 500 * time.Millisecond} resp, err := client.Do(req) if err != nil { t.Fatalf("GET failed: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Errorf("expected 200, got %d", resp.StatusCode) } ct := resp.Header.Get("Content-Type") if ct != "text/event-stream" { t.Errorf("expected text/event-stream, got %s", ct) } } func TestServer_prefixSubscription(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/github.com/chrisguidry", nil) time.Sleep(50 * time.Millisecond) http.Post(ts.URL+"/github.com/chrisguidry/docketeer", "application/json", strings.NewReader(`{"ref":"main"}`)) select { case event := <-events: if event.Path != "github.com/chrisguidry/docketeer" { t.Errorf("expected child path, got %s", event.Path) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for SSE event") } } func TestServer_prefixSubscriptionWithTrailingSlash(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/test/", nil) time.Sleep(50 * time.Millisecond) http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) select { case event := <-events: if event.Path != "test/topic" { t.Errorf("expected test/topic, got %s", event.Path) } case <-time.After(2 * time.Second): t.Fatal("timed out: trailing slash subscribe should receive child events") } } func TestServer_lastEventIDReplay(t *testing.T) { ts, broker, cancel := newTestServer(nil) defer cancel() defer ts.Close() broker.Publish(&Event{ ID: "replay-1", Path: "test/topic", Payload: map[string]any{"n": 1}, }) broker.Publish(&Event{ ID: "replay-2", Path: "test/topic", Payload: map[string]any{"n": 2}, }) ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/test/topic", map[string]string{ "Last-Event-ID": "replay-1", }) select { case event := <-events: if event.ID != "replay-2" { t.Errorf("expected replay-2, got %s", event.ID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for replayed event") } } func TestServer_filterQueryParam(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/test/topic?filter=payload.ref:refs/heads/main", nil) time.Sleep(50 * time.Millisecond) http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/develop"}`)) time.Sleep(20 * time.Millisecond) http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/main"}`)) select { case event := <-events: payload := event.Payload.(map[string]any) if payload["ref"] != "refs/heads/main" { t.Errorf("expected filtered event with ref=main, got %v", payload["ref"]) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for filtered event") } } func TestServer_sseWithMissingAuth(t *testing.T) { cfg := &Configuration{ Paths: map[string]PathConfiguration{ "private/topic": {SubscribeSecret: "my-token"}, }, } ts, _, cancel := newTestServer(cfg) defer cancel() defer ts.Close() req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) req.Header.Set("Accept", "text/event-stream") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("GET failed: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusUnauthorized { t.Errorf("expected 401, got %d", resp.StatusCode) } } func TestServer_sseChannelClosed(t *testing.T) { backend := NewMemoryBackend(100) broker := NewBroker(backend) ctx, cancel := context.WithCancel(context.Background()) defer cancel() broker.Start(ctx) var cfgPtr atomic.Pointer[Configuration] handler := NewServer(broker, &cfgPtr) ts := httptest.NewServer(handler) defer ts.Close() sseCtx, sseCancel := context.WithCancel(context.Background()) defer sseCancel() events := sseSubscribe(sseCtx, ts.URL+"/test/topic", nil) time.Sleep(50 * time.Millisecond) broker.mu.Lock() for _, subs := range broker.subscribers { for _, sub := range subs { close(sub.ch) } } broker.subscribers = make(map[string][]*subscriber) broker.mu.Unlock() time.Sleep(50 * time.Millisecond) select { case _, ok := <-events: if ok { t.Error("expected channel to be closed") } case <-time.After(time.Second): t.Fatal("timed out waiting for SSE to close") } } func TestServer_sseSkipsUnmarshalableEvent(t *testing.T) { ts, broker, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) time.Sleep(50 * time.Millisecond) broker.Publish(&Event{ ID: "bad-event", Path: "test/topic", Payload: math.NaN(), }) broker.Publish(&Event{ ID: "good-event", Path: "test/topic", Payload: map[string]any{"ok": true}, }) select { case event := <-events: if event.ID != "good-event" { t.Errorf("expected good-event, got %s", event.ID) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for good event after unmarshalable one") } } func TestServer_textPayloadStoredAsString(t *testing.T) { ts, _, cancel := newTestServer(nil) defer cancel() defer ts.Close() ctx, cancelSSE := context.WithCancel(context.Background()) defer cancelSSE() done := make(chan string, 1) go func() { req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL+"/test/topic", nil) req.Header.Set("Accept", "text/event-stream") resp, err := http.DefaultClient.Do(req) if err != nil { return } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "data: ") { var envelope struct { Payload json.RawMessage `json:"payload"` } json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope) var s string if json.Unmarshal(envelope.Payload, &s) == nil { done <- s return } } } }() time.Sleep(50 * time.Millisecond) resp, _ := http.Post(ts.URL+"/test/topic", "text/plain", bytes.NewReader([]byte("hello world"))) resp.Body.Close() select { case payload := <-done: if payload != "hello world" { t.Errorf("expected plain text payload, got %s", payload) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for event") } }