Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "sync"
9 "time"
10
11 "github.com/redis/go-redis/v9"
12)
13
// Redis names used by RedisBackend.
const (
	// redisChannel is the pub/sub channel that Publish broadcasts on and
	// Subscribe listens to.
	redisChannel = "wicket:events"

	// redisBufferKey is the Redis list that Publish pushes events onto and
	// Since reads back for replay.
	redisBufferKey = "wicket:buffer"
)
18
19type RedisBackend struct {
20 client *redis.Client
21 bufferSize int
22 mu sync.Mutex
23 closed bool
24}
25
26func NewRedisBackend(url string, bufferSize int) (*RedisBackend, error) {
27 opts, err := redis.ParseURL(url)
28 if err != nil {
29 return nil, fmt.Errorf("parsing redis URL: %w", err)
30 }
31 client := redis.NewClient(opts)
32 if err := client.Ping(context.Background()).Err(); err != nil {
33 return nil, fmt.Errorf("connecting to redis: %w", err)
34 }
35 return &RedisBackend{
36 client: client,
37 bufferSize: bufferSize,
38 }, nil
39}
40
41func newRedisBackendFromClient(client *redis.Client, bufferSize int) *RedisBackend {
42 return &RedisBackend{
43 client: client,
44 bufferSize: bufferSize,
45 }
46}
47
48func (r *RedisBackend) Publish(event *Event) error {
49 data, err := json.Marshal(event)
50 if err != nil {
51 return fmt.Errorf("marshaling event: %w", err)
52 }
53 ctx := context.Background()
54 pipe := r.client.Pipeline()
55 pipe.LPush(ctx, redisBufferKey, data)
56 pipe.LTrim(ctx, redisBufferKey, 0, int64(r.bufferSize-1))
57 pipe.Expire(ctx, redisBufferKey, 24*time.Hour)
58 pipe.Publish(ctx, redisChannel, data)
59 _, err = pipe.Exec(ctx)
60 return err
61}
62
63func (r *RedisBackend) Subscribe(ctx context.Context) <-chan *Event {
64 ch := make(chan *Event, 256)
65 pubsub := r.client.Subscribe(ctx, redisChannel)
66
67 go func() {
68 defer close(ch)
69 defer pubsub.Close()
70 msgCh := pubsub.Channel()
71 for {
72 select {
73 case <-ctx.Done():
74 return
75 case msg := <-msgCh:
76 if msg == nil {
77 return
78 }
79 var event Event
80 if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
81 log.Printf("redis: unmarshal event: %v", err)
82 continue
83 }
84 select {
85 case ch <- &event:
86 default:
87 }
88 }
89 }
90 }()
91
92 return ch
93}
94
95func (r *RedisBackend) Since(lastEventID string, subscribePath string) []*Event {
96 ctx := context.Background()
97 vals, err := r.client.LRange(ctx, redisBufferKey, 0, -1).Result()
98 if err != nil {
99 log.Printf("redis: LRANGE: %v", err)
100 return nil
101 }
102
103 // Redis list is newest-first (LPUSH), reverse to chronological order
104 events := make([]*Event, 0, len(vals))
105 for i := len(vals) - 1; i >= 0; i-- {
106 var e Event
107 if err := json.Unmarshal([]byte(vals[i]), &e); err != nil {
108 continue
109 }
110 events = append(events, &e)
111 }
112
113 found := false
114 foundIdx := 0
115 for i, e := range events {
116 if e.ID == lastEventID {
117 found = true
118 foundIdx = i + 1
119 break
120 }
121 }
122 if !found {
123 foundIdx = 0
124 }
125
126 var result []*Event
127 for i := foundIdx; i < len(events); i++ {
128 if pathMatches(subscribePath, events[i].Path) {
129 result = append(result, events[i])
130 }
131 }
132 return result
133}
134
135func (r *RedisBackend) Close() error {
136 r.mu.Lock()
137 defer r.mu.Unlock()
138 if r.closed {
139 return nil
140 }
141 r.closed = true
142 return r.client.Close()
143}