this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update goat to use new MST implementation

+34 -38
+13 -19
cmd/goat/firehose.go
··· 14 14 15 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 16 "github.com/bluesky-social/indigo/atproto/data" 17 + "github.com/bluesky-social/indigo/atproto/repo" 17 18 "github.com/bluesky-social/indigo/atproto/syntax" 18 19 "github.com/bluesky-social/indigo/events" 19 20 "github.com/bluesky-social/indigo/events/schedulers/parallel" 20 21 lexutil "github.com/bluesky-social/indigo/lex/util" 21 - "github.com/bluesky-social/indigo/repo" 22 - "github.com/bluesky-social/indigo/repomgr" 23 22 24 23 "github.com/carlmjohnson/versioninfo" 25 24 "github.com/gorilla/websocket" ··· 221 220 return nil 222 221 } 223 222 224 - rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 223 + _, rr, err := repo.LoadFromCAR(ctx, bytes.NewReader(evt.Blocks)) 225 224 if err != nil { 226 225 logger.Error("failed to read repo from car", "err", err) 227 226 return nil ··· 255 254 out["collection"] = collection 256 255 out["rkey"] = rkey 257 256 258 - ek := repomgr.EventKind(op.Action) 259 - switch ek { 260 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 257 + switch op.Action { 258 + case "create", "update": 259 + coll, rkey, err := syntax.ParseRepoPath(op.Path) 260 + if err != nil { 261 + return err 262 + } 261 263 // read the record bytes from blocks, and verify CID 262 - rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 264 + recBytes, rc, err := rr.GetRecordBytes(ctx, coll, rkey) 263 265 if err != nil { 264 266 logger.Error("reading record from event blocks (CAR)", "err", err) 265 267 break 266 268 } 267 - if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 269 + if op.Cid == nil || lexutil.LexLink(*rc) != *op.Cid { 268 270 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 269 271 break 270 272 } 271 273 272 - switch ek { 273 - case repomgr.EvtKindCreateRecord: 274 - out["action"] = "create" 275 - case repomgr.EvtKindUpdateRecord: 276 - out["action"] = "update" 277 - default: 278 - logger.Error("impossible event kind", "kind", ek) 279 - break 280 - } 281 - d, err := data.UnmarshalCBOR(*recCBOR) 274 + out["action"] = op.Action 275 + d, err := data.UnmarshalCBOR(recBytes) 282 276 if err != nil { 283 277 slog.Warn("failed to parse record CBOR") 284 278 continue ··· 290 284 return err 291 285 } 292 286 fmt.Println(string(b)) 293 - case repomgr.EvtKindDeleteRecord: 287 + case "delete": 294 288 out["action"] = "delete" 295 289 b, err := json.Marshal(out) 296 290 if err != nil {
+21 -19
cmd/goat/repo.go
··· 12 12 13 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 14 "github.com/bluesky-social/indigo/atproto/data" 15 + "github.com/bluesky-social/indigo/atproto/repo" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/repo" 17 17 "github.com/bluesky-social/indigo/util" 18 18 "github.com/bluesky-social/indigo/xrpc" 19 19 ··· 184 184 } 185 185 186 186 // read repository tree in to memory 187 - r, err := repo.ReadRepoFromCar(ctx, fi) 187 + _, r, err := repo.LoadFromCAR(ctx, fi) 188 188 if err != nil { 189 189 return fmt.Errorf("failed to parse repo CAR file: %w", err) 190 190 } 191 191 192 - err = r.ForEach(ctx, "", func(k string, v cid.Cid) error { 193 - fmt.Printf("%s\t%s\n", k, v.String()) 192 + err = r.MST.Walk(func(k []byte, v cid.Cid) error { 193 + fmt.Printf("%s\t%s\n", string(k), v.String()) 194 194 return nil 195 195 }) 196 196 if err != nil { ··· 211 211 } 212 212 213 213 // read repository tree in to memory 214 - r, err := repo.ReadRepoFromCar(ctx, fi) 214 + c, _, err := repo.LoadFromCAR(ctx, fi) 215 215 if err != nil { 216 216 return err 217 217 } 218 218 219 - sc := r.SignedCommit() 220 - fmt.Printf("ATProto Repo Spec Version: %d\n", sc.Version) 221 - fmt.Printf("DID: %s\n", sc.Did) 222 - fmt.Printf("Data CID: %s\n", sc.Data) 223 - fmt.Printf("Prev CID: %s\n", sc.Prev) 224 - fmt.Printf("Revision: %s\n", sc.Rev) 219 + fmt.Printf("ATProto Repo Spec Version: %d\n", c.Version) 220 + fmt.Printf("DID: %s\n", c.DID) 221 + fmt.Printf("Data CID: %s\n", c.Data) 222 + fmt.Printf("Prev CID: %s\n", c.Prev) 223 + fmt.Printf("Revision: %s\n", c.Rev) 225 224 // TODO: Signature? 226 225 227 226 return nil ··· 256 255 return err 257 256 } 258 257 259 - r, err := repo.ReadRepoFromCar(ctx, fi) 258 + c, r, err := repo.LoadFromCAR(ctx, fi) 260 259 if err != nil { 261 260 return err 262 261 } 263 262 264 263 // extract DID from repo commit 265 - sc := r.SignedCommit() 266 - did, err := syntax.ParseDID(sc.Did) 264 + did, err := syntax.ParseDID(c.DID) 267 265 if err != nil { 268 266 return err 269 267 } ··· 277 275 // first the commit object as a meta file 278 276 commitPath := topDir + "/_commit.json" 279 277 os.MkdirAll(filepath.Dir(commitPath), os.ModePerm) 280 - commitJSON, err := json.MarshalIndent(sc, "", " ") 278 + commitJSON, err := json.MarshalIndent(c, "", " ") 281 279 if err != nil { 282 280 return err 283 281 } ··· 286 284 } 287 285 288 286 // then all the actual records 289 - err = r.ForEach(ctx, "", func(k string, v cid.Cid) error { 290 - _, recBytes, err := r.GetRecordBytes(ctx, k) 287 + err = r.MST.Walk(func(k []byte, v cid.Cid) error { 288 + col, rkey, err := syntax.ParseRepoPath(string(k)) 289 + if err != nil { 290 + return err 291 + } 292 + recBytes, _, err := r.GetRecordBytes(ctx, col, rkey) 291 293 if err != nil { 292 294 return err 293 295 } 294 296 295 - rec, err := data.UnmarshalCBOR(*recBytes) 297 + rec, err := data.UnmarshalCBOR(recBytes) 296 298 if err != nil { 297 299 return err 298 300 } 299 301 300 - recPath := topDir + "/" + k 302 + recPath := topDir + "/" + string(k) 301 303 fmt.Printf("%s.json\n", recPath) 302 304 os.MkdirAll(filepath.Dir(recPath), os.ModePerm) 303 305 if err != nil {