+236
-44
Diff
round #0
+33
internal/atproto/public_client.go
+33
internal/atproto/public_client.go
···
92
92
return did, nil
93
93
}
94
94
95
+
// InvalidateHandle removes a handle from the resolver cache so the next
96
+
// ResolveHandle call refetches from the directory. Called when a firehose
97
+
// identity event signals that a handle's DID mapping has changed.
98
+
func (c *PublicClient) InvalidateHandle(handle string) {
99
+
if handle == "" {
100
+
return
101
+
}
102
+
c.handleMu.Lock()
103
+
delete(c.handleCache, handle)
104
+
c.handleMu.Unlock()
105
+
}
106
+
107
+
// InvalidateDID drops any cached entries pointing at this DID โ both the
108
+
// PDS endpoint cache and any handleโDID mappings whose resolved DID is the
109
+
// given one. Used when a DID's repo is gone (account deleted/takendown) or
110
+
// when a handle has been reassigned away from this DID.
111
+
func (c *PublicClient) InvalidateDID(did string) {
112
+
if did == "" {
113
+
return
114
+
}
115
+
c.pdsMu.Lock()
116
+
delete(c.pdsCache, did)
117
+
c.pdsMu.Unlock()
118
+
119
+
c.handleMu.Lock()
120
+
for h, v := range c.handleCache {
121
+
if v.value == did {
122
+
delete(c.handleCache, h)
123
+
}
124
+
}
125
+
c.handleMu.Unlock()
126
+
}
127
+
95
128
// PublicListRecordsOutput represents the response from public listRecords API.
96
129
type PublicListRecordsOutput struct {
97
130
Records []PublicRecordEntry `json:"records"`
+143
-39
internal/firehose/index.go
+143
-39
internal/firehose/index.go
···
128
128
expires_at TEXT NOT NULL
129
129
);
130
130
131
+
CREATE TABLE IF NOT EXISTS did_by_handle (
132
+
handle TEXT PRIMARY KEY,
133
+
did TEXT NOT NULL,
134
+
updated_at TEXT NOT NULL
135
+
);
136
+
CREATE INDEX IF NOT EXISTS idx_did_by_handle_did ON did_by_handle(did);
137
+
131
138
CREATE TABLE IF NOT EXISTS likes (
132
139
subject_uri TEXT NOT NULL,
133
140
actor_did TEXT NOT NULL,
···
289
296
profileCache: make(map[string]*CachedProfile),
290
297
}
291
298
299
+
// One-time backfill: populate did_by_handle from any pre-existing profile rows
300
+
// so handle resolution works for users observed before this table existed.
301
+
if err := idx.backfillHandleIndex(); err != nil {
302
+
log.Warn().Err(err).Msg("did_by_handle backfill failed; lookups will populate lazily")
303
+
}
304
+
292
305
// If the database already has records from a previous run, mark ready immediately
293
306
// so the feed is served from persisted data while the firehose reconnects.
294
307
var count int
···
299
312
return idx, nil
300
313
}
301
314
315
+
// backfillHandleIndex populates did_by_handle from the profiles table. Idempotent.
316
+
// Iterates every cached profile and inserts (handle, did) โ last writer wins,
317
+
// matching the live storeProfile semantics, so a handle that existed on multiple
318
+
// DIDs resolves to whichever profile was inserted most recently in the iteration.
319
+
func (idx *FeedIndex) backfillHandleIndex() error {
320
+
var n int
321
+
if err := idx.db.QueryRow(`SELECT COUNT(*) FROM did_by_handle`).Scan(&n); err == nil && n > 0 {
322
+
return nil
323
+
}
324
+
325
+
rows, err := idx.db.Query(`SELECT did, data FROM profiles`)
326
+
if err != nil {
327
+
return err
328
+
}
329
+
defer rows.Close()
330
+
331
+
now := time.Now().Format(time.RFC3339Nano)
332
+
for rows.Next() {
333
+
var did, dataStr string
334
+
if err := rows.Scan(&did, &dataStr); err != nil {
335
+
continue
336
+
}
337
+
cached := &CachedProfile{}
338
+
if err := json.Unmarshal([]byte(dataStr), cached); err != nil || cached.Profile == nil || cached.Profile.Handle == "" {
339
+
continue
340
+
}
341
+
_, _ = idx.db.Exec(
342
+
`INSERT OR REPLACE INTO did_by_handle (handle, did, updated_at) VALUES (?, ?, ?)`,
343
+
cached.Profile.Handle, did, now)
344
+
}
345
+
return rows.Err()
346
+
}
347
+
302
348
// Compile-time check: FeedIndex must satisfy the atproto.WitnessCache interface.
303
349
var _ atproto.WitnessCache = (*FeedIndex)(nil)
304
350
···
490
536
{`DELETE FROM notifications WHERE target_did = ? OR actor_did = ?`, []any{did, did}},
491
537
{`DELETE FROM notifications_meta WHERE target_did = ?`, []any{did}},
492
538
{`DELETE FROM profiles WHERE did = ?`, []any{did}},
539
+
{`DELETE FROM did_by_handle WHERE did = ?`, []any{did}},
493
540
{`DELETE FROM known_dids WHERE did = ?`, []any{did}},
494
541
{`DELETE FROM registered_dids WHERE did = ?`, []any{did}},
495
542
{`DELETE FROM backfilled WHERE did = ?`, []any{did}},
···
1124
1171
return profile, nil
1125
1172
}
1126
1173
1127
-
// storeProfile writes a profile to both in-memory and persistent caches.
1174
+
// storeProfile writes a profile to both in-memory and persistent caches, and
1175
+
// maintains the did_by_handle index so handle lookups stay accurate across
1176
+
// handle changes and handle reassignment between DIDs.
1128
1177
func (idx *FeedIndex) storeProfile(ctx context.Context, did string, profile *atproto.Profile) {
1129
1178
now := time.Now()
1130
1179
cached := &CachedProfile{
···
1140
1189
data, _ := json.Marshal(cached)
1141
1190
_, _ = idx.db.ExecContext(ctx, `INSERT OR REPLACE INTO profiles (did, data, expires_at) VALUES (?, ?, ?)`,
1142
1191
did, string(data), cached.ExpiresAt.Format(time.RFC3339Nano))
1143
-
}
1144
1192
1145
-
// GetDIDByHandle looks up a DID from the profile cache by handle.
1146
-
// Returns the DID and true if found, or empty string and false if not cached.
1147
-
// This avoids a ResolveHandle API call for known Arabica users.
1148
-
func (idx *FeedIndex) GetDIDByHandle(ctx context.Context, handle string) (string, bool) {
1149
-
// Check in-memory cache first
1150
-
idx.profileCacheMu.RLock()
1151
-
for did, cached := range idx.profileCache {
1152
-
if cached.Profile != nil && cached.Profile.Handle == handle && time.Now().Before(cached.ExpiresAt) {
1153
-
idx.profileCacheMu.RUnlock()
1154
-
return did, true
1155
-
}
1193
+
if profile != nil && profile.Handle != "" {
1194
+
// Drop any prior row pointing this DID at a different handle (handle change).
1195
+
_, _ = idx.db.ExecContext(ctx,
1196
+
`DELETE FROM did_by_handle WHERE did = ? AND handle != ?`, did, profile.Handle)
1197
+
// Last writer wins on handle โ this naturally resolves handle reassignment
1198
+
// from an old DID to a new one, since the new profile's INSERT OR REPLACE
1199
+
// overwrites the old DID's mapping.
1200
+
_, _ = idx.db.ExecContext(ctx,
1201
+
`INSERT OR REPLACE INTO did_by_handle (handle, did, updated_at) VALUES (?, ?, ?)`,
1202
+
profile.Handle, did, now.Format(time.RFC3339Nano))
1156
1203
}
1157
-
idx.profileCacheMu.RUnlock()
1204
+
}
1158
1205
1159
-
// Check persistent store
1160
-
rows, err := idx.db.QueryContext(ctx, `SELECT did, data FROM profiles`)
1161
-
if err != nil {
1206
+
// GetDIDByHandle looks up a DID from the handle index. Returns the DID and
1207
+
// true if found, or empty string and false if not indexed.
1208
+
//
1209
+
// Backed by the did_by_handle table โ last-writer-wins, so a handle that has
1210
+
// been reassigned to a new DID resolves to that new DID once the new profile
1211
+
// is observed (via the firehose profile watcher or a GetProfile call).
1212
+
func (idx *FeedIndex) GetDIDByHandle(ctx context.Context, handle string) (string, bool) {
1213
+
var did string
1214
+
err := idx.db.QueryRowContext(ctx,
1215
+
`SELECT did FROM did_by_handle WHERE handle = ?`, handle).Scan(&did)
1216
+
if err != nil || did == "" {
1162
1217
return "", false
1163
1218
}
1164
-
defer rows.Close()
1165
-
1166
-
for rows.Next() {
1167
-
var did, dataStr string
1168
-
if err := rows.Scan(&did, &dataStr); err != nil {
1169
-
continue
1170
-
}
1171
-
cached := &CachedProfile{}
1172
-
if err := json.Unmarshal([]byte(dataStr), cached); err != nil || cached.Profile == nil {
1173
-
continue
1174
-
}
1175
-
if cached.Profile.Handle == handle {
1176
-
// Promote to in-memory cache
1177
-
cached.ExpiresAt = time.Now().Add(idx.profileTTL)
1178
-
idx.profileCacheMu.Lock()
1179
-
idx.profileCache[did] = cached
1180
-
idx.profileCacheMu.Unlock()
1181
-
return did, true
1182
-
}
1183
-
}
1184
-
1185
-
return "", false
1219
+
return did, true
1186
1220
}
1187
1221
1188
1222
// InvalidateProfile removes a DID's profile from both the in-memory and persistent
···
1193
1227
idx.profileCacheMu.Unlock()
1194
1228
1195
1229
_, _ = idx.db.Exec(`DELETE FROM profiles WHERE did = ?`, did)
1230
+
_, _ = idx.db.Exec(`DELETE FROM did_by_handle WHERE did = ?`, did)
1196
1231
}
1197
1232
1198
1233
// RefreshProfile fetches a profile from the API and stores it in both caches.
···
1208
1243
idx.storeProfile(ctx, did, profile)
1209
1244
}
1210
1245
1246
+
// InvalidatePublicCachesForDID drops the public client's cached PDS endpoint
1247
+
// and any handleโDID mappings pointing at this DID. Used when an account is
1248
+
// deleted/takendown so subsequent lookups don't keep hitting the tombstoned DID.
1249
+
func (idx *FeedIndex) InvalidatePublicCachesForDID(did string) {
1250
+
if idx.publicClient != nil {
1251
+
idx.publicClient.InvalidateDID(did)
1252
+
}
1253
+
}
1254
+
1255
+
// OnIdentityEvent reconciles caches when a Jetstream identity event reports
1256
+
// that a DID's handle has changed. It is the only path through which a handle
1257
+
// can be reassigned from one DID to another (handle release + reclaim by a
1258
+
// different account), so this is where stale mappings must be evicted.
1259
+
//
1260
+
// Steps:
1261
+
// 1. Look up this DID's previously cached handle (the old handle).
1262
+
// 2. Find any *other* DID whose cached profile still claims the new handle โ
1263
+
// that's the prior owner; invalidate its profile and resolver entries.
1264
+
// 3. Drop the old handle from the resolver cache (it may now resolve to
1265
+
// someone else, or to nothing).
1266
+
// 4. Drop the new handle from the resolver cache so the next ResolveHandle
1267
+
// re-fetches from the directory.
1268
+
// 5. Refresh this DID's profile via the API; storeProfile then writes the
1269
+
// authoritative did_by_handle row.
1270
+
func (idx *FeedIndex) OnIdentityEvent(ctx context.Context, did, newHandle string) {
1271
+
var oldHandle string
1272
+
idx.profileCacheMu.RLock()
1273
+
if cached, ok := idx.profileCache[did]; ok && cached.Profile != nil {
1274
+
oldHandle = cached.Profile.Handle
1275
+
}
1276
+
idx.profileCacheMu.RUnlock()
1277
+
if oldHandle == "" {
1278
+
// Fall back to persistent store.
1279
+
var dataStr string
1280
+
if err := idx.db.QueryRowContext(ctx, `SELECT data FROM profiles WHERE did = ?`, did).Scan(&dataStr); err == nil {
1281
+
cached := &CachedProfile{}
1282
+
if err := json.Unmarshal([]byte(dataStr), cached); err == nil && cached.Profile != nil {
1283
+
oldHandle = cached.Profile.Handle
1284
+
}
1285
+
}
1286
+
}
1287
+
1288
+
if newHandle != "" {
1289
+
// Evict any prior owner of newHandle (other than `did` itself).
1290
+
var priorDID string
1291
+
err := idx.db.QueryRowContext(ctx,
1292
+
`SELECT did FROM did_by_handle WHERE handle = ? AND did != ?`, newHandle, did).Scan(&priorDID)
1293
+
if err == nil && priorDID != "" {
1294
+
log.Warn().
1295
+
Str("handle", newHandle).
1296
+
Str("prior_did", priorDID).
1297
+
Str("new_did", did).
1298
+
Msg("identity event: handle reassigned, invalidating prior owner")
1299
+
idx.InvalidateProfile(priorDID)
1300
+
idx.publicClient.InvalidateDID(priorDID)
1301
+
}
1302
+
}
1303
+
1304
+
if oldHandle != "" && oldHandle != newHandle {
1305
+
idx.publicClient.InvalidateHandle(oldHandle)
1306
+
}
1307
+
if newHandle != "" {
1308
+
idx.publicClient.InvalidateHandle(newHandle)
1309
+
}
1310
+
idx.publicClient.InvalidateDID(did)
1311
+
1312
+
idx.RefreshProfile(ctx, did)
1313
+
}
1314
+
1211
1315
// GetKnownDIDs returns all DIDs that have created Arabica records
1212
1316
func (idx *FeedIndex) GetKnownDIDs(ctx context.Context) ([]string, error) {
1213
1317
rows, err := idx.db.QueryContext(ctx, `SELECT did FROM known_dids`)
+7
-5
internal/firehose/profile_watcher.go
+7
-5
internal/firehose/profile_watcher.go
···
308
308
}
309
309
310
310
case "identity":
311
-
// Handle change or PDS migration โ refresh the cached profile so handle
312
-
// resolution stays accurate. Profile-commit events don't fire on handle
313
-
// changes, so this is the only signal we get.
314
-
pw.index.RefreshProfile(context.Background(), event.DID)
311
+
// Handle change, handle reassignment, or PDS migration. OnIdentityEvent
312
+
// reconciles the profile cache, did_by_handle index, and resolver caches
313
+
// before refreshing โ this is also the only signal we get when a handle
314
+
// moves from an abandoned DID to a new one.
315
315
handle := ""
316
316
if event.Identity != nil {
317
317
handle = event.Identity.Handle
318
318
}
319
-
log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update, refreshed profile")
319
+
pw.index.OnIdentityEvent(context.Background(), event.DID, handle)
320
+
log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update reconciled")
320
321
321
322
case "account":
322
323
if event.Account == nil {
···
336
337
log.Error().Err(err).Str("did", event.DID).Str("status", status).Msg("profile watcher: failed to delete user data")
337
338
return
338
339
}
340
+
pw.index.InvalidatePublicCachesForDID(event.DID)
339
341
log.Warn().Str("did", event.DID).Str("status", status).Msg("profile watcher: purged all data for account")
340
342
pw.Unwatch(event.DID)
341
343
}
+51
internal/handlers/admin.go
+51
internal/handlers/admin.go
···
791
791
log.Error().Err(err).Str("did", didStr).Msg("witness export: encode failed")
792
792
}
793
793
}
794
+
795
+
// HandleAdminPurgeDID removes every trace of a DID from the witness cache:
796
+
// records, likes, comments (including ones targeting this DID's records),
797
+
// notifications, profile cache, did_by_handle index, known/registered/backfilled
798
+
// tracking, and user settings. Moderation tables are preserved as evidence.
799
+
//
800
+
// Required when an account orphans its data โ e.g. the user's PDS goes away
801
+
// without the firehose ever emitting a deleted/takendown account event, so the
802
+
// stale records sit in the cache forever. Auth and admin checks are handled by
803
+
// RequireAdmin.
804
+
func (h *Handler) HandleAdminPurgeDID(w http.ResponseWriter, r *http.Request) {
805
+
rawDID := strings.TrimSpace(r.URL.Query().Get("did"))
806
+
if rawDID == "" {
807
+
// Form posts may put it in the body.
808
+
if err := r.ParseForm(); err == nil {
809
+
rawDID = strings.TrimSpace(r.FormValue("did"))
810
+
}
811
+
}
812
+
if rawDID == "" {
813
+
http.Error(w, "missing 'did' parameter", http.StatusBadRequest)
814
+
return
815
+
}
816
+
did, err := syntax.ParseDID(rawDID)
817
+
if err != nil {
818
+
http.Error(w, fmt.Sprintf("invalid DID: %v", err), http.StatusBadRequest)
819
+
return
820
+
}
821
+
if h.feedIndex == nil {
822
+
http.Error(w, "feed index not configured", http.StatusServiceUnavailable)
823
+
return
824
+
}
825
+
826
+
didStr := did.String()
827
+
actor, _ := atproto.GetAuthenticatedDID(r.Context())
828
+
829
+
if err := h.feedIndex.DeleteAllByDID(r.Context(), didStr); err != nil {
830
+
log.Error().Err(err).Str("did", didStr).Str("actor", actor).Msg("admin purge: DeleteAllByDID failed")
831
+
http.Error(w, "purge failed", http.StatusInternalServerError)
832
+
return
833
+
}
834
+
h.feedIndex.InvalidatePublicCachesForDID(didStr)
835
+
836
+
log.Warn().Str("did", didStr).Str("actor", actor).Msg("admin purge: removed all witness data for DID")
837
+
838
+
w.Header().Set("Content-Type", "application/json")
839
+
_ = json.NewEncoder(w).Encode(map[string]any{
840
+
"did": didStr,
841
+
"purged": true,
842
+
"purgedAt": time.Now().UTC(),
843
+
})
844
+
}
+2
internal/routing/routing.go
+2
internal/routing/routing.go
···
198
198
middleware.RequireHTMXMiddleware(http.HandlerFunc(h.HandleAdminStats))))
199
199
mux.Handle("GET /_mod/export", middleware.RequireAdmin(modSvc,
200
200
http.HandlerFunc(h.HandleAdminExportDID)))
201
+
mux.Handle("POST /_mod/purge", cop.Handler(
202
+
middleware.RequireAdmin(modSvc, http.HandlerFunc(h.HandleAdminPurgeDID))))
201
203
202
204
// Static files (must come after specific routes)
203
205
fs := http.FileServer(http.Dir("static"))
History
1 round
0 comments
pdewey.com
submitted
#0
1 commit
expand
collapse
fix: handle resolution cache race condition
merge conflicts detected
expand
collapse
expand
collapse
- internal/firehose/index.go:128
- internal/firehose/profile_watcher.go:308
- internal/handlers/admin.go:791
- internal/routing/routing.go:198