Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
import (
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"mime"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)
17
// version is the build identifier printed by the root ("/") endpoint.
// It defaults to "dev"; presumably release builds override it at link time
// (e.g. -ldflags "-X main.version=...") — confirm against the build scripts.
var (
	version = "dev"
)
19
20type Server struct {
21 broker *Broker
22 config *atomic.Pointer[Configuration]
23}
24
25func NewServer(broker *Broker, config *atomic.Pointer[Configuration]) http.Handler {
26 s := &Server{broker: broker, config: config}
27 return http.HandlerFunc(s.ServeHTTP)
28}
29
30func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
31 setCORSHeaders(w)
32
33 if r.Method == "GET" && r.URL.Path == "/" {
34 w.Header().Set("Content-Type", "text/plain")
35 fmt.Fprintf(w, "wicket %s\n", version)
36 return
37 }
38
39 if r.Method == "GET" && r.URL.Path == "/_health" {
40 w.WriteHeader(http.StatusNoContent)
41 return
42 }
43
44 switch r.Method {
45 case "OPTIONS":
46 w.WriteHeader(http.StatusNoContent)
47 case "POST":
48 s.handlePost(w, r)
49 case "GET":
50 if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
51 http.NotFound(w, r)
52 return
53 }
54 s.handleSSE(w, r)
55 default:
56 http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
57 }
58}
59
// normalizePath turns a request URL path into a topic path by removing
// every leading and trailing slash. Interior slashes are left untouched.
//
// Leading slashes are stripped as thoroughly as trailing ones so that
// "//a/b" and "/a/b" yield the same topic; otherwise a doubled leading
// slash would produce "/a/b", a different key for the per-path lookups
// that callers perform on the result.
func normalizePath(raw string) string {
	return strings.Trim(raw, "/")
}
63
// setCORSHeaders marks the response as readable from any origin and
// advertises the methods and request headers the gateway accepts.
func setCORSHeaders(w http.ResponseWriter) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
	hdr.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, Last-Event-ID")
}
69
70func (s *Server) handlePost(w http.ResponseWriter, r *http.Request) {
71 path := normalizePath(r.URL.Path)
72
73 body, _ := io.ReadAll(r.Body)
74
75 cfg := s.config.Load()
76 if pc := cfg.LookupVerification(path); pc != nil {
77 verifier, err := NewVerifier(pc.Verify)
78 if err != nil {
79 http.Error(w, "server configuration error", http.StatusInternalServerError)
80 return
81 }
82 if err := verifier.Verify(body, r.Header, pc.Secret, pc.SignatureHeader); err != nil {
83 log.Printf("verification failed for %s: %v, headers: %v", path, err, r.Header)
84 http.Error(w, "forbidden", http.StatusForbidden)
85 return
86 }
87 }
88
89 var payload any
90 mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
91 if err == nil && mediaType == "application/json" {
92 if err := json.Unmarshal(body, &payload); err != nil {
93 http.Error(w, "invalid JSON", http.StatusBadRequest)
94 return
95 }
96 } else if err == nil && isTextContent(mediaType, params) {
97 payload = string(body)
98 } else {
99 payload = base64.StdEncoding.EncodeToString(body)
100 }
101
102 headers := extractHeaders(r.Header)
103
104 event := &Event{
105 ID: uuid.New().String(),
106 Timestamp: time.Now().UTC(),
107 Method: r.Method,
108 Path: path,
109 Headers: headers,
110 Payload: payload,
111 }
112
113 if err := s.broker.Publish(event); err != nil {
114 log.Printf("publish failed for %s: %v", path, err)
115 http.Error(w, "publish failed", http.StatusInternalServerError)
116 return
117 }
118 log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID)
119 w.WriteHeader(http.StatusAccepted)
120}
121
122func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
123 path := normalizePath(r.URL.Path)
124
125 cfg := s.config.Load()
126 if secret := cfg.LookupSubscribeSecret(path); secret != "" {
127 auth := r.Header.Get("Authorization")
128 if !strings.HasPrefix(auth, "Bearer ") || strings.TrimPrefix(auth, "Bearer ") != secret {
129 http.Error(w, "unauthorized", http.StatusUnauthorized)
130 return
131 }
132 }
133
134 flusher, ok := w.(http.Flusher)
135 if !ok {
136 http.Error(w, "streaming unsupported", http.StatusInternalServerError)
137 return
138 }
139
140 filters := ParseFilters(r.URL.Query())
141 lastEventID := r.Header.Get("Last-Event-ID")
142
143 ch, unsub := s.broker.Subscribe(path, lastEventID)
144 defer unsub()
145
146 w.Header().Set("Content-Type", "text/event-stream")
147 w.Header().Set("Cache-Control", "no-cache")
148 w.Header().Set("Connection", "keep-alive")
149 w.WriteHeader(http.StatusOK)
150 flusher.Flush()
151
152 ctx := r.Context()
153 for {
154 select {
155 case <-ctx.Done():
156 return
157 case event, ok := <-ch:
158 if !ok {
159 return
160 }
161 if !MatchAll(filters, event) {
162 continue
163 }
164 data, err := json.Marshal(event)
165 if err != nil {
166 log.Printf("marshaling event %s: %v", event.ID, err)
167 continue
168 }
169 fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data)
170 flusher.Flush()
171 }
172 }
173}
174
// hopByHopHeaders is the set of request headers that extractHeaders leaves
// out of published events. Besides the hop-by-hop headers proper it also
// lists Host and Content-Length. Keys are in canonical MIME header form
// (hence "Te"), and lookups are exact matches on the http.Header key.
var hopByHopHeaders = map[string]bool{
	"Connection":          true,
	"Keep-Alive":          true,
	"Proxy-Authenticate":  true,
	"Proxy-Authorization": true,
	"Te":                  true,
	"Trailer":             true,
	"Transfer-Encoding":   true,
	"Upgrade":             true,
	"Host":                true,
	"Content-Length":      true,
}

// extractHeaders flattens h into a name → value map, omitting every header
// listed in hopByHopHeaders. Only the first value of a multi-valued header
// is kept. Headers whose value list is empty are skipped rather than
// indexed, which would panic.
func extractHeaders(h http.Header) map[string]string {
	headers := make(map[string]string, len(h))
	for name, values := range h {
		if len(values) == 0 || hopByHopHeaders[name] {
			continue
		}
		headers[name] = values[0]
	}
	return headers
}
198
// isTextContent reports whether a body of the given media type can be
// carried as a plain string. That holds for any text/* type, for a short
// list of textual application/* types, and for any type that declares a
// charset parameter.
func isTextContent(mediaType string, params map[string]string) bool {
	switch mediaType {
	case "application/x-www-form-urlencoded",
		"application/xml",
		"application/xhtml+xml":
		return true
	}
	if strings.HasPrefix(mediaType, "text/") {
		return true
	}
	_, hasCharset := params["charset"]
	return hasCharset
}