A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package webhooks
2
3import (
4 "context"
5 "crypto/hmac"
6 "crypto/sha256"
7 "encoding/hex"
8 "encoding/json"
9 "fmt"
10 "io"
11 "log/slog"
12 "math/rand/v2"
13 "net/http"
14 "strings"
15 "time"
16
17 "atcr.io/pkg/appview/db"
18 "atcr.io/pkg/appview/storage"
19 "atcr.io/pkg/atproto"
20)
21
22// Dispatcher handles webhook delivery for push and scan notifications.
23// It reads webhooks from the appview DB and delivers payloads
24// with Discord/Slack formatting and HMAC signing.
25type Dispatcher struct {
26 db db.DBTX
27 meta atproto.AppviewMetadata
28}
29
30// NewDispatcher creates a new webhook dispatcher
31func NewDispatcher(database db.DBTX, meta atproto.AppviewMetadata) *Dispatcher {
32 return &Dispatcher{
33 db: database,
34 meta: meta,
35 }
36}
37
38// DispatchForScan fires matching webhooks after a scan record arrives via Jetstream.
39// previousScan is nil for first-time scans. userHandle is used for payload enrichment.
40func (d *Dispatcher) DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string) {
41 webhooks, err := db.GetWebhooksForUser(d.db, scan.UserDID)
42 if err != nil || len(webhooks) == 0 {
43 return
44 }
45
46 isFirst := previousScan == nil
47 isChanged := previousScan != nil && vulnCountsChanged(scan, previousScan)
48
49 scanInfo := WebhookScanInfo{
50 ScannedAt: scan.ScannedAt.Format(time.RFC3339),
51 ScannerVersion: scan.ScannerVersion,
52 Vulnerabilities: WebhookVulnCounts{
53 Critical: scan.Critical,
54 High: scan.High,
55 Medium: scan.Medium,
56 Low: scan.Low,
57 Total: scan.Total,
58 },
59 }
60
61 manifestInfo := WebhookManifestInfo{
62 Digest: scan.ManifestDigest,
63 Repository: scan.Repository,
64 Tag: tag,
65 UserDID: scan.UserDID,
66 UserHandle: userHandle,
67 }
68
69 for _, wh := range webhooks {
70 // Check each trigger condition against bitmask
71 var triggers []string
72 if wh.Triggers&TriggerFirst != 0 && isFirst {
73 triggers = append(triggers, "scan:first")
74 }
75 if wh.Triggers&TriggerAll != 0 {
76 triggers = append(triggers, "scan:all")
77 }
78 if wh.Triggers&TriggerChanged != 0 && isChanged {
79 triggers = append(triggers, "scan:changed")
80 }
81
82 for _, trigger := range triggers {
83 payload := WebhookPayload{
84 Trigger: trigger,
85 HoldDID: scan.HoldDID,
86 HoldEndpoint: holdEndpoint,
87 Manifest: manifestInfo,
88 Scan: scanInfo,
89 }
90
91 // Include previous counts for scan:changed
92 if trigger == "scan:changed" && previousScan != nil {
93 payload.Previous = &WebhookVulnCounts{
94 Critical: previousScan.Critical,
95 High: previousScan.High,
96 Medium: previousScan.Medium,
97 Low: previousScan.Low,
98 Total: previousScan.Total,
99 }
100 }
101
102 payloadBytes, err := json.Marshal(payload)
103 if err != nil {
104 slog.Error("Failed to marshal webhook payload", "error", err)
105 continue
106 }
107
108 go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes)
109 }
110 }
111}
112
113// DispatchForPush fires matching webhooks after a manifest is pushed.
114func (d *Dispatcher) DispatchForPush(ctx context.Context, event storage.PushWebhookEvent) {
115 webhooks, err := db.GetWebhooksForUser(d.db, event.OwnerDID)
116 if err != nil || len(webhooks) == 0 {
117 return
118 }
119
120 // Fetch star/pull counts for payload enrichment
121 var starCount, pullCount int
122 stats, err := db.GetRepositoryStats(d.db, event.OwnerDID, event.Repository)
123 if err == nil && stats != nil {
124 starCount = stats.StarCount
125 pullCount = stats.PullCount
126 }
127
128 // Build repo URL using the primary registry domain (pull domain) if available
129 baseURL := d.meta.BaseURL
130 if len(d.meta.RegistryDomains) > 0 {
131 baseURL = "https://" + d.meta.RegistryDomains[0]
132 }
133 repoURL := fmt.Sprintf("%s/%s/%s", baseURL, event.OwnerHandle, event.Repository)
134
135 payload := PushWebhookPayload{
136 Trigger: "push",
137 PushData: PushData{
138 PushedAt: time.Now().Format(time.RFC3339),
139 Pusher: event.PusherHandle,
140 PusherDID: event.PusherDID,
141 Tag: event.Tag,
142 Digest: event.Digest,
143 },
144 Repository: PushRepository{
145 Name: event.Repository,
146 Namespace: event.OwnerHandle,
147 RepoName: event.OwnerHandle + "/" + event.Repository,
148 RepoURL: repoURL,
149 MediaType: event.MediaType,
150 StarCount: starCount,
151 PullCount: pullCount,
152 },
153 Hold: PushHold{
154 DID: event.HoldDID,
155 Endpoint: event.HoldEndpoint,
156 },
157 }
158
159 payloadBytes, err := json.Marshal(payload)
160 if err != nil {
161 slog.Error("Failed to marshal push webhook payload", "error", err)
162 return
163 }
164
165 for _, wh := range webhooks {
166 if wh.Triggers&TriggerPush == 0 {
167 continue
168 }
169 go d.deliverWithRetry(wh.URL, wh.Secret, payloadBytes)
170 }
171}
172
173// DeliverTest sends a test payload to a specific webhook (synchronous, single attempt)
174func (d *Dispatcher) DeliverTest(ctx context.Context, webhookID, userDID, userHandle string) (bool, error) {
175 wh, err := db.GetWebhookByID(d.db, webhookID)
176 if err != nil {
177 return false, err
178 }
179 if wh.UserDID != userDID {
180 return false, fmt.Errorf("unauthorized")
181 }
182
183 // Randomize vulnerability counts so each test shows a different severity color
184 critical := rand.IntN(3)
185 high := rand.IntN(5)
186 medium := rand.IntN(8)
187 low := rand.IntN(10)
188 total := critical + high + medium + low
189
190 payload := WebhookPayload{
191 Trigger: "test",
192 Manifest: WebhookManifestInfo{
193 Digest: "sha256:0000000000000000000000000000000000000000000000000000000000000000",
194 Repository: "test-repo",
195 Tag: "latest",
196 UserDID: userDID,
197 UserHandle: userHandle,
198 },
199 Scan: WebhookScanInfo{
200 ScannedAt: time.Now().Format(time.RFC3339),
201 ScannerVersion: "atcr-scanner-v1.0.0",
202 Vulnerabilities: WebhookVulnCounts{
203 Critical: critical, High: high, Medium: medium, Low: low, Total: total,
204 },
205 },
206 }
207
208 payloadBytes, _ := json.Marshal(payload)
209 success := d.attemptDelivery(wh.URL, wh.Secret, payloadBytes)
210 return success, nil
211}
212
213// deliverWithRetry attempts to deliver a webhook with exponential backoff
214func (d *Dispatcher) deliverWithRetry(webhookURL, secret string, payload []byte) {
215 delays := []time.Duration{0, 30 * time.Second, 2 * time.Minute, 8 * time.Minute}
216 for attempt, delay := range delays {
217 if attempt > 0 {
218 time.Sleep(delay)
219 }
220 if d.attemptDelivery(webhookURL, secret, payload) {
221 return
222 }
223 }
224 slog.Warn("Webhook delivery failed after retries", "url", maskURL(webhookURL))
225}
226
227// attemptDelivery sends a single webhook HTTP POST
228func (d *Dispatcher) attemptDelivery(webhookURL, secret string, payload []byte) bool {
229 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
230 defer cancel()
231
232 // Reformat payload for platform-specific webhook APIs
233 sendPayload := payload
234 if isDiscordWebhook(webhookURL) || isSlackWebhook(webhookURL) {
235 formatted, fmtErr := formatPlatformPayload(payload, webhookURL, d.meta)
236 if fmtErr == nil {
237 sendPayload = formatted
238 }
239 }
240
241 req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, strings.NewReader(string(sendPayload)))
242 if err != nil {
243 slog.Warn("Failed to create webhook request", "error", err)
244 return false
245 }
246
247 req.Header.Set("Content-Type", "application/json")
248 req.Header.Set("User-Agent", d.meta.ClientShortName+"-Webhook/1.0")
249
250 // HMAC signing if secret is set (signs the actual payload sent)
251 if secret != "" {
252 mac := hmac.New(sha256.New, []byte(secret))
253 mac.Write(sendPayload)
254 sig := hex.EncodeToString(mac.Sum(nil))
255 req.Header.Set("X-Webhook-Signature-256", "sha256="+sig)
256 }
257
258 client := &http.Client{Timeout: 10 * time.Second}
259 resp, err := client.Do(req)
260 if err != nil {
261 slog.Warn("Webhook delivery attempt failed", "url", maskURL(webhookURL), "error", err)
262 return false
263 }
264 defer resp.Body.Close()
265
266 if resp.StatusCode >= 200 && resp.StatusCode < 300 {
267 slog.Info("Webhook delivered successfully", "url", maskURL(webhookURL), "status", resp.StatusCode)
268 return true
269 }
270
271 // Read response body for debugging
272 body, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
273 slog.Warn("Webhook delivery got non-2xx response",
274 "url", maskURL(webhookURL),
275 "status", resp.StatusCode,
276 "body", string(body))
277 return false
278}
279
280// vulnCountsChanged checks if vulnerability counts differ between scans
281func vulnCountsChanged(current, previous *db.Scan) bool {
282 return current.Critical != previous.Critical ||
283 current.High != previous.High ||
284 current.Medium != previous.Medium ||
285 current.Low != previous.Low
286}