this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

some batch repo writes work

+143 -3
+1 -1
lex/gen.go
··· 967 967 // TODO: maybe do a native type? 968 968 return "string", nil 969 969 case "unknown": 970 - return "any", nil 970 + return "util.LexconTypeDecoder", nil 971 971 case "union": 972 972 return "*" + name + "_" + strings.Title(k), nil 973 973 case "image":
+24
lex/util/decoder.go
··· 29 29 lexTypesMap[id] = t 30 30 } 31 31 32 + func NewFromType(typ string) (interface{}, error) { 33 + t, ok := lexTypesMap[typ] 34 + if !ok { 35 + return nil, fmt.Errorf("unknown type: %q", typ) 36 + } 37 + v := reflect.New(t) 38 + return v.Interface(), nil 39 + } 40 + 32 41 func JsonDecodeValue(b []byte) (any, error) { 33 42 tstr, err := TypeExtract(b) 34 43 if err != nil { ··· 74 83 75 84 return ival, nil 76 85 } 86 + 87 + type LexconTypeDecoder struct { 88 + Val any 89 + } 90 + 91 + func (ltd *LexconTypeDecoder) UnmarshalJSON(b []byte) error { 92 + val, err := JsonDecodeValue(b) 93 + if err != nil { 94 + return err 95 + } 96 + 97 + ltd.Val = val 98 + 99 + return nil 100 + }
-1
lex/util/util.go
··· 48 48 } 49 49 50 50 return tcheck.Type, buf.Bytes(), nil 51 - 52 51 }
+108
repomgr/repomgr.go
··· 8 8 9 9 "github.com/ipfs/go-cid" 10 10 cbg "github.com/whyrusleeping/cbor-gen" 11 + atproto "github.com/whyrusleeping/gosky/api/atproto" 11 12 apibsky "github.com/whyrusleeping/gosky/api/bsky" 12 13 "github.com/whyrusleeping/gosky/carstore" 13 14 "github.com/whyrusleeping/gosky/repo" ··· 516 517 517 518 return nil 518 519 } 520 + 521 + func nsidForCollection(collection string) string { 522 + return collection + "/" + repo.NextTID() 523 + } 524 + 525 + func anyRecordParse(rec any) (cbg.CBORMarshaler, error) { 526 + // TODO: really should just have a fancy type that auto-things upon json unmarshal 527 + rmap, ok := rec.(map[string]any) 528 + if !ok { 529 + return nil, fmt.Errorf("record should have been an object") 530 + } 531 + 532 + t, ok := rmap["$type"].(string) 533 + if !ok { 534 + return nil, fmt.Errorf("records must have string $type field") 535 + } 536 + } 537 + 538 + func (rm *RepoManager) BatchWrite(ctx context.Context, user uint, writes []*atproto.RepoBatchWrite_Input_Writes_Elem) error { 539 + ctx, span := otel.Tracer("repoman").Start(ctx, "BatchWrite") 540 + defer span.End() 541 + 542 + unlock := rm.lockUser(ctx, user) 543 + defer unlock() 544 + 545 + head, err := rm.getUserRepoHead(ctx, user) 546 + if err != nil { 547 + return err 548 + } 549 + 550 + ds, err := rm.cs.NewDeltaSession(ctx, user, &head) 551 + if err != nil { 552 + return err 553 + } 554 + 555 + r, err := repo.OpenRepo(ctx, ds, head) 556 + if err != nil { 557 + return err 558 + } 559 + 560 + for _, w := range writes { 561 + switch { 562 + case w.RepoBatchWrite_Create != nil: 563 + c := w.RepoBatchWrite_Create 564 + var nsid string 565 + if c.Rkey != nil { 566 + nsid = c.Collection + "/" + *c.Rkey 567 + } else { 568 + nsid = nsidForCollection(c.Collection) 569 + } 570 + 571 + cc, rpath, err := r.CreateRecord(ctx, nsid, c.Value) 572 + if err != nil { 573 + return err 574 + } 575 + 576 + _ = rpath 577 + _ = cc // do we do something about this? 578 + case w.RepoBatchWrite_Update != nil: 579 + u := w.RepoBatchWrite_Update 580 + 581 + cc, err := r.PutRecord(ctx, u.Collection+"/"+u.Rkey, u.Value) 582 + if err != nil { 583 + return err 584 + } 585 + 586 + _ = cc 587 + case w.RepoBatchWrite_Delete != nil: 588 + d := w.RepoBatchWrite_Delete 589 + 590 + if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil { 591 + return err 592 + } 593 + default: 594 + return fmt.Errorf("no operation set in write enum") 595 + } 596 + } 597 + 598 + nroot, err := r.Commit(ctx) 599 + if err != nil { 600 + return err 601 + } 602 + 603 + rslice, err := ds.CloseWithRoot(ctx, nroot) 604 + if err != nil { 605 + return fmt.Errorf("close with root: %w", err) 606 + } 607 + 608 + // TODO: what happens if this update fails? 609 + if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 610 + return fmt.Errorf("updating user head: %w", err) 611 + } 612 + 613 + if rm.events != nil { 614 + rm.events(ctx, &RepoEvent{ 615 + Kind: EvtKindDeleteRecord, 616 + User: user, 617 + OldRoot: head, 618 + NewRoot: nroot, 619 + Collection: collection, 620 + Rkey: rkey, 621 + RepoSlice: rslice, 622 + }) 623 + } 624 + 625 + return nil 626 + }
+10 -1
server/handlers.go
··· 487 487 } 488 488 489 489 func (s *Server) handleComAtprotoRepoBatchWrite(ctx context.Context, input *comatprototypes.RepoBatchWrite_Input) error { 490 - panic("not yet implemented") 490 + u, err := s.getUser(ctx) 491 + if err != nil { 492 + return err 493 + } 494 + 495 + if u.Did != input.Did { 496 + return fmt.Errorf("writes for non-user actors not supported (DID mismatch)") 497 + } 498 + 499 + return s.repoman.BatchWrite(ctx, u.ID, input.Writes) 491 500 } 492 501 493 502 func (s *Server) handleComAtprotoRepoCreateRecord(ctx context.Context, input *comatprototypes.RepoCreateRecord_Input) (*comatprototypes.RepoCreateRecord_Output, error) {