this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(stars): account for different shapes #3

open opened by chown.de targeting main from fix-stars

the sh.tangled.feed.star lexicon can apparently come in different shapes. yay, decentralized firehose. this one decodes both shapes into a single StarRecord so we can continue to backfill.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:x3ni2r3jgdqms5euhzt2qqdr/sh.tangled.repo.pull/3mlbwqa3fux22
+117 -19
Diff #0
+13 -14
firehose/handler.go
··· 25 25 CreatedAt string `json:"createdAt"` 26 26 } 27 27 28 - // starRecord is the on-wire shape of an sh.tangled.feed.star record. 29 - type starRecord struct { 30 - Subject string `json:"subject"` 31 - CreatedAt string `json:"createdAt"` 32 - } 33 - 34 28 // LanguagesEnqueuer accepts (atURI, knot, did, name) for async language 35 29 // enrichment. *LanguagesWorker satisfies this; pass nil to disable enrichment. 36 30 type LanguagesEnqueuer interface { ··· 133 127 atURI := fmt.Sprintf("at://%s/%s/%s", ev.Did, ev.Commit.Collection, ev.Commit.RKey) 134 128 switch ev.Commit.Operation { 135 129 case "create", "update": 136 - var rec starRecord 130 + var rec StarRecord 137 131 if err := json.Unmarshal(ev.Commit.Record, &rec); err != nil { 138 132 return fmt.Errorf("decode star record: %w", err) 139 133 } 140 - if rec.Subject == "" { 134 + // Subject is either a repo at-uri (string form) or a DID (object 135 + // form). Both shapes get the same treatment. 136 + subject := rec.Subject 137 + if subject == "" { 138 + subject = rec.SubjectDID 139 + } 140 + if subject == "" { 141 141 return s.AdvanceCursor(ctx, ev.TimeUS) 142 142 } 143 143 createdAt, err := time.Parse(time.RFC3339Nano, rec.CreatedAt) 144 144 if err != nil { 145 145 createdAt = time.UnixMicro(ev.TimeUS) 146 146 } 147 - if err := s.ApplyStarCreate(ctx, atURI, rec.Subject, createdAt, ev.TimeUS); err != nil { 147 + if err := s.ApplyStarCreate(ctx, atURI, subject, createdAt, ev.TimeUS); err != nil { 148 148 return err 149 149 } 150 - logger.Info("star", "by", ev.Did, "subject", rec.Subject) 151 - // If the starred repo isn't in our index yet, queue discovery. The 152 - // worker checks HasRepo before fetching, so duplicates are cheap. 150 + logger.Info("star", "by", ev.Did, "subject", subject) 151 + // Discovery validates input and drops non-at-uris, so passing a DID 152 + // is a safe no-op for the object form. 
153 153 if discovery != nil { 154 - discovery.EnqueueAtURI(rec.Subject) 154 + discovery.EnqueueAtURI(subject) 155 155 } 156 - // First time we see this user — also pull their historical stars. 157 156 if starsBackfill != nil { 158 157 starsBackfill.Enqueue(ev.Did) 159 158 }
+43
firehose/star.go
··· 1 + package firehose 2 + 3 + import "encoding/json" 4 + 5 + // StarRecord is the sh.tangled.feed.star lexicon. Subject appears in two 6 + // shapes on the wire — a string at-uri pointing at a repo, or an object 7 + // {"$type":"sh.tangled.feed.star#repo","did":"..."} that stars an account. 8 + // We capture both: Subject for the at-uri form, SubjectDID for the account 9 + // form (also populated from the top-level subjectDid sibling when present). 10 + type StarRecord struct { 11 + Subject string 12 + SubjectDID string 13 + CreatedAt string 14 + } 15 + 16 + func (r *StarRecord) UnmarshalJSON(data []byte) error { 17 + var raw struct { 18 + Subject json.RawMessage `json:"subject"` 19 + SubjectDID string `json:"subjectDid"` 20 + CreatedAt string `json:"createdAt"` 21 + } 22 + if err := json.Unmarshal(data, &raw); err != nil { 23 + return err 24 + } 25 + r.SubjectDID = raw.SubjectDID 26 + r.CreatedAt = raw.CreatedAt 27 + if len(raw.Subject) == 0 || string(raw.Subject) == "null" { 28 + return nil 29 + } 30 + if raw.Subject[0] == '"' { 31 + return json.Unmarshal(raw.Subject, &r.Subject) 32 + } 33 + var obj struct { 34 + DID string `json:"did"` 35 + } 36 + if err := json.Unmarshal(raw.Subject, &obj); err != nil { 37 + return err 38 + } 39 + if obj.DID != "" { 40 + r.SubjectDID = obj.DID 41 + } 42 + return nil 43 + }
+55
firehose/star_test.go
··· 1 + package firehose_test 2 + 3 + import ( 4 + "encoding/json" 5 + "testing" 6 + 7 + "github.com/stretchr/testify/assert" 8 + "github.com/stretchr/testify/require" 9 + 10 + "tangled.sh/chown.de/tangled-repo-firehose/firehose" 11 + ) 12 + 13 + func TestStarRecordUnmarshal(t *testing.T) { 14 + tests := []struct { 15 + name string 16 + input string 17 + wantSubject string 18 + wantSubjectDID string 19 + wantErr bool 20 + }{ 21 + { 22 + name: "string subject (at-uri form) with sibling subjectDid", 23 + input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T17:41:40+03:00","subject":"at://did:plc:thuylqmisypnmekwzfgymm3z/sh.tangled.repo/3mkzm63u2t322","subjectDid":"did:plc:ycef6t5dwrcmno7brso45xgd"}`, 24 + wantSubject: "at://did:plc:thuylqmisypnmekwzfgymm3z/sh.tangled.repo/3mkzm63u2t322", 25 + wantSubjectDID: "did:plc:ycef6t5dwrcmno7brso45xgd", 26 + }, 27 + { 28 + name: "object subject (#repo form) extracts did", 29 + input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T13:33:02Z","subject":{"$type":"sh.tangled.feed.star#repo","did":"did:plc:di4gol2smljyj6gjnjdu5qrg"}}`, 30 + wantSubjectDID: "did:plc:di4gol2smljyj6gjnjdu5qrg", 31 + }, 32 + { 33 + name: "missing subject decodes to empty", 34 + input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T13:33:02Z"}`, 35 + }, 36 + { 37 + name: "malformed subject string fails", 38 + input: `{"subject":"unterminated`, 39 + wantErr: true, 40 + }, 41 + } 42 + for _, tt := range tests { 43 + t.Run(tt.name, func(t *testing.T) { 44 + var rec firehose.StarRecord 45 + err := json.Unmarshal([]byte(tt.input), &rec) 46 + if tt.wantErr { 47 + require.Error(t, err) 48 + return 49 + } 50 + require.NoError(t, err) 51 + assert.Equal(t, tt.wantSubject, rec.Subject) 52 + assert.Equal(t, tt.wantSubjectDID, rec.SubjectDID) 53 + }) 54 + } 55 + }
+5 -4
firehose/starsbackfill.go
··· 71 71 return err 72 72 } 73 73 for _, r := range records { 74 - if r.Value.Subject == "" { 74 + subject := r.Value.Subject 75 + if subject == "" { 75 76 continue 76 77 } 77 78 createdAt, err := time.Parse(time.RFC3339Nano, r.Value.CreatedAt) 78 79 if err != nil { 79 80 createdAt = time.Now() 80 81 } 81 - if err := w.store.UpsertStar(ctx, r.URI, r.Value.Subject, createdAt); err != nil { 82 + if err := w.store.UpsertStar(ctx, r.URI, subject, createdAt); err != nil { 82 83 w.logger.Warn("upsert star", "err", err, "uri", r.URI) 83 84 continue 84 85 } ··· 86 87 if w.discovery != nil { 87 88 // Block on backpressure — a single user can have hundreds of 88 89 // stars and discovery is rate-limited per PDS call. 89 - w.discovery.EnqueueAtURIWait(ctx, r.Value.Subject) 90 + w.discovery.EnqueueAtURIWait(ctx, subject) 90 91 } 91 92 } 92 93 if next == "" || next == cursor || len(records) == 0 { ··· 103 104 type listRecordEntry struct { 104 105 URI string `json:"uri"` 105 106 CID string `json:"cid"` 106 - Value starRecord `json:"value"` 107 + Value StarRecord `json:"value"` 107 108 } 108 109 109 110 func (w *StarsBackfillWorker) listStars(ctx context.Context, xc *xrpc.Client, did, cursor string) ([]listRecordEntry, string, error) {
+1 -1
Makefile
··· 1 1 DB ?= firehose.db 2 - LISTEN ?= :8080 2 + LISTEN ?= :8282 3 3 4 4 .PHONY: run-dev build generate vet tidy clean 5 5

History

1 round 0 comments
sign up or login to add to the discussion
chown.de submitted #0
2 commits
expand
fix(stars): account for different shapes
chore(dev): use a port I don't use already
merge conflicts detected
expand
  • firehose/handler.go:25
  • firehose/starsbackfill.go:71
expand 0 comments