the sh.tangled.feed.star lexicon can apparently come in different shapes. yay, decentralized firehose. this one decodes to so we can continue to backfill.
+117
-19
Diff
round #0
+13
-14
firehose/handler.go
+13
-14
firehose/handler.go
···
25
25
CreatedAt string `json:"createdAt"`
26
26
}
27
27
28
-
// starRecord is the on-wire shape of an sh.tangled.feed.star record.
29
-
type starRecord struct {
30
-
Subject string `json:"subject"`
31
-
CreatedAt string `json:"createdAt"`
32
-
}
33
-
34
28
// LanguagesEnqueuer accepts (atURI, knot, did, name) for async language
35
29
// enrichment. *LanguagesWorker satisfies this; pass nil to disable enrichment.
36
30
type LanguagesEnqueuer interface {
···
133
127
atURI := fmt.Sprintf("at://%s/%s/%s", ev.Did, ev.Commit.Collection, ev.Commit.RKey)
134
128
switch ev.Commit.Operation {
135
129
case "create", "update":
136
-
var rec starRecord
130
+
var rec StarRecord
137
131
if err := json.Unmarshal(ev.Commit.Record, &rec); err != nil {
138
132
return fmt.Errorf("decode star record: %w", err)
139
133
}
140
-
if rec.Subject == "" {
134
+
// Subject is either a repo at-uri (string form) or a DID (object
135
+
// form). Both shapes get the same treatment.
136
+
subject := rec.Subject
137
+
if subject == "" {
138
+
subject = rec.SubjectDID
139
+
}
140
+
if subject == "" {
141
141
return s.AdvanceCursor(ctx, ev.TimeUS)
142
142
}
143
143
createdAt, err := time.Parse(time.RFC3339Nano, rec.CreatedAt)
144
144
if err != nil {
145
145
createdAt = time.UnixMicro(ev.TimeUS)
146
146
}
147
-
if err := s.ApplyStarCreate(ctx, atURI, rec.Subject, createdAt, ev.TimeUS); err != nil {
147
+
if err := s.ApplyStarCreate(ctx, atURI, subject, createdAt, ev.TimeUS); err != nil {
148
148
return err
149
149
}
150
-
logger.Info("star", "by", ev.Did, "subject", rec.Subject)
151
-
// If the starred repo isn't in our index yet, queue discovery. The
152
-
// worker checks HasRepo before fetching, so duplicates are cheap.
150
+
logger.Info("star", "by", ev.Did, "subject", subject)
151
+
// Discovery validates input and drops non-at-uris, so passing a DID
152
+
// is a safe no-op for the object form.
153
153
if discovery != nil {
154
-
discovery.EnqueueAtURI(rec.Subject)
154
+
discovery.EnqueueAtURI(subject)
155
155
}
156
-
// First time we see this user — also pull their historical stars.
157
156
if starsBackfill != nil {
158
157
starsBackfill.Enqueue(ev.Did)
159
158
}
+43
firehose/star.go
+43
firehose/star.go
···
1
+
package firehose
2
+
3
+
import "encoding/json"
4
+
5
+
// StarRecord is the sh.tangled.feed.star lexicon. Subject appears in two
6
+
// shapes on the wire — a string at-uri pointing at a repo, or an object
7
+
// {"$type":"sh.tangled.feed.star#repo","did":"..."} that stars an account.
8
+
// We capture both: Subject for the at-uri form, SubjectDID for the account
9
+
// form (also populated from the top-level subjectDid sibling when present).
10
+
type StarRecord struct {
11
+
Subject string
12
+
SubjectDID string
13
+
CreatedAt string
14
+
}
15
+
16
+
func (r *StarRecord) UnmarshalJSON(data []byte) error {
17
+
var raw struct {
18
+
Subject json.RawMessage `json:"subject"`
19
+
SubjectDID string `json:"subjectDid"`
20
+
CreatedAt string `json:"createdAt"`
21
+
}
22
+
if err := json.Unmarshal(data, &raw); err != nil {
23
+
return err
24
+
}
25
+
r.SubjectDID = raw.SubjectDID
26
+
r.CreatedAt = raw.CreatedAt
27
+
if len(raw.Subject) == 0 || string(raw.Subject) == "null" {
28
+
return nil
29
+
}
30
+
if raw.Subject[0] == '"' {
31
+
return json.Unmarshal(raw.Subject, &r.Subject)
32
+
}
33
+
var obj struct {
34
+
DID string `json:"did"`
35
+
}
36
+
if err := json.Unmarshal(raw.Subject, &obj); err != nil {
37
+
return err
38
+
}
39
+
if obj.DID != "" {
40
+
r.SubjectDID = obj.DID
41
+
}
42
+
return nil
43
+
}
+55
firehose/star_test.go
+55
firehose/star_test.go
···
1
+
package firehose_test
2
+
3
+
import (
4
+
"encoding/json"
5
+
"testing"
6
+
7
+
"github.com/stretchr/testify/assert"
8
+
"github.com/stretchr/testify/require"
9
+
10
+
"tangled.sh/chown.de/tangled-repo-firehose/firehose"
11
+
)
12
+
13
+
func TestStarRecordUnmarshal(t *testing.T) {
14
+
tests := []struct {
15
+
name string
16
+
input string
17
+
wantSubject string
18
+
wantSubjectDID string
19
+
wantErr bool
20
+
}{
21
+
{
22
+
name: "string subject (at-uri form) with sibling subjectDid",
23
+
input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T17:41:40+03:00","subject":"at://did:plc:thuylqmisypnmekwzfgymm3z/sh.tangled.repo/3mkzm63u2t322","subjectDid":"did:plc:ycef6t5dwrcmno7brso45xgd"}`,
24
+
wantSubject: "at://did:plc:thuylqmisypnmekwzfgymm3z/sh.tangled.repo/3mkzm63u2t322",
25
+
wantSubjectDID: "did:plc:ycef6t5dwrcmno7brso45xgd",
26
+
},
27
+
{
28
+
name: "object subject (#repo form) extracts did",
29
+
input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T13:33:02Z","subject":{"$type":"sh.tangled.feed.star#repo","did":"did:plc:di4gol2smljyj6gjnjdu5qrg"}}`,
30
+
wantSubjectDID: "did:plc:di4gol2smljyj6gjnjdu5qrg",
31
+
},
32
+
{
33
+
name: "missing subject decodes to empty",
34
+
input: `{"$type":"sh.tangled.feed.star","createdAt":"2026-05-07T13:33:02Z"}`,
35
+
},
36
+
{
37
+
name: "malformed subject string fails",
38
+
input: `{"subject":"unterminated`,
39
+
wantErr: true,
40
+
},
41
+
}
42
+
for _, tt := range tests {
43
+
t.Run(tt.name, func(t *testing.T) {
44
+
var rec firehose.StarRecord
45
+
err := json.Unmarshal([]byte(tt.input), &rec)
46
+
if tt.wantErr {
47
+
require.Error(t, err)
48
+
return
49
+
}
50
+
require.NoError(t, err)
51
+
assert.Equal(t, tt.wantSubject, rec.Subject)
52
+
assert.Equal(t, tt.wantSubjectDID, rec.SubjectDID)
53
+
})
54
+
}
55
+
}
+5
-4
firehose/starsbackfill.go
+5
-4
firehose/starsbackfill.go
···
71
71
return err
72
72
}
73
73
for _, r := range records {
74
-
if r.Value.Subject == "" {
74
+
subject := r.Value.Subject
75
+
if subject == "" {
75
76
continue
76
77
}
77
78
createdAt, err := time.Parse(time.RFC3339Nano, r.Value.CreatedAt)
78
79
if err != nil {
79
80
createdAt = time.Now()
80
81
}
81
-
if err := w.store.UpsertStar(ctx, r.URI, r.Value.Subject, createdAt); err != nil {
82
+
if err := w.store.UpsertStar(ctx, r.URI, subject, createdAt); err != nil {
82
83
w.logger.Warn("upsert star", "err", err, "uri", r.URI)
83
84
continue
84
85
}
···
86
87
if w.discovery != nil {
87
88
// Block on backpressure — a single user can have hundreds of
88
89
// stars and discovery is rate-limited per PDS call.
89
-
w.discovery.EnqueueAtURIWait(ctx, r.Value.Subject)
90
+
w.discovery.EnqueueAtURIWait(ctx, subject)
90
91
}
91
92
}
92
93
if next == "" || next == cursor || len(records) == 0 {
···
103
104
type listRecordEntry struct {
104
105
URI string `json:"uri"`
105
106
CID string `json:"cid"`
106
-
Value starRecord `json:"value"`
107
+
Value StarRecord `json:"value"`
107
108
}
108
109
109
110
func (w *StarsBackfillWorker) listStars(ctx context.Context, xc *xrpc.Client, did, cursor string) ([]listRecordEntry, string, error) {
History
1 round
0 comments
chown.de
submitted
#0
2 commits
expand
collapse
fix(stars): account for different shapes
the sh.tangled.feed.star lexicon can apparently come in different
shapes. yay, decentralized firehose. this one decodes to so we can
continue to backfill.
chore(dev): use a port I don't use already
merge conflicts detected
expand
collapse
expand
collapse
- firehose/handler.go:25
- firehose/starsbackfill.go:71