Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 472 lines 13 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "time" 11 12 "tangled.org/core/appview/cloudflare" 13 "tangled.org/core/appview/notify" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/sites" 21 ec "tangled.org/core/eventconsumer" 22 "tangled.org/core/eventconsumer/cursor" 23 knotdb "tangled.org/core/knotserver/db" 24 "tangled.org/core/log" 25 "tangled.org/core/orm" 26 "tangled.org/core/rbac" 27 "tangled.org/core/workflow" 28 29 "github.com/bluesky-social/indigo/atproto/syntax" 30 "github.com/go-git/go-git/v5/plumbing" 31 "github.com/posthog/posthog-go" 32) 33 34func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 35 logger := log.FromContext(ctx) 36 logger = log.SubLogger(logger, "knotstream") 37 38 knots, err := db.GetRegistrations( 39 d, 40 orm.FilterIsNot("registered", "null"), 41 ) 42 if err != nil { 43 return nil, err 44 } 45 46 srcs := make(map[ec.Source]struct{}) 47 for _, k := range knots { 48 s := ec.NewKnotSource(k.Domain) 49 srcs[s] = struct{}{} 50 } 51 52 cache := cache.New(c.Redis.Addr) 53 cursorStore := cursor.NewRedisCursorStore(cache) 54 55 cfg := ec.ConsumerConfig{ 56 Sources: srcs, 57 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 58 RetryInterval: c.Knotstream.RetryInterval, 59 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 60 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 61 WorkerCount: c.Knotstream.WorkerCount, 62 QueueSize: c.Knotstream.QueueSize, 63 Logger: logger, 64 Dev: c.Core.Dev, 65 CursorStore: &cursorStore, 66 } 67 68 return ec.NewConsumer(cfg), nil 69} 70 71func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 72 if repoDid != nil && *repoDid != "" { 73 return db.GetRepoByDid(d, *repoDid) 74 } 75 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("name", repoName)) 76 if err != nil { 77 return nil, err 78 } 79 if len(repos) == 0 { 80 return nil, sql.ErrNoRows 81 } 82 return &repos[0], nil 83} 84 85func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 86 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 87 switch msg.Nsid { 88 case tangled.GitRefUpdateNSID: 89 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 90 case tangled.PipelineNSID: 91 return ingestPipeline(d, source, msg) 92 case knotdb.RepoDIDAssignNSID: 93 return ingestDIDAssign(d, enforcer, source, msg, ctx) 94 } 95 96 return nil 97 } 98} 99 100func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error { 101 logger := log.FromContext(ctx) 102 103 var record tangled.GitRefUpdate 104 err := json.Unmarshal(msg.EventJson, &record) 105 if err != nil { 106 return err 107 } 108 109 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 110 if err != nil { 111 return err 112 } 113 if !slices.Contains(knownKnots, source.Key()) { 114 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 115 } 116 117 ownerDid := "" 118 if record.OwnerDid != nil { 119 ownerDid = *record.OwnerDid 120 } else { 121 // handle legacy event 122 if record.RepoDid != nil { 123 ownerDid = *record.RepoDid 124 } 125 } 126 127 repo, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 128 if lookupErr != nil { 129 return fmt.Errorf("failed to look up repo: %w", lookupErr) 130 } 131 132 logger.Info("processing gitRefUpdate event", 133 "repo", repo.RepoIdentifier(), 134 "ref", record.Ref, 135 "old_sha", record.OldSha, 136 "new_sha", record.NewSha) 137 138 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 139 140 errPunchcard := populatePunchcard(d, record) 141 errLanguages := updateRepoLanguages(d, record) 142 143 var errPosthog error 144 if !dev && record.CommitterDid != "" { 145 errPosthog = pc.Enqueue(posthog.Capture{ 146 DistinctId: record.CommitterDid, 147 Event: "git_ref_update", 148 }) 149 } 150 151 // Trigger a sites redeploy if this push is to the configured sites branch. 152 if cfClient.Enabled() { 153 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 154 } 155 156 return errors.Join(errPunchcard, errLanguages, errPosthog) 157} 158 159// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 160// branch configured for this repo and, if so, syncs the site to R2 161func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) { 162 logger := log.FromContext(ctx) 163 164 ref := plumbing.ReferenceName(record.Ref) 165 if !ref.IsBranch() { 166 return 167 } 168 pushedBranch := ref.Short() 169 170 ownerDid := "" 171 if record.OwnerDid != nil { 172 ownerDid = *record.OwnerDid 173 } 174 175 repo, err := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 176 if err != nil { 177 return 178 } 179 180 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String()) 181 if err != nil || siteConfig == nil { 182 return 183 } 184 if siteConfig.Branch != pushedBranch { 185 return 186 } 187 188 scheme := "https" 189 if c.Core.Dev { 190 scheme = "http" 191 } 192 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key()) 193 194 deploy := &models.SiteDeploy{ 195 RepoAt: repo.RepoAt().String(), 196 Branch: siteConfig.Branch, 197 Dir: siteConfig.Dir, 198 CommitSHA: record.NewSha, 199 Trigger: models.SiteDeployTriggerPush, 200 } 201 202 deployErr := sites.Deploy(ctx, cfClient, knotHost, repo.RepoIdentifier(), record.RepoName, siteConfig.Branch, siteConfig.Dir) 203 if deployErr != nil { 204 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 205 deploy.Status = models.SiteDeployStatusFailure 206 deploy.Error = deployErr.Error() 207 } else { 208 deploy.Status = models.SiteDeployStatusSuccess 209 } 210 211 if err := db.AddSiteDeploy(d, deploy); err != nil { 212 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 213 } 214 215 if deployErr == nil { 216 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 217 } 218} 219 220func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 221 if record.CommitterDid == "" { 222 return nil 223 } 224 225 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 226 if err != nil { 227 return err 228 } 229 230 count := 0 231 for _, ke := range knownEmails { 232 if record.Meta == nil { 233 continue 234 } 235 if record.Meta.CommitCount == nil { 236 continue 237 } 238 for _, ce := range record.Meta.CommitCount.ByEmail { 239 if ce == nil { 240 continue 241 } 242 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 243 count += int(ce.Count) 244 } 245 } 246 } 247 248 punch := models.Punch{ 249 Did: record.CommitterDid, 250 Date: time.Now(), 251 Count: count, 252 } 253 return db.AddPunch(d, punch) 254} 255 256func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 257 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 258 return fmt.Errorf("empty language data for repo: %v/%s", record.OwnerDid, record.RepoName) 259 } 260 261 ownerDid := "" 262 if record.OwnerDid != nil { 263 ownerDid = *record.OwnerDid 264 } 265 266 r, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 267 if lookupErr != nil { 268 return fmt.Errorf("failed to look up repo: %w", lookupErr) 269 } 270 repo := *r 271 272 ref := plumbing.ReferenceName(record.Ref) 273 if !ref.IsBranch() { 274 return fmt.Errorf("%s is not a valid reference name", ref) 275 } 276 277 var langs []models.RepoLanguage 278 for _, l := range record.Meta.LangBreakdown.Inputs { 279 if l == nil { 280 continue 281 } 282 283 langs = append(langs, models.RepoLanguage{ 284 RepoAt: repo.RepoAt(), 285 Ref: ref.Short(), 286 IsDefaultRef: record.Meta.IsDefaultRef, 287 Language: l.Lang, 288 Bytes: l.Size, 289 }) 290 } 291 292 tx, err := d.Begin() 293 if err != nil { 294 return err 295 } 296 defer tx.Rollback() 297 298 // update appview's cache 299 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 300 if err != nil { 301 fmt.Printf("failed; %s\n", err) 302 // non-fatal 303 } 304 305 return tx.Commit() 306} 307 308func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 309 var record tangled.Pipeline 310 err := json.Unmarshal(msg.EventJson, &record) 311 if err != nil { 312 return err 313 } 314 315 if record.TriggerMetadata == nil { 316 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 317 } 318 319 if record.TriggerMetadata.Repo == nil { 320 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 321 } 322 323 repoName := "" 324 if record.TriggerMetadata.Repo.Repo != nil { 325 repoName = *record.TriggerMetadata.Repo.Repo 326 } 327 328 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 329 if lookupErr != nil { 330 return fmt.Errorf("failed to look up repo: %w", lookupErr) 331 } 332 if repo.Spindle == "" { 333 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 334 } 335 336 // trigger info 337 var trigger models.Trigger 338 var sha string 339 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 340 switch trigger.Kind { 341 case workflow.TriggerKindPush: 342 trigger.PushRef = &record.TriggerMetadata.Push.Ref 343 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 344 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 345 sha = *trigger.PushNewSha 346 case workflow.TriggerKindPullRequest: 347 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 348 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 349 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 350 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 351 sha = *trigger.PRSourceSha 352 } 353 354 tx, err := d.Begin() 355 if err != nil { 356 return fmt.Errorf("failed to start txn: %w", err) 357 } 358 359 triggerId, err := db.AddTrigger(tx, trigger) 360 if err != nil { 361 return fmt.Errorf("failed to add trigger entry: %w", err) 362 } 363 364 pipeline := models.Pipeline{ 365 Rkey: msg.Rkey, 366 Knot: source.Key(), 367 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 368 RepoName: repoName, 369 RepoDid: repo.RepoDid, 370 TriggerId: int(triggerId), 371 Sha: sha, 372 } 373 374 err = db.AddPipeline(tx, pipeline) 375 if err != nil { 376 return fmt.Errorf("failed to add pipeline: %w", err) 377 } 378 379 err = tx.Commit() 380 if err != nil { 381 return fmt.Errorf("failed to commit txn: %w", err) 382 } 383 384 return nil 385} 386 387func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error { 388 logger := log.FromContext(ctx) 389 390 var record knotdb.RepoDIDAssign 391 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 392 return fmt.Errorf("unmarshal didAssign: %w", err) 393 } 394 395 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 396 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 397 record.RepoDid, record.OwnerDid, record.RepoName) 398 } 399 400 logger.Info("processing didAssign event", 401 "repo_did", record.RepoDid, 402 "owner_did", record.OwnerDid, 403 "repo_name", record.RepoName) 404 405 repos, err := db.GetRepos(d, 406 orm.FilterEq("did", record.OwnerDid), 407 orm.FilterEq("name", record.RepoName), 408 ) 409 if err != nil || len(repos) == 0 { 410 logger.Warn("didAssign for unknown repo, skipping", 411 "owner_did", record.OwnerDid, 412 "repo_name", record.RepoName) 413 return nil 414 } 415 repo := repos[0] 416 knot := source.Key() 417 418 if repo.Knot != knot { 419 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot) 420 } 421 422 repoAtUri := repo.RepoAt().String() 423 legacyResource := record.OwnerDid + "/" + record.RepoName 424 425 if repo.RepoDid != record.RepoDid { 426 tx, err := d.Begin() 427 if err != nil { 428 return fmt.Errorf("begin didAssign txn: %w", err) 429 } 430 defer tx.Rollback() 431 432 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 433 return fmt.Errorf("cascade repo_did: %w", err) 434 } 435 436 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 437 return fmt.Errorf("enqueue pds rewrites: %w", err) 438 } 439 440 if err := tx.Commit(); err != nil { 441 return fmt.Errorf("commit didAssign txn: %w", err) 442 } 443 } 444 445 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 446 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 447 } 448 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 449 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 450 } 451 452 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri)) 453 if collabErr != nil { 454 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 455 } 456 for _, c := range collabs { 457 collabDid := c.SubjectDid.String() 458 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 459 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 460 } 461 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 462 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 463 } 464 } 465 466 logger.Info("didAssign processed successfully", 467 "repo_did", record.RepoDid, 468 "owner_did", record.OwnerDid, 469 "repo_name", record.RepoName) 470 471 return nil 472}