this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

labeling: split out persist/event code into commit helper

+81 -44
+74
labeling/commit.go
··· 1 + package labeling 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + "time" 8 + 9 + label "github.com/bluesky-social/indigo/api/label" 10 + "github.com/bluesky-social/indigo/events" 11 + "github.com/bluesky-social/indigo/models" 12 + util "github.com/bluesky-social/indigo/util" 13 + 14 + "gorm.io/gorm/clause" 15 + ) 16 + 17 + // Persist to database (and repo), and emit events. 18 + func (s *Server) CommitLabels(ctx context.Context, labels []label.Label, negate bool) error { 19 + 20 + now := time.Now() 21 + nowStr := now.Format(util.ISO8601) 22 + var labelRows []models.Label 23 + 24 + for _, l := range labels { 25 + l.Cts = nowStr 26 + 27 + path, _, err := s.repoman.CreateRecord(ctx, s.user.UserId, "com.atproto.label.label", &l) 28 + if err != nil { 29 + return fmt.Errorf("failed to persist label in local repo: %w", err) 30 + } 31 + labelUri := "at://" + s.user.Did + "/" + path 32 + log.Infof("persisted label in repo: %s", labelUri) 33 + 34 + rkey := strings.SplitN(path, ":", 2)[1] 35 + lr := models.Label{ 36 + Uri: l.Uri, 37 + SourceDid: l.Src, 38 + Cid: l.Cid, 39 + Val: l.Val, 40 + RepoRKey: &rkey, 41 + CreatedAt: now, 42 + } 43 + if negate { 44 + lr.NegatedAt = now 45 + } 46 + labelRows = append(labelRows, lr) 47 + } 48 + 49 + // ... and database ... 50 + if len(labelRows) > 0 { 51 + // TODO(bnewbold): don't clobber action labels (aka, human interventions) 52 + res := s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&labelRows) 53 + if res.Error != nil { 54 + return res.Error 55 + } 56 + } 57 + 58 + // ... then re-publish as XRPCStreamEvent 59 + if len(labels) > 0 { 60 + log.Infof("broadcasting labels: %s", labels) 61 + lev := events.XRPCStreamEvent{ 62 + LabelBatch: &events.LabelBatch{ 63 + // NOTE(bnewbold): generic event handler code handles Seq field for us 64 + Labels: labels, 65 + }, 66 + } 67 + err := s.evtmgr.AddEvent(ctx, &lev) 68 + if err != nil { 69 + return fmt.Errorf("failed to publish XRPCStreamEvent: %w", err) 70 + } 71 + } 72 + 73 + return nil 74 + }
+6 -44
labeling/service.go
··· 25 25 util "github.com/bluesky-social/indigo/util" 26 26 cbg "github.com/whyrusleeping/cbor-gen" 27 27 28 + "github.com/ipfs/go-cid" 28 29 logging "github.com/ipfs/go-log" 29 30 "github.com/labstack/echo/v4" 30 31 "github.com/labstack/echo/v4/middleware" ··· 339 340 return err 340 341 } 341 342 342 - now := time.Now() 343 - nowStr := now.Format(util.ISO8601) 344 343 labels := []label.Label{} 345 - 346 344 for _, op := range evt.RepoCommit.Ops { 347 345 uri := "at://" + evt.RepoCommit.Repo + "/" + op.Path 348 346 nsid := strings.SplitN(op.Path, "/", 2)[0] ··· 368 366 Src: s.user.Did, 369 367 Uri: "at://" + evt.RepoCommit.Repo, 370 368 Val: val, 371 - Cts: nowStr, 369 + //Cts 372 370 }) 373 371 } else { 374 372 labels = append(labels, label.Label{ ··· 376 374 Uri: uri, 377 375 Cid: &cidStr, 378 376 Val: val, 379 - Cts: nowStr, 377 + //Cts 380 378 }) 381 379 } 382 380 } 383 381 } 384 382 385 - // if any labels generated, persist them to repo... 386 - for _, l := range labels { 387 - path, _, err := s.repoman.CreateRecord(ctx, s.user.UserId, "com.atproto.label.label", &l) 388 - if err != nil { 389 - return fmt.Errorf("failed to persist label in local repo: %w", err) 390 - } 391 - labeluri := "at://" + s.user.Did + "/" + path 392 - log.Infof("persisted label: %s", labeluri) 383 + // persist and emit events, as needed 384 + if err := s.CommitLabels(ctx, labels, false); err != nil { 385 + return err 393 386 } 394 387 395 - // ... and database ... 396 - var labelRows []models.Label 397 - for _, l := range labels { 398 - lr := models.Label{ 399 - Uri: l.Uri, 400 - SourceDid: l.Src, 401 - Cid: l.Cid, 402 - Val: l.Val, 403 - CreatedAt: now, 404 - } 405 - labelRows = append(labelRows, lr) 406 - } 407 - if len(labelRows) > 0 { 408 - // TODO(bnewbold): don't clobber action labels (aka, human interventions) 409 - s.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&labelRows) 410 - } 411 - 412 - // ... then re-publish as XRPCStreamEvent 413 - log.Infof("broadcasting labels: %s", labels) 414 - if len(labels) > 0 { 415 - lev := events.XRPCStreamEvent{ 416 - LabelBatch: &events.LabelBatch{ 417 - // NOTE(bnewbold): seems like other code handles Seq field automatically 418 - Labels: labels, 419 - }, 420 - } 421 - err = s.evtmgr.AddEvent(ctx, &lev) 422 - if err != nil { 423 - return fmt.Errorf("failed to publish XRPCStreamEvent: %w", err) 424 - } 425 - } 426 388 // TODO(bnewbold): persist state that we successfully processed the repo event (aka, 427 389 // persist "last" seq in database, or something like that). also above, at 428 390 // the short-circuit
+1
models/models.go
··· 133 133 Cid *string `gorm:"uniqueIndex:idx_uri_src_val_cid"` 134 134 RepoRKey *string `gorm:"uniqueIndex:idx_src_rkey"` 135 135 CreatedAt time.Time 136 + NegatedAt time.Time 136 137 UpdatedAt time.Time 137 138 }