Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: orphan spans from firehose indexing

+30 -2
+8 -2
cmd/server/main.go
··· 290 290 } 291 291 } 292 292 293 + // Create a root span so all backfill PDS calls are grouped under one trace 294 + backfillCtx, backfillSpan := tracing.HandlerSpan(ctx, "backfill.startup") 295 + 293 296 // Backfill all collected DIDs 294 297 successCount := 0 295 298 for did := range didsToBackfill { 296 - if err := firehoseConsumer.BackfillDID(ctx, did); err != nil { 299 + if err := firehoseConsumer.BackfillDID(backfillCtx, did); err != nil { 297 300 log.Warn().Err(err).Str("did", did).Msg("Failed to backfill user") 298 301 } else { 299 302 successCount++ 300 303 } 301 304 } 305 + backfillSpan.End() 302 306 log.Info().Int("total", len(didsToBackfill)).Int("success", successCount).Msg("Backfill complete") 303 307 }() 304 308 ··· 306 310 // This ensures users are added to the feed even if they had an existing session 307 311 oauthManager.SetOnAuthSuccess(func(did string) { 308 312 feedRegistry.Register(did) 309 - // Backfill the user's records 313 + // Backfill the user's records (BackfillUser creates its own span 314 + // only when there is actual work to do, avoiding empty traces for 315 + // already-backfilled users) 310 316 go func() { 311 317 if err := firehoseConsumer.BackfillDID(context.Background(), did); err != nil { 312 318 log.Warn().Err(err).Str("did", did).Msg("Failed to backfill new user")
+17
internal/atproto/public_client.go
··· 13 13 "sync" 14 14 "time" 15 15 16 + "arabica/internal/tracing" 17 + 16 18 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" 17 19 ) 18 20 ··· 118 120 return pds, nil 119 121 } 120 122 c.pdsCacheMu.RUnlock() 123 + 124 + ctx, span := tracing.PdsSpan(ctx, "resolvePDS", "did:plc", did) 125 + defer span.End() 121 126 122 127 // Resolve DID document from PLC directory 123 128 var pdsEndpoint string ··· 217 222 218 223 // GetProfile fetches a user's public profile by DID or handle 219 224 func (c *PublicClient) GetProfile(ctx context.Context, actor string) (*Profile, error) { 225 + ctx, span := tracing.PdsSpan(ctx, "getProfile", "app.bsky.actor", actor) 226 + defer span.End() 227 + 220 228 reqURL := fmt.Sprintf("%s/xrpc/app.bsky.actor.getProfile?actor=%s", 221 229 c.baseURL, url.QueryEscape(actor)) 222 230 ··· 247 255 // Records are returned in reverse chronological order (newest first) 248 256 // This queries the user's PDS directly to support custom collections 249 257 func (c *PublicClient) ListRecords(ctx context.Context, did, collection string, limit int) (*PublicListRecordsOutput, error) { 258 + ctx, span := tracing.PdsSpan(ctx, "publicListRecords", collection, did) 259 + defer span.End() 260 + 250 261 // Resolve the user's PDS endpoint 251 262 pdsEndpoint, err := c.GetPDSEndpoint(ctx, did) 252 263 if err != nil { ··· 281 292 282 293 // ResolveHandle resolves an AT Protocol handle to a DID 283 294 func (c *PublicClient) ResolveHandle(ctx context.Context, handle string) (string, error) { 295 + ctx, span := tracing.PdsSpan(ctx, "resolveHandle", "com.atproto.identity", handle) 296 + defer span.End() 297 + 284 298 reqURL := fmt.Sprintf("%s/xrpc/com.atproto.identity.resolveHandle?handle=%s", 285 299 c.baseURL, url.QueryEscape(handle)) 286 300 ··· 315 329 316 330 // GetRecord fetches a single public record from the user's PDS 317 331 func (c *PublicClient) GetRecord(ctx context.Context, did, collection, rkey string) (*PublicRecordEntry, error) { 332 + ctx, span := tracing.PdsSpan(ctx, "publicGetRecord", collection, did) 333 + defer span.End() 334 + 318 335 // Resolve the user's PDS endpoint 319 336 pdsEndpoint, err := c.GetPDSEndpoint(ctx, did) 320 337 if err != nil {
+5
internal/firehose/index.go
··· 1446 1446 return nil 1447 1447 } 1448 1448 1449 + ctx, span := tracing.HandlerSpan(ctx, "backfill.user", 1450 + attribute.String("backfill.did", did), 1451 + ) 1452 + defer span.End() 1453 + 1449 1454 log.Info().Str("did", did).Msg("backfilling user records") 1450 1455 1451 1456 recordCount := 0