this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod: defer parsing (#585)

This shifts the external API for automod from taking a pre-parsed "any"
struct of a record to taking CBOR bytes. The motivation is to make it
easier to work with content for which the lexicon isn't known at compile
time, which will soon become much more common as we allow folks to write
arbitrary record types to their repositories and this content shows up
on the relay firehose.

The `atproto/data` package knows how to parse and extract some
information from generic atproto record CBOR, so it is able to pull all
the blobs (`$type: blob` objects nested within records), and we can
switch over to that, which is more generic (and hopefully robust) than
doing schema-by-schema extraction.

For firehose content, this is all pretty straightforward, because the
records are already coming in as CBOR. In the capture system, we now do
schema-specific parsing, then re-encode as CBOR from the parsed struct,
which is kind of weird. In tests, records need to be marshaled to
CBOR, which is a bit unfortunate. Maybe some helpers would make this
more idiomatic?

Tested a small amount against the prod firehose.

authored by

bnewbold and committed by
GitHub
447a6507 ef839105

+110 -103
+13 -2
automod/capture/fetch.go
··· 1 1 package capture 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "fmt" 6 7 ··· 35 36 return fmt.Errorf("expected a CID in getRecord response") 36 37 } 37 38 recCID := syntax.CID(*out.Cid) 39 + recBuf := new(bytes.Buffer) 40 + if err := out.Value.Val.MarshalCBOR(recBuf); err != nil { 41 + return err 42 + } 43 + recBytes := recBuf.Bytes() 38 44 op := automod.RecordOp{ 39 45 Action: automod.CreateOp, 40 46 DID: ident.DID, 41 47 Collection: aturi.Collection(), 42 48 RecordKey: aturi.RecordKey(), 43 49 CID: &recCID, 44 - Value: out.Value.Val, 50 + RecordCBOR: recBytes, 45 51 } 46 52 return eng.ProcessRecordOp(ctx, op) 47 53 } ··· 79 85 return fmt.Errorf("parsing PDS record response: %v", err) 80 86 } 81 87 recCID := syntax.CID(rec.Cid) 88 + recBuf := new(bytes.Buffer) 89 + if err := rec.Value.Val.MarshalCBOR(recBuf); err != nil { 90 + return err 91 + } 92 + recBytes := recBuf.Bytes() 82 93 op := automod.RecordOp{ 83 94 Action: automod.CreateOp, 84 95 DID: ident.DID, 85 96 Collection: aturi.Collection(), 86 97 RecordKey: aturi.RecordKey(), 87 98 CID: &recCID, 88 - Value: rec.Value.Val, 99 + RecordCBOR: recBytes, 89 100 } 90 101 err = eng.ProcessRecordOp(ctx, op) 91 102 if err != nil {
+7 -1
automod/capture/testing.go
··· 1 1 package capture 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "encoding/json" 6 7 "io" ··· 55 56 return err 56 57 } 57 58 recCID := syntax.CID(pr.Cid) 59 + recBuf := new(bytes.Buffer) 60 + if err := pr.Value.Val.MarshalCBOR(recBuf); err != nil { 61 + return err 62 + } 63 + recBytes := recBuf.Bytes() 58 64 eng.Logger.Debug("processing record", "did", did) 59 65 op := automod.RecordOp{ 60 66 Action: automod.CreateOp, ··· 62 68 Collection: aturi.Collection(), 63 69 RecordKey: aturi.RecordKey(), 64 70 CID: &recCID, 65 - Value: pr.Value.Val, 71 + RecordCBOR: recBytes, 66 72 } 67 73 eng.ProcessRecordOp(ctx, op) 68 74 }
+5 -1
automod/engine/action_dedupe_test.go
··· 1 1 package engine 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "testing" 6 7 ··· 30 31 //path := "app.bsky.feed.post/abc123" 31 32 cid1 := syntax.CID("cid123") 32 33 p1 := appbsky.FeedPost{Text: "some post blah"} 34 + p1buf := new(bytes.Buffer) 35 + assert.NoError(p1.MarshalCBOR(p1buf)) 36 + p1cbor := p1buf.Bytes() 33 37 id1 := identity.Identity{ 34 38 DID: syntax.DID("did:plc:abc111"), 35 39 Handle: syntax.Handle("handle.example.com"), ··· 42 46 Collection: "app.bsky.feed.post", 43 47 RecordKey: "abc123", 44 48 CID: &cid1, 45 - Value: &p1, 49 + RecordCBOR: p1cbor, 46 50 } 47 51 for i := 0; i < 5; i++ { 48 52 assert.NoError(eng.ProcessRecordOp(ctx, op))
+16 -67
automod/engine/blobs.go
··· 7 7 "strings" 8 8 "time" 9 9 10 - appbsky "github.com/bluesky-social/indigo/api/bsky" 10 + "github.com/bluesky-social/indigo/atproto/data" 11 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 12 13 13 "github.com/carlmjohnson/versioninfo" ··· 15 15 16 16 // Parses out any blobs from the enclosed record. 17 17 // 18 - // TODO: currently this function uses schema-specific logic, and won't work with generic lexicon records. A future version could use the indigo/atproto/data package and the raw record CBOR to extract blobs from arbitrary records 19 - // 20 - // NOTE: for consistency with other RecordContext methods, which don't usually return errors, maybe the error-returning version of this function should be a helper function, or definted on RecordOp, and the RecordContext version should return an empty array on error? 18 + // NOTE: for consistency with other RecordContext methods, which don't usually return errors, maybe the error-returning version of this function should be a helper function, or defined on RecordOp, and the RecordContext version should return an empty array on error? 
21 19 func (c *RecordContext) Blobs() ([]lexutil.LexBlob, error) { 22 20 23 21 if c.RecordOp.Action == DeleteOp { 24 22 return []lexutil.LexBlob{}, nil 25 23 } 26 24 27 - var blobs []lexutil.LexBlob 25 + rec, err := data.UnmarshalCBOR(c.RecordOp.RecordCBOR) 26 + if err != nil { 27 + return nil, fmt.Errorf("parsing generic record CBOR: %v", err) 28 + } 29 + blobs := data.ExtractBlobs(rec) 28 30 29 - switch c.RecordOp.Collection.String() { 30 - case "app.bsky.feed.post": 31 - post, ok := c.RecordOp.Value.(*appbsky.FeedPost) 32 - if !ok { 33 - return nil, fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 34 - } 35 - if post.Embed != nil && post.Embed.EmbedImages != nil { 36 - for _, eii := range post.Embed.EmbedImages.Images { 37 - if eii.Image != nil { 38 - blobs = append(blobs, *eii.Image) 39 - } 40 - } 31 + // convert from data.Blob to lexutil.LexBlob; plan is to merge these types eventually 32 + var out []lexutil.LexBlob 33 + for _, b := range blobs { 34 + lb := lexutil.LexBlob{ 35 + Ref: lexutil.LexLink(b.Ref), 36 + MimeType: b.MimeType, 37 + Size: b.Size, 41 38 } 42 - if post.Embed != nil && post.Embed.EmbedExternal != nil { 43 - ext := post.Embed.EmbedExternal.External 44 - if ext != nil && ext.Thumb != nil { 45 - blobs = append(blobs, *ext.Thumb) 46 - } 47 - } 48 - if post.Embed != nil && post.Embed.EmbedRecordWithMedia != nil { 49 - media := post.Embed.EmbedRecordWithMedia.Media 50 - if media != nil && media.EmbedImages != nil { 51 - for _, eii := range media.EmbedImages.Images { 52 - if eii.Image != nil { 53 - blobs = append(blobs, *eii.Image) 54 - } 55 - } 56 - } 57 - if media != nil && media.EmbedExternal != nil { 58 - ext := media.EmbedExternal.External 59 - if ext != nil && ext.Thumb != nil { 60 - blobs = append(blobs, *ext.Thumb) 61 - } 62 - } 63 - } 64 - case "app.bsky.actor.profile": 65 - profile, ok := c.RecordOp.Value.(*appbsky.ActorProfile) 66 - if !ok { 67 - return nil, fmt.Errorf("mismatch between collection (%s) and 
type", c.RecordOp.Collection) 68 - } 69 - if profile.Avatar != nil { 70 - blobs = append(blobs, *profile.Avatar) 71 - } 72 - if profile.Banner != nil { 73 - blobs = append(blobs, *profile.Banner) 74 - } 75 - case "app.bsky.graph.list": 76 - list, ok := c.RecordOp.Value.(*appbsky.GraphList) 77 - if !ok { 78 - return nil, fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 79 - } 80 - if list.Avatar != nil { 81 - blobs = append(blobs, *list.Avatar) 82 - } 83 - case "app.bsky.feed.generator": 84 - generator, ok := c.RecordOp.Value.(*appbsky.FeedGenerator) 85 - if !ok { 86 - return nil, fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 87 - } 88 - if generator.Avatar != nil { 89 - blobs = append(blobs, *generator.Avatar) 90 - } 39 + out = append(out, lb) 91 40 } 92 - return blobs, nil 41 + return out, nil 93 42 } 94 43 95 44 func (c *RecordContext) fetchBlob(blob lexutil.LexBlob) ([]byte, error) {
+9 -2
automod/engine/circuit_breaker_test.go
··· 1 1 package engine 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "fmt" 6 7 "testing" ··· 38 39 39 40 cid1 := syntax.CID("cid123") 40 41 p1 := appbsky.FeedPost{Text: "some post blah"} 42 + p1buf := new(bytes.Buffer) 43 + assert.NoError(p1.MarshalCBOR(p1buf)) 44 + p1cbor := p1buf.Bytes() 41 45 42 46 // generate double the quote of events; expect to only count the quote worth of actions 43 47 for i := 0; i < 2*QuotaModTakedownDay; i++ { ··· 52 56 Collection: syntax.NSID("app.bsky.feed.post"), 53 57 RecordKey: syntax.RecordKey("abc123"), 54 58 CID: &cid1, 55 - Value: &p1, 59 + RecordCBOR: p1cbor, 56 60 } 57 61 assert.NoError(eng.ProcessRecordOp(ctx, op)) 58 62 } ··· 80 84 81 85 cid1 := syntax.CID("cid123") 82 86 p1 := appbsky.FeedPost{Text: "some post blah"} 87 + p1buf := new(bytes.Buffer) 88 + assert.NoError(p1.MarshalCBOR(p1buf)) 89 + p1cbor := p1buf.Bytes() 83 90 84 91 // generate double the quota of events; expect to only count the quota worth of actions 85 92 for i := 0; i < 2*QuotaModReportDay; i++ { ··· 94 101 Collection: syntax.NSID("app.bsky.feed.post"), 95 102 RecordKey: syntax.RecordKey("abc123"), 96 103 CID: &cid1, 97 - Value: &p1, 104 + RecordCBOR: p1cbor, 98 105 } 99 106 assert.NoError(eng.ProcessRecordOp(ctx, op)) 100 107 }
+3 -4
automod/engine/context.go
··· 52 52 Collection syntax.NSID 53 53 RecordKey syntax.RecordKey 54 54 CID *syntax.CID 55 - // NOTE: usually a *pointer*, not the value itself 56 - Value any 55 + RecordCBOR []byte 57 56 } 58 57 59 58 // Originally intended for push notifications, but can also work for any inter-account notification. ··· 76 75 func (op *RecordOp) Validate() error { 77 76 switch op.Action { 78 77 case CreateOp, UpdateOp: 79 - if op.Value == nil || op.CID == nil { 78 + if op.RecordCBOR == nil || op.CID == nil { 80 79 return fmt.Errorf("expected record create/update op to contain both value and CID") 81 80 } 82 81 case DeleteOp: 83 - if op.Value != nil || op.CID != nil { 82 + if op.RecordCBOR != nil || op.CID != nil { 84 83 return fmt.Errorf("expected record delete op to be empty") 85 84 } 86 85 default:
+10 -2
automod/engine/engine_test.go
··· 1 1 package engine 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "testing" 6 7 ··· 24 25 p1 := appbsky.FeedPost{ 25 26 Text: "some post blah", 26 27 } 28 + p1buf := new(bytes.Buffer) 29 + assert.NoError(p1.MarshalCBOR(p1buf)) 30 + p1cbor := p1buf.Bytes() 31 + 27 32 op := RecordOp{ 28 33 Action: CreateOp, 29 34 DID: id1.DID, 30 35 Collection: syntax.NSID("app.bsky.feed.post"), 31 36 RecordKey: syntax.RecordKey("abc123"), 32 37 CID: &cid1, 33 - Value: &p1, 38 + RecordCBOR: p1cbor, 34 39 } 35 40 assert.NoError(eng.ProcessRecordOp(ctx, op)) 36 41 ··· 38 43 Text: "some post blah", 39 44 Tags: []string{"one", "slur"}, 40 45 } 41 - op.Value = &p2 46 + p2buf := new(bytes.Buffer) 47 + assert.NoError(p2.MarshalCBOR(p2buf)) 48 + p2cbor := p2buf.Bytes() 49 + op.RecordCBOR = p2cbor 42 50 assert.NoError(eng.ProcessRecordOp(ctx, op)) 43 51 }
+9 -8
automod/engine/ruleset.go
··· 1 1 package engine 2 2 3 3 import ( 4 + "bytes" 4 5 "fmt" 5 6 "sync" 6 7 ··· 31 32 // then any record-type-specific rules 32 33 switch c.RecordOp.Collection.String() { 33 34 case "app.bsky.feed.post": 34 - post, ok := c.RecordOp.Value.(*appbsky.FeedPost) 35 - if !ok { 36 - return fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 35 + var post appbsky.FeedPost 36 + if err := post.UnmarshalCBOR(bytes.NewReader(c.RecordOp.RecordCBOR)); err != nil { 37 + return fmt.Errorf("failed to parse app.bsky.feed.post record: %v", err) 37 38 } 38 39 for _, f := range r.PostRules { 39 - err := f(c, post) 40 + err := f(c, &post) 40 41 if err != nil { 41 42 c.Logger.Error("post rule execution failed", "err", err) 42 43 } 43 44 } 44 45 case "app.bsky.actor.profile": 45 - profile, ok := c.RecordOp.Value.(*appbsky.ActorProfile) 46 - if !ok { 47 - return fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 46 + var profile appbsky.ActorProfile 47 + if err := profile.UnmarshalCBOR(bytes.NewReader(c.RecordOp.RecordCBOR)); err != nil { 48 + return fmt.Errorf("failed to parse app.bsky.actor.profile record: %v", err) 48 49 } 49 50 for _, f := range r.ProfileRules { 50 - err := f(c, profile) 51 + err := f(c, &profile) 51 52 if err != nil { 52 53 c.Logger.Error("profile rule execution failed", "err", err) 53 54 }
+9 -2
automod/rules/hashtags_test.go
··· 1 1 package rules 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "testing" 6 7 ··· 28 29 p1 := appbsky.FeedPost{ 29 30 Text: "some post blah", 30 31 } 32 + p1buf := new(bytes.Buffer) 33 + assert.NoError(p1.MarshalCBOR(p1buf)) 34 + p1cbor := p1buf.Bytes() 31 35 op := engine.RecordOp{ 32 36 Action: engine.CreateOp, 33 37 DID: am1.Identity.DID, 34 38 Collection: syntax.NSID("app.bsky.feed.post"), 35 39 RecordKey: syntax.RecordKey("abc123"), 36 40 CID: &cid1, 37 - Value: p1, 41 + RecordCBOR: p1cbor, 38 42 } 39 43 c1 := engine.NewRecordContext(ctx, &eng, am1, op) 40 44 assert.NoError(BadHashtagsPostRule(&c1, &p1)) ··· 45 49 Text: "some post blah", 46 50 Tags: []string{"one", "slur"}, 47 51 } 48 - op.Value = p2 52 + p2buf := new(bytes.Buffer) 53 + assert.NoError(p2.MarshalCBOR(p2buf)) 54 + p2cbor := p2buf.Bytes() 55 + op.RecordCBOR = p2cbor 49 56 c2 := engine.NewRecordContext(ctx, &eng, am1, op) 50 57 assert.NoError(BadHashtagsPostRule(&c2, &p2)) 51 58 eff2 := engine.ExtractEffects(&c2.BaseContext)
+7 -6
automod/rules/keyword.go
··· 1 1 package rules 2 2 3 3 import ( 4 + "bytes" 4 5 "fmt" 5 6 "strings" 6 7 ··· 82 83 text := "" 83 84 switch c.RecordOp.Collection.String() { 84 85 case "app.bsky.graph.list": 85 - list, ok := c.RecordOp.Value.(*appbsky.GraphList) 86 - if !ok { 87 - return fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 86 + var list appbsky.GraphList 87 + if err := list.UnmarshalCBOR(bytes.NewReader(c.RecordOp.RecordCBOR)); err != nil { 88 + return fmt.Errorf("failed to parse app.bsky.graph.list record: %v", err) 88 89 } 89 90 name += " " + list.Name 90 91 if list.Description != nil { ··· 94 95 text += " " + *list.Purpose 95 96 } 96 97 case "app.bsky.feed.generator": 97 - generator, ok := c.RecordOp.Value.(*appbsky.FeedGenerator) 98 - if !ok { 99 - return fmt.Errorf("mismatch between collection (%s) and type", c.RecordOp.Collection) 98 + var generator appbsky.FeedGenerator 99 + if err := generator.UnmarshalCBOR(bytes.NewReader(c.RecordOp.RecordCBOR)); err != nil { 100 + return fmt.Errorf("failed to parse app.bsky.feed.generator record: %v", err) 100 101 } 101 102 name += " " + generator.DisplayName 102 103 if generator.Description != nil {
+9 -2
automod/rules/keyword_test.go
··· 1 1 package rules 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "testing" 6 7 ··· 70 71 p1 := appbsky.FeedPost{ 71 72 Text: "some post blah", 72 73 } 74 + p1buf := new(bytes.Buffer) 75 + assert.NoError(p1.MarshalCBOR(p1buf)) 76 + p1cbor := p1buf.Bytes() 73 77 op := engine.RecordOp{ 74 78 Action: engine.CreateOp, 75 79 DID: am1.Identity.DID, 76 80 Collection: syntax.NSID("app.bsky.feed.post"), 77 81 RecordKey: syntax.RecordKey("fagg0t"), 78 82 CID: &cid1, 79 - Value: p1, 83 + RecordCBOR: p1cbor, 80 84 } 81 85 c1 := engine.NewRecordContext(ctx, &eng, am1, op) 82 86 assert.NoError(BadWordRecordKeyRule(&c1)) ··· 87 91 p2 := appbsky.FeedPost{ 88 92 Text: "some post hardestr blah", 89 93 } 94 + p2buf := new(bytes.Buffer) 95 + assert.NoError(p2.MarshalCBOR(p2buf)) 96 + p2cbor := p2buf.Bytes() 90 97 op2 := engine.RecordOp{ 91 98 Action: engine.CreateOp, 92 99 DID: am1.Identity.DID, 93 100 Collection: syntax.NSID("app.bsky.feed.post"), 94 101 RecordKey: syntax.RecordKey("abc123"), 95 102 CID: &cid1, 96 - Value: p1, 103 + RecordCBOR: p2cbor, 97 104 } 98 105 c2 := engine.NewRecordContext(ctx, &eng, am1, op2) 99 106 assert.NoError(BadWordPostRule(&c2, &p2))
+9 -2
automod/rules/misleading_test.go
··· 1 1 package rules 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "log/slog" 6 7 "testing" ··· 44 45 }, 45 46 }, 46 47 } 48 + p1buf := new(bytes.Buffer) 49 + assert.NoError(p1.MarshalCBOR(p1buf)) 50 + p1cbor := p1buf.Bytes() 47 51 op := engine.RecordOp{ 48 52 Action: engine.CreateOp, 49 53 DID: am1.Identity.DID, 50 54 Collection: syntax.NSID("app.bsky.feed.post"), 51 55 RecordKey: syntax.RecordKey("abc123"), 52 56 CID: &cid1, 53 - Value: p1, 57 + RecordCBOR: p1cbor, 54 58 } 55 59 c1 := engine.NewRecordContext(ctx, &eng, am1, op) 56 60 assert.NoError(MisleadingURLPostRule(&c1, &p1)) ··· 88 92 }, 89 93 }, 90 94 } 95 + p1buf := new(bytes.Buffer) 96 + assert.NoError(p1.MarshalCBOR(p1buf)) 97 + p1cbor := p1buf.Bytes() 91 98 op := engine.RecordOp{ 92 99 Action: engine.CreateOp, 93 100 DID: am1.Identity.DID, 94 101 Collection: syntax.NSID("app.bsky.feed.post"), 95 102 RecordKey: syntax.RecordKey("abc123"), 96 103 CID: &cid1, 97 - Value: p1, 104 + RecordCBOR: p1cbor, 98 105 } 99 106 c1 := engine.NewRecordContext(ctx, &eng, am1, op) 100 107 assert.NoError(MisleadingMentionPostRule(&c1, &p1))
+4 -4
cmd/hepa/consumer.go
··· 161 161 ek := repomgr.EventKind(op.Action) 162 162 switch ek { 163 163 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 164 - // read the record from blocks, and verify CID 165 - rc, rec, err := rr.GetRecord(ctx, op.Path) 164 + // read the record bytes from blocks, and verify CID 165 + rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 166 166 if err != nil { 167 167 logger.Error("reading record from event blocks (CAR)", "err", err) 168 168 break ··· 188 188 Collection: collection, 189 189 RecordKey: rkey, 190 190 CID: &recCID, 191 - Value: rec, 191 + RecordCBOR: *recCBOR, 192 192 }) 193 193 if err != nil { 194 194 logger.Error("engine failed to process record", "err", err) ··· 201 201 Collection: collection, 202 202 RecordKey: rkey, 203 203 CID: nil, 204 - Value: nil, 204 + RecordCBOR: nil, 205 205 }) 206 206 if err != nil { 207 207 logger.Error("engine failed to process record", "err", err)