this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Pass records around as *[]byte instead of as CBOR Marshallers (#522)

authored by

Jaz and committed by
GitHub
439d8603 cf33c689

+68 -42
+27 -29
backfill/backfill.go
··· 11 11 "time" 12 12 13 13 "github.com/bluesky-social/indigo/api/atproto" 14 - // Blank import to register types for CBORGEN 15 - _ "github.com/bluesky-social/indigo/api/bsky" 16 - lexutil "github.com/bluesky-social/indigo/lex/util" 17 14 "github.com/bluesky-social/indigo/repo" 18 15 "github.com/bluesky-social/indigo/repomgr" 19 16 "github.com/ipfs/go-cid" 20 - typegen "github.com/whyrusleeping/cbor-gen" 21 17 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" 22 18 "go.opentelemetry.io/otel" 23 19 "golang.org/x/time/rate" ··· 37 33 // Once done it clears the buffer and marks the job as "complete" 38 34 // Allowing the Job interface to abstract away the details of how buffered 39 35 // operations are stored and/or locked 40 - FlushBufferedOps(ctx context.Context, cb func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error 36 + FlushBufferedOps(ctx context.Context, cb func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error 41 37 42 38 ClearBufferedOps(ctx context.Context) error 43 39 } ··· 56 52 // Backfiller is a struct which handles backfilling a repo 57 53 type Backfiller struct { 58 54 Name string 59 - HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 60 - HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 55 + HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error 56 + HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error 61 57 HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error 62 58 Store Store 63 59 ··· 123 119 func NewBackfiller( 124 120 name string, 125 121 store Store, 126 - handleCreate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid 
*cid.Cid) error, 127 - handleUpdate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 122 + handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, 123 + handleUpdate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, 128 124 handleDelete func(ctx context.Context, repo string, rev string, path string) error, 129 125 opts *BackfillOptions, 130 126 ) *Backfiller { ··· 213 209 214 210 // Flush buffered operations, clear the buffer, and mark the job as "complete" 215 211 // Clearing and marking are handled by the job interface 216 - err := job.FlushBufferedOps(ctx, func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 212 + err := job.FlushBufferedOps(ctx, func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error { 217 213 switch repomgr.EventKind(kind) { 218 214 case repomgr.EvtKindCreateRecord: 219 215 err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid) ··· 293 289 } 294 290 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 295 291 if err != nil { 292 + state := fmt.Sprintf("failed (create request: %s)", err.Error()) 293 + // Mark the job as "failed" 294 + err := job.SetState(ctx, state) 295 + if err != nil { 296 + log.Error("failed to set job state", "error", err) 297 + } 298 + 296 299 log.Error("failed to create request", "error", err) 297 300 return 298 301 } ··· 307 310 308 311 resp, err := client.Do(req) 309 312 if err != nil { 313 + state := fmt.Sprintf("failed (do request: %s)", err.Error()) 314 + // Mark the job as "failed" 315 + err := job.SetState(ctx, state) 316 + if err != nil { 317 + log.Error("failed to set job state", "error", err) 318 + } 319 + 310 320 log.Error("failed to send request", "error", err) 311 321 return 312 322 } ··· 392 402 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for 
record: %w", err)} 393 403 continue 394 404 } 395 - rec, err := lexutil.CborDecodeValue(blk.RawData()) 396 - if err != nil { 397 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to decode record: %w", err)} 398 - continue 399 - } 400 405 401 - recM, ok := rec.(typegen.CBORMarshaler) 402 - if !ok { 403 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to cast record to CBORMarshaler")} 404 - continue 405 - } 406 + raw := blk.RawData() 406 407 407 - err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, recM, &item.nodeCid) 408 + err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, &raw, &item.nodeCid) 408 409 if err != nil { 409 410 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 410 411 continue ··· 448 449 449 450 const trust = true 450 451 451 - func (bf *Backfiller) getRecord(ctx context.Context, r *repo.Repo, op *atproto.SyncSubscribeRepos_RepoOp) (cid.Cid, typegen.CBORMarshaler, error) { 452 + func (bf *Backfiller) getRecord(ctx context.Context, r *repo.Repo, op *atproto.SyncSubscribeRepos_RepoOp) (cid.Cid, *[]byte, error) { 452 453 if trust { 453 454 if op.Cid == nil { 454 455 return cid.Undef, nil, fmt.Errorf("op had no cid set") ··· 460 461 return cid.Undef, nil, err 461 462 } 462 463 463 - rec, err := lexutil.CborDecodeValue(blk.RawData()) 464 - if err != nil { 465 - return cid.Undef, nil, fmt.Errorf("failed to decode value: %w", err) 466 - } 464 + raw := blk.RawData() 467 465 468 - return c, rec, nil 466 + return c, &raw, nil 469 467 } else { 470 - return r.GetRecord(ctx, op.Path) 468 + return r.GetRecordBytes(ctx, op.Path) 471 469 } 472 470 } 473 471 ··· 535 533 return nil 536 534 } 537 535 538 - func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 536 + func (bf *Backfiller) BufferOp(ctx 
context.Context, repo string, since *string, rev, kind, path string, rec *[]byte, cid *cid.Cid) (bool, error) { 539 537 return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{ 540 538 path: path, 541 539 kind: kind,
+1 -2
backfill/gormstore.go
··· 9 9 "time" 10 10 11 11 "github.com/ipfs/go-cid" 12 - typegen "github.com/whyrusleeping/cbor-gen" 13 12 "gorm.io/gorm" 14 13 ) 15 14 ··· 316 315 return j.db.Save(j.dbj).Error 317 316 } 318 317 319 - func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 318 + func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error { 320 319 // TODO: this will block any events for this repo while this flush is ongoing, is that okay? 321 320 j.lk.Lock() 322 321 defer j.lk.Unlock()
+3 -4
backfill/memstore.go
··· 7 7 "time" 8 8 9 9 "github.com/ipfs/go-cid" 10 - typegen "github.com/whyrusleeping/cbor-gen" 11 10 ) 12 11 13 12 type bufferedOp struct { 14 13 kind string 15 14 path string 16 - rec typegen.CBORMarshaler 15 + rec *[]byte 17 16 cid *cid.Cid 18 17 } 19 18 ··· 64 63 return nil 65 64 } 66 65 67 - func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 66 + func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec *[]byte, cid *cid.Cid) (bool, error) { 68 67 s.lk.Lock() 69 68 70 69 // If the job doesn't exist, we can't buffer an op for it ··· 174 173 return nil 175 174 } 176 175 177 - func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 176 + func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error { 178 177 panic("TODO: copy what we end up doing from the gormstore") 179 178 /* 180 179 j.lk.Lock()
+23 -5
repo/repo.go
··· 334 334 ctx, span := otel.Tracer("repo").Start(ctx, "GetRecord") 335 335 defer span.End() 336 336 337 + cc, recB, err := r.GetRecordBytes(ctx, rpath) 338 + if err != nil { 339 + return cid.Undef, nil, err 340 + } 341 + 342 + if recB == nil { 343 + return cid.Undef, nil, fmt.Errorf("empty record bytes") 344 + } 345 + 346 + rec, err := lexutil.CborDecodeValue(*recB) 347 + if err != nil { 348 + return cid.Undef, nil, err 349 + } 350 + 351 + return cc, rec, nil 352 + } 353 + 354 + func (r *Repo) GetRecordBytes(ctx context.Context, rpath string) (cid.Cid, *[]byte, error) { 355 + ctx, span := otel.Tracer("repo").Start(ctx, "GetRecordBytes") 356 + defer span.End() 357 + 337 358 mst, err := r.getMst(ctx) 338 359 if err != nil { 339 360 return cid.Undef, nil, fmt.Errorf("getting repo mst: %w", err) ··· 349 370 return cid.Undef, nil, err 350 371 } 351 372 352 - rec, err := lexutil.CborDecodeValue(blk.RawData()) 353 - if err != nil { 354 - return cid.Undef, nil, err 355 - } 373 + raw := blk.RawData() 356 374 357 - return cc, rec, nil 375 + return cc, &raw, nil 358 376 } 359 377 360 378 func (r *Repo) DiffSince(ctx context.Context, oldrepo cid.Cid) ([]*mst.DiffOp, error) {
+14 -2
search/firehose.go
··· 15 15 "github.com/bluesky-social/indigo/backfill" 16 16 "github.com/bluesky-social/indigo/events" 17 17 "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 18 + lexutil "github.com/bluesky-social/indigo/lex/util" 18 19 "github.com/bluesky-social/indigo/repo" 20 + typegen "github.com/whyrusleeping/cbor-gen" 19 21 20 22 "github.com/carlmjohnson/versioninfo" 21 23 "github.com/gorilla/websocket" 22 24 "github.com/ipfs/go-cid" 23 - typegen "github.com/whyrusleeping/cbor-gen" 24 25 ) 25 26 26 27 func (s *Server) getLastCursor() (int64, error) { ··· 174 175 log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) 175 176 } 176 177 177 - func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { 178 + func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, recB *[]byte, rcid *cid.Cid) error { 178 179 // Since this gets called in a backfill job, we need to check if the path is a post or profile 179 180 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 180 181 return nil ··· 191 192 } 192 193 if ident == nil { 193 194 return fmt.Errorf("identity not found for did: %s", did.String()) 195 + } 196 + 197 + // CBOR Unmarshal the record 198 + recCBOR, err := lexutil.CborDecodeValue(*recB) 199 + if err != nil { 200 + return fmt.Errorf("cbor decode: %w", err) 201 + } 202 + 203 + rec, ok := recCBOR.(typegen.CBORMarshaler) 204 + if !ok { 205 + return fmt.Errorf("failed to cast record to CBORMarshaler") 194 206 } 195 207 196 208 switch rec := rec.(type) {