Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: upsert tracing

authored by

Patrick Dewey and committed by tangled.org a8fcb524 3ed1099c

+166 -24
+13
internal/atproto/witness.go
··· 47 47 // Used for write-through caching after successful PDS mutations. 48 48 UpsertWitnessRecord(ctx context.Context, did, collection, rkey, cid string, record json.RawMessage) error 49 49 50 + // UpsertWitnessRecordBatch inserts or updates multiple records in a single 51 + // transaction. Used for bulk operations like refresh/backfill. 52 + UpsertWitnessRecordBatch(ctx context.Context, records []WitnessWriteRecord) error 53 + 50 54 // DeleteWitnessRecord removes a record from the cache. 51 55 // Used for write-through caching after successful PDS deletions. 52 56 DeleteWitnessRecord(ctx context.Context, did, collection, rkey string) error 53 57 } 58 + 59 + // WitnessWriteRecord holds the fields needed to upsert a record into the witness cache. 60 + type WitnessWriteRecord struct { 61 + DID string 62 + Collection string 63 + RKey string 64 + CID string 65 + Record json.RawMessage 66 + }
+1
internal/firehose/consumer.go
··· 330 330 return nil 331 331 } 332 332 if err := c.index.UpsertRecord( 333 + context.Background(), 333 334 event.DID, 334 335 commit.Collection, 335 336 commit.RKey,
+105 -14
internal/firehose/index.go
··· 15 15 "arabica/internal/atproto" 16 16 "arabica/internal/lexicons" 17 17 "arabica/internal/models" 18 + "arabica/internal/tracing" 18 19 19 20 "database/sql/driver" 20 21 21 22 "github.com/XSAM/otelsql" 22 23 "github.com/rs/zerolog/log" 24 + "go.opentelemetry.io/otel/attribute" 23 25 semconv "go.opentelemetry.io/otel/semconv/v1.26.0" 24 26 "go.opentelemetry.io/otel/trace" 25 27 _ "modernc.org/sqlite" ··· 378 380 return err 379 381 } 380 382 381 - // UpsertRecord adds or updates a record in the index 382 - func (idx *FeedIndex) UpsertRecord(did, collection, rkey, cid string, record json.RawMessage, eventTime int64) error { 383 + // UpsertRecord adds or updates a record in the index. 384 + // The context is used for OTel tracing; pass context.Background() for background operations. 385 + func (idx *FeedIndex) UpsertRecord(ctx context.Context, did, collection, rkey, cid string, record json.RawMessage, eventTime int64) error { 386 + const stmt = `INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at, created_at) 387 + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 388 + ON CONFLICT(uri) DO UPDATE SET 389 + record = excluded.record, 390 + cid = excluded.cid, 391 + indexed_at = excluded.indexed_at, 392 + created_at = excluded.created_at` 393 + 394 + ctx, span := tracing.SqliteSpan(ctx, "upsert", "records") 395 + span.SetAttributes( 396 + attribute.String("record.did", did), 397 + attribute.String("record.collection", collection), 398 + attribute.String("record.rkey", rkey), 399 + attribute.String("db.statement", stmt), 400 + ) 401 + defer span.End() 402 + 383 403 uri := atproto.BuildATURI(did, collection, rkey) 384 404 385 405 // Parse createdAt from record ··· 395 415 396 416 now := time.Now() 397 417 398 - _, err := idx.db.Exec(` 399 - INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at, created_at) 400 - VALUES (?, ?, ?, ?, ?, ?, ?, ?) 401 - ON CONFLICT(uri) DO UPDATE SET 402 - record = excluded.record, 403 - cid = excluded.cid, 404 - indexed_at = excluded.indexed_at, 405 - created_at = excluded.created_at 406 - `, uri, did, collection, rkey, string(record), cid, 418 + _, err := idx.db.Exec(stmt, uri, did, collection, rkey, string(record), cid, 407 419 now.Format(time.RFC3339Nano), createdAt.Format(time.RFC3339Nano)) 408 420 if err != nil { 421 + tracing.EndWithError(span, err) 409 422 return fmt.Errorf("failed to upsert record: %w", err) 410 423 } 411 424 412 425 // Track known DID 413 426 _, err = idx.db.Exec(`INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did) 414 427 if err != nil { 428 + tracing.EndWithError(span, err) 415 429 return fmt.Errorf("failed to track known DID: %w", err) 416 430 } 417 431 ··· 426 440 } 427 441 428 442 // UpsertWitnessRecord implements atproto.WitnessCache for write-through caching. 429 - func (idx *FeedIndex) UpsertWitnessRecord(_ context.Context, did, collection, rkey, cid string, record json.RawMessage) error { 430 - return idx.UpsertRecord(did, collection, rkey, cid, record, time.Now().UnixMicro()) 443 + func (idx *FeedIndex) UpsertWitnessRecord(ctx context.Context, did, collection, rkey, cid string, record json.RawMessage) error { 444 + return idx.UpsertRecord(ctx, did, collection, rkey, cid, record, time.Now().UnixMicro()) 445 + } 446 + 447 + // UpsertWitnessRecordBatch implements atproto.WitnessCache batch upsert. 448 + // All records are inserted in a single transaction for efficiency. 449 + func (idx *FeedIndex) UpsertWitnessRecordBatch(ctx context.Context, records []atproto.WitnessWriteRecord) error { 450 + if len(records) == 0 { 451 + return nil 452 + } 453 + 454 + const upsertSQL = `INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at, created_at) 455 + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 456 + ON CONFLICT(uri) DO UPDATE SET 457 + record = excluded.record, 458 + cid = excluded.cid, 459 + indexed_at = excluded.indexed_at, 460 + created_at = excluded.created_at` 461 + 462 + ctx, span := tracing.SqliteSpan(ctx, "upsert_batch", "records") 463 + span.SetAttributes( 464 + attribute.Int("batch.size", len(records)), 465 + attribute.String("db.statement", upsertSQL), 466 + ) 467 + defer span.End() 468 + 469 + tx, err := idx.db.Begin() 470 + if err != nil { 471 + tracing.EndWithError(span, err) 472 + return fmt.Errorf("failed to begin transaction: %w", err) 473 + } 474 + defer tx.Rollback() //nolint:errcheck 475 + 476 + stmt, err := tx.Prepare(upsertSQL) 477 + if err != nil { 478 + tracing.EndWithError(span, err) 479 + return fmt.Errorf("failed to prepare statement: %w", err) 480 + } 481 + defer stmt.Close() 482 + 483 + now := time.Now() 484 + seenDIDs := make(map[string]struct{}) 485 + 486 + for _, rec := range records { 487 + uri := atproto.BuildATURI(rec.DID, rec.Collection, rec.RKey) 488 + 489 + createdAt := now 490 + var recordData map[string]any 491 + if err := json.Unmarshal(rec.Record, &recordData); err == nil { 492 + if createdAtStr, ok := recordData["createdAt"].(string); ok { 493 + if t, err := time.Parse(time.RFC3339, createdAtStr); err == nil { 494 + createdAt = t 495 + } 496 + } 497 + } 498 + 499 + if _, err := stmt.Exec(uri, rec.DID, rec.Collection, rec.RKey, 500 + string(rec.Record), rec.CID, 501 + now.Format(time.RFC3339Nano), createdAt.Format(time.RFC3339Nano)); err != nil { 502 + tracing.EndWithError(span, err) 503 + return fmt.Errorf("failed to upsert record %s: %w", uri, err) 504 + } 505 + seenDIDs[rec.DID] = struct{}{} 506 + } 507 + 508 + // Track known DIDs (deduplicated) 509 + for did := range seenDIDs { 510 + if _, err := tx.Exec(`INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did); err != nil { 511 + tracing.EndWithError(span, err) 512 + return fmt.Errorf("failed to track known DID: %w", err) 513 + } 514 + } 515 + 516 + if err := tx.Commit(); err != nil { 517 + tracing.EndWithError(span, err) 518 + return fmt.Errorf("failed to commit transaction: %w", err) 519 + } 520 + 521 + return nil 431 522 } 432 523 433 524 // DeleteWitnessRecord implements atproto.WitnessCache for write-through caching. ··· 1196 1287 continue 1197 1288 } 1198 1289 1199 - if err := idx.UpsertRecord(did, collection, rkey, record.CID, recordJSON, 0); err != nil { 1290 + if err := idx.UpsertRecord(ctx, did, collection, rkey, record.CID, recordJSON, 0); err != nil { 1200 1291 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to upsert record during backfill") 1201 1292 continue 1202 1293 }
+4 -4
internal/firehose/index_test.go
··· 235 235 236 236 // Index a record 237 237 record := []byte(`{"$type":"social.arabica.alpha.bean","name":"Test Bean","origin":"Ethiopia","createdAt":"2025-01-01T00:00:00Z"}`) 238 - err = idx.UpsertRecord(did, collection, rkey, "cid123", record, time.Now().Unix()) 238 + err = idx.UpsertRecord(context.Background(), did, collection, rkey, "cid123", record, time.Now().Unix()) 239 239 assert.NoError(t, err) 240 240 241 241 // Verify it exists ··· 279 279 // Index two records 280 280 record1 := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean One","createdAt":"2025-01-01T00:00:00Z"}`) 281 281 record2 := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean Two","createdAt":"2025-01-02T00:00:00Z"}`) 282 - err = idx.UpsertRecord(did, collection, "bean1", "cid1", record1, time.Now().Unix()) 282 + err = idx.UpsertRecord(context.Background(), did, collection, "bean1", "cid1", record1, time.Now().Unix()) 283 283 assert.NoError(t, err) 284 - err = idx.UpsertRecord(did, collection, "bean2", "cid2", record2, time.Now().Unix()) 284 + err = idx.UpsertRecord(context.Background(), did, collection, "bean2", "cid2", record2, time.Now().Unix()) 285 285 assert.NoError(t, err) 286 286 287 287 assert.Equal(t, 2, idx.RecordCount()) ··· 340 340 } 341 341 342 342 for _, tt := range types { 343 - err := idx.UpsertRecord(did, tt.collection, tt.rkey, "cid-"+tt.rkey, []byte(tt.record), now) 343 + err := idx.UpsertRecord(context.Background(), did, tt.collection, tt.rkey, "cid-"+tt.rkey, []byte(tt.record), now) 344 344 assert.NoError(t, err, "failed to upsert %s", tt.collection) 345 345 } 346 346
+20 -5
internal/handlers/entities.go
··· 7 7 8 8 "arabica/internal/atproto" 9 9 "arabica/internal/models" 10 + "arabica/internal/tracing" 10 11 "arabica/internal/web/components" 11 12 "arabica/internal/web/pages" 12 13 13 14 "github.com/rs/zerolog/log" 15 + "go.opentelemetry.io/otel/attribute" 14 16 "golang.org/x/sync/errgroup" 15 17 ) 16 18 ··· 405 407 } 406 408 407 409 if h.witnessCache != nil { 410 + refreshCtx, refreshSpan := tracing.HandlerSpan(r.Context(), "manage.refresh.witness_sync", 411 + attribute.String("user.did", didStr), 412 + ) 413 + var batch []atproto.WitnessWriteRecord 408 414 for _, collection := range entityCollections { 409 - output, err := h.atprotoClient.ListAllRecords(r.Context(), did, sessionID, collection) 415 + output, err := h.atprotoClient.ListAllRecords(refreshCtx, did, sessionID, collection) 410 416 if err != nil { 411 417 log.Warn().Err(err).Str("collection", collection).Msg("refresh: failed to list records from PDS") 412 418 continue ··· 420 426 if jsonErr != nil { 421 427 continue 422 428 } 423 - if err := h.witnessCache.UpsertWitnessRecord(r.Context(), didStr, collection, rkey, rec.CID, recordJSON); err != nil { 424 - log.Warn().Err(err).Str("uri", rec.URI).Msg("refresh: failed to write-through record") 425 - } 429 + batch = append(batch, atproto.WitnessWriteRecord{ 430 + DID: didStr, 431 + Collection: collection, 432 + RKey: rkey, 433 + CID: rec.CID, 434 + Record: recordJSON, 435 + }) 426 436 } 427 437 short := collection[strings.LastIndex(collection, ".")+1:] 428 - log.Info().Str("collection", short).Int("count", len(output.Records)).Msg("refresh: synced collection from PDS") 438 + log.Info().Str("collection", short).Int("count", len(output.Records)).Msg("refresh: fetched collection from PDS") 439 + } 440 + if err := h.witnessCache.UpsertWitnessRecordBatch(refreshCtx, batch); err != nil { 441 + log.Error().Err(err).Msg("refresh: failed to batch upsert records") 429 442 } 443 + refreshSpan.SetAttributes(attribute.Int("refresh.total_records", len(batch))) 444 + refreshSpan.End() 430 445 } 431 446 432 447 // Now fetch and render the manage partial with fresh PDS data
+2 -1
internal/suggestions/suggestions_test.go
··· 1 1 package suggestions 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "os" 6 7 "path/filepath" ··· 32 33 fields["createdAt"] = time.Now().Format(time.RFC3339) 33 34 data, err := json.Marshal(fields) 34 35 assert.NoError(t, err) 35 - err = idx.UpsertRecord(did, collection, rkey, "cid-"+rkey, data, 0) 36 + err = idx.UpsertRecord(context.Background(), did, collection, rkey, "cid-"+rkey, data, 0) 36 37 assert.NoError(t, err) 37 38 } 38 39
+21
internal/tracing/tracing.go
··· 83 83 ) 84 84 } 85 85 86 + // SqliteSpan starts a span for a SQLite operation with standard attributes. 87 + // Returns a no-op span if there is no parent span in ctx. 88 + func SqliteSpan(ctx context.Context, op, table string) (context.Context, trace.Span) { 89 + if !trace.SpanFromContext(ctx).SpanContext().IsValid() { 90 + return ctx, trace.SpanFromContext(ctx) 91 + } 92 + return tracer().Start(ctx, "sqlite."+op, 93 + trace.WithAttributes( 94 + attribute.String("db.system", "sqlite"), 95 + attribute.String("db.operation", op), 96 + attribute.String("db.sql.table", table), 97 + ), 98 + ) 99 + } 100 + 101 + // HandlerSpan starts a span for a logical operation within a handler. 102 + // Use this to group related work (e.g. a refresh loop) under a single span. 103 + func HandlerSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { 104 + return tracer().Start(ctx, name, trace.WithAttributes(attrs...)) 105 + } 106 + 86 107 // EndWithError records an error on a span and sets its status. 87 108 // If err is nil, this is a no-op. 88 109 func EndWithError(span trace.Span, err error) {