this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

atproto/repo: new SDK package supporting inductive firehose (#936)

This is a new/alternative SDK for atproto repositories, to replace the
top-level `repo` package.

It wraps the newer `atproto/repo/mst` package, and support inductive
firehose.

The initial scope (eg, for this PR) is relay and consumption use-cases,
not PDS and repo-maintenance use-cases (there there is some initial code
supporting the later).

The earlier MST PR included a skeleton of this package; this PR fleshes
it out.

Things to maybe look at before merging:

- [ ] this PR contains manual updates to `subscribeRepos` struct. maybe
we should wait for https://github.com/bluesky-social/atproto/pull/3391
or https://github.com/bluesky-social/atproto/pull/3449 to land
- [x] the `VerifyCommitMessage` function is included as an example.
maybe that should go in a different package? in the CLI tool? or
explicitly as a test/example function. (added more detailed comment
about status)
- [x] add doc strings, especially disclaiming the overall state of this
package (isn't ready for a PDS implementation, for example)

authored by

bnewbold and committed by
GitHub
73435524 63d4acc5

+382 -89
+90 -3
api/atproto/cbor_gen.go
··· 343 343 } 344 344 345 345 cw := cbg.NewCborWriter(w) 346 - fieldCount := 12 346 + fieldCount := 13 347 347 348 348 if t.Blocks == nil { 349 + fieldCount-- 350 + } 351 + 352 + if t.PrevData == nil { 349 353 fieldCount-- 350 354 } 351 355 ··· 616 620 if err := cbg.WriteBool(w, t.TooBig); err != nil { 617 621 return err 618 622 } 623 + 624 + // t.PrevData (util.LexLink) (struct) 625 + if t.PrevData != nil { 626 + 627 + if len("prevData") > 1000000 { 628 + return xerrors.Errorf("Value in field \"prevData\" was too long") 629 + } 630 + 631 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("prevData"))); err != nil { 632 + return err 633 + } 634 + if _, err := cw.WriteString(string("prevData")); err != nil { 635 + return err 636 + } 637 + 638 + if err := t.PrevData.MarshalCBOR(cw); err != nil { 639 + return err 640 + } 641 + } 619 642 return nil 620 643 } 621 644 ··· 644 667 645 668 n := extra 646 669 647 - nameBuf := make([]byte, 6) 670 + nameBuf := make([]byte, 8) 648 671 for i := uint64(0); i < n; i++ { 649 672 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 650 673 if err != nil { ··· 916 939 t.TooBig = true 917 940 default: 918 941 return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 942 + } 943 + // t.PrevData (util.LexLink) (struct) 944 + case "prevData": 945 + 946 + { 947 + 948 + b, err := cr.ReadByte() 949 + if err != nil { 950 + return err 951 + } 952 + if b != cbg.CborNull[0] { 953 + if err := cr.UnreadByte(); err != nil { 954 + return err 955 + } 956 + t.PrevData = new(util.LexLink) 957 + if err := t.PrevData.UnmarshalCBOR(cr); err != nil { 958 + return xerrors.Errorf("unmarshaling t.PrevData pointer: %w", err) 959 + } 960 + } 961 + 919 962 } 920 963 921 964 default: ··· 2055 2098 } 2056 2099 2057 2100 cw := cbg.NewCborWriter(w) 2101 + fieldCount := 4 2058 2102 2059 - if _, err := cw.Write([]byte{163}); err != nil { 2103 + if t.Prev == nil { 2104 + fieldCount-- 2105 + } 2106 + 2107 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 2060 2108 return err 2061 2109 } 2062 2110 ··· 2097 2145 } 2098 2146 if _, err := cw.WriteString(string(t.Path)); err != nil { 2099 2147 return err 2148 + } 2149 + 2150 + // t.Prev (util.LexLink) (struct) 2151 + if t.Prev != nil { 2152 + 2153 + if len("prev") > 1000000 { 2154 + return xerrors.Errorf("Value in field \"prev\" was too long") 2155 + } 2156 + 2157 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("prev"))); err != nil { 2158 + return err 2159 + } 2160 + if _, err := cw.WriteString(string("prev")); err != nil { 2161 + return err 2162 + } 2163 + 2164 + if err := t.Prev.MarshalCBOR(cw); err != nil { 2165 + return err 2166 + } 2100 2167 } 2101 2168 2102 2169 // t.Action (string) (string) ··· 2195 2262 } 2196 2263 2197 2264 t.Path = string(sval) 2265 + } 2266 + // t.Prev (util.LexLink) (struct) 2267 + case "prev": 2268 + 2269 + { 2270 + 2271 + b, err := cr.ReadByte() 2272 + if err != nil { 2273 + return err 2274 + } 2275 + if b != cbg.CborNull[0] { 2276 + if err := cr.UnreadByte(); err != nil { 2277 + return err 2278 + } 2279 + t.Prev = new(util.LexLink) 2280 + if err := t.Prev.UnmarshalCBOR(cr); err != nil { 2281 + return xerrors.Errorf("unmarshaling t.Prev pointer: %w", err) 2282 + } 2283 + } 2284 + 2198 2285 } 2199 2286 // t.Action (string) (string) 2200 2287 case "action":
+7 -3
api/atproto/syncsubscribeRepos.go
··· 29 29 // blocks: CAR file containing relevant blocks, as a diff since the previous repo state. 30 30 Blocks util.LexBytes `json:"blocks,omitempty" cborgen:"blocks,omitempty"` 31 31 // commit: Repo commit object CID. 32 - Commit util.LexLink `json:"commit" cborgen:"commit"` 33 - Ops []*SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"` 32 + Commit util.LexLink `json:"commit" cborgen:"commit"` 33 + // prevData 34 + PrevData *util.LexLink `json:"prevData,omitempty" cborgen:"prevData,omitempty"` 35 + Ops []*SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"` 34 36 // prev: DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. 35 37 Prev *util.LexLink `json:"prev" cborgen:"prev"` 36 38 // rebase: DEPRECATED -- unused ··· 92 94 type SyncSubscribeRepos_RepoOp struct { 93 95 Action string `json:"action" cborgen:"action"` 94 96 // cid: For creates and updates, the new record CID. For deletions, null. 95 - Cid *util.LexLink `json:"cid" cborgen:"cid"` 97 + Cid *util.LexLink `json:"cid" cborgen:"cid"` 98 + // prev 99 + Prev *util.LexLink `json:"prev,omitempty" cborgen:"prev,omitempty"` 96 100 Path string `json:"path" cborgen:"path"` 97 101 } 98 102
+14 -14
atproto/repo/car.go
··· 14 14 "github.com/ipld/go-car" 15 15 ) 16 16 17 - func LoadFromCAR(ctx context.Context, r io.Reader) (*Repo, error) { 17 + func LoadFromCAR(ctx context.Context, r io.Reader) (*Commit, *Repo, error) { 18 18 19 19 bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) 20 20 21 21 cr, err := car.NewCarReader(r) 22 22 if err != nil { 23 - return nil, err 23 + return nil, nil, err 24 24 } 25 25 26 26 if cr.Header.Version != 1 { 27 - return nil, fmt.Errorf("unsupported CAR file version: %d", cr.Header.Version) 27 + return nil, nil, fmt.Errorf("unsupported CAR file version: %d", cr.Header.Version) 28 28 } 29 29 if len(cr.Header.Roots) < 1 { 30 - return nil, fmt.Errorf("CAR file missing root CID") 30 + return nil, nil, fmt.Errorf("CAR file missing root CID") 31 31 } 32 32 commitCID := cr.Header.Roots[0] 33 33 ··· 37 37 if err == io.EOF { 38 38 break 39 39 } 40 - return nil, err 40 + return nil, nil, err 41 41 } 42 42 43 43 if err := bs.Put(ctx, blk); err != nil { 44 - return nil, err 44 + return nil, nil, err 45 45 } 46 46 } 47 47 48 48 commitBlock, err := bs.Get(ctx, commitCID) 49 49 if err != nil { 50 - return nil, fmt.Errorf("reading commit block from CAR file: %w", err) 50 + return nil, nil, fmt.Errorf("reading commit block from CAR file: %w", err) 51 51 } 52 52 53 53 var commit Commit 54 54 if err := commit.UnmarshalCBOR(bytes.NewReader(commitBlock.RawData())); err != nil { 55 - return nil, fmt.Errorf("parsing commit block from CAR file: %w", err) 55 + return nil, nil, fmt.Errorf("parsing commit block from CAR file: %w", err) 56 56 } 57 57 if err := commit.VerifyStructure(); err != nil { 58 - return nil, fmt.Errorf("parsing commit block from CAR file: %w", err) 58 + return nil, nil, fmt.Errorf("parsing commit block from CAR file: %w", err) 59 59 } 60 60 61 61 tree, err := mst.LoadTreeFromStore(ctx, bs, commit.Data) 62 62 if err != nil { 63 - return nil, fmt.Errorf("reading MST from CAR file: %w", err) 63 + return nil, nil, fmt.Errorf("reading MST from CAR file: %w", err) 64 64 } 65 + clk := syntax.ClockFromTID(syntax.TID(commit.Rev)) 65 66 repo := Repo{ 66 - DID: syntax.DID(commit.DID), // VerifyStructure() verified syntax 67 - Clock: syntax.NewTIDClock(0), // TODO: initialize with commit.Rev 68 - Commit: &commit, 67 + DID: syntax.DID(commit.DID), // NOTE: VerifyStructure() already checked DID syntax 68 + Clock: &clk, 69 69 MST: *tree, 70 70 RecordStore: bs, // TODO: put just records in a smaller blockstore? 71 71 } 72 - return &repo, nil 72 + return &commit, &repo, nil 73 73 }
+25 -17
atproto/repo/cbor_gen.go
··· 27 27 cw := cbg.NewCborWriter(w) 28 28 fieldCount := 6 29 29 30 + if t.Sig == nil { 31 + fieldCount-- 32 + } 33 + 30 34 if t.Rev == "" { 31 35 fieldCount-- 32 36 } ··· 85 89 } 86 90 87 91 // t.Sig ([]uint8) (slice) 88 - if len("sig") > 1000000 { 89 - return xerrors.Errorf("Value in field \"sig\" was too long") 90 - } 92 + if t.Sig != nil { 91 93 92 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("sig"))); err != nil { 93 - return err 94 - } 95 - if _, err := cw.WriteString(string("sig")); err != nil { 96 - return err 97 - } 94 + if len("sig") > 1000000 { 95 + return xerrors.Errorf("Value in field \"sig\" was too long") 96 + } 98 97 99 - if len(t.Sig) > 2097152 { 100 - return xerrors.Errorf("Byte array in field t.Sig was too long") 101 - } 98 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("sig"))); err != nil { 99 + return err 100 + } 101 + if _, err := cw.WriteString(string("sig")); err != nil { 102 + return err 103 + } 102 104 103 - if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Sig))); err != nil { 104 - return err 105 - } 105 + if len(t.Sig) > 2097152 { 106 + return xerrors.Errorf("Byte array in field t.Sig was too long") 107 + } 108 + 109 + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Sig))); err != nil { 110 + return err 111 + } 112 + 113 + if _, err := cw.Write(t.Sig); err != nil { 114 + return err 115 + } 106 116 107 - if _, err := cw.Write(t.Sig); err != nil { 108 - return err 109 117 } 110 118 111 119 // t.Data (cid.Cid) (struct)
+54 -3
atproto/repo/cmd/repo-tool/main.go
··· 8 8 "os" 9 9 "strings" 10 10 11 + "github.com/bluesky-social/indigo/atproto/identity" 11 12 "github.com/bluesky-social/indigo/atproto/repo" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 12 14 13 15 "github.com/urfave/cli/v2" 14 16 ) ··· 31 33 Usage: "load a CAR file and check the MST tree", 32 34 ArgsUsage: "<path>", 33 35 Action: runVerifyCarMst, 36 + }, 37 + &cli.Command{ 38 + Name: "verify-car-signature", 39 + Usage: "load a CAR file and check the commit message signature", 40 + ArgsUsage: "<path>", 41 + Action: runVerifyCarSignature, 34 42 }, 35 43 &cli.Command{ 36 44 Name: "verify-firehose", ··· 85 93 } 86 94 defer f.Close() 87 95 88 - repo, err := repo.LoadFromCAR(ctx, f) 96 + commit, repo, err := repo.LoadFromCAR(ctx, f) 89 97 if err != nil { 90 98 return err 91 99 } ··· 95 103 return err 96 104 } 97 105 98 - if repo.Commit.Data != *computedCID { 99 - return fmt.Errorf("failed to re-compute: %s != %s", computedCID, repo.Commit.Data) 106 + if commit.Data != *computedCID { 107 + return fmt.Errorf("failed to re-compute: %s != %s", computedCID, commit.Data) 100 108 } 101 109 fmt.Println("verified tree") 102 110 return nil 103 111 } 112 + 113 + func runVerifyCarSignature(cctx *cli.Context) error { 114 + ctx := context.Background() 115 + dir := identity.DefaultDirectory() 116 + 117 + p := cctx.Args().First() 118 + if p == "" { 119 + return fmt.Errorf("need to provide path to CAR file") 120 + } 121 + 122 + f, err := os.Open(p) 123 + if err != nil { 124 + return err 125 + } 126 + defer f.Close() 127 + 128 + commit, _, err := repo.LoadFromCAR(ctx, f) 129 + if err != nil { 130 + return err 131 + } 132 + 133 + if err := commit.VerifyStructure(); err != nil { 134 + return err 135 + } 136 + did, err := syntax.ParseDID(commit.DID) 137 + if err != nil { 138 + return err 139 + } 140 + 141 + ident, err := dir.LookupDID(ctx, did) 142 + if err != nil { 143 + return err 144 + } 145 + pubkey, err := ident.PublicKey() 146 + if err != nil { 147 + return err 148 + } 149 + if err := commit.VerifySignature(pubkey); err != nil { 150 + return err 151 + } 152 + fmt.Println("verified signature") 153 + return nil 154 + }
+55 -4
atproto/repo/commit.go
··· 1 1 package repo 2 2 3 3 import ( 4 + "bytes" 4 5 "fmt" 5 6 7 + "github.com/bluesky-social/indigo/atproto/crypto" 6 8 "github.com/bluesky-social/indigo/atproto/syntax" 7 9 8 10 "github.com/ipfs/go-cid" 9 11 ) 10 12 13 + // atproto repo commit object as a struct type. Can be used for direct CBOR or JSON serialization. 11 14 type Commit struct { 12 15 DID string `json:"did" cborgen:"did"` 13 16 Version int64 `json:"version" cborgen:"version"` // currently: 3 14 - Prev *cid.Cid `json:"prev" cborgen:"prev"` // TODO: could we omitempty yet? breaks signatures I guess 17 + Prev *cid.Cid `json:"prev" cborgen:"prev"` // NOTE: omitempty would break signature verification for repo v3 15 18 Data cid.Cid `json:"data" cborgen:"data"` 16 - Sig []byte `json:"sig" cborgen:"sig"` 17 - Rev string `json:"rev" cborgen:"rev"` 19 + Sig []byte `json:"sig,omitempty" cborgen:"sig,omitempty"` 20 + Rev string `json:"rev,omitempty" cborgen:"rev,omitempty"` 18 21 } 19 22 20 - // does basic checks that syntax is correct 23 + // does basic checks that field values and syntax are correct 21 24 func (c *Commit) VerifyStructure() error { 22 25 if c.Version != ATPROTO_REPO_VERSION { 23 26 return fmt.Errorf("unsupported repo version: %d", c.Version) ··· 35 38 } 36 39 return nil 37 40 } 41 + 42 + // Encodes the commit object as DAG-CBOR, without the signature field. Used for signing or validating signatures. 43 + func (c *Commit) UnsignedBytes() ([]byte, error) { 44 + buf := new(bytes.Buffer) 45 + if c.Sig == nil { 46 + if err := c.MarshalCBOR(buf); err != nil { 47 + return nil, err 48 + } 49 + return buf.Bytes(), nil 50 + } 51 + unsigned := Commit{ 52 + DID: c.DID, 53 + Version: c.Version, 54 + Prev: c.Prev, 55 + Data: c.Data, 56 + Rev: c.Rev, 57 + } 58 + if err := unsigned.MarshalCBOR(buf); err != nil { 59 + return nil, err 60 + } 61 + return buf.Bytes(), nil 62 + } 63 + 64 + // Signs the commit, storing the signature in the `Sig` field 65 + func (c *Commit) Sign(privkey crypto.PrivateKey) error { 66 + b, err := c.UnsignedBytes() 67 + if err != nil { 68 + return err 69 + } 70 + sig, err := privkey.HashAndSign(b) 71 + if err != nil { 72 + return err 73 + } 74 + c.Sig = sig 75 + return nil 76 + } 77 + 78 + // Verifies `Sig` field using the provided key. Returns `nil` if signature is valid. 79 + func (c *Commit) VerifySignature(pubkey crypto.PublicKey) error { 80 + if c.Sig == nil { 81 + return fmt.Errorf("can not verify unsigned commit") 82 + } 83 + b, err := c.UnsignedBytes() 84 + if err != nil { 85 + return err 86 + } 87 + return pubkey.HashAndVerify(b, c.Sig) 88 + }
+6
atproto/repo/doc.go
··· 1 + /* 2 + Implementation of atproto repository and sync APIs, built on the MST data structure. 3 + 4 + The current package works for processing a sync firehose, including validation of "inductive firehose". It does not yet work for implementing a repository host (PDS). 5 + */ 6 + package repo
+8 -2
atproto/repo/operation.go
··· 9 9 "github.com/ipfs/go-cid" 10 10 ) 11 11 12 + // Metadata about update to a single record (key) in the repo. 13 + // 14 + // Used as an abstraction for creating or validating "commit diffs" (eg, `#commit` firehose events) 12 15 type Operation struct { 13 - Path string 16 + // key of the record, eg, '{collection}/{record-key}' 17 + Path string 18 + // the new record CID value (or nil if this is a deletion) 14 19 Value *cid.Cid 15 - Prev *cid.Cid 20 + // the previous record CID value (or nil if this is a creation) 21 + Prev *cid.Cid 16 22 } 17 23 18 24 func (op *Operation) IsCreate() bool {
+21 -16
atproto/repo/repo.go
··· 15 15 // Version of the repo data format implemented in this package 16 16 const ATPROTO_REPO_VERSION int64 = 3 17 17 18 + // High-level wrapper struct for an atproto repository. 18 19 type Repo struct { 19 - DID syntax.DID 20 - Clock *syntax.TIDClock 21 - Commit *Commit 20 + DID syntax.DID 21 + Clock *syntax.TIDClock 22 22 23 23 RecordStore blockstore.Blockstore 24 24 MST mst.Tree ··· 26 26 27 27 var ErrNotFound = errors.New("record not found in repository") 28 28 29 - func NewRepo(did syntax.DID) Repo { 29 + func NewEmptyRepo(did syntax.DID) Repo { 30 + clk := syntax.NewTIDClock(0) 30 31 return Repo{ 31 32 DID: did, 32 - Clock: syntax.NewTIDClock(0), 33 - Commit: nil, 33 + Clock: &clk, 34 34 RecordStore: blockstore.NewBlockstore(datastore.NewMapDatastore()), 35 35 MST: mst.NewEmptyTree(), 36 36 } ··· 61 61 return blk.RawData(), nil 62 62 } 63 63 64 - // TODO: 65 - // IsComplete() 66 - // LoadFromStore 67 - // LoadFromCAR(reader) 68 - // WriteBlocks 69 - // WriteCAR 70 - // VerifyCIDs(bool) 71 - // Export 72 - // GetRecordStruct 73 - // GetRecordProof 64 + // Snapshots the current state of the repository, resulting in a new (unsigned) `Commit` struct. 65 + func (repo *Repo) Commit() (*Commit, error) { 66 + root, err := repo.MST.RootCID() 67 + if err != nil { 68 + return nil, err 69 + } 70 + c := Commit{ 71 + DID: repo.DID.String(), 72 + Version: ATPROTO_REPO_VERSION, 73 + Prev: nil, 74 + Data: *root, 75 + Rev: repo.Clock.Next().String(), 76 + } 77 + return &c, nil 78 + }
+90 -24
atproto/repo/sync.go
··· 7 7 "log/slog" 8 8 9 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/identity" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 11 12 12 13 "github.com/ipfs/go-cid" 13 14 ) 14 15 15 - // temporary/experimental method to parse and verify a firehose commit message 16 + // temporary/experimental method to parse and verify a firehose commit message. 17 + // 18 + // TODO: move to a separate 'sync' package? break up in to smaller components? 16 19 func VerifyCommitMessage(ctx context.Context, msg *comatproto.SyncSubscribeRepos_Commit) (*Repo, error) { 17 20 18 21 logger := slog.Default().With("did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time) ··· 37 40 logger.Warn("event with rebase flag set") 38 41 } 39 42 40 - repo, err := LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 43 + commit, repo, err := LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 41 44 if err != nil { 42 45 return nil, err 43 46 } 44 47 45 - if repo.Commit.Rev != rev.String() { 48 + if commit.Rev != rev.String() { 46 49 return nil, fmt.Errorf("rev did not match commit") 47 50 } 48 - if repo.Commit.DID != did.String() { 51 + if commit.DID != did.String() { 49 52 return nil, fmt.Errorf("rev did not match commit") 50 53 } 51 54 // TODO: check that commit CID matches root? re-compute? ··· 72 75 } 73 76 } 74 77 75 - // TODO: once firehose format is updated, remove this 78 + // TODO: once firehose format is fully shipped, remove this 76 79 for _, o := range msg.Ops { 77 - if o.Action != "create" { 78 - logger.Info("can't invert legacy op", "action", o.Action) 79 - return repo, nil 80 + switch o.Action { 81 + case "delete": 82 + if o.Prev == nil { 83 + logger.Info("can't invert legacy op", "action", o.Action) 84 + return repo, nil 85 + } 86 + case "update": 87 + if o.Prev == nil { 88 + logger.Info("can't invert legacy op", "action", o.Action) 89 + return repo, nil 90 + } 80 91 } 81 92 } 82 93 83 - ops, err := ParseCommitOps(msg.Ops) 94 + ops, err := parseCommitOps(msg.Ops) 84 95 if err != nil { 85 96 return nil, err 86 97 } ··· 97 108 return nil, err 98 109 } 99 110 } 100 - // TODO: compare against previous commit for this repo? 101 - _, err = invTree.RootCID() 111 + computed, err := invTree.RootCID() 112 + if err != nil { 113 + return nil, err 114 + } 115 + if msg.PrevData != nil { 116 + c := (*cid.Cid)(msg.PrevData) 117 + if *computed != *c { 118 + return nil, fmt.Errorf("inverted tree root didn't match prevData") 119 + } 120 + logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 121 + } else { 122 + logger.Info("prevData was null; skipping tree root check") 123 + } 102 124 103 125 logger.Info("success") 104 126 return repo, nil 105 127 } 106 128 107 - func ParseCommitOps(ops []*comatproto.SyncSubscribeRepos_RepoOp) ([]Operation, error) { 108 - //out := make([]mst.Operation, len(ops)) 129 + func parseCommitOps(ops []*comatproto.SyncSubscribeRepos_RepoOp) ([]Operation, error) { 130 + //out := make([]Operation, len(ops)) 109 131 out := []Operation{} 110 132 for _, rop := range ops { 111 133 switch rop.Action { 112 134 case "create": 113 - if rop.Cid != nil { 114 - op := Operation{ 115 - Path: rop.Path, 116 - Prev: nil, 117 - Value: (*cid.Cid)(rop.Cid), 118 - } 119 - out = append(out, op) 120 - } else { 121 - return nil, fmt.Errorf("invalid repoOp: create missing CID") 135 + if rop.Cid == nil || rop.Prev != nil { 136 + return nil, fmt.Errorf("invalid repoOp: create") 122 137 } 138 + op := Operation{ 139 + Path: rop.Path, 140 + Prev: nil, 141 + Value: (*cid.Cid)(rop.Cid), 142 + } 143 + out = append(out, op) 123 144 case "delete": 124 - return nil, fmt.Errorf("unhandled delete repoOp") 145 + if rop.Cid != nil || rop.Prev == nil { 146 + return nil, fmt.Errorf("invalid repoOp: delete") 147 + } 148 + op := Operation{ 149 + Path: rop.Path, 150 + Prev: (*cid.Cid)(rop.Prev), 151 + Value: nil, 152 + } 153 + out = append(out, op) 125 154 case "update": 126 - return nil, fmt.Errorf("unhandled update repoOp") 155 + if rop.Cid == nil || rop.Prev == nil { 156 + return nil, fmt.Errorf("invalid repoOp: update") 157 + } 158 + op := Operation{ 159 + Path: rop.Path, 160 + Prev: (*cid.Cid)(rop.Prev), 161 + Value: (*cid.Cid)(rop.Cid), 162 + } 163 + out = append(out, op) 127 164 default: 128 165 return nil, fmt.Errorf("invalid repoOp action: %s", rop.Action) 129 166 } 130 167 } 131 168 return out, nil 132 169 } 170 + 171 + // temporary/experimental code showing how to verify a commit signature from firehose 172 + // 173 + // TODO: in real implementation, will want to merge this code with `VerifyCommitMessage` above, and have it hanging off some service struct with a configured `identity.Directory` 174 + func VerifyCommitSignature(ctx context.Context, dir identity.Directory, msg *comatproto.SyncSubscribeRepos_Commit) error { 175 + commit, _, err := LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 176 + if err != nil { 177 + return err 178 + } 179 + 180 + if err := commit.VerifyStructure(); err != nil { 181 + return err 182 + } 183 + did, err := syntax.ParseDID(commit.DID) 184 + if err != nil { 185 + return err 186 + } 187 + 188 + ident, err := dir.LookupDID(ctx, did) 189 + if err != nil { 190 + return err 191 + } 192 + pubkey, err := ident.PublicKey() 193 + if err != nil { 194 + return err 195 + } 196 + 197 + return commit.VerifySignature(pubkey) 198 + }
+1 -1
atproto/repo/sync_test.go
··· 47 47 _, err = VerifyCommitMessage(ctx, &msg) 48 48 assert.NoError(err) 49 49 if err != nil { 50 - repo, err := LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 50 + _, repo, err := LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 51 51 if err != nil { 52 52 t.Fail() 53 53 }
+11 -2
atproto/syntax/tid.go
··· 123 123 lastUnixMicro int64 124 124 } 125 125 126 - func NewTIDClock(clockId uint) *TIDClock { 127 - return &TIDClock{ 126 + func NewTIDClock(clockId uint) TIDClock { 127 + return TIDClock{ 128 128 ClockID: clockId, 129 + } 130 + } 131 + 132 + func ClockFromTID(t TID) TIDClock { 133 + um := t.Integer() 134 + um = (um >> 10) & 0x1FFF_FFFF_FFFF_FFFF 135 + return TIDClock{ 136 + ClockID: t.ClockID(), 137 + lastUnixMicro: int64(um), 129 138 } 130 139 } 131 140