this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update `goat` to work with sync v1.1 (#982)

- [x] new MST implementation
- [x] `#sync` event

This is basically stacked on two other PRs:

- https://github.com/bluesky-social/indigo/pull/983
- https://github.com/bluesky-social/indigo/pull/984

authored by

bnewbold and committed by
GitHub
17b7770b 388bbc90

+181 -148
+32 -19
cmd/goat/firehose.go
··· 14 14 15 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 16 "github.com/bluesky-social/indigo/atproto/data" 17 + "github.com/bluesky-social/indigo/atproto/repo" 17 18 "github.com/bluesky-social/indigo/atproto/syntax" 18 19 "github.com/bluesky-social/indigo/events" 19 20 "github.com/bluesky-social/indigo/events/schedulers/parallel" 20 21 lexutil "github.com/bluesky-social/indigo/lex/util" 21 - "github.com/bluesky-social/indigo/repo" 22 - "github.com/bluesky-social/indigo/repomgr" 23 22 24 23 "github.com/carlmjohnson/versioninfo" 25 24 "github.com/gorilla/websocket" ··· 123 122 } 124 123 return nil 125 124 }, 125 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 126 + slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq) 127 + if !gfc.AccountsOnly && !gfc.OpsMode { 128 + return gfc.handleSyncEvent(ctx, evt) 129 + } 130 + return nil 131 + }, 126 132 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 127 133 slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq) 128 134 if !gfc.OpsMode { ··· 173 179 return nil 174 180 } 175 181 182 + func (gfc *GoatFirehoseConsumer) handleSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 183 + out := make(map[string]interface{}) 184 + out["type"] = "sync" 185 + out["payload"] = evt 186 + b, err := json.Marshal(out) 187 + if err != nil { 188 + return err 189 + } 190 + fmt.Println(string(b)) 191 + return nil 192 + } 193 + 176 194 // this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks 177 195 func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 178 196 ··· 221 239 return nil 222 240 } 223 241 224 - rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 242 + _, rr, err := repo.LoadFromCAR(ctx, bytes.NewReader(evt.Blocks)) 225 243 if err != nil { 226 244 logger.Error("failed to read repo from car", "err", err) 227 245 return nil ··· 255 273 out["collection"] = collection 256 274 out["rkey"] = rkey 257 275 258 - ek := repomgr.EventKind(op.Action) 259 - switch ek { 260 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 276 + switch op.Action { 277 + case "create", "update": 278 + coll, rkey, err := syntax.ParseRepoPath(op.Path) 279 + if err != nil { 280 + return err 281 + } 261 282 // read the record bytes from blocks, and verify CID 262 - rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 283 + recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey) 263 284 if err != nil { 264 285 logger.Error("reading record from event blocks (CAR)", "err", err) 265 286 break 266 287 } 267 - if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 288 + if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid { 268 289 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 269 290 break 270 291 } 271 292 272 - switch ek { 273 - case repomgr.EvtKindCreateRecord: 274 - out["action"] = "create" 275 - case repomgr.EvtKindUpdateRecord: 276 - out["action"] = "update" 277 - default: 278 - logger.Error("impossible event kind", "kind", ek) 279 - break 280 - } 281 - d, err := data.UnmarshalCBOR(*recCBOR) 293 + out["action"] = op.Action 294 + d, err := data.UnmarshalCBOR(recBytes) 282 295 if err != nil { 283 296 slog.Warn("failed to parse record CBOR") 284 297 continue ··· 290 303 return err 291 304 } 292 305 fmt.Println(string(b)) 293 - case repomgr.EvtKindDeleteRecord: 306 + case "delete": 294 307 out["action"] = "delete" 295 308 b, err := json.Marshal(out) 296 309 if err != nil {
+22 -129
cmd/goat/repo.go
··· 8 8 "fmt" 9 9 "os" 10 10 "path/filepath" 11 - "strings" 12 11 "time" 13 12 14 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 14 "github.com/bluesky-social/indigo/atproto/data" 15 + "github.com/bluesky-social/indigo/atproto/repo" 16 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 - "github.com/bluesky-social/indigo/mst" 18 - "github.com/bluesky-social/indigo/repo" 19 17 "github.com/bluesky-social/indigo/util" 20 18 "github.com/bluesky-social/indigo/xrpc" 21 19 22 20 "github.com/ipfs/go-cid" 23 - cbor "github.com/ipfs/go-ipld-cbor" 24 - ipld "github.com/ipfs/go-ipld-format" 25 21 "github.com/urfave/cli/v2" 26 - "github.com/xlab/treeprint" 27 22 ) 28 23 29 24 var cmdRepo = &cli.Command{ ··· 189 184 } 190 185 191 186 // read repository tree in to memory 192 - r, err := repo.ReadRepoFromCar(ctx, fi) 187 + _, r, err := repo.LoadFromCAR(ctx, fi) 193 188 if err != nil { 194 189 return fmt.Errorf("failed to parse repo CAR file: %w", err) 195 190 } 196 191 197 - err = r.ForEach(ctx, "", func(k string, v cid.Cid) error { 198 - fmt.Printf("%s\t%s\n", k, v.String()) 192 + err = r.MST.Walk(func(k []byte, v cid.Cid) error { 193 + fmt.Printf("%s\t%s\n", string(k), v.String()) 199 194 return nil 200 195 }) 201 196 if err != nil { ··· 216 211 } 217 212 218 213 // read repository tree in to memory 219 - r, err := repo.ReadRepoFromCar(ctx, fi) 214 + c, _, err := repo.LoadFromCAR(ctx, fi) 220 215 if err != nil { 221 216 return err 222 217 } 223 218 224 - sc := r.SignedCommit() 225 - fmt.Printf("ATProto Repo Spec Version: %d\n", sc.Version) 226 - fmt.Printf("DID: %s\n", sc.Did) 227 - fmt.Printf("Data CID: %s\n", sc.Data) 228 - fmt.Printf("Prev CID: %s\n", sc.Prev) 229 - fmt.Printf("Revision: %s\n", sc.Rev) 219 + fmt.Printf("ATProto Repo Spec Version: %d\n", c.Version) 220 + fmt.Printf("DID: %s\n", c.DID) 221 + fmt.Printf("Data CID: %s\n", c.Data) 222 + fmt.Printf("Prev CID: %s\n", c.Prev) 223 + fmt.Printf("Revision: %s\n", c.Rev) 230 224 // TODO: Signature? 231 225 232 226 return nil ··· 247 241 if err != nil { 248 242 return err 249 243 } 250 - // read repository tree in to memory 251 - r, err := repo.ReadRepoFromCar(ctx, inputCAR) 252 - if err != nil { 253 - return err 254 - } 255 - cst := util.CborStore(r.Blockstore()) 256 - // determine which root cid to use, defaulting to repo data root 257 - rootCID := r.DataCid() 258 - if opts.root != "" { 259 - optsRootCID, err := cid.Decode(opts.root) 260 - if err != nil { 261 - return err 262 - } 263 - rootCID = optsRootCID 264 - } 265 - // start walking mst 266 - exists, err := nodeExists(ctx, cst, rootCID) 267 - if err != nil { 268 - return err 269 - } 270 - tree := treeprint.NewWithRoot(displayCID(&rootCID, exists, opts)) 271 - if exists { 272 - if err := walkMST(ctx, cst, rootCID, tree, opts); err != nil { 273 - return err 274 - } 275 - } 276 - // print tree 277 - fmt.Println(tree.String()) 278 - return nil 279 - } 280 - 281 - func walkMST(ctx context.Context, cst *cbor.BasicIpldStore, cid cid.Cid, tree treeprint.Tree, opts repoMSTOptions) error { 282 - var node mst.NodeData 283 - if err := cst.Get(ctx, cid, &node); err != nil { 284 - return err 285 - } 286 - if node.Left != nil { 287 - exists, err := nodeExists(ctx, cst, *node.Left) 288 - if err != nil { 289 - return err 290 - } 291 - subtree := tree.AddBranch(displayCID(node.Left, exists, opts)) 292 - if exists { 293 - if err := walkMST(ctx, cst, *node.Left, subtree, opts); err != nil { 294 - return err 295 - } 296 - } 297 - } 298 - for _, entry := range node.Entries { 299 - exists, err := nodeExists(ctx, cst, entry.Val) 300 - if err != nil { 301 - return err 302 - } 303 - tree.AddNode(displayEntryVal(&entry, exists, opts)) 304 - if entry.Tree != nil { 305 - exists, err := nodeExists(ctx, cst, *entry.Tree) 306 - if err != nil { 307 - return err 308 - } 309 - subtree := tree.AddBranch(displayCID(entry.Tree, exists, opts)) 310 - if exists { 311 - if err := walkMST(ctx, cst, *entry.Tree, subtree, opts); err != nil { 312 - return err 313 - } 314 - } 315 - } 316 - } 317 - return nil 318 - } 319 - 320 - func displayEntryVal(entry *mst.TreeEntry, exists bool, opts repoMSTOptions) string { 321 - key := string(entry.KeySuffix) 322 - divider := " " 323 - if opts.fullCID { 324 - divider = "\n" 325 - } 326 - return strings.Repeat("∙", int(entry.PrefixLen)) + key + divider + displayCID(&entry.Val, exists, opts) 327 - } 328 - 329 - func displayCID(cid *cid.Cid, exists bool, opts repoMSTOptions) string { 330 - cidDisplay := cid.String() 331 - if !opts.fullCID { 332 - cidDisplay = "…" + string(cidDisplay[len(cidDisplay)-7:]) 333 - } 334 - connector := "─◉" 335 - if !exists { 336 - connector = "─◌" 337 - } 338 - return "[" + cidDisplay + "]" + connector 339 - } 340 - 341 - type repoMSTOptions struct { 342 - carPath string 343 - fullCID bool 344 - root string 345 - } 346 - 347 - func nodeExists(ctx context.Context, cst *cbor.BasicIpldStore, cid cid.Cid) (bool, error) { 348 - if _, err := cst.Blocks.Get(ctx, cid); err != nil { 349 - if errors.Is(err, ipld.ErrNotFound{}) { 350 - return false, nil 351 - } 352 - return false, err 353 - } 354 - return true, nil 244 + return prettyMST(ctx, inputCAR, opts) 355 245 } 356 246 357 247 func runRepoUnpack(cctx *cli.Context) error { ··· 365 255 return err 366 256 } 367 257 368 - r, err := repo.ReadRepoFromCar(ctx, fi) 258 + c, r, err := repo.LoadFromCAR(ctx, fi) 369 259 if err != nil { 370 260 return err 371 261 } 372 262 373 263 // extract DID from repo commit 374 - sc := r.SignedCommit() 375 - did, err := syntax.ParseDID(sc.Did) 264 + did, err := syntax.ParseDID(c.DID) 376 265 if err != nil { 377 266 return err 378 267 } ··· 386 275 // first the commit object as a meta file 387 276 commitPath := topDir + "/_commit.json" 388 277 os.MkdirAll(filepath.Dir(commitPath), os.ModePerm) 389 - commitJSON, err := json.MarshalIndent(sc, "", " ") 278 + commitJSON, err := json.MarshalIndent(c, "", " ") 390 279 if err != nil { 391 280 return err 392 281 } ··· 395 284 } 396 285 397 286 // then all the actual records 398 - err = r.ForEach(ctx, "", func(k string, v cid.Cid) error { 399 - _, recBytes, err := r.GetRecordBytes(ctx, k) 287 + err = r.MST.Walk(func(k []byte, v cid.Cid) error { 288 + col, rkey, err := syntax.ParseRepoPath(string(k)) 289 + if err != nil { 290 + return err 291 + } 292 + recBytes, _, err := r.GetRecordBytes(ctx, col, rkey) 400 293 if err != nil { 401 294 return err 402 295 } 403 296 404 - rec, err := data.UnmarshalCBOR(*recBytes) 297 + rec, err := data.UnmarshalCBOR(recBytes) 405 298 if err != nil { 406 299 return err 407 300 } 408 301 409 - recPath := topDir + "/" + k 302 + recPath := topDir + "/" + string(k) 410 303 fmt.Printf("%s.json\n", recPath) 411 304 os.MkdirAll(filepath.Dir(recPath), os.ModePerm) 412 305 if err != nil {
+127
cmd/goat/repo_prettyprint.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "io" 8 + "strings" 9 + 10 + "github.com/bluesky-social/indigo/mst" 11 + "github.com/bluesky-social/indigo/repo" 12 + "github.com/bluesky-social/indigo/util" 13 + 14 + "github.com/ipfs/go-cid" 15 + cbor "github.com/ipfs/go-ipld-cbor" 16 + ipld "github.com/ipfs/go-ipld-format" 17 + "github.com/xlab/treeprint" 18 + ) 19 + 20 + func prettyMST(ctx context.Context, carFile io.Reader, opts repoMSTOptions) error { 21 + 22 + // read repository tree in to memory 23 + r, err := repo.ReadRepoFromCar(ctx, carFile) 24 + if err != nil { 25 + return err 26 + } 27 + cst := util.CborStore(r.Blockstore()) 28 + // determine which root cid to use, defaulting to repo data root 29 + rootCID := r.DataCid() 30 + if opts.root != "" { 31 + optsRootCID, err := cid.Decode(opts.root) 32 + if err != nil { 33 + return err 34 + } 35 + rootCID = optsRootCID 36 + } 37 + // start walking mst 38 + exists, err := nodeExists(ctx, cst, rootCID) 39 + if err != nil { 40 + return err 41 + } 42 + tree := treeprint.NewWithRoot(displayCID(&rootCID, exists, opts)) 43 + if exists { 44 + if err := walkMST(ctx, cst, rootCID, tree, opts); err != nil { 45 + return err 46 + } 47 + } 48 + // print tree 49 + fmt.Println(tree.String()) 50 + return nil 51 + } 52 + 53 + func walkMST(ctx context.Context, cst *cbor.BasicIpldStore, cid cid.Cid, tree treeprint.Tree, opts repoMSTOptions) error { 54 + var node mst.NodeData 55 + if err := cst.Get(ctx, cid, &node); err != nil { 56 + return err 57 + } 58 + if node.Left != nil { 59 + exists, err := nodeExists(ctx, cst, *node.Left) 60 + if err != nil { 61 + return err 62 + } 63 + subtree := tree.AddBranch(displayCID(node.Left, exists, opts)) 64 + if exists { 65 + if err := walkMST(ctx, cst, *node.Left, subtree, opts); err != nil { 66 + return err 67 + } 68 + } 69 + } 70 + for _, entry := range node.Entries { 71 + exists, err := nodeExists(ctx, cst, entry.Val) 72 + if err != nil { 73 + return err 74 + } 75 + tree.AddNode(displayEntryVal(&entry, exists, opts)) 76 + if entry.Tree != nil { 77 + exists, err := nodeExists(ctx, cst, *entry.Tree) 78 + if err != nil { 79 + return err 80 + } 81 + subtree := tree.AddBranch(displayCID(entry.Tree, exists, opts)) 82 + if exists { 83 + if err := walkMST(ctx, cst, *entry.Tree, subtree, opts); err != nil { 84 + return err 85 + } 86 + } 87 + } 88 + } 89 + return nil 90 + } 91 + 92 + func displayEntryVal(entry *mst.TreeEntry, exists bool, opts repoMSTOptions) string { 93 + key := string(entry.KeySuffix) 94 + divider := " " 95 + if opts.fullCID { 96 + divider = "\n" 97 + } 98 + return strings.Repeat("∙", int(entry.PrefixLen)) + key + divider + displayCID(&entry.Val, exists, opts) 99 + } 100 + 101 + func displayCID(cid *cid.Cid, exists bool, opts repoMSTOptions) string { 102 + cidDisplay := cid.String() 103 + if !opts.fullCID { 104 + cidDisplay = "…" + string(cidDisplay[len(cidDisplay)-7:]) 105 + } 106 + connector := "─◉" 107 + if !exists { 108 + connector = "─◌" 109 + } 110 + return "[" + cidDisplay + "]" + connector 111 + } 112 + 113 + type repoMSTOptions struct { 114 + carPath string 115 + fullCID bool 116 + root string 117 + } 118 + 119 + func nodeExists(ctx context.Context, cst *cbor.BasicIpldStore, cid cid.Cid) (bool, error) { 120 + if _, err := cst.Blocks.Get(ctx, cid); err != nil { 121 + if errors.Is(err, ipld.ErrNotFound{}) { 122 + return false, nil 123 + } 124 + return false, err 125 + } 126 + return true, nil 127 + }