forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "time"
11
12 "tangled.org/core/appview/cloudflare"
13 "tangled.org/core/appview/notify"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/sites"
21 ec "tangled.org/core/eventconsumer"
22 "tangled.org/core/eventconsumer/cursor"
23 knotdb "tangled.org/core/knotserver/db"
24 "tangled.org/core/log"
25 "tangled.org/core/orm"
26 "tangled.org/core/rbac"
27 "tangled.org/core/workflow"
28
29 "github.com/bluesky-social/indigo/atproto/syntax"
30 "github.com/go-git/go-git/v5/plumbing"
31 "github.com/posthog/posthog-go"
32)
33
34func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
35 logger := log.FromContext(ctx)
36 logger = log.SubLogger(logger, "knotstream")
37
38 knots, err := db.GetRegistrations(
39 d,
40 orm.FilterIsNot("registered", "null"),
41 )
42 if err != nil {
43 return nil, err
44 }
45
46 srcs := make(map[ec.Source]struct{})
47 for _, k := range knots {
48 s := ec.NewKnotSource(k.Domain)
49 srcs[s] = struct{}{}
50 }
51
52 cache := cache.New(c.Redis.Addr)
53 cursorStore := cursor.NewRedisCursorStore(cache)
54
55 cfg := ec.ConsumerConfig{
56 Sources: srcs,
57 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
58 RetryInterval: c.Knotstream.RetryInterval,
59 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
60 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
61 WorkerCount: c.Knotstream.WorkerCount,
62 QueueSize: c.Knotstream.QueueSize,
63 Logger: logger,
64 Dev: c.Core.Dev,
65 CursorStore: &cursorStore,
66 }
67
68 return ec.NewConsumer(cfg), nil
69}
70
71func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) {
72 if repoDid != nil && *repoDid != "" {
73 return db.GetRepoByDid(d, *repoDid)
74 }
75 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("name", repoName))
76 if err != nil {
77 return nil, err
78 }
79 if len(repos) == 0 {
80 return nil, sql.ErrNoRows
81 }
82 return &repos[0], nil
83}
84
85func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
86 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
87 switch msg.Nsid {
88 case tangled.GitRefUpdateNSID:
89 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
90 case tangled.PipelineNSID:
91 return ingestPipeline(d, source, msg)
92 case knotdb.RepoDIDAssignNSID:
93 return ingestDIDAssign(d, enforcer, source, msg, ctx)
94 }
95
96 return nil
97 }
98}
99
100func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error {
101 logger := log.FromContext(ctx)
102
103 var record tangled.GitRefUpdate
104 err := json.Unmarshal(msg.EventJson, &record)
105 if err != nil {
106 return err
107 }
108
109 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
110 if err != nil {
111 return err
112 }
113 if !slices.Contains(knownKnots, source.Key()) {
114 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
115 }
116
117 ownerDid := ""
118 if record.OwnerDid != nil {
119 ownerDid = *record.OwnerDid
120 } else {
121 // handle legacy event
122 if record.RepoDid != nil {
123 ownerDid = *record.RepoDid
124 }
125 }
126
127 repo, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName)
128 if lookupErr != nil {
129 return fmt.Errorf("failed to look up repo: %w", lookupErr)
130 }
131
132 logger.Info("processing gitRefUpdate event",
133 "repo", repo.RepoIdentifier(),
134 "ref", record.Ref,
135 "old_sha", record.OldSha,
136 "new_sha", record.NewSha)
137
138 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
139
140 errPunchcard := populatePunchcard(d, record)
141 errLanguages := updateRepoLanguages(d, record)
142
143 var errPosthog error
144 if !dev && record.CommitterDid != "" {
145 errPosthog = pc.Enqueue(posthog.Capture{
146 DistinctId: record.CommitterDid,
147 Event: "git_ref_update",
148 })
149 }
150
151 // Trigger a sites redeploy if this push is to the configured sites branch.
152 if cfClient.Enabled() {
153 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
154 }
155
156 return errors.Join(errPunchcard, errLanguages, errPosthog)
157}
158
159// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
160// branch configured for this repo and, if so, syncs the site to R2
161func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) {
162 logger := log.FromContext(ctx)
163
164 ref := plumbing.ReferenceName(record.Ref)
165 if !ref.IsBranch() {
166 return
167 }
168 pushedBranch := ref.Short()
169
170 ownerDid := ""
171 if record.OwnerDid != nil {
172 ownerDid = *record.OwnerDid
173 }
174
175 repo, err := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName)
176 if err != nil {
177 return
178 }
179
180 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String())
181 if err != nil || siteConfig == nil {
182 return
183 }
184 if siteConfig.Branch != pushedBranch {
185 return
186 }
187
188 scheme := "https"
189 if c.Core.Dev {
190 scheme = "http"
191 }
192 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key())
193
194 deploy := &models.SiteDeploy{
195 RepoAt: repo.RepoAt().String(),
196 Branch: siteConfig.Branch,
197 Dir: siteConfig.Dir,
198 CommitSHA: record.NewSha,
199 Trigger: models.SiteDeployTriggerPush,
200 }
201
202 deployErr := sites.Deploy(ctx, cfClient, knotHost, repo.RepoIdentifier(), record.RepoName, siteConfig.Branch, siteConfig.Dir)
203 if deployErr != nil {
204 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr)
205 deploy.Status = models.SiteDeployStatusFailure
206 deploy.Error = deployErr.Error()
207 } else {
208 deploy.Status = models.SiteDeployStatusSuccess
209 }
210
211 if err := db.AddSiteDeploy(d, deploy); err != nil {
212 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err)
213 }
214
215 if deployErr == nil {
216 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier())
217 }
218}
219
220func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
221 if record.CommitterDid == "" {
222 return nil
223 }
224
225 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
226 if err != nil {
227 return err
228 }
229
230 count := 0
231 for _, ke := range knownEmails {
232 if record.Meta == nil {
233 continue
234 }
235 if record.Meta.CommitCount == nil {
236 continue
237 }
238 for _, ce := range record.Meta.CommitCount.ByEmail {
239 if ce == nil {
240 continue
241 }
242 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
243 count += int(ce.Count)
244 }
245 }
246 }
247
248 punch := models.Punch{
249 Did: record.CommitterDid,
250 Date: time.Now(),
251 Count: count,
252 }
253 return db.AddPunch(d, punch)
254}
255
256func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
257 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
258 return fmt.Errorf("empty language data for repo: %v/%s", record.OwnerDid, record.RepoName)
259 }
260
261 ownerDid := ""
262 if record.OwnerDid != nil {
263 ownerDid = *record.OwnerDid
264 }
265
266 r, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName)
267 if lookupErr != nil {
268 return fmt.Errorf("failed to look up repo: %w", lookupErr)
269 }
270 repo := *r
271
272 ref := plumbing.ReferenceName(record.Ref)
273 if !ref.IsBranch() {
274 return fmt.Errorf("%s is not a valid reference name", ref)
275 }
276
277 var langs []models.RepoLanguage
278 for _, l := range record.Meta.LangBreakdown.Inputs {
279 if l == nil {
280 continue
281 }
282
283 langs = append(langs, models.RepoLanguage{
284 RepoAt: repo.RepoAt(),
285 Ref: ref.Short(),
286 IsDefaultRef: record.Meta.IsDefaultRef,
287 Language: l.Lang,
288 Bytes: l.Size,
289 })
290 }
291
292 tx, err := d.Begin()
293 if err != nil {
294 return err
295 }
296 defer tx.Rollback()
297
298 // update appview's cache
299 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
300 if err != nil {
301 fmt.Printf("failed; %s\n", err)
302 // non-fatal
303 }
304
305 return tx.Commit()
306}
307
308func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
309 var record tangled.Pipeline
310 err := json.Unmarshal(msg.EventJson, &record)
311 if err != nil {
312 return err
313 }
314
315 if record.TriggerMetadata == nil {
316 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
317 }
318
319 if record.TriggerMetadata.Repo == nil {
320 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
321 }
322
323 repoName := ""
324 if record.TriggerMetadata.Repo.Repo != nil {
325 repoName = *record.TriggerMetadata.Repo.Repo
326 }
327
328 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName)
329 if lookupErr != nil {
330 return fmt.Errorf("failed to look up repo: %w", lookupErr)
331 }
332 if repo.Spindle == "" {
333 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
334 }
335
336 // trigger info
337 var trigger models.Trigger
338 var sha string
339 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
340 switch trigger.Kind {
341 case workflow.TriggerKindPush:
342 trigger.PushRef = &record.TriggerMetadata.Push.Ref
343 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
344 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
345 sha = *trigger.PushNewSha
346 case workflow.TriggerKindPullRequest:
347 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
348 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
349 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
350 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
351 sha = *trigger.PRSourceSha
352 }
353
354 tx, err := d.Begin()
355 if err != nil {
356 return fmt.Errorf("failed to start txn: %w", err)
357 }
358
359 triggerId, err := db.AddTrigger(tx, trigger)
360 if err != nil {
361 return fmt.Errorf("failed to add trigger entry: %w", err)
362 }
363
364 pipeline := models.Pipeline{
365 Rkey: msg.Rkey,
366 Knot: source.Key(),
367 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
368 RepoName: repoName,
369 RepoDid: repo.RepoDid,
370 TriggerId: int(triggerId),
371 Sha: sha,
372 }
373
374 err = db.AddPipeline(tx, pipeline)
375 if err != nil {
376 return fmt.Errorf("failed to add pipeline: %w", err)
377 }
378
379 err = tx.Commit()
380 if err != nil {
381 return fmt.Errorf("failed to commit txn: %w", err)
382 }
383
384 return nil
385}
386
387func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error {
388 logger := log.FromContext(ctx)
389
390 var record knotdb.RepoDIDAssign
391 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
392 return fmt.Errorf("unmarshal didAssign: %w", err)
393 }
394
395 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
396 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
397 record.RepoDid, record.OwnerDid, record.RepoName)
398 }
399
400 logger.Info("processing didAssign event",
401 "repo_did", record.RepoDid,
402 "owner_did", record.OwnerDid,
403 "repo_name", record.RepoName)
404
405 repos, err := db.GetRepos(d,
406 orm.FilterEq("did", record.OwnerDid),
407 orm.FilterEq("name", record.RepoName),
408 )
409 if err != nil || len(repos) == 0 {
410 logger.Warn("didAssign for unknown repo, skipping",
411 "owner_did", record.OwnerDid,
412 "repo_name", record.RepoName)
413 return nil
414 }
415 repo := repos[0]
416 knot := source.Key()
417
418 if repo.Knot != knot {
419 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
420 }
421
422 repoAtUri := repo.RepoAt().String()
423 legacyResource := record.OwnerDid + "/" + record.RepoName
424
425 if repo.RepoDid != record.RepoDid {
426 tx, err := d.Begin()
427 if err != nil {
428 return fmt.Errorf("begin didAssign txn: %w", err)
429 }
430 defer tx.Rollback()
431
432 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
433 return fmt.Errorf("cascade repo_did: %w", err)
434 }
435
436 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
437 return fmt.Errorf("enqueue pds rewrites: %w", err)
438 }
439
440 if err := tx.Commit(); err != nil {
441 return fmt.Errorf("commit didAssign txn: %w", err)
442 }
443 }
444
445 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
446 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
447 }
448 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
449 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
450 }
451
452 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri))
453 if collabErr != nil {
454 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
455 }
456 for _, c := range collabs {
457 collabDid := c.SubjectDid.String()
458 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
459 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
460 }
461 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
462 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
463 }
464 }
465
466 logger.Info("didAssign processed successfully",
467 "repo_did", record.RepoDid,
468 "owner_did", record.OwnerDid,
469 "repo_name", record.RepoName)
470
471 return nil
472}