Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "strings"
6 "sync"
7 "sync/atomic"
8)
9
10type subscriber struct {
11 ch chan *Event
12 path string
13 startSeq int64
14}
15
16type Broker struct {
17 mu sync.RWMutex
18 subscribers map[string][]*subscriber
19 backend Backend
20 seq atomic.Int64
21}
22
23func NewBroker(backend Backend) *Broker {
24 return &Broker{
25 subscribers: make(map[string][]*subscriber),
26 backend: backend,
27 }
28}
29
30func (b *Broker) Start(ctx context.Context) {
31 ch := b.backend.Subscribe(ctx)
32 go func() {
33 var fanOutSeq int64
34 for event := range ch {
35 fanOutSeq++
36 b.fanOut(event, fanOutSeq)
37 }
38 }()
39}
40
41func (b *Broker) fanOut(event *Event, seq int64) {
42 b.mu.RLock()
43 defer b.mu.RUnlock()
44
45 for _, path := range publishPaths(event.Path) {
46 for _, sub := range b.subscribers[path] {
47 if seq <= sub.startSeq {
48 continue
49 }
50 select {
51 case sub.ch <- event:
52 default:
53 }
54 }
55 }
56}
57
58func (b *Broker) Publish(event *Event) error {
59 b.seq.Add(1)
60 return b.backend.Publish(event)
61}
62
63func (b *Broker) Subscribe(path string, lastEventID string) (<-chan *Event, func()) {
64 ch := make(chan *Event, 64)
65 sub := &subscriber{ch: ch, path: path, startSeq: b.seq.Load()}
66
67 b.mu.Lock()
68 b.subscribers[path] = append(b.subscribers[path], sub)
69 b.mu.Unlock()
70
71 if lastEventID != "" {
72 events := b.backend.Since(lastEventID, path)
73 for _, e := range events {
74 ch <- e
75 }
76 }
77
78 unsub := func() {
79 b.mu.Lock()
80 defer b.mu.Unlock()
81 subs := b.subscribers[path]
82 for i, s := range subs {
83 if s == sub {
84 b.subscribers[path] = append(subs[:i], subs[i+1:]...)
85 break
86 }
87 }
88 if len(b.subscribers[path]) == 0 {
89 delete(b.subscribers, path)
90 }
91 func() {
92 defer func() { recover() }()
93 close(ch)
94 }()
95 }
96
97 return ch, unsub
98}
99
// publishPaths lists the subscription paths that should receive an event
// published at eventPath: the path itself, then each ancestor obtained by
// repeatedly cutting at the last "/", and finally the empty path. The empty
// path is always the last element and appears exactly once.
func publishPaths(eventPath string) []string {
	rest := eventPath
	paths := []string{rest}

	for cut := strings.LastIndex(rest, "/"); cut >= 0; cut = strings.LastIndex(rest, "/") {
		rest = rest[:cut]
		paths = append(paths, rest)
	}

	// rest is the last element appended; add the root unless the walk
	// already ended on it.
	if rest != "" {
		paths = append(paths, "")
	}
	return paths
}
114}